Skip to content

Commit

Permalink
Implement multi-key slice async queries for CQL storage backend [cql-…
Browse files Browse the repository at this point in the history
…tests] [tp-tests]

- Adds multiQuery support to CQL storage backend.
- Ensure `storage.parallel-backend-ops` thread pool is created only for storage backends which don't support multiQuery (multi-key slice operations).
- Change purpose of `storage.cql.executor-service` to be used for results deserialization jobs only (not for IO operations).

Fixes #2406
Fixes #3747
Fixes #3759
Related to #3170

Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
  • Loading branch information
porunov committed May 6, 2023
1 parent e332af6 commit 02385f0
Show file tree
Hide file tree
Showing 35 changed files with 1,021 additions and 760 deletions.
21 changes: 8 additions & 13 deletions docs/advanced-topics/executor-service.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Backend ExecutorService

By default JanusGraph uses `ExecutorService` to process some queries in parallel
(see `storage.parallel-backend-ops` configuration option).
By default, JanusGraph uses `ExecutorService` to process some queries in parallel for storage backend implementations
which don't support multi-key queries (see `storage.parallel-backend-ops` configuration option).
JanusGraph allows to configure the executor service which is used via configuration options
provided in `storage.parallel-backend-executor-service` configuration section.

Currently JanusGraph has the next ExecutorService implementations which can be used (controlled via
Currently, JanusGraph has the next ExecutorService implementations which can be used (controlled via
`storage.parallel-backend-executor-service.class`):

* `fixed` - fixed thread pool size;
Expand All @@ -30,18 +30,13 @@ are typically those configuration options which are provided via configurations
# Cassandra backend ExecutorService

Apart from the general executor service mentioned in the previous section, Cassandra backend uses an additional
`ExecutorService` to process CQL queries by default (see `storage.cql.executor-service.enabled` configuration option).
`ExecutorService` to process result deserialization for CQL queries by default (see `storage.cql.executor-service` configuration options).
The rules by which the Cassandra backend `ExecutorService` is built are the
same as the rules which are used to build parallel backend queries `ExecutorService` (described above).
The only difference is that the configuration for Cassandra backend `ExecutorService` are provided via configuration
options under `storage.cql.executor-service`.
Disabling CQL executor service reduces overhead of thread pool but requires the user to tune maximum throughput thoroughly.
With disabled CQL executor service the parallelism will be controlled internally by the CQL driver via the next properties:
`storage.cql.max-requests-per-connection`, `storage.cql.local-max-connections-per-host`, `storage.cql.remote-max-connections-per-host`.

!!! info
It is recommended to disable CQL executor service in a production environment and properly configure maximum throughput.
CQL executor service does not provide any benefits other than limiting the amount of parallel requests per JanusGraph instance.
options under `storage.cql.executor-service`.

!!! warning
Improper tuning of maximum throughput might result in failures under heavy workloads.
By default, `storage.cql.executor-service` is configured to have a core pool size of number of processors multiplied
by 2. It's recommended to always use the default value unless there is a reason to artificially limit parallelism for
CQL slice query deserialization.
18 changes: 9 additions & 9 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,15 @@ Notice that `janusgraph-cql` and `janusgraph-scylla` are mutually exclusive. Use
provide both dependencies in the same classpath. See [ScyllaDB Storage Backend documentation](storage-backend/scylladb.md) for more
information about how to make `scylla` `storage.backend` options available.

##### Disable CQL ExecutorService by default

Previously CQL ExecutorService was enabled by default mostly for historical reasons.
CQL ExecutorService managed by JanusGraph adds additional overhead without bringing any usefulness other than
limiting amount of parallel queries which can (and should) be controlled via the underlying CQL driver.

In case previous behaviour is desired then `storage.cql.executor-service.enabled` configuration option should be set to `true`,
but it's recommended to tune CQL queries parallelism using CQL driver configuration options (like `storage.cql.max-requests-per-connection`,
`storage.cql.local-max-connections-per-host`) and / or `storage.parallel-backend-ops.*` configuration options.
##### CQL ExecutorService purpose change

Previously CQL ExecutorService was used to control parallelism of both CQL IO operations and results deserialization.
Starting from JanusGraph 1.0.0 CQL ExecutorService is now used for CQL results deserialization only. All CQL IO operations
are now using internal async approach.
The default pool size is now set to have a value of `number of cores multiplied by 2`. This ExecutorService is now
mandatory and cannot be disabled. The default ExecutorService core pool size is not recommended to be changed as
the default value is considered to be optimal unless users want to artificially limit parallelism of CQL results deserialization
jobs.

##### Removal of deprecated classes/methods/functionalities

Expand Down
8 changes: 4 additions & 4 deletions docs/configs/janusgraph-cfg.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ CQL storage backend options
| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
| storage.cql.atomic-batch-mutate | True to use Cassandra atomic batch mutation, false to use non-atomic batches | Boolean | false | MASKABLE |
| storage.cql.back-pressure-limit | The maximum number of concurrent requests which are allowed to be processed by CQL driver. Any concurrent CQL requests which are above the provided limit are going to be back pressured using fair Semaphore. If no value is provided or the value is set to `0` then the value will be calculated based on CQL driver session provided parameters by using formula [advanced.connection.max-requests-per-connection * advanced.connection.pool.local.size * available_nodes_amount]. It's not recommended to use any value which is above this limit because it may result in CQL driver overload but it's suggested to have a lower value to keep the driver healthy under pressure. In situations when remote nodes connections are in use then the bigger value might be relevant as well to improve parallelism. In case the value `-1` is provided then the back pressure for CQL requests is turned off. In case the back pressure is turned off then it is advised to tune CQL driver for the ongoing workload. | Integer | (no default value) | MASKABLE |
| storage.cql.batch-statement-size | The number of statements in each batch | Integer | 20 | MASKABLE |
| storage.cql.compaction-strategy-class | The compaction strategy to use for JanusGraph tables | String | (no default value) | FIXED |
| storage.cql.compaction-strategy-options | Compaction strategy options. This list is interpreted as a map. It must have an even number of elements in [key,val,key,val,...] form. | String[] | (no default value) | FIXED |
Expand Down Expand Up @@ -445,14 +446,13 @@ CQL storage backend options
| storage.cql.write-consistency-level | The consistency level of write operations against Cassandra | String | QUORUM | MASKABLE |

### storage.cql.executor-service
Configuration options for CQL executor service which is used to process CQL queries.
Configuration options for CQL executor service which is used to process deserialization of CQL queries.


| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
| storage.cql.executor-service.class | The implementation of `ExecutorService` to use. The full name of the class which extends `ExecutorService` which has either a public constructor with `ExecutorServiceConfiguration` argument (preferred constructor) or a public parameterless constructor. Other accepted options are: `fixed` - fixed thread pool of size `core-pool-size`; `cached` - cached thread pool; | String | fixed | LOCAL |
| storage.cql.executor-service.core-pool-size | Core pool size for executor service. May be ignored if custom executor service is used (depending on the implementation of the executor service). | Integer | 10 | LOCAL |
| storage.cql.executor-service.enabled | Whether to use CQL executor service to process queries or not. If not used, the parallelism will be controlled internally by the CQL driver via `storage.cql.max-requests-per-connection` parameter which may be preferable in production environments. Disabling executor service reduces overhead of thread pool but might be more difficult to tune. | Boolean | false | LOCAL |
| storage.cql.executor-service.core-pool-size | Core pool size for executor service. May be ignored if custom executor service is used (depending on the implementation of the executor service).If not set or set to -1 the core pool size will be equal to number of processors multiplied by 2. | Integer | (no default value) | LOCAL |
| storage.cql.executor-service.keep-alive-time | Keep alive time in milliseconds for executor service. When the number of threads is greater than the `core-pool-size`, this is the maximum time that excess idle threads will wait for new tasks before terminating. Ignored for `fixed` executor service and may be ignored if custom executor service is used (depending on the implementation of the executor service). | Long | 60000 | LOCAL |
| storage.cql.executor-service.max-pool-size | Maximum pool size for executor service. Ignored for `fixed` and `cached` executor services. May be ignored if custom executor service is used (depending on the implementation of the executor service). | Integer | 2147483647 | LOCAL |
| storage.cql.executor-service.max-shutdown-wait-time | Max shutdown wait time in milliseconds for executor service threads to be finished during shutdown. After this time threads will be interrupted (signalled with interrupt) without any additional wait time. | Long | 60000 | LOCAL |
Expand Down Expand Up @@ -590,7 +590,7 @@ Configuration options for executor service which is used for parallel requests w
| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
| storage.parallel-backend-executor-service.class | The implementation of `ExecutorService` to use. The full name of the class which extends `ExecutorService` which has either a public constructor with `ExecutorServiceConfiguration` argument (preferred constructor) or a public parameterless constructor. Other accepted options are: `fixed` - fixed thread pool of size `core-pool-size`; `cached` - cached thread pool; | String | fixed | LOCAL |
| storage.parallel-backend-executor-service.core-pool-size | Core pool size for executor service. May be ignored if custom executor service is used (depending on the implementation of the executor service).If not set the core pool size will be equal to number of processors multiplied by 2. | Integer | (no default value) | LOCAL |
| storage.parallel-backend-executor-service.core-pool-size | Core pool size for executor service. May be ignored if custom executor service is used (depending on the implementation of the executor service).If not set or set to -1 the core pool size will be equal to number of processors multiplied by 2. | Integer | (no default value) | LOCAL |
| storage.parallel-backend-executor-service.keep-alive-time | Keep alive time in milliseconds for executor service. When the number of threads is greater than the `core-pool-size`, this is the maximum time that excess idle threads will wait for new tasks before terminating. Ignored for `fixed` executor service and may be ignored if custom executor service is used (depending on the implementation of the executor service). | Long | 60000 | LOCAL |
| storage.parallel-backend-executor-service.max-pool-size | Maximum pool size for executor service. Ignored for `fixed` and `cached` executor services. May be ignored if custom executor service is used (depending on the implementation of the executor service). | Integer | 2147483647 | LOCAL |
| storage.parallel-backend-executor-service.max-shutdown-wait-time | Max shutdown wait time in milliseconds for executor service threads to be finished during shutdown. After this time threads will be interrupted (signalled with interrupt) without any additional wait time. | Long | 60000 | LOCAL |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,18 @@ public Backend(Configuration configuration) {
storeManagerLocking = storeManager;
}

threadPool = configuration.get(PARALLEL_BACKEND_OPS) ? buildExecutorService(configuration) : null;
if(configuration.get(PARALLEL_BACKEND_OPS)){
if(storeFeatures.hasMultiQuery()){
log.info(storeManager.getName() + " supports multi-key queries. Thus, option {} is ignored in favor of multi-key queries. " +
"Backend-ops executor pool will not be created for this storage backend.");
threadPool = null;
} else {
threadPool = buildExecutorService(configuration);
}
} else {
threadPool = null;
}

threadPoolShutdownMaxWaitTime = configuration.get(PARALLEL_BACKEND_EXECUTOR_SERVICE_MAX_SHUTDOWN_WAIT_TIME);

final String lockBackendName = configuration.get(LOCK_BACKEND);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,17 @@ public String toString() {
final AtomicInteger failureCount = new AtomicInteger(0);
EntryList[] resultArray = new EntryList[keys.size()];
for (int i = 0; i < keys.size(); i++) {
threadPool.execute(new SliceQueryRunner(new KeySliceQuery(keys.get(i), query),
doneSignal, failureCount, resultArray, i));
final int pos = i;
threadPool.execute(() -> {
try {
resultArray[pos] = edgeStoreQuery(new KeySliceQuery(keys.get(pos), query));
} catch (Exception e) {
failureCount.incrementAndGet();
log.warn("Individual query in multi-transaction failed: ", e);
} finally {
doneSignal.countDown();
}
});
}
try {
doneSignal.await();
Expand All @@ -338,38 +347,6 @@ public String toString() {
}
}

private class SliceQueryRunner implements Runnable {

final KeySliceQuery kq;
final CountDownLatch doneSignal;
final AtomicInteger failureCount;
final Object[] resultArray;
final int resultPosition;

private SliceQueryRunner(KeySliceQuery kq, CountDownLatch doneSignal, AtomicInteger failureCount,
Object[] resultArray, int resultPosition) {
this.kq = kq;
this.doneSignal = doneSignal;
this.failureCount = failureCount;
this.resultArray = resultArray;
this.resultPosition = resultPosition;
}

@Override
public void run() {
try {
List<Entry> result;
result = edgeStoreQuery(kq);
resultArray[resultPosition] = result;
} catch (Exception e) {
failureCount.incrementAndGet();
log.warn("Individual query in multi-transaction failed: ", e);
} finally {
doneSignal.countDown();
}
}
}

public KeyIterator edgeStoreKeys(final SliceQuery sliceQuery) {
if (!storeFeatures.hasScan())
throw new UnsupportedOperationException("The configured storage backend does not support global graph operations - use Faunus instead");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private static ExecutorService buildExecutorServiceFromClassName(ExecutorService
}

private static int toPoolSize(Integer poolSize){
return poolSize != null ? poolSize :
return poolSize != null && poolSize != -1 ? poolSize :
Runtime.getRuntime().availableProcessors() * THREAD_POOL_SIZE_SCALE_FACTOR;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2023 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.diskstorage.util;

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

/**
*
* @param <T> single chunk of data
*/
public class ChunkedJobDefinition<T, C, R> {

/**
* Dynamic holder for data chunks to process.
*/
private final Queue<T> dataChunks = new ConcurrentLinkedQueue<>();

/**
* If `false` no more data chunks are expected to be added. Thus, the processing thread is open to finish processing and return.
*/
private volatile boolean lastChunkRetrieved;

/**
* Lock to be acquired by the processing thread to indicate to other potential threads which want to process the same
* chunks of data that there is already a thread working on processing.
* Notice that some threads will process only some chunks of data (older chunks) and may leave some unprocessed chunks.
* Thus, each thread which brings a new chunk of data must check at the end that there is already some thread working on
* processing the data. In case no threads working on processing the data - the thread which added a chunk of data to `dataChunks`
* must acquire the lock and process all chunks which are left for processing.
*/
private final ReentrantLock processingLock = new ReentrantLock();

/**
* Optional context information which can be used to resume processing for the next chunk of data.
*/
private volatile C processedDataContext;

/**
* Final computed result
*/
private final CompletableFuture<R> result = new CompletableFuture<>();

public Queue<T> getDataChunks() {
return dataChunks;
}

public boolean isLastChunkRetrieved() {
return lastChunkRetrieved;
}

public void setLastChunkRetrieved() {
this.lastChunkRetrieved = true;
}

public ReentrantLock getProcessingLock() {
return processingLock;
}

public C getProcessedDataContext() {
return processedDataContext;
}

public void setProcessedDataContext(C processedDataContext) {
this.processedDataContext = processedDataContext;
}

public void complete(R resultData){
this.result.complete(resultData);
}

public CompletableFuture<R> getResult() {
return result;
}

}
Loading

0 comments on commit 02385f0

Please sign in to comment.