Describe the bug
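When the consumer reconnects to the broker after channelInactive, connectionClosed sets cnx to null and moves the consumer state to Connecting. doAcknowledge still lets the call through in the Connecting state, so a transactional ack reaches doTransactionAcknowledgeForResponse, which calls cnx().newAckForReceipt(cmd, requested) on the null connection and throws a NullPointerException.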
- connectionClosed
    if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
        ...
        state.setState(State.Connecting);
        ....
    }
- doAcknowledge
    if (getState() != State.Ready && getState() != State.Connecting) {
        ...
    }
    if (txn != null) {
        return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties,
                new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
    }
    return acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, ackType, properties);
- doTransactionAcknowledgeForResponse
    ...
    return cnx().newAckForReceipt(cmd, requested);
To Reproduce
Steps to reproduce the behavior:
- Create a consumer.
- Restart the broker.
- Use the consumer to acknowledge a message with a transaction.
- See the error.
Or you can use this test to reproduce the error.
public void testName() throws Exception {
    String topic = NAMESPACE1 + "/test1";
    @Cleanup
    Producer<byte[]> producer = pulsarClient
            .newProducer(Schema.BYTES)
            .topic(topic)
            .sendTimeout(0, TimeUnit.SECONDS)
            .create();
    @Cleanup
    Consumer<byte[]> consumer = pulsarClient
            .newConsumer()
            .topic(topic)
            .subscriptionName("sub")
            .subscribe();
    for (int i = 0; i < 10; i++) {
        producer.newMessage().value(Bytes.toBytes(i)).send();
    }
    // Fetch the consumer's current connection through reflection.
    Method method = ConsumerImpl.class.getDeclaredMethod("cnx");
    method.setAccessible(true);
    ClientCnx cnx = (ClientCnx) method.invoke(consumer);
    // Simulate the disconnect: connectionClosed sets cnx to null and the state to Connecting.
    Method method1 = ConsumerImpl.class.getDeclaredMethod("connectionClosed", ClientCnx.class);
    method1.setAccessible(true);
    method1.invoke(consumer, cnx);
    // Acknowledge with a transaction while cnx is still null.
    for (int i = 0; i < 10; i++) {
        Message<byte[]> message = consumer.receive();
        Transaction transaction = pulsarClient
                .newTransaction()
                .withTransactionTimeout(5, TimeUnit.SECONDS)
                .build().get();
        consumer.acknowledgeAsync(message.getMessageId(), transaction);
        transaction.commit().get();
    }
}
Screenshots
The call sequence can be inferred from the log:
- channelInactive
- connectionClosed
- doAcknowledge
- doTransactionAcknowledgeForResponse
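The call sequence above can be modeled outside Pulsar in a small, self-contained sketch. Everything in it is a hypothetical stand-in: `NullCnxSketch`, its nested `Cnx` class, `ackUnguarded`, and `ackGuarded` are illustrative names, not Pulsar APIs, and the guard shown is only one possible way to avoid the exception, not necessarily the fix adopted upstream. It assumes the only relevant behavior is that connectionClosed clears the connection reference while the state check still admits Connecting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of the consumer; none of these names are Pulsar APIs. */
public class NullCnxSketch {
    enum State { Ready, Connecting, Closed }

    /** Hypothetical stand-in for ClientCnx. */
    static class Cnx {
        CompletableFuture<Void> newAckForReceipt(String cmd, long requestId) {
            return CompletableFuture.completedFuture(null);
        }
    }

    private final AtomicReference<Cnx> cnx = new AtomicReference<>(new Cnx());
    private volatile State state = State.Ready;

    /** Mirrors connectionClosed: clear the connection, then move to Connecting. */
    void connectionClosed(Cnx closed) {
        if (cnx.compareAndSet(closed, null)) {
            state = State.Connecting;
        }
    }

    Cnx cnx() {
        return cnx.get();
    }

    /** Unguarded path as described in the report: dereferences cnx() directly. */
    CompletableFuture<Void> ackUnguarded(String cmd, long requestId) {
        if (state != State.Ready && state != State.Connecting) {
            return failed(new IllegalStateException("consumer not ready"));
        }
        // cnx() is null while the state is Connecting, so this line throws.
        return cnx().newAckForReceipt(cmd, requestId);
    }

    /** One possible guard: read the connection once and fail the future if it is absent. */
    CompletableFuture<Void> ackGuarded(String cmd, long requestId) {
        Cnx current = cnx();
        if (current == null) {
            return failed(new IllegalStateException("not connected to broker"));
        }
        return current.newAckForReceipt(cmd, requestId);
    }

    private static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    public static void main(String[] args) {
        NullCnxSketch consumer = new NullCnxSketch();
        consumer.connectionClosed(consumer.cnx());
        try {
            consumer.ackUnguarded("ack", 1L);
        } catch (NullPointerException e) {
            System.out.println("unguarded: NullPointerException");
        }
        System.out.println("guarded failed: "
                + consumer.ackGuarded("ack", 2L).isCompletedExceptionally());
    }
}
```

In this model the unguarded call throws synchronously, while the guarded variant reports the missing connection through the returned future, which a caller of an async acknowledge method can handle like any other failed ack.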
