Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support Protobuf in ksqlDB #4469

Merged
merged 2 commits into from
Feb 11, 2020
Merged

feat: support Protobuf in ksqlDB #4469

merged 2 commits into from
Feb 11, 2020

Conversation

agavra
Copy link
Contributor

@agavra agavra commented Feb 6, 2020

fixes #278

Description

This PR introduces the PROTOBUF value format to ksqlDB:

  • added ProtobufFormat class and ProtobufSerdeFactory to handle most of the heavy lifting
  • conversion between ParsedSchema and org.apache.kafka.connect.Schema now happens in the Format interface (cf. SchemaRegistryTopicSchemaSupplier)
  • the testing framework no longer depends on AvroSchema in its interface (instead it will convert it to a ParsedSchema) (cf. TestExecutorUtil, Topic, SerdeUtil and TopicNode)
  • Added a new RecordFormatter for printing out PRINT <topic> commands with PROTOBUF format

Note on null and Default Values

Protobuf handles null values differently than AVRO and JSON. Namely, it doesn't have the true concept of a null value so the way it converts between PROTOBUF and Java (Connect) objects is undefined. For the most part, it will resolve a "missing field" to its default value:

For strings, the default value is the empty string.
For bytes, the default value is empty bytes.
For bools, the default value is false.
For numeric types, the default value is zero.
For enums, the default value is the first defined enum value, which must be 0.
For message fields, the field is not set. Its exact value is language-dependent. See the generated code guide for details.

See the QTT tests for clarification on these default values.

Testing done

Honestly, 90% of this PR affects only test code :)

  • add PROTOBUF to many QTT tests
  • Updated unit tests were applicable
ksql> CREATE STREAM proto (age INT, name VARCHAR, jobs ARRAY<VARCHAR>, address STRUCT<street VARCHAR>) WITH (kafka_topic='proto1', value_format='PROTOBUF', partitions=1);

 Message
----------------
 Stream created
----------------
ksql> INSERT INTO proto (age, name, jobs, address) VALUES (27, 'gordon', ARRAY['the flash'], STRUCT(street:='7th ave'));
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

ksql> SELECT * from PROTO EMIT CHANGES;
+--------------+--------------+--------------+--------------+--------------+--------------+
|ROWTIME       |ROWKEY        |AGE           |NAME          |JOBS          |ADDRESS       |
+--------------+--------------+--------------+--------------+--------------+--------------+
|1581025858985 |null          |27            |gordon        |[the flash]   |{STREET=7th av|
|              |              |              |              |              |e}            |


ksql> PRINT proto1 FROM BEGINNING;
Format:PROTOBUF
2/6/20 1:50:58 PM PST, null, {AGE: 27 NAME: "gordon" JOBS: "the flash" ADDRESS { STREET: "7th ave" }}
^CTopic printing ceased
ksql>

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@agavra agavra requested a review from a team as a code owner February 6, 2020 22:41
@agavra agavra changed the base branch from master to 5.5.x February 8, 2020 00:17
@rodesai rodesai self-assigned this Feb 10, 2020
Copy link
Contributor

@rodesai rodesai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @agavra! Left a couple of minor comments inline. We should add some docs about how to use the feature, and call out the limitations. For example, I think the current implementation won't let users use their own sink schemas, which is more the norm for protobuf.

@agavra
Copy link
Contributor Author

agavra commented Feb 11, 2020

Thanks for the review @rodesai - I will add docs in a follow-up PR 😄

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement protobuf format reader
2 participants