
[FLINK-3872] [table, connector-kafka] Add KafkaJsonTableSource #2069

Closed · wants to merge 5 commits

Conversation

@uce (Contributor) commented Jun 3, 2016

Adds StreamTableSource variants for Kafka with syntactic sugar for parsing JSON streams.

KafkaJsonTableSource source = new Kafka08JsonTableSource(
    topic,
    props,
    new String[] { "id" }, // field names
    new Class<?>[] { Long.class }); // field types

tableEnvironment.registerTableSource("kafka-stream", source);

You can then continue to work with the stream:

Table result = tableEnvironment.ingest("kafka-stream").filter("id > 1000");
tableEnvironment.toDataStream(result, Row.class).print();

Limitations

  • Assumes flat JSON field access (we can easily extend this to use JSON pointers, allowing us to parse nested fields like /location/area as field names).
  • This does not extract any timestamp or watermarks (not an issue right now as the Table API currently does not support operations where this is needed).
  • The API is somewhat cumbersome and not very idiomatic for the Scala Table API.
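The first limitation above mentions extending flat field access to JSON pointers such as /location/area. Purely as an illustration of the mechanism (plain Maps stand in for parsed JSON; the helper below is hypothetical and not part of this PR), pointer resolution could look like this:

```java
import java.util.HashMap;
import java.util.Map;

public class PointerSketch {

    // Resolves a JSON-pointer-like path ("/location/area") against nested Maps.
    // Returns null if any segment is missing. Hypothetical helper, not Flink code.
    static Object resolve(Map<String, Object> root, String pointer) {
        Object current = root;
        for (String segment : pointer.substring(1).split("/")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> location = new HashMap<>();
        location.put("area", "Berlin");
        Map<String, Object> record = new HashMap<>();
        record.put("id", 42L);
        record.put("location", location);

        System.out.println(resolve(record, "/id"));            // 42
        System.out.println(resolve(record, "/location/area")); // Berlin
    }
}
```

In the actual source, the same lookup would operate on Jackson's JsonNode tree rather than Maps, so the declared field names could simply be pointer strings.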

@rmetzger (Contributor) commented Jun 3, 2016

Quickly checked the Maven and Kafka parts. Looks good: very clean, well-documented, and well-tested code.
Further review of the Table API-specific parts is still needed.

this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");

Preconditions.checkArgument(fieldNames.length == fieldNames.length,
Inline review comment (Contributor):

should be fieldNames.length == fieldTypes.length

@uce (Author) replied:

Yes! Good catch.
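For readers skimming the diff: the original check compares fieldNames.length to itself, so it always passes and a mismatched schema would slip through silently. A self-contained sketch of the corrected guard (plain Java standing in for Guava's Preconditions.checkArgument; the error message is illustrative):

```java
public class SchemaCheck {

    // Corrected guard: field names and field types must line up one-to-one.
    // The buggy version compared fieldNames.length to itself and never failed.
    static void checkSchema(String[] fieldNames, Class<?>[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException(
                "Number of provided field names and field types does not match.");
        }
    }

    public static void main(String[] args) {
        checkSchema(new String[] { "id" }, new Class<?>[] { Long.class }); // OK
        try {
            checkSchema(new String[] { "id", "name" }, new Class<?>[] { Long.class });
        } catch (IllegalArgumentException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```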

@fhueske (Contributor) commented Jun 10, 2016

Hi @uce, thanks for the PR. Looks really good. I added three comments inline.

Regarding the watermarks and timestamp handling, I am thinking about introducing an EventTimeTableSource interface that provides all required methods. A StreamTableSource can then implement this interface to support event-time. I will open a JIRA for this.
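One possible shape for the interface Fabian describes. Only the name EventTimeTableSource comes from the comment; every method name and signature below is invented for illustration, and the real design would be settled in the follow-up JIRA:

```java
// Hypothetical sketch of the proposed interface; not actual Flink API.
// A StreamTableSource could additionally implement this to support event time.
public interface EventTimeTableSource {

    /** Name of the field from which the event timestamp is derived. */
    String getTimestampField();

    /** Extracts the event timestamp (epoch millis) from a raw record. */
    long extractTimestamp(Object record);

    /** Allowed out-of-orderness used to derive watermarks, in millis. */
    long getMaxOutOfOrderness();
}
```

The point of a separate interface is that sources without timestamps (like the plain Kafka JSON source in this PR) stay unaffected, while event-time-capable sources opt in.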

@uce (Author) commented Jun 10, 2016

Thanks for the review, Fabian. I've addressed all comments except the ObjectMapper configuration. The EventTimeTableSource makes sense; we can make the Kafka source extend from it later on. Do you think we can merge this now?

@fhueske (Contributor) commented Jun 10, 2016

Hi @uce, the update looks good. I think we should add the new TableSources to the Table API documentation, maybe by adding a table like the following to table.md:

| Class name | Maven dependency | Batch | Streaming | Description |
| --- | --- | --- | --- | --- |
| CsvTableSource | flink-table | Y | Y | ... |
| Kafka08JsonTableSource | flink-connector-kafka-0.8 | N | Y | ... |

@uce (Author) commented Jun 21, 2016

Thanks for the suggestion Fabian. I've added the table to the docs and added an example about the Kafka JSON source. Furthermore, I've added the configuration flag for the missing field failure behaviour. I'll merge this after my local Travis build passes.
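The "missing field failure behaviour" flag mentioned above toggles between failing fast and emitting null when a declared field is absent from the incoming JSON. A self-contained sketch of that behaviour (plain Maps stand in for parsed JSON; the helper and flag name are hypothetical, invented to mirror the description, not the PR's actual code):

```java
import java.util.Map;

public class MissingFieldSketch {

    // If failOnMissingField is true, an absent field raises an error;
    // otherwise it is mapped to null. Hypothetical helper, not Flink code.
    static Object readField(Map<String, Object> json, String field,
            boolean failOnMissingField) {
        if (!json.containsKey(field)) {
            if (failOnMissingField) {
                throw new IllegalStateException("Missing field: " + field);
            }
            return null;
        }
        return json.get(field);
    }
}
```

Defaulting to null-on-missing keeps the source tolerant of sparse JSON, while the strict mode surfaces schema drift immediately.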

@uce (Author) commented Jun 23, 2016

Test failure is unrelated, going to merge this.

4 participants