Skip to content

Commit

Permalink
GenericObject - support KeyValue in Message#getValue() (apache#10107)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 12, 2021
1 parent 23b2859 commit ab3caca
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 2 deletions.
Expand Up @@ -49,6 +49,8 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -304,6 +306,52 @@ public void testUseAutoConsumeWithSchemalessTopic() throws Exception {
consumer2.close();
}

@Test
public void testKeyValueSchema() throws Exception {
final String tenant = PUBLIC_TENANT;
final String namespace = "test-namespace-" + randomName(16);
final String topicName = "test-string-schema";

final String topic = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topicName).toString();

admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME));

admin.topics().createPartitionedTopic(topic, 2);

Producer<KeyValue<String, Integer>> producer = pulsarClient
.newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE))
.topic(topic)
.create();

producer.send(new KeyValue<>("foo", 123));

Consumer<KeyValue<String, Integer>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE))
.subscriptionName("test-sub")
.topic(topic)
.subscribe();

Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.subscriptionName("test-sub2")
.topic(topic)
.subscribe();

producer.send(new KeyValue<>("foo", 123));

Message<KeyValue<String, Integer>> message = consumer.receive();
Message<GenericRecord> message2 = consumer2.receive();
assertEquals(message.getValue(), message2.getValue().getNativeObject());

producer.close();
consumer.close();
consumer2.close();
}

@Test
public void testIsUsingAvroSchemaParser() {
for (SchemaType value : SchemaType.values()) {
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.EncryptionContext;
Expand Down Expand Up @@ -306,8 +307,16 @@ public T getValue() {
}
}

private KeyValueSchema getKeyValueSchema() {
if (schema instanceof AutoConsumeSchema) {
return (KeyValueSchema) ((AutoConsumeSchema) schema).getInternalSchema();
} else {
return (KeyValueSchema) schema;
}
}

private T getKeyValueBySchemaVersion() {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
KeyValueSchema kvSchema = getKeyValueSchema();
byte[] schemaVersion = getSchemaVersion();
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
return (T) kvSchema.decode(
Expand All @@ -319,7 +328,7 @@ private T getKeyValueBySchemaVersion() {
}

private T getKeyValue() {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
KeyValueSchema kvSchema = getKeyValueSchema();
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
return (T) kvSchema.decode(
msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(),
Expand Down
Expand Up @@ -250,4 +250,8 @@ protected GenericRecord adapt(Object value, byte[] schemaVersion) {
return GenericObjectWrapper.of(value,
this.schema.getSchemaInfo().getType(), schemaVersion);
}

public Schema<?> getInternalSchema() {
return schema;
}
}

0 comments on commit ab3caca

Please sign in to comment.