[fix][broker] Fix schema does not replicate successfully (#17049)
### Motivation

#11441 added support for replicating schemas to remote clusters, but there is a mistake: the returned schema state is incorrect.

https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770

The MessageImpl used by the replicator does not carry a schema, so this check reports the schema state as Ready, which causes the producer to skip the schema upload.

https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149

We should remove

https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768

so that the correct schema state is returned.

We should also provide the correct schema hash: if the message is used by the replicator, the schema hash should be based on the replicator schema; otherwise, it should be based on the schema of the message.
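
For illustration, here is a minimal sketch of the intended behavior. It uses simplified stand-in types rather than the real `MessageImpl`; the field and method names are assumptions made for this example only.

```java
// Sketch only: simplified stand-in for MessageImpl's schema handling.
enum SchemaState { None, Broken, Ready }

class ReplicatedMessageSketch {
    private SchemaState schemaState = SchemaState.None;
    private byte[] localSchemaData;        // schema of a locally produced message, may be null
    private byte[] replicatorSchemaData;   // schema captured by the replicator, may be null

    SchemaState getSchemaState() {
        // Before the fix this returned Ready whenever no local schema info was present,
        // so a replicated message never triggered a schema upload on the remote cluster.
        return schemaState;
    }

    byte[] schemaBytesForHash() {
        // Hash the replicator-provided schema when present; otherwise fall back to the
        // message's own schema, or an empty array when there is no schema at all.
        if (replicatorSchemaData != null) {
            return replicatorSchemaData;
        }
        return localSchemaData != null ? localSchemaData : new byte[0];
    }
}
```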

### Modification

- Fixed the incorrectly returned schema state
- Added a method to `MessageImpl` for getting the schema hash (see the sketch below)
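
A rough sketch of the producer-side flow this enables (not the real `ProducerImpl`; the class, cache key type, and callback shape below are assumptions for illustration): the schema-version cache is keyed by the hash the message itself reports, so a replicated message resolves against the replicator schema's hash instead of a missing local `Schema<T>`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Sketch only: how a hash-keyed schema-version cache drives "send now" vs. "register schema first".
class SchemaVersionCacheSketch {
    private final Map<String, byte[]> schemaCache = new ConcurrentHashMap<>();

    /** Returns true if the schema version is already known and the message can be sent immediately. */
    boolean populateMessageSchema(String schemaHash, Consumer<byte[]> setSchemaVersion) {
        byte[] schemaVersion = schemaCache.get(schemaHash);
        if (schemaVersion == null) {
            return false;   // caller should register the schema with the broker first
        }
        setSchemaVersion.accept(schemaVersion);
        return true;
    }

    /** Called once the broker acknowledges the schema; an empty version means "do not cache". */
    void onSchemaRegistered(String schemaHash, byte[] schemaVersion) {
        if (schemaVersion.length != 0) {
            schemaCache.putIfAbsent(schemaHash, schemaVersion);
        }
    }
}
```

Because the hash is derived from the schema bytes and type, a replicated message and a locally produced message carrying the same schema still map to the same cached version.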
codelipenghui committed Aug 18, 2022
1 parent e4dcf5a commit 7689133
Showing 6 changed files with 52 additions and 37 deletions.
2 changes: 1 addition & 1 deletion build/run_unit_group.sh
@@ -134,7 +134,7 @@ function test_group_other() {
-Dexclude='**/ManagedLedgerTest.java,
**/OffloadersCacheTest.java
**/PrimitiveSchemaTest.java,
BlobStoreManagedLedgerOffloaderTest.java'
BlobStoreManagedLedgerOffloaderTest.java' -DtestReuseFork=false

mvn_test -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java,
**/OffloadersCacheTest.java'
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -62,6 +62,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.State;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
@@ -70,6 +71,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -78,6 +80,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -392,19 +395,24 @@ public void testReplicationWithSchema() throws Exception {
final String subName = "my-sub";

@Cleanup
Producer<Schemas.PersonOne> producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.create();
@Cleanup
Producer<Schemas.PersonOne> producer2 = client2.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.create();
@Cleanup
Producer<Schemas.PersonOne> producer3 = client3.newProducer(Schema.AVRO(Schemas.PersonOne.class))
Producer<Schemas.PersonOne> producer = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.create();

List<Producer<Schemas.PersonOne>> producers = Lists.newArrayList(producer1, producer2, producer3);
admin1.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
admin2.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
admin3.topics().createSubscription(topic.toString(), subName, MessageId.earliest);


for (int i = 0; i < 10; i++) {
producer.send(new Schemas.PersonOne(i));
}

Awaitility.await().untilAsserted(() -> {
assertTrue(admin1.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0);
assertTrue(admin2.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0);
assertTrue(admin3.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0);
});

@Cleanup
Consumer<Schemas.PersonOne> consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
@@ -424,8 +432,7 @@ public void testReplicationWithSchema() throws Exception {
.subscriptionName(subName)
.subscribe();

for (int i = 0; i < 3; i++) {
producers.get(i).send(new Schemas.PersonOne(i));
for (int i = 0; i < 10; i++) {
Message<Schemas.PersonOne> msg1 = consumer1.receive();
Message<Schemas.PersonOne> msg2 = consumer2.receive();
Message<Schemas.PersonOne> msg3 = consumer3.receive();
@@ -1395,15 +1402,21 @@ public void testReplicatorWithFailedAck() throws Exception {

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString(), false)
.getNow(null).get();
MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get();
Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId());
ConcurrentOpenHashMap<String, Replicator> replicators = topic.getReplicators();
PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2");

Awaitility.await().timeout(50, TimeUnit.SECONDS)
Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started,
replicator.getState()));

assertEquals(replicator.getState(), org.apache.pulsar.broker.service.AbstractReplicator.State.Started);
ManagedCursorImpl cursor = (ManagedCursorImpl) replicator.getCursor();

// Make sure all the data has been replicated to the remote cluster before closing the cursor.
Awaitility.await().untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), lastPosition));

cursor.setState(State.Closed);

Field field = ManagedCursorImpl.class.getDeclaredField("state");
@@ -1412,22 +1425,16 @@ public void testReplicatorWithFailedAck() throws Exception {

producer1.produce(10);

Position deletedPos = cursor.getMarkDeletedPosition();
Position readPos = cursor.getReadPosition();

Awaitility.await().timeout(30, TimeUnit.SECONDS).until(
() -> cursor.getMarkDeletedPosition().getEntryId() != (cursor.getReadPosition().getEntryId() - 1));

assertNotEquals((readPos.getEntryId() - 1), deletedPos.getEntryId());
// The cursor is closed, so the mark delete position will not move forward.
assertEquals(cursor.getMarkDeletedPosition(), lastPosition);

field.set(cursor, State.Open);

Awaitility.await().timeout(30, TimeUnit.SECONDS).until(
() -> cursor.getMarkDeletedPosition().getEntryId() == (cursor.getReadPosition().getEntryId() - 1));

deletedPos = cursor.getMarkDeletedPosition();
readPos = cursor.getReadPosition();
assertEquals((readPos.getEntryId() - 1), deletedPos.getEntryId());
() -> {
log.info("++++++++++++ {}, {}", cursor.getMarkDeletedPosition(), cursor.getReadPosition());
return cursor.getMarkDeletedPosition().getEntryId() == (cursor.getReadPosition().getEntryId() - 1);
});
}

private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -52,6 +52,7 @@
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -64,6 +65,8 @@ public class MessageImpl<T> implements Message<T> {
private ByteBuf payload;

private Schema<T> schema;

private SchemaHash schemaHash;
private SchemaInfo schemaInfoForReplicator;
private SchemaState schemaState = SchemaState.None;
private Optional<EncryptionContext> encryptionCtx = Optional.empty();
@@ -91,6 +94,7 @@ public static <T> MessageImpl<T> create(MessageMetadata msgMetadata, ByteBuffer
msg.payload = Unpooled.wrappedBuffer(payload);
msg.properties = null;
msg.schema = schema;
msg.schemaHash = SchemaHash.of(schema);
msg.uncompressedSize = payload.remaining();
return msg;
}
@@ -431,9 +435,14 @@ public SchemaInfo getSchemaInfo() {
return schema.getSchemaInfo();
}

public SchemaHash getSchemaHash() {
return schemaHash == null ? SchemaHash.of(new byte[0], null) : schemaHash;
}

public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) {
if (msgMetadata.hasReplicatedFrom()) {
this.schemaInfoForReplicator = schemaInfo;
this.schemaHash = SchemaHash.of(schemaInfo);
} else {
throw new IllegalArgumentException("Only allowed to set schemaInfoForReplicator for a replicated message.");
}
@@ -763,9 +772,6 @@ int getUncompressedSize() {
}

SchemaState getSchemaState() {
if (getSchemaInfo() == null) {
return SchemaState.Ready;
}
return schemaState;
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -716,8 +716,7 @@ private boolean populateMessageSchema(MessageImpl msg, SendCallback callback) {
completeCallbackAndReleaseSemaphore(msg.getUncompressedSize(), callback, e);
return false;
}
SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
byte[] schemaVersion = schemaCache.get(schemaHash);
byte[] schemaVersion = schemaCache.get(msg.getSchemaHash());
if (schemaVersion != null) {
msgMetadataBuilder.setSchemaVersion(schemaVersion);
msg.setSchemaState(MessageImpl.SchemaState.Ready);
@@ -726,8 +725,7 @@ private boolean rePopulateMessageSchema(MessageImpl msg) {
}

private boolean rePopulateMessageSchema(MessageImpl msg) {
SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
byte[] schemaVersion = schemaCache.get(schemaHash);
byte[] schemaVersion = schemaCache.get(msg.getSchemaHash());
if (schemaVersion == null) {
return false;
}
@@ -758,8 +756,7 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call
// case, we should not cache the schema version so that the schema version of the message metadata will
// be null, instead of an empty array.
if (v.length != 0) {
SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
schemaCache.putIfAbsent(schemaHash, v);
schemaCache.putIfAbsent(msg.getSchemaHash(), v);
msg.getMessageBuilder().setSchemaVersion(v);
}
msg.setSchemaState(MessageImpl.SchemaState.Ready);
@@ -126,7 +126,7 @@ public void testGetStats() throws Exception {
//
// Code under test is using CompletableFutures. These may hang indefinitely if the code is broken.
// That's why a test timeout is defined.
@Test(timeOut = 5000)
@Test(timeOut = 10000)
public void testParallelSubscribeAsync() throws Exception {
String topicName = "parallel-subscribe-async-topic";
MultiTopicsConsumerImpl<byte[]> impl = createMultiTopicsConsumer();
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
@@ -54,7 +54,12 @@ public static SchemaHash of(SchemaData schemaData) {
return of(schemaData.getData(), schemaData.getType());
}

private static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) {
public static SchemaHash of(SchemaInfo schemaInfo) {
return of(schemaInfo == null ? new byte[0] : schemaInfo.getSchema(),
schemaInfo == null ? null : schemaInfo.getType());
}

public static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) {
return new SchemaHash(hashFunction.hashBytes(schemaBytes), schemaType);
}
