Skip to content

Commit 1893323

Browse files
jerrypengsijie
authored andcommitted
change JSONSchema to generate and store an avro schema (#2071)
Change JSONSchema to generate a Avro schema from POJO so we can standardize on using Avro schema
1 parent ec04dc0 commit 1893323

14 files changed

Lines changed: 647 additions & 146 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,29 @@
2020

2121
import com.fasterxml.jackson.databind.ObjectMapper;
2222
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
23-
import java.io.IOException;
23+
import org.apache.avro.Schema;
24+
import org.apache.avro.SchemaParseException;
25+
import org.apache.avro.SchemaValidationException;
26+
import org.apache.avro.SchemaValidator;
27+
import org.apache.avro.SchemaValidatorBuilder;
2428
import org.apache.pulsar.common.schema.SchemaData;
2529
import org.apache.pulsar.common.schema.SchemaType;
2630

31+
import java.io.IOException;
32+
import java.util.Arrays;
33+
2734
@SuppressWarnings("unused")
2835
public class JsonSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
29-
private final ObjectMapper objectMapper = new ObjectMapper();
36+
37+
private final SchemaCompatibilityStrategy compatibilityStrategy;
38+
39+
public JsonSchemaCompatibilityCheck () {
40+
this(SchemaCompatibilityStrategy.FULL);
41+
}
42+
43+
public JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy compatibilityStrategy) {
44+
this.compatibilityStrategy = compatibilityStrategy;
45+
}
3046

3147
@Override
3248
public SchemaType getSchemaType() {
@@ -35,12 +51,109 @@ public SchemaType getSchemaType() {
3551

3652
@Override
3753
public boolean isCompatible(SchemaData from, SchemaData to) {
54+
55+
if (isAvroSchema(from)) {
56+
if (isAvroSchema(to)) {
57+
// if both producer and broker have the schema in avro format
58+
return isCompatibleAvroSchema(from, to);
59+
} else if (isJsonSchema(to)) {
60+
// if broker have the schema in avro format but producer sent a schema in the old json format
61+
// allow old schema format for backwards compatiblity
62+
return true;
63+
} else {
64+
// unknown schema format
65+
return false;
66+
}
67+
} else if (isJsonSchema(from)){
68+
69+
if (isAvroSchema(to)) {
70+
// if broker have the schema in old json format but producer sent a schema in the avro format
71+
// return true and overwrite the old format
72+
return true;
73+
} else if (isJsonSchema(to)) {
74+
// if both producer and broker have the schema in old json format
75+
return isCompatibleJsonSchema(from, to);
76+
} else {
77+
// unknown schema format
78+
return false;
79+
}
80+
} else {
81+
// broker has schema format with unknown format
82+
// maybe corrupted?
83+
// return true to overwrite
84+
return true;
85+
}
86+
}
87+
88+
private boolean isCompatibleAvroSchema(SchemaData from, SchemaData to) {
89+
Schema.Parser fromParser = new Schema.Parser();
90+
Schema fromSchema = fromParser.parse(new String(from.getData()));
91+
Schema.Parser toParser = new Schema.Parser();
92+
Schema toSchema = toParser.parse(new String(to.getData()));
93+
94+
SchemaValidator schemaValidator = createSchemaValidator(this.compatibilityStrategy, true);
95+
try {
96+
schemaValidator.validate(toSchema, Arrays.asList(fromSchema));
97+
} catch (SchemaValidationException e) {
98+
return false;
99+
}
100+
return true;
101+
}
102+
103+
private ObjectMapper objectMapper;
104+
private ObjectMapper getObjectMapper() {
105+
if (objectMapper == null) {
106+
objectMapper = new ObjectMapper();
107+
}
108+
return objectMapper;
109+
}
110+
111+
private boolean isCompatibleJsonSchema(SchemaData from, SchemaData to) {
38112
try {
113+
ObjectMapper objectMapper = getObjectMapper();
39114
JsonSchema fromSchema = objectMapper.readValue(from.getData(), JsonSchema.class);
40115
JsonSchema toSchema = objectMapper.readValue(to.getData(), JsonSchema.class);
41116
return fromSchema.getId().equals(toSchema.getId());
42117
} catch (IOException e) {
43118
return false;
44119
}
45120
}
121+
122+
private boolean isAvroSchema(SchemaData schemaData) {
123+
try {
124+
125+
Schema.Parser fromParser = new Schema.Parser();
126+
Schema fromSchema = fromParser.parse(new String(schemaData.getData()));
127+
return true;
128+
} catch (SchemaParseException e) {
129+
return false;
130+
}
131+
}
132+
133+
private boolean isJsonSchema(SchemaData schemaData) {
134+
ObjectMapper objectMapper = getObjectMapper();
135+
try {
136+
JsonSchema fromSchema = objectMapper.readValue(schemaData.getData(), JsonSchema.class);
137+
return true;
138+
} catch (IOException e) {
139+
return false;
140+
}
141+
}
142+
143+
private static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy,
144+
boolean onlyLatestValidator) {
145+
final SchemaValidatorBuilder validatorBuilder = new SchemaValidatorBuilder();
146+
switch (compatibilityStrategy) {
147+
case BACKWARD:
148+
return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), onlyLatestValidator);
149+
case FORWARD:
150+
return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
151+
default:
152+
return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
153+
}
154+
}
155+
156+
private static SchemaValidator createLatestOrAllValidator(SchemaValidatorBuilder validatorBuilder, boolean onlyLatest) {
157+
return onlyLatest ? validatorBuilder.validateLatest() : validatorBuilder.validateAll();
158+
}
46159
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@
2323

2424
public interface SchemaCompatibilityCheck {
2525
SchemaType getSchemaType();
26+
27+
/**
28+
*
29+
* @param from the current schema i.e. schema that the broker has
30+
* @param to the future schema i.e. the schema sent by the producer
31+
* @return whether the schemas are compatible
32+
*/
2633
boolean isCompatible(SchemaData from, SchemaData to);
2734

2835
SchemaCompatibilityCheck DEFAULT = new SchemaCompatibilityCheck() {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java

Lines changed: 10 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -23,127 +23,20 @@
2323
import org.testng.Assert;
2424
import org.testng.annotations.Test;
2525

26-
public class AvroSchemaCompatibilityCheckTest {
26+
public class AvroSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilityTest{
2727

28-
private static final String schemaJson1 =
29-
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
30-
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
31-
private static final SchemaData schemaData1 = getSchemaData(schemaJson1);
32-
33-
private static final String schemaJson2 =
34-
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
35-
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
36-
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
37-
private static final SchemaData schemaData2 = getSchemaData(schemaJson2);
38-
39-
private static final String schemaJson3 =
40-
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org" +
41-
".apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheckTest$\"," +
42-
"\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"string\"}]}";
43-
private static final SchemaData schemaData3 = getSchemaData(schemaJson3);
44-
45-
private static final String schemaJson4 =
46-
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
47-
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1_v2\",\"type\":\"string\"," +
48-
"\"aliases\":[\"field1\"]}]}";
49-
private static final SchemaData schemaData4 = getSchemaData(schemaJson4);
50-
51-
private static final String schemaJson5 =
52-
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
53-
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
54-
"\"string\"]}]}";
55-
private static final SchemaData schemaData5 = getSchemaData(schemaJson5);
56-
57-
private static final String schemaJson6 =
58-
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
59-
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
60-
"\"string\",\"int\"]}]}";
61-
private static final SchemaData schemaData6 = getSchemaData(schemaJson6);
62-
63-
private static final String schemaJson7 =
64-
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
65-
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
66-
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"},{\"name\":\"field3\"," +
67-
"\"type\":\"string\",\"default\":\"bar\"}]}";
68-
private static final SchemaData schemaData7 = getSchemaData(schemaJson7);
69-
70-
/**
71-
* make sure new schema is backwards compatible with latest
72-
*/
73-
@Test
74-
public void testBackwardCompatibility() {
75-
76-
AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck(
77-
SchemaCompatibilityStrategy.BACKWARD
78-
);
79-
80-
// adding a field with default is backwards compatible
81-
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
82-
"adding a field with default is backwards compatible");
83-
// adding a field without default is NOT backwards compatible
84-
Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
85-
"adding a field without default is NOT backwards compatible");
86-
// Modifying a field name is not backwards compatible
87-
Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData4),
88-
"Modifying a field name is not backwards compatible");
89-
// evolving field to a union is backwards compatible
90-
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData5),
91-
"evolving field to a union is backwards compatible");
92-
// removing a field from a union is NOT backwards compatible
93-
Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData5, schemaData1),
94-
"removing a field from a union is NOT backwards compatible");
95-
// adding a field to a union is backwards compatible
96-
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData5, schemaData6),
97-
"adding a field to a union is backwards compatible");
98-
// removing a field a union is NOT backwards compatible
99-
Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData6, schemaData5),
100-
"removing a field a union is NOT backwards compatible");
101-
}
102-
103-
/**
104-
* Check to make sure the last schema version is forward-compatible with new schemas
105-
*/
106-
@Test
107-
public void testForwardCompatibility() {
108-
109-
AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck(
110-
SchemaCompatibilityStrategy.FORWARD
111-
);
112-
113-
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
114-
"adding a field is forward compatible");
115-
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
116-
"adding a field is forward compatible");
117-
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData2, schemaData3),
118-
"adding a field is forward compatible");
119-
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData3, schemaData2),
120-
"adding a field is forward compatible");
121-
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData3, schemaData2),
122-
"adding a field is forward compatible");
123-
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData2, schemaData7),
124-
"removing fields is forward compatible");
125-
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData2, schemaData1),
126-
"removing fields with defaults forward compatible");
28+
@Override
29+
public SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck() {
30+
return new AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy.BACKWARD);
12731
}
12832

129-
/**
130-
* Make sure the new schema is forward- and backward-compatible from the latest to newest and from the newest to latest.
131-
*/
132-
@Test
133-
public void testFullCompatibility() {
134-
AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck(
135-
SchemaCompatibilityStrategy.FULL
136-
);
137-
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
138-
"adding a field with default fully compatible");
139-
Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
140-
"adding a field without default is not fully compatible");
141-
Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData3, schemaData1),
142-
"adding a field without default is not fully compatible");
143-
33+
@Override
34+
public SchemaCompatibilityCheck getForwardCompatibleSchemaCheck() {
35+
return new AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FORWARD);
14436
}
14537

146-
private static SchemaData getSchemaData(String schemaJson) {
147-
return SchemaData.builder().data(schemaJson.getBytes()).type(SchemaType.AVRO).build();
38+
@Override
39+
public SchemaCompatibilityCheck getFullCompatibleSchemaCheck() {
40+
return new AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FULL);
14841
}
14942
}

0 commit comments

Comments
 (0)