Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class WALByteBufReader implements Closeable {
public WALByteBufReader(File logFile) throws IOException {
this.logFile = logFile;
this.channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ);
if (!readTailMagic().equals(MAGIC_STRING)) {
if (channel.size() < MAGIC_STRING_BYTES || !readTailMagic().equals(MAGIC_STRING)) {
throw new IOException(String.format("Broken wal file %s", logFile));
}
// load metadata size
Expand Down
82 changes: 39 additions & 43 deletions server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ public boolean hasNext() {
}
}

// find all insert node of current wal file
// find all nodes of current wal file
List<IConsensusRequest> tmpNodes = new ArrayList<>();
long targetIndex = nextSearchIndex;
try (WALByteBufReader walByteBufReader =
Expand All @@ -548,22 +548,25 @@ public boolean hasNext() {
ByteBuffer buffer = walByteBufReader.next();
WALEntryType type = WALEntryType.valueOf(buffer.get());
if (type.needSearch()) {
// see WALInfoEntry#serialize, entry type + memtable id + insert node type
// see WALInfoEntry#serialize, entry type + memtable id + plan node type
buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + PlanNodeType.BYTES);
long searchIndex = buffer.getLong();
long currentIndex = buffer.getLong();
buffer.clear();
if (searchIndex == targetIndex) {
if (currentIndex == targetIndex) {
tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
} else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
targetIndex++;
tmpNodes = new ArrayList<>();
// remember to add current insert node
if (searchIndex == targetIndex) {
} else { // different search index, all slices found
if (!tmpNodes.isEmpty()) {
insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
tmpNodes = new ArrayList<>();
}
// remember to add current plan node
if (currentIndex > targetIndex) {
tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
targetIndex = currentIndex;
}
}
} else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
} else if (!tmpNodes
.isEmpty()) { // next entry doesn't need to be searched, all slices found
insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
targetIndex++;
tmpNodes = new ArrayList<>();
Expand All @@ -575,47 +578,57 @@ public boolean hasNext() {
identifier,
nextSearchIndex);
reset();
hasNext();
return hasNext();
} catch (Exception e) {
logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e);
logger.error(
"Fail to read wal from wal file {}, skip this file.",
filesToSearch[currentFileIndex],
e);
// skip this file when it's broken from the beginning
if (insertNodes.isEmpty() && tmpNodes.isEmpty()) {
currentFileIndex++;
return hasNext();
}
}

// find remaining slices of last insert plan of targetIndex
if (tmpNodes.isEmpty()) { // all insert plans scanned
// find remaining slices of last plan node of targetIndex
if (tmpNodes.isEmpty()) { // all plan nodes scanned
currentFileIndex++;
} else {
int fileIndex = currentFileIndex + 1;
while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length - 1) {
// cannot find any in this file, find all slices of last insert plan
// cannot find any in this file, so all slices of last plan node are found
if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName())
== WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
tmpNodes = Collections.emptyList();
break;
}
// read until find all insert node whose search index equals target index
// read until find all plan nodes whose search index equals target index
try (WALByteBufReader walByteBufReader = new WALByteBufReader(filesToSearch[fileIndex])) {
// first search index are different, so all slices of last plan node are found
if (walByteBufReader.getFirstSearchIndex() != targetIndex) {
insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
tmpNodes = Collections.emptyList();
break;
} else {
// read until one node has different search index
while (walByteBufReader.hasNext()) {
ByteBuffer buffer = walByteBufReader.next();
WALEntryType type = WALEntryType.valueOf(buffer.get());
if (type.needSearch()) {
// see WALInfoEntry#serialize, entry type + memtable id + insert node type
// see WALInfoEntry#serialize, entry type + memtable id + plan node type
buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + PlanNodeType.BYTES);
long searchIndex = buffer.getLong();
long currentIndex = buffer.getLong();
buffer.clear();
if (searchIndex == targetIndex) {
if (currentIndex == targetIndex) {
tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
} else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
} else { // find all slices of plan node
insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
tmpNodes = Collections.emptyList();
break;
}
} else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
} else { // find all slices of plan node
insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
tmpNodes = Collections.emptyList();
break;
Expand All @@ -628,9 +641,10 @@ public boolean hasNext() {
identifier,
nextSearchIndex);
reset();
hasNext();
return hasNext();
} catch (Exception e) {
logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e);
logger.error(
"Fail to read wal from wal file {}, skip this file.", filesToSearch[fileIndex], e);
}
if (!tmpNodes.isEmpty()) {
fileIndex++;
Expand Down Expand Up @@ -664,25 +678,7 @@ public IndexedConsensusRequest next() {
}

IndexedConsensusRequest request = itr.next();
if (request.getSearchIndex() == nextSearchIndex) {
nextSearchIndex++;
} else if (request.getSearchIndex() > nextSearchIndex) {
logger.warn(
"Search index of wal node-{} are not continuously, skip from {} to {}.",
identifier,
nextSearchIndex,
request.getSearchIndex());
skipTo(request.getSearchIndex() + 1);
} else {
logger.error(
"Search index of wal node-{} are out of order, {} is before {}.",
identifier,
nextSearchIndex,
request.getSearchIndex());
throw new RuntimeException(
String.format("Search index of wal node-%s are out of order", identifier));
}

nextSearchIndex = request.getSearchIndex() + 1;
return request;
}

Expand Down
Loading