Description
I have been investigating why clusters have been taking a long time to download segments. After adding a few log lines and closely inspecting the code, it looks like the ZooKeeper node for a segment is not created until the processing (download and memory mapping) of the previous segment is complete.
See the code blocks below from CuratorLoadQueuePeon.
In processSegmentChangeRequest(), this block prevents processing of new segment change requests while one is already in progress:
if (currentlyProcessing != null) {
  log.debug(
      "Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].",
      basePath,
      currentlyProcessing.getSegmentId()
  );
  return;
}
In non-failure conditions, currentlyProcessing is set to null only after the ZooKeeper node for the segment has been deleted:
case NodeDeleted:
  log.info("Watcher notified of %s deletion", watchedEvent.getPath());
  entryRemoved(watchedEvent.getPath());
  break;
entryRemoved() calls actionCompleted(), which is where currentlyProcessing is finally set to null:
final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
currentlyProcessing = null;
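As a rough illustration of the impact (toy code, not from Druid; the 50 ms sleep stands in for the ZooKeeper round trip plus download and memory mapping), gating on a single in-flight request makes total load time scale linearly with the number of segments, while a pool of K threads cuts it by roughly a factor of K:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SerializedLoadDemo
{
  // Runs numSegments fake "loads" (each ~50 ms) on a pool of poolSize threads
  // and returns the elapsed wall-clock time in milliseconds.
  static long timeLoads(int poolSize, int numSegments) throws InterruptedException
  {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    CountDownLatch done = new CountDownLatch(numSegments);
    long start = System.nanoTime();
    for (int i = 0; i < numSegments; i++) {
      pool.submit(() -> {
        try {
          Thread.sleep(50); // stands in for zk round trip + download + mmap
        }
        catch (InterruptedException ignored) {
          Thread.currentThread().interrupt();
        }
        done.countDown();
      });
    }
    done.await();
    pool.shutdown();
    return (System.nanoTime() - start) / 1_000_000;
  }

  public static void main(String[] args) throws InterruptedException
  {
    long serialMs = timeLoads(1, 8);   // one in-flight load at a time
    long parallelMs = timeLoads(4, 8); // four concurrent loads
    System.out.println("serial=" + serialMs + "ms parallel=" + parallelMs + "ms");
  }
}
```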
And here is the code in ZkCoordinator that deletes the node, in a callback that runs after the segment change request completes:
request.go(
    dataSegmentChangeHandler,
    new DataSegmentChangeCallback()
    {
      boolean hasRun = false;

      @Override
      public void execute()
      {
        try {
          if (!hasRun) {
            curator.delete().guaranteed().forPath(path);
            hasRun = true;
          }
        }
        catch (Exception e) {
          try {
            curator.delete().guaranteed().forPath(path);
          }
          catch (Exception e1) {
            log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
          }
          log.error(e, "Exception while removing zNode[%s]", path);
          throw Throwables.propagate(e);
        }
      }
    }
);
Consequently, the majority of historical nodes sit idle for long periods even though they have the bandwidth to download more segments.
Even after solving the above problem, we still need to improve the concurrency of downloading segments from cold storage.
In ZkCoordinator, a listener is installed on the PathChildrenCache to detect when a node corresponding to a segment to be processed is added. However, the segment is downloaded and memory mapped on the same thread that handles this event.
This severely limits the throughput at which segments can be downloaded across the cluster. To improve performance, my proposal is to increase the parallelism of the two components in the following ways:
- In CuratorLoadQueuePeon, remove the restriction of processing only one segment at a time, and use a thread pool to create the ZooKeeper nodes for segments that need to be processed (load/drop).
- In ZkCoordinator, use a thread pool to download segments on the historical. This new thread pool, which does the actual segment processing, will be separate from the single-threaded executor that the PathChildrenCache uses to deliver child-created events.
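A minimal sketch of the second bullet (class and method names here are illustrative, not actual Druid APIs): the cache event thread only submits work, and a dedicated pool performs the expensive download and memory mapping, after which the zk node for the segment would be deleted:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SegmentLoadExecutor
{
  private final ExecutorService loadingPool;

  public SegmentLoadExecutor(int numLoadingThreads)
  {
    this.loadingPool = Executors.newFixedThreadPool(numLoadingThreads);
  }

  // Called from the (single-threaded) cache event thread; returns immediately
  // instead of blocking until the segment is downloaded and memory mapped.
  public Future<String> submitLoad(String segmentId, Runnable downloadAndMmap)
  {
    return loadingPool.submit(() -> {
      downloadAndMmap.run(); // the expensive part, now off the event thread
      // in the real fix, the zk node for segmentId would be deleted here
      return segmentId;
    });
  }

  public void shutdown()
  {
    loadingPool.shutdown();
  }

  public static void main(String[] args) throws Exception
  {
    SegmentLoadExecutor exec = new SegmentLoadExecutor(4);
    List<Future<String>> futures = new ArrayList<>();
    for (int i = 0; i < 8; i++) {
      final String id = "segment-" + i;
      futures.add(exec.submitLoad(id, () -> { /* simulated download */ }));
    }
    for (Future<String> f : futures) {
      System.out.println("loaded " + f.get());
    }
    exec.shutdown();
  }
}
```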