From 7a1a6837e0c7be2cb401fbe6be8bbbb72064feae Mon Sep 17 00:00:00 2001 From: chao chen <59957056+waywtdcc@users.noreply.github.com> Date: Mon, 7 Nov 2022 10:13:57 +0800 Subject: [PATCH 1/6] [HUDI-5088]Fix bug:Failed to synchronize the hive metadata of the Flink table (#7056) * sync `_hoodie_operation` meta field if changelog mode is enabled. --- .../org/apache/hudi/table/catalog/HiveSchemaUtils.java | 8 ++++++-- .../org/apache/hudi/table/catalog/HoodieHiveCatalog.java | 3 ++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java index a057c02f2cca3..4383b42e9f8d9 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; @@ -178,9 +179,12 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) { /** * Create Hive field schemas from Flink table schema including the hoodie metadata fields. */ - public static List toHiveFieldSchema(TableSchema schema) { + public static List toHiveFieldSchema(TableSchema schema, boolean withOperationField) { List columns = new ArrayList<>(); - for (String metaField : HoodieRecord.HOODIE_META_COLUMNS) { + Collection metaFields = withOperationField + ? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the set may break sequence + : HoodieRecord.HOODIE_META_COLUMNS; + for (String metaField : metaFields) { columns.add(new FieldSchema(metaField, "string", null)); } columns.addAll(createHiveColumns(schema)); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java index d6e70f16eaf7c..85f82d53d9102 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java @@ -553,7 +553,8 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, // because since Hive 3.x, there is validation when altering table, // when the metadata fields are synced through the hive sync tool, // a compatability issue would be reported. 
- List allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema()); + boolean withOperationField = Boolean.parseBoolean(table.getOptions().getOrDefault(FlinkOptions.CHANGELOG_ENABLED.key(), "false")); + List allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema(), withOperationField); // Table columns and partition keys CatalogTable catalogTable = (CatalogTable) table; From 547a2b014ed1a8d7dacf4818acad3cfe0c8fefac Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Sun, 6 Nov 2022 23:55:02 -0800 Subject: [PATCH 2/6] [MINOR] Removing spark2 scala12 combinations from readme (#7112) --- README.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 31598a0567258..bbbe412b8c574 100644 --- a/README.md +++ b/README.md @@ -87,7 +87,6 @@ Refer to the table below for building with different Spark and Scala versions. |:--------------------------|:---------------------------------------------|:-------------------------------------------------| | (empty) | hudi-spark-bundle_2.11 (legacy bundle name) | For Spark 2.4.4 and Scala 2.11 (default options) | | `-Dspark2.4` | hudi-spark2.4-bundle_2.11 | For Spark 2.4.4 and Scala 2.11 (same as default) | -| `-Dspark2.4 -Dscala-2.12` | hudi-spark2.4-bundle_2.12 | For Spark 2.4.4 and Scala 2.12 | | `-Dspark3.1 -Dscala-2.12` | hudi-spark3.1-bundle_2.12 | For Spark 3.1.x and Scala 2.12 | | `-Dspark3.2 -Dscala-2.12` | hudi-spark3.2-bundle_2.12 | For Spark 3.2.x and Scala 2.12 | | `-Dspark3.3 -Dscala-2.12` | hudi-spark3.3-bundle_2.12 | For Spark 3.3.x and Scala 2.12 | @@ -102,8 +101,8 @@ mvn clean package -DskipTests -Dspark3.2 -Dscala-2.12 # Build against Spark 3.1.x mvn clean package -DskipTests -Dspark3.1 -Dscala-2.12 -# Build against Spark 2.4.4 and Scala 2.12 -mvn clean package -DskipTests -Dspark2.4 -Dscala-2.12 +# Build against Spark 2.4.4 and Scala 2.11 +mvn clean package -DskipTests -Dspark2.4 ``` #### What about "spark-avro" module? 
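To make the first patch ([HUDI-5088]) easier to follow: it prepends the `_hoodie_operation` meta field to the synced Hive schema only when the table's `FlinkOptions.CHANGELOG_ENABLED` option resolves to true. Below is a minimal, standalone sketch of that selection logic, not the patched code itself: the column names are Hudi's standard meta fields hardcoded for illustration, whereas the real `toHiveFieldSchema` reads them from `HoodieRecord.HOODIE_META_COLUMNS` / `HOODIE_META_COLUMNS_WITH_OPERATION` (whose iteration order may differ, as the inline comment in the patch cautions).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MetaColumnSketch {
  // Hudi's standard meta columns, hardcoded here for illustration only.
  private static final List<String> META_COLUMNS = Arrays.asList(
      "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
      "_hoodie_partition_path", "_hoodie_file_name");

  // Extra meta field synced when changelog mode is enabled.
  private static final String OPERATION_COLUMN = "_hoodie_operation";

  // Mirrors the shape of the patched toHiveFieldSchema(): meta columns first
  // (optionally including _hoodie_operation), then the table's data columns.
  static List<String> hiveColumns(List<String> dataColumns, boolean withOperationField) {
    List<String> columns = new ArrayList<>(META_COLUMNS);
    if (withOperationField) {
      columns.add(OPERATION_COLUMN); // ordering here is illustrative
    }
    columns.addAll(dataColumns);
    return columns;
  }

  public static void main(String[] args) {
    System.out.println(hiveColumns(Arrays.asList("uuid", "name", "ts"), true));
  }
}
```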
From c1e1825eb12bd9d902b866c0fcca38facd5a355d Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Mon, 7 Nov 2022 17:49:21 +0800 Subject: [PATCH 3/6] [HUDI-5153] Fix the write token name resolution of cdc log file (#7128) --- .../org/apache/hudi/io/HoodieWriteHandle.java | 30 ++++++--------- .../org/apache/hudi/common/fs/FSUtils.java | 5 ++- .../common/table/log/HoodieLogFormat.java | 15 ++++++++ .../apache/hudi/common/fs/TestFSUtils.java | 37 +++++++++++++++++++ 4 files changed, 66 insertions(+), 21 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 807f14ca2883c..3cdc6e82d02a4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -279,34 +279,26 @@ protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, Ho protected HoodieLogFormat.Writer createLogWriter( Option fileSlice, String baseCommitTime) throws IOException { - return createLogWriter(fileSlice, baseCommitTime, ""); + return createLogWriter(fileSlice, baseCommitTime, null); } protected HoodieLogFormat.Writer createLogWriter( - Option fileSlice, String baseCommitTime, String fileSuffix) throws IOException { - int logVersion = HoodieLogFile.LOGFILE_BASE_VERSION; - long logFileSize = 0L; - String logWriteToken = writeToken + fileSuffix; - String rolloverLogWriteToken = writeToken + fileSuffix; - if (fileSlice.isPresent()) { - Option latestLogFileOpt = fileSlice.get().getLatestLogFile(); - if (latestLogFileOpt.isPresent()) { - HoodieLogFile latestLogFile = latestLogFileOpt.get(); - logVersion = latestLogFile.getLogVersion(); - logFileSize = latestLogFile.getFileSize(); - logWriteToken = FSUtils.getWriteTokenFromLogPath(latestLogFile.getPath()); - } - } + Option fileSlice, String baseCommitTime, String suffix) throws IOException { + Option latestLogFile = fileSlice.isPresent() + ? 
fileSlice.get().getLatestLogFile() + : Option.empty(); + return HoodieLogFormat.newWriterBuilder() .onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath)) .withFileId(fileId) .overBaseCommit(baseCommitTime) - .withLogVersion(logVersion) - .withFileSize(logFileSize) + .withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION)) + .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L)) .withSizeThreshold(config.getLogFileMaxSize()) .withFs(fs) - .withRolloverLogWriteToken(rolloverLogWriteToken) - .withLogWriteToken(logWriteToken) + .withRolloverLogWriteToken(writeToken) + .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken)) + .withSuffix(suffix) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index eb5fbe917985c..e73fbee3df0e9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -77,9 +77,10 @@ public class FSUtils { private static final Logger LOG = LogManager.getLogger(FSUtils.class); - // Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1 + // Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1_1-0-1 + // Archive log files are of this pattern - .commits_.archive.1_1-0-1 private static final Pattern LOG_FILE_PATTERN = - Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)(-cdc)?))?"); + Pattern.compile("\\.(.+)_(.*)\\.(.+)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(-cdc)?)?"); private static final String LOG_FILE_PREFIX = "."; private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10; private static final long MIN_CLEAN_TO_KEEP = 10; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java index 569b4a23b683b..6958b7930bded 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java @@ -141,6 +141,8 @@ class WriterBuilder { private Path parentPath; // Log File Write Token private String logWriteToken; + // optional file suffix + private String suffix; // Rollover Log file write token private String rolloverLogWriteToken; @@ -164,6 +166,11 @@ public WriterBuilder withLogWriteToken(String logWriteToken) { return this; } + public WriterBuilder withSuffix(String suffix) { + this.suffix = suffix; + return this; + } + public WriterBuilder withFs(FileSystem fs) { this.fs = fs; return this; @@ -250,6 +257,14 @@ public Writer build() throws IOException { logWriteToken = rolloverLogWriteToken; } + if (suffix != null) { + // A little hacky to simplify the file name concatenation: + // patch the write token with an optional suffix + // instead of adding a new extension + logWriteToken = logWriteToken + suffix; + rolloverLogWriteToken = rolloverLogWriteToken + suffix; + } + Path logPath = new Path(parentPath, FSUtils.makeLogFileName(logFileId, fileExtension, instantTime, logVersion, logWriteToken)); LOG.info("HoodieLogFile on path " + logPath); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java index 481bb1dd452da..d8bdaca0782d9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.cdc.HoodieCDCUtils; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; @@ -250,6 +251,42 @@ public void tesLogFileName() { assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(rlPath)); } + @Test + public void testCdcLogFileName() { + String partitionPath = "2022/11/04/"; + String fileName = UUID.randomUUID().toString(); + String logFile = FSUtils.makeLogFileName(fileName, ".log", "100", 2, "1-0-1") + HoodieCDCUtils.CDC_LOGFILE_SUFFIX; + Path path = new Path(new Path(partitionPath), logFile); + + assertTrue(FSUtils.isLogFile(path)); + assertEquals("log", FSUtils.getFileExtensionFromLog(path)); + assertEquals(fileName, FSUtils.getFileIdFromLogPath(path)); + assertEquals("100", FSUtils.getBaseCommitTimeFromLogPath(path)); + assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(path)); + assertEquals("1-0-1", FSUtils.getWriteTokenFromLogPath(path)); + assertEquals(0, FSUtils.getStageIdFromLogPath(path)); + assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(path)); + assertEquals(2, FSUtils.getFileVersionFromLog(path)); + } + + @Test + public void testArchiveLogFileName() { + String partitionPath = "2022/11/04/"; + String fileName = "commits"; + String logFile = FSUtils.makeLogFileName(fileName, ".archive", "", 2, "1-0-1"); + Path path = new Path(new Path(partitionPath), logFile); + + assertFalse(FSUtils.isLogFile(path)); + assertEquals("archive", FSUtils.getFileExtensionFromLog(path)); + assertEquals(fileName, FSUtils.getFileIdFromLogPath(path)); + assertEquals("", FSUtils.getBaseCommitTimeFromLogPath(path)); + assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(path)); + assertEquals("1-0-1", FSUtils.getWriteTokenFromLogPath(path)); + assertEquals(0, FSUtils.getStageIdFromLogPath(path)); + assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(path)); + assertEquals(2, FSUtils.getFileVersionFromLog(path)); + } + /** * Test Log File Comparisons when log files do not have write tokens. 
*/ From df7b767a332ee21c3139fd224ef4e142fe70ca49 Mon Sep 17 00:00:00 2001 From: Shizhi Chen <107476116+chenshzh@users.noreply.github.com> Date: Mon, 7 Nov 2022 17:51:40 +0800 Subject: [PATCH 4/6] [HUDI-5066] Support flink hoodie source metaclient cache (#7017) --- .../java/org/apache/hudi/table/HoodieTableSource.java | 9 +++++---- .../org/apache/hudi/table/TestHoodieTableSource.java | 9 +++++++++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 8571390f53093..31aba2b2db132 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -136,7 +136,7 @@ public HoodieTableSource( List partitionKeys, String defaultPartName, Configuration conf) { - this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null); + this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null, null); } public HoodieTableSource( @@ -148,7 +148,8 @@ public HoodieTableSource( @Nullable FileIndex fileIndex, @Nullable List> requiredPartitions, @Nullable int[] requiredPos, - @Nullable Long limit) { + @Nullable Long limit, + @Nullable HoodieTableMetaClient metaClient) { this.schema = schema; this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType(); this.path = path; @@ -164,7 +165,7 @@ public HoodieTableSource( : requiredPos; this.limit = limit == null ? NO_LIMIT_CONSTANT : limit; this.hadoopConf = HadoopConfigurations.getHadoopConf(conf); - this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf); + this.metaClient = metaClient == null ? 
StreamerUtil.metaClientForReader(conf, hadoopConf) : metaClient; this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf); } @@ -215,7 +216,7 @@ public ChangelogMode getChangelogMode() { @Override public DynamicTableSource copy() { return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, - conf, fileIndex, requiredPartitions, requiredPos, limit); + conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient); } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java index d8093793fcccc..10a7e44373573 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.utils.TestConfigurations; @@ -148,6 +149,14 @@ void testDataSkippingFilterShouldBeNotNullWhenTableSourceIsCopied() { assertEquals(expectedFilters, actualFilters); } + @Test + void testHoodieSourceCachedMetaClient() { + HoodieTableSource tableSource = getEmptyStreamingSource(); + HoodieTableMetaClient metaClient = tableSource.getMetaClient(); + HoodieTableSource tableSourceCopy = (HoodieTableSource) tableSource.copy(); + assertThat(metaClient, is(tableSourceCopy.getMetaClient())); + } + private HoodieTableSource getEmptyStreamingSource() { final String path = tempFile.getAbsolutePath(); conf = TestConfigurations.getDefaultConf(path); From 0ef9a86374816e70e0961c5b1133313745cad0fc Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Mon, 7 Nov 2022 23:32:39 +0800 Subject: [PATCH 5/6] [HUDI-5132] Add hadoop-mr bundle validation (#7157) --- packaging/bundle-validation/ci_run.sh | 1 + .../write.scala} | 10 +---- packaging/bundle-validation/validate.sh | 39 ++++++++++++++----- 3 files changed, 32 insertions(+), 18 deletions(-) rename packaging/bundle-validation/{spark/validate.scala => spark_hadoop_mr/write.scala} (87%) diff --git a/packaging/bundle-validation/ci_run.sh b/packaging/bundle-validation/ci_run.sh index c2582d4452022..4cc795f405902 100755 --- a/packaging/bundle-validation/ci_run.sh +++ b/packaging/bundle-validation/ci_run.sh @@ -63,6 +63,7 @@ fi # Copy bundle jars to temp dir for mounting TMP_JARS_DIR=/tmp/jars/$(date +%s) mkdir -p $TMP_JARS_DIR +cp ${GITHUB_WORKSPACE}/packaging/hudi-hadoop-mr-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/ cp ${GITHUB_WORKSPACE}/packaging/hudi-spark-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/ cp ${GITHUB_WORKSPACE}/packaging/hudi-utilities-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/ cp ${GITHUB_WORKSPACE}/packaging/hudi-utilities-slim-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/ diff --git a/packaging/bundle-validation/spark/validate.scala b/packaging/bundle-validation/spark_hadoop_mr/write.scala similarity index 87% rename from packaging/bundle-validation/spark/validate.scala rename to packaging/bundle-validation/spark_hadoop_mr/write.scala index 01faa38509809..6c4745493e6cf 100644 --- a/packaging/bundle-validation/spark/validate.scala +++ b/packaging/bundle-validation/spark_hadoop_mr/write.scala @@ -47,11 +47,5 @@ 
df.write.format("hudi"). save(basePath) spark.sql("desc " + tableName).show -val actual = spark.sql("select * from " + tableName).count -if (expected == actual) { - System.out.println($"bundle combination passed sanity run.") - System.exit(0) -} else { - System.err.println($"bundle combination failed sanity run:\n\tshould have written $expected records in $database.$tableName") - System.exit(1) -} + +System.exit(0) diff --git a/packaging/bundle-validation/validate.sh b/packaging/bundle-validation/validate.sh index ee5255cf0d8ad..67abdd5d65658 100755 --- a/packaging/bundle-validation/validate.sh +++ b/packaging/bundle-validation/validate.sh @@ -28,31 +28,50 @@ WORKDIR=/opt/bundle-validation JARS_DIR=${WORKDIR}/jars # link the jar names to easier to use names +ln -sf $JARS_DIR/hudi-hadoop-mr*.jar $JARS_DIR/hadoop-mr.jar ln -sf $JARS_DIR/hudi-spark*.jar $JARS_DIR/spark.jar ln -sf $JARS_DIR/hudi-utilities-bundle*.jar $JARS_DIR/utilities.jar ln -sf $JARS_DIR/hudi-utilities-slim*.jar $JARS_DIR/utilities-slim.jar ## -# Function to test the spark bundle with hive sync. +# Function to test the spark & hadoop-mr bundles with hive sync. # # env vars (defined in container): # HIVE_HOME: path to the hive directory # DERBY_HOME: path to the derby directory # SPARK_HOME: path to the spark directory ## -test_spark_bundle () { - echo "::warning::validate.sh setting up hive metastore for spark bundle validation" +test_spark_hadoop_mr_bundles () { + echo "::warning::validate.sh setting up hive metastore for spark & hadoop-mr bundles validation" $DERBY_HOME/bin/startNetworkServer -h 0.0.0.0 & - $HIVE_HOME/bin/hiveserver2 & - echo "::warning::validate.sh hive metastore setup complete. Testing" - $SPARK_HOME/bin/spark-shell --jars $JARS_DIR/spark.jar < $WORKDIR/spark/validate.scala - if [ "$?" -ne 0 ]; then - echo "::error::validate.sh failed hive testing" + $HIVE_HOME/bin/hiveserver2 --hiveconf hive.aux.jars.path=$JARS_DIR/hadoop-mr.jar & + echo "::warning::validate.sh Writing sample data via Spark DataSource and run Hive Sync..." + $SPARK_HOME/bin/spark-shell --jars $JARS_DIR/spark.jar < $WORKDIR/spark_hadoop_mr/write.scala + + echo "::warning::validate.sh Query and validate the results using Spark SQL" + # save Spark SQL query results + $SPARK_HOME/bin/spark-shell --jars $JARS_DIR/spark.jar \ + -i <(echo 'spark.sql("select * from trips").coalesce(1).write.csv("/tmp/sparksql/trips/results"); System.exit(0)') + numRecordsSparkSQL=$(cat /tmp/sparksql/trips/results/*.csv | wc -l) + if [ "$numRecordsSparkSQL" -ne 10 ]; then + echo "::error::validate.sh Spark SQL validation failed." + exit 1 + fi + echo "::warning::validate.sh Query and validate the results using HiveQL" + # save HiveQL query results + hiveqlresultsdir=/tmp/hiveql/trips/results + mkdir -p $hiveqlresultsdir + $HIVE_HOME/bin/beeline --hiveconf hive.input.format=org.apache.hudi.hadoop.HoodieParquetInputFormat \ + -u jdbc:hive2://localhost:10000/default --showHeader=false --outputformat=csv2 \ + -e 'select * from trips' >> $hiveqlresultsdir/results.csv + numRecordsHiveQL=$(cat $hiveqlresultsdir/*.csv | wc -l) + if [ "$numRecordsHiveQL" -ne 10 ]; then + echo "::error::validate.sh HiveQL validation failed." exit 1 fi - echo "::warning::validate.sh spark bundle validation successful" + echo "::warning::validate.sh spark & hadoop-mr bundles validation was successful." } @@ -112,7 +131,7 @@ test_utilities_bundle () { } -test_spark_bundle +test_spark_hadoop_mr_bundles if [ "$?" 
-ne 0 ]; then exit 1 fi From bc4c0fcb3d619375a91cc7cfeee3a23d811a77b0 Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Tue, 8 Nov 2022 02:25:49 +0800 Subject: [PATCH 6/6] [HUDI-2673] Add kafka connect bundle to validation test (#7131) --- hudi-kafka-connect/demo/setupKafka.sh | 7 ++- packaging/bundle-validation/Dockerfile-base | 16 +++++- packaging/bundle-validation/ci_run.sh | 11 +++++ .../bundle-validation/kafka/config-sink.json | 20 ++++++++ packaging/bundle-validation/kafka/consume.sh | 34 +++++++++++++ packaging/bundle-validation/kafka/produce.sh | 30 ++++++++++++ packaging/bundle-validation/validate.sh | 49 +++++++++++++++++-- 7 files changed, 157 insertions(+), 10 deletions(-) create mode 100644 packaging/bundle-validation/kafka/config-sink.json create mode 100755 packaging/bundle-validation/kafka/consume.sh create mode 100755 packaging/bundle-validation/kafka/produce.sh diff --git a/hudi-kafka-connect/demo/setupKafka.sh b/hudi-kafka-connect/demo/setupKafka.sh index 5c618b2a70266..e4e8d2e382ed1 100755 --- a/hudi-kafka-connect/demo/setupKafka.sh +++ b/hudi-kafka-connect/demo/setupKafka.sh @@ -1,3 +1,4 @@ +#!/bin/bash # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -14,8 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -#!/bin/bash - ######################### # The command line help # ######################### @@ -79,11 +78,11 @@ while getopts ":n:b:tf:k:m:r:o:l:p:s:-:" opt; do recreateTopic="N" printf "Argument recreate-topic is N (reuse Kafka topic) \n" ;; - k) + f) rawDataFile="$OPTARG" printf "Argument raw-file is %s\n" "$rawDataFile" ;; - f) + k) kafkaTopicName="$OPTARG" printf "Argument kafka-topic is %s\n" "$kafkaTopicName" ;; diff --git a/packaging/bundle-validation/Dockerfile-base b/packaging/bundle-validation/Dockerfile-base index 1e782e08d5292..81df6ce0c029d 100644 --- a/packaging/bundle-validation/Dockerfile-base +++ b/packaging/bundle-validation/Dockerfile-base @@ -16,7 +16,7 @@ # FROM adoptopenjdk/openjdk8:alpine -RUN apk add --no-cache --upgrade bash +RUN apk add --no-cache --upgrade bash curl jq RUN mkdir /opt/bundle-validation ENV WORKDIR=/opt/bundle-validation @@ -27,6 +27,8 @@ ARG HIVE_VERSION=3.1.3 ARG DERBY_VERSION=10.14.1.0 ARG SPARK_VERSION=3.1.3 ARG SPARK_HADOOP_VERSION=2.7 +ARG CONFLUENT_VERSION=5.5.12 +ARG KAFKA_CONNECT_HDFS_VERSION=10.1.13 RUN wget https://archive.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz -P "$WORKDIR" \ && tar -xf $WORKDIR/hadoop-$HADOOP_VERSION.tar.gz -C $WORKDIR/ \ @@ -47,3 +49,15 @@ RUN wget https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK && tar -xf $WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz -C $WORKDIR/ \ && rm $WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz ENV SPARK_HOME=$WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION + +RUN wget https://packages.confluent.io/archive/${CONFLUENT_VERSION%.*}/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz -P "$WORKDIR" \ + && tar -xf $WORKDIR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz -C $WORKDIR/ \ + && rm $WORKDIR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz +ENV CONFLUENT_HOME=$WORKDIR/confluent-$CONFLUENT_VERSION + +RUN wget 
https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-hdfs/versions/$KAFKA_CONNECT_HDFS_VERSION/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip -P "$WORKDIR" \ + && mkdir $WORKDIR/kafka-connectors \ + && unzip $WORKDIR/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip -d $WORKDIR/kafka-connectors/ \ + && rm $WORKDIR/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip \ + && printf "\nplugin.path=$WORKDIR/kafka-connectors\n" >> $CONFLUENT_HOME/etc/kafka/connect-distributed.properties +ENV KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH=$WORKDIR/kafka-connectors/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION/lib diff --git a/packaging/bundle-validation/ci_run.sh b/packaging/bundle-validation/ci_run.sh index 4cc795f405902..46ef80964bed2 100755 --- a/packaging/bundle-validation/ci_run.sh +++ b/packaging/bundle-validation/ci_run.sh @@ -36,6 +36,8 @@ if [[ ${SPARK_PROFILE} == 'spark2.4' ]]; then DERBY_VERSION=10.10.2.0 SPARK_VERSION=2.4.8 SPARK_HADOOP_VERSION=2.7 + CONFLUENT_VERSION=5.5.12 + KAFKA_CONNECT_HDFS_VERSION=10.1.13 IMAGE_TAG=spark248hive239 elif [[ ${SPARK_PROFILE} == 'spark3.1' ]]; then HADOOP_VERSION=2.7.7 @@ -43,6 +45,8 @@ elif [[ ${SPARK_PROFILE} == 'spark3.1' ]]; then DERBY_VERSION=10.14.1.0 SPARK_VERSION=3.1.3 SPARK_HADOOP_VERSION=2.7 + CONFLUENT_VERSION=5.5.12 + KAFKA_CONNECT_HDFS_VERSION=10.1.13 IMAGE_TAG=spark313hive313 elif [[ ${SPARK_PROFILE} == 'spark3.2' ]]; then HADOOP_VERSION=2.7.7 @@ -50,6 +54,8 @@ elif [[ ${SPARK_PROFILE} == 'spark3.2' ]]; then DERBY_VERSION=10.14.1.0 SPARK_VERSION=3.2.2 SPARK_HADOOP_VERSION=2.7 + CONFLUENT_VERSION=5.5.12 + KAFKA_CONNECT_HDFS_VERSION=10.1.13 IMAGE_TAG=spark322hive313 elif [[ ${SPARK_PROFILE} == 'spark3.3' ]]; then HADOOP_VERSION=2.7.7 @@ -57,6 +63,8 @@ elif [[ ${SPARK_PROFILE} == 'spark3.3' ]]; then DERBY_VERSION=10.14.1.0 SPARK_VERSION=3.3.0 SPARK_HADOOP_VERSION=2 + CONFLUENT_VERSION=5.5.12 + KAFKA_CONNECT_HDFS_VERSION=10.1.13 IMAGE_TAG=spark330hive313 fi @@ -67,6 +75,7 @@ cp ${GITHUB_WORKSPACE}/packaging/hudi-hadoop-mr-bundle/target/hudi-*-$HUDI_VERSI cp ${GITHUB_WORKSPACE}/packaging/hudi-spark-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/ cp ${GITHUB_WORKSPACE}/packaging/hudi-utilities-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/ cp ${GITHUB_WORKSPACE}/packaging/hudi-utilities-slim-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/ +cp ${GITHUB_WORKSPACE}/packaging/hudi-kafka-connect-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/ echo 'Validating jars below:' ls -l $TMP_JARS_DIR @@ -84,6 +93,8 @@ docker build \ --build-arg DERBY_VERSION=$DERBY_VERSION \ --build-arg SPARK_VERSION=$SPARK_VERSION \ --build-arg SPARK_HADOOP_VERSION=$SPARK_HADOOP_VERSION \ +--build-arg CONFLUENT_VERSION=$CONFLUENT_VERSION \ +--build-arg KAFKA_CONNECT_HDFS_VERSION=$KAFKA_CONNECT_HDFS_VERSION \ --build-arg IMAGE_TAG=$IMAGE_TAG \ -t hudi-ci-bundle-validation:$IMAGE_TAG \ . 
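The Kafka Connect pieces that follow (config-sink.json, produce.sh, consume.sh) validate the sink by checking that at least one completed commit landed under the table's `.hoodie` directory. For readers who prefer that final check spelled out, here is a hypothetical equivalent in plain Java; it is not part of the patch, and the path is taken from the sample sink config.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class CommitCheck {
  public static void main(String[] args) throws IOException {
    // Base path used by config-sink.json in this patch.
    Path hoodieDir = Paths.get("/tmp/hudi-kafka-test/.hoodie");
    long commits;
    try (Stream<Path> files = Files.list(hoodieDir)) {
      // A completed write leaves a *.commit file in the timeline directory.
      commits = files.filter(p -> p.getFileName().toString().endsWith(".commit")).count();
    }
    System.out.println(commits + " commit(s) found");
    // Non-zero exit signals a failed validation, mirroring consume.sh.
    System.exit(commits > 0 ? 0 : 1);
  }
}
```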
diff --git a/packaging/bundle-validation/kafka/config-sink.json b/packaging/bundle-validation/kafka/config-sink.json new file mode 100644 index 0000000000000..0318ec961a598 --- /dev/null +++ b/packaging/bundle-validation/kafka/config-sink.json @@ -0,0 +1,20 @@ +{ + "name": "hudi-sink", + "config": { + "bootstrap.servers": "localhost:9092", + "connector.class": "org.apache.hudi.connect.HoodieSinkConnector", + "tasks.max": "2", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter.schemas.enable": "false", + "topics": "hudi-test-topic", + "hoodie.table.name": "hudi-test-topic", + "hoodie.table.type": "COPY_ON_WRITE", + "hoodie.base.path": "file:///tmp/hudi-kafka-test", + "hoodie.datasource.write.recordkey.field": "volume", + "hoodie.datasource.write.partitionpath.field": "date", + "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider", + "hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/hudi-test-topic/versions/latest", + "hoodie.kafka.commit.interval.secs": 10 + } +} diff --git a/packaging/bundle-validation/kafka/consume.sh b/packaging/bundle-validation/kafka/consume.sh new file mode 100755 index 0000000000000..5ae67c3b61379 --- /dev/null +++ b/packaging/bundle-validation/kafka/consume.sh @@ -0,0 +1,34 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties & +sleep 30 +curl -X POST -H "Content-Type:application/json" -d @/opt/bundle-validation/kafka/config-sink.json http://localhost:8083/connectors & +sleep 30 +curl -X DELETE http://localhost:8083/connectors/hudi-sink & +sleep 10 + +# validate +numCommits=$(ls /tmp/hudi-kafka-test/.hoodie/*.commit | wc -l) +if [ $numCommits -gt 0 ]; then + exit 0 +else + exit 1 +fi diff --git a/packaging/bundle-validation/kafka/produce.sh b/packaging/bundle-validation/kafka/produce.sh new file mode 100755 index 0000000000000..7f828b5d3b6f3 --- /dev/null +++ b/packaging/bundle-validation/kafka/produce.sh @@ -0,0 +1,30 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +kafkaTopicName=hudi-test-topic + +# Setup the schema registry +SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' /opt/bundle-validation/data/stocks/schema.avsc | sed '/\/\*/,/*\//d' | jq tostring) +curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $SCHEMA}" http://localhost:8081/subjects/${kafkaTopicName}/versions + +# produce data +cat /opt/bundle-validation/data/stocks/data/batch_1.json /opt/bundle-validation/data/stocks/data/batch_2.json | $CONFLUENT_HOME/bin/kafka-console-producer \ +--bootstrap-server http://localhost:9092 \ +--topic ${kafkaTopicName} diff --git a/packaging/bundle-validation/validate.sh b/packaging/bundle-validation/validate.sh index 67abdd5d65658..7d40228651cb5 100755 --- a/packaging/bundle-validation/validate.sh +++ b/packaging/bundle-validation/validate.sh @@ -32,6 +32,7 @@ ln -sf $JARS_DIR/hudi-hadoop-mr*.jar $JARS_DIR/hadoop-mr.jar ln -sf $JARS_DIR/hudi-spark*.jar $JARS_DIR/spark.jar ln -sf $JARS_DIR/hudi-utilities-bundle*.jar $JARS_DIR/utilities.jar ln -sf $JARS_DIR/hudi-utilities-slim*.jar $JARS_DIR/utilities-slim.jar +ln -sf $JARS_DIR/hudi-kafka-connect-bundle*.jar $JARS_DIR/kafka-connect.jar ## @@ -131,26 +132,64 @@ test_utilities_bundle () { } +## +# Function to test the kafka-connect bundle. +# It runs zookeeper, kafka broker, schema registry, and connector worker. +# After producing and consuming data, it checks successful commit under `.hoodie/` +# +# 1st arg: path to the hudi-kafka-connect-bundle.jar (for writing data) +# +# env vars (defined in container): +# CONFLUENT_HOME: path to the confluent community directory +# KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH: path to install hudi-kafka-connect-bundle.jar +## +test_kafka_connect_bundle() { + KAFKA_CONNECT_JAR=$1 + cp $KAFKA_CONNECT_JAR $KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH + $CONFLUENT_HOME/bin/zookeeper-server-start $CONFLUENT_HOME/etc/kafka/zookeeper.properties & + $CONFLUENT_HOME/bin/kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties & + sleep 10 + $CONFLUENT_HOME/bin/schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties & + sleep 10 + $CONFLUENT_HOME/bin/kafka-topics --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092 + $WORKDIR/kafka/produce.sh + $WORKDIR/kafka/consume.sh +} + + +############################ +# Execute tests +############################ + +echo "::warning::validate.sh validating spark & hadoop-mr bundle" test_spark_hadoop_mr_bundles if [ "$?" -ne 0 ]; then exit 1 fi +echo "::warning::validate.sh done validating spark & hadoop-mr bundle" if [[ $SPARK_HOME == *"spark-2.4"* ]] || [[ $SPARK_HOME == *"spark-3.1"* ]] then - echo "::warning::validate.sh testing utilities bundle" + echo "::warning::validate.sh validating utilities bundle" test_utilities_bundle $JARS_DIR/utilities.jar if [ "$?" 
-ne 0 ]; then exit 1 fi - echo "::warning::validate.sh done testing utilities bundle" + echo "::warning::validate.sh done validating utilities bundle" else - echo "::warning::validate.sh skip testing utilities bundle for non-spark2.4 & non-spark3.1 build" + echo "::warning::validate.sh skip validating utilities bundle for non-spark2.4 & non-spark3.1 build" fi -echo "::warning::validate.sh testing utilities slim bundle" +echo "::warning::validate.sh validating utilities slim bundle" test_utilities_bundle $JARS_DIR/utilities-slim.jar $JARS_DIR/spark.jar if [ "$?" -ne 0 ]; then exit 1 fi -echo "::warning::validate.sh done testing utilities slim bundle" +echo "::warning::validate.sh done validating utilities slim bundle" + +echo "::warning::validate.sh validating kafka connect bundle" +test_kafka_connect_bundle $JARS_DIR/kafka-connect.jar +if [ "$?" -ne 0 ]; then + exit 1 +fi +echo "::warning::validate.sh done validating kafka connect bundle"
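As a closing illustration of the file-name handling fixed in [HUDI-5153]: the tightened `LOG_FILE_PATTERN` in `FSUtils` (copied verbatim from the patch) anchors the log version and write token on digits and moves the optional `-cdc` marker outside the write-token group, so the token of a CDC log file resolves to `1-0-1` rather than `1-0-1-cdc`. A standalone sketch follows; the group indices are simply counted off that regex for illustration, while the real code goes through the `FSUtils` helpers exercised in `testCdcLogFileName`, and `abc-123` stands in for a real file id (normally a UUID).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogFileNameCheck {
  // Pattern introduced by the patch (copied from FSUtils).
  private static final Pattern LOG_FILE_PATTERN =
      Pattern.compile("\\.(.+)_(.*)\\.(.+)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(-cdc)?)?");

  public static void main(String[] args) {
    // Shape produced by FSUtils.makeLogFileName(fileId, ".log", "100", 2, "1-0-1")
    // with the "-cdc" suffix appended, as in testCdcLogFileName.
    String cdcLogFile = ".abc-123_100.log.2_1-0-1-cdc";
    Matcher m = LOG_FILE_PATTERN.matcher(cdcLogFile);
    if (m.matches()) {
      System.out.println("file id     = " + m.group(1));  // abc-123
      System.out.println("base commit = " + m.group(2));  // 100
      System.out.println("extension   = " + m.group(3));  // log
      System.out.println("log version = " + m.group(4));  // 2
      System.out.println("write token = " + m.group(6));  // 1-0-1 (no -cdc suffix)
      System.out.println("cdc marker  = " + m.group(10)); // -cdc
    }
  }
}
```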