From 73591b0c558007e478da62e36246917dcdc71d46 Mon Sep 17 00:00:00 2001 From: Jaskey Date: Wed, 15 Feb 2017 11:05:10 +0800 Subject: [PATCH] Fix risk of unable to release putMessage Lock forever. Refactor to use one lock only. --- .../apache/rocketmq/common/BrokerConfig.java | 3 ++ .../org/apache/rocketmq/store/CommitLog.java | 40 +++-------------- .../apache/rocketmq/store/PutMessageLock.java | 25 +++++++++++ .../store/PutMessageReentrantLock.java | 37 ++++++++++++++++ .../rocketmq/store/PutMessageSpinLock.java | 43 +++++++++++++++++++ .../store/config/MessageStoreConfig.java | 4 ++ 6 files changed, 119 insertions(+), 33 deletions(-) create mode 100644 store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java create mode 100644 store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java create mode 100644 store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index f79f7267c8c..f736df37a56 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -47,6 +47,9 @@ public class BrokerConfig { private boolean autoCreateSubscriptionGroup = true; private String messageStorePlugIn = ""; + /** + * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1. + */ private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4; private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; private int adminBrokerThreadPoolNums = 16; diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 3bbe675854f..ab26d2f72b7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -23,8 +23,6 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -61,12 +59,7 @@ public class CommitLog { private volatile long confirmOffset = -1L; private volatile long beginTimeInLock = 0; - - //true: Can lock, false : in lock. - private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true); - - private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync - + private final PutMessageLock putMessageLock; public CommitLog(final DefaultMessageStore defaultMessageStore) { this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); @@ -81,6 +74,8 @@ public CommitLog(final DefaultMessageStore defaultMessageStore) { this.commitLogService = new CommitRealTimeService(); this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); + } public boolean load() { @@ -567,7 +562,7 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) { MappedFile unlockMappedFile = null; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); - lockForPutMessage(); //spin... + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; @@ -616,7 +611,7 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) { eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { - releasePutMessageLock(); + putMessageLock.unlock(); } if (eclipseTimeInLock > 500) { @@ -753,7 +748,7 @@ public void destroy() { } public boolean appendData(long startOffset, byte[] data) { - lockForPutMessage(); //spin... + putMessageLock.lock(); try { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset); if (null == mappedFile) { @@ -763,7 +758,7 @@ public boolean appendData(long startOffset, byte[] data) { return mappedFile.appendMessage(data); } finally { - releasePutMessageLock(); + putMessageLock.unlock(); } } @@ -798,28 +793,7 @@ public long lockTimeMills() { return diff; } - /** - * Spin util acquired the lock. - */ - private void lockForPutMessage() { - if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) { - putMessageNormalLock.lock(); - } else { - boolean flag; - do { - flag = this.putMessageSpinLock.compareAndSet(true, false); - } - while (!flag); - } - } - private void releasePutMessageLock() { - if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) { - putMessageNormalLock.unlock(); - } else { - this.putMessageSpinLock.compareAndSet(false, true); - } - } public static class GroupCommitRequest { private final long nextOffset; diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java new file mode 100644 index 00000000000..a03e41adf72 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +/** + * Used when trying to put message + */ +public interface PutMessageLock { + void lock(); + void unlock(); +} diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java new file mode 100644 index 00000000000..9198f1c6f51 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + + +import java.util.concurrent.locks.ReentrantLock; + +/** + * Exclusive lock implementation to put message + */ +public class PutMessageReentrantLock implements PutMessageLock { + private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync + + @Override + public void lock() { + putMessageNormalLock.lock(); + } + + @Override + public void unlock() { + putMessageNormalLock.unlock(); + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java new file mode 100644 index 00000000000..baa809db9e0 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Spin lock Implementation to put message, suggest using this witb low race conditions + * + */ +public class PutMessageSpinLock implements PutMessageLock { + //true: Can lock, false : in lock. + private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true); + + @Override + public void lock() { + boolean flag; + do { + flag = this.putMessageSpinLock.compareAndSet(true, false); + } + while (!flag); + } + + @Override + public void unlock() { + this.putMessageSpinLock.compareAndSet(false, true); + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 9cfd1c34caa..2b0759d2563 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -45,6 +45,10 @@ public class MessageStoreConfig { @ImportantField private int commitIntervalCommitLog = 200; + /** + * introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.
+ * By default it is set to false indicating using spin lock when putting message. + */ private boolean useReentrantLockWhenPutMessage = false; // Whether schedule flush,default is real-time