Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Avro to 1.9.1 #5938

Merged
merged 11 commits into from
Jan 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,6 @@ The Apache Software License, Version 2.0
* JCommander -- com.beust-jcommander-1.48.jar
* High Performance Primitive Collections for Java -- com.carrotsearch-hppc-0.7.3.jar
* Jackson
- org.codehaus.jackson-jackson-core-asl-1.9.13.jar
- org.codehaus.jackson-jackson-mapper-asl-1.9.13.jar
- com.fasterxml.jackson.core-jackson-annotations-2.10.1.jar
- com.fasterxml.jackson.core-jackson-core-2.10.1.jar
- com.fasterxml.jackson.core-jackson-databind-2.10.1.jar
Expand Down Expand Up @@ -454,13 +452,11 @@ The Apache Software License, Version 2.0
* OpenCensus
- io.opencensus-opencensus-api-0.18.0.jar
- io.opencensus-opencensus-contrib-grpc-metrics-0.18.0.jar
* Paranamer
- com.thoughtworks.paranamer-paranamer-2.7.jar
* Jodah
- net.jodah-typetools-0.5.0.jar
* Apache Avro
- org.apache.avro-avro-1.8.2.jar
- org.apache.avro-avro-protobuf-1.8.2.jar
- org.apache.avro-avro-1.9.1.jar
- org.apache.avro-avro-protobuf-1.9.1.jar
* Apache Curator
- org.apache.curator-curator-client-4.0.1.jar
- org.apache.curator-curator-framework-4.0.1.jar
Expand Down Expand Up @@ -568,10 +564,6 @@ Eclipse Public License 1.0 -- licenses/LICENSE-AspectJ.txt
- org.aspectj-aspectjrt-1.9.2.jar
- org.aspectj-aspectjweaver-1.9.2.jar

Public Domain
* XZ for Java -- licenses/LICENSE-xz.txt
- org.tukaani-xz-1.5.jar

Public Domain (CC0) -- licenses/LICENSE-CC0.txt
* Reactive Streams -- org.reactivestreams-reactive-streams-1.0.2.jar

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ flexible messaging model and an intuitive client API.</description>
<kafka-client.version>2.3.0</kafka-client.version>
<rabbitmq-client.version>5.1.1</rabbitmq-client.version>
<aws-sdk.version>1.11.297</aws-sdk.version>
<avro.version>1.8.2</avro.version>
<avro.version>1.9.1</avro.version>
<joda.version>2.10.1</joda.version>
<jclouds.version>2.1.1</jclouds.version>
<sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ public void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaComp
try {
for (SchemaData schemaData : from) {
Schema.Parser parser = new Schema.Parser();
parser.setValidateDefaults(false);
fromList.addFirst(parser.parse(new String(schemaData.getData(), UTF_8)));
}
Schema.Parser parser = new Schema.Parser();
parser.setValidateDefaults(false);
Schema toSchema = parser.parse(new String(to.getData(), UTF_8));
SchemaValidator schemaValidator = createSchemaValidator(strategy);
schemaValidator.validate(toSchema, fromList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ private boolean isAvroSchema(SchemaData schemaData) {
try {

Schema.Parser fromParser = new Schema.Parser();
fromParser.setValidateDefaults(false);
Schema fromSchema = fromParser.parse(new String(schemaData.getData(), UTF_8));
return true;
} catch (SchemaParseException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public void validate(SchemaData schemaData) throws InvalidSchemaDataException {

try {
Schema.Parser avroSchemaParser = new Schema.Parser();
avroSchemaParser.setValidateDefaults(false);
avroSchemaParser.parse(new String(data, UTF_8));
} catch (SchemaParseException e) {
if (schemaData.getType() == SchemaType.JSON) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,49 +33,49 @@ public abstract class BaseAvroSchemaCompatibilityTest {

private static final String schemaJson1 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
private static final SchemaData schemaData1 = getSchemaData(schemaJson1);

private static final String schemaJson2 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
private static final SchemaData schemaData2 = getSchemaData(schemaJson2);

private static final String schemaJson3 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org" +
".apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheckTest$\"," +
".apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheckTest\"," +
"\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"string\"}]}";
private static final SchemaData schemaData3 = getSchemaData(schemaJson3);

private static final String schemaJson4 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1_v2\",\"type\":\"string\"," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1_v2\",\"type\":\"string\"," +
"\"aliases\":[\"field1\"]}]}";
private static final SchemaData schemaData4 = getSchemaData(schemaJson4);

private static final String schemaJson5 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
"\"string\"]}]}";
private static final SchemaData schemaData5 = getSchemaData(schemaJson5);

private static final String schemaJson6 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
"\"string\",\"int\"]}]}";
private static final SchemaData schemaData6 = getSchemaData(schemaJson6);

private static final String schemaJson7 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"},{\"name\":\"field3\"," +
"\"type\":\"string\",\"default\":\"bar\"}]}";
private static final SchemaData schemaData7 = getSchemaData(schemaJson7);

private static final String schemaJson8 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\"}]}";
private static final SchemaData schemaData8 = getSchemaData(schemaJson8);

Expand All @@ -96,10 +96,10 @@ public void testBackwardCompatibility() {
Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3,
SchemaCompatibilityStrategy.BACKWARD),
"adding a field without default is NOT backwards compatible");
// Modifying a field name is not backwards compatible
Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData4,
SchemaCompatibilityStrategy.BACKWARD),
"Modifying a field name is not backwards compatible");
// Modifying a field name with an alias is backwards compatible
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData4,
SchemaCompatibilityStrategy.BACKWARD),
"Modifying a field name with an alias is backwards compatible");
// evolving field to a union is backwards compatible
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData5,
SchemaCompatibilityStrategy.BACKWARD),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,18 +293,18 @@ public void dontReAddExistingSchemaInMiddle() throws Exception {
public void checkIsCompatible() throws Exception {
String schemaJson1 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
SchemaData schemaData1 = getSchemaData(schemaJson1);

String schemaJson2 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
SchemaData schemaData2 = getSchemaData(schemaJson2);

String schemaJson3 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\"}]}";
SchemaData schemaData3 = getSchemaData(schemaJson3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.testng.Assert.assertEquals;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import io.netty.buffer.Unpooled;
Expand All @@ -44,8 +45,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import avro.shaded.com.google.common.collect.Lists;

public class MessageParserTest extends MockedPulsarServiceBaseTest {

@BeforeMethod
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-go/pulsar/schemaDef_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestSchemaDef(t *testing.T) {
_, err = initAvroCodec(errSchemaDef4)
assert.NotNil(t, err)

errSchemaDef5 := "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"operation.createJsonConsumer$\"," +
errSchemaDef5 := "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"operation.createJsonConsumer\"," +
"\"fields\":[{\"name\":\"ID\",\"type\":\"byte\"},{\"name\":\"Name\",\"type\":\":[\"null\",\"string\"],\"default\":null\"}]}"
_, err = initAvroCodec(errSchemaDef5)
assert.NotNil(t, err)
Expand Down
6 changes: 0 additions & 6 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,6 @@
<artifactId>jackson-module-jsonSchema</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,38 +41,27 @@
public class AvroSchema<T> extends StructSchema<T> {
private static final Logger LOG = LoggerFactory.getLogger(AvroSchema.class);

// the aim to fix avro's bug
// https://issues.apache.org/jira/browse/AVRO-1891 bug address explain
// fix the avro logical type read and write
// the aim to fix avro's bug
// https://issues.apache.org/jira/browse/AVRO-1891 bug address explain
// fix the avro logical type read and write
static {
try {
ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();
ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();

reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());

ReflectData reflectDataNotAllowNull = ReflectData.get();
ReflectData reflectDataNotAllowNull = ReflectData.get();

reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
} catch (Throwable t) {
if (LOG.isDebugEnabled()) {
LOG.debug("Avro logical types are not available. If you are going to use avro logical types, " +
"you can include `joda-time` in your dependency.");
}
}
reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
}

private AvroSchema(SchemaInfo schemaInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;

import java.lang.reflect.Field;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

Expand All @@ -29,6 +30,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.reflect.ReflectData;
import org.apache.commons.codec.binary.Hex;
Expand Down Expand Up @@ -133,14 +135,34 @@ protected static org.apache.avro.Schema createAvroSchema(SchemaDefinition schema
if (StringUtils.isNotBlank(schemaDefinition.getJsonDef())) {
return parseAvroSchema(schemaDefinition.getJsonDef());
} else if (pojo != null) {
return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo) : ReflectData.get().getSchema(pojo);
ThreadLocal<Boolean> validateDefaults = null;

try {
Field validateDefaultsField = Schema.class.getDeclaredField("VALIDATE_DEFAULTS");
validateDefaultsField.setAccessible(true);
validateDefaults = (ThreadLocal<Boolean>) validateDefaultsField.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Cannot disable validation of default values", e);
}

final boolean savedValidateDefaults = validateDefaults.get();

try {
// Disable validation of default values for compatibility
validateDefaults.set(false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is not a very good way. However, there seems to be no public method to disable validation. Another option is to enable validation of default values.

return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo)
: ReflectData.get().getSchema(pojo);
} finally {
validateDefaults.set(savedValidateDefaults);
}
} else {
throw new RuntimeException("Schema definition must specify pojo class or schema json definition");
}
}

protected static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
final Parser parser = new Parser();
parser.setValidateDefaults(false);
return parser.parse(schemaJson);
}

Expand Down
Loading