From b2c669b5ade8de9918da36b020ef67db4821f87b Mon Sep 17 00:00:00 2001 From: Oleksandr Porunov Date: Wed, 3 May 2023 04:35:43 +0100 Subject: [PATCH] Implement multi-key slice async queries for CQL storage backend Fixes #3759 Signed-off-by: Oleksandr Porunov --- docs/advanced-topics/executor-service.md | 21 ++- docs/changelog.md | 18 +-- docs/configs/janusgraph-cfg.md | 8 +- .../org/janusgraph/diskstorage/Backend.java | 13 +- .../diskstorage/BackendTransaction.java | 45 ++----- .../configuration/ExecutorServiceBuilder.java | 2 +- .../util/ChunkedJobDefinition.java | 89 +++++++++++++ .../util/EntryListComputationContext.java | 33 +++++ .../util/StaticArrayEntryList.java | 120 ++++++++++++++--- .../GraphDatabaseConfiguration.java | 4 +- .../diskstorage/cql/CQLConfigOptions.java | 38 +++--- .../cql/CQLKeyColumnValueStore.java | 68 +++++----- .../diskstorage/cql/CQLStoreManager.java | 49 ++++++- .../builder/CQLMutateManyFunctionBuilder.java | 39 ++---- .../cql/builder/CQLStoreFeaturesBuilder.java | 2 +- .../AbstractCQLMutateManyLoggedFunction.java | 71 ---------- ...AbstractCQLMutateManyUnloggedFunction.java | 97 -------------- ...ecutorServiceMutateManyLoggedFunction.java | 63 ++++++++- ...utorServiceMutateManyUnloggedFunction.java | 73 ++++++++++- .../CQLSimpleMutateManyLoggedFunction.java | 49 ------- .../CQLSimpleMutateManyUnloggedFunction.java | 67 ---------- .../slice/AbstractCQLSliceFunction.java | 65 ---------- .../function/slice/AsyncCQLSliceFunction.java | 122 ++++++++++++++++++ .../CQLExecutorServiceSliceFunction.java | 70 ---------- .../slice/CQLSimpleSliceFunction.java | 53 -------- .../cql/function/slice/CQLSliceFunction.java | 4 +- .../cql/util/PassAllQueryBackPressure.java | 27 ++++ .../cql/util/QueryBackPressure.java | 23 ++++ .../cql/util/SemaphoreQueryBackPressure.java | 43 ++++++ .../diskstorage/cql/CQLConfigTest.java | 3 - .../janusgraph/graphdb/cql/CQLGraphTest.java | 60 ++++++--- 31 files changed, 769 insertions(+), 670 deletions(-) create mode 100644 janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/ChunkedJobDefinition.java create mode 100644 janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/EntryListComputationContext.java delete mode 100644 janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/AbstractCQLMutateManyLoggedFunction.java delete mode 100644 janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/AbstractCQLMutateManyUnloggedFunction.java delete mode 100644 janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLSimpleMutateManyLoggedFunction.java delete mode 100644 janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLSimpleMutateManyUnloggedFunction.java delete mode 100644 janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AbstractCQLSliceFunction.java create mode 100644 janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLSliceFunction.java delete mode 100644 janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLExecutorServiceSliceFunction.java delete mode 100644 janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLSimpleSliceFunction.java create mode 100644 janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/PassAllQueryBackPressure.java create mode 100644 janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/QueryBackPressure.java create mode 100644 janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/SemaphoreQueryBackPressure.java diff --git a/docs/advanced-topics/executor-service.md b/docs/advanced-topics/executor-service.md index ae761e89b0b..e64067531b8 100644 --- a/docs/advanced-topics/executor-service.md +++ b/docs/advanced-topics/executor-service.md @@ -1,11 +1,11 @@ # Backend ExecutorService -By default JanusGraph uses `ExecutorService` to process some queries in parallel -(see `storage.parallel-backend-ops` configuration option). +By default, JanusGraph uses `ExecutorService` to process some queries in parallel for storage backend implementations +which don't support multi-key queries (see `storage.parallel-backend-ops` configuration option). JanusGraph allows to configure the executor service which is used via configuration options provided in `storage.parallel-backend-executor-service` configuration section. -Currently JanusGraph has the next ExecutorService implementations which can be used (controlled via +Currently, JanusGraph has the next ExecutorService implementations which can be used (controlled via `storage.parallel-backend-executor-service.class`): * `fixed` - fixed thread pool size; @@ -30,18 +30,13 @@ are typically those configuration options which are provided via configurations # Cassandra backend ExecutorService Apart from the general executor service mentioned in the previous section, Cassandra backend uses an additional -`ExecutorService` to process CQL queries by default (see `storage.cql.executor-service.enabled` configuration option). +`ExecutorService` to process result deserialization for CQL queries by default (see `storage.cql.executor-service` configuration options). The rules by which the Cassandra backend `ExecutorService` is built are the same as the rules which are used to build parallel backend queries `ExecutorService` (described above). The only difference is that the configuration for Cassandra backend `ExecutorService` are provided via configuration -options under `storage.cql.executor-service`. -Disabling CQL executor service reduces overhead of thread pool but requires the user to tune maximum throughput thoroughly. -With disabled CQL executor service the parallelism will be controlled internally by the CQL driver via the next properties: -`storage.cql.max-requests-per-connection`, `storage.cql.local-max-connections-per-host`, `storage.cql.remote-max-connections-per-host`. - -!!! info - It is recommended to disable CQL executor service in a production environment and properly configure maximum throughput. - CQL executor service does not provide any benefits other than limiting the amount of parallel requests per JanusGraph instance. +options under `storage.cql.executor-service`. !!! warning - Improper tuning of maximum throughput might result in failures under heavy workloads. + By default, `storage.cql.executor-service` is configured to have a core pool size of number of processors multiplied + by 2. It's recommended to always use the default value unless there is a reason to artificially limit parallelism for + CQL slice query deserialization. diff --git a/docs/changelog.md b/docs/changelog.md index 244b1389154..08fecdd2f77 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -259,15 +259,15 @@ Notice that `janusgraph-cql` and `janusgraph-scylla` are mutually exclusive. Use provide both dependencies in the same classpath. See [ScyllaDB Storage Backend documentation](storage-backend/scylladb.md) for more information about how to make `scylla` `storage.backend` options available. -##### Disable CQL ExecutorService by default - -Previously CQL ExecutorService was enabled by default mostly for historical reasons. -CQL ExecutorService managed by JanusGraph adds additional overhead without bringing any usefulness other than -limiting amount of parallel queries which can (and should) be controlled via the underlying CQL driver. - -In case previous behaviour is desired then `storage.cql.executor-service.enabled` configuration option should be set to `true`, -but it's recommended to tune CQL queries parallelism using CQL driver configuration options (like `storage.cql.max-requests-per-connection`, -`storage.cql.local-max-connections-per-host`) and / or `storage.parallel-backend-ops.*` configuration options. +##### CQL ExecutorService purpose change + +Previously CQL ExecutorService was used to control parallelism of both CQL IO operations and results deserialization. +Starting from JanusGraph 1.0.0 CQL ExecutorService is now used for CQL results deserialization only. All CQL IO operations +are now using internal async approach. +The default pool size is now set to have a value of `number of cores multiplied by 2`. This ExecutorService is now +mandatory and cannot be disabled. The default ExecutorService core pool size is not recommended to be changed as +the default value is considered to be optimal unless users want to artificially limit parallelism of CQL results deserialization +jobs. ##### Removal of deprecated classes/methods/functionalities diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md index 042c19c4398..188a070851e 100644 --- a/docs/configs/janusgraph-cfg.md +++ b/docs/configs/janusgraph-cfg.md @@ -413,6 +413,7 @@ CQL storage backend options | Name | Description | Datatype | Default Value | Mutability | | ---- | ---- | ---- | ---- | ---- | | storage.cql.atomic-batch-mutate | True to use Cassandra atomic batch mutation, false to use non-atomic batches | Boolean | false | MASKABLE | +| storage.cql.back-pressure-limit | The maximum number of concurrent requests which are allowed to be processed by CQL driver. Any concurrent CQL requests which are above the provided limit are going to be back pressured using fair Semaphore. If no value is provided or the value is set to `0` then the value will be calculated based on CQL driver session provided parameters by using formula [advanced.connection.max-requests-per-connection * advanced.connection.pool.local.size * available_nodes_amount]. It's not recommended to use any value which is above this limit because it may result in CQL driver overload but it's suggested to have a lower value to keep the driver healthy under pressure. In situations when remote nodes connections are in use then the bigger value might be relevant as well to improve parallelism. In case the value `-1` is provided then the back pressure for CQL requests is turned off. In case the back pressure is turned off then it is advised to tune CQL driver for the ongoing workload. | Integer | (no default value) | MASKABLE | | storage.cql.batch-statement-size | The number of statements in each batch | Integer | 20 | MASKABLE | | storage.cql.compaction-strategy-class | The compaction strategy to use for JanusGraph tables | String | (no default value) | FIXED | | storage.cql.compaction-strategy-options | Compaction strategy options. This list is interpreted as a map. It must have an even number of elements in [key,val,key,val,...] form. | String[] | (no default value) | FIXED | @@ -445,14 +446,13 @@ CQL storage backend options | storage.cql.write-consistency-level | The consistency level of write operations against Cassandra | String | QUORUM | MASKABLE | ### storage.cql.executor-service -Configuration options for CQL executor service which is used to process CQL queries. +Configuration options for CQL executor service which is used to process deserialization of CQL queries. | Name | Description | Datatype | Default Value | Mutability | | ---- | ---- | ---- | ---- | ---- | | storage.cql.executor-service.class | The implementation of `ExecutorService` to use. The full name of the class which extends `ExecutorService` which has either a public constructor with `ExecutorServiceConfiguration` argument (preferred constructor) or a public parameterless constructor. Other accepted options are: `fixed` - fixed thread pool of size `core-pool-size`; `cached` - cached thread pool; | String | fixed | LOCAL | -| storage.cql.executor-service.core-pool-size | Core pool size for executor service. May be ignored if custom executor service is used (depending on the implementation of the executor service). | Integer | 10 | LOCAL | -| storage.cql.executor-service.enabled | Whether to use CQL executor service to process queries or not. If not used, the parallelism will be controlled internally by the CQL driver via `storage.cql.max-requests-per-connection` parameter which may be preferable in production environments. Disabling executor service reduces overhead of thread pool but might be more difficult to tune. | Boolean | false | LOCAL | +| storage.cql.executor-service.core-pool-size | Core pool size for executor service. May be ignored if custom executor service is used (depending on the implementation of the executor service).If not set or set to -1 the core pool size will be equal to number of processors multiplied by 2. | Integer | (no default value) | LOCAL | | storage.cql.executor-service.keep-alive-time | Keep alive time in milliseconds for executor service. When the number of threads is greater than the `core-pool-size`, this is the maximum time that excess idle threads will wait for new tasks before terminating. Ignored for `fixed` executor service and may be ignored if custom executor service is used (depending on the implementation of the executor service). | Long | 60000 | LOCAL | | storage.cql.executor-service.max-pool-size | Maximum pool size for executor service. Ignored for `fixed` and `cached` executor services. May be ignored if custom executor service is used (depending on the implementation of the executor service). | Integer | 2147483647 | LOCAL | | storage.cql.executor-service.max-shutdown-wait-time | Max shutdown wait time in milliseconds for executor service threads to be finished during shutdown. After this time threads will be interrupted (signalled with interrupt) without any additional wait time. | Long | 60000 | LOCAL | @@ -590,7 +590,7 @@ Configuration options for executor service which is used for parallel requests w | Name | Description | Datatype | Default Value | Mutability | | ---- | ---- | ---- | ---- | ---- | | storage.parallel-backend-executor-service.class | The implementation of `ExecutorService` to use. The full name of the class which extends `ExecutorService` which has either a public constructor with `ExecutorServiceConfiguration` argument (preferred constructor) or a public parameterless constructor. Other accepted options are: `fixed` - fixed thread pool of size `core-pool-size`; `cached` - cached thread pool; | String | fixed | LOCAL | -| storage.parallel-backend-executor-service.core-pool-size | Core pool size for executor service. May be ignored if custom executor service is used (depending on the implementation of the executor service).If not set the core pool size will be equal to number of processors multiplied by 2. | Integer | (no default value) | LOCAL | +| storage.parallel-backend-executor-service.core-pool-size | Core pool size for executor service. May be ignored if custom executor service is used (depending on the implementation of the executor service).If not set or set to -1 the core pool size will be equal to number of processors multiplied by 2. | Integer | (no default value) | LOCAL | | storage.parallel-backend-executor-service.keep-alive-time | Keep alive time in milliseconds for executor service. When the number of threads is greater than the `core-pool-size`, this is the maximum time that excess idle threads will wait for new tasks before terminating. Ignored for `fixed` executor service and may be ignored if custom executor service is used (depending on the implementation of the executor service). | Long | 60000 | LOCAL | | storage.parallel-backend-executor-service.max-pool-size | Maximum pool size for executor service. Ignored for `fixed` and `cached` executor services. May be ignored if custom executor service is used (depending on the implementation of the executor service). | Integer | 2147483647 | LOCAL | | storage.parallel-backend-executor-service.max-shutdown-wait-time | Max shutdown wait time in milliseconds for executor service threads to be finished during shutdown. After this time threads will be interrupted (signalled with interrupt) without any additional wait time. | Long | 60000 | LOCAL | diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java index 8c68331b1bc..c98220fb034 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java @@ -269,7 +269,18 @@ public Backend(Configuration configuration) { storeManagerLocking = storeManager; } - threadPool = configuration.get(PARALLEL_BACKEND_OPS) ? buildExecutorService(configuration) : null; + if(configuration.get(PARALLEL_BACKEND_OPS)){ + if(storeFeatures.hasMultiQuery()){ + log.info(storeManager.getName() + " supports multi-key queries. Thus, option {} is ignored in favor of multi-key queries. " + + "Backend-ops executor pool will not be created for this storage backend."); + threadPool = null; + } else { + threadPool = buildExecutorService(configuration); + } + } else { + threadPool = null; + } + threadPoolShutdownMaxWaitTime = configuration.get(PARALLEL_BACKEND_EXECUTOR_SERVICE_MAX_SHUTDOWN_WAIT_TIME); final String lockBackendName = configuration.get(LOCK_BACKEND); diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/BackendTransaction.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/BackendTransaction.java index 4e9623c9762..9041388b4d2 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/BackendTransaction.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/BackendTransaction.java @@ -318,8 +318,17 @@ public String toString() { final AtomicInteger failureCount = new AtomicInteger(0); EntryList[] resultArray = new EntryList[keys.size()]; for (int i = 0; i < keys.size(); i++) { - threadPool.execute(new SliceQueryRunner(new KeySliceQuery(keys.get(i), query), - doneSignal, failureCount, resultArray, i)); + final int pos = i; + threadPool.execute(() -> { + try { + resultArray[pos] = edgeStoreQuery(new KeySliceQuery(keys.get(pos), query)); + } catch (Exception e) { + failureCount.incrementAndGet(); + log.warn("Individual query in multi-transaction failed: ", e); + } finally { + doneSignal.countDown(); + } + }); } try { doneSignal.await(); @@ -338,38 +347,6 @@ public String toString() { } } - private class SliceQueryRunner implements Runnable { - - final KeySliceQuery kq; - final CountDownLatch doneSignal; - final AtomicInteger failureCount; - final Object[] resultArray; - final int resultPosition; - - private SliceQueryRunner(KeySliceQuery kq, CountDownLatch doneSignal, AtomicInteger failureCount, - Object[] resultArray, int resultPosition) { - this.kq = kq; - this.doneSignal = doneSignal; - this.failureCount = failureCount; - this.resultArray = resultArray; - this.resultPosition = resultPosition; - } - - @Override - public void run() { - try { - List result; - result = edgeStoreQuery(kq); - resultArray[resultPosition] = result; - } catch (Exception e) { - failureCount.incrementAndGet(); - log.warn("Individual query in multi-transaction failed: ", e); - } finally { - doneSignal.countDown(); - } - } - } - public KeyIterator edgeStoreKeys(final SliceQuery sliceQuery) { if (!storeFeatures.hasScan()) throw new UnsupportedOperationException("The configured storage backend does not support global graph operations - use Faunus instead"); diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/configuration/ExecutorServiceBuilder.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/configuration/ExecutorServiceBuilder.java index 58ce661d82c..ee491346d79 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/configuration/ExecutorServiceBuilder.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/configuration/ExecutorServiceBuilder.java @@ -149,7 +149,7 @@ private static ExecutorService buildExecutorServiceFromClassName(ExecutorService } private static int toPoolSize(Integer poolSize){ - return poolSize != null ? poolSize : + return poolSize != null && poolSize != -1 ? poolSize : Runtime.getRuntime().availableProcessors() * THREAD_POOL_SIZE_SCALE_FACTOR; } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/ChunkedJobDefinition.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/ChunkedJobDefinition.java new file mode 100644 index 00000000000..88b46301023 --- /dev/null +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/ChunkedJobDefinition.java @@ -0,0 +1,89 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.util; + +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; + +/** + * + * @param single chunk of data + */ +public class ChunkedJobDefinition { + + /** + * Dynamic holder for data chunks to process. + */ + private final Queue dataChunks = new ConcurrentLinkedQueue<>(); + + /** + * If `false` no more data chunks are expected to be added. Thus, the processing thread is open to finish processing and return. + */ + private volatile boolean lastChunkRetrieved; + + /** + * Lock to be acquired by the processing thread to indicate to other potential threads which want to process the same + * chunks of data that there is already a thread working on processing. + * Notice that some threads will process only some chunks of data (older chunks) and may leave some unprocessed chunks. + * Thus, each thread which brings a new chunk of data must check at the end that there is already some thread working on + * processing the data. In case no threads working on processing the data - the thread which added a chunk of data to `dataChunks` + * must acquire the lock and process all chunks which are left for processing. + */ + private final ReentrantLock processingLock = new ReentrantLock(); + + /** + * Optional context information which can be used to resume processing for the next chunk of data. + */ + private volatile C processedDataContext; + + /** + * Final computed result + */ + private final CompletableFuture result = new CompletableFuture<>(); + + public Queue getDataChunks() { + return dataChunks; + } + + public boolean isLastChunkRetrieved() { + return lastChunkRetrieved; + } + + public void setLastChunkRetrieved() { + this.lastChunkRetrieved = true; + } + + public ReentrantLock getProcessingLock() { + return processingLock; + } + + public C getProcessedDataContext() { + return processedDataContext; + } + + public void setProcessedDataContext(C processedDataContext) { + this.processedDataContext = processedDataContext; + } + + public void complete(R resultData){ + this.result.complete(resultData); + } + + public CompletableFuture getResult() { + return result; + } +} diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/EntryListComputationContext.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/EntryListComputationContext.java new file mode 100644 index 00000000000..72b06f3bdfd --- /dev/null +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/EntryListComputationContext.java @@ -0,0 +1,33 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.util; + +import org.janusgraph.diskstorage.EntryMetaData; + +public class EntryListComputationContext { + public long[] limitAndValuePos; + public byte[] data; + public EntryMetaData[] metadataSchema; + public int pos; + public int offset; + + public EntryListComputationContext(long[] limitAndValuePos, byte[] data, EntryMetaData[] metadataSchema, int pos, int offset) { + this.limitAndValuePos = limitAndValuePos; + this.data = data; + this.metadataSchema = metadataSchema; + this.pos = pos; + this.offset = offset; + } +} diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java index 4188e093564..2199e7b8fb7 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java @@ -26,9 +26,12 @@ import java.nio.ByteBuffer; import java.util.AbstractList; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.ExecutorService; import static org.janusgraph.diskstorage.util.ArrayUtil.growSpace; @@ -398,11 +401,89 @@ private static EntryList of(Iterable elements, StaticArrayEntry.GetColV private static EntryList of(Iterator elements, StaticArrayEntry.GetColVal getter, StaticArrayEntry.DataHandler dataHandler) { Preconditions.checkArgument(elements!=null && getter!=null && dataHandler!=null); if (!elements.hasNext()) return EMPTY_LIST; + + EntryListComputationContext context = generateComputationContext(); + applyElementsComputation(elements, getter, dataHandler, context); + return convert(context); + } + + public static EntryListComputationContext generateComputationContext(){ long[] limitAndValuePos = new long[10]; - byte[] data = new byte[limitAndValuePos.length*15]; - EntryMetaData[] metadataSchema = null; - int pos=0; - int offset=0; + return new EntryListComputationContext(limitAndValuePos, new byte[limitAndValuePos.length*15], null, 0, 0); + } + + public static void supplyEntryList(ChunkedJobDefinition, EntryListComputationContext, EntryList> chunkedJobDefinition, + StaticArrayEntry.GetColVal getter, + ExecutorService executorService) { + supplyEntryList(chunkedJobDefinition, getter, StaticArrayEntry.StaticBufferHandler.INSTANCE, executorService); + } + + private static void supplyEntryList(ChunkedJobDefinition, EntryListComputationContext, EntryList> chunkedJobDefinition, + StaticArrayEntry.GetColVal getter, + StaticArrayEntry.DataHandler dataHandler, + ExecutorService executorService){ + assert chunkedJobDefinition !=null && getter!=null && dataHandler!=null; + + executorService.execute(() -> { + + if(!chunkedJobDefinition.getProcessingLock().tryLock() || chunkedJobDefinition.getResult().isDone()){ + return; + } + + Queue> chunksQueue = chunkedJobDefinition.getDataChunks(); + Iterator elements = chunksQueue.isEmpty() ? Collections.emptyIterator() : chunksQueue.remove(); + + EntryListComputationContext context = chunkedJobDefinition.getProcessedDataContext(); + + if(context == null){ + if(chunkedJobDefinition.isLastChunkRetrieved() && chunksQueue.isEmpty() && !elements.hasNext()){ + chunkedJobDefinition.complete(EMPTY_LIST); + return; + } + context = generateComputationContext(); + chunkedJobDefinition.setProcessedDataContext(context); + } + + try { + + do { + if(elements.hasNext()){ + applyElementsComputation(elements, getter, dataHandler, context); + } + if(chunksQueue.isEmpty()){ + break; + } + elements = chunksQueue.remove(); + } while (true); + + if(chunkedJobDefinition.isLastChunkRetrieved() && chunksQueue.isEmpty()){ + if(context.metadataSchema == null){ + chunkedJobDefinition.complete(EMPTY_LIST); + } else { + chunkedJobDefinition.complete(convert(context)); + } + } + + } catch (Throwable throwable){ + chunkedJobDefinition.getResult().completeExceptionally(throwable); + return; + } finally { + chunkedJobDefinition.getProcessingLock().unlock(); + } + + if(!chunksQueue.isEmpty() || chunkedJobDefinition.isLastChunkRetrieved() && !chunkedJobDefinition.getResult().isDone()){ + supplyEntryList(chunkedJobDefinition, getter, dataHandler, executorService); + } + }); + } + + private static void applyElementsComputation(Iterator elements, StaticArrayEntry.GetColVal getter, + StaticArrayEntry.DataHandler dataHandler, EntryListComputationContext context){ + long[] limitAndValuePos = context.limitAndValuePos; + byte[] data = context.data; + EntryMetaData[] metadataSchema = context.metadataSchema; + int pos=context.pos; + int offset=context.offset; while (elements.hasNext()) { E element = elements.next(); if (element==null) throw new IllegalArgumentException("Unexpected null element in result set"); @@ -429,23 +510,32 @@ private static EntryList of(Iterator elements, StaticArrayEntry.GetColV pos++; } assert offset<=data.length; - if (data.length > offset + (offset >> 1)) { + + context.limitAndValuePos = limitAndValuePos; + context.data = data; + context.metadataSchema = metadataSchema; + context.pos = pos; + context.offset = offset; + } + + private static StaticArrayEntryList convert(EntryListComputationContext context){ + if (context.data.length > context.offset + (context.offset >> 1)) { // Resize to preserve memory. This happens when either of the following conditions is true: // 1) current memory space is 1.5x more than minimum required space // 2) 1.5 x minimum required space will overflow, in which case the wasted memory space is likely still considerable - byte[] newData = new byte[offset]; - System.arraycopy(data,0,newData,0,offset); - data=newData; + byte[] newData = new byte[context.offset]; + System.arraycopy(context.data,0,newData,0,context.offset); + context.data=newData; } - if (pos PARALLEL_BACKEND_EXECUTOR_SERVICE_MAX_SHUTDOWN_WAIT_TIME = new ConfigOption<>( PARALLEL_BACKEND_EXECUTOR_SERVICE, diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java index caf54c2ce7e..ad39355ab00 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java @@ -18,6 +18,7 @@ import org.janusgraph.diskstorage.configuration.ConfigElement; import org.janusgraph.diskstorage.configuration.ConfigNamespace; import org.janusgraph.diskstorage.configuration.ConfigOption; +import org.janusgraph.diskstorage.configuration.ExecutorServiceBuilder; import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions; @@ -200,6 +201,22 @@ public interface CQLConfigOptions { ConfigOption.Type.MASKABLE, 1024); + ConfigOption BACK_PRESSURE_LIMIT = new ConfigOption<>( + CQL_NS, + "back-pressure-limit", + "The maximum number of concurrent requests which are allowed to be processed by CQL driver. " + + "Any concurrent CQL requests which are above the provided limit are going to be back pressured using fair Semaphore. " + + "If no value is provided or the value is set to `0` then the value will be calculated based on CQL driver " + + "session provided parameters by using formula ["+DefaultDriverOption.CONNECTION_MAX_REQUESTS.getPath()+" * " + +DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE.getPath()+" * available_nodes_amount]. " + + "It's not recommended to use any value which is above this limit because it may result in CQL driver overload " + + "but it's suggested to have a lower value to keep the driver healthy under pressure. In situations when remote " + + "nodes connections are in use then the bigger value might be relevant as well to improve parallelism. " + + "In case the value `-1` is provided then the back pressure for CQL requests is turned off. In case the back pressure " + + "is turned off then it is advised to tune CQL driver for the ongoing workload.", + ConfigOption.Type.MASKABLE, + Integer.class); + ConfigOption HEARTBEAT_INTERVAL = new ConfigOption<>( CQL_NS, "heartbeat-interval", @@ -601,27 +618,14 @@ public interface CQLConfigOptions { ConfigNamespace EXECUTOR_SERVICE = new ConfigNamespace( CQL_NS, "executor-service", - "Configuration options for CQL executor service which is used to process CQL queries."); - - ConfigOption EXECUTOR_SERVICE_ENABLED = new ConfigOption<>( - EXECUTOR_SERVICE, - "enabled", - "Whether to use CQL executor service to process queries or not. If not used, the parallelism will be " + - "controlled internally by the CQL driver via `"+MAX_REQUESTS_PER_CONNECTION.toStringWithoutRoot()+"` parameter " + - "which may be preferable in production environments. " + - "Disabling executor service reduces overhead of thread pool but might be more difficult to tune.", - ConfigOption.Type.LOCAL, - Boolean.class, - false); + "Configuration options for CQL executor service which is used to process deserialization of CQL queries."); ConfigOption EXECUTOR_SERVICE_CORE_POOL_SIZE = new ConfigOption<>( EXECUTOR_SERVICE, GraphDatabaseConfiguration.PARALLEL_BACKEND_EXECUTOR_SERVICE_CORE_POOL_SIZE.getName(), - "Core pool size for executor service. May be ignored if custom executor service is used " + - "(depending on the implementation of the executor service).", + GraphDatabaseConfiguration.PARALLEL_BACKEND_EXECUTOR_SERVICE_CORE_POOL_SIZE.getDescription(), ConfigOption.Type.LOCAL, - Integer.class, - 10); + Integer.class); ConfigOption EXECUTOR_SERVICE_MAX_POOL_SIZE = new ConfigOption<>( EXECUTOR_SERVICE, @@ -645,7 +649,7 @@ public interface CQLConfigOptions { GraphDatabaseConfiguration.PARALLEL_BACKEND_EXECUTOR_SERVICE_CLASS.getDescription(), ConfigOption.Type.LOCAL, String.class, - "fixed"); + ExecutorServiceBuilder.FIXED_THREAD_POOL_CLASS); ConfigOption EXECUTOR_SERVICE_MAX_SHUTDOWN_WAIT_TIME = new ConfigOption<>( EXECUTOR_SERVICE, diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java index 0cfdb545267..6719f39a965 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java @@ -34,8 +34,6 @@ import com.datastax.oss.driver.api.querybuilder.schema.compaction.CompactionStrategy; import com.datastax.oss.driver.api.querybuilder.select.Select; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; -import io.vavr.Tuple; -import io.vavr.Tuple3; import io.vavr.collection.Array; import io.vavr.collection.Iterator; import io.vavr.control.Try; @@ -48,9 +46,9 @@ import org.janusgraph.diskstorage.StaticBuffer; import org.janusgraph.diskstorage.TemporaryBackendException; import org.janusgraph.diskstorage.configuration.Configuration; -import org.janusgraph.diskstorage.cql.function.slice.CQLExecutorServiceSliceFunction; -import org.janusgraph.diskstorage.cql.function.slice.CQLSimpleSliceFunction; +import org.janusgraph.diskstorage.cql.function.slice.AsyncCQLSliceFunction; import org.janusgraph.diskstorage.cql.function.slice.CQLSliceFunction; +import org.janusgraph.diskstorage.cql.util.QueryBackPressure; import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator; @@ -60,14 +58,12 @@ import org.janusgraph.diskstorage.keycolumnvalue.MultiSlicesQuery; import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery; import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; -import org.janusgraph.diskstorage.util.RecordIterator; -import org.janusgraph.diskstorage.util.StaticArrayBuffer; -import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.function.Function; @@ -247,13 +243,9 @@ public CQLKeyColumnValueStore(final CQLStoreManager storeManager, final String t this.insertColumnWithTTL = null; } - Optional executorService = this.storeManager.getExecutorService(); - - if(executorService.isPresent()){ - cqlSliceFunction = new CQLExecutorServiceSliceFunction(session, getSlice, getter, executorService.get()); - } else { - cqlSliceFunction = new CQLSimpleSliceFunction(session, getSlice, getter); - } + ExecutorService executorService = this.storeManager.getExecutorService(); + QueryBackPressure queriesBackPressure = storeManager.getQueriesBackPressure(); + cqlSliceFunction = new AsyncCQLSliceFunction(session, getSlice, getter, executorService, queriesBackPressure); // @formatter:on } @@ -388,40 +380,42 @@ public String getName() { @Override public EntryList getSlice(final KeySliceQuery query, final StoreTransaction txh) throws BackendException { - return cqlSliceFunction.getSlice(query, txh); + try { + return cqlSliceFunction.getSlice(query, txh).get(); + } catch (Throwable throwable) { + throw EXCEPTION_MAPPER.apply(throwable); + } } @Override public Map getSlice(final List keys, final SliceQuery query, final StoreTransaction txh) throws BackendException { - throw new UnsupportedOperationException("The CQL backend does not support multi-key queries"); - } - - public static class CQLResultSetIterator implements RecordIterator> { - private java.util.Iterator resultSetIterator; + Map> futureResult = new HashMap<>(keys.size()); - public CQLResultSetIterator(ResultSet rs) { - resultSetIterator = rs.iterator(); + for(StaticBuffer key : keys){ + futureResult.put(key, cqlSliceFunction.getSlice(new KeySliceQuery(key, query), txh)); } - @Override - public boolean hasNext() { - return resultSetIterator.hasNext(); + Map result = new HashMap<>(keys.size()); + + Throwable firstException = null; + for(Map.Entry> entry : futureResult.entrySet()){ + try{ + result.put(entry.getKey(), entry.getValue().get()); + } catch (Throwable throwable){ + if(firstException == null){ + firstException = throwable; + } else { + firstException.addSuppressed(throwable); + } + } } - @Override - public Tuple3 next() { - Row nextRow = resultSetIterator.next(); - return nextRow == null - ? null - : Tuple.of(StaticArrayBuffer.of(nextRow.getByteBuffer(COLUMN_COLUMN_NAME)), - StaticArrayBuffer.of(nextRow.getByteBuffer(VALUE_COLUMN_NAME)), nextRow); + if(firstException != null){ + throw EXCEPTION_MAPPER.apply(firstException); } - @Override - public void close() throws IOException { - // NOP - } + return result; } public BatchableStatement deleteColumn(final StaticBuffer key, final StaticBuffer column) { diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLStoreManager.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLStoreManager.java index b635fe549be..f3aa460f1bf 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLStoreManager.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLStoreManager.java @@ -16,10 +16,13 @@ import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import io.vavr.Tuple; import io.vavr.collection.Array; import io.vavr.collection.HashMap; @@ -40,6 +43,9 @@ import org.janusgraph.diskstorage.cql.builder.CQLStoreFeaturesBuilder; import org.janusgraph.diskstorage.cql.builder.CQLStoreFeaturesWrapper; import org.janusgraph.diskstorage.cql.function.mutate.CQLMutateManyFunction; +import org.janusgraph.diskstorage.cql.util.PassAllQueryBackPressure; +import org.janusgraph.diskstorage.cql.util.QueryBackPressure; +import org.janusgraph.diskstorage.cql.util.SemaphoreQueryBackPressure; import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; @@ -54,9 +60,9 @@ import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.truncate; import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createKeyspace; @@ -64,6 +70,7 @@ import static io.vavr.API.$; import static io.vavr.API.Case; import static io.vavr.API.Match; +import static org.janusgraph.diskstorage.cql.CQLConfigOptions.BACK_PRESSURE_LIMIT; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.EXECUTOR_SERVICE_MAX_SHUTDOWN_WAIT_TIME; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.KEYSPACE; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.NETWORK_TOPOLOGY_REPLICATION_STRATEGY; @@ -107,6 +114,7 @@ public class CQLStoreManager extends DistributedStoreManager implements KeyColum private final StoreFeatures storeFeatures; private final Map openStores; private final Deployment deployment; + private final QueryBackPressure queriesBackPressure; /** * Constructor for the {@link CQLStoreManager} given a JanusGraph {@link Configuration}. @@ -135,7 +143,7 @@ public CQLStoreManager(final Configuration configuration, final CQLMutateManyFun this.keyspace = determineKeyspaceName(configuration); this.openStores = new ConcurrentHashMap<>(); this.session = sessionBuilder.build(getStorageConfig(), hostnames, port, connectionTimeoutMS, baseConfigurationLoaderBuilder); - + session.getMetadata().getNodes().size(); try{ this.threadPoolShutdownMaxWaitTime = configuration.get(EXECUTOR_SERVICE_MAX_SHUTDOWN_WAIT_TIME); @@ -143,8 +151,15 @@ public CQLStoreManager(final Configuration configuration, final CQLMutateManyFun initializeJmxMetrics(); initializeKeyspace(); + int backPressureLimit = getBackPressureLimit(configuration, session); + if(backPressureLimit == -1){ + queriesBackPressure = new PassAllQueryBackPressure(); + } else { + queriesBackPressure = new SemaphoreQueryBackPressure(new Semaphore(backPressureLimit, true)); + } + CQLMutateManyFunctionWrapper mutateManyFunctionWrapper = mutateManyFunctionBuilder - .build(session, configuration, times, assignTimestamp, openStores, this::sleepAfterWrite); + .build(session, configuration, times, assignTimestamp, openStores, this::sleepAfterWrite, queriesBackPressure); this.executorService = mutateManyFunctionWrapper.getExecutorService(); this.executeManyFunction = mutateManyFunctionWrapper.getMutateManyFunction(); @@ -158,6 +173,27 @@ public CQLStoreManager(final Configuration configuration, final CQLMutateManyFun } } + private static int getBackPressureLimit(final Configuration configuration, final CqlSession session){ + if(configuration.has(BACK_PRESSURE_LIMIT)){ + final int backPressureLimit = configuration.get(BACK_PRESSURE_LIMIT); + if(backPressureLimit == 0){ + return getDefaultBackPressureLimit(session); + } + Preconditions.checkArgument(backPressureLimit >= -1, BACK_PRESSURE_LIMIT.toStringWithoutRoot() + +" must be at -1 (to disable back_pressure), 0 (to calculate back_pressure), " + + "or a positive number to set CQL requests back pressure limit."); + return backPressureLimit; + } + return getDefaultBackPressureLimit(session); + } + + private static int getDefaultBackPressureLimit(final CqlSession session){ + DriverExecutionProfile profile = session.getContext().getConfig().getDefaultProfile(); + return profile.getInt(DefaultDriverOption.CONNECTION_MAX_REQUESTS) * + profile.getInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE) * + session.getMetadata().getNodes().size(); + } + private void initializeJmxMetrics() { final Configuration configuration = getStorageConfig(); if (configuration.get(METRICS_JMX_ENABLED) && configuration.get(BASIC_METRICS) && session.getMetrics().isPresent()) { @@ -196,8 +232,8 @@ void initializeKeyspace(){ .build()); } - Optional getExecutorService() { - return Optional.ofNullable(executorService); + ExecutorService getExecutorService() { + return executorService; } CqlSession getSession() { @@ -315,4 +351,7 @@ public Object getHadoopManager() { return new CqlHadoopStoreManager(this.session); } + public QueryBackPressure getQueriesBackPressure() { + return queriesBackPressure; + } } diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/builder/CQLMutateManyFunctionBuilder.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/builder/CQLMutateManyFunctionBuilder.java index f7278c8d7d6..4a2f7604e59 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/builder/CQLMutateManyFunctionBuilder.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/builder/CQLMutateManyFunctionBuilder.java @@ -26,8 +26,7 @@ import org.janusgraph.diskstorage.cql.function.mutate.CQLExecutorServiceMutateManyLoggedFunction; import org.janusgraph.diskstorage.cql.function.mutate.CQLExecutorServiceMutateManyUnloggedFunction; import org.janusgraph.diskstorage.cql.function.mutate.CQLMutateManyFunction; -import org.janusgraph.diskstorage.cql.function.mutate.CQLSimpleMutateManyLoggedFunction; -import org.janusgraph.diskstorage.cql.function.mutate.CQLSimpleMutateManyUnloggedFunction; +import org.janusgraph.diskstorage.cql.util.QueryBackPressure; import org.janusgraph.diskstorage.util.time.TimestampProvider; import java.util.Map; @@ -39,7 +38,6 @@ import static org.janusgraph.diskstorage.cql.CQLConfigOptions.BATCH_STATEMENT_SIZE; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.EXECUTOR_SERVICE_CLASS; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.EXECUTOR_SERVICE_CORE_POOL_SIZE; -import static org.janusgraph.diskstorage.cql.CQLConfigOptions.EXECUTOR_SERVICE_ENABLED; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.EXECUTOR_SERVICE_KEEP_ALIVE_TIME; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.EXECUTOR_SERVICE_MAX_POOL_SIZE; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.BASIC_METRICS; @@ -52,37 +50,26 @@ public class CQLMutateManyFunctionBuilder { public CQLMutateManyFunctionWrapper build(final CqlSession session, final Configuration configuration, final TimestampProvider times, final boolean assignTimestamp, final Map openStores, - ConsumerWithBackendException sleepAfterWriteFunction) { + ConsumerWithBackendException sleepAfterWriteFunction, + QueryBackPressure queriesBackPressure) { - ExecutorService executorService; - CQLMutateManyFunction mutateManyFunction; + ExecutorService executorService = buildExecutorService(configuration); int batchSize = configuration.get(BATCH_STATEMENT_SIZE); boolean atomicBatch = configuration.get(ATOMIC_BATCH_MUTATE); - if (configuration.get(EXECUTOR_SERVICE_ENABLED)) { - executorService = buildExecutorService(configuration); - try { - if (atomicBatch) { - mutateManyFunction = new CQLExecutorServiceMutateManyLoggedFunction(times, - assignTimestamp, openStores, session, executorService, sleepAfterWriteFunction); - } else { - mutateManyFunction = new CQLExecutorServiceMutateManyUnloggedFunction(batchSize, - session, openStores, times, executorService, assignTimestamp, sleepAfterWriteFunction); - } - } catch (RuntimeException e) { - executorService.shutdown(); - throw e; - } - } else { - executorService = null; + CQLMutateManyFunction mutateManyFunction; + try { if (atomicBatch) { - mutateManyFunction = new CQLSimpleMutateManyLoggedFunction(times, - assignTimestamp, openStores, session, sleepAfterWriteFunction); + mutateManyFunction = new CQLExecutorServiceMutateManyLoggedFunction(times, + assignTimestamp, openStores, session, executorService, sleepAfterWriteFunction, queriesBackPressure); } else { - mutateManyFunction = new CQLSimpleMutateManyUnloggedFunction(batchSize, - session, openStores, times, assignTimestamp, sleepAfterWriteFunction); + mutateManyFunction = new CQLExecutorServiceMutateManyUnloggedFunction(batchSize, + session, openStores, times, executorService, assignTimestamp, sleepAfterWriteFunction, queriesBackPressure); } + } catch (RuntimeException e) { + executorService.shutdown(); + throw e; } return new CQLMutateManyFunctionWrapper(executorService, mutateManyFunction); diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/builder/CQLStoreFeaturesBuilder.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/builder/CQLStoreFeaturesBuilder.java index 5f94877b89e..a49d175f1c8 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/builder/CQLStoreFeaturesBuilder.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/builder/CQLStoreFeaturesBuilder.java @@ -61,7 +61,7 @@ public CQLStoreFeaturesWrapper build(final CqlSession session, final Configurati fb.keyConsistent((onlyUseLocalConsistency ? local : global), local); fb.locking(useExternalLocking); fb.optimisticLocking(true); - fb.multiQuery(false); + fb.multiQuery(true); if (!configuration.get(TTL_ENABLED)) { fb.cellTTL(false).storeTTL(false); diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/AbstractCQLMutateManyLoggedFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/AbstractCQLMutateManyLoggedFunction.java deleted file mode 100644 index e7b4ed76893..00000000000 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/AbstractCQLMutateManyLoggedFunction.java +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2021 JanusGraph Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.janusgraph.diskstorage.cql.function.mutate; - -import com.datastax.oss.driver.api.core.cql.BatchStatement; -import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder; -import com.datastax.oss.driver.api.core.cql.DefaultBatchType; -import org.janusgraph.diskstorage.BackendException; -import org.janusgraph.diskstorage.StaticBuffer; -import org.janusgraph.diskstorage.common.DistributedStoreManager; -import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore; -import org.janusgraph.diskstorage.cql.function.ConsumerWithBackendException; -import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; -import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; -import org.janusgraph.diskstorage.util.time.TimestampProvider; - -import java.util.Map; - -import static org.janusgraph.diskstorage.cql.CQLTransaction.getTransaction; - -public abstract class AbstractCQLMutateManyLoggedFunction extends AbstractCQLMutateManyFunction implements CQLMutateManyFunction { - - - public AbstractCQLMutateManyLoggedFunction(TimestampProvider times, boolean assignTimestamp, - Map openStores, - ConsumerWithBackendException sleepAfterWriteFunction) { - super(sleepAfterWriteFunction, assignTimestamp, times, openStores); - } - - // Use a single logged batch - @Override - public void mutateMany(Map> mutations, StoreTransaction txh) throws BackendException { - - final DistributedStoreManager.MaskedTimestamp commitTime = createMaskedTimestampFunction.apply(txh); - - BatchStatementBuilder builder = BatchStatement.builder(DefaultBatchType.LOGGED); - builder.setConsistencyLevel(getTransaction(txh).getWriteConsistencyLevel()); - - mutations.forEach((tableName, tableMutations) -> { - final CQLKeyColumnValueStore columnValueStore = getColumnValueStore(tableName); - tableMutations.forEach((key, keyMutations) -> { - deletionsFunction - .getBatchableStatementsForColumnOperation(commitTime, keyMutations, columnValueStore, key) - .forEach(builder::addStatement); - additionsFunction - .getBatchableStatementsForColumnOperation(commitTime, keyMutations, columnValueStore, key) - .forEach(builder::addStatement); - }); - }); - - execute(builder.build()); - - sleepAfterWriteFunction.accept(commitTime); - - } - - protected abstract void execute(BatchStatement batchStatement) throws BackendException; - -} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/AbstractCQLMutateManyUnloggedFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/AbstractCQLMutateManyUnloggedFunction.java deleted file mode 100644 index 7a7a49c16d0..00000000000 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/AbstractCQLMutateManyUnloggedFunction.java +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2021 JanusGraph Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.janusgraph.diskstorage.cql.function.mutate; - -import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.cql.AsyncResultSet; -import com.datastax.oss.driver.api.core.cql.BatchStatement; -import com.datastax.oss.driver.api.core.cql.BatchableStatement; -import com.datastax.oss.driver.api.core.cql.BoundStatement; -import com.datastax.oss.driver.api.core.cql.DefaultBatchType; -import io.vavr.collection.Iterator; -import io.vavr.collection.Seq; -import org.janusgraph.diskstorage.BackendException; -import org.janusgraph.diskstorage.StaticBuffer; -import org.janusgraph.diskstorage.common.DistributedStoreManager; -import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore; -import org.janusgraph.diskstorage.cql.function.ConsumerWithBackendException; -import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; -import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; -import org.janusgraph.diskstorage.util.time.TimestampProvider; - -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - -import static org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore.EXCEPTION_MAPPER; -import static org.janusgraph.diskstorage.cql.CQLTransaction.getTransaction; - -public abstract class AbstractCQLMutateManyUnloggedFunction extends AbstractCQLMutateManyFunction implements CQLMutateManyFunction { - - private final CqlSession session; - private final int batchSize; - - protected AbstractCQLMutateManyUnloggedFunction(TimestampProvider times, boolean assignTimestamp, - CqlSession session, Map openStores, - int batchSize, - ConsumerWithBackendException sleepAfterWriteFunction) { - super(sleepAfterWriteFunction, assignTimestamp, times, openStores); - this.session = session; - this.batchSize = batchSize; - } - - // Create an async un-logged batch per partition key - @Override - public void mutateMany(final Map> mutations, final StoreTransaction txh) throws BackendException { - - final DistributedStoreManager.MaskedTimestamp commitTime = createMaskedTimestampFunction.apply(txh); - - Optional errorAfterExecution = mutate(commitTime, mutations, txh); - - if (errorAfterExecution.isPresent()) { - throw EXCEPTION_MAPPER.apply(errorAfterExecution.get()); - } - - sleepAfterWriteFunction.accept(commitTime); - } - - protected CompletableFuture execAsyncUnlogged(Seq> group, StoreTransaction txh){ - return this.session.executeAsync( - BatchStatement.newInstance(DefaultBatchType.UNLOGGED) - .addAll(group) - .setConsistencyLevel(getTransaction(txh).getWriteConsistencyLevel()) - ).toCompletableFuture(); - } - - protected Iterator>> toGroupedBatchableStatementsSequenceIterator( - final DistributedStoreManager.MaskedTimestamp commitTime, - final KCVMutation keyMutations, - final CQLKeyColumnValueStore columnValueStore, - final StaticBuffer key){ - - Iterator> deletions = deletionsFunction - .getBatchableStatementsForColumnOperation(commitTime, keyMutations, columnValueStore, key); - Iterator> additions = additionsFunction - .getBatchableStatementsForColumnOperation(commitTime, keyMutations, columnValueStore, key); - - return Iterator.concat(deletions, additions) - .grouped(this.batchSize); - } - - protected abstract Optional mutate(final DistributedStoreManager.MaskedTimestamp commitTime, - final Map> mutations, - final StoreTransaction txh); - -} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLExecutorServiceMutateManyLoggedFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLExecutorServiceMutateManyLoggedFunction.java index cf8c5e91446..2d699d66a8f 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLExecutorServiceMutateManyLoggedFunction.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLExecutorServiceMutateManyLoggedFunction.java @@ -17,42 +17,91 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder; +import com.datastax.oss.driver.api.core.cql.DefaultBatchType; import io.vavr.concurrent.Future; import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.StaticBuffer; import org.janusgraph.diskstorage.common.DistributedStoreManager; import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore; import org.janusgraph.diskstorage.cql.function.ConsumerWithBackendException; +import org.janusgraph.diskstorage.cql.util.QueryBackPressure; +import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; import org.janusgraph.diskstorage.util.time.TimestampProvider; import java.util.Map; import java.util.concurrent.ExecutorService; import static org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore.EXCEPTION_MAPPER; +import static org.janusgraph.diskstorage.cql.CQLTransaction.getTransaction; -public class CQLExecutorServiceMutateManyLoggedFunction extends AbstractCQLMutateManyLoggedFunction{ +public class CQLExecutorServiceMutateManyLoggedFunction extends AbstractCQLMutateManyFunction implements CQLMutateManyFunction { private final CqlSession session; private final ExecutorService executorService; + private final QueryBackPressure queryBackPressure; + public CQLExecutorServiceMutateManyLoggedFunction(TimestampProvider times, boolean assignTimestamp, Map openStores, CqlSession session, ExecutorService executorService, - ConsumerWithBackendException sleepAfterWriteFunction) { - super(times, assignTimestamp, openStores, sleepAfterWriteFunction); + ConsumerWithBackendException sleepAfterWriteFunction, + QueryBackPressure queryBackPressure) { + super(sleepAfterWriteFunction, assignTimestamp, times, openStores); this.session = session; this.executorService = executorService; + this.queryBackPressure = queryBackPressure; } + // Use a single logged batch @Override - protected void execute(BatchStatement batchStatement) throws BackendException { - final Future result = Future.fromJavaFuture(executorService, - session.executeAsync(batchStatement).toCompletableFuture()); + public void mutateMany(Map> mutations, StoreTransaction txh) throws BackendException { + + final DistributedStoreManager.MaskedTimestamp commitTime = createMaskedTimestampFunction.apply(txh); + + BatchStatementBuilder builder = BatchStatement.builder(DefaultBatchType.LOGGED); + builder.setConsistencyLevel(getTransaction(txh).getWriteConsistencyLevel()); + + mutations.forEach((tableName, tableMutations) -> { + final CQLKeyColumnValueStore columnValueStore = getColumnValueStore(tableName); + tableMutations.forEach((key, keyMutations) -> { + deletionsFunction + .getBatchableStatementsForColumnOperation(commitTime, keyMutations, columnValueStore, key) + .forEach(builder::addStatement); + additionsFunction + .getBatchableStatementsForColumnOperation(commitTime, keyMutations, columnValueStore, key) + .forEach(builder::addStatement); + }); + }); + + execute(builder.build()); + + sleepAfterWriteFunction.accept(commitTime); + + } + + private void execute(BatchStatement batchStatement) throws BackendException { + + queryBackPressure.acquireBeforeQuery(); + final Future result; + + try{ + result = Future.fromJavaFuture(executorService, + session.executeAsync(batchStatement) + .whenComplete((asyncResultSet, throwable) -> queryBackPressure.releaseAfterQuery()) + .toCompletableFuture()); + } catch (RuntimeException e){ + queryBackPressure.releaseAfterQuery(); + throw e; + } result.await(); - if (result.isFailure()) { + if(result.isFailure()){ throw EXCEPTION_MAPPER.apply(result.getCause().get()); } } + } diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLExecutorServiceMutateManyUnloggedFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLExecutorServiceMutateManyUnloggedFunction.java index 04957d85ffa..f6144232f5c 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLExecutorServiceMutateManyUnloggedFunction.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLExecutorServiceMutateManyUnloggedFunction.java @@ -16,34 +16,96 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.BatchableStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.DefaultBatchType; import io.vavr.collection.Iterator; import io.vavr.collection.Seq; import io.vavr.concurrent.Future; +import org.janusgraph.diskstorage.BackendException; import org.janusgraph.diskstorage.StaticBuffer; import org.janusgraph.diskstorage.common.DistributedStoreManager; import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore; import org.janusgraph.diskstorage.cql.function.ConsumerWithBackendException; +import org.janusgraph.diskstorage.cql.util.QueryBackPressure; import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; import org.janusgraph.diskstorage.util.time.TimestampProvider; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -public class CQLExecutorServiceMutateManyUnloggedFunction extends AbstractCQLMutateManyUnloggedFunction{ +import static org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore.EXCEPTION_MAPPER; +import static org.janusgraph.diskstorage.cql.CQLTransaction.getTransaction; +public class CQLExecutorServiceMutateManyUnloggedFunction extends AbstractCQLMutateManyFunction implements CQLMutateManyFunction { + + private final CqlSession session; + private final int batchSize; private final ExecutorService executorService; + private final QueryBackPressure queryBackPressure; public CQLExecutorServiceMutateManyUnloggedFunction(int batchSize, CqlSession session, Map openStores, - TimestampProvider times, ExecutorService executorService, boolean assignTimestamp, - ConsumerWithBackendException sleepAfterWriteFunction) { - super(times, assignTimestamp, session, openStores, batchSize, sleepAfterWriteFunction); + TimestampProvider times, ExecutorService executorService, boolean assignTimestamp, + ConsumerWithBackendException sleepAfterWriteFunction, + QueryBackPressure queryBackPressure) { + super(sleepAfterWriteFunction, assignTimestamp, times, openStores); + this.session = session; + this.batchSize = batchSize; this.executorService = executorService; + this.queryBackPressure = queryBackPressure; } + // Create an async un-logged batch per partition key @Override - protected Optional mutate(DistributedStoreManager.MaskedTimestamp commitTime, Map> mutations, StoreTransaction txh) { + public void mutateMany(final Map> mutations, final StoreTransaction txh) throws BackendException { + + final DistributedStoreManager.MaskedTimestamp commitTime = createMaskedTimestampFunction.apply(txh); + + Optional errorAfterExecution = mutate(commitTime, mutations, txh); + + if (errorAfterExecution.isPresent()) { + throw EXCEPTION_MAPPER.apply(errorAfterExecution.get()); + } + + sleepAfterWriteFunction.accept(commitTime); + } + + protected CompletableFuture execAsyncUnlogged(Seq> group, StoreTransaction txh){ + queryBackPressure.acquireBeforeQuery(); + try{ + return this.session.executeAsync( + BatchStatement.newInstance(DefaultBatchType.UNLOGGED) + .addAll(group) + .setConsistencyLevel(getTransaction(txh).getWriteConsistencyLevel()) + ).whenComplete((asyncResultSet, throwable) -> queryBackPressure.releaseAfterQuery()).toCompletableFuture(); + } catch (RuntimeException e){ + queryBackPressure.releaseAfterQuery(); + throw e; + } + } + + protected Iterator>> toGroupedBatchableStatementsSequenceIterator( + final DistributedStoreManager.MaskedTimestamp commitTime, + final KCVMutation keyMutations, + final CQLKeyColumnValueStore columnValueStore, + final StaticBuffer key){ + + Iterator> deletions = deletionsFunction + .getBatchableStatementsForColumnOperation(commitTime, keyMutations, columnValueStore, key); + Iterator> additions = additionsFunction + .getBatchableStatementsForColumnOperation(commitTime, keyMutations, columnValueStore, key); + + return Iterator.concat(deletions, additions) + .grouped(this.batchSize); + } + + protected Optional mutate(final DistributedStoreManager.MaskedTimestamp commitTime, + final Map> mutations, + final StoreTransaction txh){ final Future> result = Future.sequence(this.executorService, Iterator.ofAll(mutations.entrySet()).flatMap(tableNameAndMutations -> { final String tableName = tableNameAndMutations.getKey(); final Map tableMutations = tableNameAndMutations.getValue(); @@ -63,4 +125,5 @@ protected Optional mutate(DistributedStoreManager.MaskedTimestamp com return Optional.empty(); } + } diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLSimpleMutateManyLoggedFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLSimpleMutateManyLoggedFunction.java deleted file mode 100644 index 2f04a93f3f0..00000000000 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLSimpleMutateManyLoggedFunction.java +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2021 JanusGraph Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.janusgraph.diskstorage.cql.function.mutate; - -import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.cql.BatchStatement; -import org.janusgraph.diskstorage.BackendException; -import org.janusgraph.diskstorage.common.DistributedStoreManager; -import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore; -import org.janusgraph.diskstorage.cql.function.ConsumerWithBackendException; -import org.janusgraph.diskstorage.util.time.TimestampProvider; - -import java.util.Map; - -import static org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore.EXCEPTION_MAPPER; - -public class CQLSimpleMutateManyLoggedFunction extends AbstractCQLMutateManyLoggedFunction{ - - private final CqlSession session; - - public CQLSimpleMutateManyLoggedFunction(TimestampProvider times, boolean assignTimestamp, - Map openStores, - CqlSession session, - ConsumerWithBackendException sleepAfterWriteFunction) { - super(times, assignTimestamp, openStores, sleepAfterWriteFunction); - this.session = session; - } - - @Override - protected void execute(BatchStatement batchStatement) throws BackendException { - try{ - this.session.execute(batchStatement); - } catch (Throwable throwable){ - throw EXCEPTION_MAPPER.apply(throwable); - } - } -} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLSimpleMutateManyUnloggedFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLSimpleMutateManyUnloggedFunction.java deleted file mode 100644 index 22244d26a82..00000000000 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/CQLSimpleMutateManyUnloggedFunction.java +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2021 JanusGraph Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.janusgraph.diskstorage.cql.function.mutate; - -import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.cql.AsyncResultSet; -import org.janusgraph.diskstorage.StaticBuffer; -import org.janusgraph.diskstorage.common.DistributedStoreManager; -import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore; -import org.janusgraph.diskstorage.cql.function.ConsumerWithBackendException; -import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; -import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; -import org.janusgraph.diskstorage.util.time.TimestampProvider; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - -public class CQLSimpleMutateManyUnloggedFunction extends AbstractCQLMutateManyUnloggedFunction{ - - public CQLSimpleMutateManyUnloggedFunction(int batchSize, CqlSession session, Map openStores, - TimestampProvider times, boolean assignTimestamp, - ConsumerWithBackendException sleepAfterWriteFunction) { - super(times, assignTimestamp, session, openStores, batchSize, sleepAfterWriteFunction); - } - - @Override - protected Optional mutate(DistributedStoreManager.MaskedTimestamp commitTime, Map> mutations, StoreTransaction txh) { - List> resultList = new LinkedList<>(); - - mutations.forEach((tableName, tableMutations) -> { - final CQLKeyColumnValueStore columnValueStore = getColumnValueStore(tableName); - - tableMutations.forEach((key, keyMutations) -> - toGroupedBatchableStatementsSequenceIterator(commitTime, keyMutations, columnValueStore, key).forEach(group -> { - CompletableFuture completableFuture = execAsyncUnlogged(group, txh); - resultList.add(completableFuture); - }) - ); - }); - - for(CompletableFuture resultPart : resultList){ - try { - resultPart.get(); - } catch (InterruptedException | ExecutionException e) { - return Optional.of(e); - } - } - - return Optional.empty(); - } -} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AbstractCQLSliceFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AbstractCQLSliceFunction.java deleted file mode 100644 index a112e00c239..00000000000 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AbstractCQLSliceFunction.java +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2021 JanusGraph Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.janusgraph.diskstorage.cql.function.slice; - -import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.cql.AsyncResultSet; -import com.datastax.oss.driver.api.core.cql.PreparedStatement; -import com.datastax.oss.driver.api.core.cql.Row; -import com.datastax.oss.driver.internal.core.cql.ResultSets; -import io.vavr.Tuple3; -import org.janusgraph.diskstorage.BackendException; -import org.janusgraph.diskstorage.EntryList; -import org.janusgraph.diskstorage.StaticBuffer; -import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore; -import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery; -import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; -import org.janusgraph.diskstorage.util.StaticArrayEntry; -import org.janusgraph.diskstorage.util.StaticArrayEntryList; - -import java.util.concurrent.CompletableFuture; - -import static org.janusgraph.diskstorage.cql.CQLTransaction.getTransaction; - -public abstract class AbstractCQLSliceFunction implements CQLSliceFunction{ - - private final CqlSession session; - private final PreparedStatement getSlice; - - public AbstractCQLSliceFunction(CqlSession session, PreparedStatement getSlice) { - this.session = session; - this.getSlice = getSlice; - } - - @Override - public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException { - return getSlice( - this.session.executeAsync(this.getSlice.boundStatementBuilder() - .setByteBuffer(CQLKeyColumnValueStore.KEY_BINDING, query.getKey().asByteBuffer()) - .setByteBuffer(CQLKeyColumnValueStore.SLICE_START_BINDING, query.getSliceStart().asByteBuffer()) - .setByteBuffer(CQLKeyColumnValueStore.SLICE_END_BINDING, query.getSliceEnd().asByteBuffer()) - .setInt(CQLKeyColumnValueStore.LIMIT_BINDING, query.getLimit()) - .setConsistencyLevel(getTransaction(txh).getReadConsistencyLevel()).build()) - .toCompletableFuture() - ); - } - - protected static EntryList fromResultSet(final AsyncResultSet resultSet, final StaticArrayEntry.GetColVal, StaticBuffer> getter) { - return StaticArrayEntryList.ofStaticBuffer(new CQLKeyColumnValueStore.CQLResultSetIterator(ResultSets.newInstance(resultSet)), getter); - } - - protected abstract EntryList getSlice(CompletableFuture completableFutureSlice) throws BackendException; - -} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLSliceFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLSliceFunction.java new file mode 100644 index 00000000000..4d8f5c56074 --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AsyncCQLSliceFunction.java @@ -0,0 +1,122 @@ +// Copyright 2021 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.function.slice; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.Row; +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import io.vavr.Tuple; +import io.vavr.Tuple3; +import org.janusgraph.diskstorage.EntryList; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.cql.util.QueryBackPressure; +import org.janusgraph.diskstorage.util.ChunkedJobDefinition; +import org.janusgraph.diskstorage.cql.CQLColValGetter; +import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore; +import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import org.janusgraph.diskstorage.util.EntryListComputationContext; +import org.janusgraph.diskstorage.util.StaticArrayBuffer; +import org.janusgraph.diskstorage.util.StaticArrayEntryList; + +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; + +import static org.janusgraph.diskstorage.cql.CQLTransaction.getTransaction; + +public class AsyncCQLSliceFunction implements CQLSliceFunction{ + + private static final Function> ROW_TUPLE_3_FUNCTION = row -> row == null + ? null + : Tuple.of(StaticArrayBuffer.of(row.getByteBuffer(CQLKeyColumnValueStore.COLUMN_COLUMN_NAME)), + StaticArrayBuffer.of(row.getByteBuffer(CQLKeyColumnValueStore.VALUE_COLUMN_NAME)), row); + + private final CqlSession session; + private final PreparedStatement getSlice; + private final CQLColValGetter getter; + private final ExecutorService executorService; + private final QueryBackPressure queryBackPressure; + + public AsyncCQLSliceFunction(CqlSession session, PreparedStatement getSlice, + CQLColValGetter getter, ExecutorService executorService, QueryBackPressure queryBackPressure) { + this.session = session; + this.getSlice = getSlice; + this.getter = getter; + this.executorService = executorService; + this.queryBackPressure = queryBackPressure; + } + + @Override + public CompletableFuture getSlice(KeySliceQuery query, StoreTransaction txh) { + + queryBackPressure.acquireBeforeQuery(); + + ChunkedJobDefinition>, EntryListComputationContext, EntryList> chunkedJobDefinition = new ChunkedJobDefinition<>(); + + try{ + this.session.executeAsync(this.getSlice.boundStatementBuilder() + .setByteBuffer(CQLKeyColumnValueStore.KEY_BINDING, query.getKey().asByteBuffer()) + .setByteBuffer(CQLKeyColumnValueStore.SLICE_START_BINDING, query.getSliceStart().asByteBuffer()) + .setByteBuffer(CQLKeyColumnValueStore.SLICE_END_BINDING, query.getSliceEnd().asByteBuffer()) + .setInt(CQLKeyColumnValueStore.LIMIT_BINDING, query.getLimit()) + .setConsistencyLevel(getTransaction(txh).getReadConsistencyLevel()).build()) + .whenComplete((asyncResultSet, throwable) -> acceptDataChunk(asyncResultSet, throwable, chunkedJobDefinition)); + } catch (RuntimeException e){ + queryBackPressure.releaseAfterQuery(); + throw e; + } + + return chunkedJobDefinition.getResult(); + } + + private void acceptDataChunk(final AsyncResultSet resultSet, final Throwable exception, + final ChunkedJobDefinition>, EntryListComputationContext, EntryList> chunkedJobDefinition) { + + if(exception != null){ + queryBackPressure.releaseAfterQuery(); + chunkedJobDefinition.getResult().completeExceptionally(exception); + return; + } + + if(chunkedJobDefinition.getResult().isCompletedExceptionally()){ + queryBackPressure.releaseAfterQuery(); + return; + } + + try{ + + chunkedJobDefinition.getDataChunks().add(Iterators.transform(resultSet.currentPage().iterator(), ROW_TUPLE_3_FUNCTION)); + + boolean hasMorePages = resultSet.hasMorePages(); + if(hasMorePages){ + resultSet.fetchNextPage().whenComplete((asyncResultSet, throwable) -> acceptDataChunk(asyncResultSet, throwable, chunkedJobDefinition)); + } else { + chunkedJobDefinition.setLastChunkRetrieved(); + queryBackPressure.releaseAfterQuery(); + } + + StaticArrayEntryList.supplyEntryList(chunkedJobDefinition, getter, executorService); + + } catch (RuntimeException e){ + queryBackPressure.releaseAfterQuery(); + throw e; + } + } + +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLExecutorServiceSliceFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLExecutorServiceSliceFunction.java deleted file mode 100644 index e3c7548d18b..00000000000 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLExecutorServiceSliceFunction.java +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2021 JanusGraph Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.janusgraph.diskstorage.cql.function.slice; - -import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.cql.AsyncResultSet; -import com.datastax.oss.driver.api.core.cql.PreparedStatement; -import io.vavr.concurrent.Future; -import org.janusgraph.diskstorage.BackendException; -import org.janusgraph.diskstorage.EntryList; -import org.janusgraph.diskstorage.PermanentBackendException; -import org.janusgraph.diskstorage.cql.CQLColValGetter; -import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; - -public class CQLExecutorServiceSliceFunction extends AbstractCQLSliceFunction{ - - private final CQLColValGetter getter; - private final ExecutorService executorService; - - public CQLExecutorServiceSliceFunction(CqlSession session, PreparedStatement getSlice, - CQLColValGetter getter, ExecutorService executorService) { - super(session, getSlice); - this.getter = getter; - this.executorService = executorService; - } - - @Override - protected EntryList getSlice(CompletableFuture completableFutureSlice) throws BackendException { - final Future result = Future.fromJavaFuture( - this.executorService, - completableFutureSlice - ).map(resultSet -> fromResultSet(resultSet, this.getter)); - interruptibleWait(result); - return result.getValue().get().getOrElseThrow(CQLKeyColumnValueStore.EXCEPTION_MAPPER); - } - - /** - * VAVR Future.await will throw InterruptedException wrapped in a FatalException. If the Thread was in Object.wait, the interrupted - * flag will be cleared as a side effect and needs to be reset. This method checks that the underlying cause of the FatalException is - * InterruptedException and resets the interrupted flag. - * - * @param result the future to wait on - * @throws PermanentBackendException if the thread was interrupted while waiting for the future result - */ - private void interruptibleWait(final Future result) throws PermanentBackendException { - try { - result.await(); - } catch (Exception e) { - if (e.getCause() instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new PermanentBackendException(e); - } - } -} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLSimpleSliceFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLSimpleSliceFunction.java deleted file mode 100644 index 08eacd1772f..00000000000 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLSimpleSliceFunction.java +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2021 JanusGraph Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.janusgraph.diskstorage.cql.function.slice; - -import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.cql.AsyncResultSet; -import com.datastax.oss.driver.api.core.cql.PreparedStatement; -import org.janusgraph.diskstorage.BackendException; -import org.janusgraph.diskstorage.EntryList; -import org.janusgraph.diskstorage.PermanentBackendException; -import org.janusgraph.diskstorage.cql.CQLColValGetter; -import org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore; - -import java.util.concurrent.CompletableFuture; - -public class CQLSimpleSliceFunction extends AbstractCQLSliceFunction{ - - private final CQLColValGetter getter; - - public CQLSimpleSliceFunction(CqlSession session, PreparedStatement getSlice, CQLColValGetter getter) { - super(session, getSlice); - this.getter = getter; - } - - @Override - protected EntryList getSlice(CompletableFuture completableFutureSlice) throws BackendException { - AsyncResultSet asyncResultSet = interruptibleWait(completableFutureSlice); - return fromResultSet(asyncResultSet, this.getter); - } - - private T interruptibleWait(final CompletableFuture result) throws BackendException { - try { - return result.get(); - } catch (InterruptedException e){ - Thread.currentThread().interrupt(); - throw new PermanentBackendException(e); - } catch (Exception e) { - throw CQLKeyColumnValueStore.EXCEPTION_MAPPER.apply(e); - } - } -} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLSliceFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLSliceFunction.java index 9602735759f..96b9bba8845 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLSliceFunction.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLSliceFunction.java @@ -19,8 +19,10 @@ import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery; import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import java.util.concurrent.CompletableFuture; + public interface CQLSliceFunction { - EntryList getSlice(final KeySliceQuery query, final StoreTransaction txh) throws BackendException; + CompletableFuture getSlice(final KeySliceQuery query, final StoreTransaction txh) throws BackendException; } diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/PassAllQueryBackPressure.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/PassAllQueryBackPressure.java new file mode 100644 index 00000000000..e2370a93806 --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/PassAllQueryBackPressure.java @@ -0,0 +1,27 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.util; + +public class PassAllQueryBackPressure implements QueryBackPressure{ + @Override + public void acquireBeforeQuery() { + // ignored + } + + @Override + public void releaseAfterQuery() { + // ignored + } +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/QueryBackPressure.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/QueryBackPressure.java new file mode 100644 index 00000000000..51cce292e37 --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/QueryBackPressure.java @@ -0,0 +1,23 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.util; + +public interface QueryBackPressure { + + void acquireBeforeQuery(); + + void releaseAfterQuery(); + +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/SemaphoreQueryBackPressure.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/SemaphoreQueryBackPressure.java new file mode 100644 index 00000000000..ad460b027d5 --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/util/SemaphoreQueryBackPressure.java @@ -0,0 +1,43 @@ +// Copyright 2023 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql.util; + +import org.janusgraph.core.JanusGraphException; + +import java.util.concurrent.Semaphore; + +public class SemaphoreQueryBackPressure implements QueryBackPressure{ + + private final Semaphore semaphore; + + public SemaphoreQueryBackPressure(Semaphore semaphore) { + this.semaphore = semaphore; + } + + @Override + public void acquireBeforeQuery() { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JanusGraphException(e); + } + } + + @Override + public void releaseAfterQuery(){ + semaphore.release(); + } +} diff --git a/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLConfigTest.java b/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLConfigTest.java index 3b9cd4cf00d..024ebf4e961 100644 --- a/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLConfigTest.java +++ b/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLConfigTest.java @@ -62,7 +62,6 @@ import static org.janusgraph.diskstorage.cql.CQLConfigOptions.BASE_PROGRAMMATIC_CONFIGURATION_ENABLED; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.EXECUTOR_SERVICE_CLASS; -import static org.janusgraph.diskstorage.cql.CQLConfigOptions.EXECUTOR_SERVICE_ENABLED; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.EXECUTOR_SERVICE_MAX_SHUTDOWN_WAIT_TIME; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.FILE_CONFIGURATION; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.HEARTBEAT_TIMEOUT; @@ -327,7 +326,6 @@ public void testRequestLoggerConfigurationSet() { public void shouldCreateCachedThreadPool() { WriteConfiguration wc = getConfiguration(); wc.set(ConfigElement.getPath(EXECUTOR_SERVICE_CLASS), ExecutorServiceBuilder.CACHED_THREAD_POOL_CLASS); - wc.set(ConfigElement.getPath(EXECUTOR_SERVICE_ENABLED), true); graph = (StandardJanusGraph) JanusGraphFactory.open(wc); assertDoesNotThrow(() -> { graph.traversal().V().hasNext(); @@ -338,7 +336,6 @@ public void shouldCreateCachedThreadPool() { @Test public void shouldGracefullyCloseGraphWhichLostAConnection(){ WriteConfiguration wc = getConfiguration(); - wc.set(ConfigElement.getPath(EXECUTOR_SERVICE_ENABLED), true); wc.set(ConfigElement.getPath(EXECUTOR_SERVICE_MAX_SHUTDOWN_WAIT_TIME), 60000); wc.set(ConfigElement.getPath(PARALLEL_BACKEND_EXECUTOR_SERVICE_MAX_SHUTDOWN_WAIT_TIME), 60000); wc.set(ConfigElement.getPath(IDS_RENEW_TIMEOUT), 10000); diff --git a/janusgraph-cql/src/test/java/org/janusgraph/graphdb/cql/CQLGraphTest.java b/janusgraph-cql/src/test/java/org/janusgraph/graphdb/cql/CQLGraphTest.java index a85e08226f4..5486b6512db 100644 --- a/janusgraph-cql/src/test/java/org/janusgraph/graphdb/cql/CQLGraphTest.java +++ b/janusgraph-cql/src/test/java/org/janusgraph/graphdb/cql/CQLGraphTest.java @@ -22,6 +22,7 @@ import org.janusgraph.JanusGraphCassandraContainer; import org.janusgraph.core.Cardinality; import org.janusgraph.core.JanusGraphException; +import org.janusgraph.core.Multiplicity; import org.janusgraph.core.PropertyKey; import org.janusgraph.diskstorage.PermanentBackendException; import org.janusgraph.diskstorage.TemporaryBackendException; @@ -32,6 +33,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -41,8 +43,9 @@ import static org.janusgraph.diskstorage.cql.CQLConfigOptions.ATOMIC_BATCH_MUTATE; import static org.janusgraph.diskstorage.cql.CQLConfigOptions.BATCH_STATEMENT_SIZE; -import static org.janusgraph.diskstorage.cql.CQLConfigOptions.EXECUTOR_SERVICE_ENABLED; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.ASSIGN_TIMESTAMP; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PAGE_SIZE; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.USE_MULTIQUERY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -72,18 +75,12 @@ public void simpleLogTest() throws InterruptedException{ protected static Stream generateConsistencyConfigs() { return Arrays.stream(new Arguments[]{ - arguments(true, true, 20, true), - arguments(true, false, 20, true), - arguments(true, false, 1, true), - arguments(false, true, 20, true), - arguments(false, false, 20, true), - arguments(false, false, 1, true), - arguments(true, true, 20, false), - arguments(true, false, 20, false), - arguments(true, false, 1, false), - arguments(false, true, 20, false), - arguments(false, false, 20, false), - arguments(false, false, 1, false), + arguments(true, true, 20), + arguments(true, false, 20), + arguments(true, false, 1), + arguments(false, true, 20), + arguments(false, false, 20), + arguments(false, false, 1), }); } @@ -96,8 +93,8 @@ public void testConsistencyEnforcement() { @ParameterizedTest @MethodSource("generateConsistencyConfigs") - public void testConsistencyEnforcement(boolean assignTimestamp, boolean atomicBatch, int batchSize, boolean executorServiceEnabled) { - clopen(option(ASSIGN_TIMESTAMP), assignTimestamp, option(ATOMIC_BATCH_MUTATE), atomicBatch, option(BATCH_STATEMENT_SIZE), batchSize, option(EXECUTOR_SERVICE_ENABLED), executorServiceEnabled); + public void testConsistencyEnforcement(boolean assignTimestamp, boolean atomicBatch, int batchSize) { + clopen(option(ASSIGN_TIMESTAMP), assignTimestamp, option(ATOMIC_BATCH_MUTATE), atomicBatch, option(BATCH_STATEMENT_SIZE), batchSize); super.testConsistencyEnforcement(); } @@ -110,8 +107,8 @@ public void testConcurrentConsistencyEnforcement() { @ParameterizedTest @MethodSource("generateConsistencyConfigs") - public void testConcurrentConsistencyEnforcement(boolean assignTimestamp, boolean atomicBatch, int batchSize, boolean executorServiceEnabled) throws Exception { - clopen(option(ASSIGN_TIMESTAMP), assignTimestamp, option(ATOMIC_BATCH_MUTATE), atomicBatch, option(BATCH_STATEMENT_SIZE), batchSize, option(EXECUTOR_SERVICE_ENABLED), executorServiceEnabled); + public void testConcurrentConsistencyEnforcement(boolean assignTimestamp, boolean atomicBatch, int batchSize) throws Exception { + clopen(option(ASSIGN_TIMESTAMP), assignTimestamp, option(ATOMIC_BATCH_MUTATE), atomicBatch, option(BATCH_STATEMENT_SIZE), batchSize); super.testConcurrentConsistencyEnforcement(); } @@ -134,4 +131,33 @@ public void testQueryLongForPropertyKey() { assertNotEquals(-1, ExceptionUtils.indexOfType(ex, PermanentBackendException.class), "Query should produce a PermanentBackendException"); } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 3, 4, 5, 10, 50, 100, 1000}) + public void fetchElementsUsingDifferentPageSize(int pageSize) { + clopen(option(USE_MULTIQUERY), true, option(PAGE_SIZE), pageSize); + PropertyKey name = mgmt.makePropertyKey("name").dataType(String.class).cardinality(Cardinality.SINGLE).make(); + mgmt.buildIndex("nameIndex", Vertex.class).addKey(name).buildCompositeIndex(); + mgmt.makeEdgeLabel("testEdge").multiplicity(Multiplicity.SIMPLE).make(); + finishSchema(); + + int indexedVerticesCount = 10; + + int adjacentVerticesCount = pageSize * 3; + + GraphTraversalSource g = graph.traversal(); + for(int i=0; i < indexedVerticesCount; i++){ + Vertex indexedVertex = g.addV().property("name", "testName").next(); + for(int j=0; j