[HUDI-738] Add validation to DeltaStreamer when filterDupes is enabled on UPSERT mode #1505

Merged
merged 1 commit on Apr 10, 2020
@@ -173,9 +173,6 @@ public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, Sche
UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));

this.hiveConf = hiveConf;
- if (cfg.filterDupes) {
- cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
- }

// If schemaRegistry already resolved, setup write-client
setupWriteClient();
@@ -348,8 +345,6 @@ private Option<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpo
Option<String> scheduledCompactionInstant = Option.empty();
// filter dupes if needed
if (cfg.filterDupes) {
- // turn upserts to insert
- cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig());
}

Expand Up @@ -388,17 +388,15 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveCo
tableType = HoodieTableType.valueOf(cfg.tableType);
}

+ ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT,
+     "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");

this.props = properties != null ? properties : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
LOG.info("Creating delta streamer with configs : " + props.toString());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);

- if (cfg.filterDupes) {
- cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
- }

deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, hiveConf,
this::onInitializingWriteClient);

}

public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf)
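The heart of this change is the fail-fast guard added above: instead of silently rewriting UPSERT to INSERT when filter-dupes is enabled (the block removed just below it), DeltaSyncService now rejects the combination at startup. A minimal sketch of what the guard amounts to, assuming Hudi's ValidationUtils.checkArgument(boolean, String) follows the usual Preconditions contract of throwing IllegalArgumentException with the given message when the condition is false (the helper name below is made up for illustration):

// Hypothetical helper, equivalent in effect to the checkArgument call above.
static void requireFilterDupesCompatible(boolean filterDupes, Operation operation) {
  if (filterDupes && operation == Operation.UPSERT) {
    throw new IllegalArgumentException(
        "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
  }
}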
@@ -24,6 +24,7 @@
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.config.TypedProperties;
+ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
@@ -66,6 +67,8 @@ public HoodieMultiTableDeltaStreamer(Config config, JavaSparkContext jssc) throw
this.jssc = jssc;
String commonPropsFile = config.propsFilePath;
String configFolder = config.configFolder;
+ ValidationUtils.checkArgument(!config.filterDupes || config.operation != HoodieDeltaStreamer.Operation.UPSERT,
+     "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration());
configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder;
checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
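The same guard is duplicated in HoodieMultiTableDeltaStreamer so multi-table ingestion fails before any table-level properties are read. Roughly, a config combining the two options is now rejected up front; a sketch using the public config fields visible in this diff (construction details are assumed, not taken from the PR):

// Sketch only: a flag combination the new validation is expected to reject.
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.operation = HoodieDeltaStreamer.Operation.UPSERT; // updates intended
cfg.filterDupes = true;                               // but dedupe-to-insert also requested
// Building DeltaSyncService or HoodieMultiTableDeltaStreamer from such a config
// should now throw IllegalArgumentException instead of downgrading the operation.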
@@ -661,7 +661,7 @@ public void testFilterDupes() throws Exception {
// Generate the same 1000 records + 1000 new ones for upsert
cfg.filterDupes = true;
cfg.sourceLimit = 2000;
- cfg.operation = Operation.UPSERT;
+ cfg.operation = Operation.INSERT;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(2000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
@@ -674,7 +674,7 @@ public void testFilterDupes() throws Exception {
HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT);
- cfg2.filterDupes = true;
+ cfg2.filterDupes = false;
cfg2.sourceLimit = 2000;
cfg2.operation = Operation.UPSERT;
cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
@@ -690,6 +690,16 @@ public void testFilterDupes() throws Exception {
.fromBytes(mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class);
System.out.println("New Commit Metadata=" + commitMetadata);
assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());

+ // Try UPSERT with filterDupes true. Expect exception
+ cfg2.filterDupes = true;
+ cfg2.operation = Operation.UPSERT;
+ try {
+ new HoodieDeltaStreamer(cfg2, jsc).sync();
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
+ }

}

@Test
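One note on the new assertion in testFilterDupes: as written, the try/catch would also let the test pass if sync() did not throw at all. A sketch of a stricter variant of the same block (assuming org.junit.Assert.fail is available alongside the assertTrue already used in this test):

// Stricter variant: fail explicitly if no exception is thrown, so a regression
// that silently accepts the flag combination cannot slip through.
cfg2.filterDupes = true;
cfg2.operation = Operation.UPSERT;
try {
  new HoodieDeltaStreamer(cfg2, jsc).sync();
  fail("Expected IllegalArgumentException for '--filter-dupes' combined with UPSERT");
} catch (IllegalArgumentException e) {
  assertTrue(e.getMessage().contains(
      "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
}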