diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index 7ba7de4f4d2..799df728850 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -277,7 +277,7 @@ public boolean resetOffset(long offset) { return false; } - ListIterator iterator = this.mappedFiles.listIterator(); + ListIterator iterator = this.mappedFiles.listIterator(mappedFiles.size()); while (iterator.hasPrevious()) { mappedFileLast = iterator.previous(); diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java index 89709989f97..b5ba68f5112 100644 --- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java @@ -18,6 +18,7 @@ import com.sun.jna.NativeLong; import com.sun.jna.Pointer; + import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -27,7 +28,9 @@ import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; + import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageExtBatch; @@ -52,9 +55,14 @@ public class DefaultMappedFile extends AbstractMappedFile { protected static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0); protected static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0); - protected final AtomicInteger wrotePosition = new AtomicInteger(0); - protected final AtomicInteger committedPosition = new AtomicInteger(0); - protected final AtomicInteger flushedPosition = new AtomicInteger(0); + + protected static final AtomicIntegerFieldUpdater WROTE_POSITION_UPDATER; + protected static final AtomicIntegerFieldUpdater COMMITTED_POSITION_UPDATER; + protected static final AtomicIntegerFieldUpdater FLUSHED_POSITION_UPDATER; + + protected volatile int wrotePosition; + protected volatile int committedPosition; + protected volatile int flushedPosition; protected int fileSize; protected FileChannel fileChannel; /** @@ -74,6 +82,12 @@ public class DefaultMappedFile extends AbstractMappedFile { protected long swapMapTime = 0L; protected long mappedByteBufferAccessCountSinceLastSwap = 0L; + static { + WROTE_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "wrotePosition"); + COMMITTED_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "committedPosition"); + FLUSHED_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "flushedPosition"); + } + public DefaultMappedFile() { } @@ -82,7 +96,7 @@ public DefaultMappedFile(final String fileName, final int fileSize) throws IOExc } public DefaultMappedFile(final String fileName, final int fileSize, - final TransientStorePool transientStorePool) throws IOException { + final TransientStorePool transientStorePool) throws IOException { init(fileName, fileSize, transientStorePool); } @@ -155,11 +169,11 @@ public boolean getData(int pos, int size, ByteBuffer byteBuffer) { } } else { log.debug("matched, but hold failed, request pos: " + pos + ", fileFromOffset: " - + this.fileFromOffset); + + this.fileFromOffset); } } else { log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size - + ", fileFromOffset: " + this.fileFromOffset); + + ", fileFromOffset: " + this.fileFromOffset); } return false; @@ -177,22 +191,22 @@ public FileChannel getFileChannel() { @Override public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb, - PutMessageContext putMessageContext) { + PutMessageContext putMessageContext) { return appendMessagesInner(msg, cb, putMessageContext); } @Override public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb, - PutMessageContext putMessageContext) { + PutMessageContext putMessageContext) { return appendMessagesInner(messageExtBatch, cb, putMessageContext); } public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb, - PutMessageContext putMessageContext) { + PutMessageContext putMessageContext) { assert messageExt != null; assert cb != null; - int currentPos = this.wrotePosition.get(); + int currentPos = WROTE_POSITION_UPDATER.get(this); if (currentPos < this.fileSize) { ByteBuffer byteBuffer = appendMessageBuffer().slice(); @@ -209,7 +223,7 @@ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, fina } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } - this.wrotePosition.addAndGet(result.getWroteBytes()); + WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } @@ -234,7 +248,7 @@ public boolean appendMessage(final byte[] data) { @Override public boolean appendMessage(ByteBuffer data) { - int currentPos = this.wrotePosition.get(); + int currentPos = WROTE_POSITION_UPDATER.get(this); int remaining = data.remaining(); if ((currentPos + remaining) <= this.fileSize) { @@ -246,7 +260,7 @@ public boolean appendMessage(ByteBuffer data) { } catch (Throwable e) { log.error("Error occurred when append message to mappedFile.", e); } - this.wrotePosition.addAndGet(remaining); + WROTE_POSITION_UPDATER.addAndGet(this, remaining); return true; } return false; @@ -260,7 +274,7 @@ public boolean appendMessage(ByteBuffer data) { */ @Override public boolean appendMessage(final byte[] data, final int offset, final int length) { - int currentPos = this.wrotePosition.get(); + int currentPos = WROTE_POSITION_UPDATER.get(this); if ((currentPos + length) <= this.fileSize) { try { @@ -270,7 +284,7 @@ public boolean appendMessage(final byte[] data, final int offset, final int leng } catch (Throwable e) { log.error("Error occurred when append message to mappedFile.", e); } - this.wrotePosition.addAndGet(length); + WROTE_POSITION_UPDATER.addAndGet(this, length); return true; } @@ -300,11 +314,11 @@ public int flush(final int flushLeastPages) { log.error("Error occurred when force data to disk.", e); } - this.flushedPosition.set(value); + FLUSHED_POSITION_UPDATER.set(this, value); this.release(); } else { - log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); - this.flushedPosition.set(getReadPosition()); + log.warn("in flush, hold failed, flush offset = " + FLUSHED_POSITION_UPDATER.get(this)); + FLUSHED_POSITION_UPDATER.set(this, getReadPosition()); } } return this.getFlushedPosition(); @@ -314,29 +328,29 @@ public int flush(final int flushLeastPages) { public int commit(final int commitLeastPages) { if (writeBuffer == null) { //no need to commit data to file channel, so just regard wrotePosition as committedPosition. - return this.wrotePosition.get(); + return WROTE_POSITION_UPDATER.get(this); } if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { commit0(); this.release(); } else { - log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); + log.warn("in commit, hold failed, commit offset = " + COMMITTED_POSITION_UPDATER.get(this)); } } // All dirty data has been committed to FileChannel. - if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { + if (writeBuffer != null && this.transientStorePool != null && this.fileSize == COMMITTED_POSITION_UPDATER.get(this)) { this.transientStorePool.returnBuffer(writeBuffer); this.writeBuffer = null; } - return this.committedPosition.get(); + return COMMITTED_POSITION_UPDATER.get(this); } protected void commit0() { - int writePos = this.wrotePosition.get(); - int lastCommittedPosition = this.committedPosition.get(); + int writePos = WROTE_POSITION_UPDATER.get(this); + int lastCommittedPosition = COMMITTED_POSITION_UPDATER.get(this); if (writePos - lastCommittedPosition > 0) { try { @@ -345,7 +359,7 @@ protected void commit0() { byteBuffer.limit(writePos); this.fileChannel.position(lastCommittedPosition); this.fileChannel.write(byteBuffer); - this.committedPosition.set(writePos); + COMMITTED_POSITION_UPDATER.set(this, writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } @@ -353,7 +367,7 @@ protected void commit0() { } private boolean isAbleToFlush(final int flushLeastPages) { - int flush = this.flushedPosition.get(); + int flush = FLUSHED_POSITION_UPDATER.get(this); int write = getReadPosition(); if (this.isFull()) { @@ -368,8 +382,8 @@ private boolean isAbleToFlush(final int flushLeastPages) { } protected boolean isAbleToCommit(final int commitLeastPages) { - int commit = this.committedPosition.get(); - int write = this.wrotePosition.get(); + int commit = COMMITTED_POSITION_UPDATER.get(this); + int write = WROTE_POSITION_UPDATER.get(this); if (this.isFull()) { return true; @@ -384,17 +398,17 @@ protected boolean isAbleToCommit(final int commitLeastPages) { @Override public int getFlushedPosition() { - return flushedPosition.get(); + return FLUSHED_POSITION_UPDATER.get(this); } @Override public void setFlushedPosition(int pos) { - this.flushedPosition.set(pos); + FLUSHED_POSITION_UPDATER.set(this, pos); } @Override public boolean isFull() { - return this.fileSize == this.wrotePosition.get(); + return this.fileSize == WROTE_POSITION_UPDATER.get(this); } @Override @@ -411,11 +425,11 @@ public SelectMappedBufferResult selectMappedBuffer(int pos, int size) { return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); } else { log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: " - + this.fileFromOffset); + + this.fileFromOffset); } } else { log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size - + ", fileFromOffset: " + this.fileFromOffset); + + ", fileFromOffset: " + this.fileFromOffset); } return null; @@ -443,13 +457,13 @@ public SelectMappedBufferResult selectMappedBuffer(int pos) { public boolean cleanup(final long currentRef) { if (this.isAvailable()) { log.error("this file[REF:" + currentRef + "] " + this.fileName - + " have not shutdown, stop unmapping."); + + " have not shutdown, stop unmapping."); return false; } if (this.isCleanupOver()) { log.error("this file[REF:" + currentRef + "] " + this.fileName - + " have cleanup, do not do it again."); + + " have cleanup, do not do it again."); return true; } @@ -494,12 +508,12 @@ public boolean destroy(final long intervalForcibly) { @Override public int getWrotePosition() { - return wrotePosition.get(); + return WROTE_POSITION_UPDATER.get(this); } @Override public void setWrotePosition(int pos) { - this.wrotePosition.set(pos); + WROTE_POSITION_UPDATER.set(this, pos); } /** @@ -507,12 +521,12 @@ public void setWrotePosition(int pos) { */ @Override public int getReadPosition() { - return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get(); + return this.writeBuffer == null ? WROTE_POSITION_UPDATER.get(this) : COMMITTED_POSITION_UPDATER.get(this); } @Override public void setCommittedPosition(int pos) { - this.committedPosition.set(pos); + COMMITTED_POSITION_UPDATER.set(this, pos); } @Override @@ -548,11 +562,11 @@ public void warmMappedFile(FlushDiskType type, int pages) { // force flush when prepare load finished if (type == FlushDiskType.SYNC_FLUSH) { log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}", - this.getFileName(), System.currentTimeMillis() - beginTime); + this.getFileName(), System.currentTimeMillis() - beginTime); mappedByteBuffer.force(); } log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(), - System.currentTimeMillis() - beginTime); + System.currentTimeMillis() - beginTime); this.mlock(); }