diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java index 6d0ce1224e5fd..2ac31fc7b1b50 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java @@ -81,6 +81,12 @@ public void validate(byte[] message) { schemaMap.get(SchemaVersion.Latest).validate(message); } + public void validate(byte[] message, byte[] schemaVersion) { + SchemaVersion sv = getSchemaVersion(schemaVersion); + ensureSchemaInitialized(sv); + schemaMap.get(sv).validate(message); + } + @Override public byte[] encode(GenericRecord message) { throw new UnsupportedOperationException("AutoConsumeSchema is not intended to be used for encoding"); @@ -92,7 +98,7 @@ public boolean supportSchemaVersioning() { } public Schema atSchemaVersion(byte[] schemaVersion) { - SchemaVersion sv = BytesSchemaVersion.of(schemaVersion); + SchemaVersion sv = getSchemaVersion(schemaVersion); fetchSchemaIfNeeded(sv); ensureSchemaInitialized(sv); Schema topicVersionedSchema = schemaMap.get(sv); @@ -105,7 +111,7 @@ public Schema atSchemaVersion(byte[] schemaVersion) { @Override public GenericRecord decode(byte[] bytes, byte[] schemaVersion) { - SchemaVersion sv = BytesSchemaVersion.of(schemaVersion); + SchemaVersion sv = getSchemaVersion(schemaVersion); fetchSchemaIfNeeded(sv); ensureSchemaInitialized(sv); return adapt(schemaMap.get(sv).decode(bytes, schemaVersion), schemaVersion); @@ -114,8 +120,8 @@ public GenericRecord decode(byte[] bytes, byte[] schemaVersion) { @Override public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) { this.schemaInfoProvider = schemaInfoProvider; - if (schemaMap.containsKey(SchemaVersion.Latest)) { - schemaMap.get(SchemaVersion.Latest).setSchemaInfoProvider(schemaInfoProvider); + for (Schema schema : schemaMap.values()) { + schema.setSchemaInfoProvider(schemaInfoProvider); } } @@ -128,10 +134,7 @@ public SchemaInfo getSchemaInfo() { } public SchemaInfo getSchemaInfo(byte[] schemaVersion) { - if (schemaVersion == null) { - return Schema.BYTES.getSchemaInfo(); - } - SchemaVersion sv = BytesSchemaVersion.of(schemaVersion); + SchemaVersion sv = getSchemaVersion(schemaVersion); if (schemaMap.containsKey(sv)) { return schemaMap.get(sv).getSchemaInfo(); } @@ -251,7 +254,7 @@ protected GenericRecord adapt(Object value, byte[] schemaVersion) { if (value instanceof GenericRecord) { return (GenericRecord) value; } - BytesSchemaVersion sv = BytesSchemaVersion.of(schemaVersion); + SchemaVersion sv = getSchemaVersion(schemaVersion); if (!schemaMap.containsKey(sv)) { throw new IllegalStateException("Cannot decode a message without schema"); } @@ -267,7 +270,7 @@ public Schema getInternalSchema() { } public Schema getInternalSchema(byte[] schemaVersion) { - return schemaMap.get(BytesSchemaVersion.of(schemaVersion)); + return schemaMap.get(getSchemaVersion(schemaVersion)); } /** @@ -308,13 +311,26 @@ public void fetchSchemaIfNeeded(SchemaVersion schemaVersion) throws SchemaSerial } } + private static SchemaVersion getSchemaVersion(byte[] schemaVersion) { + if (schemaVersion != null) { + return BytesSchemaVersion.of(schemaVersion); + } + return BytesSchemaVersion.of(new byte[0]); + } + @Override public String toString() { - if (schemaMap.containsKey(SchemaVersion.Latest) - && schemaMap.get(SchemaVersion.Latest).getSchemaInfo() != null) { - return "AUTO_CONSUME(schematype=" + schemaMap.get(SchemaVersion.Latest).getSchemaInfo().getType() + ")"; - } else { + if (schemaMap.isEmpty()) { return "AUTO_CONSUME(uninitialized)"; } + StringBuilder sb = new StringBuilder("AUTO_CONSUME("); + for (Map.Entry> entry : schemaMap.entrySet()) { + sb.append("{schemaVersion=").append(entry.getKey()) + .append(",schemaType=").append(entry.getValue().getSchemaInfo().getType()) + .append("}"); + } + sb.append(")"); + return sb.toString(); } + } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java index 789565f2b81d6..5fa0da2ca33e8 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java @@ -20,6 +20,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.SchemaTestUtils; import org.apache.pulsar.common.schema.KeyValue; @@ -225,5 +226,4 @@ public void testSetKeyBytesEncodingTypeSeparated() { } } - } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchemaTest.java new file mode 100644 index 0000000000000..4ed2ffdbb7aeb --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchemaTest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import java.nio.ByteBuffer; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.junit.Assert; +import org.testng.annotations.Test; + +/** + * AutoConsumeSchema test. + */ +@Slf4j +public class AutoConsumeSchemaTest { + + @Test + public void decodeDataWithNullSchemaVersion() { + Schema autoConsumeSchema = new AutoConsumeSchema(); + byte[] bytes = "bytes data".getBytes(); + MessageImpl message = MessageImpl.create( + new MessageMetadata(), ByteBuffer.wrap(bytes), autoConsumeSchema); + Assert.assertNull(message.getSchemaVersion()); + GenericRecord genericRecord = message.getValue(); + Assert.assertEquals(genericRecord.getNativeObject(), bytes); + } + +}