Skip to content

Commit

Permalink
fix code bug & optimize the memory usage in DefaultMappedFile (#4678)
Browse files Browse the repository at this point in the history
* optimize the memory usage in DefaultMappedFile:
use AtomicIntegerFieldUpdater instead of AtomicInteger

* fix code bug:
if the initial cursor of listIterator equals zero,
the previous element will always null.

* modify constant name

Co-authored-by: shanpengyu <shanpengyu@bytedance.com>
  • Loading branch information
SeaItFover and shanpengyu committed Aug 17, 2022
1 parent e3c2111 commit d17a7f0
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 42 deletions.
Expand Up @@ -277,7 +277,7 @@ public boolean resetOffset(long offset) {
return false;
}

ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();
ListIterator<MappedFile> iterator = this.mappedFiles.listIterator(mappedFiles.size());

while (iterator.hasPrevious()) {
mappedFileLast = iterator.previous();
Expand Down
Expand Up @@ -18,6 +18,7 @@

import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
Expand All @@ -27,7 +28,9 @@
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExtBatch;
Expand All @@ -52,9 +55,14 @@ public class DefaultMappedFile extends AbstractMappedFile {
protected static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

protected static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
protected final AtomicInteger committedPosition = new AtomicInteger(0);
protected final AtomicInteger flushedPosition = new AtomicInteger(0);

protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> WROTE_POSITION_UPDATER;
protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> COMMITTED_POSITION_UPDATER;
protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> FLUSHED_POSITION_UPDATER;

protected volatile int wrotePosition;
protected volatile int committedPosition;
protected volatile int flushedPosition;
protected int fileSize;
protected FileChannel fileChannel;
/**
Expand All @@ -74,6 +82,12 @@ public class DefaultMappedFile extends AbstractMappedFile {
protected long swapMapTime = 0L;
protected long mappedByteBufferAccessCountSinceLastSwap = 0L;

static {
WROTE_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "wrotePosition");
COMMITTED_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "committedPosition");
FLUSHED_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "flushedPosition");
}

public DefaultMappedFile() {
}

Expand All @@ -82,7 +96,7 @@ public DefaultMappedFile(final String fileName, final int fileSize) throws IOExc
}

public DefaultMappedFile(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize, transientStorePool);
}

Expand Down Expand Up @@ -155,11 +169,11 @@ public boolean getData(int pos, int size, ByteBuffer byteBuffer) {
}
} else {
log.debug("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
+ this.fileFromOffset);
+ this.fileFromOffset);
}
} else {
log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
+ ", fileFromOffset: " + this.fileFromOffset);
+ ", fileFromOffset: " + this.fileFromOffset);
}

return false;
Expand All @@ -177,22 +191,22 @@ public FileChannel getFileChannel() {

@Override
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
PutMessageContext putMessageContext) {
return appendMessagesInner(msg, cb, putMessageContext);
}

@Override
public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
PutMessageContext putMessageContext) {
return appendMessagesInner(messageExtBatch, cb, putMessageContext);
}

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
PutMessageContext putMessageContext) {
assert messageExt != null;
assert cb != null;

int currentPos = this.wrotePosition.get();
int currentPos = WROTE_POSITION_UPDATER.get(this);

if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = appendMessageBuffer().slice();
Expand All @@ -209,7 +223,7 @@ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, fina
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());
WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
Expand All @@ -234,7 +248,7 @@ public boolean appendMessage(final byte[] data) {

@Override
public boolean appendMessage(ByteBuffer data) {
int currentPos = this.wrotePosition.get();
int currentPos = WROTE_POSITION_UPDATER.get(this);
int remaining = data.remaining();

if ((currentPos + remaining) <= this.fileSize) {
Expand All @@ -246,7 +260,7 @@ public boolean appendMessage(ByteBuffer data) {
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.", e);
}
this.wrotePosition.addAndGet(remaining);
WROTE_POSITION_UPDATER.addAndGet(this, remaining);
return true;
}
return false;
Expand All @@ -260,7 +274,7 @@ public boolean appendMessage(ByteBuffer data) {
*/
@Override
public boolean appendMessage(final byte[] data, final int offset, final int length) {
int currentPos = this.wrotePosition.get();
int currentPos = WROTE_POSITION_UPDATER.get(this);

if ((currentPos + length) <= this.fileSize) {
try {
Expand All @@ -270,7 +284,7 @@ public boolean appendMessage(final byte[] data, final int offset, final int leng
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.", e);
}
this.wrotePosition.addAndGet(length);
WROTE_POSITION_UPDATER.addAndGet(this, length);
return true;
}

Expand Down Expand Up @@ -300,11 +314,11 @@ public int flush(final int flushLeastPages) {
log.error("Error occurred when force data to disk.", e);
}

this.flushedPosition.set(value);
FLUSHED_POSITION_UPDATER.set(this, value);
this.release();
} else {
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
log.warn("in flush, hold failed, flush offset = " + FLUSHED_POSITION_UPDATER.get(this));
FLUSHED_POSITION_UPDATER.set(this, getReadPosition());
}
}
return this.getFlushedPosition();
Expand All @@ -314,29 +328,29 @@ public int flush(final int flushLeastPages) {
public int commit(final int commitLeastPages) {
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
return WROTE_POSITION_UPDATER.get(this);
}
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
commit0();
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
log.warn("in commit, hold failed, commit offset = " + COMMITTED_POSITION_UPDATER.get(this));
}
}

// All dirty data has been committed to FileChannel.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == COMMITTED_POSITION_UPDATER.get(this)) {
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}

return this.committedPosition.get();
return COMMITTED_POSITION_UPDATER.get(this);
}

protected void commit0() {
int writePos = this.wrotePosition.get();
int lastCommittedPosition = this.committedPosition.get();
int writePos = WROTE_POSITION_UPDATER.get(this);
int lastCommittedPosition = COMMITTED_POSITION_UPDATER.get(this);

if (writePos - lastCommittedPosition > 0) {
try {
Expand All @@ -345,15 +359,15 @@ protected void commit0() {
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);
this.committedPosition.set(writePos);
COMMITTED_POSITION_UPDATER.set(this, writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}

private boolean isAbleToFlush(final int flushLeastPages) {
int flush = this.flushedPosition.get();
int flush = FLUSHED_POSITION_UPDATER.get(this);
int write = getReadPosition();

if (this.isFull()) {
Expand All @@ -368,8 +382,8 @@ private boolean isAbleToFlush(final int flushLeastPages) {
}

protected boolean isAbleToCommit(final int commitLeastPages) {
int commit = this.committedPosition.get();
int write = this.wrotePosition.get();
int commit = COMMITTED_POSITION_UPDATER.get(this);
int write = WROTE_POSITION_UPDATER.get(this);

if (this.isFull()) {
return true;
Expand All @@ -384,17 +398,17 @@ protected boolean isAbleToCommit(final int commitLeastPages) {

@Override
public int getFlushedPosition() {
return flushedPosition.get();
return FLUSHED_POSITION_UPDATER.get(this);
}

@Override
public void setFlushedPosition(int pos) {
this.flushedPosition.set(pos);
FLUSHED_POSITION_UPDATER.set(this, pos);
}

@Override
public boolean isFull() {
return this.fileSize == this.wrotePosition.get();
return this.fileSize == WROTE_POSITION_UPDATER.get(this);
}

@Override
Expand All @@ -411,11 +425,11 @@ public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
} else {
log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
+ this.fileFromOffset);
+ this.fileFromOffset);
}
} else {
log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
+ ", fileFromOffset: " + this.fileFromOffset);
+ ", fileFromOffset: " + this.fileFromOffset);
}

return null;
Expand Down Expand Up @@ -443,13 +457,13 @@ public SelectMappedBufferResult selectMappedBuffer(int pos) {
public boolean cleanup(final long currentRef) {
if (this.isAvailable()) {
log.error("this file[REF:" + currentRef + "] " + this.fileName
+ " have not shutdown, stop unmapping.");
+ " have not shutdown, stop unmapping.");
return false;
}

if (this.isCleanupOver()) {
log.error("this file[REF:" + currentRef + "] " + this.fileName
+ " have cleanup, do not do it again.");
+ " have cleanup, do not do it again.");
return true;
}

Expand Down Expand Up @@ -494,25 +508,25 @@ public boolean destroy(final long intervalForcibly) {

@Override
public int getWrotePosition() {
return wrotePosition.get();
return WROTE_POSITION_UPDATER.get(this);
}

@Override
public void setWrotePosition(int pos) {
this.wrotePosition.set(pos);
WROTE_POSITION_UPDATER.set(this, pos);
}

/**
* @return The max position which have valid data
*/
@Override
public int getReadPosition() {
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
return this.writeBuffer == null ? WROTE_POSITION_UPDATER.get(this) : COMMITTED_POSITION_UPDATER.get(this);
}

@Override
public void setCommittedPosition(int pos) {
this.committedPosition.set(pos);
COMMITTED_POSITION_UPDATER.set(this, pos);
}

@Override
Expand Down Expand Up @@ -548,11 +562,11 @@ public void warmMappedFile(FlushDiskType type, int pages) {
// force flush when prepare load finished
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
this.getFileName(), System.currentTimeMillis() - beginTime);
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);
System.currentTimeMillis() - beginTime);

this.mlock();
}
Expand Down

0 comments on commit d17a7f0

Please sign in to comment.