[HUDI-648][RFC-20] Implement error log/table for Datasource/DeltaStreamer/WriteClient/Compaction writes #3312

Open · wants to merge 16 commits into base: master
Changes from all commits
20 changes: 9 additions & 11 deletions .idea/vcs.xml

Some generated files are not rendered by default.

@@ -345,6 +345,17 @@ public void rollbackFailedBootstrap() {
*/
public abstract O insert(I records, final String instantTime);

/**
* Inserts the given error HoodieRecords into the Hudi error table, at the supplied instantTime.
* <p>
* This API is intended to be used to write error records.
*
* @param records Error HoodieRecords to insert
* @param instantTime Instant time of the commit
* @return Collection of WriteStatus to inspect errors and counts
*/
public abstract O insertError(I records, final String instantTime);

/**
* Inserts the given prepared records into the Hoodie table, at the supplied instantTime.
* <p>
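As context for reviewers, a minimal sketch of how a caller might drive the new insertError API, assuming this PR implements it on the Spark write client with I = JavaRDD&lt;HoodieRecord&gt; and O = JavaRDD&lt;WriteStatus&gt; (the failed-records RDD here is illustrative):

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.api.java.JavaRDD;

import java.util.List;

class InsertErrorExample {
  // Routes records that failed the main-table write into the error table.
  static List<WriteStatus> writeErrorRecords(SparkRDDWriteClient<?> client,
                                             JavaRDD<HoodieRecord> failedRecords) {
    // Start a fresh instant, then insert the error records under it, as with a normal insert.
    String instantTime = client.startCommit();
    return client.insertError(failedRecords, instantTime).collect();
  }
}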
@@ -24,6 +24,7 @@
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieErrorTableConfig;
@@ -1943,6 +1944,37 @@ public boolean allowOperationMetadataField() {
return getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD);
}

/**
* Error table configs.
*/
public boolean errorTableEnabled() {
return getBoolean(HoodieErrorTableConfig.ERROR_TABLE_ENABLE_PROP);
}

public String getErrorTableBasePath() {
return getString(HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH_PROP);
}

public String getErrorTableName() {
return getString(HoodieErrorTableConfig.ERROR_TABLE_NAME_PROP);
}

public int getErrorTableInsertParallelism() {
return getInt(HoodieErrorTableConfig.ERROR_TABLE_INSERT_PARALLELISM_PROP);
}

public int getErrorTableCleanerCommitsRetained() {
return getInt(HoodieErrorTableConfig.CLEANER_COMMITS_RETAINED_PROP);
}

public int getErrorTableMinCommitsToKeep() {
return getInt(HoodieErrorTableConfig.MIN_COMMITS_TO_KEEP_PROP);
}

public int getErrorTableMaxCommitsToKeep() {
return getInt(HoodieErrorTableConfig.MAX_COMMITS_TO_KEEP_PROP);
}

public String getFileIdPrefixProviderClassName() {
return getString(FILEID_PREFIX_PROVIDER_CLASS);
}
@@ -1984,6 +2016,7 @@ public static class Builder {
private boolean isMetricsJmxConfigSet = false;
private boolean isMetricsGraphiteConfigSet = false;
private boolean isLayoutConfigSet = false;
private boolean isErrorTableConfigSet = false;

public Builder withEngineType(EngineType engineType) {
this.engineType = engineType;
@@ -2335,6 +2368,12 @@ public Builder withProperties(Properties properties) {
return this;
}

public Builder withErrorTableConfig(HoodieErrorTableConfig errorTableConfig) {
writeConfig.getProps().putAll(errorTableConfig.getProps());
isErrorTableConfigSet = true;
return this;
}

protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
// Check for mandatory properties
@@ -2371,6 +2410,8 @@ protected void setDefaults() {
writeConfig.setDefaultOnCondition(!isLayoutConfigSet,
HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION));
writeConfig.setDefaultOnCondition(!isErrorTableConfigSet,
HoodieErrorTableConfig.newBuilder().fromProperties(writeConfig.getProps()).build());

// Async table services can update the metadata table and a lock provider is
// needed to guard against any concurrent table write operations. If user has
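For reviewers, a sketch of what the new defaults wiring enables: because setDefaults() now builds a HoodieErrorTableConfig from the writer's properties, the error table can be turned on purely through properties. The property keys below are assumptions for illustration; the authoritative names are the ConfigProperty constants in HoodieErrorTableConfig.

import java.util.Properties;

import org.apache.hudi.config.HoodieWriteConfig;

class ErrorTableConfigExample {
  static HoodieWriteConfig buildConfig() {
    Properties props = new Properties();
    // Assumed key names; confirm against the ConfigProperty keys in HoodieErrorTableConfig.
    props.setProperty("hoodie.error.table.enable", "true");
    props.setProperty("hoodie.error.table.base.path", "/data/hudi/my_table_errors");
    props.setProperty("hoodie.error.table.insert.parallelism", "2");

    return HoodieWriteConfig.newBuilder()
        .withPath("/data/hudi/my_table")
        .forTable("my_table")
        .withProperties(props)   // setDefaults() picks the error table keys up from here
        .build();
  }
}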
@@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.error;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieErrorTableConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.OverwriteWithLatestAvroSchemaPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_COMMIT_TIME_METADATA_FIELD;
import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_KEY_METADATA_FIELD;
import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_PARTITION_PATH_METADATA_FIELD;
import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_FILE_ID_FIELD;
import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_TABLE_NAME;
import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_UUID;
import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_TS;
import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_SCHEMA;
import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_RECORD;
import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_MESSAGE;
import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_CONTEXT;

/**
* Writer implementation backed by an internal Hudi table. Error records are saved within an internal
* copy-on-write (COW) table called the error table.
*/
public abstract class HoodieBackedErrorTableWriter<O> implements Serializable {

private static final Logger LOG = LogManager.getLogger(HoodieBackedErrorTableWriter.class);

protected HoodieWriteConfig errorTableWriteConfig;
protected HoodieWriteConfig datasetWriteConfig;
protected String tableName;

protected HoodieTableMetaClient metaClient;
protected SerializableConfiguration hadoopConf;
protected final transient HoodieEngineContext engineContext;
protected String basePath;

protected HoodieBackedErrorTableWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
this.datasetWriteConfig = writeConfig;
this.engineContext = engineContext;
this.hadoopConf = new SerializableConfiguration(hadoopConf);

if (writeConfig.errorTableEnabled()) {
this.tableName = writeConfig.getTableName() + HoodieErrorTableConfig.ERROR_TABLE_NAME_SUFFIX;
this.basePath = getErrorTableBasePath(writeConfig);
this.errorTableWriteConfig = createErrorDataWriteConfig(writeConfig);
try {
// On first construction the metaClient field is still null, so bootstrapErrorTable()
// skips the existence check and initializes the error table.
bootstrapErrorTable(metaClient);
} catch (IOException e) {
throw new HoodieException("Failed to initialize the Hudi error table", e);
}
}
}

/**
* Create a {@code HoodieWriteConfig} to use for the Error Table.
*
* @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
*/
private HoodieWriteConfig createErrorDataWriteConfig(HoodieWriteConfig writeConfig) {
int parallelism = writeConfig.getErrorTableInsertParallelism();
// Create the write config for the error table by borrowing options from the main write config.
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
.withEmbeddedTimelineServerEnabled(false)
.withPath(basePath)
.withSchema(HoodieErrorTableConfig.ERROR_TABLE_SCHEMA)
.forTable(getErrorTableName(writeConfig))
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.retainCommits(writeConfig.getErrorTableCleanerCommitsRetained())
.archiveCommitsWith(writeConfig.getErrorTableMinCommitsToKeep(),
writeConfig.getErrorTableMaxCommitsToKeep()).build())
.withParallelism(parallelism, parallelism);

return builder.build();
}

public HoodieWriteConfig getWriteConfig() {
return errorTableWriteConfig;
}

/**
* Initialize the Hudi error table if it does not already exist.
* @param datasetMetaClient meta client of the dataset; may be null, in which case the error table is initialized unconditionally
* @throws IOException
*/
private void bootstrapErrorTable(HoodieTableMetaClient datasetMetaClient) throws IOException {

if (datasetMetaClient != null) {
boolean exists = datasetMetaClient.getFs().exists(new Path(errorTableWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
if (exists) {
return;
}
}

this.metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(tableName)
.setArchiveLogFolder("archived")
.setPayloadClassName(OverwriteWithLatestAvroPayload.class.getName())
.setBaseFileFormat(HoodieFileFormat.PARQUET.toString())
.initTable(new Configuration(hadoopConf.get()), errorTableWriteConfig.getBasePath());
}

private String getErrorTableBasePath(HoodieWriteConfig writeConfig) {

if (StringUtils.isNullOrEmpty(writeConfig.getErrorTableBasePath())) {
return writeConfig.getBasePath() + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR + "errors";
}
return writeConfig.getErrorTableBasePath();
}

private String getErrorTableName(HoodieWriteConfig writeConfig) {

return StringUtils.isNullOrEmpty(writeConfig.getErrorTableName())
? writeConfig.getTableName() + HoodieErrorTableConfig.ERROR_TABLE_NAME_SUFFIX : writeConfig.getErrorTableName();
}

public abstract void commit(O data, String schema, String tableName);

public HoodieRecord createErrorRecord(HoodieRecord hoodieRecord, HashMap<HoodieKey, Throwable> errorsMap, String schema, String tableName) {
String uuid = UUID.randomUUID().toString();
long timeMillis = System.currentTimeMillis();
String ts = String.valueOf(timeMillis);
// Partition the error table by the wall-clock date of the failed write, e.g. "2021/07/21".
String partitionPath = new DateTime(timeMillis).toString("yyyy/MM/dd");

HoodieKey hoodieKey = hoodieRecord.getKey();

HoodieRecordLocation hoodieRecordLocation = null;
if (hoodieRecord.getNewLocation().isPresent()) {
hoodieRecordLocation = (HoodieRecordLocation) hoodieRecord.getNewLocation().get();
}

String instantTime = hoodieRecordLocation == null ? "" : hoodieRecordLocation.getInstantTime();
String fileId = hoodieRecordLocation == null ? "" : hoodieRecordLocation.getFileId();
String message = errorsMap.get(hoodieKey).toString();

OverwriteWithLatestAvroSchemaPayload data = (OverwriteWithLatestAvroSchemaPayload) hoodieRecord.getData();
GenericRecord genericRecord = null;
try {
genericRecord = HoodieAvroUtils.bytesToAvro(data.recordBytes, new Schema.Parser().parse(data.getSchema()));
} catch (IOException e) {
LOG.error("Serialization failed", e);
}
Map<String, String> context = new HashMap<>();
context.put(ERROR_COMMIT_TIME_METADATA_FIELD, instantTime);
context.put(ERROR_RECORD_KEY_METADATA_FIELD, hoodieKey.getRecordKey());
context.put(ERROR_PARTITION_PATH_METADATA_FIELD, hoodieRecord.getPartitionPath());
context.put(ERROR_FILE_ID_FIELD, fileId);
context.put(ERROR_TABLE_NAME, tableName != null ? tableName : "");

GenericRecord errorGenericRecord = new GenericData.Record(new Schema.Parser().parse(HoodieErrorTableConfig.ERROR_TABLE_SCHEMA));

errorGenericRecord.put(ERROR_RECORD_UUID, uuid);
errorGenericRecord.put(ERROR_RECORD_TS, ts);
errorGenericRecord.put(ERROR_RECORD_SCHEMA, schema);
errorGenericRecord.put(ERROR_RECORD_RECORD, genericRecord != null ? genericRecord.toString() : "");
errorGenericRecord.put(ERROR_RECORD_MESSAGE, message);
errorGenericRecord.put(ERROR_RECORD_CONTEXT, context);

HoodieAvroPayload hoodieAvroPayload = new HoodieAvroPayload(Option.of(errorGenericRecord));

HoodieKey errorHoodieKey = new HoodieKey(uuid, partitionPath);
return new HoodieAvroRecord<>(errorHoodieKey, hoodieAvroPayload);
}

/**
* Converts each failed record into an error record for the error table.
*/
public List<HoodieRecord> createErrorRecord(List<HoodieRecord> failedRecords, HashMap<HoodieKey, Throwable> errorsMap, String schema, String tableName) {
List<HoodieRecord> errorHoodieRecords = new ArrayList<>();
for (HoodieRecord hoodieRecord : failedRecords) {
errorHoodieRecords.add(createErrorRecord(hoodieRecord, errorsMap, schema, tableName));
}
return errorHoodieRecords;
}
}
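To summarize the intended flow of this class, a hedged sketch under two assumptions: an engine-specific subclass named FlinkHoodieBackedErrorTableWriter with a create() factory exists in this PR (mirroring Hudi's metadata writer naming), and it is parameterized with O = List&lt;HoodieRecord&gt; as the Flink client is.

import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;

class ErrorTableFlowExample {
  // failedRecords and errorsMap would be collected from the WriteStatus of a failed write.
  static void routeFailures(Configuration hadoopConf, HoodieWriteConfig config,
                            HoodieEngineContext context, List<HoodieRecord> failedRecords,
                            HashMap<HoodieKey, Throwable> errorsMap, String writerSchema) {
    // Hypothetical factory; the abstract base above is engine-agnostic.
    HoodieBackedErrorTableWriter<List<HoodieRecord>> writer =
        FlinkHoodieBackedErrorTableWriter.create(hadoopConf, config, context);
    // Wrap each failed record into the error table schema (uuid key, yyyy/MM/dd partition).
    List<HoodieRecord> errorRecords =
        writer.createErrorRecord(failedRecords, errorsMap, writerSchema, config.getTableName());
    writer.commit(errorRecords, writerSchema, config.getTableName());
  }
}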
@@ -167,6 +167,7 @@ public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, Hoo
long totalCompactedRecordsUpdated = metadata.getTotalCompactedRecordsUpdated();
long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
long totalLogFilesSize = metadata.getTotalLogFilesSize();
long totalWriteErrors = metadata.fetchTotalWriteErrors();
Metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten);
Metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert);
Metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate);
@@ -180,6 +181,7 @@ public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, Hoo
Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated);
Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted);
Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize);
Metrics.registerGauge(getMetricsName(actionType, "totalWriteErrors"), totalWriteErrors);
}
}

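With the two lines above, every commit action now publishes its error count next to the existing gauges. A hedged sketch of reading it back through a Dropwizard registry, assuming getMetricsName() produces names of the form "&lt;tableName&gt;.&lt;actionType&gt;.&lt;metric&gt;" (verify against the actual Metrics class):

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

class WriteErrorsGaugeExample {
  static long readTotalWriteErrors(MetricRegistry registry) {
    // Assumed gauge name pattern; confirm against getMetricsName(actionType, metric).
    Gauge<?> gauge = registry.getGauges().get("my_table.commit.totalWriteErrors");
    return gauge == null ? 0L : ((Number) gauge.getValue()).longValue();
  }
}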
@@ -27,6 +27,7 @@
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Commonly used assertion functions.
@@ -41,6 +42,14 @@ public static void assertNoWriteErrors(List<WriteStatus> statuses) {
assertFalse(status.hasErrors(), "Errors found in write of " + status.getFileId())));
}

/**
* Assert that failures occurred while writing hoodie files.
*/
public static void assertHasWriteErrors(List<WriteStatus> statuses) {
assertAll(statuses.stream().map(status -> () ->
assertTrue(status.hasErrors(), "No errors found in write of " + status.getFileId())));
}

/**
* Assert each file size equal to its source of truth.
*
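A short sketch of how a test could use the new helper, assuming it lives in org.apache.hudi.testutils.Assertions alongside assertNoWriteErrors, and assuming a fixture that produces records engineered to fail (for example, records violating the writer schema):

import java.util.List;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.api.java.JavaRDD;

import static org.apache.hudi.testutils.Assertions.assertHasWriteErrors;

class AssertHasWriteErrorsExample {
  static void verifyFailuresSurface(SparkRDDWriteClient<?> writeClient,
                                    JavaRDD<HoodieRecord> badRecords) {
    String instantTime = writeClient.startCommit();
    List<WriteStatus> statuses = writeClient.insert(badRecords, instantTime).collect();
    assertHasWriteErrors(statuses); // fails unless every status reports errors
  }
}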