Skip to content

Commit

Permalink
Merge 7455bdf into 573b22c
Browse files Browse the repository at this point in the history
  • Loading branch information
Jaskey committed Feb 14, 2017
2 parents 573b22c + 7455bdf commit 6c17fa0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class ProcessQueue {
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong msgCount = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();//subset of msgTreeMap, used when consume orderly
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
private volatile long queueOffsetMax = 0L;
private volatile boolean dropped = false;
Expand Down Expand Up @@ -235,8 +235,8 @@ public void rollback() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
this.msgTreeMap.putAll(this.msgTreeMapTemp);
this.msgTreeMapTemp.clear();
this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
this.consumingMsgOrderlyTreeMap.clear();
} finally {
this.lockTreeMap.writeLock().unlock();
}
Expand All @@ -249,9 +249,9 @@ public long commit() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
Long offset = this.msgTreeMapTemp.lastKey();
msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
this.msgTreeMapTemp.clear();
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
msgCount.addAndGet(this.consumingMsgOrderlyTreeMap.size() * (-1));
this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
return offset + 1;
}
Expand All @@ -270,7 +270,7 @@ public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
for (MessageExt msg : msgs) {
this.msgTreeMapTemp.remove(msg.getQueueOffset());
this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
this.msgTreeMap.put(msg.getQueueOffset(), msg);
}
} finally {
Expand All @@ -293,7 +293,7 @@ public List<MessageExt> takeMessags(final int batchSize) {
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
if (entry != null) {
result.add(entry.getValue());
msgTreeMapTemp.put(entry.getKey(), entry.getValue());
consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
} else {
break;
}
Expand Down Expand Up @@ -332,7 +332,7 @@ public void clear() {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
this.msgTreeMap.clear();
this.msgTreeMapTemp.clear();
this.consumingMsgOrderlyTreeMap.clear();
this.msgCount.set(0);
this.queueOffsetMax = 0L;
} finally {
Expand Down Expand Up @@ -389,10 +389,10 @@ public void fillProcessQueueInfo(final ProcessQueueInfo info) {
info.setCachedMsgCount(this.msgTreeMap.size());
}

if (!this.msgTreeMapTemp.isEmpty()) {
info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
info.setTransactionMsgCount(this.msgTreeMapTemp.size());
if (!this.consumingMsgOrderlyTreeMap.isEmpty()) {
info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey());
info.setTransactionMsgMaxOffset(this.consumingMsgOrderlyTreeMap.lastKey());
info.setTransactionMsgCount(this.consumingMsgOrderlyTreeMap.size());
}

info.setLocked(this.locked);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,9 @@ private SendResult sendDefaultImpl(//
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (tmpmq != null) {
mq = tmpmq;
MessageQueue mqAttempt = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqAttempt != null) {
mq = mqAttempt;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
Expand Down

0 comments on commit 6c17fa0

Please sign in to comment.