Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -72,6 +73,8 @@ public class DefaultMQProducerTest {
private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID";

private Random random = new Random();

@Before
public void init() throws Exception {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
Expand Down Expand Up @@ -200,7 +203,7 @@ public void run() {

@Test
public void testSetCallbackExecutor() throws MQClientException {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis() + random.nextInt(100);
producer = new DefaultMQProducer(producerGroupTemp);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Expand Down
189 changes: 99 additions & 90 deletions store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
*/
package org.apache.rocketmq.store.index;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.MappedFile;

public class IndexFile {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
Expand All @@ -34,23 +34,29 @@ public class IndexFile {
private static int invalidIndex = 0;
private final int hashSlotNum;
private final int indexNum;
private final MappedFile mappedFile;
private final FileChannel fileChannel;
private final MappedByteBuffer mappedByteBuffer;
private String fileName;
private final RandomAccessFile randomAccessFile;
private final IndexHeader indexHeader;

public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
final long endPhyOffset, final long endTimestamp) throws IOException {
int fileTotalSize =
IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
this.mappedFile = new MappedFile(fileName, fileTotalSize);
this.fileChannel = this.mappedFile.getFileChannel();
this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
int fileTotalSize = IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
this.fileName = fileName;
File indexFile = new File(fileName);
if (!indexFile.exists()) {
File parentFile = new File(indexFile.getAbsoluteFile().getParent());
if (!parentFile.exists()) {
parentFile.mkdirs();
}
indexFile.createNewFile();
}
this.randomAccessFile = new RandomAccessFile(fileName,"rw");
this.randomAccessFile.setLength(fileTotalSize);

this.hashSlotNum = hashSlotNum;
this.indexNum = indexNum;

ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
this.indexHeader = new IndexHeader(byteBuffer);
this.indexHeader = new IndexHeader(randomAccessFile);

if (endPhyOffset > 0) {
this.indexHeader.setBeginPhyOffset(endPhyOffset);
Expand All @@ -64,20 +70,22 @@ public IndexFile(final String fileName, final int hashSlotNum, final int indexNu
}

public String getFileName() {
return this.mappedFile.getFileName();
return this.fileName;
}

public void load() {
this.indexHeader.load();
try {
this.indexHeader.load();
} catch (IOException e) {
log.error("Load file {} index header error", fileName, e);
}
}

public void flush() {
long beginTime = System.currentTimeMillis();
if (this.mappedFile.hold()) {
try {
this.indexHeader.updateByteBuffer();
this.mappedByteBuffer.force();
this.mappedFile.release();
log.info("flush index file eclipse time(ms) " + (System.currentTimeMillis() - beginTime));
} catch (IOException e) {
log.error("flush file {} index header error", fileName, e);
}
}

Expand All @@ -86,7 +94,12 @@ public boolean isWriteFull() {
}

public boolean destroy(final long intervalForcibly) {
return this.mappedFile.destroy(intervalForcibly);
try {
randomAccessFile.close();
} catch (IOException e) {
log.error("Close file {} error", fileName, e);
}
return true;
}

public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
Expand All @@ -98,10 +111,10 @@ public boolean putKey(final String key, final long phyOffset, final long storeTi
FileLock fileLock = null;

try {
fileLock = randomAccessFile.getChannel().lock();
randomAccessFile.seek(absSlotPos);
int slotValue = randomAccessFile.readInt();

// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
Expand All @@ -122,12 +135,19 @@ public boolean putKey(final String key, final long phyOffset, final long storeTi
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;

this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);

dos.writeInt(keyHash);
dos.writeLong(phyOffset);
dos.writeInt((int)timeDiff);
dos.writeInt(slotValue);

this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
randomAccessFile.seek(absIndexPos);
randomAccessFile.write(bos.toByteArray());

randomAccessFile.seek(absSlotPos);
randomAccessFile.writeInt(this.indexHeader.getIndexCount());

if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
Expand All @@ -138,7 +158,6 @@ public boolean putKey(final String key, final long phyOffset, final long storeTi
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);

return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
Expand Down Expand Up @@ -188,76 +207,66 @@ public boolean isTimeMatched(final long begin, final long end) {

public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

FileLock fileLock = null;
try {
if (lock) {
// fileLock = this.fileChannel.lock(absSlotPos,
// hashSlotSize, true);
}

int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// if (fileLock != null) {
// fileLock.release();
// fileLock = null;
// }

if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}

int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

FileLock fileLock = null;
try {
fileLock = randomAccessFile.getChannel().lock();

randomAccessFile.seek(absSlotPos);
int slotValue = randomAccessFile.readInt();
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}

int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;

long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

if (timeDiff < 0) {
break;
}
randomAccessFile.seek(absIndexPos);
int keyHashRead = randomAccessFile.readInt();
long phyOffsetRead = randomAccessFile.readLong();
long timeDiff = (long) randomAccessFile.readInt();
int prevIndexRead = randomAccessFile.readInt();

timeDiff *= 1000L;
if (timeDiff < 0) {
break;
}

long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
timeDiff *= 1000L;

if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}

nextIndexToRead = prevIndexRead;
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}

nextIndexToRead = prevIndexRead;
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}

this.mappedFile.release();
}
}
}
Expand Down
Loading