Skip to content

Commit

Permalink
[Java Client] Fix messages sent by producers without schema cannot be…
Browse files Browse the repository at this point in the history
… decoded (#15622)

### Motivation

When I tried to consume a topic via a consumer with Avro schema while
the topic was produced by a producer without schema, the consumption
failed. It's because `MultiVersionSchemaInfoProvider#getSchemaByVersion`
doesn't check if `schemaVersion` is an empty byte array. If yes, a
`BytesSchemaVersion` of an empty array will be passed to `cache.get` and
then passed to `loadSchema`.

https://github.com/apache/pulsar/blob/f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java#L94-L98

However, `LookupService#getSchema` cannot accept an empty byte array as
the version, so `loadSchema` failed.

The root cause is that the schema version was set unexpectedly when
messages were sent by a producer without schema. At broker side, the
returned schema version is never null. If the schema version was an
empty array, then it means the message doesn't have schema. However, at
Java client side, the empty byte array is treated as an existing schema
and the schema version field will be set. When consumer receives the
message, it will try to load schema whose version is an empty array.

### Modifications

- When a producer receives a response whose schema version is an empty
  byte array, just ignore it.
- Make `MessageImpl#getSchemaVersion` return null if the schema version
  is an empty byte array so that the consumer can consume messages
  produced by older version producers without schema. And return the
  internal schema for `getReaderSchema` when `getSchemaVersion`
  returns null.
- Fix the existing tests. Since producer without schema won't set the
  `schema_version` field after this patch, some tests that rely on the
  precise stats should be modified.
- Add `testConsumeAvroMessagesWithoutSchema` to cover the case that
  messages without schema are compatible with the schema.

This patch also modifies the existing behavior when
`schemaValidationEnforced` is false and messages are produced by a
producer without schema and consumed by a consumer with schema.

1. If the message is incompatible with the schema
   - Before: `getSchemaVersion` returns an empty array and `getValue`
     fails with `SerializationException`:

     > org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY

   - After: `getSchemaVersion` returns `null` and `getValue` fails with
     `SchemaSerializationException`.

2. Otherwise (the message is compatible with the schema)
   - Before: `getSchemaVersion` returns an empty array and `getValue`
     fails with `SerializationException`.
   - After: `getSchemaVersion` returns `null` and `getValue` returns the
     correctly decoded object.

(cherry picked from commit ecd275d)
  • Loading branch information
BewareMyPower authored and codelipenghui committed May 20, 2022
1 parent 08219ae commit a4e6e2e
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,7 @@ public void testPreciseBacklog() throws PulsarClientException, PulsarAdminExcept
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);

topicStats = admin.topics().getStats(topic, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 43);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1);
consumer.acknowledge(message);

Expand Down Expand Up @@ -1545,7 +1545,7 @@ public void testPreciseBacklogForPartitionedTopic() throws PulsarClientException

topicStats = admin.topics().getPartitionedStats(topic, false, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 43);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40);
}

@Test(timeOut = 30000)
Expand Down Expand Up @@ -1584,7 +1584,7 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti

TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 470);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 440);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 5);

for (int i = 0; i < 5; i++) {
Expand All @@ -1594,7 +1594,7 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti
Awaitility.await().untilAsserted(() -> {
TopicStats topicStats2 = admin.topics().getPartitionedStats(topic, false, true, true);
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5);
assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 238);
assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 223);
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ public void testGetBacklogSizeByMessageId() throws Exception{
completableFuture = batchProducer.sendAsync("a".getBytes());
}
completableFuture.get();
Assert.assertEquals(Optional.ofNullable(admin.topics().getBacklogSizeByMessageId(topicName + "-partition-0", MessageId.earliest)), Optional.of(350L));
Assert.assertEquals(Optional.ofNullable(admin.topics().getBacklogSizeByMessageId(topicName + "-partition-0", MessageId.earliest)), Optional.of(320L));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ private void verifyRGProdConsStats(String[] topicStrings,

log.debug("verifyProdConsStats: topicStatsMap has {} entries", topicStatsMap.size());

// Pulsar runtime adds some additional bytes in the exchanges: a 45-byte per-message
// Pulsar runtime adds some additional bytes in the exchanges: a 42-byte per-message
// metadata of some kind, plus more as the number of messages increases.
// Hence the ">=" assertion with ExpectedNumBytesSent/Received in the following checks.
final int ExpectedNumBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs;
Expand Down Expand Up @@ -769,8 +769,8 @@ private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
Assert.assertNotEquals(ninthPercentileValue, 0);
}

// Empirically, there appears to be a 45-byte overhead for metadata, imposed by Pulsar runtime.
private static final int PER_MESSAGE_METADATA_OHEAD = 45;
// Empirically, there appears to be a 42-byte overhead for metadata, imposed by Pulsar runtime.
private static final int PER_MESSAGE_METADATA_OHEAD = 42;

private static final int PUBLISH_INTERVAL_SECS = 10;
private static final int NUM_PRODUCERS = 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1361,7 +1361,7 @@ public void testCompaction() throws Exception {
assertEquals(cm.get(0).value, 10);
cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_size");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, 870);
assertEquals(cm.get(0).value, 840);

pulsarClient.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.pulsar.client.impl.schema.reader.AvroReader;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand All @@ -62,6 +61,7 @@
import org.testng.annotations.Test;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -305,7 +305,13 @@ public void newProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Ex
+ " if SchemaValidationEnabled is enabled");
}
Message<V2Data> msg3 = c.receive();
Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes());
assertNull(msg3.getSchemaVersion());
try {
msg3.getValue();
fail("Schema should be incompatible");
} catch (SchemaSerializationException e) {
assertTrue(e.getCause() instanceof EOFException);
}
} catch (PulsarClientException e) {
if (schemaValidationEnforced) {
Assert.assertTrue(e instanceof IncompatibleSchemaException);
Expand Down Expand Up @@ -366,7 +372,13 @@ public void newNativeAvroProducerForMessageSchemaOnTopicWithMultiVersionSchema()
+ " if SchemaValidationEnabled is enabled");
}
Message<V2Data> msg3 = c.receive();
Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes());
assertNull(msg3.getSchemaVersion());
try {
msg3.getValue();
fail("Schema should be incompatible");
} catch (SchemaSerializationException e) {
assertTrue(e.getCause() instanceof EOFException);
}
} catch (PulsarClientException e) {
if (schemaValidationEnforced) {
Assert.assertTrue(e instanceof IncompatibleSchemaException);
Expand Down Expand Up @@ -1253,4 +1265,38 @@ public void testAutoKeyValueConsumeGenericObjectNullValues(KeyValueEncodingType

}
}

/**
 * Verifies that a consumer created with an Avro schema can decode messages that were
 * published by a producer created without a schema, provided the payload bytes are
 * compatible with the consumer's schema.
 *
 * <p>The test returns immediately when {@code schemaValidationEnforced} is set.
 *
 * @throws Exception if creating the clients, sending, or receiving fails
 */
@Test
public void testConsumeAvroMessagesWithoutSchema() throws Exception {
    if (schemaValidationEnforced) {
        return;
    }
    final String topic = "test-consume-avro-messages-without-schema-" + UUID.randomUUID();
    final Schema<V1Data> schema = Schema.AVRO(V1Data.class);
    final int numMessages = 5;

    // try-with-resources closes the producer first and then the consumer (reverse declaration
    // order), matching the original close order, and also runs when an assertion below fails
    // so that a failing run does not leak the producer or the consumer.
    // The consumer is still created before the producer, as in the original test.
    try (Consumer<V1Data> consumer = pulsarClient.newConsumer(schema)
                 .topic(topic)
                 .subscriptionName("sub")
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
                 .create()) {

        // Publish the Avro-encoded bytes through the schema-less producer.
        for (int i = 0; i < numMessages; i++) {
            producer.send(schema.encode(new V1Data(i)));
        }

        for (int i = 0; i < numMessages; i++) {
            final Message<V1Data> msg = consumer.receive(3, TimeUnit.SECONDS);
            assertNotNull(msg);
            // Decode once and reuse the value for both logging and the assertion.
            final V1Data value = msg.getValue();
            log.info("Received {} from {}", value.i, topic);
            assertEquals(value.i, i);
            // The reader schema is expected to be the consumer's own schema.
            assertEquals(msg.getReaderSchema().orElse(Schema.BYTES).getSchemaInfo(), schema.getSchemaInfo());
            consumer.acknowledge(msg);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import static org.testng.Assert.fail;
import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;

import com.google.common.base.Throwables;
import lombok.EqualsAndHashCode;
import org.apache.avro.Schema.Parser;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -1119,18 +1118,18 @@ public void testAvroSchemaWithHttpLookup() throws Exception {
stopBroker();
isTcpLookup = false;
setup();
testEmptySchema();
testIncompatibleSchema();
}

@Test
public void testAvroSchemaWithTcpLookup() throws Exception {
stopBroker();
isTcpLookup = true;
setup();
testEmptySchema();
testIncompatibleSchema();
}

private void testEmptySchema() throws Exception {
private void testIncompatibleSchema() throws Exception {
final String namespace = "test-namespace-" + randomName(16);
String ns = PUBLIC_TENANT + "/" + namespace;
admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
Expand Down Expand Up @@ -1164,12 +1163,14 @@ private void testEmptySchema() throws Exception {
producer.send("test".getBytes(StandardCharsets.UTF_8));
Message<User> message1 = consumer.receive();
Assert.assertEquals(test, message1.getValue());
Message<User> message2 = consumer.receive();
try {
Message<User> message2 = consumer.receive();
message2.getValue();
} catch (Throwable ex) {
Assert.assertTrue(Throwables.getRootCause(ex) instanceof SchemaSerializationException);
Assert.assertEquals(Throwables.getRootCause(ex).getMessage(),"Empty schema version");
} catch (SchemaSerializationException e) {
final String schemaString =
new String(Schema.AVRO(User.class).getSchemaInfo().getSchema(), StandardCharsets.UTF_8);
Assert.assertTrue(e.getMessage().contains(schemaString));
Assert.assertTrue(e.getMessage().contains("payload (4 bytes)"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
Expand All @@ -40,6 +41,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.schema.AbstractSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
Expand Down Expand Up @@ -382,23 +384,28 @@ public Optional<Schema<?>> getReaderSchema() {
if (schema == null) {
return Optional.empty();
}
byte[] schemaVersion = getSchemaVersion();
if (schemaVersion == null) {
return Optional.of(schema);
}
if (schema instanceof AutoConsumeSchema) {
byte[] schemaVersion = getSchemaVersion();
return Optional.of(((AutoConsumeSchema) schema)
.atSchemaVersion(schemaVersion));
} else if (schema instanceof AbstractSchema) {
byte[] schemaVersion = getSchemaVersion();
return Optional.of(((AbstractSchema<?>) schema)
.atSchemaVersion(schemaVersion));
} else {
return Optional.of(schema);
}
}

// For messages produced by older version producers without schema, the schema version is an empty byte array
// rather than null.
@Override
public byte[] getSchemaVersion() {
if (msgMetadata.hasSchemaVersion()) {
return msgMetadata.getSchemaVersion();
byte[] schemaVersion = msgMetadata.getSchemaVersion();
return (schemaVersion.length == 0) ? null : schemaVersion;
} else {
return null;
}
Expand Down Expand Up @@ -464,8 +471,19 @@ private KeyValueSchemaImpl getKeyValueSchema() {
}
}


/**
 * Decodes the message payload via {@code decodeBySchema}, converting an
 * {@link ArrayIndexOutOfBoundsException} raised during decoding into a
 * {@link SchemaSerializationException} with a descriptive message.
 *
 * @param schemaVersion the schema version passed through to {@code decodeBySchema}
 * @return the value returned by {@code decodeBySchema}
 * @throws SchemaSerializationException if decoding fails with an
 *         {@link ArrayIndexOutOfBoundsException}; the original exception is attached as the cause
 */
private T decode(byte[] schemaVersion) {
    try {
        return decodeBySchema(schemaVersion);
    } catch (ArrayIndexOutOfBoundsException e) {
        // It usually means the message was produced without schema check while the message is not compatible with
        // the current schema. Therefore, convert it to SchemaSerializationException with a better description.
        final int payloadSize = payload.readableBytes();
        final SchemaSerializationException exception = new SchemaSerializationException(
                "payload (" + payloadSize + " bytes) cannot be decoded with schema "
                        + new String(schema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8));
        // Keep the original failure so the stack trace still shows where decoding broke.
        exception.initCause(e);
        throw exception;
    }
}

private T decodeBySchema(byte[] schemaVersion) {
T value = poolMessage ? schema.decode(payload.nioBuffer(), schemaVersion) : null;
if (value != null) {
return value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,9 +699,14 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call
}
} else {
log.info("[{}] [{}] GetOrCreateSchema succeed", topic, producerName);
SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
schemaCache.putIfAbsent(schemaHash, v);
msg.getMessageBuilder().setSchemaVersion(v);
// In broker, if schema version is an empty byte array, it means the topic doesn't have schema. In this
// case, we should not cache the schema version so that the schema version of the message metadata will
// be null, instead of an empty array.
if (v.length != 0) {
SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
schemaCache.putIfAbsent(schemaHash, v);
msg.getMessageBuilder().setSchemaVersion(v);
}
msg.setSchemaState(MessageImpl.SchemaState.Ready);
}
cnx.ctx().channel().eventLoop().execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,24 @@

import java.util.Optional;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;

/**
 * Value holder for a producer response: the producer name, the last sequence id, the schema
 * version bytes, and an optional topic epoch.
 *
 * <p>The schema version is exposed through a hand-written getter that reports an empty byte
 * array as {@code null}.
 */
@Data
@Getter
@AllArgsConstructor
public class ProducerResponse {
private String producerName;
private long lastSequenceId;
// May be null or an empty array; read it through getSchemaVersion(), which maps both to null.
private byte[] schemaVersion;

private Optional<Long> topicEpoch;

// Shadow the default getter generated by lombok. In broker, if the schema version is an empty byte array, it means
// the topic doesn't have schema.
/**
 * Returns the schema version, treating an empty byte array the same as an absent one.
 *
 * @return the schema version bytes, or {@code null} when the stored value is {@code null} or has length 0
 */
public byte[] getSchemaVersion() {
if (schemaVersion != null && schemaVersion.length != 0) {
return schemaVersion;
} else {
return null;
}
}
}

0 comments on commit a4e6e2e

Please sign in to comment.