Skip to content

Commit

Permalink
[Schema] Fix AutoConsumeSchema decode null schema version data (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoran10 committed Jun 4, 2021
1 parent 0420520 commit d8567a8
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ public void validate(byte[] message) {
schemaMap.get(SchemaVersion.Latest).validate(message);
}

public void validate(byte[] message, byte[] schemaVersion) {
SchemaVersion sv = getSchemaVersion(schemaVersion);
ensureSchemaInitialized(sv);
schemaMap.get(sv).validate(message);
}

@Override
public byte[] encode(GenericRecord message) {
throw new UnsupportedOperationException("AutoConsumeSchema is not intended to be used for encoding");
Expand All @@ -92,7 +98,7 @@ public boolean supportSchemaVersioning() {
}

public Schema<?> atSchemaVersion(byte[] schemaVersion) {
SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
SchemaVersion sv = getSchemaVersion(schemaVersion);
fetchSchemaIfNeeded(sv);
ensureSchemaInitialized(sv);
Schema<?> topicVersionedSchema = schemaMap.get(sv);
Expand All @@ -105,7 +111,7 @@ public Schema<?> atSchemaVersion(byte[] schemaVersion) {

@Override
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
SchemaVersion sv = getSchemaVersion(schemaVersion);
fetchSchemaIfNeeded(sv);
ensureSchemaInitialized(sv);
return adapt(schemaMap.get(sv).decode(bytes, schemaVersion), schemaVersion);
Expand All @@ -114,8 +120,8 @@ public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
@Override
public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
this.schemaInfoProvider = schemaInfoProvider;
if (schemaMap.containsKey(SchemaVersion.Latest)) {
schemaMap.get(SchemaVersion.Latest).setSchemaInfoProvider(schemaInfoProvider);
for (Schema<?> schema : schemaMap.values()) {
schema.setSchemaInfoProvider(schemaInfoProvider);
}
}

Expand All @@ -128,10 +134,7 @@ public SchemaInfo getSchemaInfo() {
}

public SchemaInfo getSchemaInfo(byte[] schemaVersion) {
if (schemaVersion == null) {
return Schema.BYTES.getSchemaInfo();
}
SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
SchemaVersion sv = getSchemaVersion(schemaVersion);
if (schemaMap.containsKey(sv)) {
return schemaMap.get(sv).getSchemaInfo();
}
Expand Down Expand Up @@ -251,7 +254,7 @@ protected GenericRecord adapt(Object value, byte[] schemaVersion) {
if (value instanceof GenericRecord) {
return (GenericRecord) value;
}
BytesSchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
SchemaVersion sv = getSchemaVersion(schemaVersion);
if (!schemaMap.containsKey(sv)) {
throw new IllegalStateException("Cannot decode a message without schema");
}
Expand All @@ -267,7 +270,7 @@ public Schema<?> getInternalSchema() {
}

public Schema<?> getInternalSchema(byte[] schemaVersion) {
return schemaMap.get(BytesSchemaVersion.of(schemaVersion));
return schemaMap.get(getSchemaVersion(schemaVersion));
}

/**
Expand Down Expand Up @@ -308,13 +311,26 @@ public void fetchSchemaIfNeeded(SchemaVersion schemaVersion) throws SchemaSerial
}
}

private static SchemaVersion getSchemaVersion(byte[] schemaVersion) {
if (schemaVersion != null) {
return BytesSchemaVersion.of(schemaVersion);
}
return BytesSchemaVersion.of(new byte[0]);
}

@Override
public String toString() {
if (schemaMap.containsKey(SchemaVersion.Latest)
&& schemaMap.get(SchemaVersion.Latest).getSchemaInfo() != null) {
return "AUTO_CONSUME(schematype=" + schemaMap.get(SchemaVersion.Latest).getSchemaInfo().getType() + ")";
} else {
if (schemaMap.isEmpty()) {
return "AUTO_CONSUME(uninitialized)";
}
StringBuilder sb = new StringBuilder("AUTO_CONSUME(");
for (Map.Entry<SchemaVersion, Schema<?>> entry : schemaMap.entrySet()) {
sb.append("{schemaVersion=").append(entry.getKey())
.append(",schemaType=").append(entry.getValue().getSchemaInfo().getType())
.append("}");
}
sb.append(")");
return sb.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
import org.apache.pulsar.common.schema.KeyValue;
Expand Down Expand Up @@ -225,5 +226,4 @@ public void testSetKeyBytesEncodingTypeSeparated() {
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;

import java.nio.ByteBuffer;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.junit.Assert;
import org.testng.annotations.Test;

/**
* AutoConsumeSchema test.
*/
@Slf4j
public class AutoConsumeSchemaTest {

@Test
public void decodeDataWithNullSchemaVersion() {
Schema<GenericRecord> autoConsumeSchema = new AutoConsumeSchema();
byte[] bytes = "bytes data".getBytes();
MessageImpl<GenericRecord> message = MessageImpl.create(
new MessageMetadata(), ByteBuffer.wrap(bytes), autoConsumeSchema);
Assert.assertNull(message.getSchemaVersion());
GenericRecord genericRecord = message.getValue();
Assert.assertEquals(genericRecord.getNativeObject(), bytes);
}

}

0 comments on commit d8567a8

Please sign in to comment.