From ef210bf95c86b6a951998b622c20d4792f8f9731 Mon Sep 17 00:00:00 2001 From: HeimingZ Date: Thu, 8 Sep 2022 16:01:21 +0800 Subject: [PATCH] tolerate search index gap --- .../iotdb/db/wal/io/WALByteBufReader.java | 2 +- .../org/apache/iotdb/db/wal/node/WALNode.java | 82 ++++--- .../db/wal/node/ConsensusReqReaderTest.java | 208 +++++++++++++++++- 3 files changed, 242 insertions(+), 50 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java index a993fe7c57c69..d5c0243795061 100644 --- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java +++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java @@ -44,7 +44,7 @@ public class WALByteBufReader implements Closeable { public WALByteBufReader(File logFile) throws IOException { this.logFile = logFile; this.channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ); - if (!readTailMagic().equals(MAGIC_STRING)) { + if (channel.size() < MAGIC_STRING_BYTES || !readTailMagic().equals(MAGIC_STRING)) { throw new IOException(String.format("Broken wal file %s", logFile)); } // load metadata size diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java index 174ede96f28e1..cd71ca6db3af5 100644 --- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java +++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java @@ -539,7 +539,7 @@ public boolean hasNext() { } } - // find all insert node of current wal file + // find all nodes of current wal file List tmpNodes = new ArrayList<>(); long targetIndex = nextSearchIndex; try (WALByteBufReader walByteBufReader = @@ -548,22 +548,25 @@ public boolean hasNext() { ByteBuffer buffer = walByteBufReader.next(); WALEntryType type = WALEntryType.valueOf(buffer.get()); if (type.needSearch()) { - // see WALInfoEntry#serialize, entry type + memtable id + insert node type + // see WALInfoEntry#serialize, entry type + memtable id + plan node type buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + PlanNodeType.BYTES); - long searchIndex = buffer.getLong(); + long currentIndex = buffer.getLong(); buffer.clear(); - if (searchIndex == targetIndex) { + if (currentIndex == targetIndex) { tmpNodes.add(new MultiLeaderConsensusRequest(buffer)); - } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan - insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes)); - targetIndex++; - tmpNodes = new ArrayList<>(); - // remember to add current insert node - if (searchIndex == targetIndex) { + } else { // different search index, all slices found + if (!tmpNodes.isEmpty()) { + insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes)); + tmpNodes = new ArrayList<>(); + } + // remember to add current plan node + if (currentIndex > targetIndex) { tmpNodes.add(new MultiLeaderConsensusRequest(buffer)); + targetIndex = currentIndex; } } - } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan + } else if (!tmpNodes + .isEmpty()) { // next entry doesn't need to be searched, all slices found insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes)); targetIndex++; tmpNodes = new ArrayList<>(); @@ -575,47 +578,57 @@ public boolean hasNext() { identifier, nextSearchIndex); reset(); - hasNext(); + return hasNext(); } catch (Exception e) { - logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e); + logger.error( + "Fail to read wal from wal file {}, skip this file.", + filesToSearch[currentFileIndex], + e); + // skip this file when it's broken from the beginning + if (insertNodes.isEmpty() && tmpNodes.isEmpty()) { + currentFileIndex++; + return hasNext(); + } } - // find remaining slices of last insert plan of targetIndex - if (tmpNodes.isEmpty()) { // all insert plans scanned + // find remaining slices of last plan node of targetIndex + if (tmpNodes.isEmpty()) { // all plan nodes scanned currentFileIndex++; } else { int fileIndex = currentFileIndex + 1; while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length - 1) { - // cannot find any in this file, find all slices of last insert plan + // cannot find any in this file, so all slices of last plan node are found if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName()) == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) { insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes)); tmpNodes = Collections.emptyList(); break; } - // read until find all insert node whose search index equals target index + // read until find all plan nodes whose search index equals target index try (WALByteBufReader walByteBufReader = new WALByteBufReader(filesToSearch[fileIndex])) { + // first search index are different, so all slices of last plan node are found if (walByteBufReader.getFirstSearchIndex() != targetIndex) { insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes)); tmpNodes = Collections.emptyList(); break; } else { + // read until one node has different search index while (walByteBufReader.hasNext()) { ByteBuffer buffer = walByteBufReader.next(); WALEntryType type = WALEntryType.valueOf(buffer.get()); if (type.needSearch()) { - // see WALInfoEntry#serialize, entry type + memtable id + insert node type + // see WALInfoEntry#serialize, entry type + memtable id + plan node type buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + PlanNodeType.BYTES); - long searchIndex = buffer.getLong(); + long currentIndex = buffer.getLong(); buffer.clear(); - if (searchIndex == targetIndex) { + if (currentIndex == targetIndex) { tmpNodes.add(new MultiLeaderConsensusRequest(buffer)); - } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan + } else { // find all slices of plan node insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes)); tmpNodes = Collections.emptyList(); break; } - } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan + } else { // find all slices of plan node insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes)); tmpNodes = Collections.emptyList(); break; @@ -628,9 +641,10 @@ public boolean hasNext() { identifier, nextSearchIndex); reset(); - hasNext(); + return hasNext(); } catch (Exception e) { - logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e); + logger.error( + "Fail to read wal from wal file {}, skip this file.", filesToSearch[fileIndex], e); } if (!tmpNodes.isEmpty()) { fileIndex++; @@ -664,25 +678,7 @@ public IndexedConsensusRequest next() { } IndexedConsensusRequest request = itr.next(); - if (request.getSearchIndex() == nextSearchIndex) { - nextSearchIndex++; - } else if (request.getSearchIndex() > nextSearchIndex) { - logger.warn( - "Search index of wal node-{} are not continuously, skip from {} to {}.", - identifier, - nextSearchIndex, - request.getSearchIndex()); - skipTo(request.getSearchIndex() + 1); - } else { - logger.error( - "Search index of wal node-{} are out of order, {} is before {}.", - identifier, - nextSearchIndex, - request.getSearchIndex()); - throw new RuntimeException( - String.format("Search index of wal node-%s are out of order", identifier)); - } - + nextSearchIndex = request.getSearchIndex() + 1; return request; } diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java index 9c59f58a26961..c19d64ed67eb3 100644 --- a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java @@ -44,6 +44,8 @@ import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.IOException; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -75,9 +77,16 @@ public void tearDown() throws Exception { } /** - * Generate wal files as below: _0-0-1.wal: 1,-1 _1-1-1.wal: 2,2,2 _2-2-1.wal: 3,3 _3-3-1.wal: 3,4 - * _4-4-1.wal: 4 _5-4-1.wal: 4,4,5 _6-5-1.wal: 6 1 - InsertRowNode, 2 - InsertRowsOfOneDeviceNode, - * 3 - InsertRowsNode, 4 - InsertMultiTabletsNode, 5 - InsertTabletNode, 6 - InsertRowNode + * Generate wal files as below:
+ * _0-0-1.wal: 1,-1
+ * _1-1-1.wal: 2,2,2
+ * _2-2-1.wal: 3,3
+ * _3-3-1.wal: 3,4
+ * _4-4-1.wal: 4
+ * _5-4-1.wal: 4,4,5
+ * _6-5-1.wal: 6
+ * 1 - InsertRowNode, 2 - InsertRowsOfOneDeviceNode, 3 - InsertRowsNode, 4 - + * InsertMultiTabletsNode, 5 - InsertTabletNode, 6 - InsertRowNode */ private void simulateFileScenario01() throws IllegalPathException { InsertTabletNode insertTabletNode; @@ -382,8 +391,12 @@ public void scenario01TestGetReqIterator07() throws Exception { } /** - * Generate wal files as below: _0-0-0.wal: -1,-1 _1-0-0.wal: -1 _2-0-1.wal: -1,1 _3-1-0.wal: -1 1 - * - DeleteDataNode + * Generate wal files as below:
+ * _0-0-0.wal: -1,-1
+ * _1-0-0.wal: -1
+ * _2-0-1.wal: -1,1
+ * _3-1-0.wal: -1
+ * 1 - DeleteDataNode */ private void simulateFileScenario02() throws IllegalPathException { InsertRowNode insertRowNode = getInsertRowNode(devicePath); @@ -402,15 +415,113 @@ private void simulateFileScenario02() throws IllegalPathException { walNode.rollWALFile(); // _3-1-0.wal walNode.log(0, insertRowNode); // -1 - walNode.rollWALFile(); } @Test public void scenario02TestGetReqIterator01() throws Exception { simulateFileScenario02(); + walNode.rollWALFile(); + ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); + IndexedConsensusRequest request; + PlanNode planNode; + Assert.assertTrue(iterator.hasNext()); + request = iterator.next(); + Assert.assertEquals(1, request.getRequests().size()); + for (IConsensusRequest innerRequest : request.getRequests()) { + planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer()); + Assert.assertTrue(planNode instanceof DeleteDataNode); + Assert.assertEquals(1, ((DeleteDataNode) planNode).getSearchIndex()); + } + } + + /** + * Generate wal files as below:
+ * _0-0-1.wal: broken
+ * _1-0-1.wal: 1,-1
+ * _2-1-1.wal: 2,2,2
+ * _3-2-1.wal: 3,3
+ * _4-3-1.wal: broken
+ * _5-3-1.wal: 3,5
+ * _6-5-1.wal: broken
+ * _7-5-1.wal: broken
+ * _8-5-1.wal: broken
+ * _9-8-1.wal: 8
+ * 1,2,3,5,8 - DeleteDataNode + */ + private void simulateFileScenario03() throws IllegalPathException, IOException { + // _0-0-1.wal + walNode.rollWALFile(); + new File(logDirectory, "_0-0-0.wal").delete(); + new File(logDirectory, "_0-0-1.wal").createNewFile(); + // _1-0-1.wal + DeleteDataNode deleteDataNode = getDeleteDataNode(devicePath); + deleteDataNode.setSearchIndex(1); + walNode.log(0, deleteDataNode); // 1 + deleteDataNode = getDeleteDataNode(devicePath); + deleteDataNode.setSearchIndex(-1); + walNode.log(0, deleteDataNode); // -1 + walNode.rollWALFile(); + // _2-1-1.wal + deleteDataNode = getDeleteDataNode(devicePath); + deleteDataNode.setSearchIndex(2); + walNode.log(0, deleteDataNode); // 2 + walNode.log(0, deleteDataNode); // 2 + walNode.log(0, deleteDataNode); // 2 + walNode.rollWALFile(); + // _3-2-1.wal + deleteDataNode = getDeleteDataNode(devicePath); + deleteDataNode.setSearchIndex(3); + walNode.log(0, deleteDataNode); // 3 + walNode.log(0, deleteDataNode); // 3 + walNode.rollWALFile(); + // _4-3-1.wal + walNode.rollWALFile(); + new File(logDirectory, "_4-3-0.wal").delete(); + new File(logDirectory, "_4-3-1.wal").createNewFile(); + // _5-3-1.wal + deleteDataNode = getDeleteDataNode(devicePath); + deleteDataNode.setSearchIndex(3); + walNode.log(0, deleteDataNode); // 3 + deleteDataNode = getDeleteDataNode(devicePath); + deleteDataNode.setSearchIndex(5); + walNode.log(0, deleteDataNode); // 5 + walNode.rollWALFile(); + // _6-5-1.wal + deleteDataNode = getDeleteDataNode(devicePath); + deleteDataNode.setSearchIndex(6); + walNode.log(0, deleteDataNode); // 6 + deleteDataNode = getDeleteDataNode(devicePath); + deleteDataNode.setSearchIndex(7); + walNode.log(0, deleteDataNode); // 7 + deleteDataNode = getDeleteDataNode(devicePath); + deleteDataNode.setSearchIndex(8); + walNode.log(0, deleteDataNode); // 8 + walNode.rollWALFile(); + new File(logDirectory, "_6-5-1.wal").delete(); + new File(logDirectory, "_6-5-1.wal").createNewFile(); + // _7-5-1.wal + walNode.rollWALFile(); + new File(logDirectory, "_7-5-0.wal").delete(); + new File(logDirectory, "_7-5-1.wal").createNewFile(); + // _8-5-1.wal + walNode.rollWALFile(); + new File(logDirectory, "_8-5-0.wal").delete(); + new File(logDirectory, "_8-5-1.wal").createNewFile(); + // _9-8-1.wal + deleteDataNode = getDeleteDataNode(devicePath); + deleteDataNode.setSearchIndex(8); + walNode.log(0, deleteDataNode); // 8 + } + + @Test + public void scenario03TestGetReqIterator01() throws Exception { + simulateFileScenario03(); + walNode.rollWALFile(); + ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); IndexedConsensusRequest request; PlanNode planNode; + Assert.assertTrue(iterator.hasNext()); request = iterator.next(); Assert.assertEquals(1, request.getRequests().size()); @@ -419,6 +530,91 @@ public void scenario02TestGetReqIterator01() throws Exception { Assert.assertTrue(planNode instanceof DeleteDataNode); Assert.assertEquals(1, ((DeleteDataNode) planNode).getSearchIndex()); } + Assert.assertTrue(iterator.hasNext()); + request = iterator.next(); + Assert.assertEquals(3, request.getRequests().size()); + for (IConsensusRequest innerRequest : request.getRequests()) { + planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer()); + Assert.assertTrue(planNode instanceof DeleteDataNode); + Assert.assertEquals(2, ((DeleteDataNode) planNode).getSearchIndex()); + } + Assert.assertTrue(iterator.hasNext()); + request = iterator.next(); + Assert.assertEquals(3, request.getRequests().size()); + for (IConsensusRequest innerRequest : request.getRequests()) { + planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer()); + Assert.assertTrue(planNode instanceof DeleteDataNode); + Assert.assertEquals(3, ((DeleteDataNode) planNode).getSearchIndex()); + } + Assert.assertTrue(iterator.hasNext()); + request = iterator.next(); + Assert.assertEquals(1, request.getRequests().size()); + for (IConsensusRequest innerRequest : request.getRequests()) { + planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer()); + Assert.assertTrue(planNode instanceof DeleteDataNode); + Assert.assertEquals(5, ((DeleteDataNode) planNode).getSearchIndex()); + } + Assert.assertFalse(iterator.hasNext()); + walNode.rollWALFile(); + Assert.assertTrue(iterator.hasNext()); + request = iterator.next(); + Assert.assertEquals(1, request.getRequests().size()); + for (IConsensusRequest innerRequest : request.getRequests()) { + planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer()); + Assert.assertTrue(planNode instanceof DeleteDataNode); + Assert.assertEquals(8, ((DeleteDataNode) planNode).getSearchIndex()); + } + Assert.assertFalse(iterator.hasNext()); + } + + @Test + public void scenario03TestGetReqIterator02() throws Exception { + simulateFileScenario03(); + walNode.rollWALFile(); + + ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(6); + IndexedConsensusRequest request; + PlanNode planNode; + + Assert.assertFalse(iterator.hasNext()); + walNode.rollWALFile(); + Assert.assertTrue(iterator.hasNext()); + request = iterator.next(); + Assert.assertEquals(1, request.getRequests().size()); + for (IConsensusRequest innerRequest : request.getRequests()) { + planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer()); + Assert.assertTrue(planNode instanceof DeleteDataNode); + Assert.assertEquals(8, ((DeleteDataNode) planNode).getSearchIndex()); + } + Assert.assertFalse(iterator.hasNext()); + + iterator.skipTo(3); + + Assert.assertTrue(iterator.hasNext()); + request = iterator.next(); + Assert.assertEquals(3, request.getRequests().size()); + for (IConsensusRequest innerRequest : request.getRequests()) { + planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer()); + Assert.assertTrue(planNode instanceof DeleteDataNode); + Assert.assertEquals(3, ((DeleteDataNode) planNode).getSearchIndex()); + } + Assert.assertTrue(iterator.hasNext()); + request = iterator.next(); + Assert.assertEquals(1, request.getRequests().size()); + for (IConsensusRequest innerRequest : request.getRequests()) { + planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer()); + Assert.assertTrue(planNode instanceof DeleteDataNode); + Assert.assertEquals(5, ((DeleteDataNode) planNode).getSearchIndex()); + } + Assert.assertTrue(iterator.hasNext()); + request = iterator.next(); + Assert.assertEquals(1, request.getRequests().size()); + for (IConsensusRequest innerRequest : request.getRequests()) { + planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer()); + Assert.assertTrue(planNode instanceof DeleteDataNode); + Assert.assertEquals(8, ((DeleteDataNode) planNode).getSearchIndex()); + } + Assert.assertFalse(iterator.hasNext()); } public static InsertRowNode getInsertRowNode(String devicePath) throws IllegalPathException {