
Conversation

@twalthr (Contributor) commented Oct 21, 2020

What is the purpose of the change

This updates the `KafkaDynamicSource` and `KafkaDynamicSink` to read and
write metadata according to FLIP-107. Reading and writing metadata of formats
is not supported yet.

This PR is based on #13618.

Brief change log

  • Implement SupportsReadingMetadata and SupportsWritingMetadata in source and sink
  • Refactor Kafka source and sink accordingly
  • Various minor improvements (see separate commits)

Verifying this change

  • Added a test in KafkaTableITCase

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot (Collaborator)

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 3f19217 (Wed Oct 21 16:35:57 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot (Collaborator) commented Oct 21, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@wuchong (Member) left a comment

Thanks @twalthr. I left some comments.

  * @see TableSchema#toPhysicalRowDataType()
  */
- TypeInformation<?> createTypeInformation(DataType consumedDataType);
+ <T> TypeInformation<T> createTypeInformation(DataType consumedDataType);
wuchong (Member):

This is a nice improvement. But is this a compatible change?

twalthr (Contributor, author):

No, it is not a compatible change. But given that those interfaces are still relatively new and not many people have migrated to the new sources/sinks, we should make this change now or never and avoid @SuppressWarnings in almost all implementations.
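To make the trade-off concrete, here is a minimal, self-contained sketch (the `TypeInformation` stand-in and method bodies are invented for illustration, not Flink's actual classes): with a wildcard return type every caller needs an unchecked cast, while the generic `<T>` variant confines the single unchecked spot to the factory method.

```java
public class GenericReturnSketch {

    // Stand-in for Flink's TypeInformation, defined here so the sketch compiles alone.
    interface TypeInformation<T> {}

    static class IntTypeInfo implements TypeInformation<Integer> {}

    // Old signature: wildcard return type.
    static TypeInformation<?> createTypeInformationOld() {
        return new IntTypeInfo();
    }

    // New signature: the type variable is inferred at the call site, so callers
    // no longer need a cast or @SuppressWarnings("unchecked") of their own.
    @SuppressWarnings("unchecked")
    static <T> TypeInformation<T> createTypeInformation() {
        return (TypeInformation<T>) new IntTypeInfo();
    }

    public static void main(String[] args) {
        // Old style: an unchecked cast is required at every call site.
        @SuppressWarnings("unchecked")
        TypeInformation<Integer> oldStyle =
                (TypeInformation<Integer>) createTypeInformationOld();

        // New style: assigns without a cast. Changing the signature breaks
        // existing implementations, which is the compatibility cost above.
        TypeInformation<Integer> newStyle = createTypeInformation();

        System.out.println(oldStyle != null && newStyle != null);
    }
}
```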

* support comparing arrays stored in the values of a map. We might update the {@link #equals(Object)}
* with this implementation in future versions.
*/
public static boolean deepEquals(Row row, Object other) {
wuchong (Member):

Should other also be of type Row? The Javadoc says "Compares two Rows".

private static <E> boolean deepEqualsList(List<E> l1, List<?> l2) {
final Iterator<E> i1 = l1.iterator();
final Iterator<?> i2 = l2.iterator();
while (i1.hasNext() && i2.hasNext()) {
wuchong (Member):

Why not compare size first?

twalthr (Contributor, author):

I copied this implementation from java.util.AbstractList#equals, but I don't have a strong opinion on this. LinkedLists are usually uncommon, I guess.
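The size-first variant under discussion could look like the following sketch (element comparison simplified to Objects.equals; the PR's real version recurses into nested arrays and maps). The up-front check lets lists of different lengths fail fast, at the cost of a size() call that not every List implementation computes in constant time:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

public class ListEqualsSketch {

    // Deep list equality with an up-front length check.
    static boolean deepEqualsList(List<?> l1, List<?> l2) {
        if (l1.size() != l2.size()) {
            return false; // fail fast on different lengths
        }
        final Iterator<?> i1 = l1.iterator();
        final Iterator<?> i2 = l2.iterator();
        while (i1.hasNext()) {
            // Simplified: the PR's version dispatches to deepEquals here.
            if (!Objects.equals(i1.next(), i2.next())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(deepEqualsList(Arrays.asList(1, 2), Arrays.asList(1, 2)));
        System.out.println(deepEqualsList(Arrays.asList(1, 2), Arrays.asList(1)));
    }
}
```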

*
* <p>The current implementation of {@link Row#equals(Object)} is not able to compare all deeply
* nested row structures that might be created in the table ecosystem. For example, it does not
* support comparing arrays stored in the values of a map. We might update the {@link #equals(Object)}
wuchong (Member):

It seems that we already support this?

twalthr (Contributor, author):

I might not understand your comment here.

wuchong (Member):

The comment says "For example, it does not support comparing arrays stored in the values of a map"; however, the tests show that this is supported.

twalthr (Contributor, author) commented Oct 22, 2020:

I was referring to Row#equals(Object). And this doesn't support arrays in values of maps.

wuchong (Member):

Got it.

true,
new Integer[]{1, null, 3, 99}, // diff here
Arrays.asList(1, null, 3),
originalMap,
wuchong (Member):

This is the same map reference; should we create another map?

),

LEADER_EPOCH(
"leader-epoch",
wuchong (Member):

I just noticed that if we define a metadata key with a dash (-) separator, it has to be declared using the FROM clause or escaped, e.g. leader_epoch INT FROM 'leader-epoch'. What do you think about changing the key to leader_epoch, which is more compliant with SQL identifiers?

twalthr (Contributor, author):

True, but then I would also need to change the recommendations in the SupportsMetadata interfaces:

	 * <p>Metadata key names follow the same pattern as mentioned in {@link Factory}. In case of duplicate
	 * names in format and source keys, format keys shall have higher precedence.

twalthr (Contributor, author):

I would suggest keeping it as it is. Users can use backticks, and leader epoch is not very frequently used. Furthermore, once we introduce metadata for formats such as debezium-json.ingestion-timestamp, it would be confusing if the format identifier changed from debezium-json to debezium_json for metadata.

wuchong (Member):

I see.

wuchong (Member):

Is it possible to remove the debezium-json prefix? The Javadoc also says "In case of duplicate names in format and source keys, format keys shall have higher precedence."

So far, the metadata keys of format and source are very different.

twalthr (Contributor, author):

Yes, we can shorten them. The idea was to design the metadata similar to regular options. So if key and value are defined, they would get a key. and value. prefix. That should be enough; we don't need the debezium-json prefix.

twalthr (Contributor, author) commented Oct 22, 2020:

But even for key. and value., users would need to use backticks or the FROM clause. I would stick to the naming convention to not cause confusion.
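For illustration, the two declaration styles discussed above might look as follows in DDL (table name, physical columns, and connector options are made up for the example; METADATA syntax per FLIP-107; normally you would pick one of the two forms):

```sql
CREATE TABLE kafka_orders (
  order_id BIGINT,
  -- Option 1: keep the dash-separated key and escape the column name:
  `leader-epoch` INT METADATA,
  -- Option 2: use a SQL-compliant column name and reference the key via FROM:
  epoch INT METADATA FROM 'leader-epoch'
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'format' = 'json'
);
```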


// --------------------------------------------------------------------------------------------

private static class MetadataKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
wuchong (Member):

Declare a serialVersionUID for this class.


// --------------------------------------------------------------------------------------------

private static final class MetadataAppendingCollector implements Collector<RowData>, Serializable {
wuchong (Member):

Declare a serialVersionUID for this class.

public void collect(RowData physicalRow) {
final int metadataArity = metadataConverters.length;
// shortcut if no metadata is required
if (metadataArity == 0) {
wuchong (Member):

We could have a final hasMetadata member variable to allow JIT compiler optimization.
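A sketch of that suggestion (types simplified: RowData replaced by List&lt;Object&gt;, the converters modeled as Suppliers; names are hypothetical): the emptiness check is evaluated once in the constructor and stored in a final field, so the per-record branch reads a constant the JIT can treat as invariant.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class MetadataCollectorSketch {

    static final class MetadataAppendingCollector {
        private final List<Supplier<Object>> metadataConverters;
        // Computed once; a final boolean keeps the per-record branch constant.
        private final boolean hasMetadata;

        MetadataAppendingCollector(List<Supplier<Object>> metadataConverters) {
            this.metadataConverters = metadataConverters;
            this.hasMetadata = !metadataConverters.isEmpty();
        }

        List<Object> collect(List<Object> physicalRow) {
            if (!hasMetadata) {
                return physicalRow; // shortcut: no metadata columns requested
            }
            final List<Object> joined = new ArrayList<>(physicalRow);
            for (Supplier<Object> converter : metadataConverters) {
                joined.add(converter.get());
            }
            return joined;
        }
    }

    public static void main(String[] args) {
        MetadataAppendingCollector collector =
                new MetadataAppendingCollector(Arrays.asList(() -> 42L, () -> "topic-a"));
        System.out.println(collector.collect(new ArrayList<>(Arrays.asList("data 1", 1))));
    }
}
```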

Row.of("data 3", 3, "CreateTime", LocalDateTime.parse("2020-03-10T13:12:11.123"), 2L, 0, headers3, 0, topic, true)
);

assertTrue(Row.deepEquals(expected, result));
wuchong (Member):

What about adding an exception message to the assertion to print the expected and actual rows? That would be helpful for debugging Azure builds.
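A plain-Java sketch of that suggestion (the real test would pass such a message to JUnit's assertTrue and compare with Row.deepEquals; names here are invented): on failure, both sides are printed so the CI log shows what actually differed.

```java
import java.util.Arrays;
import java.util.List;

public class AssertionMessageSketch {

    // Builds a failure message that shows both sides of the comparison.
    static String mismatchMessage(Object expected, Object actual) {
        return "expected: " + expected + System.lineSeparator()
                + "but was:  " + actual;
    }

    static void assertDeepEquals(List<?> expected, List<?> actual) {
        if (!expected.equals(actual)) { // stand-in for Row.deepEquals
            throw new AssertionError(mismatchMessage(expected, actual));
        }
    }

    public static void main(String[] args) {
        assertDeepEquals(Arrays.asList("data 1", 1), Arrays.asList("data 1", 1));
        System.out.println("ok");
    }
}
```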

@twalthr (Contributor, author) commented Oct 22, 2020

@wuchong I updated the PR; I hope I addressed most of your comments.

@wuchong (Member) left a comment

LGTM.

+1 to merge

@twalthr twalthr closed this in d21f5d9 Oct 23, 2020
jnh5y pushed a commit to jnh5y/flink that referenced this pull request Dec 18, 2023
…data

This updates the `KafkaDynamicSource` and `KafkaDynamicSink` to read and
write metadata according to FLIP-107. Reading and writing metadata of formats
is not supported yet.

This closes apache#13732.


4 participants