Add validate CTAS table type
dongkelun committed Nov 8, 2022
2 parents 5c1cb21 + bc4c0fc commit 515e9f7
Showing 18 changed files with 296 additions and 76 deletions.
5 changes: 2 additions & 3 deletions README.md
@@ -87,7 +87,6 @@ Refer to the table below for building with different Spark and Scala versions.
|:--------------------------|:---------------------------------------------|:-------------------------------------------------|
| (empty) | hudi-spark-bundle_2.11 (legacy bundle name) | For Spark 2.4.4 and Scala 2.11 (default options) |
| `-Dspark2.4` | hudi-spark2.4-bundle_2.11 | For Spark 2.4.4 and Scala 2.11 (same as default) |
| `-Dspark2.4 -Dscala-2.12` | hudi-spark2.4-bundle_2.12 | For Spark 2.4.4 and Scala 2.12 |
| `-Dspark3.1 -Dscala-2.12` | hudi-spark3.1-bundle_2.12 | For Spark 3.1.x and Scala 2.12 |
| `-Dspark3.2 -Dscala-2.12` | hudi-spark3.2-bundle_2.12 | For Spark 3.2.x and Scala 2.12 |
| `-Dspark3.3 -Dscala-2.12` | hudi-spark3.3-bundle_2.12 | For Spark 3.3.x and Scala 2.12 |
@@ -102,8 +101,8 @@ mvn clean package -DskipTests -Dspark3.2 -Dscala-2.12
# Build against Spark 3.1.x
mvn clean package -DskipTests -Dspark3.1 -Dscala-2.12
# Build against Spark 2.4.4 and Scala 2.12
mvn clean package -DskipTests -Dspark2.4 -Dscala-2.12
# Build against Spark 2.4.4 and Scala 2.11
mvn clean package -DskipTests -Dspark2.4
```

#### What about "spark-avro" module?
@@ -279,34 +279,26 @@ protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, Ho

protected HoodieLogFormat.Writer createLogWriter(
Option<FileSlice> fileSlice, String baseCommitTime) throws IOException {
return createLogWriter(fileSlice, baseCommitTime, "");
return createLogWriter(fileSlice, baseCommitTime, null);
}

protected HoodieLogFormat.Writer createLogWriter(
Option<FileSlice> fileSlice, String baseCommitTime, String fileSuffix) throws IOException {
int logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
long logFileSize = 0L;
String logWriteToken = writeToken + fileSuffix;
String rolloverLogWriteToken = writeToken + fileSuffix;
if (fileSlice.isPresent()) {
Option<HoodieLogFile> latestLogFileOpt = fileSlice.get().getLatestLogFile();
if (latestLogFileOpt.isPresent()) {
HoodieLogFile latestLogFile = latestLogFileOpt.get();
logVersion = latestLogFile.getLogVersion();
logFileSize = latestLogFile.getFileSize();
logWriteToken = FSUtils.getWriteTokenFromLogPath(latestLogFile.getPath());
}
}
Option<FileSlice> fileSlice, String baseCommitTime, String suffix) throws IOException {
Option<HoodieLogFile> latestLogFile = fileSlice.isPresent()
? fileSlice.get().getLatestLogFile()
: Option.empty();

return HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId)
.overBaseCommit(baseCommitTime)
.withLogVersion(logVersion)
.withFileSize(logFileSize)
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
.withSizeThreshold(config.getLogFileMaxSize())
.withFs(fs)
.withRolloverLogWriteToken(rolloverLogWriteToken)
.withLogWriteToken(logWriteToken)
.withRolloverLogWriteToken(writeToken)
.withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
.withSuffix(suffix)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}
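
Note that the third parameter now defaults to `null` instead of an empty string, and the suffix is handed to the writer builder rather than folded into the write token here. A hedged sketch of how a change-data-capture code path might request a suffixed log writer (the calling handle is assumed and not part of this hunk):

```java
// Hypothetical caller inside a CDC-aware write handle (assumption, not shown in this diff);
// the suffix constant comes from org.apache.hudi.common.table.cdc.HoodieCDCUtils.
HoodieLogFormat.Writer cdcLogWriter =
    createLogWriter(fileSlice, baseCommitTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
```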

@@ -77,9 +77,10 @@
public class FSUtils {

private static final Logger LOG = LogManager.getLogger(FSUtils.class);
// Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1
// Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1_1-0-1
// Archive log files are of this pattern - .commits_.archive.1_1-0-1
private static final Pattern LOG_FILE_PATTERN =
Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)(-cdc)?))?");
Pattern.compile("\\.(.+)_(.*)\\.(.+)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(-cdc)?)?");
private static final String LOG_FILE_PREFIX = ".";
private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
private static final long MIN_CLEAN_TO_KEEP = 10;
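
The tightened pattern uses `\d+` for the numeric version and write-token fields and keeps the optional `-cdc` marker outside the captured token, so delta, CDC, and archive names all still parse. A minimal standalone check against the file names from the comment above (illustrative only; group indices follow the pattern as written):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogFilePatternCheck {
  // Copy of the pattern above, repeated here only so the sketch is self-contained.
  private static final Pattern LOG_FILE_PATTERN =
      Pattern.compile("\\.(.+)_(.*)\\.(.+)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(-cdc)?)?");

  public static void main(String[] args) {
    // Delta log file with a write token, its CDC variant, and an archive file.
    check(".b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1_1-0-1");
    check(".b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1_1-0-1-cdc");
    check(".commits_.archive.1_1-0-1");
  }

  private static void check(String name) {
    Matcher m = LOG_FILE_PATTERN.matcher(name);
    if (m.matches()) {
      System.out.println(name + " -> fileId=" + m.group(1) + ", version=" + m.group(4)
          + ", writeToken=" + m.group(6) + ", cdc=" + (m.group(10) != null));
    } else {
      System.out.println(name + " -> no match");
    }
  }
}
```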
@@ -141,6 +141,8 @@ class WriterBuilder {
private Path parentPath;
// Log File Write Token
private String logWriteToken;
// optional file suffix
private String suffix;
// Rollover Log file write token
private String rolloverLogWriteToken;

@@ -164,6 +166,11 @@ public WriterBuilder withLogWriteToken(String logWriteToken) {
return this;
}

public WriterBuilder withSuffix(String suffix) {
this.suffix = suffix;
return this;
}

public WriterBuilder withFs(FileSystem fs) {
this.fs = fs;
return this;
@@ -250,6 +257,14 @@ public Writer build() throws IOException {
logWriteToken = rolloverLogWriteToken;
}

if (suffix != null) {
// A little hacky to simplify the file name concatenation:
// patch the write token with an optional suffix
// instead of adding a new extension
logWriteToken = logWriteToken + suffix;
rolloverLogWriteToken = rolloverLogWriteToken + suffix;
}

Path logPath = new Path(parentPath,
FSUtils.makeLogFileName(logFileId, fileExtension, instantTime, logVersion, logWriteToken));
LOG.info("HoodieLogFile on path " + logPath);
@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -250,6 +251,42 @@ public void tesLogFileName() {
assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(rlPath));
}

@Test
public void testCdcLogFileName() {
String partitionPath = "2022/11/04/";
String fileName = UUID.randomUUID().toString();
String logFile = FSUtils.makeLogFileName(fileName, ".log", "100", 2, "1-0-1") + HoodieCDCUtils.CDC_LOGFILE_SUFFIX;
Path path = new Path(new Path(partitionPath), logFile);

assertTrue(FSUtils.isLogFile(path));
assertEquals("log", FSUtils.getFileExtensionFromLog(path));
assertEquals(fileName, FSUtils.getFileIdFromLogPath(path));
assertEquals("100", FSUtils.getBaseCommitTimeFromLogPath(path));
assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(path));
assertEquals("1-0-1", FSUtils.getWriteTokenFromLogPath(path));
assertEquals(0, FSUtils.getStageIdFromLogPath(path));
assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(path));
assertEquals(2, FSUtils.getFileVersionFromLog(path));
}

@Test
public void testArchiveLogFileName() {
String partitionPath = "2022/11/04/";
String fileName = "commits";
String logFile = FSUtils.makeLogFileName(fileName, ".archive", "", 2, "1-0-1");
Path path = new Path(new Path(partitionPath), logFile);

assertFalse(FSUtils.isLogFile(path));
assertEquals("archive", FSUtils.getFileExtensionFromLog(path));
assertEquals(fileName, FSUtils.getFileIdFromLogPath(path));
assertEquals("", FSUtils.getBaseCommitTimeFromLogPath(path));
assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(path));
assertEquals("1-0-1", FSUtils.getWriteTokenFromLogPath(path));
assertEquals(0, FSUtils.getStageIdFromLogPath(path));
assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(path));
assertEquals(2, FSUtils.getFileVersionFromLog(path));
}

/**
* Test Log File Comparisons when log files do not have write tokens.
*/
@@ -136,7 +136,7 @@ public HoodieTableSource(
List<String> partitionKeys,
String defaultPartName,
Configuration conf) {
this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null);
this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null, null);
}

public HoodieTableSource(
@@ -148,7 +148,8 @@ public HoodieTableSource(
@Nullable FileIndex fileIndex,
@Nullable List<Map<String, String>> requiredPartitions,
@Nullable int[] requiredPos,
@Nullable Long limit) {
@Nullable Long limit,
@Nullable HoodieTableMetaClient metaClient) {
this.schema = schema;
this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
this.path = path;
@@ -164,7 +165,7 @@ public HoodieTableSource(
: requiredPos;
this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
this.metaClient = metaClient == null ? StreamerUtil.metaClientForReader(conf, hadoopConf) : metaClient;
this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
}

@@ -215,7 +216,7 @@ public ChangelogMode getChangelogMode() {
@Override
public DynamicTableSource copy() {
return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
conf, fileIndex, requiredPartitions, requiredPos, limit);
conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient);
}

@Override
@@ -42,6 +42,7 @@
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

@@ -178,9 +179,12 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
/**
* Create Hive field schemas from Flink table schema including the hoodie metadata fields.
*/
public static List<FieldSchema> toHiveFieldSchema(TableSchema schema) {
public static List<FieldSchema> toHiveFieldSchema(TableSchema schema, boolean withOperationField) {
List<FieldSchema> columns = new ArrayList<>();
for (String metaField : HoodieRecord.HOODIE_META_COLUMNS) {
Collection<String> metaFields = withOperationField
? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the set may break sequence
: HoodieRecord.HOODIE_META_COLUMNS;
for (String metaField : metaFields) {
columns.add(new FieldSchema(metaField, "string", null));
}
columns.addAll(createHiveColumns(schema));
Expand Down
Expand Up @@ -553,7 +553,8 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
// because since Hive 3.x, there is validation when altering table,
// when the metadata fields are synced through the hive sync tool,
// a compatibility issue would be reported.
List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema());
boolean withOperationField = Boolean.parseBoolean(table.getOptions().getOrDefault(FlinkOptions.CHANGELOG_ENABLED.key(), "false"));
List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema(), withOperationField);

// Table columns and partition keys
CatalogTable catalogTable = (CatalogTable) table;
@@ -18,6 +18,7 @@

package org.apache.hudi.table;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.utils.TestConfigurations;
@@ -148,6 +149,14 @@ void testDataSkippingFilterShouldBeNotNullWhenTableSourceIsCopied() {
assertEquals(expectedFilters, actualFilters);
}

@Test
void testHoodieSourceCachedMetaClient() {
HoodieTableSource tableSource = getEmptyStreamingSource();
HoodieTableMetaClient metaClient = tableSource.getMetaClient();
HoodieTableSource tableSourceCopy = (HoodieTableSource) tableSource.copy();
assertThat(metaClient, is(tableSourceCopy.getMetaClient()));
}

private HoodieTableSource getEmptyStreamingSource() {
final String path = tempFile.getAbsolutePath();
conf = TestConfigurations.getDefaultConf(path);
7 changes: 3 additions & 4 deletions hudi-kafka-connect/demo/setupKafka.sh
@@ -1,3 +1,4 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,8 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

#!/bin/bash

#########################
# The command line help #
#########################
@@ -79,11 +78,11 @@ while getopts ":n:b:tf:k:m:r:o:l:p:s:-:" opt; do
recreateTopic="N"
printf "Argument recreate-topic is N (reuse Kafka topic) \n"
;;
k)
f)
rawDataFile="$OPTARG"
printf "Argument raw-file is %s\n" "$rawDataFile"
;;
f)
k)
kafkaTopicName="$OPTARG"
printf "Argument kafka-topic is %s\n" "$kafkaTopicName"
;;
16 changes: 15 additions & 1 deletion packaging/bundle-validation/Dockerfile-base
@@ -16,7 +16,7 @@
#
FROM adoptopenjdk/openjdk8:alpine

RUN apk add --no-cache --upgrade bash
RUN apk add --no-cache --upgrade bash curl jq

RUN mkdir /opt/bundle-validation
ENV WORKDIR=/opt/bundle-validation
@@ -27,6 +27,8 @@ ARG HIVE_VERSION=3.1.3
ARG DERBY_VERSION=10.14.1.0
ARG SPARK_VERSION=3.1.3
ARG SPARK_HADOOP_VERSION=2.7
ARG CONFLUENT_VERSION=5.5.12
ARG KAFKA_CONNECT_HDFS_VERSION=10.1.13

RUN wget https://archive.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz -P "$WORKDIR" \
&& tar -xf $WORKDIR/hadoop-$HADOOP_VERSION.tar.gz -C $WORKDIR/ \
@@ -47,3 +49,15 @@ RUN wget https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK
&& tar -xf $WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz -C $WORKDIR/ \
&& rm $WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz
ENV SPARK_HOME=$WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION

RUN wget https://packages.confluent.io/archive/${CONFLUENT_VERSION%.*}/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz -P "$WORKDIR" \
&& tar -xf $WORKDIR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz -C $WORKDIR/ \
&& rm $WORKDIR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz
ENV CONFLUENT_HOME=$WORKDIR/confluent-$CONFLUENT_VERSION

RUN wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-hdfs/versions/$KAFKA_CONNECT_HDFS_VERSION/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip -P "$WORKDIR" \
&& mkdir $WORKDIR/kafka-connectors \
&& unzip $WORKDIR/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip -d $WORKDIR/kafka-connectors/ \
&& rm $WORKDIR/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip \
&& printf "\nplugin.path=$WORKDIR/kafka-connectors\n" >> $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
ENV KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH=$WORKDIR/kafka-connectors/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION/lib
12 changes: 12 additions & 0 deletions packaging/bundle-validation/ci_run.sh
@@ -36,36 +36,46 @@ if [[ ${SPARK_PROFILE} == 'spark2.4' ]]; then
DERBY_VERSION=10.10.2.0
SPARK_VERSION=2.4.8
SPARK_HADOOP_VERSION=2.7
CONFLUENT_VERSION=5.5.12
KAFKA_CONNECT_HDFS_VERSION=10.1.13
IMAGE_TAG=spark248hive239
elif [[ ${SPARK_PROFILE} == 'spark3.1' ]]; then
HADOOP_VERSION=2.7.7
HIVE_VERSION=3.1.3
DERBY_VERSION=10.14.1.0
SPARK_VERSION=3.1.3
SPARK_HADOOP_VERSION=2.7
CONFLUENT_VERSION=5.5.12
KAFKA_CONNECT_HDFS_VERSION=10.1.13
IMAGE_TAG=spark313hive313
elif [[ ${SPARK_PROFILE} == 'spark3.2' ]]; then
HADOOP_VERSION=2.7.7
HIVE_VERSION=3.1.3
DERBY_VERSION=10.14.1.0
SPARK_VERSION=3.2.2
SPARK_HADOOP_VERSION=2.7
CONFLUENT_VERSION=5.5.12
KAFKA_CONNECT_HDFS_VERSION=10.1.13
IMAGE_TAG=spark322hive313
elif [[ ${SPARK_PROFILE} == 'spark3.3' ]]; then
HADOOP_VERSION=2.7.7
HIVE_VERSION=3.1.3
DERBY_VERSION=10.14.1.0
SPARK_VERSION=3.3.0
SPARK_HADOOP_VERSION=2
CONFLUENT_VERSION=5.5.12
KAFKA_CONNECT_HDFS_VERSION=10.1.13
IMAGE_TAG=spark330hive313
fi

# Copy bundle jars to temp dir for mounting
TMP_JARS_DIR=/tmp/jars/$(date +%s)
mkdir -p $TMP_JARS_DIR
cp ${GITHUB_WORKSPACE}/packaging/hudi-hadoop-mr-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/
cp ${GITHUB_WORKSPACE}/packaging/hudi-spark-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/
cp ${GITHUB_WORKSPACE}/packaging/hudi-utilities-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/
cp ${GITHUB_WORKSPACE}/packaging/hudi-utilities-slim-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/
cp ${GITHUB_WORKSPACE}/packaging/hudi-kafka-connect-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/
echo 'Validating jars below:'
ls -l $TMP_JARS_DIR

@@ -83,6 +93,8 @@ docker build \
--build-arg DERBY_VERSION=$DERBY_VERSION \
--build-arg SPARK_VERSION=$SPARK_VERSION \
--build-arg SPARK_HADOOP_VERSION=$SPARK_HADOOP_VERSION \
--build-arg CONFLUENT_VERSION=$CONFLUENT_VERSION \
--build-arg KAFKA_CONNECT_HDFS_VERSION=$KAFKA_CONNECT_HDFS_VERSION \
--build-arg IMAGE_TAG=$IMAGE_TAG \
-t hudi-ci-bundle-validation:$IMAGE_TAG \
.
20 changes: 20 additions & 0 deletions packaging/bundle-validation/kafka/config-sink.json
@@ -0,0 +1,20 @@
{
"name": "hudi-sink",
"config": {
"bootstrap.servers": "localhost:9092",
"connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
"tasks.max": "2",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": "false",
"topics": "hudi-test-topic",
"hoodie.table.name": "hudi-test-topic",
"hoodie.table.type": "COPY_ON_WRITE",
"hoodie.base.path": "file:///tmp/hudi-kafka-test",
"hoodie.datasource.write.recordkey.field": "volume",
"hoodie.datasource.write.partitionpath.field": "date",
"hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
"hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/hudi-test-topic/versions/latest",
"hoodie.kafka.commit.interval.secs": 10
}
}
