[HUDI-738] Add validation to DeltaStreamer to fail fast when filterDupes is enabled on UPSERT mode. (#1505)

Summary:
This fix ensures that for the UPSERT operation, '--filter-dupes' is disabled, and fails fast if it is not. Otherwise, all updates would be dropped silently and only new records would be ingested.
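
As a reading aid for the diff below, here is a minimal, self-contained sketch of the fail-fast guard this change introduces. The field and enum names (filterDupes, operation, Operation.UPSERT) mirror those in the diff; the standalone class and validate method are illustrative only and are not part of the Hudi codebase, which uses ValidationUtils.checkArgument for the same check.

// Illustrative sketch only: mirrors the validation added in this commit,
// but as a standalone class rather than the actual Hudi ValidationUtils call.
public class FilterDupesGuardSketch {

  enum Operation { UPSERT, INSERT }

  static class Config {
    boolean filterDupes;
    Operation operation;
  }

  // Fail fast instead of silently downgrading UPSERT to INSERT and
  // dropping the updates, which was the previous behavior.
  static void validate(Config cfg) {
    if (cfg.filterDupes && cfg.operation == Operation.UPSERT) {
      throw new IllegalArgumentException(
          "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
    }
  }

  public static void main(String[] args) {
    Config cfg = new Config();
    cfg.filterDupes = true;
    cfg.operation = Operation.UPSERT;
    validate(cfg); // throws IllegalArgumentException with the message above
  }
}

The guard replaces the earlier behavior of silently rewriting UPSERT to INSERT when duplicate filtering was enabled, which is the code being removed in DeltaSync below.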
Bhavani Sudha Saktheeswaran committed Apr 10, 2020
1 parent f5f34bb commit 8c7cef3
Showing 4 changed files with 18 additions and 12 deletions.
@@ -173,9 +173,6 @@ public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, Sche
UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));

this.hiveConf = hiveConf;
if (cfg.filterDupes) {
cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
}

// If schemaRegistry already resolved, setup write-client
setupWriteClient();
@@ -348,8 +345,6 @@ private Option<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpo
Option<String> scheduledCompactionInstant = Option.empty();
// filter dupes if needed
if (cfg.filterDupes) {
// turn upserts to insert
cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig());
}

@@ -388,17 +388,15 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveCo
tableType = HoodieTableType.valueOf(cfg.tableType);
}

ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT,
"'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");

this.props = properties != null ? properties : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
LOG.info("Creating delta streamer with configs : " + props.toString());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);

if (cfg.filterDupes) {
cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
}

deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, hiveConf,
this::onInitializingWriteClient);

}

public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf)
@@ -24,6 +24,7 @@
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
@@ -66,6 +67,8 @@ public HoodieMultiTableDeltaStreamer(Config config, JavaSparkContext jssc) throw
this.jssc = jssc;
String commonPropsFile = config.propsFilePath;
String configFolder = config.configFolder;
ValidationUtils.checkArgument(!config.filterDupes || config.operation != HoodieDeltaStreamer.Operation.UPSERT,
"'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration());
configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder;
checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
@@ -661,7 +661,7 @@ public void testFilterDupes() throws Exception {
// Generate the same 1000 records + 1000 new ones for upsert
cfg.filterDupes = true;
cfg.sourceLimit = 2000;
cfg.operation = Operation.UPSERT;
cfg.operation = Operation.INSERT;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(2000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
@@ -674,7 +674,7 @@ public void testFilterDupes() throws Exception {
HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT);
cfg2.filterDupes = true;
cfg2.filterDupes = false;
cfg2.sourceLimit = 2000;
cfg2.operation = Operation.UPSERT;
cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
@@ -690,6 +690,16 @@ public void testFilterDupes() throws Exception {
.fromBytes(mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class);
System.out.println("New Commit Metadata=" + commitMetadata);
assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());

// Try UPSERT with filterDupes true. Expect exception
cfg2.filterDupes = true;
cfg2.operation = Operation.UPSERT;
try {
new HoodieDeltaStreamer(cfg2, jsc).sync();
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
}

}

@Test