Skip to content

Commit

Permalink
[ISSUE #2883] [Part B] Improve produce performance in M/S mode. (#2885)
Browse files Browse the repository at this point in the history
* Optimise lock in WaitNotifyObject

* Remove lock in HAService

* Remove lock in GroupCommitService
  • Loading branch information
areyouok committed Jul 6, 2021
1 parent cba3e05 commit d65778f
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 81 deletions.
63 changes: 35 additions & 28 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -1403,48 +1403,55 @@ public CompletableFuture<PutMessageStatus> future() {
* GroupCommit Service
*/
class GroupCommitService extends FlushCommitLogService {
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
private final PutMessageSpinLock lock = new PutMessageSpinLock();

public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
lock.lock();
try {
this.requestsWrite.add(request);
} finally {
lock.unlock();
}
this.wakeup();
}

private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
lock.lock();
try {
LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
} finally {
lock.unlock();
}
}

private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
for (int i = 0; i < 2 && !flushOK; i++) {
CommitLog.this.mappedFileQueue.flush(0);
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}

req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
for (int i = 0; i < 2 && !flushOK; i++) {
CommitLog.this.mappedFileQueue.flush(0);
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}

long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
}

this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}

this.requestsRead = new LinkedList<>();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}

Expand Down
53 changes: 30 additions & 23 deletions store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
Expand All @@ -39,6 +38,7 @@
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.PutMessageSpinLock;
import org.apache.rocketmq.store.PutMessageStatus;

public class HAService {
Expand Down Expand Up @@ -254,12 +254,16 @@ public String getServiceName() {
class GroupTransferService extends ServiceThread {

private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>();
private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>();
private final PutMessageSpinLock lock = new PutMessageSpinLock();
private volatile LinkedList<CommitLog.GroupCommitRequest> requestsWrite = new LinkedList<>();
private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();

public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
synchronized (this.requestsWrite) {
public void putRequest(final CommitLog.GroupCommitRequest request) {
lock.lock();
try {
this.requestsWrite.add(request);
} finally {
lock.unlock();
}
this.wakeup();
}
Expand All @@ -269,32 +273,35 @@ public void notifyTransferSome() {
}

private void swapRequests() {
List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
lock.lock();
try {
LinkedList<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
} finally {
lock.unlock();
}
}

private void doWaitTransfer() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}

if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}

req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}

this.requestsRead.clear();
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}

this.requestsRead = new LinkedList<>();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,43 @@
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class WaitNotifyObject {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

protected final HashMap<Long/* thread id */, Boolean/* notified */> waitingThreadTable =
new HashMap<Long, Boolean>(16);
protected final ConcurrentHashMap<Long/* thread id */, AtomicBoolean/* notified */> waitingThreadTable =
new ConcurrentHashMap<Long, AtomicBoolean>(16);

protected volatile boolean hasNotified = false;
protected AtomicBoolean hasNotified = new AtomicBoolean(false);

public void wakeup() {
synchronized (this) {
if (!this.hasNotified) {
this.hasNotified = true;
boolean needNotify = hasNotified.compareAndSet(false, true);
if (needNotify) {
synchronized (this) {
this.notify();
}
}
}

protected void waitForRunning(long interval) {
if (this.hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
synchronized (this) {
if (this.hasNotified) {
this.hasNotified = false;
this.onWaitEnd();
return;
}

try {
if (this.hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
this.wait(interval);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
this.hasNotified = false;
this.hasNotified.set(false);
this.onWaitEnd();
}
}
Expand All @@ -63,36 +66,37 @@ protected void onWaitEnd() {
}

public void wakeupAll() {
synchronized (this) {
boolean needNotify = false;

for (Map.Entry<Long,Boolean> entry : this.waitingThreadTable.entrySet()) {
needNotify = needNotify || !entry.getValue();
entry.setValue(true);
boolean needNotify = false;
for (Map.Entry<Long,AtomicBoolean> entry : this.waitingThreadTable.entrySet()) {
if (entry.getValue().compareAndSet(false, true)) {
needNotify = true;
}

if (needNotify) {
}
if (needNotify) {
synchronized (this) {
this.notifyAll();
}
}
}

public void allWaitForRunning(long interval) {
long currentThreadId = Thread.currentThread().getId();
AtomicBoolean notified = this.waitingThreadTable.computeIfAbsent(currentThreadId, k -> new AtomicBoolean(false));
if (notified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
synchronized (this) {
Boolean notified = this.waitingThreadTable.get(currentThreadId);
if (notified != null && notified) {
this.waitingThreadTable.put(currentThreadId, false);
this.onWaitEnd();
return;
}

try {
if (notified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
this.wait(interval);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
this.waitingThreadTable.put(currentThreadId, false);
notified.set(false);
this.onWaitEnd();
}
}
Expand Down

0 comments on commit d65778f

Please sign in to comment.