Add comments to supplyEntryList method
Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
porunov committed May 28, 2023
1 parent 4816363 commit e5902ad
Showing 2 changed files with 55 additions and 4 deletions.
6 changes: 2 additions & 4 deletions docs/advanced-topics/executor-service.md
@@ -38,10 +38,8 @@ are typically those configuration options which are provided via configurations

## Cassandra backend ExecutorService

-Cassandra backend has multi-key queries support. Thus, `storage.parallel-backend-executor-service` is ignored and not
-in use when Cassandra backend is in use.
-Nevertheless, Cassandra backend doesn't use `storage.parallel-backend-executor-service` for IO operations it has its
-own internal ExecutorService for queries deserialization processing.
+Although Cassandra backend doesn't use `storage.parallel-backend-executor-service` due to having multi-key queries support, it
+has its own internal ExecutorService for queries deserialization processing.
Usually it's not recommended to configure this executor service because it's considered to be optimal by default.
In case when the default executor service doesn't fit user's use-case for any reason, the configuration options under
`storage.cql.executor-service` can be used to modify it.
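
For illustration, a minimal sketch of overriding these options programmatically when opening the graph; the specific `storage.cql.executor-service.*` keys shown here are assumptions to verify against the JanusGraph configuration reference for the version in use:

```java
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class CqlExecutorServiceConfig {
    public static void main(String[] args) {
        // Tune the CQL backend's internal deserialization executor service.
        // The option names below are assumed, not confirmed by this commit.
        JanusGraph graph = JanusGraphFactory.build()
                .set("storage.backend", "cql")
                .set("storage.hostname", "127.0.0.1")
                .set("storage.cql.executor-service.core-pool-size", 10)
                .set("storage.cql.executor-service.max-pool-size", 10)
                .open();
        graph.close();
    }
}
```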
53 changes: 53 additions & 0 deletions janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java
@@ -418,6 +418,9 @@ public static <E> void supplyEntryList(ChunkedJobDefinition<Iterator<E>, EntryLi
supplyEntryList(chunkedJobDefinition, getter, StaticArrayEntry.StaticBufferHandler.INSTANCE, executorService);
}

/**
* Non-blocking method which adds a data-chunk processing job to the provided `executorService` and returns immediately.
*/
private static <E,D> void supplyEntryList(ChunkedJobDefinition<Iterator<E>, EntryListComputationContext, EntryList> chunkedJobDefinition,

[Codacy Static Code Analysis check warning on line 424 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java: The method 'supplyEntryList(ChunkedJobDefinition, StaticArrayEntry.GetColVal, StaticArrayEntry.DataHandler, ExecutorService)' has an NPath complexity of 2640, current threshold is 200]
StaticArrayEntry.GetColVal<E,D> getter,
StaticArrayEntry.DataHandler<D> dataHandler,
@@ -428,11 +431,18 @@ private static <E,D> void supplyEntryList(ChunkedJobDefinition<Iterator<E>, Entr

Queue<Iterator<E>> chunksQueue = chunkedJobDefinition.getDataChunks();

// In case another thread is already processing data chunks, we don't need to do anything.
// Thus, we just drop. Even if that other thread is already finishing the processing logic
// and didn't have a chance to process our page yet, it's still OK to drop here because
// we know that after that thread releases the processing lock it will check for any non-processed
// chunks of data and will trigger the processing again if necessary.
if(!chunkedJobDefinition.tryLockProcessing()){
return;
}

try {
// After we acquire the processing lock we need to ensure that the result wasn't
// already finished by another thread.
if(chunkedJobDefinition.getResult().isDone()){
return;
}
@@ -442,6 +452,10 @@ private static <E,D> void supplyEntryList(ChunkedJobDefinition<Iterator<E>, Entr
EntryListComputationContext context = chunkedJobDefinition.getProcessedDataContext();

if(context == null){
// In case it's the first page (`context == null`) and there aren't any elements,
// then we don't need to waste resources generating the context. We know that the final result will
// be `EMPTY_LIST`. Thus, we complete the result with `EMPTY_LIST` without generating the context
// information.
if(chunkedJobDefinition.isLastChunkRetrieved() && chunksQueue.isEmpty() && !elements.hasNext()){
chunkedJobDefinition.complete(EMPTY_LIST);
return;
@@ -450,6 +464,7 @@ private static <E,D> void supplyEntryList(ChunkedJobDefinition<Iterator<E>, Entr
chunkedJobDefinition.setProcessedDataContext(context);
}

// Compute all the available chunks of data one by one.
do {
if(elements.hasNext()){
applyElementsComputation(elements, getter, dataHandler, context);
@@ -460,17 +475,55 @@ private static <E,D> void supplyEntryList(ChunkedJobDefinition<Iterator<E>, Entr
elements = chunksQueue.remove();
} while (true);

// If we processed the last chunk of data then we must complete the result with the final computation.
if(chunkedJobDefinition.isLastChunkRetrieved() && chunksQueue.isEmpty()){
// Usually `context.metadataSchema == null` never happens because it means that there were
// multiple pages where none of the pages had any elements. In most cases we will have only one
// page with no elements, but there are some storage backends which may actually return multiple
// pages with no data. In such a case, as there was no data across those pages, it means that
// the result is `EMPTY_LIST`.
// In case any data was processed (i.e. `context.metadataSchema != null`) we must
// retrieve the final result via `convert(context)`.
chunkedJobDefinition.complete(context.metadataSchema == null ? EMPTY_LIST : convert(context));
}

} catch (Throwable throwable){
// This case may happen only if there is a bug in the processing logic.
// It could be considered a redundant check, but instead of losing the exception
// and never finishing the completion (which may potentially block the consumer thread of the `result` forever),
// it's better to stop the processing and show the exception to the user.
// Ideally this case never happens, but if it does, we need to find and fix the bug in the current method.
chunkedJobDefinition.getResult().completeExceptionally(throwable);
return;
} finally {
chunkedJobDefinition.unlockProcessing();
}

/*
The below checks are necessary for the following reasons:
- In case new page data is added into `chunksQueue` there is no guarantee that the producer thread
actually triggered consumption of the queue. It could be that the thread couldn't grab the lock (`chunkedJobDefinition.tryLockProcessing()`),
which would result in that thread simply leaving the computing logic. In such a case, we need to double-check that `chunksQueue` is
empty before leaving this thread. Otherwise, if `chunksQueue` is not empty (`!chunksQueue.isEmpty()`) the current thread needs to
try triggering the processing logic again.
- Notice that adding the last page data to the `chunksQueue` and marking the last page as retrieved
(`chunkedJobDefinition.setLastChunkRetrieved()`) are two separate operations. Thus, it could happen that the producer
thread added the last page data to the `chunksQueue` but didn't yet have a chance to mark it as the last page.
In such a situation it could happen that the current consumer thread already processed the last page but didn't
know that it was the last page. As the current thread didn't know that it was the last page (even though it actually was),
the current thread couldn't finish the result (i.e. in such a case the current thread wouldn't call `chunkedJobDefinition.complete(...)`).
If the current processing thread processed the last chunk of data but didn't have a chance to
finish the computation with the final result AND the thread triggered by the producer already tried to trigger
this computation logic but failed due to `processingLock` being held by the current thread, we would end up in
a state where all data chunks were processed but the result is never finished (i.e. `chunkedJobDefinition.complete(...)` is never called).
This is quite a rare situation which may happen only if the thread triggered by the producer had much more CPU time than
the current processing thread.
Nevertheless, in case such a situation happens we need to double-check and try to trigger the computation finalization
whenever the last chunk is marked as retrieved but the result wasn't completed for any reason. Thus, the below
check `chunkedJobDefinition.isLastChunkRetrieved() && !chunkedJobDefinition.getResult().isDone()` is necessary.
*/
if(!chunksQueue.isEmpty() || chunkedJobDefinition.isLastChunkRetrieved() && !chunkedJobDefinition.getResult().isDone()){
supplyEntryList(chunkedJobDefinition, getter, dataHandler, executorService);
}
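
To make the locking choreography above concrete, here is a minimal, self-contained sketch of the same pattern. All names (`SimpleChunkedJob`, `supply`, the element-counting computation) are hypothetical stand-ins rather than JanusGraph's actual API; only the tryLock-or-drop, process, unlock, double-check sequence mirrors the method commented in this commit:

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

final class SimpleChunkedJob {
    final Queue<List<String>> dataChunks = new ConcurrentLinkedQueue<>();
    final CompletableFuture<Long> result = new CompletableFuture<>();
    volatile boolean lastChunkRetrieved;
    private final AtomicBoolean processingLock = new AtomicBoolean();
    private long processedElements; // accessed only while holding processingLock

    boolean tryLockProcessing() { return processingLock.compareAndSet(false, true); }
    void unlockProcessing()     { processingLock.set(false); }

    /** Non-blocking: submits the chunk-processing job to the executor and returns immediately. */
    void supply(ExecutorService executor) {
        executor.execute(() -> process(executor));
    }

    private void process(ExecutorService executor) {
        // Another thread is already processing: drop. After that thread unlocks,
        // it re-checks the queue, so the chunk we enqueued will not be forgotten.
        if (!tryLockProcessing()) {
            return;
        }
        try {
            if (result.isDone()) {
                return;
            }
            // Process all currently available chunks one by one (stand-in computation).
            List<String> chunk;
            while ((chunk = dataChunks.poll()) != null) {
                processedElements += chunk.size();
            }
            // Last chunk seen and queue drained: complete the final result.
            if (lastChunkRetrieved && dataChunks.isEmpty()) {
                result.complete(processedElements);
            }
        } catch (Throwable t) {
            // Never lose an exception; otherwise the consumer could block forever.
            result.completeExceptionally(t);
        } finally {
            unlockProcessing();
        }
        // Double-check: a producer may have enqueued a chunk (or marked the last
        // chunk as retrieved) and then bailed out at tryLockProcessing() while we
        // held the lock. If so, re-trigger the processing.
        if (!dataChunks.isEmpty() || lastChunkRetrieved && !result.isDone()) {
            supply(executor);
        }
    }
}
```

A producer would enqueue each retrieved page into `dataChunks`, set `lastChunkRetrieved` after enqueuing the final page, and call `supply(executor)` after each step; the double-check above is what guarantees the final `complete(...)` happens even when those operations interleave with a running consumer.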
