[FLINK-19667] Add AWS Glue Schema Registry integration #14737

Merged: 1 commit into apache:master on Mar 15, 2021

Conversation

@mohitpali (Contributor)

What is the purpose of the change

The AWS Glue Schema Registry is a new feature of AWS Glue that allows you to centrally discover, control, and evolve data stream schemas. This PR adds a new format under flink-formats that integrates Apache Flink with the AWS Glue Schema Registry.
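
For context, a minimal usage sketch of the new format (the factory method and package follow the module added in this PR; the stream name, region key, and inline schema are illustrative placeholders):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

public class GsrUsageSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Glue Schema Registry client configuration (key and value are illustrative).
        Map<String, Object> configs = new HashMap<>();
        configs.put("region", "us-east-1");

        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
                        + "[{\"name\":\"name\",\"type\":\"string\"}]}");

        // The format plugs into any connector that accepts a DeserializationSchema.
        env.addSource(new FlinkKinesisConsumer<GenericRecord>(
                        "my-input-stream", // hypothetical stream name
                        GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
                        new Properties()))
                .print();

        env.execute("GSR usage sketch");
    }
}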

Brief change log

  • Added flink-avro-glue-schema-registry module under flink-formats
  • Added end-to-end test named flink-glue-schema-registry-test for the new module

Verifying this change

This change added tests and can be verified as follows:

  • Added integration tests for end-to-end deployment

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (yes)
  • The runtime per-record code paths (performance sensitive): (don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

@flinkbot (Collaborator)

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 399f06e (Sun Jan 24 02:36:14 UTC 2021)

Warnings:

  • 4 pom.xml files were touched: Check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot (Collaborator) commented Jan 24, 2021

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@rmetzger (Contributor)

What's the relationship of this PR to #14490 ?

@LinyuYao1021 (Contributor)

> What's the relationship of this PR to #14490 ?

It's the same one, but it fixes the compile error.

@rmetzger (Contributor)

You don't need to open a PR for every fix; you can just keep (force-)pushing to the branch. Can you close the old PR?

I'll review the PR tomorrow.

@mohitpali (Contributor, Author) commented Jan 26, 2021

Apologies for the confusion, we have closed the other PR. We had to create another PR because two developers were working on it and hence the different login. I have included some CI compilation fixes in this PR and rebased.

@rmetzger (Contributor)

Thanks a lot for the clarification.

@rmetzger (Contributor) left a comment

Thanks a lot for this big PR. I've made a first very rough pass over the code and commented on some issues. Depending on my level of confidence after a full review, I might ask another Flink committer to take a look as well.

What's your plan regarding documentation? Will this be done in a follow up PR, or in this one?

@@ -0,0 +1,185 @@
/*

It seems that the package name is not properly encoded into subdirectories.
Part of the directory name of this file is org.apache.flink.glue.schema.registry.test, but it should be org/apache/flink/glue/schema/registry/test. This might be difficult to spot in some IDEs, since they display the directory structure using dot notation.


Fixed

// TODO: main thread needs to create job or CLI fails with:
// "The program didn't contain a Flink job. Perhaps you forgot to call execute() on the
// execution environment."
System.out.println("test finished");

use LOG?


Fixed


Schema schema;
try {
schema = (new Schema.Parser()).parse(schemaDefinition);

if I'm not mistaken, this parser initialization and schema parsing is done for every RegistryAvroDeserializationSchema.deserialize() call.
I guess this is necessary when deserializing GenericRecord Avro records, but for SpecificRecord we only need to deserialize the schema once?


The schema parsing is needed because GSR returns its own Schema class from the serialized byte array.


The question was about the frequency of deserialization. To improve performance, can we parse the schema for SpecificRecord once and cache it? Or are we expecting the schema definition to change over time? What happens if the schema changes for a SpecificRecord? Is the idea that the Flink job would fail if the upstream data format changes in a non-compatible way?


Fixed
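
For reference, the caching suggested above could look roughly like this (a minimal sketch, assuming a given definition string always parses to the same schema; the class name is illustrative):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.avro.Schema;

/** Parses each distinct schema definition once and caches the result. */
public final class SchemaDefinitionCache {
    private final Map<String, Schema> cache = new ConcurrentHashMap<>();

    public Schema getOrParse(String schemaDefinition) {
        // After the first call, the per-record deserialize() path becomes a
        // cheap map lookup; a changed definition string parses as a new entry.
        return cache.computeIfAbsent(schemaDefinition, def -> new Schema.Parser().parse(def));
    }
}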

flink-formats/flink-avro-glue-schema-registry/pom.xml (outdated review thread, resolved)
@dannycranmer self-requested a review on January 28, 2021.
@dannycranmer (Contributor) left a comment

Thank you for the contribution. I have taken a first pass at the PR.

</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-glue-schema-registry-test_${scala.binary.version}</artifactId>

Correct me if I am wrong, but I do not think you need additional artifacts per scala version. Suggest dropping _${scala.binary.version}


Fixed

<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>compile</scope>

nit: you should not need the <version> and <scope> tags here. I assume you meant to include junit as a compile-scoped dependency (compile is the default).


Fixed
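
The trimmed dependency would then look like this (a sketch; this assumes the version is managed by the parent pom's dependencyManagement):

<!-- version and scope omitted: the version comes from the parent pom,
     and compile is the default scope -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
</dependency>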

Comment on lines +38 to +39
<aws.sdk.version>1.11.754</aws.sdk.version>
<aws.sdkv2.version>2.15.32</aws.sdkv2.version>

nit: Consider updating, at the time of writing:

  • v1 @ 1.11.943
  • v2 @ 2.15.70

this.properties = properties;
}

public void createTopic(String stream, int shards, Properties props) throws Exception {

createStream? I would consider updating the method name or adding Javadoc to indicate that an existing stream will be deleted.


Fixed
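
A sketch of the renamed method with the suggested Javadoc (using the AWS SDK v1 Kinesis client; the kinesisClient field and the delete-then-create behavior are illustrative assumptions):

/**
 * Creates the Kinesis stream used by the test.
 *
 * <p>Note: if a stream with the same name already exists it is deleted and
 * re-created, so any data in the existing stream is lost.
 */
public void createStream(String stream, int shards, Properties props) throws Exception {
    // kinesisClient is assumed to be a com.amazonaws.services.kinesis.AmazonKinesis field.
    // Illustrative: drop an existing stream before re-creating it.
    if (kinesisClient.listStreams().getStreamNames().contains(stream)) {
        kinesisClient.deleteStream(stream);
    }
    kinesisClient.createStream(stream, shards);
}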

flink-formats/flink-avro-glue-schema-registry/pom.xml (outdated review thread, resolved)
Comment on lines 57 to 173
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

This should probably be <scope>provided</scope>


Adding this scope causes a ClassNotFoundException.


Where are you seeing the ClassNotFoundException? At runtime the class would be provided by the Flink cluster. You may see an issue running in standalone mode. The problem with not making this provided is that when building an uber jar for an app, it could bundle additional unnecessary Flink code into the jar. This would bloat the jar size and classpath.


Fixed


What is fixed? I do not see any changes?


The change may have been lost during rebasing. Will add it again in the next commit.
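
For reference, the provided scoping discussed in this thread would look like:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${project.version}</version>
    <!-- provided: supplied by the Flink cluster at runtime, so it is not
         bundled into the user's uber jar -->
    <scope>provided</scope>
</dependency>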

<enforcer.skip>true</enforcer.skip>
</properties>

<dependencies>

I have taken a look at the dependency footprint of this module and it looks like there is too much pulled in:

Why do we need Kafka dependencies?

  • +- org.apache.kafka:connect-json:jar:2.5.0:compile
  • +- org.apache.kafka:connect-api:jar:2.5.0:compile
  • +- org.apache.kafka:kafka-streams:jar:2.5.0:compile
  • +- org.apache.kafka:kafka-clients:jar:2.5.0:compile

Pulling in lombok as a compile dependency looks wrong; is this scoped correctly in the upstream module?

  • +- org.projectlombok:lombok:jar:1.18.2:compile
  • \- org.projectlombok:lombok-utils:jar:1.18.12:compile

As mentioned, we should use the standard JUnit framework for Flink:

  • +- org.junit.jupiter:junit-jupiter-api:jar:5.6.2:test
[INFO] --- maven-dependency-plugin:3.1.1:tree (default-cli) @ flink-avro-glue-schema-registry ---
[INFO] org.apache.flink:flink-avro-glue-schema-registry:jar:1.13-SNAPSHOT
[INFO] +- org.apache.flink:flink-core:jar:1.13-SNAPSHOT:provided
[INFO] |  +- org.apache.flink:flink-annotations:jar:1.13-SNAPSHOT:provided
[INFO] |  +- org.apache.flink:flink-metrics-core:jar:1.13-SNAPSHOT:provided
[INFO] |  +- org.apache.flink:flink-shaded-asm-7:jar:7.1-12.0:provided
[INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] |  +- com.esotericsoftware.kryo:kryo:jar:2.24.0:provided
[INFO] |  |  \- com.esotericsoftware.minlog:minlog:jar:1.2:provided
[INFO] |  +- commons-collections:commons-collections:jar:3.2.2:provided
[INFO] |  +- org.apache.commons:commons-compress:jar:1.20:compile
[INFO] |  \- org.apache.flink:flink-shaded-guava:jar:18.0-12.0:compile
[INFO] +- org.apache.flink:flink-avro:jar:1.13-SNAPSHOT:compile
[INFO] |  \- org.apache.avro:avro:jar:1.10.0:compile
[INFO] |     +- com.fasterxml.jackson.core:jackson-core:jar:2.12.1:compile
[INFO] |     \- com.fasterxml.jackson.core:jackson-databind:jar:2.12.1:compile
[INFO] |        \- com.fasterxml.jackson.core:jackson-annotations:jar:2.12.1:compile
[INFO] +- org.apache.flink:flink-streaming-java_2.11:jar:1.13-SNAPSHOT:compile
[INFO] |  +- org.apache.flink:flink-file-sink-common:jar:1.13-SNAPSHOT:compile
[INFO] |  +- org.apache.flink:flink-runtime_2.11:jar:1.13-SNAPSHOT:compile
[INFO] |  |  +- org.apache.flink:flink-queryable-state-client-java:jar:1.13-SNAPSHOT:compile
[INFO] |  |  +- org.apache.flink:flink-hadoop-fs:jar:1.13-SNAPSHOT:compile
[INFO] |  |  +- commons-io:commons-io:jar:2.7:compile
[INFO] |  |  +- org.apache.flink:flink-shaded-netty:jar:4.1.49.Final-12.0:compile
[INFO] |  |  +- org.apache.flink:flink-shaded-jackson:jar:2.10.1-12.0:compile
[INFO] |  |  +- org.apache.flink:flink-shaded-zookeeper-3:jar:3.4.14-12.0:compile
[INFO] |  |  +- org.javassist:javassist:jar:3.24.0-GA:compile
[INFO] |  |  +- org.scala-lang:scala-library:jar:2.11.12:compile
[INFO] |  |  +- com.typesafe.akka:akka-actor_2.11:jar:2.5.21:compile
[INFO] |  |  |  +- com.typesafe:config:jar:1.3.0:compile
[INFO] |  |  |  \- org.scala-lang.modules:scala-java8-compat_2.11:jar:0.7.0:compile
[INFO] |  |  +- com.typesafe.akka:akka-stream_2.11:jar:2.5.21:compile
[INFO] |  |  |  +- org.reactivestreams:reactive-streams:jar:1.0.2:compile
[INFO] |  |  |  \- com.typesafe:ssl-config-core_2.11:jar:0.3.7:compile
[INFO] |  |  |     \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.1.1:compile
[INFO] |  |  +- com.typesafe.akka:akka-protobuf_2.11:jar:2.5.21:compile
[INFO] |  |  +- com.typesafe.akka:akka-slf4j_2.11:jar:2.5.21:compile
[INFO] |  |  +- org.clapper:grizzled-slf4j_2.11:jar:1.3.2:compile
[INFO] |  |  +- com.github.scopt:scopt_2.11:jar:3.5.0:compile
[INFO] |  |  +- org.xerial.snappy:snappy-java:jar:1.1.4:compile
[INFO] |  |  +- com.twitter:chill_2.11:jar:0.7.6:compile
[INFO] |  |  |  \- com.twitter:chill-java:jar:0.7.6:compile
[INFO] |  |  \- org.lz4:lz4-java:jar:1.6.0:compile
[INFO] |  +- org.apache.flink:flink-java:jar:1.13-SNAPSHOT:compile
[INFO] |  \- org.apache.commons:commons-math3:jar:3.5:compile
[INFO] +- org.apache.flink:flink-clients_2.11:jar:1.13-SNAPSHOT:compile
[INFO] |  +- org.apache.flink:flink-optimizer_2.11:jar:1.13-SNAPSHOT:compile
[INFO] |  \- commons-cli:commons-cli:jar:1.3.1:compile
[INFO] +- software.amazon.glue:schema-registry-serde:jar:1.0.0:compile
[INFO] |  +- software.amazon.glue:schema-registry-common:jar:1.0.0:compile
[INFO] |  |  +- software.amazon.awssdk:glue:jar:2.15.32:compile
[INFO] |  |  |  +- software.amazon.awssdk:protocol-core:jar:2.15.32:compile
[INFO] |  |  |  +- software.amazon.awssdk:auth:jar:2.15.32:compile
[INFO] |  |  |  |  \- software.amazon.eventstream:eventstream:jar:1.0.1:compile
[INFO] |  |  |  +- software.amazon.awssdk:http-client-spi:jar:2.15.32:compile
[INFO] |  |  |  +- software.amazon.awssdk:regions:jar:2.15.32:compile
[INFO] |  |  |  +- software.amazon.awssdk:aws-core:jar:2.15.32:compile
[INFO] |  |  |  +- software.amazon.awssdk:metrics-spi:jar:2.15.32:compile
[INFO] |  |  |  +- software.amazon.awssdk:apache-client:jar:2.15.32:runtime
[INFO] |  |  |  |  +- org.apache.httpcomponents:httpclient:jar:4.5.3:runtime
[INFO] |  |  |  |  |  +- commons-logging:commons-logging:jar:1.1.3:runtime
[INFO] |  |  |  |  |  \- commons-codec:commons-codec:jar:1.13:runtime
[INFO] |  |  |  |  \- org.apache.httpcomponents:httpcore:jar:4.4.6:runtime
[INFO] |  |  |  \- software.amazon.awssdk:netty-nio-client:jar:2.15.32:runtime
[INFO] |  |  |     +- io.netty:netty-codec-http:jar:4.1.53.Final:runtime
[INFO] |  |  |     +- io.netty:netty-codec-http2:jar:4.1.53.Final:runtime
[INFO] |  |  |     +- io.netty:netty-codec:jar:4.1.53.Final:runtime
[INFO] |  |  |     +- io.netty:netty-transport:jar:4.1.53.Final:runtime
[INFO] |  |  |     |  \- io.netty:netty-resolver:jar:4.1.53.Final:runtime
[INFO] |  |  |     +- io.netty:netty-common:jar:4.1.53.Final:runtime
[INFO] |  |  |     +- io.netty:netty-buffer:jar:4.1.53.Final:runtime
[INFO] |  |  |     +- io.netty:netty-handler:jar:4.1.53.Final:runtime
[INFO] |  |  |     +- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.53.Final:runtime
[INFO] |  |  |     |  \- io.netty:netty-transport-native-unix-common:jar:4.1.53.Final:runtime
[INFO] |  |  |     \- com.typesafe.netty:netty-reactive-streams-http:jar:2.0.4:runtime
[INFO] |  |  |        \- com.typesafe.netty:netty-reactive-streams:jar:2.0.4:runtime
[INFO] |  |  +- software.amazon.awssdk:aws-json-protocol:jar:2.15.30:compile
[INFO] |  |  +- software.amazon.awssdk:cloudwatch:jar:2.15.30:compile
[INFO] |  |  |  \- software.amazon.awssdk:aws-query-protocol:jar:2.15.30:compile
[INFO] |  |  +- software.amazon.awssdk:sdk-core:jar:2.15.30:compile
[INFO] |  |  |  \- software.amazon.awssdk:profiles:jar:2.15.30:compile
[INFO] |  |  +- org.apache.kafka:kafka-clients:jar:2.5.0:compile
[INFO] |  |  |  \- com.github.luben:zstd-jni:jar:1.4.4-7:compile
[INFO] |  |  +- org.apache.kafka:kafka-streams:jar:2.5.0:compile
[INFO] |  |  |  +- org.apache.kafka:connect-json:jar:2.5.0:compile
[INFO] |  |  |  |  +- org.apache.kafka:connect-api:jar:2.5.0:compile
[INFO] |  |  |  |  \- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.12.1:compile
[INFO] |  |  |  \- org.rocksdb:rocksdbjni:jar:5.18.3:compile
[INFO] |  |  \- com.google.guava:guava:jar:29.0-jre:compile
[INFO] |  |     +- com.google.guava:failureaccess:jar:1.0.1:compile
[INFO] |  |     +- com.google.guava:listenablefuture:jar:9999.0-empty-to-avoid-conflict-with-guava:compile
[INFO] |  |     +- org.checkerframework:checker-qual:jar:2.11.1:compile
[INFO] |  |     +- com.google.errorprone:error_prone_annotations:jar:2.3.4:compile
[INFO] |  |     \- com.google.j2objc:j2objc-annotations:jar:1.3:compile
[INFO] |  +- software.amazon.awssdk:arns:jar:2.15.26:compile
[INFO] |  |  +- software.amazon.awssdk:annotations:jar:2.15.26:compile
[INFO] |  |  \- software.amazon.awssdk:utils:jar:2.15.26:compile
[INFO] |  +- org.projectlombok:lombok:jar:1.18.2:compile
[INFO] |  \- org.projectlombok:lombok-utils:jar:1.18.12:compile
[INFO] +- org.junit.jupiter:junit-jupiter-api:jar:5.6.2:test
[INFO] |  +- org.apiguardian:apiguardian-api:jar:1.1.0:test
[INFO] |  +- org.opentest4j:opentest4j:jar:1.2.0:test
[INFO] |  \- org.junit.platform:junit-platform-commons:jar:1.6.2:test
[INFO] +- org.junit.jupiter:junit-jupiter-params:jar:5.6.2:test
[INFO] +- org.mockito:mockito-junit-jupiter:jar:2.21.0:test
[INFO] +- org.slf4j:slf4j-api:jar:1.7.15:provided
[INFO] +- org.apache.flink:flink-test-utils-junit:jar:1.13-SNAPSHOT:test
[INFO] +- org.apache.flink:force-shading:jar:1.13-SNAPSHOT:compile
[INFO] +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] +- junit:junit:jar:4.12:test
[INFO] |  \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] +- org.mockito:mockito-core:jar:2.21.0:test
[INFO] |  +- net.bytebuddy:byte-buddy:jar:1.8.15:test
[INFO] |  +- net.bytebuddy:byte-buddy-agent:jar:1.8.15:test
[INFO] |  \- org.objenesis:objenesis:jar:2.1:provided
[INFO] +- org.powermock:powermock-module-junit4:jar:2.0.4:test
[INFO] |  \- org.powermock:powermock-module-junit4-common:jar:2.0.4:test
[INFO] |     +- org.powermock:powermock-reflect:jar:2.0.4:test
[INFO] |     \- org.powermock:powermock-core:jar:2.0.4:test
[INFO] +- org.powermock:powermock-api-mockito2:jar:2.0.4:test
[INFO] |  \- org.powermock:powermock-api-support:jar:2.0.4:test
[INFO] +- org.hamcrest:hamcrest-all:jar:1.3:test
[INFO] +- org.apache.logging.log4j:log4j-slf4j-impl:jar:2.12.1:test
[INFO] +- org.apache.logging.log4j:log4j-api:jar:2.12.1:test
[INFO] +- org.apache.logging.log4j:log4j-core:jar:2.12.1:test
[INFO] \- org.apache.logging.log4j:log4j-1.2-api:jar:2.12.1:test


The enforcer check skip property has been removed. For now, the versions of all dependencies with convergence errors have been pinned explicitly in the Flink GSR module and its e2e test module. Once the new version of the GSR package with reorganized dependencies is released to Maven, the version pins can be removed.


This comment looks misplaced. This is not related to the enforcer skip, it is looking at the transitive dependency chain. Did you reply to the wrong thread?

@LinyuYao1021 (Contributor) commented Feb 17, 2021

The dependency chain is fixed in the GSR package, but it'll take some time to release. Once it's out, it should also fix the enforcer check issue.


ok, so we have removed the dependency on Kafka in the new version? What is the ECD for the new version?


We will fix this in a follow-up.
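
Until the reorganized GSR release lands, one interim option would be explicit exclusions on the serde dependency (a sketch; the version property name is illustrative, and whether the serde works without these jars at runtime would need verification):

<dependency>
    <groupId>software.amazon.glue</groupId>
    <artifactId>schema-registry-serde</artifactId>
    <version>${glue.schema.registry.version}</version> <!-- hypothetical property -->
    <exclusions>
        <!-- Kafka-specific transitives that a Flink job should not need -->
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </exclusion>
    </exclusions>
</dependency>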

@dannycranmer (Contributor) commented Feb 17, 2021

I am seeing a test failure when running mvn clean install on the flink-avro-glue-schema-registry:

[ERROR] Errors:
[ERROR]   GlueSchemaRegistryAvroSerializationSchemaTest.<init>:58 » IllegalArgument glue...
[ERROR]   GlueSchemaRegistryAvroSerializationSchemaTest.<init>:58 » IllegalArgument glue...
[ERROR]   GlueSchemaRegistryAvroSerializationSchemaTest.<init>:58 » IllegalArgument glue...
[ERROR]   GlueSchemaRegistryAvroSerializationSchemaTest.<init>:58 » IllegalArgument glue...
[ERROR]   GlueSchemaRegistryAvroSerializationSchemaTest.<init>:58 » IllegalArgument glue...
[INFO]
[ERROR] Tests run: 19, Failures: 0, Errors: 5, Skipped: 0

Looks like glueSchemaRegistryConfiguration is null




@Override
public void writeSchema(Schema schema, OutputStream out) throws IOException {
byte[] data = ((ByteArrayOutputStream) out).toByteArray();

ok, please add the Preconditions check
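
The requested check could look like this (a sketch using Flink's org.apache.flink.util.Preconditions):

@Override
public void writeSchema(Schema schema, OutputStream out) throws IOException {
    // Fail fast with a clear message instead of a raw ClassCastException.
    Preconditions.checkArgument(
            out instanceof ByteArrayOutputStream,
            "The stream is expected to be a ByteArrayOutputStream, but was %s",
            out.getClass().getName());
    byte[] data = ((ByteArrayOutputStream) out).toByteArray();
    // ... register the schema and write the payload as before
}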


import org.apache.flink.formats.avro.SchemaCoder;

import lombok.NonNull;

Yes, I meant that you are using lombok rather than javax.annotation. But I think this is not needed, since the Flink coding standards say everything is non-null by default.

This also opens the question as to why the transportName is not @NonNull? Is it @Nullable?

Please remove the annotations unless there is a good reason to keep them.
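
Under the Flink convention the annotation would simply be dropped, with @Nullable marking the exceptional case (a hypothetical signature for illustration; whether transportName may actually be null is for the authors to confirm):

import javax.annotation.Nullable;

// Parameters are non-null by default under the Flink coding standards, so
// lombok's @NonNull is dropped; only genuinely optional values are annotated.
public void configure(String schemaDefinition, @Nullable String transportName) {
    // ...
}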


@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class User extends org.apache.avro.specific.SpecificRecordBase

Do we actually need this generated file to be in the source, or could we just generate it on the fly like we do for flink-avro?


Currently there's no Avro file that can be used directly, so this file is still needed.


But isn't this file generated from the schema defined in flink-formats/flink-avro-glue-schema-registry/src/test/java/resources/avro/user.avsc?
The avro-maven-plugin can generate this User.java file based on user.avsc.


I'm okay with addressing this in a follow up PR if you prefer.


Okay, I misunderstood what the dependency does. Will address this in a follow-up PR.
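
For the follow-up, the plugin configuration referred to above is roughly (a sketch; the phase, directory, and version property would need adjusting to this module's layout):

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>${avro.version}</version>
    <executions>
        <execution>
            <phase>generate-test-sources</phase>
            <goals>
                <!-- generates User.java from user.avsc at build time -->
                <goal>schema</goal>
            </goals>
            <configuration>
                <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>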

import static org.hamcrest.Matchers.instanceOf;

/** Tests for {@link GlueSchemaRegistryOutputStreamSerializer}. */
public class GlueSchemaRegistryOutputStreamSerializerTest {

It is common for all test classes to extend TestLogger.


Updated
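
That is, following the common Flink pattern:

import org.apache.flink.util.TestLogger;

/** Tests for {@link GlueSchemaRegistryOutputStreamSerializer}. */
public class GlueSchemaRegistryOutputStreamSerializerTest extends TestLogger {
    // TestLogger logs each test's name and any errors, which makes CI log
    // output much easier to navigate.
}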

@dannycranmer (Contributor)

As a follow-up we should add support for the SQL client and Table API by:

@LinyuYao1021 (Contributor)

SECRET_GLUE_SCHEMA_ACCESS_KEY: $[variables.IT_CASE_GLUE_SCHEMA_ACCESS_KEY]
SECRET_GLUE_SCHEMA_SECRET_KEY: $[variables.IT_CASE_GLUE_SCHEMA_SECRET_KEY]

Added

@LinyuYao1021 (Contributor) commented Mar 9, 2021

Hi Robert, the main CI passes now. Would you please run the latest commit on your personal CI to verify it, and then we can close this PR? @rmetzger

@rmetzger (Contributor) commented Mar 9, 2021

Have you set up your CI with the password as well, and verified the change?

@LinyuYao1021 (Contributor)

> Have you set up your CI with the password as well, and verified the change?

Not yet, I haven't used Azure before. Could you quickly walk me through what to do to set up CI?

@dannycranmer (Contributor)

I also need to set this up. I was planning on taking a look tomorrow morning. Let me try to set up my personal Azure and verify your change. I will update you tomorrow.

@rmetzger (Contributor) commented Mar 9, 2021

@LinyuYao1021 (Contributor)

It failed again: https://dev.azure.com/rmetzger/Flink/_build/results?buildId=8955&view=logs&j=9401bf33-03c4-5a24-83fe-e51d75db73ef&t=72901ab2-7cd0-57be-82b1-bca51de20fba

It's still because the credentials can't be extracted. How did you succeed last time?

@dannycranmer (Contributor)

I have set up my pipeline and am running master to verify it works; then I will run your GSR branch:

@LinyuYao1021 (Contributor) commented Mar 10, 2021

This is my pipeline, running my commit directly to see what the problem is:

https://dev.azure.com/yaolinyu3547/PrivateFlink/_build/results?buildId=5&view=results

@dannycranmer (Contributor)

Current status is that my new Azure account is blocked waiting for a limit increase to run parallel builds. Until this is complete, I cannot verify the e2e tests. I have sent an email to Azure as described in the docs and am waiting for a response.

@rmetzger (Contributor)

I was concerned that this would happen to you; a new hire in our company is facing the same issue.
It looks like Azure is having problems with cryptocurrency mining on their infrastructure, but they lack the proper tooling to detect it.

@dannycranmer (Contributor)

@@ -364,6 +364,7 @@ function check_logs_for_errors {
| grep -v "HeapDumpOnOutOfMemoryError" \
| grep -v "error_prone_annotations" \
| grep -v "Error sending fetch request" \
| grep -v "WARN akka.remote.ReliableDeliverySupervisor" \

@LinyuYao1021 Do we still need this change?


This is to avoid failure under this scenario:

2021-03-11T23:25:41.9106886Z Mar 11 23:25:41 2021-03-11 23:25:39,736 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink-metrics@10.1.0.4:37981] has failed, address is now gated for [50] ms. Reason: [Disassociated] 
2021-03-11T23:25:41.9108202Z Mar 11 23:25:41 2021-03-11 23:25:39,747 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink@10.1.0.4:37839] has failed, address is now gated for [50] ms. Reason: [Disassociated] 
2021-03-11T23:25:41.9109453Z Mar 11 23:25:41 2021-03-11 23:25:40,010 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: /10.1.0.4:37839
2021-03-11T23:25:41.9111511Z Mar 11 23:25:41 2021-03-11 23:25:40,010 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink@10.1.0.4:***@10.1.0.4:37839]] Caused by: [java.net.ConnectException: Connection refused: /10.1.0.4:37839]

We already ignore grep -v "WARN akka.remote.transport.netty.NettyTransport"


> @LinyuYao1021 Do we still need this change?

Yes, it skips these Akka warnings in the log check.

@dannycranmer (Contributor) commented Mar 15, 2021

CI is passing with and without AWS credentials. I will merge this now:

@dannycranmer merged commit b77afd0 into apache:master on Mar 15, 2021.
@rmetzger (Contributor)

Very nice! Thanks a lot for your efforts!

@jiamo commented Mar 22, 2021

A little question: we know that with the repo https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore, AWS EMR Hive can seamlessly talk to the Glue metastore.
But Flink's Hive integration uses the original metastore client.
Is it possible to add an option so that flink-connector-hive can talk to the Glue metastore?

@dannycranmer (Contributor)

@jiamo this is currently not possible out of the box, but can be achieved with some tweaks. Once you have Flink instantiating and using the Glue Data Catalog client, you would still need to implement property translation to sources/sinks etc (Kinesis/Kafka etc).

What is your use-case?

@jiamo commented Mar 23, 2021

My use-case: using Flink SQL to read data from EMR-managed Hive (where the table content is S3 files, partitioned by day).

@MartijnVisser (Contributor)

@mohitpali Do you think you could help with updating the dependencies of the Glue Schema Registry integrations in Flink? These are already quite outdated and it would be nice if we could update them to the latest supported versions.

@dannycranmer (Contributor)

@MartijnVisser I will reach out to the team; if they cannot help, I will find someone.

@MartijnVisser (Contributor)

@dannycranmer Thanks!

@dannycranmer (Contributor)

@MartijnVisser this will be picked up by hlteoh37 (for some reason I cannot mention him)
