Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APIs changed to refer to subjects. Retrieval by globally unique schema id #30

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@ Quickstart
mvn exec:java -Dexec.mainClass="io.confluent.kafka.schemaregistry.rest.Main" -Dexec.args="config/schema-registry.properties"

4. Register a schema
curl -v -H "Content-Type: application/vnd.schemaregistry.v1+json" -X POST -i http://localhost:8080/topics/Kafka/value/versions -d '
curl -v -H "Content-Type: application/vnd.schemaregistry.v1+json" -X POST -i http://localhost:8080/subjects/Kafka/versions -d '
{"schema":"Hello World"}'

5. List all topics
curl -v -H "Content-Type: application/vnd.schemaregistry.v1+json" -X GET http://localhost:8080/topics
curl -v -H "Content-Type: application/vnd.schemaregistry.v1+json" -X POST -i http://localhost:8080/subjects/Kafka,key/versions -d '
{"schema":"Kafka"}'

6. List all versions of a topic's schema
curl -v -H "Content-Type: application/vnd.schemaregistry.v1+json" -X GET http://localhost:8080/topics/Kafka/value/versions
5. List all subjects
curl -v -H "Content-Type: application/vnd.schemaregistry.v1+json" -X GET http://localhost:8080/subjects

6. List all versions of a subject's schema
curl -v -H "Content-Type: application/vnd.schemaregistry.v1+json" -X GET http://localhost:8080/subjects/Kafka/versions

7. Get a particular version of a subject's schema
curl -v -H "Content-Type: application/vnd.schemaregistry.v1+json" -X GET http://localhost:8080/subjects/Kafka/versions/1

7. Get a particular version of a topic's schema
curl -v -H "Content-Type: application/vnd.schemaregistry.v1+json" -X GET http://localhost:8080/topics/Kafka/value/versions/1

3 changes: 3 additions & 0 deletions config/schema-registry.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

schemaregistry.rest.port=8080
schemaregistry.connection.url=localhost:2181
kafkastore.connection.url=localhost:2181
kafkastore.topic=_schemas
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/io/confluent/kafka/schemaregistry/rest/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Properties;

import io.confluent.common.utils.Utils;
import io.confluent.rest.RestConfigException;

public class Main {
Expand All @@ -33,8 +35,12 @@ public class Main {
public static void main(String[] args) throws IOException {

try {
SchemaRegistryConfig config =
new SchemaRegistryConfig((args.length > 0 ? args[0] : null));
if (args.length != 1) {
log.error("Properties file is required to start the schema registry REST instance");
System.exit(1);
}
Properties props = Utils.loadProps(args[0]);
SchemaRegistryConfig config = new SchemaRegistryConfig(props);
SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
Server server = app.createServer();
server.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,20 @@
import io.confluent.kafka.schemaregistry.utils.RestUtils;

/**
* An agent responsible for forwarding an incoming registering schema request to another HTTP server
* An agent responsible for forwarding an incoming registering schema request to another HTTP
* server
*/
public class RegisterSchemaForwardingAgent {

private static final Logger log = LoggerFactory.getLogger(RegisterSchemaForwardingAgent.class);
private final Map<String, String> requestProperties;
private final String topic;
private final boolean isKey;
private final String subject;
private final RegisterSchemaRequest registerSchemaRequest;

public RegisterSchemaForwardingAgent(Map<String, String> requestProperties, String topic,
boolean isKey, RegisterSchemaRequest registerSchemaRequest) {
public RegisterSchemaForwardingAgent(Map<String, String> requestProperties, String subject,
RegisterSchemaRequest registerSchemaRequest) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should topic be subject?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will change during the push

this.requestProperties = requestProperties;
this.topic = topic;
this.isKey = isKey;
this.subject = subject;
this.registerSchemaRequest = registerSchemaRequest;
}

Expand All @@ -50,15 +49,15 @@ public RegisterSchemaForwardingAgent(Map<String, String> requestProperties, Stri
* @param host host to forward the request to
* @param port port to forward the request to
* @return The version id of the schema if registration is successful. Otherwise, throw a
* WebApplicationException.
* WebApplicationException.
*/
public int forward(String host, int port) throws SchemaRegistryException {
String baseUrl = String.format("http://%s:%d", host, port);
log.debug(String.format("Forwarding registering schema request %s to %s",
registerSchemaRequest, baseUrl));
try {
int version = RestUtils.registerSchema(baseUrl, requestProperties, registerSchemaRequest,
topic, isKey);
subject);
return version;
} catch (IOException e) {
throw new SchemaRegistryException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,47 +29,42 @@
public class SchemaRegistryConfig extends RestConfig {

public static final String KAFKASTORE_CONNECTION_URL_CONFIG = "kafkastore.connection.url";
protected static final String KAFKASTORE_CONNECTION_URL_DOC =
"Zookeeper url for the Kafka cluster";

/**
* <code>kafkastore.zk.session.timeout.ms</code>
*/
public static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG
= "kafkastore.zk.session.timeout.ms";
protected static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC =
"Zookeeper session timeout";

/**
* <code>kafkastore.topic</code>
*/
public static final String KAFKASTORE_TOPIC_CONFIG = "kafkastore.topic";
public static final String DEFAULT_KAFKASTORE_TOPIC = "_schemas";
protected static final String KAFKASTORE_TOPIC_DOC =
"The durable single partition topic that acts" +
"as the durable log for the data";

/**
* <code>kafkastore.timeout.ms</code>
*/
public static final String KAFKASTORE_TIMEOUT_CONFIG = "kafkastore.timeout.ms";
protected static final String KAFKASTORE_TIMEOUT_DOC =
"The timeout for an operation on the Kafka store";

/**
* <code>kafkastore.commit.interval.ms</code>
*/
public static final String KAFKASTORE_COMMIT_INTERVAL_MS_CONFIG = "kafkastore.commit.interval.ms";
protected static final String KAFKASTORE_COMMIT_INTERVAL_MS_DOC =
"The interval to commit offsets while consuming the Kafka topic";
public static final int OFFSET_COMMIT_OFF = -1;
// TODO: turn off offset commit by default for now since we only have an in-memory store
private static final int KAFKASTORE_COMMIT_INTERVAL_MS_DEFAULT = OFFSET_COMMIT_OFF;

/**
* <code>advertised.host</code>
*/
public static final String ADVERTISED_HOST_CONFIG = "advertised.host";
protected static final String KAFKASTORE_CONNECTION_URL_DOC =
"Zookeeper url for the Kafka cluster";
protected static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC =
"Zookeeper session timeout";
protected static final String KAFKASTORE_TOPIC_DOC =
"The durable single partition topic that acts" +
"as the durable log for the data";
protected static final String KAFKASTORE_TIMEOUT_DOC =
"The timeout for an operation on the Kafka store";
protected static final String KAFKASTORE_COMMIT_INTERVAL_MS_DOC =
"The interval to commit offsets while consuming the Kafka topic";
protected static final String ADVERTISED_HOST_DOC = "The host name advertised in Zookeeper";

static {
Expand All @@ -95,14 +90,6 @@ public class SchemaRegistryConfig extends RestConfig {
ConfigDef.Importance.LOW, ADVERTISED_HOST_DOC);
}

private static String getDefaultHost() {
try {
return InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
throw new ConfigException("Unknown local hostname", e);
}
}

public SchemaRegistryConfig(Map<? extends Object, ? extends Object> props) {
super(props);
}
Expand All @@ -111,6 +98,14 @@ public SchemaRegistryConfig(String propsFile) throws RestConfigException {
this(getPropsFromFile(propsFile));
}

private static String getDefaultHost() {
try {
return InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
throw new ConfigException("Unknown local hostname", e);
}
}

public static void main(String[] args) {
System.out.println(config.toHtmlTable());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import io.confluent.kafka.schemaregistry.rest.resources.RootResource;
import io.confluent.kafka.schemaregistry.rest.resources.SchemasResource;
import io.confluent.kafka.schemaregistry.rest.resources.TopicsResource;
import io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException;
Expand Down Expand Up @@ -55,7 +55,7 @@ public void setupResources(Configurable<?> config, SchemaRegistryConfig schemaRe
System.exit(1);
}
config.register(RootResource.class);
config.register(new TopicsResource(schemaRegistry));
config.register(new SubjectsResource(schemaRegistry));
config.register(SchemasResource.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ public String toString() {
@Override
public int compareTo(Schema that) {
int result = this.name.compareTo(that.name);
if (result != 0)
if (result != 0) {
return result;
}
result = this.version - that.version;
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.hibernate.validator.constraints.NotEmpty;

public class Topic {
public class Subject {

@NotEmpty
private String name;
Expand All @@ -29,19 +29,19 @@ public class Topic {
private String deprecation = "all";
private String validators = null;

public Topic(@JsonProperty("name") String name,
@JsonProperty("compatibility") String compatibility,
@JsonProperty("registration") String registration,
@JsonProperty("deprecation") String deprecation,
@JsonProperty("validators") String validators) {
public Subject(@JsonProperty("name") String name,
@JsonProperty("compatibility") String compatibility,
@JsonProperty("registration") String registration,
@JsonProperty("deprecation") String deprecation,
@JsonProperty("validators") String validators) {
this.name = name;
this.compatibility = compatibility;
this.registration = registration;
this.deprecation = deprecation;
this.validators = validators;
}

public Topic(@JsonProperty("name") String name) {
public Subject(@JsonProperty("name") String name) {
this.name = name;
}

Expand Down Expand Up @@ -104,21 +104,21 @@ public boolean equals(Object o) {
return false;
}

Topic topic = (Topic) o;
Subject subject = (Subject) o;

if (!name.equals(topic.name)) {
if (!name.equals(subject.name)) {
return false;
}
if (!this.compatibility.equals(topic.compatibility)) {
if (!this.compatibility.equals(subject.compatibility)) {
return false;
}
if (!this.registration.equals(topic.registration)) {
if (!this.registration.equals(subject.registration)) {
return false;
}
if (!this.deprecation.equals(topic.deprecation)) {
if (!this.deprecation.equals(subject.deprecation)) {
return false;
}
if (!this.validators.equals(topic.validators)) {
if (!this.validators.equals(subject.validators)) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public class RegisterSchemaRequest {
@NotEmpty
private String schema;

public static RegisterSchemaRequest fromJson(String json) throws IOException {
return new ObjectMapper().readValue(json, RegisterSchemaRequest.class);
}

@JsonProperty("schema")
public String getSchema() {
return this.schema;
Expand Down Expand Up @@ -76,8 +80,4 @@ public String toJson() throws IOException {
return new ObjectMapper().writeValueAsString(this);
}

public static RegisterSchemaRequest fromJson(String json) throws IOException {
return new ObjectMapper().readValue(json, RegisterSchemaRequest.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public class RegisterSchemaResponse {
@NotEmpty
private int version;

public static RegisterSchemaResponse fromJson(String json) throws IOException {
return new ObjectMapper().readValue(json, RegisterSchemaResponse.class);
}

@JsonProperty("version")
public int getVersion() {
return version;
Expand All @@ -41,8 +45,4 @@ public String toJson() throws IOException {
return new ObjectMapper().writeValueAsString(this);
}

public static RegisterSchemaResponse fromJson(String json) throws IOException {
return new ObjectMapper().readValue(json, RegisterSchemaResponse.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,23 @@ public class SchemasResource {
public final static String MESSAGE_SCHEMA_NOT_FOUND = "Schema not found.";
private static final Logger log = LoggerFactory.getLogger(SchemasResource.class);

private final String topic;
private final boolean isKey;
private final String subject;
private final SchemaRegistry schemaRegistry;

public SchemasResource(SchemaRegistry registry, String topic, boolean isKey) {
public SchemasResource(SchemaRegistry registry, String subject) {
this.schemaRegistry = registry;
this.topic = topic;
this.isKey = isKey;
this.subject = subject;
}

@GET
@Path("/{id}")
public Schema getSchema(@PathParam("id") Integer id) {
@Path("/{version}")
public Schema getSchema(@PathParam("version") Integer version) {
Schema schema = null;
try {
schema = schemaRegistry.get(this.topic, id);
schema = schemaRegistry.get(this.subject, version);
} catch (SchemaRegistryException e) {
log.debug("Error while retrieving schema with id " + id + " from the schema registry",
e);
log.debug("Error while retrieving schema for subject " + this.subject + " with version " +
version + " from the schema registry", e);
throw new NotFoundException(MESSAGE_SCHEMA_NOT_FOUND, e);
}
if (schema == null) {
Expand All @@ -88,7 +86,7 @@ public List<Integer> list() {
Iterator<Schema> allSchemasForThisTopic = null;
List<Integer> allVersions = new ArrayList<Integer>();
try {
allSchemasForThisTopic = schemaRegistry.getAllVersions(this.topic);
allSchemasForThisTopic = schemaRegistry.getAllVersions(this.subject);
} catch (SchemaRegistryException e) {
throw new ClientErrorException(Response.Status.INTERNAL_SERVER_ERROR, e);
}
Expand All @@ -103,17 +101,16 @@ public List<Integer> list() {
public void register(final @Suspended AsyncResponse asyncResponse,
final @HeaderParam("Content-Type") String contentType,
final @HeaderParam("Accept") String accept,
@PathParam("topic") String topicName, RegisterSchemaRequest request) {
@PathParam("subject") String subjectName, RegisterSchemaRequest request) {
Map<String, String> requestProperties = new HashMap<String, String>();
requestProperties.put("Content-Type", contentType);
requestProperties.put("Accept", accept);
RegisterSchemaForwardingAgent forwardingAgent =
new RegisterSchemaForwardingAgent(requestProperties, topicName, isKey, request);

Schema schema = new Schema(topicName, 0, request.getSchema(), false);
new RegisterSchemaForwardingAgent(requestProperties, subjectName, request);
Schema schema = new Schema(subjectName, 0, request.getSchema(), false);
int version = -1;
try {
version = schemaRegistry.register(topicName, schema, forwardingAgent);
version = schemaRegistry.register(subjectName, schema, forwardingAgent);
} catch (SchemaRegistryException e) {
throw new ClientErrorException(Response.Status.INTERNAL_SERVER_ERROR, e);
}
Expand Down
Loading