Skip to content

Commit

Permalink
Add CLI commands for schema registry (#1944)
Browse files Browse the repository at this point in the history
* Add CLI commands for schema registry

* Rename commandline args

* Fix License Headers Issue

* - all schema structures used in rest apis should have default constructors
- change version to long in rest apis
- fix schema cli issues

* Add integration tests for Schema CLI

* Exclude schema example file from license check

* Exclude schema_example.conf from apache-rat:check
  • Loading branch information
mgodave authored and jerrypeng committed Jun 15, 2018
1 parent 4625667 commit 4e99ffe
Show file tree
Hide file tree
Showing 17 changed files with 385 additions and 13 deletions.
7 changes: 7 additions & 0 deletions conf/schema_example.conf
@@ -0,0 +1,7 @@
{
"type": "STRING",
"schema": "",
"properties": {
"key1" : "value1"
}
}
4 changes: 4 additions & 0 deletions pom.xml
Expand Up @@ -927,6 +927,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>**/ByteBufCodedOutputStream.java</exclude> <exclude>**/ByteBufCodedOutputStream.java</exclude>
<exclude>bin/proto/*</exclude> <exclude>bin/proto/*</exclude>
<exclude>**/*.patch</exclude> <exclude>**/*.patch</exclude>
<exclude>conf/schema_example.conf</exclude>
<exclude>data/**</exclude> <exclude>data/**</exclude>
<exclude>logs/**</exclude> <exclude>logs/**</exclude>
<exclude>**/*.versionsBackup</exclude> <exclude>**/*.versionsBackup</exclude>
Expand Down Expand Up @@ -1055,6 +1056,9 @@ flexible messaging model and an intuitive client API.</description>


<!-- Python requirements files --> <!-- Python requirements files -->
<exclude>**/requirements.txt</exclude> <exclude>**/requirements.txt</exclude>

<!-- Configuration Templates -->
<exclude>conf/schema_example.conf</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>
Expand Down
Expand Up @@ -27,7 +27,6 @@
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.time.Clock; import java.time.Clock;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE; import javax.ws.rs.DELETE;
import javax.ws.rs.Encoded; import javax.ws.rs.Encoded;
Expand All @@ -42,6 +41,7 @@
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.DeleteSchemaResponse; import org.apache.pulsar.common.schema.DeleteSchemaResponse;
Expand All @@ -51,10 +51,14 @@
import org.apache.pulsar.common.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@Path("/schemas") @Path("/schemas")
public class SchemasResource extends AdminResource { public class SchemasResource extends AdminResource {


private static final Logger log = LoggerFactory.getLogger(SchemasResource.class);

private final Clock clock; private final Clock clock;


public SchemasResource() { public SchemasResource() {
Expand All @@ -67,6 +71,14 @@ public SchemasResource(Clock clock) {
this.clock = clock; this.clock = clock;
} }


private long getLongSchemaVersion(SchemaVersion schemaVersion) {
if (schemaVersion instanceof LongSchemaVersion) {
return ((LongSchemaVersion) schemaVersion).getVersion();
} else {
return -1L;
}
}

@GET @GET
@Path("/{tenant}/{namespace}/{topic}/schema") @Path("/{tenant}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
Expand All @@ -86,13 +98,13 @@ public void getSchema(
if (isNull(schema)) { if (isNull(schema)) {
response.resume(Response.status(Response.Status.NOT_FOUND).build()); response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else if (schema.schema.isDeleted()) { } else if (schema.schema.isDeleted()) {
response.resume(Response.noContent().build()); response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else { } else {
response.resume( response.resume(
Response.ok() Response.ok()
.encoding(MediaType.APPLICATION_JSON) .encoding(MediaType.APPLICATION_JSON)
.entity(GetSchemaResponse.builder() .entity(GetSchemaResponse.builder()
.version(schema.version) .version(getLongSchemaVersion(schema.version))
.type(schema.schema.getType()) .type(schema.schema.getType())
.timestamp(schema.schema.getTimestamp()) .timestamp(schema.schema.getTimestamp())
.data(new String(schema.schema.getData())) .data(new String(schema.schema.getData()))
Expand Down Expand Up @@ -132,13 +144,13 @@ public void getSchema(
if (isNull(schema)) { if (isNull(schema)) {
response.resume(Response.status(Response.Status.NOT_FOUND).build()); response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else if (schema.schema.isDeleted()) { } else if (schema.schema.isDeleted()) {
response.resume(Response.noContent().build()); response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else { } else {
response.resume( response.resume(
Response.ok() Response.ok()
.encoding(MediaType.APPLICATION_JSON) .encoding(MediaType.APPLICATION_JSON)
.entity(GetSchemaResponse.builder() .entity(GetSchemaResponse.builder()
.version(schema.version) .version(getLongSchemaVersion(schema.version))
.type(schema.schema.getType()) .type(schema.schema.getType())
.timestamp(schema.schema.getTimestamp()) .timestamp(schema.schema.getTimestamp())
.data(new String(schema.schema.getData())) .data(new String(schema.schema.getData()))
Expand Down Expand Up @@ -173,7 +185,7 @@ public void deleteSchema(
response.resume( response.resume(
Response.ok().entity( Response.ok().entity(
DeleteSchemaResponse.builder() DeleteSchemaResponse.builder()
.version(version) .version(getLongSchemaVersion(version))
.build() .build()
).build() ).build()
); );
Expand Down
Expand Up @@ -23,10 +23,10 @@
import java.util.Objects; import java.util.Objects;
import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaVersion;


class LongSchemaVersion implements SchemaVersion { public class LongSchemaVersion implements SchemaVersion {
private final long version; private final long version;


LongSchemaVersion(long version) { public LongSchemaVersion(long version) {
this.version = version; this.version = version;
} }


Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.apache.pulsar.client.admin.internal.LookupImpl; import org.apache.pulsar.client.admin.internal.LookupImpl;
import org.apache.pulsar.client.admin.internal.NamespacesImpl; import org.apache.pulsar.client.admin.internal.NamespacesImpl;
import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl; import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
import org.apache.pulsar.client.admin.internal.SchemasImpl;
import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.admin.internal.TenantsImpl; import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl; import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class PulsarAdmin implements Closeable {
private final String serviceUrl; private final String serviceUrl;
private final Lookup lookups; private final Lookup lookups;
private final Functions functions; private final Functions functions;
private final Schemas schemas;
protected final WebTarget root; protected final WebTarget root;
protected final Authentication auth; protected final Authentication auth;


Expand Down Expand Up @@ -183,6 +185,7 @@ public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData)
this.resourceQuotas = new ResourceQuotasImpl(root, auth); this.resourceQuotas = new ResourceQuotasImpl(root, auth);
this.lookups = new LookupImpl(root, auth, useTls); this.lookups = new LookupImpl(root, auth, useTls);
this.functions = new FunctionsImpl(root, auth); this.functions = new FunctionsImpl(root, auth);
this.schemas = new SchemasImpl(root, auth);
} }


/** /**
Expand Down Expand Up @@ -362,6 +365,13 @@ public ClientConfigurationData getClientConfigData() {
return clientConfigData; return clientConfigData;
} }


/**
* @return the schemas
*/
public Schemas schemas() {
return schemas;
}

/** /**
* Close the Pulsar admin client to release all the resources * Close the Pulsar admin client to release all the resources
*/ */
Expand Down
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin;

import org.apache.pulsar.common.schema.PostSchemaPayload;
import org.apache.pulsar.common.schema.SchemaInfo;

/**
* Admin interface on interacting with schemas.
*/
public interface Schemas {

/**
* Retrieve the latest schema of a topic.
*
* @param topic topic name, in fully qualified format
* @return latest schema
* @throws PulsarAdminException
*/
SchemaInfo getSchemaInfo(String topic) throws PulsarAdminException;

/**
* Retrieve the schema of a topic at a given <tt>version</tt>.
*
* @param topic topic name, in fully qualified format
* @param version schema version
* @return the schema info at a given <tt>version</tt>
* @throws PulsarAdminException
*/
SchemaInfo getSchemaInfo(String topic, long version) throws PulsarAdminException;

/**
* Delete the schema associated with a given <tt>topic</tt>.
*
* @param topic topic name, in fully qualified format
* @throws PulsarAdminException
*/
void deleteSchema(String topic) throws PulsarAdminException;

/**
* Create a schema for a given <tt>topic</tt>.
*
* @param topic topic name, in fully qualified format
* @param schemaPayload schema payload
* @throws PulsarAdminException
*/
void createSchema(String topic, PostSchemaPayload schemaPayload) throws PulsarAdminException;

}
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin.internal;

import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.schema.GetSchemaResponse;
import org.apache.pulsar.common.schema.PostSchemaPayload;
import org.apache.pulsar.common.schema.SchemaInfo;

public class SchemasImpl extends BaseResource implements Schemas {

private final WebTarget target;

public SchemasImpl(WebTarget web, Authentication auth) {
super(auth);
this.target = web.path("/admin/v2/schemas");
}

@Override
public SchemaInfo getSchemaInfo(String topic) throws PulsarAdminException {
try {
TopicName tn = TopicName.get(topic);
GetSchemaResponse response = request(schemaPath(tn)).get(GetSchemaResponse.class);
SchemaInfo info = new SchemaInfo();
info.setSchema(response.getData().getBytes());
info.setType(response.getType());
info.setProperties(response.getProperties());
info.setName(tn.getLocalName());
return info;
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public SchemaInfo getSchemaInfo(String topic, long version) throws PulsarAdminException {
try {
TopicName tn = TopicName.get(topic);
GetSchemaResponse response = request(schemaPath(tn).path(Long.toString(version))).get(GetSchemaResponse.class);
SchemaInfo info = new SchemaInfo();
info.setSchema(response.getData().getBytes());
info.setType(response.getType());
info.setProperties(response.getProperties());
info.setName(tn.getLocalName());
return info;
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void deleteSchema(String topic) throws PulsarAdminException {
try {
TopicName tn = TopicName.get(topic);
request(schemaPath(tn)).delete(DeleteSchemaResponse.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void createSchema(String topic, PostSchemaPayload payload) throws PulsarAdminException {
try {
TopicName tn = TopicName.get(topic);
request(schemaPath(tn))
.post(Entity.json(payload), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

private WebTarget schemaPath(TopicName topicName) {
return target
.path(topicName.getTenant())
.path(topicName.getNamespacePortion())
.path(topicName.getEncodedLocalName())
.path("schema");
}
}

0 comments on commit 4e99ffe

Please sign in to comment.