Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into multi-type
Browse files Browse the repository at this point in the history
  • Loading branch information
ept committed Dec 11, 2017
2 parents 7b5a2f1 + c8a579c commit 6764a57
Show file tree
Hide file tree
Showing 34 changed files with 974 additions and 170 deletions.
136 changes: 69 additions & 67 deletions README.md
Expand Up @@ -13,73 +13,75 @@ Quickstart

The following assumes you have Kafka and an instance of the Schema Registry running using the default settings.

# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-key/versions
{"id":1}

# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-value/versions
{"id":1}

# List all subjects
$ curl -X GET http://localhost:8081/subjects
["Kafka-value","Kafka-key"]

# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
[1]

# Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
{"schema":"\"string\""}

# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Delete version 3 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
3

# Delete all versions of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
[1, 2, 3, 4, 5]

# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-key
{"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}

# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
{"is_compatible":true}

# Get top level config
$ curl -X GET http://localhost:8081/config
{"compatibilityLevel":"BACKWARD"}

# Update compatibility requirements globally
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' \
http://localhost:8081/config
{"compatibility":"NONE"}

# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config/Kafka-value
{"compatibility":"BACKWARD"}
```bash
# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-key/versions
{"id":1}

# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-value/versions
{"id":1}

# List all subjects
$ curl -X GET http://localhost:8081/subjects
["Kafka-value","Kafka-key"]

# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
[1]

# Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
{"schema":"\"string\""}

# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Delete version 3 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
3

# Delete all versions of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
[1, 2, 3, 4, 5]

# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-key
{"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}

# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
{"is_compatible":true}

# Get top level config
$ curl -X GET http://localhost:8081/config
{"compatibilityLevel":"BACKWARD"}

# Update compatibility requirements globally
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' \
http://localhost:8081/config
{"compatibility":"NONE"}

# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config/Kafka-value
{"compatibility":"BACKWARD"}
```

Installation
------------
Expand Down
Expand Up @@ -55,13 +55,12 @@ public AvroConverter(SchemaRegistryClient client) {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.isKey = isKey;

AvroConverterConfig avroConverterConfig = new AvroConverterConfig(configs);

if (schemaRegistry == null) {
schemaRegistry =
new CachedSchemaRegistryClient(avroConverterConfig.getSchemaRegistryUrls(),
avroConverterConfig.getMaxSchemasPerSubject());
avroConverterConfig.getMaxSchemasPerSubject(), configs);
}

serializer = new Serializer(schemaRegistry, avroConverterConfig.autoRegisterSchema());
Expand Down
Expand Up @@ -31,6 +31,8 @@
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
Expand Down Expand Up @@ -95,8 +97,10 @@ public void init(Properties props) {
if (url == null) {
throw new ConfigException("Missing schema registry url!");
}

Map<String, Object> originals = getPropertiesMap(props);
schemaRegistry = new CachedSchemaRegistryClient(
url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, originals);

if (props.containsKey("print.key")) {
printKey = props.getProperty("print.key").trim().toLowerCase().equals("true");
Expand All @@ -117,6 +121,14 @@ public void init(Properties props) {
}
}

private Map<String, Object> getPropertiesMap(Properties props) {
Map<String,Object> originals = new HashMap<>();
for (final String name: props.stringPropertyNames()) {
originals.put(name, props.getProperty(name));
}
return originals;
}

@Override
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
if (printKey) {
Expand Down
Expand Up @@ -29,6 +29,9 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
Expand Down Expand Up @@ -129,8 +132,11 @@ public void init(java.io.InputStream inputStream, java.util.Properties props) {
if (url == null) {
throw new ConfigException("Missing schema registry url!");
}

Map<String, Object> originals = getPropertiesMap(props);

schemaRegistry = new CachedSchemaRegistryClient(
url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, originals);
if (!props.containsKey("value.schema")) {
throw new ConfigException("Must provide the Avro schema string in value.schema");
}
Expand All @@ -154,6 +160,14 @@ public void init(java.io.InputStream inputStream, java.util.Properties props) {
}
}

private Map<String, Object> getPropertiesMap(Properties props) {
Map<String, Object> originals = new HashMap<>();
for (final String name: props.stringPropertyNames()) {
originals.put(name, props.getProperty(name));
}
return originals;
}

@Override
public ProducerRecord<byte[], byte[]> readMessage() {
try {
Expand Down
Expand Up @@ -70,8 +70,9 @@ protected void configureClientProperties(AbstractKafkaAvroSerDeConfig config) {
try {
List<String> urls = config.getSchemaRegistryUrls();
int maxSchemaObject = config.getMaxSchemasPerSubject();
Map<String, Object> originals = config.originalsWithPrefix("");
if (null == schemaRegistry) {
schemaRegistry = new CachedSchemaRegistryClient(urls, maxSchemaObject);
schemaRegistry = new CachedSchemaRegistryClient(urls, maxSchemaObject, originals);
}
keySubjectNameStrategy = config.keySubjectNameStrategy();
valueSubjectNameStrategy = config.valueSubjectNameStrategy();
Expand Down
Expand Up @@ -23,6 +23,8 @@
import io.confluent.common.config.ConfigDef;
import io.confluent.common.config.ConfigDef.Importance;
import io.confluent.common.config.ConfigDef.Type;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialSource;

/**
* Base class for configs for serializers and deserializers, defining a few common configs and
Expand Down Expand Up @@ -58,6 +60,20 @@ public class AbstractKafkaAvroSerDeConfig extends AbstractConfig {
"Determines how to construct the subject name under which the value schema is registered "
+ "with the schema registry";

/**
 * Config name selecting where Basic Auth credentials are taken from.
 * Mirrors {@link SchemaRegistryClientConfig#BASIC_AUTH_CREDENTIALS_SOURCE}.
 */
public static final String BASIC_AUTH_CREDENTIALS_SOURCE = SchemaRegistryClientConfig
    .BASIC_AUTH_CREDENTIALS_SOURCE;
public static final String BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT = "URL";
// Fixed typo in the doc text: "Basic uth" -> "Basic Auth".
public static final String BASIC_AUTH_CREDENTIALS_SOURCE_DOC =
    "Specify how to pick the credentials for Basic Auth header. "
    + "The supported values are URL, USER_INFO and SASL_INHERIT";

/**
 * Config name carrying the {@code username:password} pair for Basic Auth.
 * Mirrors {@link SchemaRegistryClientConfig#SCHEMA_REGISTRY_USER_INFO_CONFIG}.
 */
public static final String SCHEMA_REGISTRY_USER_INFO_CONFIG =
    SchemaRegistryClientConfig.SCHEMA_REGISTRY_USER_INFO_CONFIG;
public static final String SCHEMA_REGISTRY_USER_INFO_DEFAULT = "";
public static final String SCHEMA_REGISTRY_USER_INFO_DOC =
    "Specify the user info for Basic Auth in the form of {username}:{password}";


public static ConfigDef baseConfigDef() {
return new ConfigDef()
.define(SCHEMA_REGISTRY_URL_CONFIG, Type.LIST,
Expand All @@ -69,8 +85,12 @@ public static ConfigDef baseConfigDef() {
.define(KEY_SUBJECT_NAME_STRATEGY, Type.STRING, KEY_SUBJECT_NAME_STRATEGY_DEFAULT,
Importance.MEDIUM, KEY_SUBJECT_NAME_STRATEGY_DOC)
.define(VALUE_SUBJECT_NAME_STRATEGY, Type.STRING, VALUE_SUBJECT_NAME_STRATEGY_DEFAULT,
Importance.MEDIUM, VALUE_SUBJECT_NAME_STRATEGY_DOC
);
Importance.MEDIUM, VALUE_SUBJECT_NAME_STRATEGY_DOC)
.define(BASIC_AUTH_CREDENTIALS_SOURCE, Type.STRING, BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT,
ConfigDef.ValidString.in(BasicAuthCredentialSource.NAMES),
Importance.MEDIUM, BASIC_AUTH_CREDENTIALS_SOURCE_DOC)
.define(SCHEMA_REGISTRY_USER_INFO_CONFIG, Type.PASSWORD, SCHEMA_REGISTRY_USER_INFO_DEFAULT,
Importance.MEDIUM, SCHEMA_REGISTRY_USER_INFO_DOC);
}

public AbstractKafkaAvroSerDeConfig(ConfigDef config, Map<?, ?> props) {
Expand Down
9 changes: 9 additions & 0 deletions client/pom.xml
Expand Up @@ -15,6 +15,15 @@
<name>kafka-schema-registry-client</name>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>common-config</artifactId>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
Expand Down

0 comments on commit 6764a57

Please sign in to comment.