From 85c8bf5be3ffc57d328ad2ee52d262dc9b383e66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B2=81=E8=88=AC?= Date: Tue, 13 Mar 2018 21:10:12 +0800 Subject: [PATCH 1/4] add index file latency test --- .../main/java/org/apache/rocketmq/store/index/IndexFile.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java index 28505984f49..085b536c4e4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java @@ -35,7 +35,7 @@ public class IndexFile { private final int hashSlotNum; private final int indexNum; private final MappedFile mappedFile; - private final FileChannel fileChannel; +// private final FileChannel fileChannel; private final MappedByteBuffer mappedByteBuffer; private final IndexHeader indexHeader; @@ -44,7 +44,7 @@ public IndexFile(final String fileName, final int hashSlotNum, final int indexNu int fileTotalSize = IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize); this.mappedFile = new MappedFile(fileName, fileTotalSize); - this.fileChannel = this.mappedFile.getFileChannel(); +// this.fileChannel = this.mappedFile.getFileChannel(); this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer(); this.hashSlotNum = hashSlotNum; this.indexNum = indexNum; From 14d27d0dd58e412c4d01972358c923e551007802 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B2=81=E8=88=AC?= Date: Mon, 26 Mar 2018 15:53:14 +0800 Subject: [PATCH 2/4] fix index file using random access file --- .../rocketmq/store/index/IndexFile.java | 200 ++++++++++-------- .../rocketmq/store/index/IndexHeader.java | 61 ++++-- .../rocketmq/store/index/IndexFileTest.java | 4 +- 3 files changed, 149 insertions(+), 116 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java index 085b536c4e4..1437fa4466a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java @@ -16,16 +16,15 @@ */ package org.apache.rocketmq.store.index; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; +import java.io.RandomAccessFile; import java.nio.channels.FileLock; import java.util.List; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.store.MappedFile; public class IndexFile { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -34,23 +33,20 @@ public class IndexFile { private static int invalidIndex = 0; private final int hashSlotNum; private final int indexNum; - private final MappedFile mappedFile; -// private final FileChannel fileChannel; - private final MappedByteBuffer mappedByteBuffer; + private String fileName; + private final RandomAccessFile randomAccessFile; private final IndexHeader indexHeader; public IndexFile(final String fileName, final int hashSlotNum, final int indexNum, final long endPhyOffset, final long endTimestamp) throws IOException { - int fileTotalSize = - IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize); - this.mappedFile = new MappedFile(fileName, fileTotalSize); -// this.fileChannel = this.mappedFile.getFileChannel(); - this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer(); + int fileTotalSize = IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize); + this.fileName = fileName; + this.randomAccessFile = new RandomAccessFile(fileName,"rw"); + this.hashSlotNum = hashSlotNum; this.indexNum = indexNum; - ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); - this.indexHeader = new IndexHeader(byteBuffer); + this.indexHeader = new IndexHeader(randomAccessFile); if (endPhyOffset > 0) { this.indexHeader.setBeginPhyOffset(endPhyOffset); @@ -61,24 +57,43 @@ public IndexFile(final String fileName, final int hashSlotNum, final int indexNu this.indexHeader.setBeginTimestamp(endTimestamp); this.indexHeader.setEndTimestamp(endTimestamp); } + + initFile(fileTotalSize); + } + + private void initFile(int size){ + try{ + int indexCount = indexHeader.getIndexCount(); + if(indexCount<2){ + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + for (int i = 0; i < 1024*32; i++) { + byteArrayOutputStream.write(0); + } + + randomAccessFile.seek((long)IndexHeader.INDEX_HEADER_SIZE); + for (int i = IndexHeader.INDEX_HEADER_SIZE; i < size; i+=1024*1024) { + randomAccessFile.write(byteArrayOutputStream.toByteArray()); + } + } + }catch (IOException e){ + log.error("init IndexFile failed", e); + } } public String getFileName() { - return this.mappedFile.getFileName(); + return this.fileName; } public void load() { - this.indexHeader.load(); + try { + this.indexHeader.load(); + } catch (IOException e) { + log.error("Load file {} index header error", fileName, e); + } } public void flush() { - long beginTime = System.currentTimeMillis(); - if (this.mappedFile.hold()) { - this.indexHeader.updateByteBuffer(); - this.mappedByteBuffer.force(); - this.mappedFile.release(); - log.info("flush index file eclipse time(ms) " + (System.currentTimeMillis() - beginTime)); - } + //Do nothing } public boolean isWriteFull() { @@ -86,7 +101,12 @@ public boolean isWriteFull() { } public boolean destroy(final long intervalForcibly) { - return this.mappedFile.destroy(intervalForcibly); + try { + randomAccessFile.close(); + } catch (IOException e) { + log.error("Close file {} error", fileName, e); + } + return true; } public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { @@ -98,10 +118,10 @@ public boolean putKey(final String key, final long phyOffset, final long storeTi FileLock fileLock = null; try { + fileLock = randomAccessFile.getChannel().lock(); + randomAccessFile.seek(absSlotPos); + int slotValue = randomAccessFile.readInt(); - // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize, - // false); - int slotValue = this.mappedByteBuffer.getInt(absSlotPos); if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) { slotValue = invalidIndex; } @@ -122,12 +142,19 @@ public boolean putKey(final String key, final long phyOffset, final long storeTi IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; - this.mappedByteBuffer.putInt(absIndexPos, keyHash); - this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); - this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); - this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + dos.writeInt(keyHash); + dos.writeLong(phyOffset); + dos.writeInt((int)timeDiff); + dos.writeInt(slotValue); - this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); + randomAccessFile.seek(absIndexPos); + randomAccessFile.write(bos.toByteArray()); + + randomAccessFile.seek(absSlotPos); + randomAccessFile.writeInt(this.indexHeader.getIndexCount()); if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); @@ -139,6 +166,7 @@ public boolean putKey(final String key, final long phyOffset, final long storeTi this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); + this.indexHeader.updateByteBuffer(); return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); @@ -188,76 +216,66 @@ public boolean isTimeMatched(final long begin, final long end) { public void selectPhyOffset(final List phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) { - if (this.mappedFile.hold()) { - int keyHash = indexKeyHashMethod(key); - int slotPos = keyHash % this.hashSlotNum; - int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; - - FileLock fileLock = null; - try { - if (lock) { - // fileLock = this.fileChannel.lock(absSlotPos, - // hashSlotSize, true); - } - - int slotValue = this.mappedByteBuffer.getInt(absSlotPos); - // if (fileLock != null) { - // fileLock.release(); - // fileLock = null; - // } - - if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() - || this.indexHeader.getIndexCount() <= 1) { - } else { - for (int nextIndexToRead = slotValue; ; ) { - if (phyOffsets.size() >= maxNum) { - break; - } - - int absIndexPos = - IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize - + nextIndexToRead * indexSize; + int keyHash = indexKeyHashMethod(key); + int slotPos = keyHash % this.hashSlotNum; + int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; + + FileLock fileLock = null; + try { + fileLock = randomAccessFile.getChannel().lock(); + + randomAccessFile.seek(absSlotPos); + int slotValue = randomAccessFile.readInt(); + if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() + || this.indexHeader.getIndexCount() <= 1) { + } else { + for (int nextIndexToRead = slotValue; ; ) { + if (phyOffsets.size() >= maxNum) { + break; + } - int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); - long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); + int absIndexPos = + IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + + nextIndexToRead * indexSize; - long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); - int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); - if (timeDiff < 0) { - break; - } + randomAccessFile.seek(absIndexPos); + int keyHashRead = randomAccessFile.readInt(); + long phyOffsetRead = randomAccessFile.readLong(); + long timeDiff = (long) randomAccessFile.readInt(); + int prevIndexRead = randomAccessFile.readInt(); - timeDiff *= 1000L; + if (timeDiff < 0) { + break; + } - long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; - boolean timeMatched = (timeRead >= begin) && (timeRead <= end); + timeDiff *= 1000L; - if (keyHash == keyHashRead && timeMatched) { - phyOffsets.add(phyOffsetRead); - } + long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; + boolean timeMatched = (timeRead >= begin) && (timeRead <= end); - if (prevIndexRead <= invalidIndex - || prevIndexRead > this.indexHeader.getIndexCount() - || prevIndexRead == nextIndexToRead || timeRead < begin) { - break; - } + if (keyHash == keyHashRead && timeMatched) { + phyOffsets.add(phyOffsetRead); + } - nextIndexToRead = prevIndexRead; + if (prevIndexRead <= invalidIndex + || prevIndexRead > this.indexHeader.getIndexCount() + || prevIndexRead == nextIndexToRead || timeRead < begin) { + break; } + + nextIndexToRead = prevIndexRead; } - } catch (Exception e) { - log.error("selectPhyOffset exception ", e); - } finally { - if (fileLock != null) { - try { - fileLock.release(); - } catch (IOException e) { - log.error("Failed to release the lock", e); - } + } + } catch (Exception e) { + log.error("selectPhyOffset exception ", e); + } finally { + if (fileLock != null) { + try { + fileLock.release(); + } catch (IOException e) { + log.error("Failed to release the lock", e); } - - this.mappedFile.release(); } } } diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java index 44021cd5895..0f3e7e2786c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java @@ -16,10 +16,23 @@ */ package org.apache.rocketmq.store.index; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +/** + * Index hdeader protocol is as below + * Long beginTimestamp + * Long endTimestamp + * Long beginPhyOffset + * Long endPhyOffset + * Int hashSlotCount + * Int indexCount + */ public class IndexHeader { public static final int INDEX_HEADER_SIZE = 40; private static int beginTimestampIndex = 0; @@ -28,7 +41,7 @@ public class IndexHeader { private static int endPhyoffsetIndex = 24; private static int hashSlotcountIndex = 32; private static int indexCountIndex = 36; - private final ByteBuffer byteBuffer; + private final RandomAccessFile randomAccessFile; private AtomicLong beginTimestamp = new AtomicLong(0); private AtomicLong endTimestamp = new AtomicLong(0); private AtomicLong beginPhyOffset = new AtomicLong(0); @@ -37,31 +50,39 @@ public class IndexHeader { private AtomicInteger indexCount = new AtomicInteger(1); - public IndexHeader(final ByteBuffer byteBuffer) { - this.byteBuffer = byteBuffer; + public IndexHeader(RandomAccessFile randomAccessFile) { + this.randomAccessFile = randomAccessFile; } - public void load() { - this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex)); - this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex)); - this.beginPhyOffset.set(byteBuffer.getLong(beginPhyoffsetIndex)); - this.endPhyOffset.set(byteBuffer.getLong(endPhyoffsetIndex)); + public void load() throws IOException { + randomAccessFile.seek(beginTimestampIndex); + this.beginTimestamp.set(randomAccessFile.readLong()); + this.endTimestamp.set(randomAccessFile.readLong()); + this.beginPhyOffset.set(randomAccessFile.readLong()); + this.endPhyOffset.set(randomAccessFile.readLong()); - this.hashSlotCount.set(byteBuffer.getInt(hashSlotcountIndex)); - this.indexCount.set(byteBuffer.getInt(indexCountIndex)); + this.hashSlotCount.set(randomAccessFile.readInt()); + this.indexCount.set(randomAccessFile.readInt()); if (this.indexCount.get() <= 0) { this.indexCount.set(1); } } - public void updateByteBuffer() { - this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get()); - this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get()); - this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get()); - this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get()); - this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get()); - this.byteBuffer.putInt(indexCountIndex, this.indexCount.get()); + public void updateByteBuffer() throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + dos.writeLong(this.beginTimestamp.get()); + dos.writeLong(this.endTimestamp.get()); + dos.writeLong(this.beginPhyOffset.get()); + dos.writeLong(this.endPhyOffset.get()); + + dos.writeInt(this.hashSlotCount.get()); + dos.writeInt(this.indexCount.get()); + + randomAccessFile.seek(beginTimestampIndex); + randomAccessFile.write(bos.toByteArray()); } public long getBeginTimestamp() { @@ -70,7 +91,6 @@ public long getBeginTimestamp() { public void setBeginTimestamp(long beginTimestamp) { this.beginTimestamp.set(beginTimestamp); - this.byteBuffer.putLong(beginTimestampIndex, beginTimestamp); } public long getEndTimestamp() { @@ -79,7 +99,6 @@ public long getEndTimestamp() { public void setEndTimestamp(long endTimestamp) { this.endTimestamp.set(endTimestamp); - this.byteBuffer.putLong(endTimestampIndex, endTimestamp); } public long getBeginPhyOffset() { @@ -88,7 +107,6 @@ public long getBeginPhyOffset() { public void setBeginPhyOffset(long beginPhyOffset) { this.beginPhyOffset.set(beginPhyOffset); - this.byteBuffer.putLong(beginPhyoffsetIndex, beginPhyOffset); } public long getEndPhyOffset() { @@ -97,7 +115,6 @@ public long getEndPhyOffset() { public void setEndPhyOffset(long endPhyOffset) { this.endPhyOffset.set(endPhyOffset); - this.byteBuffer.putLong(endPhyoffsetIndex, endPhyOffset); } public AtomicInteger getHashSlotCount() { @@ -106,7 +123,6 @@ public AtomicInteger getHashSlotCount() { public void incHashSlotCount() { int value = this.hashSlotCount.incrementAndGet(); - this.byteBuffer.putInt(hashSlotcountIndex, value); } public int getIndexCount() { @@ -115,6 +131,5 @@ public int getIndexCount() { public void incIndexCount() { int value = this.indexCount.incrementAndGet(); - this.byteBuffer.putInt(indexCountIndex, value); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java index 7ad5b38db1d..ee36d791dbf 100644 --- a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java @@ -30,8 +30,8 @@ import static org.assertj.core.api.Assertions.assertThat; public class IndexFileTest { - private final int HASH_SLOT_NUM = 100; - private final int INDEX_NUM = 400; + private final int HASH_SLOT_NUM = 5000000; + private final int INDEX_NUM = 2000000; @Test public void testPutKey() throws Exception { From e8602bad83eb6424719b3b3d535de6377d6ed5aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B2=81=E8=88=AC?= Date: Mon, 26 Mar 2018 16:12:07 +0800 Subject: [PATCH 3/4] add index file test --- .../rocketmq/store/index/IndexFile.java | 30 ++++--------------- .../rocketmq/store/index/IndexFileTest.java | 6 ++-- 2 files changed, 10 insertions(+), 26 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java index 1437fa4466a..bfb33423ee2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java @@ -42,6 +42,7 @@ public IndexFile(final String fileName, final int hashSlotNum, final int indexNu int fileTotalSize = IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize); this.fileName = fileName; this.randomAccessFile = new RandomAccessFile(fileName,"rw"); + this.randomAccessFile.setLength(fileTotalSize); this.hashSlotNum = hashSlotNum; this.indexNum = indexNum; @@ -57,27 +58,6 @@ public IndexFile(final String fileName, final int hashSlotNum, final int indexNu this.indexHeader.setBeginTimestamp(endTimestamp); this.indexHeader.setEndTimestamp(endTimestamp); } - - initFile(fileTotalSize); - } - - private void initFile(int size){ - try{ - int indexCount = indexHeader.getIndexCount(); - if(indexCount<2){ - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - for (int i = 0; i < 1024*32; i++) { - byteArrayOutputStream.write(0); - } - - randomAccessFile.seek((long)IndexHeader.INDEX_HEADER_SIZE); - for (int i = IndexHeader.INDEX_HEADER_SIZE; i < size; i+=1024*1024) { - randomAccessFile.write(byteArrayOutputStream.toByteArray()); - } - } - }catch (IOException e){ - log.error("init IndexFile failed", e); - } } public String getFileName() { @@ -93,7 +73,11 @@ public void load() { } public void flush() { - //Do nothing + try { + this.indexHeader.updateByteBuffer(); + } catch (IOException e) { + log.error("flush file {} index header error", fileName, e); + } } public boolean isWriteFull() { @@ -165,8 +149,6 @@ public boolean putKey(final String key, final long phyOffset, final long storeTi this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); - - this.indexHeader.updateByteBuffer(); return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); diff --git a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java index ee36d791dbf..7ed081e959a 100644 --- a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java @@ -31,16 +31,18 @@ public class IndexFileTest { private final int HASH_SLOT_NUM = 5000000; - private final int INDEX_NUM = 2000000; + private final int INDEX_NUM = 1000000; @Test public void testPutKey() throws Exception { IndexFile indexFile = new IndexFile("100", HASH_SLOT_NUM, INDEX_NUM, 0, 0); + long start = System.currentTimeMillis(); for (long i = 0; i < (INDEX_NUM - 1); i++) { boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis()); assertThat(putResult).isTrue(); } - + long end = System.currentTimeMillis(); + System.out.println("total:" + INDEX_NUM + " cost:" + (end - start)); // put over index file capacity. boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis()); assertThat(putResult).isFalse(); From 49a62b3edd37e139b95d435ddc82a9ef7932cee9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B2=81=E8=88=AC?= Date: Mon, 26 Mar 2018 16:44:29 +0800 Subject: [PATCH 4/4] fix testcase and index file --- .../producer/DefaultMQProducerTest.java | 5 +++- .../rocketmq/store/index/IndexFile.java | 9 +++++++ .../rocketmq/store/index/IndexHeader.java | 26 +++++++------------ 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index ded22ada914..7571d912dd2 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -72,6 +73,8 @@ public class DefaultMQProducerTest { private String topic = "FooBar"; private String producerGroupPrefix = "FooBar_PID"; + private Random random = new Random(); + @Before public void init() throws Exception { String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); @@ -200,7 +203,7 @@ public void run() { @Test public void testSetCallbackExecutor() throws MQClientException { - String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); + String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis() + random.nextInt(100); producer = new DefaultMQProducer(producerGroupTemp); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java index bfb33423ee2..a51272689a2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java @@ -18,6 +18,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.channels.FileLock; @@ -41,6 +42,14 @@ public IndexFile(final String fileName, final int hashSlotNum, final int indexNu final long endPhyOffset, final long endTimestamp) throws IOException { int fileTotalSize = IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize); this.fileName = fileName; + File indexFile = new File(fileName); + if (!indexFile.exists()) { + File parentFile = new File(indexFile.getAbsoluteFile().getParent()); + if (!parentFile.exists()) { + parentFile.mkdirs(); + } + indexFile.createNewFile(); + } this.randomAccessFile = new RandomAccessFile(fileName,"rw"); this.randomAccessFile.setLength(fileTotalSize); diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java index 0f3e7e2786c..682a67ca0f1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java @@ -20,7 +20,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.RandomAccessFile; -import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -36,11 +35,6 @@ public class IndexHeader { public static final int INDEX_HEADER_SIZE = 40; private static int beginTimestampIndex = 0; - private static int endTimestampIndex = 8; - private static int beginPhyoffsetIndex = 16; - private static int endPhyoffsetIndex = 24; - private static int hashSlotcountIndex = 32; - private static int indexCountIndex = 36; private final RandomAccessFile randomAccessFile; private AtomicLong beginTimestamp = new AtomicLong(0); private AtomicLong endTimestamp = new AtomicLong(0); @@ -70,19 +64,19 @@ public void load() throws IOException { } public void updateByteBuffer() throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); - dos.writeLong(this.beginTimestamp.get()); - dos.writeLong(this.endTimestamp.get()); - dos.writeLong(this.beginPhyOffset.get()); - dos.writeLong(this.endPhyOffset.get()); + dos.writeLong(this.beginTimestamp.get()); + dos.writeLong(this.endTimestamp.get()); + dos.writeLong(this.beginPhyOffset.get()); + dos.writeLong(this.endPhyOffset.get()); - dos.writeInt(this.hashSlotCount.get()); - dos.writeInt(this.indexCount.get()); + dos.writeInt(this.hashSlotCount.get()); + dos.writeInt(this.indexCount.get()); - randomAccessFile.seek(beginTimestampIndex); - randomAccessFile.write(bos.toByteArray()); + randomAccessFile.seek(beginTimestampIndex); + randomAccessFile.write(bos.toByteArray()); } public long getBeginTimestamp() {