Skip to content

Commit

Permalink
Merge branch '7.0.x' into 7.1.x
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Jul 12, 2022
2 parents 3d4e3a5 + 691683e commit 19ee9b9
Show file tree
Hide file tree
Showing 16 changed files with 1,088 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ protected SchemaMessageDeserializer<Object> createDeserializer(
}

@Override
protected void writeTo(byte[] data, PrintStream output) throws IOException {
Object object = deserializer.deserialize(data);
protected void writeTo(String topic, byte[] data, PrintStream output) throws IOException {
Object object = deserializer.deserialize(topic, data);
try {
AvroSchemaUtils.toJson(object, output);
} catch (AvroRuntimeException e) {
Expand Down Expand Up @@ -129,8 +129,8 @@ public Object deserializeKey(String topic, byte[] payload) {
}

@Override
public Object deserialize(byte[] payload) throws SerializationException {
return super.deserialize(payload);
public Object deserialize(String topic, byte[] payload) throws SerializationException {
return super.deserialize(topic, isKey, payload, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ protected SchemaMessageDeserializer<JsonNode> createDeserializer(


@Override
protected void writeTo(byte[] data, PrintStream output) throws IOException {
JsonNode object = deserializer.deserialize(data);
protected void writeTo(String topic, byte[] data, PrintStream output) throws IOException {
JsonNode object = deserializer.deserialize(topic, data);
output.print(objectMapper.writeValueAsString(object));
}

Expand Down Expand Up @@ -126,8 +126,8 @@ public Object deserializeKey(String topic, byte[] payload) {
}

@Override
public JsonNode deserialize(byte[] payload) throws SerializationException {
return super.deserialize(payload);
public JsonNode deserialize(String topic, byte[] payload) throws SerializationException {
return (JsonNode) super.deserialize(false, topic, isKey, payload);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,6 @@ public ProtobufSchema fromConnectSchema(Schema schema) {
String[] split = splitName(fullName);
String namespace = split[0];
String name = split[1];
ctx.add(fullName);
Descriptor descriptor = descriptorFromConnectSchema(ctx, namespace, name, schema);
ProtobufSchema resultSchema = new ProtobufSchema(descriptor);
fromConnectSchemaCache.put(schema, resultSchema);
Expand Down Expand Up @@ -793,8 +792,8 @@ private FieldDefinition fieldDefinitionFromConnectSchema(
oneofDefinitionFromConnectSchema(ctx, schema, message, fieldSchema, unionName);
return null;
} else {
if (!ctx.contains(fieldSchemaName)) {
ctx.add(fieldSchemaName);
if (!ctx.contains(message.getName(), type)) {
ctx.add(message.getName(), type);
message.addMessageDefinition(messageDefinitionFromConnectSchema(
ctx,
schema,
Expand Down Expand Up @@ -1842,20 +1841,25 @@ public void put(String messageName, SchemaBuilder builder) {
* Class that holds the context for performing {@code fromConnectSchema}
*/
private static class FromConnectContext {
private final Set<String> structNames;
private final Map<String, Set<String>> messageNames;
private int defaultSchemaNameIndex = 0;

public FromConnectContext() {
this.structNames = new HashSet<>();
this.messageNames = new HashMap<>();
}

public boolean contains(String structName) {
return structName != null ? structNames.contains(structName) : false;
public boolean contains(String parent, String child) {
Set<String> children = messageNames.get(parent);
if (child == null || children == null) {
return false;
}
return children.contains(child);
}

public void add(String structName) {
if (structName != null) {
structNames.add(structName);
public void add(String parent, String child) {
if (child != null) {
Set<String> children = messageNames.computeIfAbsent(parent, k -> new HashSet<>());
children.add(child);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.connect.protobuf.test.KeyValueOptional.KeyValueOptionalMessage;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.serializers.protobuf.test.KeyTimestampValueOuterClass.KeyTimestampValue;
import io.confluent.kafka.serializers.protobuf.test.TestMessageProtos.TestMessage2;
import io.confluent.kafka.serializers.protobuf.test.TimestampValueOuterClass.TimestampValue;
import java.util.List;
Expand Down Expand Up @@ -83,6 +84,11 @@ public class ProtobufConverterTest {
private static final TimestampValue TIMESTAMP_VALUE = TimestampValue.newBuilder()
.setValue(Timestamp.newBuilder().setSeconds(1000).build())
.build();
private static final KeyTimestampValue KEY_TIMESTAMP_VALUE = KeyTimestampValue.newBuilder()
.setKey(123)
.setValue(TIMESTAMP_VALUE)
.setValue2(Timestamp.newBuilder().setSeconds(2000).build())
.build();
private static final KeyValueOptionalMessage KEY_VALUE_OPT = KeyValueOptionalMessage.newBuilder()
.setKey(123)
.setValue("")
Expand Down Expand Up @@ -375,6 +381,67 @@ public void testFromConnectDataWithReferenceUsingLatest() throws Exception {
assertArrayEquals(expected, Arrays.copyOfRange(result, PROTOBUF_BYTES_START, result.length));
}

@Test
// Verifies that the converter serializes a Connect Struct whose schema resolves, via
// use.latest.version, to a registered protobuf schema that has NESTED references
// (KeyTimestampValue -> TimestampValue -> google.protobuf.Timestamp).
public void testFromConnectDataWithNestedReferenceUsingLatest() throws Exception {
// Expected payload: the raw protobuf bytes of the fixture message; the converter's
// output is compared against this after stripping the magic-byte/schema-id header.
final byte[] expected = KEY_TIMESTAMP_VALUE.toByteArray();

// Configure the converter to look up the latest registered schema instead of
// auto-registering one derived from the Connect schema.
Map<String, Object> config = new HashMap<>();
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost");
config.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, "true");
config.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, "false");
config.put(AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT, "false");
converter.configure(config, false);
// Register the reference chain bottom-up: Timestamp, then TimestampValue (which
// references Timestamp), then the topic's value schema (which references both).
schemaRegistry.register("google/protobuf/timestamp.proto", new ProtobufSchema(Timestamp.getDescriptor()));
SchemaReference ref1 = new SchemaReference("google/protobuf/timestamp.proto", "google/protobuf/timestamp.proto", 1);
schemaRegistry.register("TimestampValue.proto", new ProtobufSchema(TimestampValue.getDescriptor(), ImmutableList.of(ref1)));
SchemaReference ref2 = new SchemaReference("TimestampValue.proto", "TimestampValue.proto", 1);
schemaRegistry.register("my-topic-value", new ProtobufSchema(KeyTimestampValue.getDescriptor(), ImmutableList.of(ref1, ref2)));

// Build the Connect schema/struct mirroring TimestampValue: a single optional
// "value" field tagged with the protobuf field number via PROTOBUF_TYPE_TAG.
String tsFullName = "io.confluent.kafka.serializers.protobuf.test.TimestampValue";
Schema timestampSchema =
getTimestampBuilder().optional().parameter(PROTOBUF_TYPE_TAG, String.valueOf(1)).build();
SchemaBuilder tsBuilder = SchemaBuilder.struct();
tsBuilder.name(tsFullName);
tsBuilder.field(
"value",
timestampSchema
);
tsBuilder.parameter(PROTOBUF_TYPE_TAG, String.valueOf(2));
Schema tsSchema = tsBuilder.version(1).build();
Struct tsStruct = new Struct(tsSchema);
tsStruct.put("value", getTimestampStruct(timestampSchema, 1000L, 0));

// Build the outer KeyTimestampValue schema/struct: key (int32, tag 1),
// value (nested TimestampValue struct, tag 2), value2 (Timestamp, tag 3).
Schema timestampSchema2 =
getTimestampBuilder().optional().parameter(PROTOBUF_TYPE_TAG, String.valueOf(3)).build();
String keyTsFullName = "io.confluent.kafka.serializers.protobuf.test.KeyTimestampValue";
SchemaBuilder keyTsBuilder = SchemaBuilder.struct();
keyTsBuilder.name(keyTsFullName);
keyTsBuilder.field(
"key",
SchemaBuilder.int32().parameter(PROTOBUF_TYPE_TAG, String.valueOf(1)).build()
);
keyTsBuilder.field(
"value",
tsSchema
);
keyTsBuilder.field(
"value2",
timestampSchema2
);
Schema keyTsSchema = keyTsBuilder.version(1).build();
Struct keyTsStruct = new Struct(keyTsSchema);
keyTsStruct.put("key", 123);
keyTsStruct.put("value", tsStruct);
keyTsStruct.put("value2", getTimestampStruct(timestampSchema2, 2000L, 0));

byte[] result = converter.fromConnectData("my-topic",
keyTsSchema,
keyTsStruct
);

// Skip the serializer's framing header (magic byte + schema id, length
// PROTOBUF_BYTES_START) and compare only the protobuf payload bytes.
assertArrayEquals(expected, Arrays.copyOfRange(result, PROTOBUF_BYTES_START, result.length));
}

@Test
public void testFromConnectDataWithOptionalForNullablesUsingLatest() throws Exception {
final byte[] expected = KEY_VALUE_OPT.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2040,7 +2040,6 @@ public void testFromConnectRecursiveSchema() {
assertEquals(expectedKey.getNumber(), actualKey.getNumber());
assertEquals(expectedValue.getType(), actualValue.getType());
assertEquals(expectedValue.getNumber(), actualValue.getNumber());
assertEquals(actual, actualKeyValue.getMessageType()); // Checks recursive reference
assertEquals(expectedKeyValue.getNumber(), actualKeyValue.getNumber());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ static Meta toMeta(String doc, Map<String, String> params) {
public static class Builder {
// --- public ---

/**
 * Returns the name currently set on the underlying FileDescriptorProto builder,
 * i.e. the name of the schema file being built.
 */
public String getName() {
return mFileDescProtoBuilder.getName();
}

/**
* Builds a dynamic schema
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ private EnumDefinition(EnumDescriptorProto enumType) {
public static class Builder {
// --- public ---

/**
 * Returns the name currently set on the underlying EnumDescriptorProto builder,
 * i.e. the name of the enum type being built.
 */
public String getName() {
return mEnumTypeBuilder.getName();
}

public Builder addValue(String name, int num) {
return addValue(name, num, null, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ private MessageDefinition(DescriptorProto msgType) {
public static class Builder {
// --- public ---

/**
 * Returns the name currently set on the underlying DescriptorProto builder,
 * i.e. the name of the message type being built.
 */
public String getName() {
return mMsgTypeBuilder.getName();
}

public Builder addField(
String label,
String type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ public void init(Properties props) {
}

@Override
protected void writeTo(byte[] data, PrintStream output) throws IOException {
Message object = deserializer.deserialize(data);
protected void writeTo(String topic, byte[] data, PrintStream output) throws IOException {
Message object = deserializer.deserialize(topic, data);
try {
JsonFormat.Printer printer = JsonFormat.printer()
.includingDefaultValueFields()
Expand Down Expand Up @@ -144,8 +144,8 @@ public Object deserializeKey(String topic, byte[] payload) {
}

@Override
public Message deserialize(byte[] payload) throws SerializationException {
return super.deserialize(payload);
public Message deserialize(String topic, byte[] payload) throws SerializationException {
return (Message) super.deserialize(false, topic, isKey, payload);
}
}
}

0 comments on commit 19ee9b9

Please sign in to comment.