Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.config;

import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;

import javax.annotation.concurrent.Immutable;

import java.util.Arrays;

/**
 * Configurations for the (optional) error table: a side table to which records that fail
 * processing in DeltaStreamer are routed, so the main table's ingest can proceed.
 */
@Immutable
@ConfigClassProperty(name = "Error table Configs",
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "Configurations that are required for Error table configs")
public class HoodieErrorTableConfig {
/** Master switch; when enabled, records with processing errors are transferred to the error table. */
public static final ConfigProperty<Boolean> ERROR_TABLE_ENABLED = ConfigProperty
.key("hoodie.errortable.enable")
.defaultValue(false)
.withDocumentation("Config to enable error table. If the config is enabled, "
+ "all the records with processing error in DeltaStreamer are transferred to error table.");

/** Storage location of the error table. Required when the error table is enabled (no default). */
public static final ConfigProperty<String> ERROR_TABLE_BASE_PATH = ConfigProperty
.key("hoodie.errortable.base.path")
.noDefaultValue()
.withDocumentation("Base path for error table under which all error records "
+ "would be stored.");

/** Logical table name under which error records are registered (no default). */
public static final ConfigProperty<String> ERROR_TARGET_TABLE = ConfigProperty
.key("hoodie.errortable.target.table.name")
.noDefaultValue()
.withDocumentation("Table name to be used for the error table");

/** Upsert shuffle parallelism for the error table; mirrors hoodie.upsert.shuffle.parallelism. */
public static final ConfigProperty<Integer> ERROR_TABLE_UPSERT_PARALLELISM_VALUE = ConfigProperty
.key("hoodie.errortable.upsert.shuffle.parallelism")
.defaultValue(200)
.withDocumentation("Config to set upsert shuffle parallelism. The config is similar to "
+ "hoodie.upsert.shuffle.parallelism config but applies to the error table.");

/** Insert shuffle parallelism for the error table; mirrors hoodie.insert.shuffle.parallelism. */
public static final ConfigProperty<Integer> ERROR_TABLE_INSERT_PARALLELISM_VALUE = ConfigProperty
.key("hoodie.errortable.insert.shuffle.parallelism")
.defaultValue(200)
.withDocumentation("Config to set insert shuffle parallelism. The config is similar to "
+ "hoodie.insert.shuffle.parallelism config but applies to the error table.");

/** Fully-qualified class name of a custom error-table writer implementation (no default). */
public static final ConfigProperty<String> ERROR_TABLE_WRITE_CLASS = ConfigProperty
.key("hoodie.errortable.write.class")
.noDefaultValue()
.withDocumentation("Class which handles the error table writes. This config is used to configure "
+ "a custom implementation for Error Table Writer. Specify the full class name of the custom "
+ "error table writer as a value for this config");

/** When enabled, records whose schema mismatches the target schema are diverted to the error table. */
public static final ConfigProperty<Boolean> ERROR_ENABLE_VALIDATE_TARGET_SCHEMA = ConfigProperty
.key("hoodie.errortable.validate.targetschema.enable")
.defaultValue(false)
.withDocumentation("Records with schema mismatch with Target Schema are sent to Error Table.");

/** What to do with the base-table commit when the error-table write itself fails. */
public static final ConfigProperty<String> ERROR_TABLE_WRITE_FAILURE_STRATEGY = ConfigProperty
.key("hoodie.errortable.write.failure.strategy")
.defaultValue(ErrorWriteFailureStrategy.ROLLBACK_COMMIT.name())
.withDocumentation("The config specifies the failure strategy if error table write fails. "
+ "Use one of - " + Arrays.toString(ErrorWriteFailureStrategy.values()));

/** Failure strategies for error-table writes; referenced by ERROR_TABLE_WRITE_FAILURE_STRATEGY. */
public enum ErrorWriteFailureStrategy {
ROLLBACK_COMMIT("Rollback the corresponding base table write commit for which the error events were triggered"),
LOG_ERROR("Error is logged but the base table write succeeds");

// Human-readable explanation surfaced in the generated config documentation.
private final String description;

ErrorWriteFailureStrategy(String description) {
this.description = description;
}

public String getDescription() {
return description;
}

@Override
public String toString() {
// Trailing newline makes Arrays.toString(values()) render one strategy per line
// in the withDocumentation() string above.
return super.toString() + " (" + description + ")\n";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.SPARK_VERSION
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.SQLConfInjectingRDD
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -116,10 +116,79 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
}, SQLConf.get)
}

/**
 * Builds a version-specific row serializer/deserializer for the given Catalyst schema
 * by delegating to the Spark adapter.
 *
 * NOTE(review): this diff view also shows an identical `getCatalystRowSerDe` later in
 * the same object — confirm only one definition survives in the merged file, otherwise
 * the object will not compile.
 */
def getCatalystRowSerDe(structType: StructType) : SparkRowSerDe = {
sparkAdapter.createSparkRowSerDe(structType)
}

// Wraps an RDD so that the given SQLConf is installed on executors while its
// partitions are computed (see SQLConfInjectingRDD).
private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
new SQLConfInjectingRDD(rdd, conf)

/**
 * Converts a DataFrame into a pair of (well-formed Avro records, error rows as JSON),
 * optionally rewriting against the latest table schema.
 *
 * NOTE: whenever a latest table schema is supplied it is used regardless of the
 * `reconcileToLatestSchema` flag — e.g. Transformer implementations rely on the
 * latest schema even without turning the flag on explicitly.
 */
def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()):
Tuple2[RDD[GenericRecord], RDD[String]] = {
val latestTableSchemaConverted: Option[Schema] =
if (latestTableSchema.isPresent && reconcileToLatestSchema) {
Some(latestTableSchema.get())
} else if (latestTableSchema.isPresent) {
// Users may want the latest table schema without enabling reconcileToLatestSchema,
// for example when a Transformer implementation transforms source RDD to target RDD
Some(latestTableSchema.get())
} else {
None
}
safeCreateRDD(df, structName, recordNamespace, latestTableSchemaConverted)
}

/**
 * Converts a DataFrame into Avro records against an (optional) reader schema, splitting
 * out rows that cannot be rewritten to that schema.
 *
 * @param df              source DataFrame
 * @param structName      Avro record name used when deriving the writer schema
 * @param recordNamespace Avro namespace used when deriving the writer schema
 * @param readerAvroSchemaOpt target schema to rewrite records into; falls back to the
 *                            schema derived from `df` when absent
 * @return (records successfully rewritten to the reader schema,
 *          rows that failed rewriting, serialized as JSON strings)
 */
def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, readerAvroSchemaOpt: Option[Schema]):
Tuple2[RDD[GenericRecord], RDD[String]] = {
val writerSchema = df.schema
val writerAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName, recordNamespace)
val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
// We check whether passed in reader schema is identical to writer schema to avoid costly serde loop of
// making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
// (and back)
val sameSchema = writerAvroSchema.equals(readerAvroSchema)
val nullable = AvroSchemaUtils.resolveNullableSchema(writerAvroSchema) != writerAvroSchema

// NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
// serializer is not able to digest it
val writerAvroSchemaStr = writerAvroSchema.toString
val readerAvroSchemaStr = readerAvroSchema.toString
// NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to [[Row]] conversion

if (!sameSchema) {
// Each row becomes Left(record) on successful rewrite, Right(row) on failure.
val rdds: RDD[Either[GenericRecord, InternalRow]] = df.queryExecution.toRdd.mapPartitions { rows =>
if (rows.isEmpty) {
Iterator.empty
} else {
// Parse schemas once per partition; converters are not serializable across the driver boundary.
val writerAvroSchema = new Schema.Parser().parse(writerAvroSchemaStr)
val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr)
val convert = AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, writerAvroSchema, nullable = nullable)
val transform: InternalRow => Either[GenericRecord, InternalRow] = internalRow => try {
Left(HoodieAvroUtils.rewriteRecordDeep(convert(internalRow), readerAvroSchema, true))
// NOTE(review): catching Throwable also swallows fatal errors (OOM, linkage errors);
// consider narrowing to NonFatal — confirm intended semantics.
} catch {
case _: Throwable =>
Right(internalRow)
}
rows.map(transform)
}
}

val rowDeserializer = getCatalystRowSerDe(writerSchema)
val errorRDD = df.sparkSession.createDataFrame(
rdds.filter(_.isRight).map(_.right.get).map(ir => rowDeserializer.deserializeRow(ir)), writerSchema)

// going to follow up on improving performance of separating out events
// NOTE(review): `rdds` is traversed twice (once for errors, once for successes) without
// caching, so the source is recomputed per action — confirm this is acceptable.
(rdds.filter(_.isLeft).map(_.left.get), errorRDD.toJSON.rdd)
} else {
// Fast path: reader schema equals writer schema, so no rewrite (and no error split) is needed.
val rdd = df.queryExecution.toRdd.mapPartitions { rows =>
if (rows.isEmpty) {
Iterator.empty
} else {
val convert = AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, writerAvroSchema, nullable = nullable)
rows.map(convert)
}
}
(rdd, df.sparkSession.sparkContext.emptyRDD[String])
}
}

/**
 * Builds a version-specific row serializer/deserializer for the given Catalyst schema.
 *
 * NOTE(review): an identical `getCatalystRowSerDe` appears earlier in this diff view —
 * confirm only one definition remains in the merged file.
 */
def getCatalystRowSerDe(structType: StructType): SparkRowSerDe = {
sparkAdapter.createSparkRowSerDe(structType)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -787,7 +788,12 @@ public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord,
* @return newRecord for new Schema
*/
/**
 * Rewrites {@code oldRecord} into {@code newSchema}, applying column renames, without validation.
 *
 * @param oldRecord  record to rewrite
 * @param newSchema  schema to rewrite the record into
 * @param renameCols rename mapping, (colNameFromNewSchema -> colNameFromOldSchema)
 * @return the rewritten record conforming to {@code newSchema}
 */
public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map<String, String> renameCols) {
  return (GenericData.Record) rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>(), false);
}

/**
 * Rewrites {@code oldRecord} into {@code newSchema}, applying column renames and optionally
 * validating the rewritten record against the new schema.
 *
 * @param oldRecord  record to rewrite
 * @param newSchema  schema to rewrite the record into
 * @param renameCols rename mapping, (colNameFromNewSchema -> colNameFromOldSchema)
 * @param validate   whether to validate the rewritten record against {@code newSchema}
 * @return the rewritten record conforming to {@code newSchema}
 */
public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map<String, String> renameCols, boolean validate) {
  return (GenericData.Record) rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>(), validate);
}

Expand All @@ -805,12 +811,22 @@ public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord,
* @param fieldNames track the full name of visited field when we travel new schema.
* @return newRecord for new Schema
*/
private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvroSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {

/**
 * Recursive workhorse for record rewriting: resolves union types, delegates the per-type
 * rewrite, and optionally validates the result against the new schema.
 *
 * @param oldRecord     value to rewrite (may be null)
 * @param oldAvroSchema schema the value was written with (possibly a union)
 * @param newSchema     schema to rewrite the value into
 * @param renameCols    rename mapping, (colNameFromNewSchema -> colNameFromOldSchema)
 * @param fieldNames    stack tracking the full name of the field currently being visited
 * @param validate      whether to validate the rewritten value against {@code newSchema}
 * @return the rewritten value, or null when {@code oldRecord} is null
 * @throws SchemaCompatibilityException when validation is enabled and fails
 */
private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvroSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames, boolean validate) {
if (oldRecord == null) {
return null;
}
// try to get real schema for union type
Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldSchema, newSchema, renameCols, fieldNames, validate);
// NOTE(review): the message embeds the entire record, which may be large and may leak
// field values into logs — confirm this is acceptable.
if (validate && !ConvertingGenericData.INSTANCE.validate(newSchema, newRecord)) {
throw new SchemaCompatibilityException(
"Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema);
}
return newRecord;
}

private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames, boolean validate) {
switch (newSchema.getType()) {
case RECORD:
ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, "cannot rewrite record with different type");
Expand All @@ -823,15 +839,15 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr
fieldNames.push(fieldName);
if (oldSchema.getField(field.name()) != null && !renameCols.containsKey(field.name())) {
Schema.Field oldField = oldSchema.getField(field.name());
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, validate));
} else {
String fieldFullName = createFullName(fieldNames);
String fieldNameFromOldSchema = renameCols.getOrDefault(fieldFullName, "");
// deal with rename
if (oldSchema.getField(fieldNameFromOldSchema) != null) {
// find rename
Schema.Field oldField = oldSchema.getField(fieldNameFromOldSchema);
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, validate));
} else {
// deal with default value
if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
Expand All @@ -850,7 +866,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr
List<Object> newArray = new ArrayList();
fieldNames.push("element");
for (Object element : array) {
newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames));
newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames, validate));
}
fieldNames.pop();
return newArray;
Expand All @@ -860,12 +876,12 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr
Map<Object, Object> newMap = new HashMap<>();
fieldNames.push("value");
for (Map.Entry<Object, Object> entry : map.entrySet()) {
newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames));
newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames, validate));
}
fieldNames.pop();
return newMap;
case UNION:
return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames);
return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames, validate);
default:
return rewritePrimaryType(oldRecord, oldSchema, newSchema);
}
Expand Down Expand Up @@ -1083,10 +1099,43 @@ public static HoodieRecord createHoodieRecordFromAvro(
}
}

/**
 * Lazily rewrites an iterator of Avro records with a new schema.
 *
 * @param oldRecords records to be rewritten; a null iterator yields an empty result
 * @param newSchema  schema used to rewrite each record; null yields an empty result
 * @param renameCols rename mapping, (colNameFromNewSchema -> colNameFromOldSchema)
 * @param validate   whether each rewritten record is validated against {@code newSchema}
 * @return a lazy iterator of rewritten GenericRecords
 */
public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema, Map<String, String> renameCols, boolean validate) {
if (oldRecords == null || newSchema == null) {
return Collections.emptyIterator();
}
// Records are rewritten on demand as the caller advances the iterator.
return new Iterator<GenericRecord>() {
@Override
public boolean hasNext() {
return oldRecords.hasNext();
}

@Override
public GenericRecord next() {
return rewriteRecordWithNewSchema(oldRecords.next(), newSchema, renameCols, validate);
}
};
}

/**
 * Lazily rewrites an iterator of Avro records with a new schema, without validation.
 *
 * @param oldRecords records to be rewritten
 * @param newSchema  schema used to rewrite each record
 * @param renameCols rename mapping, (colNameFromNewSchema -> colNameFromOldSchema)
 * @return a lazy iterator of rewritten GenericRecords
 */
public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema, Map<String, String> renameCols) {
  // BUG FIX: previously forwarded Collections.EMPTY_MAP, silently discarding the
  // caller-supplied renameCols so renamed columns were never resolved.
  return rewriteRecordWithNewSchema(oldRecords, newSchema, renameCols, false);
}

/**
 * Deep-rewrites a record into {@code newSchema} with no column renames and no validation.
 *
 * @param oldRecord record to rewrite
 * @param newSchema schema to rewrite the record into
 * @return the rewritten record
 */
public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord, Schema newSchema) {
  return rewriteRecordWithNewSchema(oldRecord, newSchema, Collections.emptyMap());
}

/**
 * Deep-rewrites a record into {@code newSchema} with no column renames, optionally
 * validating the result against the new schema.
 *
 * @param oldRecord record to rewrite
 * @param newSchema schema to rewrite the record into
 * @param validate  whether to validate the rewritten record against {@code newSchema}
 * @return the rewritten record
 */
public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord, Schema newSchema, boolean validate) {
  return rewriteRecordWithNewSchema(oldRecord, newSchema, Collections.emptyMap(), validate);
}

/** Returns true when the Avro runtime version on the classpath is 1.9 or newer. */
public static boolean gteqAvro1_9() {
return VersionUtil.compareVersions(AVRO_VERSION, "1.9") >= 0;
}
Expand Down
Loading