[HUDI-4483] fix checkstyle in integ-test module #6523

Merged · 1 commit merged on Aug 30, 2022
1 change: 0 additions & 1 deletion hudi-integ-test/pom.xml
@@ -445,7 +445,6 @@
<dockerCompose.envFile>${project.basedir}/compose_env</dockerCompose.envFile>
<dockerCompose.file>${project.basedir}/../docker/compose/docker-compose_hadoop284_hive233_spark244.yml</dockerCompose.file>
<docker.compose.skip>${skipITs}</docker.compose.skip>
<checkstyle.skip>true</checkstyle.skip>
<main.basedir>${project.parent.basedir}</main.basedir>
</properties>

@@ -22,8 +22,6 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.utilities.schema.SchemaProvider;

@@ -19,10 +19,8 @@
package org.apache.hudi.integ.testsuite;

import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -31,12 +29,7 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.table.HoodieSparkTable;
@@ -46,7 +39,6 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
@@ -34,7 +34,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

@@ -134,7 +133,7 @@ public static void main(String[] args) throws Exception {
AtomicBoolean jobFailed = new AtomicBoolean(false);
AtomicInteger counter = new AtomicInteger(0);
List<Long> waitTimes = new ArrayList<>();
for (int i = 0;i < jobIndex ;i++) {
for (int i = 0; i < jobIndex; i++) {
if (i == 0) {
waitTimes.add(0L);
} else {
@@ -116,7 +116,7 @@ private boolean allowWriteClientAccess(DagNode dagNode) {

public abstract RDD<GenericRecord> getNextBatch() throws Exception;

public abstract Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception ;
public abstract Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception;

public abstract Option<String> startCommit();

@@ -132,7 +132,7 @@ private boolean allowWriteClientAccess(DagNode dagNode) {

public abstract JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception;

public abstract void inlineClustering() throws Exception ;
public abstract void inlineClustering() throws Exception;

public abstract Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws Exception;

@@ -22,7 +22,6 @@
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.integ.testsuite.SparkDataSourceContinuousIngest;
import org.apache.hudi.utilities.HoodieRepairTool;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
@@ -46,7 +46,7 @@ public DFSDeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInput
SerializableConfiguration configuration,
String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize,
int inputParallelism, boolean deleteOldInputData, boolean useHudiToGenerateUpdates) {
super(deltaOutputMode, deltaInputType, configuration);
super(deltaOutputMode, deltaInputType, configuration);
this.deltaBasePath = deltaBasePath;
this.schemaStr = schemaStr;
this.maxFileSize = maxFileSize;
@@ -178,8 +178,7 @@ private static DagNode convertJsonToDagNode(JsonNode node, String type, String n
DeltaConfig.Config config = DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node))
.withName(name).build();
return (DagNode) ReflectionUtils.loadClass(generateFQN(type), config);
}
catch (ClassNotFoundException e) {
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
@@ -231,8 +230,7 @@ private static List<Pair<String, Integer>> getQueries(Entry<String, JsonNode> en
List<JsonNode> flattened = new ArrayList<>();
flattened.add(entry.getValue());
queries = (List<Pair<String, Integer>>) getQueryMapper().readValue(flattened.toString(), List.class);
}
catch (Exception e) {
} catch (Exception e) {
e.printStackTrace();
}
return queries;
@@ -244,8 +242,7 @@ private static List<String> getQuerySessionProperties(Entry<String, JsonNode> en
List<JsonNode> flattened = new ArrayList<>();
flattened.add(entry.getValue());
properties = (List<String>) getQueryEnginePropertyMapper().readValue(flattened.toString(), List.class);
}
catch (Exception e) {
} catch (Exception e) {
e.printStackTrace();
}
return properties;
@@ -254,20 +251,15 @@ private static List<String> getQuerySessionProperties(Entry<String, JsonNode> en
private static Object getValue(JsonNode node) {
if (node.isInt()) {
return node.asInt();
}
else if (node.isLong()) {
} else if (node.isLong()) {
return node.asLong();
}
else if (node.isShort()) {
} else if (node.isShort()) {
return node.asInt();
}
else if (node.isBoolean()) {
} else if (node.isBoolean()) {
return node.asBoolean();
}
else if (node.isDouble()) {
} else if (node.isDouble()) {
return node.asDouble();
}
else if (node.isFloat()) {
} else if (node.isFloat()) {
return node.asDouble();
}
return node.textValue();
@@ -287,6 +279,7 @@ private static JsonNode createJsonNode(DagNode node, String type) throws IOExcep
case HIVE_PROPERTIES:
((ObjectNode) configNode).put(HIVE_PROPERTIES,
MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
break;
case PRESTO_QUERIES:
((ObjectNode) configNode).put(PRESTO_QUERIES,
MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
@@ -376,8 +369,7 @@ public List deserialize(JsonParser parser, DeserializationContext context) throw

if (fieldName.contains("query")) {
query = parser.getValueAsString();
}
else if (fieldName.contains("result")) {
} else if (fieldName.contains("result")) {
result = parser.getValueAsInt();
pairs.add(Pair.of(query, result));
}
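
Many of the hunks above are the same two Checkstyle fixes repeated: loop headers take a space after each semicolon and none before it, and a catch or else block starts on the same line as the preceding closing brace. A minimal illustrative sketch of both rules, using a hypothetical class that is not taken from the Hudi codebase:

// Illustrative only: hypothetical class showing the whitespace and brace
// placement rules enforced by these fixes; not copied from the Hudi codebase.
public class CheckstyleBraceExample {

  public int parse(String value) {
    // A space follows each ';' in the 'for' header, with none before it.
    for (int i = 0; i < value.length(); i++) {
      // loop body intentionally trivial for the example
    }
    try {
      return Integer.parseInt(value);
    } catch (NumberFormatException e) {
      // 'catch' sits on the same line as the closing brace of the 'try' block.
      return -1;
    }
  }

  public String describe(int n) {
    if (n > 0) {
      return "positive";
    } else if (n < 0) {
      return "negative";
    } else {
      // 'else' likewise shares the line with the closing brace.
      return "zero";
    }
  }
}
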
@@ -40,8 +40,7 @@ public void executeAndValidateQueries(List<Pair<String, Integer>> queriesWithRes
if (!res.next()) {
log.info("res.next() was False - typically this means the query returned no rows.");
assert 0 == queryAndResult.getRight();
}
else {
} else {
Integer result = res.getInt(1);
if (!queryAndResult.getRight().equals(result)) {
throw new AssertionError(
@@ -88,8 +88,8 @@ public abstract Dataset<Row> getDatasetToValidate(SparkSession session, Executio
public void execute(ExecutionContext context, int curItrCount) throws Exception {
int validateOnceEveryItr = config.validateOnceEveryIteration();
int itrCountToExecute = config.getIterationCountToExecute();
if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) ||
(itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount)
|| (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
FileSystem fs = new Path(context.getHoodieTestSuiteWriter().getCfg().inputBasePath)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
if (context.getHoodieTestSuiteWriter().getCfg().testContinousMode) {
@@ -142,8 +142,8 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
session.sql("REFRESH TABLE " + database + "." + tableName);
Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
"test_suite_source_ordering_field FROM " + database + "." + tableName);
Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, "
+ "test_suite_source_ordering_field FROM " + database + "." + tableName);
Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
"_hoodie_is_deleted", "test_suite_source_ordering_field");

@@ -178,9 +178,9 @@ private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hu
FileStatus[] subDirs = fs.listStatus(new Path(inputPath));
List<FileStatus> subDirList = Arrays.asList(subDirs);
subDirList.sort(Comparator.comparingLong(entry -> Long.parseLong(entry.getPath().getName())));
String latestSubDir = subDirList.get(subDirList.size() -1).getPath().getName();
log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " +
(latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
String latestSubDir = subDirList.get(subDirList.size() - 1).getPath().getName();
log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer "
+ (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
long maxWaitTime = config.maxWaitTimeForDeltastreamerToCatchupMs();
long waitedSoFar = 0;
while (!(latestCheckpoint.isPresent() && latestCheckpoint.get().equals(latestSubDir))) {
@@ -191,11 +191,11 @@ private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hu
latestCheckpoint = getLatestCheckpoint(commitTimeline);
waitedSoFar += 20000;
if (waitedSoFar >= maxWaitTime) {
throw new AssertionError("DeltaStreamer has not caught up after 5 mins of wait time. Last known checkpoint " +
(latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none") + ", expected checkpoint to have caugth up " + latestSubDir);
throw new AssertionError("DeltaStreamer has not caught up after 5 mins of wait time. Last known checkpoint "
+ (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none") + ", expected checkpoint to have caugth up " + latestSubDir);
}
log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " +
(latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer "
+ (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
}
}

@@ -223,7 +223,7 @@ private Dataset<Row> getInputDf(ExecutionContext context, SparkSession session,
Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
Dataset<Row> trimmedDf = inputDf;
if (!config.inputPartitonsToSkipWithValidate().isEmpty()) {
trimmedDf = inputDf.filter("instr("+partitionPathField+", \'"+ config.inputPartitonsToSkipWithValidate() +"\') != 1");
trimmedDf = inputDf.filter("instr(" + partitionPathField + ", \'" + config.inputPartitonsToSkipWithValidate() + "\') != 1");
}

ExpressionEncoder encoder = getEncoder(inputDf.schema());
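
The edits in this file (and a few later hunks) apply the operator-wrap rule: when a boolean expression or string concatenation spans multiple lines, the operator leads the continuation line instead of trailing the previous one. A minimal illustrative sketch with hypothetical method names, not taken from the Hudi codebase:

// Illustrative only: hypothetical class and method names.
public class CheckstyleOperatorWrapExample {

  public boolean shouldValidate(int itrCountToExecute, int curItrCount, int validateEvery) {
    // The '||' operator starts the continuation line rather than ending the previous one.
    return (itrCountToExecute != -1 && itrCountToExecute == curItrCount)
        || (itrCountToExecute == -1 && (curItrCount % validateEvery) == 0);
  }

  public String buildMessage(String latestSubDir, String checkpoint) {
    // The same rule applies to '+' in multi-line string concatenation.
    return "Latest sub directory in input path " + latestSubDir
        + ", latest checkpoint from deltastreamer " + checkpoint;
  }
}
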
@@ -67,8 +67,7 @@ public void execute(ExecutionContext executionContext, int curItrCount) throws E
executeAndValidateQueries(this.config.getHiveQueries(), stmt);
stmt.close();
this.hiveServiceProvider.stopLocalHiveServiceIfNeeded();
}
catch (Exception e) {
} catch (Exception e) {
throw new HoodieValidationException("Hive query validation failed due to " + e.getMessage(), e);
}
}
@@ -31,7 +31,6 @@
*/
public class HiveSyncNode extends DagNode<Boolean> {


public HiveSyncNode(Config config) {
this.config = config;
}
@@ -52,8 +52,7 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
setSessionProperties(this.config.getPrestoProperties(), stmt);
executeAndValidateQueries(this.config.getPrestoQueries(), stmt);
stmt.close();
}
catch (Exception e) {
} catch (Exception e) {
throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e);
}
}
@@ -27,7 +27,7 @@
import java.sql.DriverManager;
import java.sql.Statement;

public class TrinoQueryNode extends BaseQueryNode{
public class TrinoQueryNode extends BaseQueryNode {

public TrinoQueryNode(DeltaConfig.Config config) {
this.config = config;
@@ -52,8 +52,7 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
setSessionProperties(this.config.getTrinoProperties(), stmt);
executeAndValidateQueries(this.config.getTrinoQueries(), stmt);
stmt.close();
}
catch (Exception e) {
} catch (Exception e) {
throw new HoodieValidationException("Trino query validation failed due to " + e.getMessage(), e);
}
}
@@ -25,7 +25,6 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
@@ -36,14 +35,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Node to validate data set sanity like total file versions retained, has cleaning happened, has archival happened, etc.
@@ -77,8 +70,8 @@ public void execute(ExecutionContext executionContext, int curItrCount) throws E
log.warn("Earliest commit to retain : " + earliestCommitToRetain);
long unCleanedInstants = metaClient.getActiveTimeline().filterCompletedInstants().filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestCommitToRetain)).getInstants().count();
ValidationUtils.checkArgument(unCleanedInstants >= (maxCommitsRetained + 1), "Total uncleaned instants " + unCleanedInstants +
" mismatched with max commits retained " + (maxCommitsRetained + 1));
ValidationUtils.checkArgument(unCleanedInstants >= (maxCommitsRetained + 1), "Total uncleaned instants " + unCleanedInstants
+ " mismatched with max commits retained " + (maxCommitsRetained + 1));
}

if (config.validateArchival() || config.validateClean()) {
@@ -24,7 +24,6 @@
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.DelayNode;
import org.apache.hudi.metrics.Metrics;

import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
@@ -18,7 +18,6 @@

package org.apache.hudi.integ.testsuite.generator;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.avro.Schema;
@@ -331,12 +330,13 @@ public boolean validate(GenericRecord record) {
return genericData.validate(baseSchema, record);
}

/*
/**
* Generates a sequential timestamp (daily increment), and updates the timestamp field of the record.
* Note: When generating records, number of records to be generated must be more than numDatePartitions * parallelism,
* to guarantee that at least numDatePartitions are created.
*
* @VisibleForTesting
*/
@VisibleForTesting
public GenericRecord updateTimestamp(GenericRecord record, String fieldName) {
long delta = TimeUnit.SECONDS.convert((partitionIndex++ % numDatePartitions) + startPartition, TimeUnit.DAYS);
record.put(fieldName, delta);
@@ -22,11 +22,12 @@
import java.util.Arrays;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.PathFilter;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.writer.AvroFileDeltaInputWriter;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

/**
* A reader of {@link DeltaOutputMode#DFS} and {@link DeltaInputType#AVRO}.
@@ -46,7 +47,8 @@ public class DFSAvroDeltaInputReader extends DFSDeltaInputReader {
}
};

public DFSAvroDeltaInputReader(SparkSession sparkSession, String schemaStr, String basePath,
public DFSAvroDeltaInputReader(
SparkSession sparkSession, String schemaStr, String basePath,
Option<String> structName,
Option<String> nameSpace) {
this.sparkSession = sparkSession;