
[FLINK-9337] Implemented AvroDeserializationSchema #5995

Closed · wants to merge 4 commits
94 changes: 94 additions & 0 deletions flink-formats/flink-avro-confluent-registry/pom.xml
@@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-formats</artifactId>
<groupId>org.apache.flink</groupId>
<version>1.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-avro-confluent-registry</artifactId>
Contributor:
Do we really want the Confluent Schema Registry code to be in flink-formats? Shouldn't this be in something like flink-catalogs in the future?

Contributor:
That is a good question - maybe eventually. We could leave it in flink-formats for now until we have a case to create flink-catalogs.

This is also not full-blown catalog support, as in the Table API, but something much simpler: just multiple Avro schemas.


<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>3.3.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
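<!-- Relocate the Jackson classes pulled in transitively by kafka-schema-registry-client so they cannot clash with other Jackson versions on the user classpath. -->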
<relocations combine.children="append">
<relocation>
<pattern>com.fasterxml.jackson.core</pattern>
<shadedPattern>org.apache.flink.formats.avro.registry.confluent.shaded.com.fasterxml.jackson.core</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
136 changes: 136 additions & 0 deletions flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro.registry.confluent;

import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.SchemaCoder;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;

import javax.annotation.Nullable;

/**
* Deserialization schema that deserializes from the Avro binary format using a {@link SchemaCoder}
* backed by the Confluent Schema Registry.
*
* @param <T> type of record it produces
*/
public class ConfluentRegistryAvroDeserializationSchema<T> extends RegistryAvroDeserializationSchema<T> {

private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;

private static final long serialVersionUID = -1671641202177852775L;

/**
* Creates an Avro deserialization schema.
*
* @param recordClazz class into which to deserialize. Should be either
* {@link SpecificRecord} or {@link GenericRecord}.
* @param reader reader's Avro schema. Must be provided if recordClazz is
* {@link GenericRecord}.
* @param schemaCoderProvider provider for a schema coder that reads the writer schema
* from the Confluent Schema Registry
*/
private ConfluentRegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
super(recordClazz, reader, schemaCoderProvider);
}

/**
* Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
* using the provided reader schema and looks up the writer schema in the Confluent Schema Registry.
*
* @param schema schema of produced records
* @param url URL of the schema registry to connect to
* @return deserialization schema that produces records in the form of {@link GenericRecord}
*/
public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url) {
return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
}

/**
* Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
* using the provided reader schema and looks up the writer schema in the Confluent Schema Registry.
*
* @param schema schema of produced records
* @param url URL of the schema registry to connect to
* @param identityMapCapacity maximum number of cached schema versions (default: 1000)
* @return deserialization schema that produces records in the form of {@link GenericRecord}
*/
public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url,
Contributor:
@dawidwys couldn't this be private?

Contributor (Author):
The end user is supposed to use only this method or forSpecific, and no other. Therefore it must be public.

int identityMapCapacity) {
return new ConfluentRegistryAvroDeserializationSchema<>(
GenericRecord.class,
schema,
new CachedSchemaCoderProvider(url, identityMapCapacity));
}

/**
* Creates {@link AvroDeserializationSchema} that produces classes generated from an Avro
* schema and looks up the writer schema in the Confluent Schema Registry.
*
* @param tClass class of records to be produced
* @param url URL of the schema registry to connect to
* @return deserialization schema that produces records of the given class
*/
public static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(Class<T> tClass,
String url) {
return forSpecific(tClass, url, DEFAULT_IDENTITY_MAP_CAPACITY);
}

/**
* Creates {@link AvroDeserializationSchema} that produces classes generated from an Avro
* schema and looks up the writer schema in the Confluent Schema Registry.
*
* @param tClass class of records to be produced
* @param url URL of the schema registry to connect to
* @param identityMapCapacity maximum number of cached schema versions (default: 1000)
* @return deserialization schema that produces records of the given class
*/
public static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(Class<T> tClass,
Contributor:
@dawidwys couldn't this be private?

String url, int identityMapCapacity) {
return new ConfluentRegistryAvroDeserializationSchema<>(
tClass,
null,
new CachedSchemaCoderProvider(url, identityMapCapacity)
);
}

private static class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {

private static final long serialVersionUID = 4023134423033312666L;
private final String url;
private final int identityMapCapacity;

CachedSchemaCoderProvider(String url, int identityMapCapacity) {
this.url = url;
this.identityMapCapacity = identityMapCapacity;
}

@Override
public SchemaCoder get() {
return new ConfluentSchemaRegistryCoder(new CachedSchemaRegistryClient(
url,
identityMapCapacity));
}
}
}
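
For reference, a minimal usage sketch of the two factory methods above (not part of this PR; the reader schema, the registry URL, and the Avro-generated User class are illustrative assumptions):

// Generic variant: the reader schema must be given; writer schemas are fetched from the registry by id.
Schema readerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
ConfluentRegistryAvroDeserializationSchema<GenericRecord> generic =
    ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, "http://localhost:8081");

// Specific variant: the reader schema is taken from the Avro-generated class.
ConfluentRegistryAvroDeserializationSchema<User> specific =
    ConfluentRegistryAvroDeserializationSchema.forSpecific(User.class, "http://localhost:8081");

// Either schema can then be passed to a source, e.g. a Kafka consumer from flink-connector-kafka.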
67 changes: 67 additions & 0 deletions flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro.registry.confluent;

import org.apache.flink.formats.avro.SchemaCoder;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

import static java.lang.String.format;

/**
* Reads the writer schema using the Confluent Schema Registry protocol.
*/
public class ConfluentSchemaRegistryCoder implements SchemaCoder {

private final SchemaRegistryClient schemaRegistryClient;

/**
* Creates {@link SchemaCoder} that uses the provided {@link SchemaRegistryClient} to connect
* to the schema registry.
*
* @param schemaRegistryClient client used to connect to the schema registry
*/
public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
this.schemaRegistryClient = schemaRegistryClient;
}

@Override
public Schema readSchema(InputStream in) throws IOException {
DataInputStream dataInputStream = new DataInputStream(in);

if (dataInputStream.readByte() != 0) {
throw new IOException("Unknown data format. Magic number does not match");
} else {
int schemaId = dataInputStream.readInt();

try {
return schemaRegistryClient.getById(schemaId);
} catch (RestClientException e) {
throw new IOException(format("Could not find schema with id %s in registry", schemaId), e);
}
}
}

}
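
For context, readSchema above expects the Confluent wire format: one zero magic byte, a 4-byte big-endian schema id, then the Avro-encoded payload. A sketch of the matching framing on the write side (illustrative only; schemaId stands for an id previously returned by the registry):

ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream frame = new DataOutputStream(out);
frame.writeByte(0);        // magic byte checked by readSchema
frame.writeInt(schemaId);  // DataOutputStream writes big-endian, matching readInt above
// ... Avro binary-encoded record bytes follow ...
frame.flush();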
80 changes: 80 additions & 0 deletions flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro.registry.confluent;

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import static org.junit.Assert.assertEquals;

/**
* Tests for {@link ConfluentSchemaRegistryCoder}.
*/
public class ConfluentSchemaRegistryCoderTest {
Contributor:
Do we want to test the magic byte verification?


@Test
public void testSpecificRecordWithConfluentSchemaRegistry() throws Exception {
MockSchemaRegistryClient client = new MockSchemaRegistryClient();

Schema schema = SchemaBuilder.record("testRecord")
.fields()
.optionalString("testField")
.endRecord();
int schemaId = client.register("testTopic", schema);

ConfluentSchemaRegistryCoder registryCoder = new ConfluentSchemaRegistryCoder(client);
ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteOutStream);
dataOutputStream.writeByte(0);
dataOutputStream.writeInt(schemaId);
dataOutputStream.flush();

ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
Schema readSchema = registryCoder.readSchema(byteInStream);

assertEquals(schema, readSchema);
assertEquals(0, byteInStream.available());
}

@Test(expected = IOException.class)
public void testMagicByteVerification() throws Exception {
MockSchemaRegistryClient client = new MockSchemaRegistryClient();
int schemaId = client.register("testTopic", Schema.create(Schema.Type.BOOLEAN));

ConfluentSchemaRegistryCoder coder = new ConfluentSchemaRegistryCoder(client);
ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteOutStream);
dataOutputStream.writeByte(5);
dataOutputStream.writeInt(schemaId);
dataOutputStream.flush();

ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
coder.readSchema(byteInStream);

// exception is thrown
}

}