Skip to content

Commit

Permalink
Included a type VersionId that validates version specified using the …
Browse files Browse the repository at this point in the history
…string latest or an integer between [1,2^31-1]. Updated documentation to reflect that. This is a follow-up to and fixes #48
  • Loading branch information
nehanarkhede committed Feb 2, 2015
1 parent c708b40 commit 5035ee0
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2015 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.kafka.schemaregistry.rest;

/**
* A valid version id should be a positive integer between 1 and 2^31-1.
* -1 is a special version id that indicates the "latest" version under
* a subject
*/
public class VersionId {

private final int version;

public VersionId(String version) {
if (version.trim().toLowerCase().equals("latest")) {
this.version = -1;
} else {
try {
this.version = Integer.valueOf(version.trim());
} catch (NumberFormatException nfe) {
throw new IllegalArgumentException(version + " is not a valid version. Valid values are "
+ "[1,2^31-1] both inclusive and \"latest\"");
}
if (this.version <= 0) {
throw new IllegalArgumentException(version + " is not a valid version. Valid values are "
+ "[1,2^31-1] both inclusive and \"latest\"");
}
}
}

public int getVersionId() {
return this.version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.confluent.kafka.schemaregistry.client.rest.Versions;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.CompatibilityCheckResponse;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.rest.VersionId;
import io.confluent.kafka.schemaregistry.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException;
Expand All @@ -62,12 +63,12 @@ public CompatibilityResource(KafkaSchemaRegistry schemaRegistry) {
@POST
@Path("/subjects/{subject}/versions/{version}")
@PerformanceMetric("compatibility.subjects.versions.verify")
public void getSchemaUnderSubject(final @Suspended AsyncResponse asyncResponse,
final @HeaderParam("Content-Type") String contentType,
final @HeaderParam("Accept") String accept,
@PathParam("subject") String subject,
@PathParam("version") String version,
RegisterSchemaRequest request) {
public void lookUpSchemaUnderSubject(final @Suspended AsyncResponse asyncResponse,
final @HeaderParam("Content-Type") String contentType,
final @HeaderParam("Accept") String accept,
@PathParam("subject") String subject,
@PathParam("version") String version,
RegisterSchemaRequest request) {
// returns true if posted schema is compatible with the specified version. "latest" is
// a special version
Map<String, String> headerProperties = new HashMap<String, String>();
Expand All @@ -78,12 +79,8 @@ public void getSchemaUnderSubject(final @Suspended AsyncResponse asyncResponse,
CompatibilityCheckResponse compatibilityCheckResponse = new CompatibilityCheckResponse();
try {
Schema schemaForSpecifiedVersion = null;
if (version.trim().toLowerCase().equals("latest")) {
schemaForSpecifiedVersion = schemaRegistry.getLatestVersion(subject);
} else {
int versionId = Integer.valueOf(version.trim());
schemaForSpecifiedVersion = schemaRegistry.get(subject, versionId);
}
VersionId versionId = new VersionId(version);
schemaForSpecifiedVersion = schemaRegistry.get(subject, versionId.getVersionId());
if (schemaForSpecifiedVersion == null) {
if (version.trim().toLowerCase().equals("latest")) {
isCompatible = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.confluent.kafka.schemaregistry.client.rest.Versions;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import io.confluent.kafka.schemaregistry.rest.VersionId;
import io.confluent.kafka.schemaregistry.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException;
Expand Down Expand Up @@ -67,10 +68,11 @@ public SubjectVersionsResource(KafkaSchemaRegistry registry, String subject) {
@GET
@Path("/{version}")
@PerformanceMetric("subjects.versions.get-schema")
public Schema getSchema(@PathParam("version") Integer version) {
public Schema getSchema(@PathParam("version") String version) {
VersionId versionId = new VersionId(version);
Schema schema = null;
try {
schema = schemaRegistry.get(this.subject, version);
schema = schemaRegistry.get(this.subject, versionId.getVersionId());
} catch (SchemaRegistryException e) {
log.debug("Error while retrieving schema for subject " + this.subject + " with version " +
version + " from the schema registry", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,14 +398,18 @@ private AvroSchema canonicalizeSchema(Schema schema) {

@Override
public Schema get(String subject, int version) throws SchemaRegistryException {
SchemaKey key = new SchemaKey(subject, version);
try {
Schema schema = (Schema) kafkaStore.get(key);
return schema;
} catch (StoreException e) {
throw new SchemaRegistryException(
"Error while retrieving schema from the backend Kafka" +
" store", e);
if (version == -1) {
return getLatestVersion(subject);
} else {
SchemaKey key = new SchemaKey(subject, version);
try {
Schema schema = (Schema) kafkaStore.get(key);
return schema;
} catch (StoreException e) {
throw new SchemaRegistryException(
"Error while retrieving schema from the backend Kafka" +
" store", e);
}
}
}

Expand Down
12 changes: 6 additions & 6 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,12 @@ Schemas
1, 2, 3, 4
]

.. http:get:: /subjects/(string:subject)/versions/(int:version)
.. http:get:: /subjects/(string:subject)/versions/(versionId:version)
Get a specific version of the schema registered under this subject

:param string subject: Name of the subject
:param int version: Version of the schema to be returned
:param versionId version: Version of the schema to be returned. Valid values for versionId are between [1,2^31-1] or the string "latest". "latest" returns the last registered schema under the specified subject

:>json string name: Name of the subject that this schema is registered under
:>json int version: Version of the returned schema
Expand Down Expand Up @@ -309,13 +309,13 @@ Compatibility

The compatibility resource allows the user to test schemas for compatibility against specific versions of a subject's schema.

.. http:post:: /compatibility/subjects/(string:subject)/versions/(int:version)
.. http:post:: /compatibility/subjects/(string:subject)/versions/(versionId:version)
Test input schema against a particular version of a subject's schema for compatibility. Note that the compatibility level applied for the check is the configured compatibility level for the subject (``http:get:: /config/(string:subject)``). If this subject's compatibility level was never changed, then the global compatibility level applies (``http:get:: /config``).

:param string subject: Subject of the schema version against which compatibility is to be tested
:param int version: Version of the subject's schema against which compatibility is to be tested. -1 is a special value that indicates the current latest version of the subject's schema

:param versionId version: Version of the subject's schema against which compatibility is to be tested. Valid values for versionId are between [1,2^31-1] or the string "latest". "latest" checks compatibility of the input schema with the last registered schema under the specified subject
:>json boolean is_compatible: True, if compatible. False otherwise

:statuscode 404:
Expand All @@ -326,7 +326,7 @@ Test input schema against a particular version of a subject's schema for compati

.. sourcecode:: http

POST /compatibility/subjects/test/versions/-1 HTTP/1.1
POST /compatibility/subjects/test/versions/latest HTTP/1.1
Host: schemaregistry.example.com
Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json

Expand Down

0 comments on commit 5035ee0

Please sign in to comment.