diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index ded22ada914..7571d912dd2 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -72,6 +73,8 @@ public class DefaultMQProducerTest { private String topic = "FooBar"; private String producerGroupPrefix = "FooBar_PID"; + private Random random = new Random(); + @Before public void init() throws Exception { String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); @@ -200,7 +203,7 @@ public void run() { @Test public void testSetCallbackExecutor() throws MQClientException { - String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); + String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis() + random.nextInt(100); producer = new DefaultMQProducer(producerGroupTemp); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java index 28505984f49..a51272689a2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java @@ -16,16 +16,16 @@ */ package org.apache.rocketmq.store.index; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; +import java.io.RandomAccessFile; import java.nio.channels.FileLock; import java.util.List; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.store.MappedFile; public class IndexFile { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -34,23 +34,29 @@ public class IndexFile { private static int invalidIndex = 0; private final int hashSlotNum; private final int indexNum; - private final MappedFile mappedFile; - private final FileChannel fileChannel; - private final MappedByteBuffer mappedByteBuffer; + private String fileName; + private final RandomAccessFile randomAccessFile; private final IndexHeader indexHeader; public IndexFile(final String fileName, final int hashSlotNum, final int indexNum, final long endPhyOffset, final long endTimestamp) throws IOException { - int fileTotalSize = - IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize); - this.mappedFile = new MappedFile(fileName, fileTotalSize); - this.fileChannel = this.mappedFile.getFileChannel(); - this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer(); + int fileTotalSize = IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize); + this.fileName = fileName; + File indexFile = new File(fileName); + if (!indexFile.exists()) { + File parentFile = new File(indexFile.getAbsoluteFile().getParent()); + if (!parentFile.exists()) { + parentFile.mkdirs(); + } + indexFile.createNewFile(); + } + this.randomAccessFile = new RandomAccessFile(fileName,"rw"); + this.randomAccessFile.setLength(fileTotalSize); + this.hashSlotNum = hashSlotNum; this.indexNum = indexNum; - ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); - this.indexHeader = new IndexHeader(byteBuffer); + this.indexHeader = new IndexHeader(randomAccessFile); if (endPhyOffset > 0) { this.indexHeader.setBeginPhyOffset(endPhyOffset); @@ -64,20 +70,22 @@ public IndexFile(final String fileName, final int hashSlotNum, final int indexNu } public String getFileName() { - return this.mappedFile.getFileName(); + return this.fileName; } public void load() { - this.indexHeader.load(); + try { + this.indexHeader.load(); + } catch (IOException e) { + log.error("Load file {} index header error", fileName, e); + } } public void flush() { - long beginTime = System.currentTimeMillis(); - if (this.mappedFile.hold()) { + try { this.indexHeader.updateByteBuffer(); - this.mappedByteBuffer.force(); - this.mappedFile.release(); - log.info("flush index file eclipse time(ms) " + (System.currentTimeMillis() - beginTime)); + } catch (IOException e) { + log.error("flush file {} index header error", fileName, e); } } @@ -86,7 +94,12 @@ public boolean isWriteFull() { } public boolean destroy(final long intervalForcibly) { - return this.mappedFile.destroy(intervalForcibly); + try { + randomAccessFile.close(); + } catch (IOException e) { + log.error("Close file {} error", fileName, e); + } + return true; } public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { @@ -98,10 +111,10 @@ public boolean putKey(final String key, final long phyOffset, final long storeTi FileLock fileLock = null; try { + fileLock = randomAccessFile.getChannel().lock(); + randomAccessFile.seek(absSlotPos); + int slotValue = randomAccessFile.readInt(); - // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize, - // false); - int slotValue = this.mappedByteBuffer.getInt(absSlotPos); if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) { slotValue = invalidIndex; } @@ -122,12 +135,19 @@ public boolean putKey(final String key, final long phyOffset, final long storeTi IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; - this.mappedByteBuffer.putInt(absIndexPos, keyHash); - this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); - this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); - this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + dos.writeInt(keyHash); + dos.writeLong(phyOffset); + dos.writeInt((int)timeDiff); + dos.writeInt(slotValue); - this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); + randomAccessFile.seek(absIndexPos); + randomAccessFile.write(bos.toByteArray()); + + randomAccessFile.seek(absSlotPos); + randomAccessFile.writeInt(this.indexHeader.getIndexCount()); if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); @@ -138,7 +158,6 @@ public boolean putKey(final String key, final long phyOffset, final long storeTi this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); - return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); @@ -188,76 +207,66 @@ public boolean isTimeMatched(final long begin, final long end) { public void selectPhyOffset(final List phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) { - if (this.mappedFile.hold()) { - int keyHash = indexKeyHashMethod(key); - int slotPos = keyHash % this.hashSlotNum; - int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; - - FileLock fileLock = null; - try { - if (lock) { - // fileLock = this.fileChannel.lock(absSlotPos, - // hashSlotSize, true); - } - - int slotValue = this.mappedByteBuffer.getInt(absSlotPos); - // if (fileLock != null) { - // fileLock.release(); - // fileLock = null; - // } - - if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() - || this.indexHeader.getIndexCount() <= 1) { - } else { - for (int nextIndexToRead = slotValue; ; ) { - if (phyOffsets.size() >= maxNum) { - break; - } - - int absIndexPos = - IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize - + nextIndexToRead * indexSize; + int keyHash = indexKeyHashMethod(key); + int slotPos = keyHash % this.hashSlotNum; + int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; + + FileLock fileLock = null; + try { + fileLock = randomAccessFile.getChannel().lock(); + + randomAccessFile.seek(absSlotPos); + int slotValue = randomAccessFile.readInt(); + if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() + || this.indexHeader.getIndexCount() <= 1) { + } else { + for (int nextIndexToRead = slotValue; ; ) { + if (phyOffsets.size() >= maxNum) { + break; + } - int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); - long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); + int absIndexPos = + IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + + nextIndexToRead * indexSize; - long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); - int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); - if (timeDiff < 0) { - break; - } + randomAccessFile.seek(absIndexPos); + int keyHashRead = randomAccessFile.readInt(); + long phyOffsetRead = randomAccessFile.readLong(); + long timeDiff = (long) randomAccessFile.readInt(); + int prevIndexRead = randomAccessFile.readInt(); - timeDiff *= 1000L; + if (timeDiff < 0) { + break; + } - long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; - boolean timeMatched = (timeRead >= begin) && (timeRead <= end); + timeDiff *= 1000L; - if (keyHash == keyHashRead && timeMatched) { - phyOffsets.add(phyOffsetRead); - } + long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; + boolean timeMatched = (timeRead >= begin) && (timeRead <= end); - if (prevIndexRead <= invalidIndex - || prevIndexRead > this.indexHeader.getIndexCount() - || prevIndexRead == nextIndexToRead || timeRead < begin) { - break; - } + if (keyHash == keyHashRead && timeMatched) { + phyOffsets.add(phyOffsetRead); + } - nextIndexToRead = prevIndexRead; + if (prevIndexRead <= invalidIndex + || prevIndexRead > this.indexHeader.getIndexCount() + || prevIndexRead == nextIndexToRead || timeRead < begin) { + break; } + + nextIndexToRead = prevIndexRead; } - } catch (Exception e) { - log.error("selectPhyOffset exception ", e); - } finally { - if (fileLock != null) { - try { - fileLock.release(); - } catch (IOException e) { - log.error("Failed to release the lock", e); - } + } + } catch (Exception e) { + log.error("selectPhyOffset exception ", e); + } finally { + if (fileLock != null) { + try { + fileLock.release(); + } catch (IOException e) { + log.error("Failed to release the lock", e); } - - this.mappedFile.release(); } } } diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java index 44021cd5895..682a67ca0f1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java @@ -16,19 +16,26 @@ */ package org.apache.rocketmq.store.index; -import java.nio.ByteBuffer; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +/** + * Index hdeader protocol is as below + * Long beginTimestamp + * Long endTimestamp + * Long beginPhyOffset + * Long endPhyOffset + * Int hashSlotCount + * Int indexCount + */ public class IndexHeader { public static final int INDEX_HEADER_SIZE = 40; private static int beginTimestampIndex = 0; - private static int endTimestampIndex = 8; - private static int beginPhyoffsetIndex = 16; - private static int endPhyoffsetIndex = 24; - private static int hashSlotcountIndex = 32; - private static int indexCountIndex = 36; - private final ByteBuffer byteBuffer; + private final RandomAccessFile randomAccessFile; private AtomicLong beginTimestamp = new AtomicLong(0); private AtomicLong endTimestamp = new AtomicLong(0); private AtomicLong beginPhyOffset = new AtomicLong(0); @@ -37,31 +44,39 @@ public class IndexHeader { private AtomicInteger indexCount = new AtomicInteger(1); - public IndexHeader(final ByteBuffer byteBuffer) { - this.byteBuffer = byteBuffer; + public IndexHeader(RandomAccessFile randomAccessFile) { + this.randomAccessFile = randomAccessFile; } - public void load() { - this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex)); - this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex)); - this.beginPhyOffset.set(byteBuffer.getLong(beginPhyoffsetIndex)); - this.endPhyOffset.set(byteBuffer.getLong(endPhyoffsetIndex)); + public void load() throws IOException { + randomAccessFile.seek(beginTimestampIndex); + this.beginTimestamp.set(randomAccessFile.readLong()); + this.endTimestamp.set(randomAccessFile.readLong()); + this.beginPhyOffset.set(randomAccessFile.readLong()); + this.endPhyOffset.set(randomAccessFile.readLong()); - this.hashSlotCount.set(byteBuffer.getInt(hashSlotcountIndex)); - this.indexCount.set(byteBuffer.getInt(indexCountIndex)); + this.hashSlotCount.set(randomAccessFile.readInt()); + this.indexCount.set(randomAccessFile.readInt()); if (this.indexCount.get() <= 0) { this.indexCount.set(1); } } - public void updateByteBuffer() { - this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get()); - this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get()); - this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get()); - this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get()); - this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get()); - this.byteBuffer.putInt(indexCountIndex, this.indexCount.get()); + public void updateByteBuffer() throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + dos.writeLong(this.beginTimestamp.get()); + dos.writeLong(this.endTimestamp.get()); + dos.writeLong(this.beginPhyOffset.get()); + dos.writeLong(this.endPhyOffset.get()); + + dos.writeInt(this.hashSlotCount.get()); + dos.writeInt(this.indexCount.get()); + + randomAccessFile.seek(beginTimestampIndex); + randomAccessFile.write(bos.toByteArray()); } public long getBeginTimestamp() { @@ -70,7 +85,6 @@ public long getBeginTimestamp() { public void setBeginTimestamp(long beginTimestamp) { this.beginTimestamp.set(beginTimestamp); - this.byteBuffer.putLong(beginTimestampIndex, beginTimestamp); } public long getEndTimestamp() { @@ -79,7 +93,6 @@ public long getEndTimestamp() { public void setEndTimestamp(long endTimestamp) { this.endTimestamp.set(endTimestamp); - this.byteBuffer.putLong(endTimestampIndex, endTimestamp); } public long getBeginPhyOffset() { @@ -88,7 +101,6 @@ public long getBeginPhyOffset() { public void setBeginPhyOffset(long beginPhyOffset) { this.beginPhyOffset.set(beginPhyOffset); - this.byteBuffer.putLong(beginPhyoffsetIndex, beginPhyOffset); } public long getEndPhyOffset() { @@ -97,7 +109,6 @@ public long getEndPhyOffset() { public void setEndPhyOffset(long endPhyOffset) { this.endPhyOffset.set(endPhyOffset); - this.byteBuffer.putLong(endPhyoffsetIndex, endPhyOffset); } public AtomicInteger getHashSlotCount() { @@ -106,7 +117,6 @@ public AtomicInteger getHashSlotCount() { public void incHashSlotCount() { int value = this.hashSlotCount.incrementAndGet(); - this.byteBuffer.putInt(hashSlotcountIndex, value); } public int getIndexCount() { @@ -115,6 +125,5 @@ public int getIndexCount() { public void incIndexCount() { int value = this.indexCount.incrementAndGet(); - this.byteBuffer.putInt(indexCountIndex, value); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java index 7ad5b38db1d..7ed081e959a 100644 --- a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java @@ -30,17 +30,19 @@ import static org.assertj.core.api.Assertions.assertThat; public class IndexFileTest { - private final int HASH_SLOT_NUM = 100; - private final int INDEX_NUM = 400; + private final int HASH_SLOT_NUM = 5000000; + private final int INDEX_NUM = 1000000; @Test public void testPutKey() throws Exception { IndexFile indexFile = new IndexFile("100", HASH_SLOT_NUM, INDEX_NUM, 0, 0); + long start = System.currentTimeMillis(); for (long i = 0; i < (INDEX_NUM - 1); i++) { boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis()); assertThat(putResult).isTrue(); } - + long end = System.currentTimeMillis(); + System.out.println("total:" + INDEX_NUM + " cost:" + (end - start)); // put over index file capacity. boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis()); assertThat(putResult).isFalse();