Skip to content

Commit

Permalink
[fix][io] KCA sink: handle null values with KeyValue<Avro,Avro> schema (
Browse files Browse the repository at this point in the history
apache#19861)

Co-authored-by: Andrey Yegorov <andrey.yegorov@datastax.com>
  • Loading branch information
nicoloboschi and dlg99 authored Mar 21, 2023
1 parent ac574d5 commit 03f8b80
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {

if (nativeObject instanceof KeyValue) {
KeyValue kv = (KeyValue) nativeObject;
key = KafkaConnectData.getKafkaConnectData(kv.getKey(), keySchema);
value = KafkaConnectData.getKafkaConnectData(kv.getValue(), valueSchema);
key = KafkaConnectData.getKafkaConnectDataFromSchema(kv.getKey(), keySchema);
value = KafkaConnectData.getKafkaConnectDataFromSchema(kv.getValue(), valueSchema);
} else if (nativeObject != null) {
throw new IllegalStateException("Cannot extract KeyValue data from " + nativeObject.getClass());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ private static List<Object> arrayToList(Object nativeObject, Schema kafkaValueSc
return out;
}


public static Object getKafkaConnectDataFromSchema(Object nativeObject, Schema kafkaSchema) {
if (kafkaSchema != null && nativeObject == null) {
return null;
}
return getKafkaConnectData(nativeObject, kafkaSchema);
}

@SuppressWarnings("unchecked")
public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
if (kafkaSchema == null) {
Expand Down Expand Up @@ -380,6 +388,7 @@ private static Object defaultOrThrow(Schema kafkaSchema) {
if (kafkaSchema.isOptional()) {
return null;
}

throw new DataException("Invalid null value for required " + kafkaSchema.type() + " field");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.confluent.connect.avro.AvroData;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Time;
Expand All @@ -41,6 +44,76 @@

@Slf4j
public class PulsarSchemaToKafkaSchema {

private static class OptionalForcingSchema implements Schema {

Schema sourceSchema;

public OptionalForcingSchema(Schema sourceSchema) {
this.sourceSchema = sourceSchema;
}

@Override
public Type type() {
return sourceSchema.type();
}

@Override
public boolean isOptional() {
return true;
}

@Override
public Object defaultValue() {
return sourceSchema.defaultValue();
}

@Override
public String name() {
return sourceSchema.name();
}

@Override
public Integer version() {
return sourceSchema.version();
}

@Override
public String doc() {
return sourceSchema.doc();
}

@Override
public Map<String, String> parameters() {
return sourceSchema.parameters();
}

@Override
public Schema keySchema() {
return sourceSchema.keySchema();
}

@Override
public Schema valueSchema() {
return sourceSchema.valueSchema();
}

@Override
public List<Field> fields() {
return sourceSchema.fields();
}

@Override
public Field field(String s) {
return sourceSchema.field(s);
}

@Override
public Schema schema() {
return sourceSchema.schema();
}
}

private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToKafkaSchema;
private static final ImmutableSet<String> kafkaLogicalSchemas;
private static final AvroData avroData = new AvroData(1000);
Expand Down Expand Up @@ -80,6 +153,11 @@ private static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
return parser.parse(schemaJson);
}

public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
Schema s = getKafkaConnectSchema(pulsarSchema);
return new OptionalForcingSchema(s);
}

public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) {
throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
Expand Down Expand Up @@ -122,7 +200,7 @@ public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema p
if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
getKafkaConnectSchema(kvSchema.getValueSchema()))
getOptionalKafkaConnectSchema(kvSchema.getValueSchema()))
.build();
}
org.apache.avro.Schema avroSchema = parseAvroSchema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
Expand All @@ -60,6 +63,7 @@
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -734,6 +738,56 @@ public void schemaKeyValueSchemaTest() throws Exception {
Assert.assertEquals(key, 11);
}

@Test
public void schemaKeyValueSchemaNullValueTest() throws Exception {
RecordSchemaBuilder builder = SchemaBuilder
.record("test");
builder.field("test").type(SchemaType.STRING);
GenericSchema<GenericRecord> schema = GenericAvroSchema.of(builder.build(SchemaType.AVRO));
KeyValue<Integer, String> kv = new KeyValue<>(11, null);
SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, schema), 11,
"INT32", null, "STRUCT");
Assert.assertNull(sinkRecord.value());
int key = (int) sinkRecord.key();
Assert.assertEquals(key, 11);
}

@Test
public void schemaKeyValueSchemaNullValueNoUnwrapTest() throws Exception {
props.put("unwrapKeyValueIfAvailable", "false");
JSONSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> jsonSchema = JSONSchema
.of(SchemaDefinition.<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations>builder()
.withPojo(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class)
.withAlwaysAllowNull(true)
.build());
KeyValue<Integer, String> kv = new KeyValue<>(11, null);
Map<String, Object> expected = new HashMap();
expected.put("11", null);
SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, jsonSchema), "key",
"STRING", expected, "MAP");
Assert.assertNull(((Map)sinkRecord.value()).get(11));
String key =(String)sinkRecord.key();
Assert.assertEquals(key, "key");
}

@Test
public void schemaKeyValueSchemaNullValueNoUnwrapTestAvro() throws Exception {
props.put("unwrapKeyValueIfAvailable", "false");
RecordSchemaBuilder builder = SchemaBuilder
.record("test");
builder.property("op", "test");
builder.field("test").type(SchemaType.STRING);
GenericSchema<GenericRecord> schema = GenericAvroSchema.of(builder.build(SchemaType.AVRO));
KeyValue<Integer, String> kv = new KeyValue<>(11, null);
Map<String, Object> expected = new HashMap();
expected.put("11", null);
SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, schema), "key",
"STRING", expected, "MAP");
Assert.assertNull(((Map)sinkRecord.value()).get(11));
String key =(String)sinkRecord.key();
Assert.assertEquals(key, "key");
}

@Test
public void kafkaLogicalTypesTimestampTest() {
Schema schema = new TestSchema(SchemaInfoImpl.builder()
Expand Down

0 comments on commit 03f8b80

Please sign in to comment.