Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-18777][catalog] Supports schema registry catalog #13033

Closed
wants to merge 1 commit into from

Conversation

danny0405
Copy link
Contributor

What is the purpose of the change

Supports a catalog that can read the kafka topics flexibly.
The demo code:

        String schemaRegistryURL = ...;
        Map<String, String> kafkaProps = ...;
        SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
                .schemaRegistryURL(schemaRegistryURL)
                .kafkaOptions(kafkaProps)
                .dbName("myDB")
                .build();
        tEnv.registerCatalog("myCatalog", catalog);
 
        // ---------- Consume stream from Kafka -------------------
 
        // Assumes there is a topic named 'transactions'
        String query = "SELECT\n" +
            " id, amount\n" +
            "FROM myCatalog.myDB.transactions";

Brief change log

  • Add a new Catalog named SchemaRegistryCatalog
  • Add tests for the table attributes

Verifying this change

Added UTs.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes, schema registry client 5.4.2 -> 5.5.1
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? not documented

@@ -33,7 +33,7 @@ under the License.

<properties>
<kafka.version>2.4.1</kafka.version>
<confluent.version>5.4.2</confluent.version>
<confluent.version>5.5.1</confluent.version>
</properties>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upgrade to new version client so that we can fetch for format type from the SchemaMetadata.

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit b73c589 (Fri Jul 31 05:05:35 UTC 2020)

Warnings:

  • 1 pom.xml files were touched: Check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Jul 31, 2020

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@maver1ck
Copy link
Contributor

@danny0405
Any plans to have this included in Flink 1.12 ?

@danny0405
Copy link
Contributor Author

@maver1ck The code seems straight-forward, @dawidwys said that he would like to review this PR, but i'm not sure if he has time.

@maver1ck
Copy link
Contributor

maver1ck commented Nov 2, 2020

@danny0405 I'll try to test this PR.
@dawidwys Do we have any plans to include this in 1.12 ?

@dawidwys
Copy link
Contributor

dawidwys commented Nov 2, 2020

I'd say it won't make it in 1.12. I talked with @knaufk some time ago that we should probably conclude the FLIP that was started around it first before proceeding with the contribution.

@maver1ck
Copy link
Contributor

maver1ck commented Nov 2, 2020

@dawidwys
Workaround ? Copy this code to own catalog implementation and register it ?

@dawidwys
Copy link
Contributor

dawidwys commented Nov 2, 2020

Yes, that is possible.

@maver1ck
Copy link
Contributor

maver1ck commented Nov 2, 2020

@danny0405
I'm seeing also two problems here.

  1. How this would work with: FLIP-107
  2. This is not compiling on current master.
[INFO] -------------------------------------------------------------
[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /home/maverick/flink/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/avro/Schemas.java:[22,28] package org.codehaus.jackson does not exist
[ERROR] /home/maverick/flink/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/avro/Schemas.java:[23,28] package org.codehaus.jackson does not exist
[ERROR] /home/maverick/flink/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/avro/Schemas.java:[36,30] cannot find symbol
  symbol:   class JsonFactory
  location: class org.apache.avro.Schemas
[ERROR] /home/maverick/flink/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/avro/Schemas.java:[36,56] cannot find symbol
  symbol:   class JsonFactory
  location: class org.apache.avro.Schemas
[ERROR] /home/maverick/flink/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/avro/Schemas.java:[45,25] cannot find symbol
  symbol:   class JsonGenerator
  location: class org.apache.avro.Schemas

@danny0405
Copy link
Contributor Author

@maver1ck When i write the code, the FLIP-107 was not started yet, i would fix the compile issue.

@maver1ck
Copy link
Contributor

maver1ck commented Nov 3, 2020

@dawidwys
Can we at least pick one change from this patch and merge to master ? This one ?

-		<confluent.version>5.4.2</confluent.version>
+		<confluent.version>5.5.1</confluent.version>

@dawidwys
Copy link
Contributor

dawidwys commented Nov 3, 2020

@maver1ck I'll look into it. What is the motivation for upgrading the version? Can't you simply upgrade the version in your job?

@maver1ck
Copy link
Contributor

maver1ck commented Nov 3, 2020

Motivation: to have possibility to run this code on ververica platform when version 1.12 will be available.

I can add dependency in my job.
But this will also probably need some shading as I will have two different version of confluent libraries.

@danny0405 danny0405 closed this Nov 17, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants