Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,14 @@ public CompletableFuture<List<IndexItem>> queryAsync(
try {
readWriteLock.readLock().lock();
ConcurrentNavigableMap<Long, IndexFile> pendingMap =
this.timeStoreTable.subMap(beginTime, true, endTime, true);
this.timeStoreTable.headMap(endTime, true);
List<CompletableFuture<Void>> futureList = new ArrayList<>(pendingMap.size());
ConcurrentSkipListMap<String /* queueId-offset */, IndexItem> result = new ConcurrentSkipListMap<>();

for (Map.Entry<Long, IndexFile> entry : pendingMap.descendingMap().entrySet()) {
if (entry.getValue().getEndTimestamp() < beginTime) {
break;
}
CompletableFuture<Void> completableFuture = entry.getValue()
.queryAsync(topic, key, maxCount, beginTime, endTime)
.thenAccept(itemList -> itemList.forEach(indexItem -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,4 +351,41 @@ public void concurrentGetTest() throws InterruptedException {
executorService.shutdown();
Assert.assertTrue(result.get());
}

@Test
public void queryCrossFileBoundaryTest() throws InterruptedException, ExecutionException {
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();

// Create first file with early beginTime
long file1Begin = System.currentTimeMillis();
for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum() - 1; i++) {
indexService.putKey(TOPIC_NAME, TOPIC_ID, QUEUE_ID,
Collections.singleton("crossKey"), i * 100L, MESSAGE_SIZE, file1Begin + i * 1000);
}

// Create second file with later beginTime (beyond query range)
long file2Begin = System.currentTimeMillis() + 100_000;
indexService.createNewIndexFile(file2Begin);
for (int i = 0; i < 5; i++) {
indexService.putKey(TOPIC_NAME, TOPIC_ID, QUEUE_ID,
Collections.singleton("crossKey"), i * 100L, MESSAGE_SIZE, file2Begin + i);
}

Assert.assertEquals(2, indexService.getTimeStoreTable().size());

// Query range: beginTime is AFTER file1's beginTime but BEFORE file1's last item timestamp
// This should select file1, NOT file2 (file2 beginTime > queryEnd)
long queryBegin = file1Begin + 5_000;
long queryEnd = file1Begin + 15_000;

List<IndexItem> results = indexService.queryAsync(
TOPIC_NAME, "crossKey", 10, queryBegin, queryEnd).get();

// file1 has items at timestamps: file1Begin, file1Begin+1000, ..., file1Begin+(N-1)*1000
// Items in range [file1Begin+5000, file1Begin+15000] should match
// The bug (subMap) would return empty because file1's key < queryBegin
Assert.assertFalse("Should find index items from file covering query range", results.isEmpty());
Assert.assertTrue("Should find items within query time range", results.size() > 0);
}
}
Loading