diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/VersionId.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/VersionId.java new file mode 100644 index 00000000000..26522dc3040 --- /dev/null +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/VersionId.java @@ -0,0 +1,48 @@ +/* + * Copyright 2015 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.rest; + +/** + * A valid version id should be a positive integer between 1 and 2^31-1. + * -1 is a special version id that indicates the "latest" version under + * a subject + */ +public class VersionId { + + private final int version; + + public VersionId(String version) { + if (version.trim().toLowerCase().equals("latest")) { + this.version = -1; + } else { + try { + this.version = Integer.valueOf(version.trim()); + } catch (NumberFormatException nfe) { + throw new IllegalArgumentException(version + " is not a valid version. Valid values are " + + "[1,2^31-1] both inclusive and \"latest\""); + } + if (this.version <= 0) { + throw new IllegalArgumentException(version + " is not a valid version. Valid values are " + + "[1,2^31-1] both inclusive and \"latest\""); + } + } + } + + public int getVersionId() { + return this.version; + } +} diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/CompatibilityResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/CompatibilityResource.java index 536ea97c30e..13328b55efb 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/CompatibilityResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/CompatibilityResource.java @@ -37,6 +37,7 @@ import io.confluent.kafka.schemaregistry.client.rest.Versions; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.CompatibilityCheckResponse; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; +import io.confluent.kafka.schemaregistry.rest.VersionId; import io.confluent.kafka.schemaregistry.rest.entities.Schema; import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException; @@ -62,12 +63,12 @@ public CompatibilityResource(KafkaSchemaRegistry schemaRegistry) { @POST @Path("/subjects/{subject}/versions/{version}") @PerformanceMetric("compatibility.subjects.versions.verify") - public void getSchemaUnderSubject(final @Suspended AsyncResponse asyncResponse, - final @HeaderParam("Content-Type") String contentType, - final @HeaderParam("Accept") String accept, - @PathParam("subject") String subject, - @PathParam("version") String version, - RegisterSchemaRequest request) { + public void lookUpSchemaUnderSubject(final @Suspended AsyncResponse asyncResponse, + final @HeaderParam("Content-Type") String contentType, + final @HeaderParam("Accept") String accept, + @PathParam("subject") String subject, + @PathParam("version") String version, + RegisterSchemaRequest request) { // returns true if posted schema is compatible with the specified version. "latest" is // a special version Map headerProperties = new HashMap(); @@ -78,12 +79,8 @@ public void getSchemaUnderSubject(final @Suspended AsyncResponse asyncResponse, CompatibilityCheckResponse compatibilityCheckResponse = new CompatibilityCheckResponse(); try { Schema schemaForSpecifiedVersion = null; - if (version.trim().toLowerCase().equals("latest")) { - schemaForSpecifiedVersion = schemaRegistry.getLatestVersion(subject); - } else { - int versionId = Integer.valueOf(version.trim()); - schemaForSpecifiedVersion = schemaRegistry.get(subject, versionId); - } + VersionId versionId = new VersionId(version); + schemaForSpecifiedVersion = schemaRegistry.get(subject, versionId.getVersionId()); if (schemaForSpecifiedVersion == null) { if (version.trim().toLowerCase().equals("latest")) { isCompatible = true; diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java index ef1421be71f..9ea6fbe872b 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java @@ -40,6 +40,7 @@ import io.confluent.kafka.schemaregistry.client.rest.Versions; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse; +import io.confluent.kafka.schemaregistry.rest.VersionId; import io.confluent.kafka.schemaregistry.rest.entities.Schema; import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException; @@ -67,10 +68,11 @@ public SubjectVersionsResource(KafkaSchemaRegistry registry, String subject) { @GET @Path("/{version}") @PerformanceMetric("subjects.versions.get-schema") - public Schema getSchema(@PathParam("version") Integer version) { + public Schema getSchema(@PathParam("version") String version) { + VersionId versionId = new VersionId(version); Schema schema = null; try { - schema = schemaRegistry.get(this.subject, version); + schema = schemaRegistry.get(this.subject, versionId.getVersionId()); } catch (SchemaRegistryException e) { log.debug("Error while retrieving schema for subject " + this.subject + " with version " + version + " from the schema registry", e); diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java index 3c4d194dc43..c6618e6700b 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java @@ -398,14 +398,18 @@ private AvroSchema canonicalizeSchema(Schema schema) { @Override public Schema get(String subject, int version) throws SchemaRegistryException { - SchemaKey key = new SchemaKey(subject, version); - try { - Schema schema = (Schema) kafkaStore.get(key); - return schema; - } catch (StoreException e) { - throw new SchemaRegistryException( - "Error while retrieving schema from the backend Kafka" + - " store", e); + if (version == -1) { + return getLatestVersion(subject); + } else { + SchemaKey key = new SchemaKey(subject, version); + try { + Schema schema = (Schema) kafkaStore.get(key); + return schema; + } catch (StoreException e) { + throw new SchemaRegistryException( + "Error while retrieving schema from the backend Kafka" + + " store", e); + } } } diff --git a/docs/api.rst b/docs/api.rst index e46378a7c7a..87935ec910b 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -158,12 +158,12 @@ Schemas 1, 2, 3, 4 ] -.. http:get:: /subjects/(string:subject)/versions/(int:version) +.. http:get:: /subjects/(string:subject)/versions/(versionId:version) Get a specific version of the schema registered under this subject :param string subject: Name of the subject - :param int version: Version of the schema to be returned + :param versionId version: Version of the schema to be returned. Valid values for versionId are between [1,2^31-1] or the string "latest". "latest" returns the last registered schema under the specified subject :>json string name: Name of the subject that this schema is registered under :>json int version: Version of the returned schema @@ -309,13 +309,13 @@ Compatibility The compatibility resource allows the user to test schemas for compatibility against specific versions of a subject's schema. -.. http:post:: /compatibility/subjects/(string:subject)/versions/(int:version) +.. http:post:: /compatibility/subjects/(string:subject)/versions/(versionId:version) Test input schema against a particular version of a subject's schema for compatibility. Note that the compatibility level applied for the check is the configured compatibility level for the subject (``http:get:: /config/(string:subject)``). If this subject's compatibility level was never changed, then the global compatibility level applies (``http:get:: /config``). :param string subject: Subject of the schema version against which compatibility is to be tested - :param int version: Version of the subject's schema against which compatibility is to be tested. -1 is a special value that indicates the current latest version of the subject's schema - + :param versionId version: Version of the subject's schema against which compatibility is to be tested. Valid values for versionId are between [1,2^31-1] or the string "latest". "latest" checks compatibility of the input schema with the last registered schema under the specified subject + :>json boolean is_compatible: True, if compatible. False otherwise :statuscode 404: @@ -326,7 +326,7 @@ Test input schema against a particular version of a subject's schema for compati .. sourcecode:: http - POST /compatibility/subjects/test/versions/-1 HTTP/1.1 + POST /compatibility/subjects/test/versions/latest HTTP/1.1 Host: schemaregistry.example.com Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json