Skip to content

Conversation

@yuzelin
Copy link
Contributor

@yuzelin yuzelin commented Aug 18, 2022

[FLINK-28936][sql-gateway] Fix REST endpoint can not serialize CHAR(0) by introducing a new serializer for LogicalType

What is the purpose of the change

Fix the absence of LogicalType serializer.

Brief change log

  • Add LogicalTypeJsonSerializer in sql-gateway-api.
  • Add LogicalTypeJsonDeserializer in sql-gateway-api.

Verifying this change

This change added tests and can be verified as follows:

  • Added test prove that a LogicalType instance after serializing and deserializing is equal to original logical type.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@fsk119 fsk119 self-requested a review August 18, 2022 08:15
@flinkbot
Copy link
Collaborator

flinkbot commented Aug 18, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Member

@fsk119 fsk119 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution. I left some comments.


// Common fields
public static final String FIELD_NAME_TYPE_NAME = "type";
public static final String FIELD_NAME_NULLABLE = "nullable";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

miss FIELD_NAME_DESCRPITION

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I know, description is not a common field shared by all logical types, so I think we don't need this.

// ROW
public static final String FIELD_NAME_FIELDS = "fields";
public static final String FIELD_NAME_FIELD_NAME = "name";
public static final String FIELD_NAME_FIELD_TYPE = "fieldType";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing description.

Copy link
Contributor Author

@yuzelin yuzelin Aug 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added code for serialization and deserialization for RowType#description and corresponding test.

case FLOAT:
case DOUBLE:
case DATE:
case NULL:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NullType is alway nullable. I think we dont't need to add "nullable": true in the json.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added codes to handle NullType.


private static List<LogicalType> generateTestData() {
List<LogicalType> types =
Arrays.asList(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add

 new TimeType(),
 new TimeType(3),
new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
  new TimestampType(false, TimestampKind.ROWTIME, 3),
  new ZonedTimestampType(),
  new ZonedTimestampType(3),
  new ZonedTimestampType(false, TimestampKind.ROWTIME, 3),
  new LocalZonedTimestampType(),
  new LocalZonedTimestampType(3),
  new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
  new LocalZonedTimestampType(false, TimestampKind.ROWTIME, 3),

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add necessary test case.

case VARBINARY:
return length == 0 ? VarBinaryType.ofEmptyLiteral() : new VarBinaryType(length);
default:
throw new TableException("Logical type with attribute 'length' expected.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Can't convert json '%s' to the logical type %s."

BTW, I think it's better to throw SqlGatewayException here.


private LogicalType deserializeRaw(JsonNode logicalTypeNode) {
String className = logicalTypeNode.get(FIELD_NAME_CLASS).asText();
String specialSerializer = logicalTypeNode.get(FIELD_NAME_SERIALIZER).asText();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to serializer. We don't have special serializer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed.

@yuzelin yuzelin force-pushed the LogicalTypeSerDe branch 2 times, most recently from 2ed8067 to 5c4fe03 Compare August 23, 2022 12:34
@fsk119 fsk119 self-requested a review August 24, 2022 02:03
@yuzelin yuzelin force-pushed the LogicalTypeSerDe branch 2 times, most recently from 9d127f6 to 5988c33 Compare August 24, 2022 02:41
Copy link
Member

@fsk119 fsk119 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your update. I left some comments.


InetSocketAddress serverAddress = checkNotNull(sqlGatewayRestEndpoint.getServerAddress());
targetAddress = serverAddress.getHostName();
targetPort = serverAddress.getPort();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use NetUtils to get a random port? If other CI runs in the same machine, it may fail to bind the port.

Copy link
Contributor Author

@yuzelin yuzelin Aug 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After checking the implementation, I found the port is 0 in default configuration used by sqlGatewayRestEndpoint.start(), and that means server would bind a random port.

private LogicalType deserializeInternal(JsonNode logicalTypeNode) {
LogicalTypeRoot typeRoot =
LogicalTypeRoot.valueOf(logicalTypeNode.get(FIELD_NAME_TYPE_NAME).asText());
// handle the special case of NullType
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we describe the root reason here. For example, the NullType is always nullable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made it clearer.

Comment on lines 78 to 89
// final constants for testing unsupported case
private final LogicalType unsupportedType =
new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR);
private final String serializerExceptionMessageFormat =
"Unable to serialize logical type '%s'. Please check the documentation for supported types.";
private final String unsupportedTypeString = "INTERVAL_DAY_TIME";
private final String json =
String.format(
"{\"%s\": \"%s\", \"%s\": %s}",
"type", unsupportedTypeString, "nullable", "true");
private final String deserializerExceptionMessageFormat =
"Unable to deserialize a logical type of type root '%s'. Please check the documentation for supported types.";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Local variables are enough. Move into the test case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.


private final ObjectMapper mapper = getObjectMapper();

// final constants for testing unsupported case
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove useless comments... I think variable names are straightforward for us.

Comment on lines 221 to 224
/**
* LogicalType isn't annotated with Jackson annotations, so it's necessary to register the
* customer serializer and deserializer when testing LogicalType Serde alone.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this comment. It's the Jackson's API, we don't need to tell others how to use jackson.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

* LogicalType isn't annotated with Jackson annotations, so it's necessary to register the
* customer serializer and deserializer when testing LogicalType Serde alone.
*/
private ObjectMapper getObjectMapper() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to buildObjectMapper

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed.

new LocalZonedTimestampType(),
new LocalZonedTimestampType(3),
new LocalZonedTimestampType(false, 3),
// LocalZonedTimestampType#eaquals doesn't compare TimestampKind
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

Comment on lines 100 to 113
// test to deserialize unsupported JSON string
String unsupportedTypeString = "INTERVAL_DAY_TIME";
String json =
String.format(
"{\"%s\": \"%s\", \"%s\": %s}",
"type", unsupportedTypeString, "nullable", "true");
assertThatThrownBy(() -> mapper.readValue(json, LogicalType.class))
.satisfies(
FlinkAssertions.anyCauseMatches(
UnsupportedOperationException.class,
String.format(
"Unable to deserialize a logical type of type root '%s'. Please check the documentation for supported types.",
unsupportedTypeString)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to split into two test cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improved.

Copy link
Member

@fsk119 fsk119 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your update. LGTM

@fsk119 fsk119 merged commit 64f11ee into apache:master Aug 25, 2022
@yuzelin yuzelin deleted the LogicalTypeSerDe branch August 26, 2022 03:30
huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Nov 3, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants