Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public class AllocateMappedFileService extends ServiceThread {
private PriorityBlockingQueue<AllocateRequest> requestQueue =
new PriorityBlockingQueue<AllocateRequest>();
private volatile boolean hasException = false;
private MessageStore messageStore;
private DefaultMessageStore messageStore;

public AllocateMappedFileService(MessageStore messageStore) {
public AllocateMappedFileService(DefaultMessageStore messageStore) {
this.messageStore = messageStore;
}

Expand Down Expand Up @@ -121,6 +121,9 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next

@Override
public String getServiceName() {
if (messageStore != null && messageStore.getBrokerConfig().isInBrokerContainer()) {
return messageStore.getBrokerConfig().getLoggerIdentifier() + AllocateMappedFileService.class.getSimpleName();
}
return AllocateMappedFileService.class.getSimpleName();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.rocketmq.store;

import java.nio.ByteBuffer;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;

/**
* Write messages callback interface
Expand Down
500 changes: 342 additions & 158 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Large diffs are not rendered by default.

50 changes: 38 additions & 12 deletions store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -27,6 +28,7 @@
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
Expand Down Expand Up @@ -106,8 +108,9 @@ public void recover() {
if (!mappedFiles.isEmpty()) {

int index = mappedFiles.size() - 3;
if (index < 0)
if (index < 0) {
index = 0;
}

int mappedFileSizeLogics = this.mappedFileSize;
MappedFile mappedFile = mappedFiles.get(index);
Expand Down Expand Up @@ -168,6 +171,14 @@ public void recover() {
}
}

public long getTotalSize() {
long totalSize = this.mappedFileQueue.getTotalFileSize();
if (isExtReadEnable()) {
totalSize += this.consumeQueueExt.getTotalSize();
}
return totalSize;
}

@Override
public long getOffsetInQueueByTime(final long timestamp) {
MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
Expand Down Expand Up @@ -239,12 +250,17 @@ public long getOffsetInQueueByTime(final long timestamp) {
}

@Override
public void truncateDirtyLogicFiles(long phyOffet) {
public void truncateDirtyLogicFiles(long phyOffset) {
truncateDirtyLogicFiles(phyOffset, true);
}

public void truncateDirtyLogicFiles(long phyOffset, boolean deleteFile) {

int logicFileSize = this.mappedFileSize;

this.maxPhysicOffset = phyOffet;
this.maxPhysicOffset = phyOffset;
long maxExtAddr = 1;
boolean shouldDeleteFile = false;
while (true) {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile != null) {
Expand All @@ -260,8 +276,8 @@ public void truncateDirtyLogicFiles(long phyOffet) {
long tagsCode = byteBuffer.getLong();

if (0 == i) {
if (offset >= phyOffet) {
this.mappedFileQueue.deleteLastMappedFile();
if (offset >= phyOffset) {
shouldDeleteFile = true;
break;
} else {
int pos = i + CQ_STORE_UNIT_SIZE;
Expand All @@ -278,7 +294,7 @@ public void truncateDirtyLogicFiles(long phyOffet) {

if (offset >= 0 && size > 0) {

if (offset >= phyOffet) {
if (offset >= phyOffset) {
return;
}

Expand All @@ -299,6 +315,15 @@ public void truncateDirtyLogicFiles(long phyOffet) {
}
}
}

if (shouldDeleteFile) {
if (deleteFile) {
this.mappedFileQueue.deleteLastMappedFile();
} else {
this.mappedFileQueue.deleteExpiredFile(Collections.singletonList(this.mappedFileQueue.getLastMappedFile()));
}
}

} else {
break;
}
Expand Down Expand Up @@ -399,6 +424,7 @@ public long getMinOffsetInQueue() {
return this.minLogicOffset / CQ_STORE_UNIT_SIZE;
}

@Override
public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30;
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
Expand Down Expand Up @@ -695,7 +721,7 @@ private class ConsumeQueueIterator implements ReferredIterator<CqUnit> {
private int relativePos = 0;

public ConsumeQueueIterator(SelectMappedBufferResult sbr) {
this.sbr = sbr;
this.sbr = sbr;
if (sbr != null && sbr.getByteBuffer() != null) {
relativePos = sbr.getByteBuffer().position();
}
Expand All @@ -715,11 +741,11 @@ public CqUnit next() {
if (!hasNext()) {
return null;
}
long queueOffset = (sbr.getStartOffset() + sbr.getByteBuffer().position() - relativePos) / CQ_STORE_UNIT_SIZE;
long queueOffset = (sbr.getStartOffset() + sbr.getByteBuffer().position() - relativePos) / CQ_STORE_UNIT_SIZE;
CqUnit cqUnit = new CqUnit(queueOffset,
sbr.getByteBuffer().getLong(),
sbr.getByteBuffer().getInt(),
sbr.getByteBuffer().getLong());
sbr.getByteBuffer().getLong(),
sbr.getByteBuffer().getInt(),
sbr.getByteBuffer().getLong());

if (isExtAddr(cqUnit.getTagsCode())) {
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
Expand All @@ -730,7 +756,7 @@ public CqUnit next() {
} else {
// can't find ext content.Client will filter messages by tag also.
log.error("[BUG] can't find consume queue extend file content! addr={}, offsetPy={}, sizePy={}, topic={}",
cqUnit.getTagsCode(), cqUnit.getPos(), cqUnit.getPos(), getTopic());
cqUnit.getTagsCode(), cqUnit.getPos(), cqUnit.getPos(), getTopic());
}
}
return cqUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public ConsumeQueueExt(final String topic,
}
}

public long getTotalSize() {
return this.mappedFileQueue.getTotalFileSize();
}

/**
* Check whether {@code address} point to extend file.
* <p>
Expand Down
Loading