diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 70e59e098de..0d926b26704 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -106,6 +106,7 @@ import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerIdentity; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; @@ -1107,7 +1108,7 @@ public long headSlowTimeMills(BlockingQueue q) { final Runnable peek = q.peek(); if (peek != null) { RequestTask rt = BrokerFastFailure.castRunnable(peek); - slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp(); + slowTimeMills = rt == null ? 0 : SystemClock.elapsedMillis(rt.getCreateNano()); } if (slowTimeMills < 0) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index d3d0bc8ba3a..83bfa426545 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.AbstractBrokerRunnable; +import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -87,7 +88,7 @@ private void cleanExpiredRequest() { rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format( "[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, " + "size of queue: %d", - System.currentTimeMillis() - rt.getCreateTimestamp(), + SystemClock.elapsedMillis(rt.getCreateNano()), this.brokerController.getSendThreadPoolQueue().size())); } } else { @@ -129,7 +130,7 @@ void cleanExpiredRequestInQueue(final BlockingQueue blockingQueue, fin break; } - final long behind = System.currentTimeMillis() - rt.getCreateTimestamp(); + final long behind = SystemClock.elapsedMillis(rt.getCreateNano()); if (behind >= maxWaitTimeMillsInQueue) { if (blockingQueue.remove(runnable)) { rt.setStopRun(true); diff --git a/common/src/main/java/org/apache/rocketmq/common/SystemClock.java b/common/src/main/java/org/apache/rocketmq/common/SystemClock.java index b7eb3a6581b..fb0d87eab1a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/SystemClock.java +++ b/common/src/main/java/org/apache/rocketmq/common/SystemClock.java @@ -20,4 +20,20 @@ public class SystemClock { public long now() { return System.currentTimeMillis(); } + + public long nano() { + return System.nanoTime(); + } + + private static long nanoToMillis(long nano) { + return nano / 1000_000L; + } + + public static long elapsedMillis(long startNano) { + long diff = System.nanoTime() - startNano; + if (diff < 0) { + diff = 0; + } + return nanoToMillis(diff); + } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java index 15c65ebec9d..b70f51ba6ab 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java @@ -26,6 +26,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.future.FutureTaskExt; @@ -202,7 +203,7 @@ private long headSlowTimeMills(BlockingQueue q) { if (firstRunnable instanceof FutureTaskExt) { final Runnable inner = ((FutureTaskExt) firstRunnable).getRunnable(); if (inner instanceof RequestTask) { - slowTimeMills = System.currentTimeMillis() - ((RequestTask) inner).getCreateTimestamp(); + slowTimeMills = SystemClock.elapsedMillis(((RequestTask) inner).getCreateNano()); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index 85c96056209..59b3f38a99e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.future.FutureTaskExt; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; @@ -293,7 +294,7 @@ protected long headSlowTimeMills(BlockingQueue q) { final Runnable peek = q.peek(); if (peek != null) { RequestTask rt = castRunnable(peek); - slowTimeMills = rt == null ? 0 : System.currentTimeMillis() - rt.getCreateTimestamp(); + slowTimeMills = rt == null ? 0 : SystemClock.elapsedMillis(rt.getCreateNano()); } if (slowTimeMills < 0) { @@ -332,7 +333,7 @@ protected void cleanExpiredRequestInQueue(ThreadPoolExecutor threadPoolExecutor, break; } - final long behind = System.currentTimeMillis() - rt.getCreateTimestamp(); + final long behind = SystemClock.elapsedMillis(rt.getCreateNano()); if (behind >= maxWaitTimeMillsInQueue) { if (blockingQueue.remove(runnable)) { rt.setStopRun(true); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java index 57ed3606090..5e3c8c93354 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java @@ -27,6 +27,7 @@ public class RequestTask implements Runnable { private final RemotingCommand request; private volatile boolean stopRun = false; + private final long createNano = System.nanoTime(); public RequestTask(final Runnable runnable, final Channel channel, final RemotingCommand request) { this.runnable = runnable; this.channel = channel; @@ -40,6 +41,7 @@ public int hashCode() { result = 31 * result + (channel != null ? channel.hashCode() : 0); result = 31 * result + (request != null ? request.hashCode() : 0); result = 31 * result + (isStopRun() ? 1 : 0); + result = 31 * result + (int) (getCreateNano() ^ (getCreateNano() >>> 32)); return result; } @@ -54,6 +56,8 @@ public boolean equals(final Object o) { if (getCreateTimestamp() != that.getCreateTimestamp()) return false; + if (getCreateNano() != that.getCreateNano()) + return false; if (isStopRun() != that.isStopRun()) return false; if (channel != null ? !channel.equals(that.channel) : that.channel != null) @@ -66,6 +70,10 @@ public long getCreateTimestamp() { return createTimestamp; } + public long getCreateNano() { + return createNano; + } + public boolean isStopRun() { return stopRun; } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index d7e141d31c5..1b81f4632de 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -31,8 +31,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Supplier; + import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.attribute.CQType; @@ -74,7 +76,7 @@ public class CommitLog implements Swappable { protected volatile long confirmOffset = -1L; private volatile long beginTimeInLock = 0; - + private volatile long beginNanoInLock = 0L; protected final PutMessageLock putMessageLock; protected final TopicQueueLock topicQueueLock; @@ -727,6 +729,10 @@ public long getBeginTimeInLock() { return beginTimeInLock; } + public long getBeginNanoInLock() { + return beginNanoInLock; + } + public String generateKey(StringBuilder keyBuilder, MessageExt messageExt) { keyBuilder.setLength(0); keyBuilder.append(messageExt.getTopic()); @@ -837,7 +843,9 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); + long beginLockNano = this.defaultMessageStore.getSystemClock().nano(); this.beginTimeInLock = beginLockTimestamp; + this.beginNanoInLock = beginLockNano; // Here settings are stored timestamp, in order to ensure an orderly // global @@ -851,6 +859,7 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke if (null == mappedFile) { log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; + beginNanoInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null)); } @@ -868,6 +877,7 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke // XXX: warn and notify me log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; + beginNanoInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result)); } result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); @@ -878,17 +888,21 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; + beginNanoInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); case UNKNOWN_ERROR: beginTimeInLock = 0; + beginNanoInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); default: beginTimeInLock = 0; + beginNanoInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); } - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; + elapsedTimeInLock = SystemClock.elapsedMillis(beginLockNano); beginTimeInLock = 0; + beginNanoInLock = 0; } finally { putMessageLock.unlock(); } @@ -995,7 +1009,9 @@ public CompletableFuture asyncPutMessages(final MessageExtBatc putMessageLock.lock(); try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); + long beginLockNano = this.defaultMessageStore.getSystemClock().nano(); this.beginTimeInLock = beginLockTimestamp; + this.beginNanoInLock = beginLockNano; // Here settings are stored timestamp, in order to ensure an orderly // global @@ -1007,6 +1023,7 @@ public CompletableFuture asyncPutMessages(final MessageExtBatc if (null == mappedFile) { log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); beginTimeInLock = 0; + beginLockNano = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null)); } @@ -1022,6 +1039,7 @@ public CompletableFuture asyncPutMessages(final MessageExtBatc // XXX: warn and notify me log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); beginTimeInLock = 0; + beginNanoInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result)); } result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); @@ -1029,15 +1047,18 @@ public CompletableFuture asyncPutMessages(final MessageExtBatc case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; + beginNanoInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); case UNKNOWN_ERROR: default: beginTimeInLock = 0; + beginNanoInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); } elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; + beginNanoInLock = 0; } finally { putMessageLock.unlock(); } @@ -1213,10 +1234,9 @@ public void checkSelf() { public long lockTimeMills() { long diff = 0; - long begin = this.beginTimeInLock; - if (begin > 0) { - diff = this.defaultMessageStore.now() - begin; - } + long begin = this.beginNanoInLock; + + diff = SystemClock.elapsedMillis(begin); if (diff < 0) { diff = 0; diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index dc8e3efdbce..d30f4a97af7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -639,9 +639,7 @@ private PutMessageResult waitForPutResult(CompletableFuture pu @Override public boolean isOSPageCacheBusy() { - long begin = this.getCommitLog().getBeginTimeInLock(); - long diff = this.systemClock.now() - begin; - + long diff = this.lockTimeMills(); return diff < 10000000 && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills(); } @@ -1756,7 +1754,7 @@ public void run0() { if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) { try { if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) { - long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock(); + long lockTime = SystemClock.elapsedMillis(DefaultMessageStore.this.commitLog.getBeginNanoInLock()); if (lockTime > 1000 && lockTime < 10000000) { String stack = UtilAll.jstack(); diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 39906eae094..483fed87b2f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -36,6 +36,8 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; + +import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExtBatch; @@ -74,6 +76,7 @@ public class DLedgerCommitLog extends CommitLog { private final MessageSerializer messageSerializer; private volatile long beginTimeInDledgerLock = 0; + private volatile long beginNanoInDledgerLock = 0L; //This offset separate the old commitlog from dledger commitlog private long dividedCommitlogOffset = -1; @@ -391,6 +394,11 @@ public long getBeginTimeInLock() { return beginTimeInDledgerLock; } + @Override + public long getBeginNanoInLock() { + return beginNanoInDledgerLock; + } + private void setMessageInfo(MessageExtBrokerInner msg, int tranType) { // Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); @@ -446,6 +454,7 @@ public CompletableFuture asyncPutMessage(MessageExtBrokerInner long queueOffset; try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); + beginNanoInDledgerLock = this.defaultMessageStore.getSystemClock().nano(); queueOffset = getQueueOffsetByKey(msg, tranType); encodeResult.setQueueOffsetKey(queueOffset, false); AppendEntryRequest request = new AppendEntryRequest(); @@ -462,13 +471,14 @@ public CompletableFuture asyncPutMessage(MessageExtBrokerInner ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; + elapsedTimeInLock = SystemClock.elapsedMillis(beginNanoInDledgerLock); appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); } catch (Exception e) { log.error("Put message error", e); return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); } finally { beginTimeInDledgerLock = 0; + beginNanoInDledgerLock = 0L; putMessageLock.unlock(); } @@ -565,6 +575,7 @@ public CompletableFuture asyncPutMessages(MessageExtBatch mess int msgNum = 0; try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); + beginNanoInDledgerLock = this.defaultMessageStore.getSystemClock().nano(); queueOffset = getQueueOffsetByKey(messageExtBatch, tranType); encodeResult.setQueueOffsetKey(queueOffset, true); BatchAppendEntryRequest request = new BatchAppendEntryRequest(); @@ -600,7 +611,7 @@ public CompletableFuture asyncPutMessages(MessageExtBatch mess msgNum++; } - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; + elapsedTimeInLock = SystemClock.elapsedMillis(beginNanoInDledgerLock); appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen, msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock); appendResult.setMsgNum(msgNum); @@ -609,6 +620,7 @@ public CompletableFuture asyncPutMessages(MessageExtBatch mess return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); } finally { beginTimeInDledgerLock = 0; + beginNanoInDledgerLock = 0L; putMessageLock.unlock(); } @@ -690,10 +702,9 @@ public void checkSelf() { @Override public long lockTimeMills() { long diff = 0; - long begin = this.beginTimeInDledgerLock; - if (begin > 0) { - diff = this.defaultMessageStore.now() - begin; - } + long begin = this.beginNanoInDledgerLock; + + diff = SystemClock.elapsedMillis(begin); if (diff < 0) { diff = 0;