@wuchong
Member

@wuchong wuchong commented May 14, 2020

What is the purpose of the change

Introduce DebeziumFormatFactory and CanalFormatFactory to read changelogs.

CREATE TABLE my_table (
  ...
) WITH (
  'connector'='...',  -- e.g. 'kafka'
  'format'='debezium-json',
  'debezium-json.schema-include'='true', -- default false; Debezium can be configured to include or exclude the message schema
  'debezium-json.ignore-parse-errors'='true' -- default false
);

CREATE TABLE my_table (
  ...
) WITH (
  'connector'='...',  -- e.g. 'kafka'
  'format'='canal-json',
  'canal-json.ignore-parse-errors'='true' -- default false
);

Brief change log

  • Introduced DebeziumJsonDeserializationSchema and DebeziumFormatFactory to deserialize Debezium messages.
  • Introduced CanalJsonDeserializationSchema and CanalFormatFactory to deserialize Canal messages.
  • Throw an exception in DebeziumFormatFactory#createSinkFormat and CanalFormatFactory#createSinkFormat to tell users that these formats cannot be used as sinks for now.
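
For readers unfamiliar with the Debezium envelope, the deserialization step can be pictured roughly as follows. This is a minimal sketch, not the code from this PR: `ChangeKind`, `Change`, and `DebeziumEnvelopeSketch` are hypothetical stand-ins rather than Flink classes, and rows are plain strings. It assumes the standard Debezium "op" codes ("c" create, "r" snapshot read, "u" update, "d" delete) and the "before"/"after" row images.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not Flink classes.
enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class Change {
    final ChangeKind kind;
    final String row;

    Change(ChangeKind kind, String row) {
        this.kind = kind;
        this.row = row;
    }

    @Override
    public String toString() {
        return kind + "(" + row + ")";
    }
}

final class DebeziumEnvelopeSketch {
    // Translates one Debezium envelope ("op", "before", "after") into changelog entries.
    static List<Change> toChanges(String op, String before, String after) {
        List<Change> out = new ArrayList<>();
        switch (op) {
            case "c": // create
            case "r": // snapshot read
                out.add(new Change(ChangeKind.INSERT, after));
                break;
            case "u": // an update carries both row images
                out.add(new Change(ChangeKind.UPDATE_BEFORE, before));
                out.add(new Change(ChangeKind.UPDATE_AFTER, after));
                break;
            case "d": // a delete carries only the old image
                out.add(new Change(ChangeKind.DELETE, before));
                break;
            default:
                throw new IllegalArgumentException("Unknown Debezium op '" + op + "'");
        }
        return out;
    }
}
```

An update message therefore yields two changelog entries, while inserts and deletes yield one each.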

Verifying this change

  • Added DebeziumJsonDeserializationSchemaTest and CanalJsonDeserializationSchemaTest to test deserialization of Debezium and Canal messages.
  • Added DebeziumFormatFactoryTest and CanalFormatFactoryTest to unit test factories.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? not documented

@wuchong
Member Author

wuchong commented May 14, 2020

cc @danny0405

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 81d1f17 (Thu May 14 12:17:34 UTC 2020)

Warnings:

  • 2 pom.xml files were touched: Check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented May 14, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@wuchong wuchong force-pushed the changelog-formats branch from 81d1f17 to 4ddbfe9 Compare May 15, 2020 01:19
} else {
if (!ignoreParseErrors) {
throw new IOException(format("Failed to deserialize Debezium JSON '%s'.", new String(message)));
}
Contributor

Give more detailed message about why the deserialization fails.
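
One way to act on this suggestion is to keep the underlying failure as the exception cause and to state the reason in the message. The following is a minimal sketch under assumptions: `ParseErrorSketch` and its `parse` method are hypothetical stand-ins for the real JSON deserializer, and the actual branch in the PR may be reached for a different reason (for example, an unrecognized operation type).

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class ParseErrorSketch {
    // Hypothetical parser stand-in: accepts only payloads that look like a JSON object.
    static String parse(byte[] message) {
        String text = new String(message, StandardCharsets.UTF_8);
        if (!text.startsWith("{")) {
            throw new IllegalStateException("expected '{' at position 0");
        }
        return text;
    }

    // Returns the parsed payload, or null when the message is corrupt and errors are ignored.
    static String deserialize(byte[] message, boolean ignoreParseErrors) throws IOException {
        try {
            return parse(message);
        } catch (RuntimeException e) {
            if (ignoreParseErrors) {
                return null; // skip the corrupt message
            }
            // Keep the root cause and its reason so the failure can be diagnosed.
            throw new IOException(
                String.format("Failed to deserialize Debezium JSON '%s': %s",
                    new String(message, StandardCharsets.UTF_8), e.getMessage()),
                e);
        }
    }
}
```

With `ignoreParseErrors` set to true the corrupt message is silently skipped; otherwise the caller sees both the offending payload and the reason it was rejected.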

*/
public class DebeziumFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {

public static final String IDENTIFIER = "debezium-json";
Contributor

DebeziumJsonFormatFactory or maybe we can make the underlying format configurable.

RowData row = jsonDeserializer.deserialize(message);
String type = row.getString(2).toString(); // "type" field
if (OP_INSERT.equals(type)) {
// "data" field is an array of row, contains inserted rows
Contributor

Replacing the if-else chain with a switch statement seems better.

Member Author

@wuchong wuchong May 15, 2020

It's not easy to rewrite this as a switch statement, because we can't declare local variables with the same name in different cases.
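
For context, all case labels of a Java switch statement share one block scope, which is the limitation described here. Wrapping each case body in braces gives it its own scope. A minimal sketch follows; the class, method, and operation names are hypothetical and not taken from the PR, and whether this reads better than the if-else chain is a judgment call.

```java
final class SwitchScopeSketch {
    static String describe(String type, String payload) {
        switch (type) {
            case "INSERT": {
                // Braces open a new scope, so 'row' is local to this case.
                String row = "+I " + payload;
                return row;
            }
            case "DELETE": {
                // Without the braces this second 'row' declaration would not compile.
                String row = "-D " + payload;
                return row;
            }
            default:
                throw new IllegalArgumentException("Unknown type '" + type + "'");
        }
    }
}
```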

if (before.isNullAt(f)) {
// not null fields in "old" (before) means the fields are changed
// null/empty fields in "old" (before) means the fields not not changed
// so we just copy the not changed fields into before
Contributor

not not changed => are not changed
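
The rule in the quoted code comment (Canal's "old" field holds only the columns that changed) can be sketched as follows. This is an illustration under assumptions, not the PR's implementation: `CanalBeforeImageSketch` is a hypothetical class, and rows are modeled as plain `Object[]` arrays instead of Flink's row types.

```java
import java.util.Arrays;

final class CanalBeforeImageSketch {
    // Rebuilds the full "before" row: a null entry in 'old' means the column
    // was unchanged, so its value is copied from the "after" row.
    static Object[] rebuildBefore(Object[] old, Object[] after) {
        Object[] before = Arrays.copyOf(old, old.length);
        for (int f = 0; f < before.length; f++) {
            if (before[f] == null) {
                before[f] = after[f];
            }
        }
        return before;
    }
}
```

Note that this sketch cannot distinguish an unchanged column from one whose previous value really was NULL.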

} else {
if (!ignoreParseErrors) {
throw new IOException(format("Failed to deserialize Canal JSON '%s'.", new String(message)));
}
Contributor

Give a more detailed message for this intentionally thrown exception.

*/
public class CanalFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {

public static final String IDENTIFIER = "canal-json";
Contributor

CanalJsonFormatFactory? Or make the underlying format configurable, because Canal's default format is Protobuf.

}

// Debezium captures change data (`debezium-data-schema-include.txt`) on the `product` table:
//
Contributor

Canal ?

@wuchong
Member Author

wuchong commented May 15, 2020

Thanks for the review, @danny0405. PR updated.

Contributor

@danny0405 danny0405 left a comment

+1, LGTM.

@wuchong
Member Author

wuchong commented May 15, 2020

Thanks for the review, @danny0405.

Merging...

@wuchong wuchong closed this in d2b8ebb May 15, 2020
@wuchong wuchong deleted the changelog-formats branch May 15, 2020 15:09
@wuchong
Member Author

wuchong commented May 15, 2020

The failed Kafka case is related to FLINK-16383.
