[IOTDB-860] Emend the async log applier #1635
Conversation
The fix itself has a potential problem: when new logs keep coming in, it is very likely to time out.
Now I see you want to make sure that when the snapshot is taken, all previous logs have been applied. This side effect had been noticed, but back then the AsyncApplier was merely at an experimental stage. Since you have fixed one of the problems, I would like to give some advice to make the AsyncApplier complete:
- When starting to take a snapshot, record the current commitIndex.
- Block until all logs whose indices <= the recorded commitIndex are applied. (Use RaftLogManager to do so instead of LogApplier)
- Prevent the log cleaner thread from cleaning logs that are not yet applied.
- Change StorageEngine.getInstance().syncCloseAllProcessor(); in takeSnapshot() to send a flush plan within the group. (So the file sequence will not be broken by snapshots)
- When committed logs are recovered during start-up, re-apply all of them. (Notice that operation sequences in IoTDB are idempotent)
Great suggestion. I think items 1-3 you mentioned are meant to ensure that while a snapshot is being taken, no new logs can be added and the snapshot task does not take too long. However, the current implementation can already block incoming logs: when taking a snapshot we hold the logManager lock, so new logs cannot be committed (committing a log also needs the logManager lock); we only need to wait for all previously committed logs to be applied before taking the snapshot. I'm going to rethink suggestions 4-5.
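To make the mechanism described above concrete, here is a minimal sketch of taking a snapshot while holding the logManager lock and waiting for previously committed logs to be applied. The accessor names on RaftLogManager (getCommitLogIndex, getMaxHaveAppliedCommitIndex) and the timeout handling are assumptions for illustration only, not the actual IoTDB API.

import java.util.concurrent.TimeoutException;

// Sketch only: the RaftLogManager accessors below are assumed for illustration.
class SnapshotGate {
  private static final long APPLY_TIMEOUT_MS = 60_000;

  void takeSnapshotWhenAllApplied(RaftLogManager logManager)
      throws InterruptedException, TimeoutException {
    synchronized (logManager) {
      // Committing a log also needs the logManager lock, so while we hold it
      // no new log can be committed and this commitIndex is a stable upper bound.
      long snapshotIndex = logManager.getCommitLogIndex();                // assumed accessor
      long deadline = System.currentTimeMillis() + APPLY_TIMEOUT_MS;
      // Wait until the async applier has applied every log with index <= snapshotIndex;
      // the applier does not need this lock to apply already-committed logs.
      while (logManager.getMaxHaveAppliedCommitIndex() < snapshotIndex) { // assumed accessor
        if (System.currentTimeMillis() > deadline) {
          throw new TimeoutException("logs up to " + snapshotIndex + " not applied in time");
        }
        Thread.sleep(10);
      }
      // Everything covered by the snapshot has been applied; it is now safe to take the snapshot.
    }
  }
}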
Force-pushed from b82d6cc to 8cf4003
fix conflict
Force-pushed from d639d8b to 5c3f239
public void syncFlushAllProcessor() {
  logger.info("{}: Start flush all storage group processor in one data group", getName());
  ConcurrentHashMap<String, StorageGroupProcessor> processorMap = StorageEngine.getInstance()
      .getProcessorMap();
  if (processorMap.size() == 0) {
    logger.info("{}: no need to flush processor", getName());
    return;
  }
  List<Path> storageGroups = new ArrayList<>();
  for (Map.Entry<String, StorageGroupProcessor> entry : processorMap.entrySet()) {
    Path path = new Path(entry.getKey());
    storageGroups.add(path);
  }
  FlushPlan plan = new FlushPlan(null, true, storageGroups);
  dataGroupMember.flushFileWhenDoSnapshot(plan);
}
I am afraid that we cannot flush the whole storage group when time partitioning is enabled, because in that case a storage group is managed by several data groups; if you flush one storage group without notifying the other data groups, the file integrity of those groups will be broken.
So you should either flush the other related groups (which are rather hard to find), or only flush the partitions that belong to this data group.
Sure, thanks for your advice. I'd like to only flush the partitions that belong to this data group.
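As a rough illustration of flushing only the partitions owned by this data group, the sketch below assumes a hypothetical partition-table lookup (getOwnedPartitionIds) and a FlushPlan overload that accepts per-storage-group partition ids; the actual constructor and lookup in IoTDB may differ.

// Sketch only: partitionTable.getOwnedPartitionIds and the map-based FlushPlan
// constructor are assumptions for illustration; the real IoTDB signatures may differ.
public void syncFlushOwnedPartitions() {
  Map<String, StorageGroupProcessor> processorMap = StorageEngine.getInstance().getProcessorMap();
  Map<Path, List<Long>> sgToOwnedPartitions = new HashMap<>();
  for (String storageGroupName : processorMap.keySet()) {
    // Keep only the time partitions that the partition table assigns to this data group.
    List<Long> ownedPartitions =
        partitionTable.getOwnedPartitionIds(storageGroupName, dataGroupMember.getHeader()); // assumed helper
    if (!ownedPartitions.isEmpty()) {
      sgToOwnedPartitions.put(new Path(storageGroupName), ownedPartitions);
    }
  }
  if (sgToOwnedPartitions.isEmpty()) {
    return;
  }
  FlushPlan plan = new FlushPlan(null, true, sgToOwnedPartitions); // assumed overload
  dataGroupMember.flushFileWhenDoSnapshot(plan);
}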