Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KeyValueSchema to satisfy org.apache.kafka.connect.connector.ConnectRecord #2885

Merged
merged 2 commits into from
Oct 31, 2018

Conversation

jiazhai
Copy link
Member

@jiazhai jiazhai commented Oct 29, 2018

Kafka org.apache.kafka.connect.connector.ConnectRecord has both key and value schema.
Currently we need a KeyValueSchema to achieve it and support debezium.
There will be another PR to do the convert between KeyValueSchema and org.apache.kafka.connect.data.Schema.

@jiazhai jiazhai self-assigned this Oct 29, 2018
@jiazhai jiazhai requested a review from sijie October 29, 2018 15:40
@sijie sijie added type/feature The PR added a new feature or issue requested a new feature component/schemaregistry area/connector labels Oct 29, 2018
@sijie sijie added this to the 2.3.0 milestone Oct 29, 2018
/**
* Key Value Schema whose underneath schemas are AvroSchema.
*/
static Schema<?> KeyValue(Class key, Class value) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually I think it might be better to do following:

  • be able to construct keyvalue schema with SchemaType
  • make the default as JSON, since we are using JSON as default schema for other places.
static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V> value, SchemaType type) {
   checkArgument(SchemaType.JSON == type || SchemaType.AVRO == type);
   ...
} 

static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V> value) {
   return KeyValue(key, value, SchemaType.JSON);
   ...
}

return new KeyValueSchema<>(AvroSchema.of(key), AvroSchema.of(value));
}

static Schema<?> KeyValue(Schema key, Schema value) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V> value);

@Slf4j
public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
@Getter
private Schema<K> keySchema;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final?

@Getter
private Schema<K> keySchema;
@Getter
private Schema<V> valueSchema;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finall?

return value;
}

public void setKey(K key) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to setKey or setValue? If not, I would start with making key and value immutable.

@jiazhai
Copy link
Member Author

jiazhai commented Oct 30, 2018

rerun java8 tests
for
org.apache.pulsar.functions.worker.PulsarWorkerAssignmentTest.testFunctionAssignmentsWithRestart


// schemaInfo combined by KeySchemaInfo and ValueSchemaInfo:
// [keyInfo.length][keyInfo][valueInfo.length][ValueInfo]
private SchemaInfo schemaInfo;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final?

@sijie
Copy link
Member

sijie commented Oct 30, 2018

run integration tests

@sijie sijie merged commit 2591ccd into apache:master Oct 31, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connector type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants