From 94e93a427d1dd23fb2588d4ee28a109bacaeb043 Mon Sep 17 00:00:00 2001 From: cshuo Date: Wed, 15 Jan 2025 09:29:35 +0800 Subject: [PATCH] Add options to skip compaction and clustering instants in incremental/streaming reads --- .../org/apache/hudi/DataSourceOptions.scala | 14 ++ .../MergeOnReadIncrementalRelationV2.scala | 4 + .../hudi/functional/TestStreamingSource.scala | 122 +++++++++++++----- 3 files changed, 106 insertions(+), 34 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index c4b419b52858..08bcf9731ea8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -151,6 +151,20 @@ object DataSourceReadOptions { .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions " + "instead of the full table. 
This option allows using glob pattern to directly filter on path.") + val INCREMENTAL_READ_SKIP_COMPACT: ConfigProperty[Boolean] = ConfigProperty + .key("hoodie.datasource.read.incr.skip_compact") + .defaultValue(false) + .markAdvanced() + .withDocumentation("Whether to skip compaction instants and avoid reading compacted base files for streaming " + + "read to improve read performance.") + + val INCREMENTAL_READ_SKIP_CLUSTER: ConfigProperty[Boolean] = ConfigProperty + .key("hoodie.datasource.read.incr.skip_cluster") + .defaultValue(false) + .markAdvanced() + .withDocumentation("Whether to skip clustering instants to avoid reading base files of clustering operations " + + "for streaming read to improve read performance.") + val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = HoodieCommonConfig.TIMESTAMP_AS_OF val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala index 6222cd217f93..c74fbbdc6724 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala @@ -200,6 +200,10 @@ trait HoodieIncrementalRelationV2Trait extends HoodieBaseRelation { .metaClient(metaClient) .startCompletionTime(optParams(DataSourceReadOptions.START_COMMIT.key)) .endCompletionTime(optParams.getOrElse(DataSourceReadOptions.END_COMMIT.key, null)) + .skipClustering(optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SKIP_CLUSTER.key(), + String.valueOf(DataSourceReadOptions.INCREMENTAL_READ_SKIP_CLUSTER.defaultValue)).toBoolean) + .skipCompaction(optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SKIP_COMPACT.key(), + 
String.valueOf(DataSourceReadOptions.INCREMENTAL_READ_SKIP_COMPACT.defaultValue)).toBoolean) .rangeType(rangeType) .build() .analyze() diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala index aeef270ad764..f1ff5570d2f1 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala @@ -17,12 +17,13 @@ package org.apache.hudi.functional +import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.DataSourceReadOptions.{START_OFFSET, STREAMING_READ_TABLE_VERSION} import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD} import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ} import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion} import org.apache.hudi.common.table.timeline.HoodieTimeline -import org.apache.hudi.config.HoodieCompactionConfig +import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig} import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM_VALUE, INSERT_PARALLELISM_VALUE, TBL_NAME, UPSERT_PARALLELISM_VALUE, WRITE_TABLE_VERSION} import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.util.JavaConversions @@ -210,41 +211,89 @@ class TestStreamingSource extends StreamTest { } } - test("test mor stream source with compaction") { - withTempDir { inputDir => - val tablePath = s"${inputDir.getCanonicalPath}/test_mor_stream" - val metaClient = HoodieTableMetaClient.newTableBuilder() - .setTableType(MERGE_ON_READ) - .setTableName(getTableName(tablePath)) - .setRecordKeyFields("id") - .setPreCombineField("ts") - .initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()), tablePath) + test("Test mor 
streaming source with clustering") { + Array("true", "false").foreach(skipCluster => { + withTempDir { inputDir => + val tablePath = s"${inputDir.getCanonicalPath}/test_mor_stream_cluster" + val metaClient = HoodieTableMetaClient.newTableBuilder() + .setTableType(MERGE_ON_READ) + .setTableName(getTableName(tablePath)) + .setRecordKeyFields("id") + .setPreCombineField("ts") + .initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()), tablePath) + + addData(tablePath, Seq(("1", "a1", "10", "000"))) + addData(tablePath, Seq(("2", "a1", "11", "001"))) + addData(tablePath, Seq(("3", "a1", "12", "002"))) + addData(tablePath, Seq(("4", "a1", "13", "003")), enableInlineCluster = true) + addData(tablePath, Seq(("5", "a1", "14", "004"))) + + val timestamp = + metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants() + .firstInstant().get().getCompletionTime - addData(tablePath, Seq(("1", "a1", "10", "000"))) - val df = spark.readStream - .format("org.apache.hudi") - .load(tablePath) - .select("id", "name", "price", "ts") + val df = spark.readStream + .format("org.apache.hudi") + .option(START_OFFSET.key(), timestamp) + .option(DataSourceReadOptions.INCREMENTAL_READ_SKIP_CLUSTER.key(), skipCluster) + .load(tablePath) + .select("id", "name", "price", "ts") - addData(tablePath, - Seq(("1", "a2", "12", "000"), - ("2", "a3", "12", "000"))) - addData(tablePath, Seq(("2", "a5", "12", "000"), ("1", "a6", "12", "001"))) - // trigger compaction - addData(tablePath, Seq(("3", "a6", "12", "000")), true) + testStream(df)( + AssertOnQuery { q => q.processAllAvailable(); true }, + // Start after the first commit + CheckAnswerRows(Seq( + Row("2", "a1", "11", "001"), + Row("3", "a1", "12", "002"), + Row("4", "a1", "13", "003"), + Row("5", "a1", "14", "004")), lastOnly = true, isSorted = false) + ) + assertTrue(metaClient.reloadActiveTimeline + .filter(JavaConversions.getPredicate( + e => e.isCompleted && 
HoodieTimeline.REPLACE_COMMIT_ACTION.equals(e.getAction))) + .countInstants() > 0) + } + }) + } - testStream(df)( - AssertOnQuery {q => q.processAllAvailable(); true }, - CheckAnswerRows(Seq(Row("1", "a6", "12", "001"), - Row("2", "a5", "12", "000"), - Row("3", "a6", "12", "000")), lastOnly = true, isSorted = false), - StopStream - ) - assertTrue(metaClient.reloadActiveTimeline - .filter(JavaConversions.getPredicate( - e => e.isCompleted && HoodieTimeline.COMMIT_ACTION.equals(e.getAction))) - .countInstants() > 0) - } + test("test mor stream source with compaction") { + Array("true", "false").foreach(skipCompact => { + withTempDir { inputDir => + val tablePath = s"${inputDir.getCanonicalPath}/test_mor_stream" + val metaClient = HoodieTableMetaClient.newTableBuilder() + .setTableType(MERGE_ON_READ) + .setTableName(getTableName(tablePath)) + .setRecordKeyFields("id") + .setPreCombineField("ts") + .initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()), tablePath) + + addData(tablePath, Seq(("1", "a1", "10", "000"))) + val df = spark.readStream + .format("org.apache.hudi") + .option(DataSourceReadOptions.INCREMENTAL_READ_SKIP_COMPACT.key(), skipCompact) + .load(tablePath) + .select("id", "name", "price", "ts") + + addData(tablePath, + Seq(("1", "a2", "12", "000"), + ("2", "a3", "12", "000"))) + addData(tablePath, Seq(("2", "a5", "12", "000"), ("1", "a6", "12", "001"))) + // trigger compaction + addData(tablePath, Seq(("3", "a6", "12", "000")), enableInlineCompaction = true) + + testStream(df)( + AssertOnQuery {q => q.processAllAvailable(); true }, + CheckAnswerRows(Seq(Row("1", "a6", "12", "001"), + Row("2", "a5", "12", "000"), + Row("3", "a6", "12", "000")), lastOnly = true, isSorted = false), + StopStream + ) + assertTrue(metaClient.reloadActiveTimeline + .filter(JavaConversions.getPredicate( + e => e.isCompleted && HoodieTimeline.COMMIT_ACTION.equals(e.getAction))) + .countInstants() > 0) + } + }) } test("Test checkpoint translation") { @@ 
-289,7 +338,10 @@ class TestStreamingSource extends StreamTest { } } - private def addData(inputPath: String, rows: Seq[(String, String, String, String)], enableInlineCompaction: Boolean = false) : Unit = { + private def addData(inputPath: String, + rows: Seq[(String, String, String, String)], + enableInlineCompaction: Boolean = false, + enableInlineCluster: Boolean = false) : Unit = { rows.toDF(columns: _*) .write .format("org.apache.hudi") @@ -297,6 +349,8 @@ class TestStreamingSource extends StreamTest { .option(TBL_NAME.key, getTableName(inputPath)) .option(HoodieCompactionConfig.INLINE_COMPACT.key(), enableInlineCompaction.toString) .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "2") + .option(HoodieClusteringConfig.INLINE_CLUSTERING.key(), enableInlineCluster.toString) + .option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), "2") .mode(SaveMode.Append) .save(inputPath) }