From ab3caca169c35b2ce189d84c6b3834023bfb2e90 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 9 Apr 2021 19:29:00 +0200 Subject: [PATCH] GenericObject - support KeyValue in Message#getValue() (#10107) --- .../org/apache/pulsar/schema/SchemaTest.java | 48 +++++++++++++++++++ .../pulsar/client/impl/MessageImpl.java | 13 ++++- .../client/impl/schema/AutoConsumeSchema.java | 4 ++ 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index fba8108b34c02..56ca04c0f458b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -49,6 +49,8 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.testng.annotations.AfterMethod; @@ -304,6 +306,52 @@ public void testUseAutoConsumeWithSchemalessTopic() throws Exception { consumer2.close(); } + @Test + public void testKeyValueSchema() throws Exception { + final String tenant = PUBLIC_TENANT; + final String namespace = "test-namespace-" + randomName(16); + final String topicName = "test-string-schema"; + + final String topic = TopicName.get( + TopicDomain.persistent.value(), + tenant, + namespace, + topicName).toString(); + + admin.namespaces().createNamespace( + tenant + "/" + namespace, + Sets.newHashSet(CLUSTER_NAME)); + + admin.topics().createPartitionedTopic(topic, 2); + + Producer> producer = pulsarClient + .newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE)) + .topic(topic) + .create(); + + producer.send(new KeyValue<>("foo", 123)); + + Consumer> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE)) + .subscriptionName("test-sub") + .topic(topic) + .subscribe(); + + Consumer consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) + .subscriptionName("test-sub2") + .topic(topic) + .subscribe(); + + producer.send(new KeyValue<>("foo", 123)); + + Message> message = consumer.receive(); + Message message2 = consumer2.receive(); + assertEquals(message.getValue(), message2.getValue().getNativeObject()); + + producer.close(); + consumer.close(); + consumer2.close(); + } + @Test public void testIsUsingAvroSchemaParser() { for (SchemaType value : SchemaType.values()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index ac387a43d97a7..f7d8cf91d7d29 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.KeyValueSchema; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.EncryptionContext; @@ -306,8 +307,16 @@ public T getValue() { } } + private KeyValueSchema getKeyValueSchema() { + if (schema instanceof AutoConsumeSchema) { + return (KeyValueSchema) ((AutoConsumeSchema) schema).getInternalSchema(); + } else { + return (KeyValueSchema) schema; + } + } + private T getKeyValueBySchemaVersion() { - KeyValueSchema kvSchema = (KeyValueSchema) schema; + KeyValueSchema kvSchema = getKeyValueSchema(); byte[] schemaVersion = getSchemaVersion(); if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { return (T) kvSchema.decode( @@ -319,7 +328,7 @@ private T getKeyValueBySchemaVersion() { } private T getKeyValue() { - KeyValueSchema kvSchema = (KeyValueSchema) schema; + KeyValueSchema kvSchema = getKeyValueSchema(); if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { return (T) kvSchema.decode( msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(), diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java index 6ab92adcad603..647a87b163eef 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java @@ -250,4 +250,8 @@ protected GenericRecord adapt(Object value, byte[] schemaVersion) { return GenericObjectWrapper.of(value, this.schema.getSchemaInfo().getType(), schemaVersion); } + + public Schema getInternalSchema() { + return schema; + } }