Skip to content

Commit

Permalink
[ISSUE apache#1097] Fix null pointer problem when consumption start t…
Browse files Browse the repository at this point in the history
…ime is null (apache#1098)
  • Loading branch information
ssssssnake committed Dec 20, 2021
1 parent 7e421d4 commit 66f9425
Showing 1 changed file with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.lang3.StringUtils;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
Expand Down Expand Up @@ -85,10 +87,14 @@ public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
try {
this.treeMapLock.readLock().lockInterruptibly();
try {
if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
msg = msgTreeMap.firstEntry().getValue();
if (!msgTreeMap.isEmpty()) {
String consumeStartTimeStamp = MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue());
if (StringUtils.isNotEmpty(consumeStartTimeStamp) && System.currentTimeMillis() - Long.parseLong(consumeStartTimeStamp) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
msg = msgTreeMap.firstEntry().getValue();
} else {
break;
}
} else {

break;
}
} finally {
Expand Down

0 comments on commit 66f9425

Please sign in to comment.