Skip to content

Commit

Permalink
Renamed Schema.IDENTITY into Schema.BYTES (#1694)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Apr 30, 2018
1 parent e9a1b9a commit 1eff40e
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 31 deletions.
Expand Up @@ -103,7 +103,7 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> conf,
CompletableFuture<Consumer<byte[]>> consumerFuture) {
super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1,
consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY);
consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.BYTES);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
}
Expand Down
Expand Up @@ -851,7 +851,7 @@ private List<Message<byte[]>> getMessageFromHttpResponse(Response response) thro
}
}

return Collections.singletonList(new MessageImpl<byte[]>(msgId, properties, data, Schema.IDENTITY));
return Collections.singletonList(new MessageImpl<byte[]>(msgId, properties, data, Schema.BYTES));
} finally {
if (stream != null) {
stream.close();
Expand All @@ -876,7 +876,7 @@ private List<Message<byte[]>> getIndividualMsgsFromBatch(String msgId, byte[] da
properties.put(entry.getKey(), entry.getValue());
}
}
ret.add(new MessageImpl<>(batchMsgId, properties, singleMessagePayload, Schema.IDENTITY));
ret.add(new MessageImpl<>(batchMsgId, properties, singleMessagePayload, Schema.BYTES));
} catch (Exception ex) {
log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex);
}
Expand Down
Expand Up @@ -44,7 +44,7 @@ static <T> MessageBuilder<T> create(Schema<T> schema) {
}

static MessageBuilder<byte[]> create() {
return create(Schema.IDENTITY);
return create(Schema.BYTES);
}

/**
Expand Down
Expand Up @@ -18,7 +18,8 @@
*/
package org.apache.pulsar.client.api;

import org.apache.pulsar.client.api.schemas.StringSchema;
import org.apache.pulsar.client.impl.schema.BytesSchema;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.common.schema.SchemaInfo;

/**
Expand Down Expand Up @@ -51,22 +52,10 @@ public interface Schema<T> {
*/
SchemaInfo getSchemaInfo();

Schema<byte[]> IDENTITY = new Schema<byte[]>() {
@Override
public byte[] encode(byte[] message) {
return message;
}

@Override
public byte[] decode(byte[] bytes) {
return bytes;
}

@Override
public SchemaInfo getSchemaInfo() {
return null;
}
};
/**
* Schema that doesn't perform any encoding on the message payloads. Accepts a byte array and it passes it through.
*/
Schema<byte[]> BYTES = new BytesSchema();

/**
* Schema that can be used to encode/decode messages whose values are String. The payload is encoded with UTF-8.
Expand Down
Expand Up @@ -74,7 +74,7 @@ static MessageImpl<byte[]> create(MessageMetadata.Builder msgMetadataBuilder, By
msg.cnx = null;
msg.payload = Unpooled.wrappedBuffer(payload);
msg.properties = null;
msg.schema = Schema.IDENTITY;
msg.schema = Schema.BYTES;
return msg;
}

Expand Down
Expand Up @@ -144,7 +144,7 @@ public ClientConfigurationData getConfiguration() {

@Override
public ProducerBuilder<byte[]> newProducer() {
return new ProducerBuilderImpl<>(this, Schema.IDENTITY);
return new ProducerBuilderImpl<>(this, Schema.BYTES);
}

@Override
Expand All @@ -154,7 +154,7 @@ public <T> ProducerBuilder<T> newProducer(Schema<T> schema) {

@Override
public ConsumerBuilder<byte[]> newConsumer() {
return new ConsumerBuilderImpl<>(this, Schema.IDENTITY);
return new ConsumerBuilderImpl<>(this, Schema.BYTES);
}

@Override
Expand All @@ -164,7 +164,7 @@ public <T> ConsumerBuilder<T> newConsumer(Schema<T> schema) {

@Override
public ReaderBuilder<byte[]> newReader() {
return new ReaderBuilderImpl<>(this, Schema.IDENTITY);
return new ReaderBuilderImpl<>(this, Schema.BYTES);
}

@Override
Expand Down Expand Up @@ -229,7 +229,7 @@ public CompletableFuture<Producer<byte[]>> createProducerAsync(final String topi
}

public CompletableFuture<Producer<byte[]>> createProducerAsync(ProducerConfigurationData conf) {
return createProducerAsync(conf, Schema.IDENTITY);
return createProducerAsync(conf, Schema.BYTES);
}

public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData conf, Schema<T> schema) {
Expand Down Expand Up @@ -321,7 +321,7 @@ public CompletableFuture<Consumer<byte[]>> subscribeAsync(final String topic, fi
}

public CompletableFuture<Consumer<byte[]>> subscribeAsync(ConsumerConfigurationData<byte[]> conf) {
return subscribeAsync(conf, Schema.IDENTITY);
return subscribeAsync(conf, Schema.BYTES);
}

public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
Expand Down Expand Up @@ -417,7 +417,7 @@ private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConf
}

public CompletableFuture<Consumer<byte[]>> patternTopicSubscribeAsync(ConsumerConfigurationData<byte[]> conf) {
return patternTopicSubscribeAsync(conf, Schema.IDENTITY);
return patternTopicSubscribeAsync(conf, Schema.BYTES);
}

private <T> CompletableFuture<Consumer<T>> patternTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
Expand Down Expand Up @@ -495,7 +495,7 @@ public CompletableFuture<Reader<byte[]>> createReaderAsync(String topic, Message
}

public CompletableFuture<Reader<byte[]>> createReaderAsync(ReaderConfigurationData<byte[]> conf) {
return createReaderAsync(conf, Schema.IDENTITY);
return createReaderAsync(conf, Schema.BYTES);
}

public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
Expand Down
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;

public class BytesSchema implements Schema<byte[]> {
@Override
public byte[] encode(byte[] message) {
return message;
}

@Override
public byte[] decode(byte[] bytes) {
return bytes;
}

@Override
public SchemaInfo getSchemaInfo() {
return null;
}
}
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schemas;
package org.apache.pulsar.client.impl.schema;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down
Expand Up @@ -25,7 +25,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.schemas.StringSchema;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down

0 comments on commit 1eff40e

Please sign in to comment.