Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First cut at JSON Schema support #1289

Merged
merged 10 commits into from Jan 27, 2020

Conversation

@rayokota
Copy link
Member

rayokota commented Jan 9, 2020

Adds JSON Schema support to Schema Registry, including

  • a JsonSchema plugin that provides compatibility checking
  • KafkaJsonSchemaSerializer and KafkaJsonSchemaDeserializer for use with Java Kafka Consumers and Producers
  • JsonSchemaConverter for use with Kafka Connect
  • KafkaJsonSchemaSerde for use with Kafka Streams
@rayokota rayokota force-pushed the rayokota:add-json-schema-support branch 3 times, most recently from a7dc012 to b92e65f Jan 10, 2020
Copy link
Contributor

cricket007 left a comment

Looking good so far ❤️


private static final Random random = new Random();

private static final ObjectMapper MAPPER = new ObjectMapper();

This comment has been minimized.

Copy link
@cricket007

cricket007 Jan 15, 2020

Contributor

Could a mapper be protected and inherited from ClusterTestHarness?

json-schema-converter/pom.xml Outdated Show resolved Hide resolved
private static boolean isPrimitive(Object value) {
return value == null
|| value instanceof Boolean
|| value instanceof Number
|| value instanceof String;
}
Comment on lines +177 to +182

This comment has been minimized.

Copy link
@cricket007

cricket007 Jan 15, 2020

Contributor

Is there a preference of methods over Predicate instances?


public class KafkaJsonSchemaSerializerConfig extends AbstractKafkaSchemaSerDeConfig {

public static final String FAIL_INVALID_SCHEMA = "json.fail.invalid.schema";

This comment has been minimized.

Copy link
@cricket007

cricket007 Jan 15, 2020

Contributor

This seems duplicated on the DeserializerConfig (minus the DOC)

@rayokota

This comment has been minimized.

Copy link
Member Author

rayokota commented Jan 15, 2020

Thanks for the review @cricket007 ! I've incorporated most of your feedback. Let me know of any other issues or concerns. Thanks!

Copy link
Contributor

cricket007 left a comment

More comments.

Disclaimer, I'm skipping over the test classes since they show as passing

Map<String, String> schemas = new HashMap<>();
String reference = "{\"type\":\"object\",\"additionalProperties\":false,\"definitions\":"
+ "{\"ExternalType\":{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"}},"
+ "\"additionalProperties\":false}}}";

This comment has been minimized.

Copy link
@cricket007

cricket007 Jan 15, 2020

Contributor

Should schema strings be externalized to resource files?

This comment has been minimized.

Copy link
@dragosvictor
} else if (cls == String.class) {
g.writeString((String) ob);
} else if (cls == Integer.class) {
g.writeNumber(((Integer) ob).intValue());
} else if (cls == Long.class) {
g.writeNumber(((Long) ob).longValue());
} else if (cls == Boolean.class) {
g.writeBoolean(((Boolean) ob).booleanValue());
} else if (cls == Double.class) {
g.writeNumber(((Double) ob).doubleValue());
Comment on lines +92 to +101

This comment has been minimized.

Copy link
@cricket007

cricket007 Jan 15, 2020

Contributor

Is there a way to refactor this duplicated block?

e.g. protected void writePrimitives(JsonGenerator g, Class cls, Object ob)

*/
public static ObjectMapper newObjectMapper() {
final ObjectMapper mapper = new ObjectMapper();

This comment has been minimized.

Copy link
@cricket007

cricket007 Jan 15, 2020

Contributor

nit: extra whitespace

*/
public static ObjectMapper newObjectMapper(JsonFactory jsonFactory) {
final ObjectMapper mapper = new ObjectMapper(jsonFactory);

This comment has been minimized.

Copy link
@cricket007

cricket007 Jan 15, 2020

Contributor

nit: extra whitespace

public static ObjectMapper newObjectMapper(JsonFactory jsonFactory) {
final ObjectMapper mapper = new ObjectMapper(jsonFactory);
Comment on lines +50 to +51

This comment has been minimized.

Copy link
@cricket007

cricket007 Jan 15, 2020

Contributor

Could make the factory nullable.

return jsonFactory == null ? new ObjectMapper() : new ObjectMapper(jsonFactory);

Call newObjectMapper(null) from overloaded method

public static String readFile(String fileName) {
ClassLoader classLoader = ClassLoader.getSystemClassLoader();
InputStream is = classLoader.getResourceAsStream(fileName);
if (is != null) {
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
return reader.lines().collect(Collectors.joining(System.lineSeparator()));
}
return null;
}
Comment on lines +85 to +93

This comment has been minimized.

Copy link
@cricket007

cricket007 Jan 15, 2020

Contributor

I feel like this could be refactored out to a utility class

private Map<String, Object> getPropertiesMap(Properties props) {
Map<String, Object> originals = new HashMap<>();
for (final String name : props.stringPropertyNames()) {
originals.put(name, props.getProperty(name));
}
return originals;
}
Comment on lines +147 to +153

This comment has been minimized.

Copy link
@cricket007

cricket007 Jan 15, 2020

Contributor

Could be moved into MessageFormatter as a default method?

private int schemaIdFor(byte[] payload) {
ByteBuffer buffer = ByteBuffer.wrap(payload);
if (buffer.get() != MAGIC_BYTE) {
throw new SerializationException("Unknown magic byte!");
}
return buffer.getInt();
}
Comment on lines +215 to +221

This comment has been minimized.

Copy link
@cricket007

cricket007 Jan 15, 2020

Contributor

I feel like this could be refactored out to a utility class, if it doesn't already exist

private SchemaRegistryClient createSchemaRegistry(
String schemaRegistryUrl, Map<String, Object> originals
) {
return schemaRegistry != null
? schemaRegistry
: new CachedSchemaRegistryClient(Collections.singletonList(schemaRegistryUrl),
AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT,
Collections.singletonList(new JsonSchemaProvider()),
originals
);
}
Comment on lines +223 to +233

This comment has been minimized.

Copy link
@cricket007

cricket007 Jan 15, 2020

Contributor

If the SchemaProvider interface was extracted to a parameter, this method could be re-used elsewhere

private List<SchemaProvider> defaultSchemaProviders() {
return Arrays.asList(
new AvroSchemaProvider(), new ProtobufSchemaProvider()
new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider()
);
}
Comment on lines 86 to 90

This comment has been minimized.

Copy link
@cricket007

cricket007 Jan 15, 2020

Contributor

This method is duplicated in KafkaSchemaRegistry, no?

@cricket007

This comment has been minimized.

Copy link
Contributor

cricket007 commented Jan 15, 2020

Unrelated to PR, but how easy would it be to integrate https://github.com/joelittlejohn/jsonschema2pojo with the maven-plugin here?

@rayokota

This comment has been minimized.

Copy link
Member Author

rayokota commented Jan 15, 2020

@cricket007 , the SR maven plugin currently doesn't handle code generation. I guess we could add an additional generate action that would call protoc, etc, but since code generation doesn't actually require interaction with SR, it makes less sense to add it to the SR maven plugin, where all the current actions require the SR URL to be configured.

@cricket007

This comment has been minimized.

Copy link
Contributor

cricket007 commented Jan 16, 2020

Maybe integrate was the wrong word.

For example, we've been able to combine Avro maven plugin with the register phase here.

I understand those are separate plugins, so, in theory, any schema / code-gen combo could likely be used

@rayokota rayokota changed the base branch from add-protobuf-support to master Jan 24, 2020
@rayokota rayokota changed the base branch from master to add-protobuf-support Jan 24, 2020
Copy link
Contributor

dragosvictor left a comment

core looks good overall, with a couple of remarks.

Map<String, String> schemas = new HashMap<>();
String reference = "{\"type\":\"object\",\"additionalProperties\":false,\"definitions\":"
+ "{\"ExternalType\":{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"}},"
+ "\"additionalProperties\":false}}}";

This comment has been minimized.

Copy link
@dragosvictor
Copy link
Contributor

dragosvictor left a comment

My comments got lost somehow, reposting.

"Registering a new schema should succeed",
(long) expectedId,
(long) registeredId
Comment on lines 172 to 174

This comment has been minimized.

Copy link
@dragosvictor

dragosvictor Jan 24, 2020

Contributor

Question: why the cast ?

This comment has been minimized.

Copy link
@rayokota

rayokota Jan 24, 2020

Author Member

Ack

List<String> schemas = new ArrayList<>();
for (int i = 0; i < num; i++) {
String schema = "{\"type\":\"object\",\"properties\":{\"f"
+ random.nextInt(Integer.MAX_VALUE)

This comment has been minimized.

Copy link
@dragosvictor

dragosvictor Jan 24, 2020

Contributor

Nit: it's highly improbable, but we could end up with the same number twice here and break the tests every million builds or so :)

Copy link
Contributor

dragosvictor left a comment

Tentatively approving in order to unblock other teams. Will revisit.

@rayokota rayokota force-pushed the rayokota:add-json-schema-support branch from f19fa36 to a8b073f Jan 27, 2020
@rayokota rayokota changed the base branch from add-protobuf-support to master Jan 27, 2020
@rayokota rayokota merged commit 748d64e into confluentinc:master Jan 27, 2020
1 check failed
1 check failed
continuous-integration/jenkins/pr-merge This commit cannot be built
Details
@rayokota rayokota mentioned this pull request Jan 27, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Linked issues

Successfully merging this pull request may close these issues.

None yet

3 participants
You can’t perform that action at this time.