From 8c7cef3e50471d3645b207d97d515107764688c9 Mon Sep 17 00:00:00 2001
From: Bhavani Sudha Saktheeswaran
Date: Fri, 10 Apr 2020 08:58:55 -0700
Subject: [PATCH] [HUDI-738] Add validation to DeltaStreamer to fail fast
 when filterDupes is enabled on UPSERT mode. (#1505)

Summary: This fix ensures that '--filter-dupes' is disabled for the UPSERT
operation, and fails fast if it is not. Otherwise, all updates would be
dropped silently and only new records taken in.
---
 .../hudi/utilities/deltastreamer/DeltaSync.java  |  5 -----
 .../deltastreamer/HoodieDeltaStreamer.java       |  8 +++-----
 .../HoodieMultiTableDeltaStreamer.java           |  3 +++
 .../hudi/utilities/TestHoodieDeltaStreamer.java  | 14 ++++++++++++--
 4 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index c964c919ecbf..7ec7303946d6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -173,9 +173,6 @@ public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, Sche
         UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
 
     this.hiveConf = hiveConf;
-    if (cfg.filterDupes) {
-      cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
-    }
 
     // If schemaRegistry already resolved, setup write-client
     setupWriteClient();
@@ -348,8 +345,6 @@ private Option writeToSink(JavaRDD records, String checkpo
     Option<String> scheduledCompactionInstant = Option.empty();
     // filter dupes if needed
     if (cfg.filterDupes) {
-      // turn upserts to insert
-      cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
       records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig());
     }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 836847815213..0325eaf48b34 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -388,17 +388,15 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveCo
       tableType = HoodieTableType.valueOf(cfg.tableType);
     }
 
+    ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT,
+        "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
+
     this.props = properties != null ? properties : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
     LOG.info("Creating delta streamer with configs : " + props.toString());
     this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
 
-    if (cfg.filterDupes) {
-      cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
-    }
-
     deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, hiveConf,
         this::onInitializingWriteClient);
-
   }
 
   public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 74455f28c56f..d9c8f83ea493 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
@@ -66,6 +67,8 @@ public HoodieMultiTableDeltaStreamer(Config config, JavaSparkContext jssc) throw
     this.jssc = jssc;
     String commonPropsFile = config.propsFilePath;
     String configFolder = config.configFolder;
+    ValidationUtils.checkArgument(!config.filterDupes || config.operation != HoodieDeltaStreamer.Operation.UPSERT,
+        "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
     FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration());
     configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder;
     checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 7edf5345cc2e..8b661e729e12 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -661,7 +661,7 @@ public void testFilterDupes() throws Exception {
     // Generate the same 1000 records + 1000 new ones for upsert
     cfg.filterDupes = true;
     cfg.sourceLimit = 2000;
-    cfg.operation = Operation.UPSERT;
+    cfg.operation = Operation.INSERT;
     new HoodieDeltaStreamer(cfg, jsc).sync();
     TestHelpers.assertRecordCount(2000, tableBasePath + "/*/*.parquet", sqlContext);
     TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
@@ -674,7 +674,7 @@ public void testFilterDupes() throws Exception {
     HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
     HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
     HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT);
-    cfg2.filterDupes = true;
+    cfg2.filterDupes = false;
     cfg2.sourceLimit = 2000;
     cfg2.operation = Operation.UPSERT;
     cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
@@ -690,6 +690,16 @@ public void testFilterDupes() throws Exception {
         .fromBytes(mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class);
     System.out.println("New Commit Metadata=" + commitMetadata);
     assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());
+
+    // Try UPSERT with filterDupes true. Expect exception
+    cfg2.filterDupes = true;
+    cfg2.operation = Operation.UPSERT;
+    try {
+      new HoodieDeltaStreamer(cfg2, jsc).sync();
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
+    }
+
   }
 
   @Test
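
Note: the following is an illustrative, self-contained sketch (not part of
the commit) of the fail-fast behavior this patch introduces. The class
FilterDupesValidationSketch and its Config, Operation, and validate members
are simplified stand-ins invented for this example; in the real code the
check is the ValidationUtils.checkArgument call shown in the diff above.
Only the predicate and the error message mirror the patch.

    // FilterDupesValidationSketch.java -- hypothetical demo, not Hudi code.
    public class FilterDupesValidationSketch {

      // Simplified stand-in for HoodieDeltaStreamer.Operation.
      enum Operation { UPSERT, INSERT, BULK_INSERT }

      // Simplified stand-in for HoodieDeltaStreamer.Config.
      static class Config {
        boolean filterDupes;
        Operation operation;
      }

      // Same predicate and message as the ValidationUtils.checkArgument call
      // added by this patch: reject the combination up front instead of
      // silently rewriting UPSERT to INSERT as the removed code did.
      static void validate(Config cfg) {
        if (cfg.filterDupes && cfg.operation == Operation.UPSERT) {
          throw new IllegalArgumentException(
              "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
        }
      }

      public static void main(String[] args) {
        Config cfg = new Config();
        cfg.filterDupes = true;            // dedupe requested...
        cfg.operation = Operation.UPSERT;  // ...combined with UPSERT
        try {
          validate(cfg);
        } catch (IllegalArgumentException e) {
          // Fails fast here, instead of silently dropping updates at write time.
          System.out.println("Rejected: " + e.getMessage());
        }
      }
    }

Failing fast was preferred over the old silent downgrade because the
downgrade caused incoming updates to be filtered out as duplicates of
already-written records, a data loss that surfaced only downstream.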