Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
Expand Down Expand Up @@ -1107,7 +1108,7 @@ public long headSlowTimeMills(BlockingQueue<Runnable> q) {
final Runnable peek = q.peek();
if (peek != null) {
RequestTask rt = BrokerFastFailure.castRunnable(peek);
slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
slowTimeMills = rt == null ? 0 : SystemClock.elapsedMillis(rt.getCreateNano());
}

if (slowTimeMills < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
Expand Down Expand Up @@ -87,7 +88,7 @@ private void cleanExpiredRequest() {
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format(
"[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, "
+ "size of queue: %d",
System.currentTimeMillis() - rt.getCreateTimestamp(),
SystemClock.elapsedMillis(rt.getCreateNano()),
this.brokerController.getSendThreadPoolQueue().size()));
}
} else {
Expand Down Expand Up @@ -129,7 +130,7 @@ void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, fin
break;
}

final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
final long behind = SystemClock.elapsedMillis(rt.getCreateNano());
if (behind >= maxWaitTimeMillsInQueue) {
if (blockingQueue.remove(runnable)) {
rt.setStopRun(true);
Expand Down
16 changes: 16 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/SystemClock.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,20 @@ public class SystemClock {
public long now() {
return System.currentTimeMillis();
}

public long nano() {
return System.nanoTime();
}

private static long nanoToMillis(long nano) {
return nano / 1000_000L;
}

public static long elapsedMillis(long startNano) {
long diff = System.nanoTime() - startNano;
if (diff < 0) {
diff = 0;
}
return nanoToMillis(diff);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
Expand Down Expand Up @@ -202,7 +203,7 @@ private long headSlowTimeMills(BlockingQueue<Runnable> q) {
if (firstRunnable instanceof FutureTaskExt) {
final Runnable inner = ((FutureTaskExt<?>) firstRunnable).getRunnable();
if (inner instanceof RequestTask) {
slowTimeMills = System.currentTimeMillis() - ((RequestTask) inner).getCreateTimestamp();
slowTimeMills = SystemClock.elapsedMillis(((RequestTask) inner).getCreateNano());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
Expand Down Expand Up @@ -293,7 +294,7 @@ protected long headSlowTimeMills(BlockingQueue<Runnable> q) {
final Runnable peek = q.peek();
if (peek != null) {
RequestTask rt = castRunnable(peek);
slowTimeMills = rt == null ? 0 : System.currentTimeMillis() - rt.getCreateTimestamp();
slowTimeMills = rt == null ? 0 : SystemClock.elapsedMillis(rt.getCreateNano());
}

if (slowTimeMills < 0) {
Expand Down Expand Up @@ -332,7 +333,7 @@ protected void cleanExpiredRequestInQueue(ThreadPoolExecutor threadPoolExecutor,
break;
}

final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
final long behind = SystemClock.elapsedMillis(rt.getCreateNano());
if (behind >= maxWaitTimeMillsInQueue) {
if (blockingQueue.remove(runnable)) {
rt.setStopRun(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class RequestTask implements Runnable {
private final RemotingCommand request;
private volatile boolean stopRun = false;

private final long createNano = System.nanoTime();
public RequestTask(final Runnable runnable, final Channel channel, final RemotingCommand request) {
this.runnable = runnable;
this.channel = channel;
Expand All @@ -40,6 +41,7 @@ public int hashCode() {
result = 31 * result + (channel != null ? channel.hashCode() : 0);
result = 31 * result + (request != null ? request.hashCode() : 0);
result = 31 * result + (isStopRun() ? 1 : 0);
result = 31 * result + (int) (getCreateNano() ^ (getCreateNano() >>> 32));
return result;
}

Expand All @@ -54,6 +56,8 @@ public boolean equals(final Object o) {

if (getCreateTimestamp() != that.getCreateTimestamp())
return false;
if (getCreateNano() != that.getCreateNano())
return false;
if (isStopRun() != that.isStopRun())
return false;
if (channel != null ? !channel.equals(that.channel) : that.channel != null)
Expand All @@ -66,6 +70,10 @@ public long getCreateTimestamp() {
return createTimestamp;
}

public long getCreateNano() {
return createNano;
}

public boolean isStopRun() {
return stopRun;
}
Expand Down
32 changes: 26 additions & 6 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.CQType;
Expand Down Expand Up @@ -74,7 +76,7 @@ public class CommitLog implements Swappable {
protected volatile long confirmOffset = -1L;

private volatile long beginTimeInLock = 0;

private volatile long beginNanoInLock = 0L;
protected final PutMessageLock putMessageLock;

protected final TopicQueueLock topicQueueLock;
Expand Down Expand Up @@ -727,6 +729,10 @@ public long getBeginTimeInLock() {
return beginTimeInLock;
}

public long getBeginNanoInLock() {
return beginNanoInLock;
}

public String generateKey(StringBuilder keyBuilder, MessageExt messageExt) {
keyBuilder.setLength(0);
keyBuilder.append(messageExt.getTopic());
Expand Down Expand Up @@ -837,7 +843,9 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBroke
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
long beginLockNano = this.defaultMessageStore.getSystemClock().nano();
this.beginTimeInLock = beginLockTimestamp;
this.beginNanoInLock = beginLockNano;

// Here settings are stored timestamp, in order to ensure an orderly
// global
Expand All @@ -851,6 +859,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBroke
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
beginNanoInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
}

Expand All @@ -868,6 +877,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBroke
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
beginNanoInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
Expand All @@ -878,17 +888,21 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBroke
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
beginNanoInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
beginTimeInLock = 0;
beginNanoInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
beginTimeInLock = 0;
beginNanoInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}

elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
elapsedTimeInLock = SystemClock.elapsedMillis(beginLockNano);
beginTimeInLock = 0;
beginNanoInLock = 0;
} finally {
putMessageLock.unlock();
}
Expand Down Expand Up @@ -995,7 +1009,9 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatc
putMessageLock.lock();
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
long beginLockNano = this.defaultMessageStore.getSystemClock().nano();
this.beginTimeInLock = beginLockTimestamp;
this.beginNanoInLock = beginLockNano;

// Here settings are stored timestamp, in order to ensure an orderly
// global
Expand All @@ -1007,6 +1023,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatc
if (null == mappedFile) {
log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
beginLockNano = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
}

Expand All @@ -1022,22 +1039,26 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatc
// XXX: warn and notify me
log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
beginNanoInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
}
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
beginNanoInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
default:
beginTimeInLock = 0;
beginNanoInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}

elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
beginNanoInLock = 0;
} finally {
putMessageLock.unlock();
}
Expand Down Expand Up @@ -1213,10 +1234,9 @@ public void checkSelf() {

public long lockTimeMills() {
long diff = 0;
long begin = this.beginTimeInLock;
if (begin > 0) {
diff = this.defaultMessageStore.now() - begin;
}
long begin = this.beginNanoInLock;

diff = SystemClock.elapsedMillis(begin);

if (diff < 0) {
diff = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,9 +639,7 @@ private PutMessageResult waitForPutResult(CompletableFuture<PutMessageResult> pu

@Override
public boolean isOSPageCacheBusy() {
long begin = this.getCommitLog().getBeginTimeInLock();
long diff = this.systemClock.now() - begin;

long diff = this.lockTimeMills();
return diff < 10000000
&& diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}
Expand Down Expand Up @@ -1756,7 +1754,7 @@ public void run0() {
if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
try {
if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
long lockTime = SystemClock.elapsedMillis(DefaultMessageStore.this.commitLog.getBeginNanoInLock());
if (lockTime > 1000 && lockTime < 10000000) {

String stack = UtilAll.jstack();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBatch;
Expand Down Expand Up @@ -74,6 +76,7 @@ public class DLedgerCommitLog extends CommitLog {

private final MessageSerializer messageSerializer;
private volatile long beginTimeInDledgerLock = 0;
private volatile long beginNanoInDledgerLock = 0L;

//This offset separate the old commitlog from dledger commitlog
private long dividedCommitlogOffset = -1;
Expand Down Expand Up @@ -391,6 +394,11 @@ public long getBeginTimeInLock() {
return beginTimeInDledgerLock;
}

@Override
public long getBeginNanoInLock() {
return beginNanoInDledgerLock;
}

private void setMessageInfo(MessageExtBrokerInner msg, int tranType) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
Expand Down Expand Up @@ -446,6 +454,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner
long queueOffset;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
beginNanoInDledgerLock = this.defaultMessageStore.getSystemClock().nano();
queueOffset = getQueueOffsetByKey(msg, tranType);
encodeResult.setQueueOffsetKey(queueOffset, false);
AppendEntryRequest request = new AppendEntryRequest();
Expand All @@ -462,13 +471,14 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner
ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);

String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
elapsedTimeInLock = SystemClock.elapsedMillis(beginNanoInDledgerLock);
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
} catch (Exception e) {
log.error("Put message error", e);
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
} finally {
beginTimeInDledgerLock = 0;
beginNanoInDledgerLock = 0L;
putMessageLock.unlock();
}

Expand Down Expand Up @@ -565,6 +575,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch mess
int msgNum = 0;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
beginNanoInDledgerLock = this.defaultMessageStore.getSystemClock().nano();
queueOffset = getQueueOffsetByKey(messageExtBatch, tranType);
encodeResult.setQueueOffsetKey(queueOffset, true);
BatchAppendEntryRequest request = new BatchAppendEntryRequest();
Expand Down Expand Up @@ -600,7 +611,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch mess
msgNum++;
}

elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
elapsedTimeInLock = SystemClock.elapsedMillis(beginNanoInDledgerLock);
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen,
msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
appendResult.setMsgNum(msgNum);
Expand All @@ -609,6 +620,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch mess
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
} finally {
beginTimeInDledgerLock = 0;
beginNanoInDledgerLock = 0L;
putMessageLock.unlock();
}

Expand Down Expand Up @@ -690,10 +702,9 @@ public void checkSelf() {
@Override
public long lockTimeMills() {
long diff = 0;
long begin = this.beginTimeInDledgerLock;
if (begin > 0) {
diff = this.defaultMessageStore.now() - begin;
}
long begin = this.beginNanoInDledgerLock;

diff = SystemClock.elapsedMillis(begin);

if (diff < 0) {
diff = 0;
Expand Down