Skip to content

Commit

Permalink
Eliminate array copy (#2886)
Browse files Browse the repository at this point in the history
[Part C] Improve produce performance in M/S mode.
  • Loading branch information
areyouok committed Jul 6, 2021
1 parent d65778f commit 3183122
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ public SelectMappedBufferResult getCommitLogData(long offset) {
}

@Override
public boolean appendToCommitLog(long startOffset, byte[] data) {
return next.appendToCommitLog(startOffset, data);
public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
return next.appendToCommitLog(startOffset, data, dataStart, dataLength);
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -1185,7 +1185,7 @@ public void destroy() {
this.mappedFileQueue.destroy();
}

public boolean appendData(long startOffset, byte[] data) {
public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
putMessageLock.lock();
try {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
Expand All @@ -1194,7 +1194,7 @@ public boolean appendData(long startOffset, byte[] data) {
return false;
}

return mappedFile.appendMessage(data);
return mappedFile.appendMessage(data, dataStart, dataLength);
} finally {
putMessageLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,13 +921,13 @@ public SelectMappedBufferResult getCommitLogData(final long offset) {
}

@Override
public boolean appendToCommitLog(long startOffset, byte[] data) {
public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
if (this.shutdown) {
log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
return false;
}

boolean result = this.commitLog.appendData(startOffset, data);
boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength);
if (result) {
this.reputMessageService.wakeup();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,11 @@ GetMessageResult getMessage(final String group, final String topic, final int qu
*
* @param startOffset starting offset.
* @param data data to append.
* @param dataStart the start index of data array
* @param dataLength the length of data array
* @return true if success; false otherwise.
*/
boolean appendToCommitLog(final long startOffset, final byte[] data);
boolean appendToCommitLog(final long startOffset, final byte[] data, int dataStart, int dataLength);

/**
* Execute file deletion manually.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ public void destroy() {
}

@Override
public boolean appendData(long startOffset, byte[] data) {
public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
//the old ha service will invoke method, here to prevent it
return false;
}
Expand Down
10 changes: 4 additions & 6 deletions store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,6 @@ private boolean processReadEvent() {

private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();

while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition;
Expand All @@ -459,13 +458,12 @@ private boolean dispatchReadRequest() {
}

if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
this.byteBufferRead.get(bodyData);
byte[] bodyData = byteBufferRead.array();
int dataStart = this.dispatchPosition + msgHeaderSize;

HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
HAService.this.defaultMessageStore.appendToCommitLog(
masterPhyOffset, bodyData, dataStart, bodySize);

this.byteBufferRead.position(readSocketPos);
this.dispatchPosition += msgHeaderSize + bodySize;

if (!reportSlaveMaxOffsetPlus()) {
Expand Down

0 comments on commit 3183122

Please sign in to comment.