diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java new file mode 100644 index 0000000000000..2173617733a2a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin.v2; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.isNull; +import static org.apache.commons.lang.StringUtils.defaultIfEmpty; +import static org.apache.pulsar.common.util.Codec.decode; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import io.swagger.annotations.ApiOperation; +import java.time.Clock; +import java.util.Optional; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.Encoded; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.schema.DeleteSchemaResponse; +import org.apache.pulsar.common.schema.GetSchemaResponse; +import org.apache.pulsar.common.schema.PostSchemaPayload; +import org.apache.pulsar.common.schema.PostSchemaResponse; +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.schema.SchemaVersion; + +@Path("/schemas") +public class SchemasResource extends AdminResource { + + private final Clock clock; + + public SchemasResource() { + this(Clock.systemUTC()); + } + + @VisibleForTesting + public SchemasResource(Clock clock) { + super(); + this.clock = clock; + } + + @GET + @Path("/{property}/{namespace}/{topic}/schema") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get topic schema", response = GetSchemaResponse.class) + public void getSchema( + @PathParam("property") String property, + @PathParam("namespace") String namespace, + @PathParam("topic") String topic, + @Suspended final AsyncResponse response + ) { + validateDestinationAndAdminOperation(property, namespace, topic); + + String schemaId = buildSchemaId(property, namespace, topic); + pulsar().getSchemaRegistryService().getSchema(schemaId) + .handle((schema, error) -> { + if (isNull(error)) { + response.resume( + Response.ok() + .encoding(MediaType.APPLICATION_JSON) + .entity(GetSchemaResponse.builder() + .version(schema.version) + .type(schema.schema.getType()) + .timestamp(schema.schema.getTimestamp()) + .data(new String(schema.schema.getData())) + .properties(schema.schema.getProps()) + .build() + ) + .build() + ); + } else { + response.resume(error); + } + return null; + }); + } + + @GET + @Path("/{property}/{namespace}/{topic}/schema/{version}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get topic schema") + public void getSchema( + @PathParam("property") String property, + @PathParam("namespace") String namespace, + @PathParam("topic") String topic, + @PathParam("version") @Encoded String version, + @Suspended final AsyncResponse response + ) { + validateDestinationAndAdminOperation(property, namespace, topic); + + String schemaId = buildSchemaId(property, namespace, topic); + SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(version.getBytes()); + pulsar().getSchemaRegistryService().getSchema(schemaId, v) + .handle((schema, error) -> { + if (isNull(error)) { + if (schema.schema.isDeleted()) { + response.resume(Response.noContent()); + } else { + response.resume( + Response.ok() + .encoding(MediaType.APPLICATION_JSON) + .entity(GetSchemaResponse.builder() + .version(schema.version) + .type(schema.schema.getType()) + .timestamp(schema.schema.getTimestamp()) + .data(new String(schema.schema.getData())) + .properties(schema.schema.getProps()) + .build() + ).build() + ); + } + } else { + response.resume(error); + } + return null; + }); + } + + @DELETE + @Path("/{property}/{namespace}/{topic}/schema") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Delete topic schema") + public void deleteSchema( + @PathParam("property") String property, + @PathParam("namespace") String namespace, + @PathParam("topic") String topic, + @Suspended final AsyncResponse response + ) { + validateDestinationAndAdminOperation(property, namespace, topic); + + String schemaId = buildSchemaId(property, namespace, topic); + pulsar().getSchemaRegistryService().deleteSchema(schemaId, defaultIfEmpty(clientAppId(), "")) + .handle((version, error) -> { + if (isNull(error)) { + response.resume( + Response.ok().entity( + DeleteSchemaResponse.builder() + .version(version) + .build() + ).build() + ); + } else { + response.resume(error); + } + return null; + }); + } + + @POST + @Path("/{property}/{namespace}/{topic}/schema") + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Post topic schema") + public void postSchema( + @PathParam("property") String property, + @PathParam("namespace") String namespace, + @PathParam("topic") String topic, + PostSchemaPayload payload, + @Suspended final AsyncResponse response + ) { + validateDestinationAndAdminOperation(property, namespace, topic); + + pulsar().getSchemaRegistryService().putSchemaIfAbsent( + buildSchemaId(property, namespace, topic), + SchemaData.builder() + .data(payload.getSchema().getBytes(Charsets.UTF_8)) + .isDeleted(false) + .timestamp(clock.millis()) + .type(SchemaType.valueOf(payload.getType())) + .user(defaultIfEmpty(clientAppId(), "")) + .build() + ).thenAccept(version -> + response.resume( + Response.accepted().entity( + PostSchemaResponse.builder() + .version(version) + .build() + ).build() + ) + ); + } + + private String buildSchemaId(String property, String namespace, String topic) { + return TopicName.get("persistent", property, namespace, topic).getSchemaName(); + } + + private void validateDestinationAndAdminOperation(String property, String namespace, String topic) { + TopicName destinationName = TopicName.get( + "persistent", property, namespace, decode(topic) + ); + + try { + validateAdminAccessOnProperty(destinationName.getProperty()); + validateTopicOwnership(destinationName, false); + } catch (RestException e) { + if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) { + throw new RestException(Response.Status.NOT_FOUND, "Not Found"); + } else { + throw e; + } + } + } + + private void validateDestinationExists(TopicName dn) { + try { + Optional topic = pulsar().getBrokerService().getTopicReference(dn.toString()); + checkArgument(topic.isPresent()); + } catch (Exception e) { + throw new RestException(Response.Status.NOT_FOUND, "Topic not found"); + } + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 8ba8750e91239..37301210980a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -21,28 +21,29 @@ import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Lists.newArrayList; import static com.google.protobuf.ByteString.copyFrom; -import static java.util.Collections.emptyMap; import static java.util.Objects.isNull; import static java.util.Objects.nonNull; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; import com.google.protobuf.ByteString; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import javax.validation.constraints.NotNull; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.zookeeper.ZooKeeperCache; import org.apache.zookeeper.CreateMode; @@ -55,10 +56,12 @@ public class BookkeeperSchemaStorage implements SchemaStorage { private static final String SchemaPath = "/schemas"; private static final List Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + private static final byte[] LedgerPassword = "".getBytes(); private final PulsarService pulsar; private final ZooKeeper zooKeeper; private final ZooKeeperCache localZkCache; + private final ServiceConfiguration config; private BookKeeper bookKeeper; @VisibleForTesting @@ -66,6 +69,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage { this.pulsar = pulsar; this.localZkCache = pulsar.getLocalZkCache(); this.zooKeeper = localZkCache.getZooKeeper(); + this.config = pulsar.getConfiguration(); } @VisibleForTesting @@ -79,6 +83,7 @@ public void init() throws KeeperException, InterruptedException { } } + @Override public void start() throws IOException { this.bookKeeper = pulsar.getBookKeeperClientFactory().create( pulsar.getConfiguration(), @@ -119,8 +124,7 @@ private CompletableFuture getSchema(String schemaId) { .thenApply(entry -> new StoredSchema( entry.getSchemaData().toByteArray(), - new LongSchemaVersion(schemaLocator.getInfo().getVersion()), - emptyMap() + new LongSchemaVersion(schemaLocator.getInfo().getVersion()) ) ); }); @@ -156,8 +160,7 @@ private CompletableFuture getSchema(String schemaId, long version) .thenApply(entry -> new StoredSchema( entry.getSchemaData().toByteArray(), - new LongSchemaVersion(version), - emptyMap() + new LongSchemaVersion(version) ) ); }); @@ -377,14 +380,19 @@ private CompletableFuture addEntry(LedgerHandle ledgerHandle, SchemaStorag @NotNull private CompletableFuture createLedger() { final CompletableFuture future = new CompletableFuture<>(); - bookKeeper.asyncCreateLedger(0, 0, DigestType.MAC, new byte[]{}, + bookKeeper.asyncCreateLedger( + config.getManagedLedgerDefaultEnsembleSize(), + config.getManagedLedgerDefaultWriteQuorum(), + config.getManagedLedgerDefaultAckQuorum(), + config.getManagedLedgerDigestType(), + LedgerPassword, (rc, handle, ctx) -> { if (rc != BKException.Code.OK) { future.completeExceptionally(BKException.create(rc)); } else { future.complete(handle); } - }, null + }, null, Collections.emptyMap() ); return future; } @@ -392,7 +400,10 @@ private CompletableFuture createLedger() { @NotNull private CompletableFuture openLedger(Long ledgerId) { final CompletableFuture future = new CompletableFuture<>(); - bookKeeper.asyncOpenLedger(ledgerId, DigestType.MAC, new byte[]{}, + bookKeeper.asyncOpenLedger( + ledgerId, + config.getManagedLedgerDigestType(), + LedgerPassword, (rc, handle, ctx) -> { if (rc != BKException.Code.OK) { future.completeExceptionally(BKException.create(rc)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java index 69e736481616e..b9fa998007ef5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java @@ -35,9 +35,10 @@ static SchemaRegistryService create(PulsarService pulsar) { Object factoryInstance = storageClass.newInstance(); Method createMethod = storageClass.getMethod(CreateMethodName, PulsarService.class); SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, pulsar); + schemaStorage.start(); return new SchemaRegistryServiceImpl(schemaStorage); } catch (Exception e) { - log.warn("Error when trying to create scehema registry storage: {}", e); + log.warn("Unable to create schema registry storage, defaulting to empty storage: {}", e); } return new DefaultSchemaRegistryService(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java index b0c80752b500f..c5c7f832ece64 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java @@ -31,6 +31,8 @@ public interface SchemaStorage { SchemaVersion versionFromBytes(byte[] version); + void start() throws Exception; + void close() throws Exception; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java index f28a70797ccbe..fd2602bccb39b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java @@ -20,19 +20,16 @@ import com.google.common.base.MoreObjects; import java.util.Arrays; -import java.util.Map; import java.util.Objects; import org.apache.pulsar.common.schema.SchemaVersion; public class StoredSchema { public final byte[] data; public final SchemaVersion version; - public final Map metadata; - public StoredSchema(byte[] data, SchemaVersion version, Map metadata) { + StoredSchema(byte[] data, SchemaVersion version) { this.data = data; this.version = version; - this.metadata = metadata; } @Override @@ -45,14 +42,13 @@ public boolean equals(Object o) { } StoredSchema that = (StoredSchema) o; return Arrays.equals(data, that.data) && - Objects.equals(version, that.version) && - Objects.equals(metadata, that.metadata); + Objects.equals(version, that.version); } @Override public int hashCode() { - int result = Objects.hash(version, metadata); + int result = Objects.hash(version); result = 31 * result + Arrays.hashCode(data); return result; } @@ -62,7 +58,6 @@ public String toString() { return MoreObjects.toStringHelper(this) .add("data", data) .add("version", version) - .add("metadata", metadata) .toString(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 89f49f009470a..b800bfabf0f59 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -32,8 +32,13 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.lang.reflect.Field; import java.net.URI; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -41,11 +46,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; - import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.UriInfo; - import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats; import org.apache.bookkeeper.util.ZkUtils; @@ -56,6 +59,7 @@ import org.apache.pulsar.broker.admin.v1.PersistentTopics; import org.apache.pulsar.broker.admin.v1.Properties; import org.apache.pulsar.broker.admin.v1.ResourceQuotas; +import org.apache.pulsar.broker.admin.v2.SchemasResource; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.web.PulsarWebResource; @@ -83,13 +87,10 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - @Test public class AdminTest extends MockedPulsarServiceBaseTest { + private final String configClusterName = "use"; private ConfigurationCacheService configurationCache; - private Clusters clusters; private Properties properties; private Namespaces namespaces; @@ -97,9 +98,12 @@ public class AdminTest extends MockedPulsarServiceBaseTest { private Brokers brokers; private ResourceQuotas resourceQuotas; private BrokerStats brokerStats; - + private SchemasResource schemasResource; private Field uriField; - private final String configClusterName = "use"; + private Clock mockClock = Clock.fixed( + Instant.ofEpochSecond(365248800), + ZoneId.of("-05:00") + ); public AdminTest() { super(); @@ -184,6 +188,14 @@ public void setup() throws Exception { doReturn(mockZookKeeper).when(brokerStats).localZk(); doReturn(configurationCache.propertiesCache()).when(brokerStats).propertiesCache(); doReturn(configurationCache.policiesCache()).when(brokerStats).policiesCache(); + + schemasResource = spy(new SchemasResource(mockClock)); + schemasResource.setServletContext(new MockServletContext()); + schemasResource.setPulsar(pulsar); + doReturn(mockZookKeeper).when(schemasResource).globalZk(); + doReturn(mockZookKeeper).when(schemasResource).localZk(); + doReturn(configurationCache.propertiesCache()).when(schemasResource).propertiesCache(); + doReturn(configurationCache.policiesCache()).when(schemasResource).policiesCache(); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java index 346c52581b994..3a91fde179ff1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -18,10 +18,14 @@ */ package org.apache.pulsar.client.api; +import org.apache.pulsar.common.schema.SchemaInfo; + public interface Schema { byte[] encode(T message); T decode(byte[] bytes); + SchemaInfo getSchemaInfo(); + Schema IDENTITY = new Schema() { @Override public byte[] encode(byte[] message) { @@ -32,5 +36,10 @@ public byte[] encode(byte[] message) { public byte[] decode(byte[] bytes) { return bytes; } + + @Override + public SchemaInfo getSchemaInfo() { + return null; + } }; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 37904fdfcdfca..d89d8fbd5a1f1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -21,6 +21,14 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.unix.Errors.NativeIoException; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; @@ -29,11 +37,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - import javax.net.ssl.SSLSession; - -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.http.conn.ssl.DefaultHostnameVerifier; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.PulsarClientException; @@ -63,28 +67,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.unix.Errors.NativeIoException; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.Promise; - public class ClientCnx extends PulsarHandler { private final Authentication authentication; private State state; - private final ConcurrentLongHashMap>> pendingRequests = new ConcurrentLongHashMap<>( - 16, 1); - private final ConcurrentLongHashMap> pendingLookupRequests = new ConcurrentLongHashMap<>( - 16, 1); - private final ConcurrentLongHashMap> pendingGetLastMessageIdRequests = new ConcurrentLongHashMap<>( - 16, 1); - private final ConcurrentLongHashMap>> pendingGetTopicsRequests = new ConcurrentLongHashMap<>( - 16, 1); + private final ConcurrentLongHashMap> pendingRequests = + new ConcurrentLongHashMap<>(16, 1); + private final ConcurrentLongHashMap> pendingLookupRequests = + new ConcurrentLongHashMap<>(16, 1); + private final ConcurrentLongHashMap> pendingGetLastMessageIdRequests = + new ConcurrentLongHashMap<>(16, 1); + private final ConcurrentLongHashMap>> pendingGetTopicsRequests = + new ConcurrentLongHashMap<>(16, 1); private final ConcurrentLongHashMap> producers = new ConcurrentLongHashMap<>(16, 1); private final ConcurrentLongHashMap> consumers = new ConcurrentLongHashMap<>(16, 1); @@ -280,7 +275,7 @@ protected void handleSuccess(CommandSuccess success) { log.debug("{} Received success response from server: {}", ctx.channel(), success.getRequestId()); } long requestId = success.getRequestId(); - CompletableFuture> requestFuture = pendingRequests.remove(requestId); + CompletableFuture requestFuture = pendingRequests.remove(requestId); if (requestFuture != null) { requestFuture.complete(null); } else { @@ -313,9 +308,9 @@ protected void handleProducerSuccess(CommandProducerSuccess success) { success.getRequestId(), success.getProducerName()); } long requestId = success.getRequestId(); - CompletableFuture> requestFuture = pendingRequests.remove(requestId); + CompletableFuture requestFuture = pendingRequests.remove(requestId); if (requestFuture != null) { - requestFuture.complete(new ImmutablePair<>(success.getProducerName(), success.getLastSequenceId())); + requestFuture.complete(new ProducerResponse(success.getProducerName(), success.getLastSequenceId(), success.getSchemaVersion().toByteArray())); } else { log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId()); } @@ -460,7 +455,7 @@ protected void handleError(CommandError error) { log.warn("{} Producer creation has been blocked because backlog quota exceeded for producer topic", ctx.channel()); } - CompletableFuture> requestFuture = pendingRequests.remove(requestId); + CompletableFuture requestFuture = pendingRequests.remove(requestId); if (requestFuture != null) { requestFuture.completeExceptionally(getPulsarClientException(error.getError(), error.getMessage())); } else { @@ -575,8 +570,8 @@ CompletableFuture connectionFuture() { return connectionFuture; } - CompletableFuture> sendRequestWithId(ByteBuf cmd, long requestId) { - CompletableFuture> future = new CompletableFuture<>(); + CompletableFuture sendRequestWithId(ByteBuf cmd, long requestId) { + CompletableFuture future = new CompletableFuture<>(); pendingRequests.put(requestId, future); ctx.writeAndFlush(cmd).addListener(writeFuture -> { if (!writeFuture.isSuccess()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2a88bf0519e6f..a446dd4567631 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -469,7 +469,7 @@ public void connectionOpened(final ClientCnx cnx) { } ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, - consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue())); + consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), schema.getSchemaInfo()); if (startMessageIdData != null) { startMessageIdData.recycle(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 27757d1190caf..92a512de2b398 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -25,11 +25,13 @@ import static org.apache.pulsar.common.api.Commands.hasChecksum; import static org.apache.pulsar.common.api.Commands.readChecksum; +import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -106,6 +108,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private final Map metadata; + private Optional schemaVersion = Optional.empty(); + private final ConnectionHandler connectionHandler; @SuppressWarnings("rawtypes") @@ -288,6 +292,10 @@ public void sendAsync(Message message, SendCallback callback) { return; } + if (schemaVersion.isPresent()) { + msgMetadata.setSchemaVersion(ByteString.copyFrom(schemaVersion.get())); + } + try { synchronized (this) { long sequenceId; @@ -837,9 +845,10 @@ public void connectionOpened(final ClientCnx cnx) { cnx.sendRequestWithId( Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata), - requestId).thenAccept(pair -> { - String producerName = pair.getLeft(); - long lastSequenceId = pair.getRight(); + requestId).thenAccept(response -> { + String producerName = response.getProducerName(); + long lastSequenceId = response.getLastSequenceId(); + schemaVersion = Optional.ofNullable(response.getSchemaVersion()); // We are now reconnected to broker and clear to send messages. Re-send all pending messages and // set the cnx pointer so that new messages will be sent immediately diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java new file mode 100644 index 0000000000000..edb98f2b04755 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class ProducerResponse { + private String producerName; + private long lastSequenceId; + private byte[] schemaVersion; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 90dbf67b933fa..904eddda3a7ae 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.api; +import static com.google.protobuf.ByteString.copyFrom; import static com.google.protobuf.ByteString.copyFromUtf8; import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum; import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum; @@ -31,6 +32,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod; @@ -75,6 +77,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; @@ -310,12 +314,12 @@ public static ByteBufPair newSend(long producerId, long sequenceId, int numMessa public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, - true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, InitialPosition.Earliest); + true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, InitialPosition.Earliest, null); } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition) { + Map metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) { CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder(); subscribeBuilder.setTopic(topic); subscribeBuilder.setSubscription(subscription); @@ -332,6 +336,10 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu } subscribeBuilder.addAllMetadata(CommandUtils.toKeyValueList(metadata)); + if (null != schemaInfo) { + subscribeBuilder.setSchema(getSchema(schemaInfo)); + } + CommandSubscribe subscribe = subscribeBuilder.build(); ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.SUBSCRIBE).setSubscribe(subscribe)); subscribeBuilder.recycle(); @@ -425,6 +433,41 @@ public static ByteBuf newProducer(String topic, long producerId, long requestId, public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName, boolean encrypted, Map metadata) { + return newProducer(topic, producerId, requestId, producerName, encrypted, metadata, null); + } + + private static PulsarApi.Schema.Type getSchemaType(SchemaType type) { + switch (type) { + case PROTOBUF: + return PulsarApi.Schema.Type.Protobuf; + case THRIFT: + return PulsarApi.Schema.Type.Thrift; + case AVRO: + return PulsarApi.Schema.Type.Avro; + case JSON: + return PulsarApi.Schema.Type.Json; + default: + return null; + } + } + + private static PulsarApi.Schema getSchema(SchemaInfo schemaInfo) { + return PulsarApi.Schema.newBuilder() + .setName(schemaInfo.getName()) + .setSchemaData(copyFrom(schemaInfo.getSchema())) + .setType(getSchemaType(schemaInfo.getType())) + .addAllProperties( + schemaInfo.getProperties().entrySet().stream().map(entry -> + PulsarApi.KeyValue.newBuilder() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + .build() + ).collect(Collectors.toList()) + ).build(); + } + + public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName, + boolean encrypted, Map metadata, SchemaInfo schemaInfo) { CommandProducer.Builder producerBuilder = CommandProducer.newBuilder(); producerBuilder.setTopic(topic); producerBuilder.setProducerId(producerId); @@ -436,6 +479,10 @@ public static ByteBuf newProducer(String topic, long producerId, long requestId, producerBuilder.addAllMetadata(CommandUtils.toKeyValueList(metadata)); + if (null != schemaInfo) { + producerBuilder.setSchema(getSchema(schemaInfo)); + } + CommandProducer producer = producerBuilder.build(); ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.PRODUCER).setProducer(producer)); producerBuilder.recycle(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 86892fab4a9ee..56f09fe10b85f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -266,10 +266,6 @@ public interface SchemaOrBuilder boolean hasName(); String getName(); - // required bytes version = 2; - boolean hasVersion(); - com.google.protobuf.ByteString getVersion(); - // required bytes schema_data = 3; boolean hasSchemaData(); com.google.protobuf.ByteString getSchemaData(); @@ -400,21 +396,11 @@ private com.google.protobuf.ByteString getNameBytes() { } } - // required bytes version = 2; - public static final int VERSION_FIELD_NUMBER = 2; - private com.google.protobuf.ByteString version_; - public boolean hasVersion() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public com.google.protobuf.ByteString getVersion() { - return version_; - } - // required bytes schema_data = 3; public static final int SCHEMA_DATA_FIELD_NUMBER = 3; private com.google.protobuf.ByteString schemaData_; public boolean hasSchemaData() { - return ((bitField0_ & 0x00000004) == 0x00000004); + return ((bitField0_ & 0x00000002) == 0x00000002); } public com.google.protobuf.ByteString getSchemaData() { return schemaData_; @@ -424,7 +410,7 @@ public com.google.protobuf.ByteString getSchemaData() { public static final int TYPE_FIELD_NUMBER = 4; private org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type type_; public boolean hasType() { - return ((bitField0_ & 0x00000008) == 0x00000008); + return ((bitField0_ & 0x00000004) == 0x00000004); } public org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type getType() { return type_; @@ -453,7 +439,6 @@ public org.apache.pulsar.common.api.proto.PulsarApi.KeyValueOrBuilder getPropert private void initFields() { name_ = ""; - version_ = com.google.protobuf.ByteString.EMPTY; schemaData_ = com.google.protobuf.ByteString.EMPTY; type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json; properties_ = java.util.Collections.emptyList(); @@ -467,10 +452,6 @@ public final boolean isInitialized() { memoizedIsInitialized = 0; return false; } - if (!hasVersion()) { - memoizedIsInitialized = 0; - return false; - } if (!hasSchemaData()) { memoizedIsInitialized = 0; return false; @@ -501,12 +482,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr output.writeBytes(1, getNameBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeBytes(2, version_); - } - if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBytes(3, schemaData_); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { + if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeEnum(4, type_.getNumber()); } for (int i = 0; i < properties_.size(); i++) { @@ -525,14 +503,10 @@ public int getSerializedSize() { .computeBytesSize(1, getNameBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - size += com.google.protobuf.CodedOutputStream - .computeBytesSize(2, version_); - } - if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream .computeBytesSize(3, schemaData_); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { + if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream .computeEnumSize(4, type_.getNumber()); } @@ -655,14 +629,12 @@ public Builder clear() { super.clear(); name_ = ""; bitField0_ = (bitField0_ & ~0x00000001); - version_ = com.google.protobuf.ByteString.EMPTY; - bitField0_ = (bitField0_ & ~0x00000002); schemaData_ = com.google.protobuf.ByteString.EMPTY; - bitField0_ = (bitField0_ & ~0x00000004); + bitField0_ = (bitField0_ & ~0x00000002); type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json; - bitField0_ = (bitField0_ & ~0x00000008); + bitField0_ = (bitField0_ & ~0x00000004); properties_ = java.util.Collections.emptyList(); - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -703,18 +675,14 @@ public org.apache.pulsar.common.api.proto.PulsarApi.Schema buildPartial() { if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } - result.version_ = version_; + result.schemaData_ = schemaData_; if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } - result.schemaData_ = schemaData_; - if (((from_bitField0_ & 0x00000008) == 0x00000008)) { - to_bitField0_ |= 0x00000008; - } result.type_ = type_; - if (((bitField0_ & 0x00000010) == 0x00000010)) { + if (((bitField0_ & 0x00000008) == 0x00000008)) { properties_ = java.util.Collections.unmodifiableList(properties_); - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000008); } result.properties_ = properties_; result.bitField0_ = to_bitField0_; @@ -726,9 +694,6 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.Schema oth if (other.hasName()) { setName(other.getName()); } - if (other.hasVersion()) { - setVersion(other.getVersion()); - } if (other.hasSchemaData()) { setSchemaData(other.getSchemaData()); } @@ -738,7 +703,7 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.Schema oth if (!other.properties_.isEmpty()) { if (properties_.isEmpty()) { properties_ = other.properties_; - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000008); } else { ensurePropertiesIsMutable(); properties_.addAll(other.properties_); @@ -753,10 +718,6 @@ public final boolean isInitialized() { return false; } - if (!hasVersion()) { - - return false; - } if (!hasSchemaData()) { return false; @@ -801,13 +762,8 @@ public Builder mergeFrom( name_ = input.readBytes(); break; } - case 18: { - bitField0_ |= 0x00000002; - version_ = input.readBytes(); - break; - } case 26: { - bitField0_ |= 0x00000004; + bitField0_ |= 0x00000002; schemaData_ = input.readBytes(); break; } @@ -815,7 +771,7 @@ public Builder mergeFrom( int rawValue = input.readEnum(); org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type value = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.valueOf(rawValue); if (value != null) { - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000004; type_ = value; } break; @@ -868,34 +824,10 @@ void setName(com.google.protobuf.ByteString value) { } - // required bytes version = 2; - private com.google.protobuf.ByteString version_ = com.google.protobuf.ByteString.EMPTY; - public boolean hasVersion() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public com.google.protobuf.ByteString getVersion() { - return version_; - } - public Builder setVersion(com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000002; - version_ = value; - - return this; - } - public Builder clearVersion() { - bitField0_ = (bitField0_ & ~0x00000002); - version_ = getDefaultInstance().getVersion(); - - return this; - } - // required bytes schema_data = 3; private com.google.protobuf.ByteString schemaData_ = com.google.protobuf.ByteString.EMPTY; public boolean hasSchemaData() { - return ((bitField0_ & 0x00000004) == 0x00000004); + return ((bitField0_ & 0x00000002) == 0x00000002); } public com.google.protobuf.ByteString getSchemaData() { return schemaData_; @@ -904,13 +836,13 @@ public Builder setSchemaData(com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000004; + bitField0_ |= 0x00000002; schemaData_ = value; return this; } public Builder clearSchemaData() { - bitField0_ = (bitField0_ & ~0x00000004); + bitField0_ = (bitField0_ & ~0x00000002); schemaData_ = getDefaultInstance().getSchemaData(); return this; @@ -919,7 +851,7 @@ public Builder clearSchemaData() { // required .pulsar.proto.Schema.Type type = 4; private org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json; public boolean hasType() { - return ((bitField0_ & 0x00000008) == 0x00000008); + return ((bitField0_ & 0x00000004) == 0x00000004); } public org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type getType() { return type_; @@ -928,13 +860,13 @@ public Builder setType(org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000004; type_ = value; return this; } public Builder clearType() { - bitField0_ = (bitField0_ & ~0x00000008); + bitField0_ = (bitField0_ & ~0x00000004); type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json; return this; @@ -944,9 +876,9 @@ public Builder clearType() { private java.util.List properties_ = java.util.Collections.emptyList(); private void ensurePropertiesIsMutable() { - if (!((bitField0_ & 0x00000010) == 0x00000010)) { + if (!((bitField0_ & 0x00000008) == 0x00000008)) { properties_ = new java.util.ArrayList(properties_); - bitField0_ |= 0x00000010; + bitField0_ |= 0x00000008; } } @@ -1018,7 +950,7 @@ public Builder addAllProperties( } public Builder clearProperties() { properties_ = java.util.Collections.emptyList(); - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000008); return this; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java new file mode 100644 index 0000000000000..b9c47b2209597 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class DeleteSchemaResponse { + private SchemaVersion version; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java new file mode 100644 index 0000000000000..bc98b89a0d7bd --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import java.util.Map; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class GetSchemaResponse { + private SchemaVersion version; + private SchemaType type; + private long timestamp; + private String data; + private Map properties; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java new file mode 100644 index 0000000000000..af04418ea759b --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import java.util.Map; +import lombok.Data; + +@Data +public class PostSchemaPayload { + private String type; + private String schema; + private Map properties; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java new file mode 100644 index 0000000000000..b12db5de1ed79 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class PostSchemaResponse { + private SchemaVersion version; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java new file mode 100644 index 0000000000000..b32eba40aa97c --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import java.util.Map; +import lombok.Data; + +@Data +public class SchemaInfo { + private String name; + private byte[] schema; + private SchemaType type; + private Map properties; +} diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 288a262e439a3..3514554b10125 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -31,10 +31,10 @@ message Schema { } required string name = 1; - required bytes version = 2; required bytes schema_data = 3; required Type type = 4; repeated KeyValue properties = 5; + } message MessageIdData {