From d1ff11556c286d11d8041f50c92334eab6760272 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Thu, 30 Nov 2017 20:54:58 +0800 Subject: [PATCH] [ROCKETMQ-321] Fix deleting files in the middle --- .../org/apache/rocketmq/store/MappedFile.java | 5 ++++ .../rocketmq/store/MappedFileQueue.java | 3 ++ .../rocketmq/store/MappedFileQueueTest.java | 29 +++++++++++++++++-- 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index 492ac5f26cf..0a43d47f8ff 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -570,6 +570,11 @@ public void munlock() { log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime); } + //testable + File getFile() { + return this.file; + } + @Override public String toString() { return this.fileName; diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index edf4c918460..f2947f13fab 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -367,6 +367,9 @@ public int deleteExpiredFileByTime(final long expiredTime, } else { break; } + } else { + //avoid deleting files in the middle + break; } } } diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java index 203dfcd565f..295809927a0 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java @@ -20,9 +20,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.Arrays; - import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Test; @@ -182,6 +180,33 @@ public void testDeleteExpiredFileByOffset() { mappedFileQueue.destroy(); } + @Test + public void testDeleteExpiredFileByTime() throws Exception { + MappedFileQueue mappedFileQueue = + new MappedFileQueue("target/unit_test_store/f/", 1024, null); + + for (int i = 0; i < 100; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); + assertThat(mappedFile).isNotNull(); + byte[] bytes = new byte[512]; + assertThat(mappedFile.appendMessage(bytes)).isTrue(); + } + + assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(50); + long expiredTime = 100 * 1000; + for (int i = 0; i < mappedFileQueue.getMappedFiles().size(); i++) { + MappedFile mappedFile = mappedFileQueue.getMappedFiles().get(i); + if (i < 5) { + mappedFile.getFile().setLastModified(System.currentTimeMillis() - expiredTime * 2); + } + if (i > 20) { + mappedFile.getFile().setLastModified(System.currentTimeMillis() - expiredTime * 2); + } + } + mappedFileQueue.deleteExpiredFileByTime(expiredTime, 0, 0, false); + assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(45); + } + @After public void destory() { File file = new File("target/unit_test_store");