Skip to content

Commit

Permalink
Schema registry 4/N (#1381)
Browse files Browse the repository at this point in the history
* Schema Registry proto changes

* Infrastructure to store schemas

* A default schema registry implementation

* Add admin api for the schema registry

* Renumber schema fields

* Update Pulsar API with schema changes

* Revert field number change

* Fix merge conflict

* Fix broken merge

* DestinationName has been renamed to TopicName

* Address issues in review

* Add schema type back to proto definition

* Address comments regarding lombok usage

* Remove reserved future enum fields

* regenerate code from protobuf

* Remove unused code

* Add schema version to producer success message

* plumb schema through to producer

* Revert "Add schema version to producer success message"

This reverts commit e7e72f4.

* Revert "Revert "Add schema version to producer success message""

This reverts commit 7b902f6.

* Persist schema on producer connect

* Add principal to schema on publish

* Reformat function for readability

* Remove unused protoc profile

* Rename put on schema registry to putIfAbsent

* Reformat function for readability

* Remove unused protoc profile

* Rename put on schema registry to putIfAbsent

* fix compile errors from parent branch changes

* fix lombok tomfoolery on builder

* plumb hash through and allow lookup by data

* wip

* run tests

* wip: address review comments

* switch underscore to slash in schema name

* blah

* Get duplicate schema detection to work

* Fix protobuf version incompatibility

* fix merge issues

* Fix license headers

* Fix license headers

* Address review

* Fix webservice

* plumb schema from producer to server and back

* Plumb schema through subscriber

* Create and return schema via rest endpoint

* Make DELETE great again

* Clean up imports

* Move resource objects to common package

* Fix licenses

* Update error message for schema registry service

* Remove cruft

* Address review comments

- rename props to properties in GetSchemaResponse
- Use config for ledger parameters

* Address review comments

* Fix license headers

* deal with lombock stuff causing issues

* Resolve conflict
  • Loading branch information
mgodave authored and merlimat committed Apr 6, 2018
1 parent cc28dd0 commit 1eb8068
Show file tree
Hide file tree
Showing 19 changed files with 583 additions and 150 deletions.
@@ -0,0 +1,242 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.v2;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.isNull;
import static org.apache.commons.lang.StringUtils.defaultIfEmpty;
import static org.apache.pulsar.common.util.Codec.decode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import io.swagger.annotations.ApiOperation;
import java.time.Clock;
import java.util.Optional;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.schema.GetSchemaResponse;
import org.apache.pulsar.common.schema.PostSchemaPayload;
import org.apache.pulsar.common.schema.PostSchemaResponse;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.schema.SchemaVersion;

@Path("/schemas")
public class SchemasResource extends AdminResource {

private final Clock clock;

public SchemasResource() {
this(Clock.systemUTC());
}

@VisibleForTesting
public SchemasResource(Clock clock) {
super();
this.clock = clock;
}

@GET
@Path("/{property}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get topic schema", response = GetSchemaResponse.class)
public void getSchema(
@PathParam("property") String property,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
@Suspended final AsyncResponse response
) {
validateDestinationAndAdminOperation(property, namespace, topic);

String schemaId = buildSchemaId(property, namespace, topic);
pulsar().getSchemaRegistryService().getSchema(schemaId)
.handle((schema, error) -> {
if (isNull(error)) {
response.resume(
Response.ok()
.encoding(MediaType.APPLICATION_JSON)
.entity(GetSchemaResponse.builder()
.version(schema.version)
.type(schema.schema.getType())
.timestamp(schema.schema.getTimestamp())
.data(new String(schema.schema.getData()))
.properties(schema.schema.getProps())
.build()
)
.build()
);
} else {
response.resume(error);
}
return null;
});
}

@GET
@Path("/{property}/{namespace}/{topic}/schema/{version}")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get topic schema")
public void getSchema(
@PathParam("property") String property,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
@PathParam("version") @Encoded String version,
@Suspended final AsyncResponse response
) {
validateDestinationAndAdminOperation(property, namespace, topic);

String schemaId = buildSchemaId(property, namespace, topic);
SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(version.getBytes());
pulsar().getSchemaRegistryService().getSchema(schemaId, v)
.handle((schema, error) -> {
if (isNull(error)) {
if (schema.schema.isDeleted()) {
response.resume(Response.noContent());
} else {
response.resume(
Response.ok()
.encoding(MediaType.APPLICATION_JSON)
.entity(GetSchemaResponse.builder()
.version(schema.version)
.type(schema.schema.getType())
.timestamp(schema.schema.getTimestamp())
.data(new String(schema.schema.getData()))
.properties(schema.schema.getProps())
.build()
).build()
);
}
} else {
response.resume(error);
}
return null;
});
}

@DELETE
@Path("/{property}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Delete topic schema")
public void deleteSchema(
@PathParam("property") String property,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
@Suspended final AsyncResponse response
) {
validateDestinationAndAdminOperation(property, namespace, topic);

String schemaId = buildSchemaId(property, namespace, topic);
pulsar().getSchemaRegistryService().deleteSchema(schemaId, defaultIfEmpty(clientAppId(), ""))
.handle((version, error) -> {
if (isNull(error)) {
response.resume(
Response.ok().entity(
DeleteSchemaResponse.builder()
.version(version)
.build()
).build()
);
} else {
response.resume(error);
}
return null;
});
}

@POST
@Path("/{property}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Post topic schema")
public void postSchema(
@PathParam("property") String property,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
PostSchemaPayload payload,
@Suspended final AsyncResponse response
) {
validateDestinationAndAdminOperation(property, namespace, topic);

pulsar().getSchemaRegistryService().putSchemaIfAbsent(
buildSchemaId(property, namespace, topic),
SchemaData.builder()
.data(payload.getSchema().getBytes(Charsets.UTF_8))
.isDeleted(false)
.timestamp(clock.millis())
.type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), ""))
.build()
).thenAccept(version ->
response.resume(
Response.accepted().entity(
PostSchemaResponse.builder()
.version(version)
.build()
).build()
)
);
}

private String buildSchemaId(String property, String namespace, String topic) {
return TopicName.get("persistent", property, namespace, topic).getSchemaName();
}

private void validateDestinationAndAdminOperation(String property, String namespace, String topic) {
TopicName destinationName = TopicName.get(
"persistent", property, namespace, decode(topic)
);

try {
validateAdminAccessOnProperty(destinationName.getProperty());
validateTopicOwnership(destinationName, false);
} catch (RestException e) {
if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) {
throw new RestException(Response.Status.NOT_FOUND, "Not Found");
} else {
throw e;
}
}
}

private void validateDestinationExists(TopicName dn) {
try {
Optional<Topic> topic = pulsar().getBrokerService().getTopicReference(dn.toString());
checkArgument(topic.isPresent());
} catch (Exception e) {
throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
}
}

}
Expand Up @@ -21,28 +21,29 @@
import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Lists.newArrayList;
import static com.google.protobuf.ByteString.copyFrom; import static com.google.protobuf.ByteString.copyFrom;
import static java.util.Collections.emptyMap;
import static java.util.Objects.isNull; import static java.util.Objects.isNull;
import static java.util.Objects.nonNull; import static java.util.Objects.nonNull;
import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;


import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.zookeeper.ZooKeeperCache; import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
Expand All @@ -55,17 +56,20 @@
public class BookkeeperSchemaStorage implements SchemaStorage { public class BookkeeperSchemaStorage implements SchemaStorage {
private static final String SchemaPath = "/schemas"; private static final String SchemaPath = "/schemas";
private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
private static final byte[] LedgerPassword = "".getBytes();


private final PulsarService pulsar; private final PulsarService pulsar;
private final ZooKeeper zooKeeper; private final ZooKeeper zooKeeper;
private final ZooKeeperCache localZkCache; private final ZooKeeperCache localZkCache;
private final ServiceConfiguration config;
private BookKeeper bookKeeper; private BookKeeper bookKeeper;


@VisibleForTesting @VisibleForTesting
BookkeeperSchemaStorage(PulsarService pulsar) { BookkeeperSchemaStorage(PulsarService pulsar) {
this.pulsar = pulsar; this.pulsar = pulsar;
this.localZkCache = pulsar.getLocalZkCache(); this.localZkCache = pulsar.getLocalZkCache();
this.zooKeeper = localZkCache.getZooKeeper(); this.zooKeeper = localZkCache.getZooKeeper();
this.config = pulsar.getConfiguration();
} }


@VisibleForTesting @VisibleForTesting
Expand All @@ -79,6 +83,7 @@ public void init() throws KeeperException, InterruptedException {
} }
} }


@Override
public void start() throws IOException { public void start() throws IOException {
this.bookKeeper = pulsar.getBookKeeperClientFactory().create( this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
pulsar.getConfiguration(), pulsar.getConfiguration(),
Expand Down Expand Up @@ -119,8 +124,7 @@ private CompletableFuture<StoredSchema> getSchema(String schemaId) {
.thenApply(entry -> .thenApply(entry ->
new StoredSchema( new StoredSchema(
entry.getSchemaData().toByteArray(), entry.getSchemaData().toByteArray(),
new LongSchemaVersion(schemaLocator.getInfo().getVersion()), new LongSchemaVersion(schemaLocator.getInfo().getVersion())
emptyMap()
) )
); );
}); });
Expand Down Expand Up @@ -156,8 +160,7 @@ private CompletableFuture<StoredSchema> getSchema(String schemaId, long version)
.thenApply(entry -> .thenApply(entry ->
new StoredSchema( new StoredSchema(
entry.getSchemaData().toByteArray(), entry.getSchemaData().toByteArray(),
new LongSchemaVersion(version), new LongSchemaVersion(version)
emptyMap()
) )
); );
}); });
Expand Down Expand Up @@ -377,22 +380,30 @@ private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, SchemaStorag
@NotNull @NotNull
private CompletableFuture<LedgerHandle> createLedger() { private CompletableFuture<LedgerHandle> createLedger() {
final CompletableFuture<LedgerHandle> future = new CompletableFuture<>(); final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
bookKeeper.asyncCreateLedger(0, 0, DigestType.MAC, new byte[]{}, bookKeeper.asyncCreateLedger(
config.getManagedLedgerDefaultEnsembleSize(),
config.getManagedLedgerDefaultWriteQuorum(),
config.getManagedLedgerDefaultAckQuorum(),
config.getManagedLedgerDigestType(),
LedgerPassword,
(rc, handle, ctx) -> { (rc, handle, ctx) -> {
if (rc != BKException.Code.OK) { if (rc != BKException.Code.OK) {
future.completeExceptionally(BKException.create(rc)); future.completeExceptionally(BKException.create(rc));
} else { } else {
future.complete(handle); future.complete(handle);
} }
}, null }, null, Collections.emptyMap()
); );
return future; return future;
} }


@NotNull @NotNull
private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) { private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
final CompletableFuture<LedgerHandle> future = new CompletableFuture<>(); final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
bookKeeper.asyncOpenLedger(ledgerId, DigestType.MAC, new byte[]{}, bookKeeper.asyncOpenLedger(
ledgerId,
config.getManagedLedgerDigestType(),
LedgerPassword,
(rc, handle, ctx) -> { (rc, handle, ctx) -> {
if (rc != BKException.Code.OK) { if (rc != BKException.Code.OK) {
future.completeExceptionally(BKException.create(rc)); future.completeExceptionally(BKException.create(rc));
Expand Down
Expand Up @@ -35,9 +35,10 @@ static SchemaRegistryService create(PulsarService pulsar) {
Object factoryInstance = storageClass.newInstance(); Object factoryInstance = storageClass.newInstance();
Method createMethod = storageClass.getMethod(CreateMethodName, PulsarService.class); Method createMethod = storageClass.getMethod(CreateMethodName, PulsarService.class);
SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, pulsar); SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, pulsar);
schemaStorage.start();
return new SchemaRegistryServiceImpl(schemaStorage); return new SchemaRegistryServiceImpl(schemaStorage);
} catch (Exception e) { } catch (Exception e) {
log.warn("Error when trying to create scehema registry storage: {}", e); log.warn("Unable to create schema registry storage, defaulting to empty storage: {}", e);
} }
return new DefaultSchemaRegistryService(); return new DefaultSchemaRegistryService();
} }
Expand Down
Expand Up @@ -31,6 +31,8 @@ public interface SchemaStorage {


SchemaVersion versionFromBytes(byte[] version); SchemaVersion versionFromBytes(byte[] version);


void start() throws Exception;

void close() throws Exception; void close() throws Exception;


} }

0 comments on commit 1eb8068

Please sign in to comment.