[FLINK-8630] [table] To support JSON schema to TypeInformation conversion #5491

Closed
wants to merge 2 commits into from

twalthr
Contributor

@twalthr twalthr commented Feb 14, 2018

What is the purpose of the change

This PR implements (almost) full support of the JSON type. It includes:

  • Schema to TypeInformation conversion
  • Support for number, integer, string, object, array types
  • Initial support for the date, time, and timestamp formats and their mapping to Flink types
  • Support for base64 encoded byte arrays
  • Nested support
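
As a rough illustration of the conversion listed above, here is a minimal sketch of how schema keywords could be dispatched to types. It is not the code from this PR: the class `JsonSchemaTypeSketch`, its helper methods, and the returned type labels are hypothetical placeholders, and the keyword names (`format` with `date`, `time`, `date-time`, and `contentEncoding` with `base64`) are assumed from JSON Schema draft-07. It uses plain strings instead of Flink's TypeInformation so that it runs without any dependency.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class JsonSchemaTypeSketch {

    /** Builds a flat schema node from key/value pairs, e.g. schema("type", "string"). */
    static Map<String, String> schema(String... keyValues) {
        Map<String, String> node = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            node.put(keyValues[i], keyValues[i + 1]);
        }
        return node;
    }

    /** Hypothetical mapping from a schema node to a placeholder type label. */
    static String toTypeName(Map<String, String> node) {
        String type = node.get("type");
        if (type == null) {
            throw new IllegalArgumentException("Schema node has no 'type' keyword");
        }
        switch (type) {
            case "number":
                return "DECIMAL";   // placeholder label, not a Flink constant
            case "integer":
                return "INTEGER";   // placeholder label, not a Flink constant
            case "object":
                return "ROW";       // objects are represented as rows
            case "array":
                return "ARRAY";
            case "string":
                return stringTypeName(node);
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    /** Strings may carry an encoding or a format that refines the type. */
    private static String stringTypeName(Map<String, String> node) {
        if ("base64".equals(node.get("contentEncoding"))) {
            return "BYTES";
        }
        String format = node.get("format");
        if ("date".equals(format)) {
            return "DATE";
        }
        if ("time".equals(format)) {
            return "TIME";
        }
        if ("date-time".equals(format)) {
            return "TIMESTAMP";
        }
        return "STRING";
    }

    /** Decodes a base64 encoded JSON string value into the raw bytes. */
    static byte[] decodeBase64(String value) {
        return Base64.getDecoder().decode(value);
    }

    public static void main(String[] args) {
        System.out.println(toTypeName(schema("type", "string", "format", "date-time")));
        System.out.println(toTypeName(schema("type", "string", "contentEncoding", "base64")));
        System.out.println(new String(decodeBase64("Rmxpbms="), StandardCharsets.UTF_8));
    }
}
```

Nested schemas would apply the same dispatch recursively to the properties of an object or the items of an array; the sketch leaves that out to stay short.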

Brief change log

  • New module flink-json in flink-formats
  • New JsonSchemaConverter
  • Improved JsonRowSerialization/DeserializationSchema

Verifying this change

  • Improved existing tests
  • New unit tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? will document together with first JSON connector

Member

@xccui xccui left a comment

Thanks for the PR, @twalthr. It looks pretty good! I just had some minor comments.

Besides, since the (de)serialization procedures are applied to the byte[] and Row types, the JSON can only be considered an intermediate type. I just wonder whether there are cases where we must operate on a "pure JSON string" instead of a Jackson ObjectNode.
Thanks, Xingcan

}

@Override
public boolean isEndOfStream(ObjectNode nextElement) {
Member

This overridden method seems to be redundant.

private ObjectMapper mapper;

@Override
public ObjectNode deserialize(byte[] message) throws IOException {
Member

IMO, this method should be thread-safe.

Contributor Author

The framework takes care of thread safety. Same as MapFunction etc.

/**
* Deserialization schema from JSON to Flink types.
*
* <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
Member

messages -> message

private static final String CONTENT_ENCODING_BASE64 = "base64";

/**
* Converts a JSON schema into Flink's type information. Throws an exception of the schema
Member

of -> if

* Converts a JSON schema into Flink's type information. It uses {@link Row} for representing
* objects and tuple arrays.
*
* <p>Note: This converter implements just a subset of the JSON schema specification.
Member

It seems that the JSON Schema is still evolving. Shall we consider specifying a version for that?

Contributor Author

Very good point. I added a comment about the version (mostly draft-07). But since we only implement a subset of it and also include some keywords from older drafts, it is hard to explain. I will add some examples to the docs to show what we support; this should help in those cases.

}

@Override
public byte[] serialize(Row row) {
Member

The serialize() method is also not thread-safe, since it invokes methods such as SimpleDateFormat.format(). Not sure if it matters.

Contributor Author

The framework takes care of duplicating the class.
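
To make the duplication argument concrete, here is a minimal sketch under stated assumptions. Flink ships user code to the cluster via Java serialization, so each parallel task works on its own deserialized copy. The classes `PerTaskCopySketch` and `TimestampWriter` are hypothetical stand-ins, not classes from this PR, and the threads only imitate parallel tasks.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

public class PerTaskCopySketch {

    /** Hypothetical stand-in for a serialization schema holding non-thread-safe state. */
    static class TimestampWriter implements Serializable {
        private static final long serialVersionUID = 1L;

        // SimpleDateFormat is not thread-safe, so one instance must not be shared across threads.
        private final SimpleDateFormat format =
            new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT);

        TimestampWriter() {
            format.setTimeZone(TimeZone.getTimeZone("UTC"));
        }

        String write(long epochMillis) {
            return format.format(new Date(epochMillis));
        }
    }

    /** Copies an object through Java serialization, yielding an independent instance. */
    static TimestampWriter copy(TimestampWriter original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (TimestampWriter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not copy instance", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimestampWriter template = new TimestampWriter();
        Thread[] tasks = new Thread[2];
        for (int i = 0; i < tasks.length; i++) {
            TimestampWriter own = copy(template); // one private copy per imitated task
            tasks[i] = new Thread(() -> System.out.println(own.write(0L)));
            tasks[i].start();
        }
        for (Thread task : tasks) {
            task.join();
        }
    }
}
```

Because no SimpleDateFormat instance is ever shared between two threads, the formatter needs no synchronization. Sharing a single instance across threads would not be safe.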

@twalthr
Contributor Author

twalthr commented Feb 15, 2018

Thanks for the review, @xccui. I agree that a pure string-based format would be helpful as well. For this we can simply use a string serialization schema later. In the long term, we will need to implement scalar functions that can handle a JSON string and allow accessing it in as type-safe a way as possible.

@twalthr
Contributor Author

twalthr commented Feb 15, 2018

Merging...

3 participants