[HUDI-4418] Add support for ProtoKafkaSource #6135

Merged

Changes from all commits (24 commits)
f63696c  Add support for ProtoKafkaSource (the-other-tim-brown, Jul 18, 2022)
ec6177e  update comments and names (the-other-tim-brown, Jul 18, 2022)
d4d4d72  try bumping avro in profiles (the-other-tim-brown, Jul 19, 2022)
376eedd  try bumping parquet-avro version (the-other-tim-brown, Jul 20, 2022)
03f2c7a  don't rely directly on newer avro APIs (the-other-tim-brown, Jul 21, 2022)
51bbad4  simplify code (the-other-tim-brown, Jul 21, 2022)
d36fed6  remove unused dependency (the-other-tim-brown, Jul 21, 2022)
ffc9419  remove duplicate dependency (the-other-tim-brown, Jul 21, 2022)
10eb307  add in KafkaSource as abstract class to better share code (the-other-tim-brown, Aug 19, 2022)
a9adc8d  fix compile issues (the-other-tim-brown, Aug 19, 2022)
5475f67  DRY up testing, prevent TestJsonKafkaSourcePostProcessor from running… (the-other-tim-brown, Aug 19, 2022)
b9c9118  fix style issues (the-other-tim-brown, Aug 19, 2022)
14115a6  update handling of unsigned ints (the-other-tim-brown, Aug 22, 2022)
1879403  Merge remote-tracking branch 'upstream/master' into HUDI-4418-proto-k… (the-other-tim-brown, Aug 23, 2022)
f70abbc  remove guava transitive dependency from proto-java-util (the-other-tim-brown, Aug 23, 2022)
1943960  fix handling of unsigned integer fields in conversion to avro (the-other-tim-brown, Aug 24, 2022)
9a4df85  Merge remote-tracking branch 'upstream/master' into HUDI-4418-proto-k… (the-other-tim-brown, Aug 25, 2022)
794ed48  Merge remote-tracking branch 'upstream/master' into HUDI-4418-proto-k… (the-other-tim-brown, Aug 28, 2022)
1f268aa  move configs into the parent pom, use singleton list, remove unused l… (the-other-tim-brown, Aug 29, 2022)
e6e1a68  move shared constants into the KafkaSource, address issue with untype… (the-other-tim-brown, Aug 29, 2022)
75bd5b0  add comments to describe caches (the-other-tim-brown, Aug 29, 2022)
95a7f4a  add casting back in (the-other-tim-brown, Aug 29, 2022)
fc344fa  fix execution configuration for protoc plugin (the-other-tim-brown, Aug 29, 2022)
55997e7  simplify namespace to use proto package (the-other-tim-brown, Aug 30, 2022)
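Note on commits 14115a6 and 1943960 (unsigned ints): Avro has no unsigned integer types, so a proto-to-Avro converter has to widen them in some way. The sketch below only illustrates the general technique; it is not taken from the PR, and the mapping ProtoConversionUtil actually settled on may differ.

// Illustrative only: one common way to widen protobuf unsigned integers for Avro,
// which has no unsigned types. Not necessarily the mapping used in this PR.
final class UnsignedWideningSketch {
  // uint32 / fixed32 surface in Java as int; reinterpret the bits as unsigned and widen to long.
  static long widenUint32(int protoValue) {
    return Integer.toUnsignedLong(protoValue);
  }

  // uint64 / fixed64 surface in Java as long; the unsigned decimal string keeps values above
  // Long.MAX_VALUE from being read back as negative numbers.
  static String widenUint64(long protoValue) {
    return Long.toUnsignedString(protoValue);
  }
}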
15 changes: 0 additions & 15 deletions hudi-kafka-connect/pom.xml
@@ -66,21 +66,6 @@
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<protocVersion>${protoc.version}</protocVersion>
<inputDirectories>
<include>src/main/resources</include>
</inputDirectories>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

21 changes: 21 additions & 0 deletions hudi-utilities/pom.xml
@@ -60,6 +60,10 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
</plugin>
</plugins>

<resources>
@@ -73,6 +77,23 @@
</build>

<dependencies>
<!-- Protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- H2 database for JdbcbaseSchemaProvider -->
<dependency>
<groupId>com.h2database</groupId>
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -25,13 +25,11 @@
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;

import com.google.protobuf.Message;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
@@ -62,15 +60,15 @@ public SourceFormatAdapter(Source source) {
public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case AVRO:
return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
return ((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> rdd.map(convertor::fromJson)).orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case ROW: {
InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
return new InputBatch<>(Option.ofNullable(r.getBatch().map(
rdd -> {
SchemaProvider originalProvider = UtilHelpers.getOriginalSchemaProvider(r.getSchemaProvider());
@@ -85,6 +83,12 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
})
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case PROTO: {
InputBatch<JavaRDD<Message>> r = ((Source<JavaRDD<Message>>) source).fetchNext(lastCkptStr, sourceLimit);
AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> rdd.map(convertor::fromProtoMessage)).orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
default:
throw new IllegalArgumentException("Unknown source type (" + source.getSourceType() + ")");
}
@@ -96,9 +100,9 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case ROW:
return ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
return ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
case AVRO: {
InputBatch<JavaRDD<GenericRecord>> r = ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
InputBatch<JavaRDD<GenericRecord>> r = ((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
return new InputBatch<>(
Option
@@ -111,14 +115,29 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
return new InputBatch<>(
Option.ofNullable(
r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case PROTO: {
InputBatch<JavaRDD<Message>> r = ((Source<JavaRDD<Message>>) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
return new InputBatch<>(
Option
.ofNullable(
r.getBatch()
.map(rdd -> rdd.map(convertor::fromProtoMessage))
.map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
source.getSparkSession())
)
.orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
default:
throw new IllegalArgumentException("Unknown source type (" + source.getSourceType() + ")");
}
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.hudi.utilities.schema;

import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;

import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Collections;

/**
* A schema provider that takes in a class name for a generated protobuf class that is on the classpath.
*/
public class ProtoClassBasedSchemaProvider extends SchemaProvider {
/**
* Configs supported.
*/
public static class Config {
public static final String PROTO_SCHEMA_CLASS_NAME = "hoodie.deltastreamer.schemaprovider.proto.className";
public static final String PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = "hoodie.deltastreamer.schemaprovider.proto.flattenWrappers";
}

private final String schemaString;

/**
* To be lazily inited on executors.
*/
private transient Schema schema;

public ProtoClassBasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(
Config.PROTO_SCHEMA_CLASS_NAME));
String className = config.getString(Config.PROTO_SCHEMA_CLASS_NAME);
boolean flattenWrappedPrimitives = props.getBoolean(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES, false);
try {
schemaString = ProtoConversionUtil.getAvroSchemaForMessageClass(ReflectionUtils.getClass(className), flattenWrappedPrimitives).toString();
} catch (Exception e) {
throw new HoodieException(String.format("Error reading proto source schema for class: %s", className), e);
}
}

@Override
public Schema getSourceSchema() {
if (schema == null) {
Schema.Parser parser = new Schema.Parser();
schema = parser.parse(schemaString);
}
return schema;
}

@Override
public Schema getTargetSchema() {
return getSourceSchema();
}
}
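Note: a minimal usage sketch of the provider above. The generated class name test.ExampleProto is a placeholder, the Spark context is stubbed out, and in a real DeltaStreamer run these keys would normally be supplied through the --props file rather than built in code.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;

import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;

public class ProtoSchemaProviderExample {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Fully qualified name of a generated protobuf class that must be on the classpath (placeholder here).
    props.setProperty("hoodie.deltastreamer.schemaprovider.proto.className", "test.ExampleProto");
    // Optional: flatten protobuf wrapper types into plain nullable fields (defaults to false).
    props.setProperty("hoodie.deltastreamer.schemaprovider.proto.flattenWrappers", "true");

    JavaSparkContext jssc = null; // would be the active Spark context in a real run
    ProtoClassBasedSchemaProvider provider = new ProtoClassBasedSchemaProvider(props, jssc);
    Schema avroSchema = provider.getSourceSchema();
    System.out.println(avroSchema.toString(true));
  }
}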
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -20,16 +20,13 @@

import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -46,24 +43,17 @@
/**
* Reads avro serialized Kafka data, based on the confluent schema-registry.
*/
public class AvroKafkaSource extends AvroSource {
public class AvroKafkaSource extends KafkaSource<GenericRecord> {

private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class);
// these are native kafka's config. do not change the config names.
private static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
private static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
// These are settings used to pass things to KafkaAvroDeserializer
public static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";
public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX + "schema";

private final KafkaOffsetGen offsetGen;
private final HoodieDeltaStreamerMetrics metrics;
private final SchemaProvider schemaProvider;
private final String deserializerClassName;

public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
super(props, sparkContext, sparkSession, schemaProvider);
super(props, sparkContext, sparkSession, schemaProvider, SourceType.AVRO, metrics);

props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
@@ -82,29 +72,11 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
LOG.error(error);
throw new HoodieException(error, e);
}

this.schemaProvider = schemaProvider;
this.metrics = metrics;
offsetGen = new KafkaOffsetGen(props);
this.offsetGen = new KafkaOffsetGen(props);
}

@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
try {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
}
JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
}
}

private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
if (schemaProvider == null) {
throw new HoodieException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
@@ -117,11 +89,4 @@ private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
}
}

@Override
public void onCommit(String lastCkptStr) {
if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
offsetGen.commitOffsetToKafka(lastCkptStr);
}
}
}
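Note: ProtoKafkaSource itself is not among the files shown in this excerpt. Following the toRDD pattern above, it presumably deserializes the raw Kafka value bytes with the generated message's parser; the sketch below is written under that assumption (names and wiring are illustrative, not the committed code).

import com.google.protobuf.Message;
import com.google.protobuf.Parser;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

import java.util.Map;

// Illustrative sketch only: turning raw byte[] Kafka values into protobuf Messages, mirroring
// AvroKafkaSource.toRDD above. The committed ProtoKafkaSource may obtain its parser differently
// (for example via reflection on the configured class name).
final class ProtoKafkaSketch {
  static JavaRDD<Message> protoRDD(JavaSparkContext sparkContext, Map<String, Object> kafkaParams,
                                   OffsetRange[] offsetRanges, Parser<? extends Message> parser) {
    return KafkaUtils.<String, byte[]>createRDD(sparkContext, kafkaParams, offsetRanges,
            LocationStrategies.PreferConsistent())
        .map(record -> (Message) parser.parseFrom(record.value()));
  }
}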
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
Expand Up @@ -19,20 +19,15 @@
package org.apache.hudi.utilities.sources;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -45,40 +40,18 @@
/**
* Read json kafka data.
*/
public class JsonKafkaSource extends JsonSource {

private static final Logger LOG = LogManager.getLogger(JsonKafkaSource.class);

private final KafkaOffsetGen offsetGen;

private final HoodieDeltaStreamerMetrics metrics;
public class JsonKafkaSource extends KafkaSource<String> {

public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
super(properties, sparkContext, sparkSession, schemaProvider);
this.metrics = metrics;
super(properties, sparkContext, sparkSession, schemaProvider, SourceType.JSON, metrics);
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
offsetGen = new KafkaOffsetGen(properties);
this.offsetGen = new KafkaOffsetGen(props);
}

@Override
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
try {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
}
JavaRDD<String> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
}
}

private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
JavaRDD<String> jsonStringRDD = KafkaUtils.createRDD(sparkContext,
offsetGen.getKafkaParams(),
offsetRanges,
@@ -104,12 +77,4 @@ private JavaRDD<String> postProcess(JavaRDD<String> jsonStringRDD) {

return processor.process(jsonStringRDD);
}

@Override
public void onCommit(String lastCkptStr) {
if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(),
KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
offsetGen.commitOffsetToKafka(lastCkptStr);
}
}
}
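Note: the shared KafkaSource base class introduced by commit 10eb307 is not included in this excerpt, but its shape can be read off from the code deleted in AvroKafkaSource and JsonKafkaSource above. The following is a reconstruction under that assumption; field and method names may not match the committed KafkaSource.java exactly.

package org.apache.hudi.utilities.sources;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.OffsetRange;

/**
 * Reconstructed sketch of the shared Kafka source base class; not the committed file.
 */
public abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
  private static final Logger LOG = LogManager.getLogger(KafkaSource.class);

  protected final HoodieDeltaStreamerMetrics metrics;
  protected final SchemaProvider schemaProvider;
  protected KafkaOffsetGen offsetGen;

  protected KafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                        SchemaProvider schemaProvider, SourceType sourceType, HoodieDeltaStreamerMetrics metrics) {
    super(props, sparkContext, sparkSession, schemaProvider, sourceType);
    this.schemaProvider = schemaProvider;
    this.metrics = metrics;
  }

  @Override
  protected InputBatch<JavaRDD<T>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
    try {
      OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
      long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
      LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
      if (totalNewMsgs <= 0) {
        return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
      }
      return new InputBatch<>(Option.of(toRDD(offsetRanges)), CheckpointUtils.offsetsToStr(offsetRanges));
    } catch (org.apache.kafka.common.errors.TimeoutException e) {
      throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
    }
  }

  // Each concrete source (Avro/Json/Proto) only has to say how to turn offset ranges into records.
  abstract JavaRDD<T> toRDD(OffsetRange[] offsetRanges);

  @Override
  public void onCommit(String lastCkptStr) {
    if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(),
        KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
      offsetGen.commitOffsetToKafka(lastCkptStr);
    }
  }
}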