Implement multi-key slice async queries for CQL storage backend
Fixes #3759

Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
porunov committed May 3, 2023
1 parent 5159135 commit 651c969
Showing 24 changed files with 429 additions and 474 deletions.
21 changes: 8 additions & 13 deletions docs/advanced-topics/executor-service.md
@@ -1,11 +1,11 @@
# Backend ExecutorService

By default JanusGraph uses `ExecutorService` to process some queries in parallel
(see `storage.parallel-backend-ops` configuration option).
By default, JanusGraph uses `ExecutorService` to process some queries in parallel for storage backend implementations
which don't support multi-key queries (see `storage.parallel-backend-ops` configuration option).
JanusGraph allows configuring the executor service via the configuration options
provided in the `storage.parallel-backend-executor-service` configuration section.

Currently JanusGraph has the next ExecutorService implementations which can be used (controlled via
Currently, JanusGraph has the next ExecutorService implementations which can be used (controlled via
`storage.parallel-backend-executor-service.class`):

* `fixed` - fixed thread pool size;
@@ -30,18 +30,13 @@ are typically those configuration options which are provided via configurations
# Cassandra backend ExecutorService

Apart from the general executor service mentioned in the previous section, Cassandra backend uses an additional
`ExecutorService` to process CQL queries by default (see `storage.cql.executor-service.enabled` configuration option).
`ExecutorService` to process result deserialization for CQL queries by default (see `storage.cql.executor-service` configuration options).
The rules by which the Cassandra backend `ExecutorService` is built are the
same as the rules which are used to build parallel backend queries `ExecutorService` (described above).
The only difference is that the configuration for the Cassandra backend `ExecutorService` is provided via configuration
options under `storage.cql.executor-service`.
Disabling CQL executor service reduces overhead of thread pool but requires the user to tune maximum throughput thoroughly.
With disabled CQL executor service the parallelism will be controlled internally by the CQL driver via the next properties:
`storage.cql.max-requests-per-connection`, `storage.cql.local-max-connections-per-host`, `storage.cql.remote-max-connections-per-host`.

!!! info
It is recommended to disable CQL executor service in a production environment and properly configure maximum throughput.
CQL executor service does not provide any benefits other than limiting the amount of parallel requests per JanusGraph instance.
options under `storage.cql.executor-service`.

!!! warning
Improper tuning of maximum throughput might result in failures under heavy workloads.
By default, `storage.cql.executor-service` is configured with a core pool size equal to the number of processors
multiplied by 2. It's recommended to keep the default value unless there is a reason to artificially limit parallelism
for CQL slice query deserialization.
7 changes: 3 additions & 4 deletions docs/configs/janusgraph-cfg.md
@@ -445,14 +445,13 @@ CQL storage backend options
| storage.cql.write-consistency-level | The consistency level of write operations against Cassandra | String | QUORUM | MASKABLE |

### storage.cql.executor-service
Configuration options for CQL executor service which is used to process CQL queries.
Configuration options for CQL executor service which is used to process deserialization of CQL queries.


| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
| storage.cql.executor-service.class | The implementation of `ExecutorService` to use. The full name of the class which extends `ExecutorService` which has either a public constructor with `ExecutorServiceConfiguration` argument (preferred constructor) or a public parameterless constructor. Other accepted options are: `fixed` - fixed thread pool of size `core-pool-size`; `cached` - cached thread pool; | String | fixed | LOCAL |
| storage.cql.executor-service.core-pool-size | Core pool size for executor service. May be ignored if custom executor service is used (depending on the implementation of the executor service). | Integer | 10 | LOCAL |
| storage.cql.executor-service.enabled | Whether to use CQL executor service to process queries or not. If not used, the parallelism will be controlled internally by the CQL driver via `storage.cql.max-requests-per-connection` parameter which may be preferable in production environments. Disabling executor service reduces overhead of thread pool but might be more difficult to tune. | Boolean | false | LOCAL |
| storage.cql.executor-service.core-pool-size | Core pool size for executor service. May be ignored if custom executor service is used (depending on the implementation of the executor service). If not set or set to -1 the core pool size will be equal to number of processors multiplied by 2. | Integer | (no default value) | LOCAL |
| storage.cql.executor-service.keep-alive-time | Keep alive time in milliseconds for executor service. When the number of threads is greater than the `core-pool-size`, this is the maximum time that excess idle threads will wait for new tasks before terminating. Ignored for `fixed` executor service and may be ignored if custom executor service is used (depending on the implementation of the executor service). | Long | 60000 | LOCAL |
| storage.cql.executor-service.max-pool-size | Maximum pool size for executor service. Ignored for `fixed` and `cached` executor services. May be ignored if custom executor service is used (depending on the implementation of the executor service). | Integer | 2147483647 | LOCAL |
| storage.cql.executor-service.max-shutdown-wait-time | Max shutdown wait time in milliseconds for executor service threads to be finished during shutdown. After this time threads will be interrupted (signalled with interrupt) without any additional wait time. | Long | 60000 | LOCAL |
@@ -590,7 +589,7 @@ Configuration options for executor service which is used for parallel requests w
| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
| storage.parallel-backend-executor-service.class | The implementation of `ExecutorService` to use. The full name of the class which extends `ExecutorService` which has either a public constructor with `ExecutorServiceConfiguration` argument (preferred constructor) or a public parameterless constructor. Other accepted options are: `fixed` - fixed thread pool of size `core-pool-size`; `cached` - cached thread pool; | String | fixed | LOCAL |
| storage.parallel-backend-executor-service.core-pool-size | Core pool size for executor service. May be ignored if custom executor service is used (depending on the implementation of the executor service). If not set the core pool size will be equal to number of processors multiplied by 2. | Integer | (no default value) | LOCAL |
| storage.parallel-backend-executor-service.core-pool-size | Core pool size for executor service. May be ignored if custom executor service is used (depending on the implementation of the executor service). If not set or set to -1 the core pool size will be equal to number of processors multiplied by 2. | Integer | (no default value) | LOCAL |
| storage.parallel-backend-executor-service.keep-alive-time | Keep alive time in milliseconds for executor service. When the number of threads is greater than the `core-pool-size`, this is the maximum time that excess idle threads will wait for new tasks before terminating. Ignored for `fixed` executor service and may be ignored if custom executor service is used (depending on the implementation of the executor service). | Long | 60000 | LOCAL |
| storage.parallel-backend-executor-service.max-pool-size | Maximum pool size for executor service. Ignored for `fixed` and `cached` executor services. May be ignored if custom executor service is used (depending on the implementation of the executor service). | Integer | 2147483647 | LOCAL |
| storage.parallel-backend-executor-service.max-shutdown-wait-time | Max shutdown wait time in milliseconds for executor service threads to be finished during shutdown. After this time threads will be interrupted (signalled with interrupt) without any additional wait time. | Long | 60000 | LOCAL |
@@ -269,7 +269,18 @@ public Backend(Configuration configuration) {
storeManagerLocking = storeManager;
}

threadPool = configuration.get(PARALLEL_BACKEND_OPS) ? buildExecutorService(configuration) : null;
if(configuration.get(PARALLEL_BACKEND_OPS)){
if(storeFeatures.hasMultiQuery()){
log.info(storeManager.getName() + " supports multi-key queries. Thus, option {} is ignored in favor of multi-key queries. " +
"Backend-ops executor pool will not be created for this storage backend.", PARALLEL_BACKEND_OPS.toStringWithoutRoot());
threadPool = null;
} else {
threadPool = buildExecutorService(configuration);
}
} else {
threadPool = null;
}

threadPoolShutdownMaxWaitTime = configuration.get(PARALLEL_BACKEND_EXECUTOR_SERVICE_MAX_SHUTDOWN_WAIT_TIME);

final String lockBackendName = configuration.get(LOCK_BACKEND);
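The hunk above only builds the backend-ops pool when parallel operations are enabled and the store has no native multi-key queries. A minimal, self-contained sketch of that decision follows. `BackendPoolDecisionSketch`, `choosePool`, and the explicit `poolSize` parameter are hypothetical names for illustration and are not part of JanusGraph:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BackendPoolDecisionSketch {

    /**
     * Hypothetical helper mirroring the branch in the Backend constructor:
     * a pool is created only if parallel backend ops are enabled AND the
     * store does not support multi-key queries; otherwise null is returned.
     */
    public static ExecutorService choosePool(boolean parallelBackendOps,
                                             boolean hasMultiQuery,
                                             int poolSize) {
        if (parallelBackendOps && !hasMultiQuery) {
            return Executors.newFixedThreadPool(poolSize);
        }
        // Either parallelism is disabled or the store handles multi-key queries itself.
        return null;
    }

    public static void main(String[] args) {
        ExecutorService pool = choosePool(true, false, 4);
        System.out.println("pool for single-key store: " + (pool != null));
        if (pool != null) {
            pool.shutdown();
        }
        System.out.println("pool for multi-key store: " + (choosePool(true, true, 4) != null));
    }
}
```

Returning `null` rather than an idle pool matches the diff, which assigns `threadPool = null` in both non-pool branches.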
@@ -318,8 +318,17 @@ public String toString() {
final AtomicInteger failureCount = new AtomicInteger(0);
EntryList[] resultArray = new EntryList[keys.size()];
for (int i = 0; i < keys.size(); i++) {
threadPool.execute(new SliceQueryRunner(new KeySliceQuery(keys.get(i), query),
doneSignal, failureCount, resultArray, i));
final int pos = i;
threadPool.execute(() -> {
try {
resultArray[pos] = edgeStoreQuery(new KeySliceQuery(keys.get(pos), query));
} catch (Exception e) {
failureCount.incrementAndGet();
log.warn("Individual query in multi-transaction failed: ", e);
} finally {
doneSignal.countDown();
}
});
}
try {
doneSignal.await();
@@ -338,38 +347,6 @@ public String toString() {
}
}

private class SliceQueryRunner implements Runnable {

final KeySliceQuery kq;
final CountDownLatch doneSignal;
final AtomicInteger failureCount;
final Object[] resultArray;
final int resultPosition;

private SliceQueryRunner(KeySliceQuery kq, CountDownLatch doneSignal, AtomicInteger failureCount,
Object[] resultArray, int resultPosition) {
this.kq = kq;
this.doneSignal = doneSignal;
this.failureCount = failureCount;
this.resultArray = resultArray;
this.resultPosition = resultPosition;
}

@Override
public void run() {
try {
List<Entry> result;
result = edgeStoreQuery(kq);
resultArray[resultPosition] = result;
} catch (Exception e) {
failureCount.incrementAndGet();
log.warn("Individual query in multi-transaction failed: ", e);
} finally {
doneSignal.countDown();
}
}
}

public KeyIterator edgeStoreKeys(final SliceQuery sliceQuery) {
if (!storeFeatures.hasScan())
throw new UnsupportedOperationException("The configured storage backend does not support global graph operations - use Faunus instead");
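The change in this file replaces the `SliceQueryRunner` inner class with a lambda that writes into a result slot, counts failures, and always counts down the latch. The pattern can be sketched in isolation as below. This is an illustration under assumptions, not the JanusGraph code: `FanOutSketch`, `queryAll`, and the `String` key/result types are hypothetical, and the handling after `await()` (throwing when any task failed) is an assumption, since that part is not visible in the hunk.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class FanOutSketch {

    /** Runs one query per key on the pool and returns the results in key order. */
    public static String[] queryAll(List<String> keys,
                                    Function<String, String> query,
                                    ExecutorService pool) {
        final CountDownLatch doneSignal = new CountDownLatch(keys.size());
        final AtomicInteger failureCount = new AtomicInteger(0);
        final String[] resultArray = new String[keys.size()];
        for (int i = 0; i < keys.size(); i++) {
            final int pos = i; // lambdas may only capture effectively final locals
            pool.execute(() -> {
                try {
                    resultArray[pos] = query.apply(keys.get(pos));
                } catch (Exception e) {
                    failureCount.incrementAndGet();
                } finally {
                    // Always count down so the waiting thread cannot hang on a failure.
                    doneSignal.countDown();
                }
            });
        }
        try {
            // countDown() happens-before await() returns, so slot writes are visible here.
            doneSignal.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for queries", e);
        }
        if (failureCount.get() > 0) {
            throw new IllegalStateException(failureCount.get() + " queries failed");
        }
        return resultArray;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            String[] out = queryAll(Arrays.asList("a", "b", "c"), k -> k + "-result", pool);
            System.out.println(Arrays.toString(out));
        } finally {
            pool.shutdown();
        }
    }
}
```

Each task writes to its own array index, so no extra synchronization on the array is needed beyond the latch.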
@@ -0,0 +1,33 @@
// Copyright 2023 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.diskstorage.async;

import org.janusgraph.diskstorage.EntryList;
import org.janusgraph.diskstorage.util.EntryListComputationContext;

import java.util.concurrent.CompletableFuture;

public class AsyncEntryListConvertContext extends EntryListComputationContext {

private final CompletableFuture<EntryList> result = new CompletableFuture<>();

public void complete(EntryList resultData){
this.result.complete(resultData);
}

public CompletableFuture<EntryList> getResult() {
return result;
}
}
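`AsyncEntryListConvertContext` pairs the conversion state with a `CompletableFuture` that is completed once the entry list is ready. A minimal sketch of how such a holder might be consumed is shown below. `AsyncResultHolderSketch` and `ResultContext` are hypothetical stand-ins, and `List<String>` replaces `EntryList` so the example runs without JanusGraph on the classpath:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncResultHolderSketch {

    /** Stand-in for AsyncEntryListConvertContext, using List<String> instead of EntryList. */
    public static class ResultContext {
        private final CompletableFuture<List<String>> result = new CompletableFuture<>();

        public void complete(List<String> resultData) {
            this.result.complete(resultData);
        }

        public CompletableFuture<List<String>> getResult() {
            return result;
        }
    }

    public static void main(String[] args) {
        ResultContext ctx = new ResultContext();
        // A consumer can attach a continuation before the data exists.
        CompletableFuture<Integer> size = ctx.getResult().thenApply(List::size);
        System.out.println("done before complete: " + size.isDone());
        // The producer (for example, a deserialization task) completes the future later.
        ctx.complete(Arrays.asList("e1", "e2"));
        System.out.println("size after complete: " + size.join());
    }
}
```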
@@ -0,0 +1,76 @@
// Copyright 2023 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.diskstorage.async;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

/**
*
* @param <T> single chunk of data
* @param <C> context holding partially or fully processed data
*/
public class DataChunks<T, C> {

/**
* Dynamic holder for data chunks to process.
*/
private final Queue<T> dataChunks = new ConcurrentLinkedQueue<>();

/**
* If `true`, no more data chunks are expected to be added. Thus, the processing thread is free to finish processing and return.
*/
private volatile boolean lastChunkRetrieved;

/**
* Lock to be acquired by the processing thread to indicate to other potential threads which want to process the same
* chunks of data that there is already a thread working on processing.
* Notice that some threads will process only some chunks of data (older chunks) and may leave some unprocessed chunks.
* Thus, each thread which brings a new chunk of data must check at the end whether some thread is already working on
* processing the data. If no thread is processing the data, the thread which added a chunk of data to `dataChunks`
* must acquire the lock and process all chunks which are left for processing.
*/
private final ReentrantLock processingLock = new ReentrantLock();

/**
* Some partially or fully processed data with potential context information which can be used to resume processing
* for the next chunk of data.
*/
private final C processedDataContext;

public DataChunks(C processedDataContext) {
this.processedDataContext = processedDataContext;
}

public Queue<T> getDataChunks() {
return dataChunks;
}

public boolean isLastChunkRetrieved() {
return lastChunkRetrieved;
}

public void setLastChunkRetrieved() {
this.lastChunkRetrieved = true;
}

public ReentrantLock getProcessingLock() {
return processingLock;
}

public C getProcessedDataContext() {
return processedDataContext;
}
}
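The Javadoc on `processingLock` describes a protocol: every thread that adds a chunk must afterwards check whether a processor is active and, if not, take the lock and drain what is left. The sketch below shows one way that protocol could be implemented. It is an assumption-laden illustration, not the code from the CQL backend: `ChunkDrainSketch`, `addChunk`, and the running `sum` used as the "processed data context" are all hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

public class ChunkDrainSketch {

    private final Queue<int[]> dataChunks = new ConcurrentLinkedQueue<>();
    private final ReentrantLock processingLock = new ReentrantLock();
    private long sum; // processed state, only touched while holding processingLock

    /** Adds a chunk and processes pending chunks unless another thread is already doing so. */
    public void addChunk(int[] chunk) {
        dataChunks.add(chunk);
        // Re-check the queue after every unlock: a chunk added while the previous
        // processor was finishing must not be left behind.
        while (!dataChunks.isEmpty() && processingLock.tryLock()) {
            try {
                int[] next;
                while ((next = dataChunks.poll()) != null) {
                    for (int value : next) {
                        sum += value;
                    }
                }
            } finally {
                processingLock.unlock();
            }
        }
    }

    public long getSum() {
        processingLock.lock();
        try {
            return sum;
        } finally {
            processingLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ChunkDrainSketch sketch = new ChunkDrainSketch();
        Thread[] producers = new Thread[4];
        for (int t = 0; t < producers.length; t++) {
            producers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    sketch.addChunk(new int[]{1, 1});
                }
            });
            producers[t].start();
        }
        for (Thread producer : producers) {
            producer.join();
        }
        System.out.println("sum = " + sketch.getSum());
    }
}
```

Using `tryLock()` instead of `lock()` keeps producers from blocking: a thread that loses the race simply returns, relying on the current lock holder's re-check to pick up its chunk.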
@@ -149,7 +149,7 @@ private static ExecutorService buildExecutorServiceFromClassName(ExecutorService
}

private static int toPoolSize(Integer poolSize){
return poolSize != null ? poolSize :
return poolSize != null && poolSize != -1 ? poolSize :
Runtime.getRuntime().availableProcessors() * THREAD_POOL_SIZE_SCALE_FACTOR;
}
}
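The one-line change above makes `-1` behave like an unset value. The stand-alone sketch below reproduces that rule so it can be exercised outside JanusGraph. `PoolSizeSketch` is a hypothetical class name, and the scale factor of 2 is taken from the documentation text in this commit ("number of processors multiplied by 2"):

```java
public class PoolSizeSketch {

    // Assumed value, based on the documented "number of processors multiplied by 2".
    private static final int THREAD_POOL_SIZE_SCALE_FACTOR = 2;

    /** Returns the configured size, or the processor-based default for null and -1. */
    public static int toPoolSize(Integer poolSize) {
        return poolSize != null && poolSize != -1
            ? poolSize
            : Runtime.getRuntime().availableProcessors() * THREAD_POOL_SIZE_SCALE_FACTOR;
    }

    public static void main(String[] args) {
        System.out.println("unset  -> " + toPoolSize(null));
        System.out.println("-1     -> " + toPoolSize(-1));
        System.out.println("16     -> " + toPoolSize(16));
    }
}
```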
@@ -0,0 +1,25 @@
// Copyright 2023 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.diskstorage.util;

import org.janusgraph.diskstorage.EntryMetaData;

public class EntryListComputationContext {
public long[] limitAndValuePos;
public byte[] data;
public EntryMetaData[] metadataSchema;
public int pos;
public int offset;
}