Skip to content

Commit

Permalink
AutoConsumeSchema: handle schema NONE as BYTES (apache#10277)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 12, 2021
1 parent 32e3bb8 commit c73cdae
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -392,7 +393,16 @@ public void testStringSchema() throws Exception {
}

@Test
public void testUseAutoConsumeWithSchemalessTopic() throws Exception {
public void testUseAutoConsumeWithBytesSchemaTopic() throws Exception {
testUseAutoConsumeWithSchemalessTopic(SchemaType.BYTES);
}

@Test
public void testUseAutoConsumeWithNoneSchemaTopic() throws Exception {
testUseAutoConsumeWithSchemalessTopic(SchemaType.NONE);
}

private void testUseAutoConsumeWithSchemalessTopic(SchemaType schema) throws Exception {
final String tenant = PUBLIC_TENANT;
final String namespace = "test-namespace-" + randomName(16);
final String topicName = "test-schemaless";
Expand All @@ -409,6 +419,15 @@ public void testUseAutoConsumeWithSchemalessTopic() throws Exception {

admin.topics().createPartitionedTopic(topic, 2);

// set schema
SchemaInfo schemaInfo = SchemaInfo
.builder()
.schema(new byte[0])
.name("dummySchema")
.type(schema)
.build();
admin.schemas().createSchema(topic, schemaInfo);

Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(topic)
Expand All @@ -420,7 +439,7 @@ public void testUseAutoConsumeWithSchemalessTopic() throws Exception {
.subscribe();

// use GenericRecord even for primitive types
// it will be a PrimitiveRecord
// it will be a GenericObjectWrapper
Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.subscriptionName("test-sub3")
.topic(topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public static Schema<?> getSchema(SchemaInfo schemaInfo) {
case BOOLEAN:
return BooleanSchema.of();
case BYTES:
case NONE:
return BytesSchema.of();
case DATE:
return DateSchema.of();
Expand Down

0 comments on commit c73cdae

Please sign in to comment.