Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,18 @@ public class HoodieErrorTableConfig extends HoodieConfig {
.defaultValue(ErrorWriteFailureStrategy.ROLLBACK_COMMIT.name())
.withDocumentation("The config specifies the failure strategy if error table write fails. "
+ "Use one of - " + Arrays.toString(ErrorWriteFailureStrategy.values()));

public static final ConfigProperty<Boolean> ERROR_TABLE_PERSIST_SOURCE_RDD = ConfigProperty
.key("hoodie.errortable.source.rdd.persist")
.defaultValue(false)
.withDocumentation("Enabling this config, persists the sourceRDD to disk which helps in faster processing of data table + error table write DAG");

public static final ConfigProperty<Boolean> ENABLE_ERROR_TABLE_WRITE_UNIFICATION = ConfigProperty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we really need a flag for this. This seems like it will be more performant for users with the error table writer enabled

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error table write implementation can't do a union if it implements the upsertAndCommit method, so we still need to support that path behind a feature flag to avoid breaking things for existing users.

.key("hoodie.errortable.write.union.enable")
.defaultValue(false)
.withDocumentation("Enable error table union with data table when writing for improved commit performance. "
+ "By default it is disabled meaning data table and error table writes are sequential");

public enum ErrorWriteFailureStrategy {
ROLLBACK_COMMIT("Rollback the corresponding base table write commit for which the error events were triggered"),
LOG_ERROR("Error is logged but the base table write succeeds");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.config;

import org.apache.hudi.common.config.TypedProperties;

import org.junit.jupiter.api.Test;

import static org.apache.hudi.config.HoodieErrorTableConfig.ENABLE_ERROR_TABLE_WRITE_UNIFICATION;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Unit tests for {@code HoodieErrorTableConfig}, covering the
 * error/base table write-union config property.
 */
class TestHoodieErrorTableConfig {

  @Test
  void testErrorAndBaseTableUnionConfig() {
    // The config must ship with a non-empty documentation string.
    String documentation = ENABLE_ERROR_TABLE_WRITE_UNIFICATION.doc();
    assertNotNull(documentation);
    assertNotEquals("", documentation);

    // The feature is opt-in: the default must be false.
    assertFalse(ENABLE_ERROR_TABLE_WRITE_UNIFICATION.defaultValue());

    // Setting the key to "true" in user properties enables the feature.
    TypedProperties userProps = new TypedProperties();
    userProps.setProperty("hoodie.errortable.write.union.enable", "true");
    assertTrue(userProps.getBoolean(ENABLE_ERROR_TABLE_WRITE_UNIFICATION.key(), ENABLE_ERROR_TABLE_WRITE_UNIFICATION.defaultValue()));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
Expand Down Expand Up @@ -85,4 +86,8 @@ public BaseErrorTableWriter(HoodieStreamer.Config cfg,
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract boolean upsertAndCommit(String baseTableInstantTime, Option<String> commitedInstantTime);

/**
 * Writes the accumulated error events to the error table at the given instant
 * WITHOUT committing them; the returned write statuses are later passed to
 * {@code commit} so the error table commit can be coordinated with the base
 * table commit (used by the unified error/base table write flow).
 *
 * @param errorTableInstantTime instant time to use for the error table write
 * @param baseTableInstantTime  instant time of the corresponding base table write
 * @param commitedInstantTime   latest committed base table instant with valid checkpoint info, if any
 * @return RDD of write statuses produced by the error table upsert
 */
public abstract JavaRDD<WriteStatus> upsert(String errorTableInstantTime, String baseTableInstantTime, Option<String> commitedInstantTime);

/**
 * Commits a previously started error table write (see {@code upsert}) using the
 * write statuses it produced.
 *
 * @param errorTableInstantTime instant time the error table write was started with
 * @param writeStatuses         write statuses returned by the corresponding upsert call
 * @return true if the commit succeeded, false otherwise
 */
public abstract boolean commit(String errorTableInstantTime, JavaRDD<WriteStatus> writeStatuses);
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
import static org.apache.hudi.config.HoodieErrorTableConfig.ENABLE_ERROR_TABLE_WRITE_UNIFICATION;
import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_ENABLED;
import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
Expand All @@ -156,6 +157,7 @@
import static org.apache.hudi.utilities.config.HoodieStreamerConfig.CHECKPOINT_FORCE_SKIP;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
import static org.apache.hudi.utilities.streamer.StreamerCheckpointUtils.getLatestInstantWithValidCheckpointInfo;

/**
* Sync's one batch of data to hoodie table.
Expand Down Expand Up @@ -257,6 +259,7 @@ public class StreamSync implements Serializable, Closeable {
private transient HoodieMetrics hoodieMetrics;

private final boolean autoGenerateRecordKeys;
private final boolean isErrorTableWriteUnificationEnabled;

@VisibleForTesting
StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
Expand All @@ -277,6 +280,7 @@ public class StreamSync implements Serializable, Closeable {
this.conf = conf;

this.errorTableWriter = errorTableWriter;
this.isErrorTableWriteUnificationEnabled = getBooleanWithAltKeys(props, ENABLE_ERROR_TABLE_WRITE_UNIFICATION);
this.formatAdapter = formatAdapter;
this.transformer = transformer;
}
Expand Down Expand Up @@ -317,6 +321,7 @@ public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
cfg, sparkSession, props, hoodieSparkContext, fs, Option.of(metrics));
this.errorWriteFailureStrategy = ErrorTableUtils.getErrorWriteFailureStrategy(props);
}
this.isErrorTableWriteUnificationEnabled = getBooleanWithAltKeys(props, ENABLE_ERROR_TABLE_WRITE_UNIFICATION);
initializeMetaClient();
Source source = UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, metrics, streamContext);
this.formatAdapter = new SourceFormatAdapter(source, this.errorTableWriter, Option.of(props));
Expand Down Expand Up @@ -802,9 +807,16 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Stri
Option<String> scheduledCompactionInstant = Option.empty();
// write to hudi and fetch result
WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch, instantTime, useRowWriter);
JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResult.getWriteStatusRDD();
Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResult.getPartitionToReplacedFileIds();

// write to error table
JavaRDD<WriteStatus> dataTableWriteStatusRDD = writeClientWriteResult.getWriteStatusRDD();
JavaRDD<WriteStatus> writeStatusRDD = dataTableWriteStatusRDD;
String errorTableInstantTime = writeClient.createNewInstantTime();
Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt = Option.empty();
if (errorTableWriter.isPresent() && isErrorTableWriteUnificationEnabled) {
errorTableWriteStatusRDDOpt = errorTableWriter.map(w -> w.upsert(errorTableInstantTime, instantTime, getLatestCommittedInstant()));
writeStatusRDD = errorTableWriteStatusRDDOpt.map(errorTableWriteStatus -> errorTableWriteStatus.union(dataTableWriteStatusRDD)).orElse(dataTableWriteStatusRDD);
}
// process write status
long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
Expand All @@ -824,9 +836,13 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Stri
}
String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
if (errorTableWriter.isPresent()) {
boolean errorTableSuccess = true;
// Commit the error events triggered so far to the error table
Option<String> commitedInstantTime = StreamerCheckpointUtils.getLatestInstantWithValidCheckpointInfo(commitsTimelineOpt);
boolean errorTableSuccess = errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
if (isErrorTableWriteUnificationEnabled && errorTableWriteStatusRDDOpt.isPresent()) {
errorTableSuccess = errorTableWriter.get().commit(errorTableInstantTime, errorTableWriteStatusRDDOpt.get());
} else if (!isErrorTableWriteUnificationEnabled) {
errorTableSuccess = errorTableWriter.get().upsertAndCommit(instantTime, getLatestCommittedInstant());
}
if (!errorTableSuccess) {
switch (errorWriteFailureStrategy) {
case ROLLBACK_COMMIT:
Expand All @@ -841,7 +857,7 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Stri
}
}
}
boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty());
boolean success = writeClient.commit(instantTime, dataTableWriteStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty());
releaseResourcesInvoked = true;
if (success) {
LOG.info("Commit " + instantTime + " successful!");
Expand All @@ -864,7 +880,7 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Stri
} else {
LOG.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
LOG.error("Printing out the top 100 errors");
writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
dataTableWriteStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
LOG.error("Global error :", ws.getGlobalError());
if (ws.getErrors().size() > 0) {
ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
Expand All @@ -878,7 +894,7 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Stri

// Send DeltaStreamer Metrics
metrics.updateStreamerMetrics(overallTimeNanos);
return Pair.of(scheduledCompactionInstant, writeStatusRDD);
return Pair.of(scheduledCompactionInstant, dataTableWriteStatusRDD);
} finally {
if (!releaseResourcesInvoked) {
releaseResources(instantTime);
Expand Down Expand Up @@ -1306,6 +1322,18 @@ public Option<String> getClusteringInstantOpt() {
}
}

/**
 * Resolves the latest committed instant on the data table timeline that carries
 * valid checkpoint info, refreshing the meta client first when the timeline
 * layout version has changed.
 *
 * NOTE(review): assumes {@code commitsTimelineOpt} is present at this point —
 * confirm callers guarantee this before invoking.
 *
 * @return the latest instant with valid checkpoint info, or empty if none exists
 * @throws HoodieIOException if the meta client / timeline cannot be reloaded
 */
private Option<String> getLatestCommittedInstant() {
  try {
    // If timelineLayout version changes, initialize the meta client again.
    if (commitsTimelineOpt.get().getTimelineLayoutVersion() != writeClient.getConfig().getWriteVersion().getTimelineLayoutVersion()) {
      initializeMetaClientAndRefreshTimeline();
    }
    return getLatestInstantWithValidCheckpointInfo(commitsTimelineOpt);
  } catch (IOException e) {
    throw new HoodieIOException("Failed to load meta client", e);
  }
}

class WriteClientWriteResult {
private Map<String, List<String>> partitionToReplacedFileIds = Collections.emptyMap();
private final JavaRDD<WriteStatus> writeStatusRDD;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,23 @@ protected static void prepareParquetDFSFiles(int numRecords) throws IOException
}

/**
 * Writes {@code numRecords} test records into a single parquet file
 * ({@code FIRST_PARQUET_FILE_NAME}) under {@code baseParquetPath}, using the
 * default schema (no custom schema or transformations).
 */
protected static void prepareParquetDFSFiles(int numRecords, String baseParquetPath) throws IOException {
  // The overload returns a HoodieTestDataGenerator; close it to release its resources.
  prepareParquetDFSFiles(numRecords, baseParquetPath, FIRST_PARQUET_FILE_NAME, false, null, null).close();
}

/**
 * Splits {@code numRecords} test records across {@code numFiles} parquet files
 * under {@code baseParquetPath}. Files are named {@code 00000.parquet},
 * {@code 00001.parquet}, ...; any remainder records go into the last file, and
 * files that would receive zero records are not created.
 *
 * @throws IllegalArgumentException if {@code numFiles} is not positive
 */
protected static void prepareParquetDFSMultiFiles(int numRecords, String baseParquetPath, int numFiles) throws IOException {
  if (numFiles <= 0) {
    throw new IllegalArgumentException("Number of files must be greater than zero");
  }
  int baseCount = numRecords / numFiles;
  int remainder = numRecords % numFiles;
  for (int fileIndex = 0; fileIndex < numFiles; fileIndex++) {
    // The last file absorbs the leftover records from the integer division.
    boolean isLastFile = fileIndex == numFiles - 1;
    int recordCount = isLastFile ? baseCount + remainder : baseCount;
    if (recordCount <= 0) {
      continue;
    }
    String fileName = String.format("%05d.parquet", fileIndex);
    prepareParquetDFSFiles(recordCount, baseParquetPath, fileName, false, null, null).close();
  }
}

protected static HoodieTestDataGenerator prepareParquetDFSFiles(int numRecords, String baseParquetPath, String fileName, boolean useCustomSchema,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.utilities.deltastreamer;

import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.collection.Tuple3;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;

import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Functional tests for the error table write flow in HoodieDeltaStreamer,
 * exercising both the unified (error table written in union with the base
 * table) and the sequential error table write paths.
 */
class TestHoodieDeltaStreamerErrorTableWriteFlow extends TestHoodieDeltaStreamerSchemaEvolutionBase {

  /**
   * Runs one sync round against a generated parquet source and validates both
   * the base table and (when enabled) the error table contents.
   *
   * @param sourceGenInfo (totalRecords, errorRecords, numFiles) describing the source to generate
   */
  protected void testBase(Tuple3<Integer, Integer, Integer> sourceGenInfo) throws Exception {
    int totalRecords = sourceGenInfo.f0;
    int errorRecords = sourceGenInfo.f1;
    int numFiles = sourceGenInfo.f2;
    boolean shouldCreateMultipleSourceFiles = numFiles > 1;

    // testNum is bumped so each run writes to a fresh source directory.
    PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum++;

    if (totalRecords > 0) {
      // Good records (totalRecords - errorRecords), spread over one or many files.
      if (shouldCreateMultipleSourceFiles) {
        prepareParquetDFSMultiFiles(totalRecords - errorRecords, PARQUET_SOURCE_ROOT, numFiles);
      } else {
        prepareParquetDFSFiles(totalRecords - errorRecords, PARQUET_SOURCE_ROOT);
      }

      // Add error data to the source
      if (errorRecords > 0) {
        String errorDataSourceRoot = basePath + "parquetErrorFilesDfs" + testNum++;
        prepareParquetDFSFiles(errorRecords, errorDataSourceRoot);
        Dataset<Row> df = sparkSession.read().parquet(errorDataSourceRoot);
        // Blank '_row_key' makes these rows invalid — presumably they are routed
        // to the error table (validated via validateErrorTable below); confirm.
        df = df.withColumn("_row_key", functions.lit(""));
        // add error records to PARQUET_SOURCE_ROOT
        addParquetData(df, false);
      }
    } else {
      // Empty-source case: create the directory so the streamer has something to read.
      fs.mkdirs(new Path(PARQUET_SOURCE_ROOT));
    }

    tableName = "test_parquet_table" + testNum;
    tableBasePath = basePath + tableName;
    this.deltaStreamer = new HoodieDeltaStreamer(getDeltaStreamerConfig(), jsc);
    this.deltaStreamer.sync();

    // base table validation: only the good records must land in the base table
    Dataset<Row> baseDf = sparkSession.read().format("hudi").load(tableBasePath);
    assertEquals(totalRecords - errorRecords, baseDf.count());

    // A single sync should leave exactly one instant on the active timeline.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()))
        .setBasePath(tableBasePath).build();
    assertEquals(1, metaClient.getActiveTimeline().getInstants().size());

    // error table validation
    if (withErrorTable) {
      TestHoodieDeltaStreamerSchemaEvolutionExtensive.validateErrorTable(errorRecords, this.writeErrorTableInParallelWithBaseTable);
    }
  }

  /**
   * Parameter matrix for {@code testErrorTableWriteFlow}: record counts, source
   * file count, write operation type, and whether the error table is written in
   * union with the base table.
   */
  protected static Stream<Arguments> testErrorTableWriteFlowArgs() {
    Stream.Builder<Arguments> b = Stream.builder();
    // totalRecords, numErrorRecords, numSourceFiles, WriteOperationType, shouldWriteErrorTableInUnionWithBaseTable

    // empty source, error table union enabled, INSERT
    b.add(Arguments.of(0, 0, 0, WriteOperationType.INSERT, true));
    // empty source, error table union disabled, INSERT
    b.add(Arguments.of(0, 0, 0, WriteOperationType.INSERT, false));
    // non-empty source, error table union enabled, INSERT
    b.add(Arguments.of(100, 5, 1, WriteOperationType.INSERT, true));
    // non-empty source, error table union disabled, INSERT
    b.add(Arguments.of(100, 5, 1, WriteOperationType.INSERT, false));
    // non-empty source, error table union enabled, UPSERT
    b.add(Arguments.of(100, 5, 1, WriteOperationType.UPSERT, true));
    // non-empty source, error table union disabled, UPSERT
    b.add(Arguments.of(100, 5, 1, WriteOperationType.UPSERT, false));
    // non-empty source, error table union enabled, BULK_INSERT
    b.add(Arguments.of(100, 5, 1, WriteOperationType.BULK_INSERT, true));
    // non-empty source, error table union disabled, BULK_INSERT
    b.add(Arguments.of(100, 5, 1, WriteOperationType.BULK_INSERT, false));
    return b.build();
  }

  @ParameterizedTest
  @MethodSource("testErrorTableWriteFlowArgs")
  void testErrorTableWriteFlow(
      int totalRecords,
      int numErrorRecords,
      int numSourceFiles,
      WriteOperationType wopType,
      boolean writeErrorTableInParallel) throws Exception {
    // Configure the base-class knobs for a plain COW run with no clustering,
    // compaction, row writer, extra filegroups, or log files.
    this.withErrorTable = true;
    this.writeErrorTableInParallelWithBaseTable = writeErrorTableInParallel;
    this.writeOperationType = wopType;
    this.useSchemaProvider = false;
    this.useTransformer = false;
    this.tableType = "COPY_ON_WRITE";
    this.shouldCluster = false;
    this.shouldCompact = false;
    this.rowWriterEnable = false;
    this.addFilegroups = false;
    this.multiLogFiles = false;
    this.dfsSourceLimitBytes = 100000000; // set source limit to 100mb
    testBase(Tuple3.of(totalRecords, numErrorRecords, numSourceFiles));
  }
}
Loading
Loading