Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pulsar IO][Issue 5633]Support avro schema for debezium connector #6034

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e3c344b
Add avro parse
tuteng Dec 31, 2019
a0699df
Add Class KafkaSchema
tuteng Jan 1, 2020
9533d3b
Add cache for key and value schema
tuteng Jan 8, 2020
b84ecea
Support debezium avro schema
tuteng Jan 12, 2020
f616562
Delete no used dependency
tuteng Jan 12, 2020
e9a8bc2
debezium connect source support `org.apache.pulsar.kafka.shade.io.con…
gaoran10 Apr 16, 2020
6675996
fix
gaoran10 Apr 17, 2020
86d4779
fix
gaoran10 Apr 17, 2020
0f041d7
add some test log
gaoran10 Apr 21, 2020
01eee75
fix
gaoran10 Apr 21, 2020
58bfc7d
add some test log
gaoran10 Apr 21, 2020
78856e0
fix
gaoran10 Apr 21, 2020
ded2cd0
fix
gaoran10 Apr 21, 2020
b15839d
use the ClassLoader of the SourceContext to generate KeyValueSchema
gaoran10 Apr 21, 2020
dcf9202
some test
gaoran10 Apr 22, 2020
2a9de58
some test
gaoran10 Apr 22, 2020
8a77ba8
fix
gaoran10 Apr 22, 2020
56b4d27
fix
gaoran10 Apr 22, 2020
54d636e
add test log
gaoran10 Apr 22, 2020
980ecb8
fix test
gaoran10 Apr 22, 2020
c09e9b4
add test log
gaoran10 Apr 22, 2020
406d3af
add test log
gaoran10 Apr 23, 2020
0057cbb
add test log
gaoran10 Apr 23, 2020
1e19a9a
fix docs
gaoran10 Apr 23, 2020
39f9f7d
fix test
gaoran10 Apr 25, 2020
1d54597
fix test
gaoran10 Apr 27, 2020
4e077b5
fix test
gaoran10 Apr 27, 2020
78ef864
fix test
gaoran10 Apr 27, 2020
d45f27a
fix test
gaoran10 Apr 27, 2020
7e91ab0
fix test
gaoran10 Apr 27, 2020
5aa985c
fix test
gaoran10 Apr 27, 2020
dc83b12
fix test
gaoran10 Apr 27, 2020
a03f6dc
fix
gaoran10 Apr 27, 2020
d07394a
fix
gaoran10 Apr 27, 2020
7698bc9
delete test log
gaoran10 Apr 27, 2020
6c2f98d
delete test log
gaoran10 Apr 27, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
128 changes: 128 additions & 0 deletions kafka-connect-avro-converter-shaded/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>pulsar</artifactId>
<groupId>org.apache.pulsar</groupId>
<version>2.6.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<artifactId>kafka-connect-avro-converter-shaded</artifactId>
<name>Apache Pulsar :: Kafka Connect Avro Converter shaded</name>

<dependencies>
<!-- confluent connect avro converter -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>${kafka-avro-convert-jackson.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${kafka-avro-convert-jackson.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>

<artifactSet>
<includes>
<include>io.confluent:*</include>
<include>io.confluent:kafka-avro-serializer</include>
<include>io.confluent:kafka-schema-registry-client</include>
<include>io.confluent:common-config</include>
<include>io.confluent:common-utils</include>
<include>org.apache.avro:*</include>

<include>org.codehaus.jackson:jackson-core-asl</include>
<include>org.codehaus.jackson:jackson-mapper-asl</include>
<include>com.thoughtworks.paranamer:paranamer</include>
<include>org.xerial.snappy:snappy-java</include>
<include>org.apache.commons:commons-compress</include>
<include>org.tukaani:xz</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>io.confluent</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.io.confluent</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.avro</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.avro</shadedPattern>
</relocation>
<relocation>
<pattern>org.codehaus.jackson</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.org.codehaus.jackson</shadedPattern>
</relocation>
<relocation>
<pattern>com.thoughtworks.paranamer</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.com.thoughtworks.paranamer</shadedPattern>
</relocation>
<relocation>
<pattern>org.xerial.snappy</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.org.xerial.snappy</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.org.apache.commons</shadedPattern>
</relocation>
<relocation>
<pattern>org.tukaani</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.org.tukaani</shadedPattern>
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
</transformers>
</configuration>
</plugin>
</plugins>
</build>

</project>
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ flexible messaging model and an intuitive client API.</description>
<!-- connector-related modules -->
<module>pulsar-io</module>

<!-- kafka connect avro converter shaded, because version mismatch -->
<module>kafka-connect-avro-converter-shaded</module>

<!-- examples -->
<module>examples</module>

Expand Down Expand Up @@ -208,6 +211,8 @@ flexible messaging model and an intuitive client API.</description>
<guava.version>25.1-jre</guava.version>
<jcip.version>1.0</jcip.version>
<prometheus-jmx.version>0.12.0</prometheus-jmx.version>
<confluent.version>5.3.2</confluent.version>
<kafka-avro-convert-jackson.version>1.9.13</kafka-avro-convert-jackson.version>

<!-- test dependencies -->
<cassandra.version>3.6.0</cassandra.version>
Expand All @@ -220,6 +225,7 @@ flexible messaging model and an intuitive client API.</description>
<javassist.version>3.25.0-GA</javassist.version>
<failsafe.version>2.3.1</failsafe.version>
<skyscreamer.version>1.5.0</skyscreamer.version>
<confluent.version>5.2.2</confluent.version>

<!-- Plugin dependencies -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
Expand Down Expand Up @@ -1715,5 +1721,9 @@ flexible messaging model and an intuitive client API.</description>
<id>spring-plugins-release</id>
<url>https://repo.spring.io/plugins-release/</url>
</repository>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
Expand Down Expand Up @@ -78,6 +79,11 @@ public SchemaReader<T> load(BytesSchemaVersion schemaVersion) {
protected StructSchema(SchemaInfo schemaInfo) {
this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8));
this.schemaInfo = schemaInfo;

if (schemaInfo.getProperties().containsKey(GenericAvroSchema.OFFSET_PROP)) {
this.schema.addProp(GenericAvroSchema.OFFSET_PROP,
schemaInfo.getProperties().get(GenericAvroSchema.OFFSET_PROP));
}
}

public org.apache.avro.Schema getAvroSchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class GenericAvroReader implements SchemaReader<GenericRecord> {
private final List<Field> fields;
private final Schema schema;
private final byte[] schemaVersion;
private int offset;

public GenericAvroReader(Schema schema) {
this(null, schema, null);
}
Expand All @@ -65,12 +67,22 @@ public GenericAvroReader(Schema writerSchema, Schema readerSchema, byte[] schema
}
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);

if (schema.getObjectProp(GenericAvroSchema.OFFSET_PROP) != null) {
this.offset = Integer.parseInt(schema.getObjectProp(GenericAvroSchema.OFFSET_PROP).toString());
} else {
this.offset = 0;
}

}

@Override
public GenericAvroRecord read(byte[] bytes, int offset, int length) {
try {
Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, offset, length, null);
if (offset == 0 && this.offset > 0) {
offset = this.offset;
}
Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, offset, length - offset, null);
org.apache.avro.generic.GenericRecord avroRecord =
(org.apache.avro.generic.GenericRecord)reader.read(
null,
Expand Down Expand Up @@ -101,5 +113,9 @@ public GenericRecord read(InputStream inputStream) {
}
}

public int getOffset() {
return offset;
}

private static final Logger log = LoggerFactory.getLogger(GenericAvroReader.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
@Slf4j
public class GenericAvroSchema extends GenericSchemaImpl {

public final static String OFFSET_PROP = "__AVRO_READ_OFFSET__";

public GenericAvroSchema(SchemaInfo schemaInfo) {
this(schemaInfo, true);
}
Expand Down Expand Up @@ -73,6 +75,8 @@ protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersio
schemaInfo);
Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema;
readerSchema.addProp(OFFSET_PROP, schemaInfo.getProperties().getOrDefault(OFFSET_PROP, "0"));

return new GenericAvroReader(
writerSchema,
readerSchema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.testng.Assert.assertEquals;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
Expand All @@ -38,18 +40,22 @@ public class GenericAvroReaderTest {
private AvroSchema fooSchemaNotNull;
private AvroSchema fooSchema;
private AvroSchema fooV2Schema;

private AvroSchema fooOffsetSchema;

@BeforeMethod
public void setup() {
fooSchema = AvroSchema.of(Foo.class);

fooV2Schema = AvroSchema.of(FooV2.class);
fooSchemaNotNull = AvroSchema.of(SchemaDefinition
.builder()
.withAlwaysAllowNull(false)
.withPojo(Foo.class)
.build());

fooOffsetSchema = AvroSchema.of(Foo.class);
fooOffsetSchema.getAvroSchema().addProp(GenericAvroSchema.OFFSET_PROP, 5);

foo = new Foo();
foo.setField1("foo1");
foo.setField2("bar1");
Expand Down Expand Up @@ -83,4 +89,20 @@ public void testGenericAvroReaderByReaderSchema() {
assertEquals(genericRecordByReaderSchema.getField("field3"), 10);
}

@Test
public void testOffsetSchema() {
byte[] fooBytes = fooOffsetSchema.encode(foo);
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeByte(0);
byteBuf.writeInt(10);
byteBuf.writeBytes(fooBytes);

GenericAvroReader reader = new GenericAvroReader(fooOffsetSchema.getAvroSchema());
assertEquals(reader.getOffset(), 5);
GenericRecord record = reader.read(byteBuf.array());
assertEquals(record.getField("field1"), "foo1");
assertEquals(record.getField("field2"), "bar1");
assertEquals(record.getField("fieldUnableNull"), "notNull");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.api;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

/**
* key value schema record.
*/
public interface KVRecord<K, V> extends Record {

Schema<K> getKeySchema();

Schema<V> getValueSchema();

KeyValueEncodingType getKeyValueEncodingType();

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.api;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;

import java.util.Collections;
import java.util.Map;
Expand All @@ -43,6 +44,10 @@ default Optional<String> getKey() {
return Optional.empty();
}

default Schema<T> getSchema() {
return null;
}

/**
* Retrieves the actual data of the record.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@
import lombok.AllArgsConstructor;
import lombok.Data;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;

@Slf4j
@Data
@AllArgsConstructor
public class SinkRecord<T> implements Record<T> {
Expand Down Expand Up @@ -81,4 +86,24 @@ public void fail() {
public Optional<String> getDestinationTopic() {
return sourceRecord.getDestinationTopic();
}

@Override
public Schema<T> getSchema() {
if (sourceRecord == null) {
return null;
}

if (sourceRecord.getSchema() != null) {
return sourceRecord.getSchema();
}

if (sourceRecord instanceof KVRecord) {
KVRecord kvRecord = (KVRecord) sourceRecord;
return KeyValueSchema.of(kvRecord.getKeySchema(), kvRecord.getValueSchema(),
kvRecord.getKeyValueEncodingType());
}

return null;
}

}