Skip to content

Commit 9bc13c2

Browse files
author
volodymyr.burenin
committed
Apparently reflection utils can't create a class if one of the parameters is null, forced to add another constructor.
1 parent c706d1c commit 9bc13c2

File tree

3 files changed

+14
-11
lines changed

3 files changed

+14
-11
lines changed

hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@
2222
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
2323
import org.apache.avro.Schema;
2424
import org.apache.hudi.common.config.TypedProperties;
25+
import org.apache.hudi.common.util.ReflectionUtils;
2526
import org.apache.hudi.exception.HoodieException;
26-
import org.apache.hudi.utilities.UtilHelpers;
2727
import org.apache.hudi.utilities.schema.SchemaProvider;
2828
import org.apache.kafka.common.errors.SerializationException;
2929

30-
import java.io.IOException;
3130
import java.util.Map;
3231
import java.util.Map.Entry;
3332
import java.util.Objects;
@@ -47,10 +46,10 @@ public void configure(Map<String, ?> configs, boolean isKey) {
4746
super.configure(configs, isKey);
4847
try {
4948
TypedProperties props = getConvertToTypedProperties(configs);
50-
SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(
51-
props.getString(SCHEMA_PROVIDER_CLASS_PROP), props, null);
49+
String className = props.getString(SCHEMA_PROVIDER_CLASS_PROP);
50+
SchemaProvider schemaProvider = (SchemaProvider) ReflectionUtils.loadClass(className, props);
5251
sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
53-
} catch (IOException e) {
52+
} catch (Throwable e) {
5453
throw new HoodieException(e);
5554
}
5655
}

hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,8 @@ public static class Config {
5656
private final String targetRegistryUrl;
5757
private final boolean noTargetSchema;
5858

59-
public String fetchSchemaFromRegistry(String registryUrl) throws IOException {
60-
URL registry = new URL(registryUrl);
61-
ObjectMapper mapper = new ObjectMapper();
62-
JsonNode node = mapper.readTree(registry.openStream());
63-
return node.get("schema").asText();
59+
public SchemaRegistryProvider(TypedProperties props) {
60+
this(props, null);
6461
}
6562

6663
public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
@@ -73,6 +70,13 @@ public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
7370
this.noTargetSchema = targetRegistryUrl.equals("null");
7471
}
7572

73+
public String fetchSchemaFromRegistry(String registryUrl) throws IOException {
74+
URL registry = new URL(registryUrl);
75+
ObjectMapper mapper = new ObjectMapper();
76+
JsonNode node = mapper.readTree(registry.openStream());
77+
return node.get("schema").asText();
78+
}
79+
7680
private Schema getSchema(String registryUrl) throws IOException {
7781
Schema schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
7882
if (injectKafkaFieldSchema) {

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
*/
4545
public class AvroKafkaSource extends AvroSource {
4646

47-
private static final String KAFKA_AVRO_VALUE_DESERIALIZER = "hoodie.deltastreamer.source.value.deserializer";
47+
private static final String KAFKA_AVRO_VALUE_DESERIALIZER = "hoodie.deltastreamer.source.value.deserializer.class";
4848
private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class);
4949
private final KafkaOffsetGen offsetGen;
5050
private final HoodieDeltaStreamerMetrics metrics;

0 commit comments

Comments
 (0)