Commit
Group CQL same token range keys into a single query [cql-tests] [tp-tests]

Fixes #3863

Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
porunov committed Aug 4, 2023
1 parent 8e3816f commit 6d3ec8d
Showing 30 changed files with 1,801 additions and 197 deletions.
27 changes: 27 additions & 0 deletions docs/changelog.md
@@ -328,6 +328,33 @@ and optimize this method execution to execute all the slice queries for all the
(for example, by using asynchronous programming, slice queries grouping, multi-threaded execution, or any other technique which
is efficient for the respective storage adapter).

##### Added possibility to group multiple slice queries together via CQL storage backend

Starting from JanusGraph 1.0.0, the CQL storage implementation groups queries which fetch properties with `Cardinality.SINGLE` together
into the same CQL query. This behaviour can be disabled by setting the configuration option `storage.cql.grouping.slice-allowed = false`.

The CQL storage implementation can also group queries with different partition keys together when those keys belong to the same
token range or the same replica set (the grouping strategy is selected via the `storage.cql.grouping.keys-class` configuration option).
This behaviour is disabled by default, but can be enabled via `storage.cql.grouping.keys-allowed = true`.
Please note that enabling the key grouping feature can increase the result size of the related queries,
because each row returned by a CQL query which encompasses grouped keys also needs to return its specific key.
Moreover, it could potentially lead to a less balanced load on the storage cluster. However, it reduces the number of CQL
queries sent, which may positively influence throughput in some cases as well as the pricing of some serverless deployments.
We recommend benchmarking each use case before enabling keys grouping (`storage.cql.grouping.keys-allowed`).

Note that keys grouping (`storage.cql.grouping.keys-allowed`) can be used only with storage backends which support `PER PARTITION LIMIT`.
As such, this feature can't be used with Amazon Keyspaces because it doesn't support `PER PARTITION LIMIT`.

Different storage backends may also restrict the maximum number of keys which can be provided via the `IN` operator (which is needed for grouping).
Make sure that `storage.cql.grouping.keys-limit` and `storage.cql.grouping.slice-limit` are less than or equal to the
restriction enforced by the storage backend.
On ScyllaDB this restriction can be configured via the [max-partition-key-restrictions-per-query](https://enterprise.docs.scylladb.com/branch-2022.2/faq.html#how-can-i-change-the-maximum-number-of-in-restrictions)
configuration option (default `100`).
On AstraDB this restriction has to be changed by contacting AstraDB support and adjusting the `partition_keys_in_select_failure_threshold` and `in_select_cartesian_product_failure_threshold` threshold
configurations (https://docs.datastax.com/en/astra-serverless/docs/plan/planning.html#_cassandra_yaml), which are set to `20` and `25` by default.

See the additional properties which control grouping under the `storage.cql.grouping` configuration namespace.
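
As a rough illustration (the hostname and limit values below are placeholders rather than recommendations), the grouping options can be set like any other JanusGraph configuration option, for example programmatically when opening a CQL-backed graph:

```java
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

// Minimal sketch: enable slice grouping (on by default) and keys grouping
// (off by default) for a CQL-backed graph. Hostname and limits are placeholders.
JanusGraph graph = JanusGraphFactory.build()
        .set("storage.backend", "cql")
        .set("storage.hostname", "127.0.0.1")
        .set("storage.cql.grouping.slice-allowed", true)
        .set("storage.cql.grouping.keys-allowed", true)
        .set("storage.cql.grouping.keys-class", "replicasAware")
        .set("storage.cql.grouping.keys-limit", 20)
        .open();
```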

##### Removal of deprecated classes/methods/functionalities

###### Methods
38 changes: 34 additions & 4 deletions docs/configs/janusgraph-cfg.md
@@ -461,10 +461,6 @@ CQL storage backend options
| storage.cql.request-timeout | Timeout for CQL requests in milliseconds. See DataStax Java Driver option `basic.request.timeout` for more information. | Long | 12000 | MASKABLE |
| storage.cql.session-leak-threshold | The maximum number of live sessions that are allowed to coexist in a given VM until the warning starts to log for every new session. If the value is less than or equal to 0, the feature is disabled: no warning will be issued. See DataStax Java Driver option `advanced.session-leak.threshold` for more information. | Integer | (no default value) | MASKABLE |
| storage.cql.session-name | Default name for the Cassandra session | String | JanusGraph Session | MASKABLE |
| storage.cql.slice-grouping-allowed | If `true` this allows multiple Slice queries which are allowed to be performed as non-range queries (i.e. direct equality operation) to be grouped together into a single CQL query via `IN` operator. Notice, currently only operations to fetch properties with Cardinality.SINGLE are allowed to be performed as non-range queries (edges fetching or properties with Cardinality SET or LIST won't be grouped together).
If this option if `false` then each Slice query will be executed in a separate asynchronous CQL query even when grouping is allowed. | Boolean | true | MASKABLE |
| storage.cql.slice-grouping-limit | Maximum amount of grouped together slice queries into a single CQL query.
This option is used only when `storage.cql.slice-grouping-allowed` is `true`. | Integer | 100 | MASKABLE |
| storage.cql.speculative-retry | The speculative retry policy. One of: NONE, ALWAYS, <X>percentile, <N>ms. | String | (no default value) | FIXED |
| storage.cql.ttl-enabled | Whether TTL should be enabled or not. Must be turned off if the storage does not support TTL. Amazon Keyspace, for example, does not support TTL by default unless otherwise enabled. | Boolean | true | LOCAL |
| storage.cql.use-external-locking | True to prevent JanusGraph from using its own locking mechanism. Setting this to true eliminates redundant checks when using an external locking mechanism outside of JanusGraph. Be aware that when use-external-locking is set to true, that failure to employ a locking algorithm which locks all columns that participate in a transaction upfront and unlocks them when the transaction ends, will result in a 'read uncommitted' transaction isolation level guarantee. If set to true without an appropriate external locking mechanism in place side effects such as dirty/non-repeatable/phantom reads should be expected. | Boolean | false | MASKABLE |
@@ -482,6 +478,40 @@ Configuration options for CQL executor service which is used to process deserial
| storage.cql.executor-service.max-pool-size | Maximum pool size for executor service. Ignored for `fixed` and `cached` executor services. May be ignored if custom executor service is used (depending on the implementation of the executor service). | Integer | 2147483647 | LOCAL |
| storage.cql.executor-service.max-shutdown-wait-time | Max shutdown wait time in milliseconds for executor service threads to be finished during shutdown. After this time threads will be interrupted (signalled with interrupt) without any additional wait time. | Long | 60000 | LOCAL |

### storage.cql.grouping
Configuration options for controlling CQL queries grouping


| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
| storage.cql.grouping.keys-allowed | If `true` this allows multiple partition keys to be grouped together into a single CQL query via the `IN` operator based on the provided keys grouping strategy (usually grouping is done by same token ranges or same replica sets, but may also involve shard ids for custom implementations).
Note that any CQL query grouped with more than 1 key will be required to return a row key for every column fetched.
This option is useful when it is desirable to send fewer CQL queries for read requests at the expense of fetching more data (a partition key per each fetched value).
Note that different storage backends may execute multi-partition `IN` queries differently (including, but not limited to, how the checksum queries are sent for different consistency levels, processing node CPU usage, disk access pattern, etc.). Thus, proper benchmarking is needed to determine whether keys grouping is useful on a case-by-case basis.
This option can be enabled only for storage backends which support `PER PARTITION LIMIT`. As such, this feature can't be used with Amazon Keyspaces because it doesn't support `PER PARTITION LIMIT`.
If this option is `false` then each partition key will be executed in a separate asynchronous CQL query even when multiple keys from the same token range are queried.
Note that the default grouping strategy does not take shards into account, which might be inefficient with the ScyllaDB storage backend. A ScyllaDB-specific keys grouping strategy should be implemented after the resolution of the [ticket #232](https://github.com/scylladb/java-driver/issues/232). | Boolean | false | MASKABLE |
| storage.cql.grouping.keys-class | Full class path of the keys grouping execution strategy. The class should implement the `org.janusgraph.diskstorage.cql.strategy.GroupedExecutionStrategy` interface and have a public constructor with two arguments: `org.janusgraph.diskstorage.configuration.Configuration` and `org.janusgraph.diskstorage.cql.CQLStoreManager`.
Shortcuts available:
- `tokenRangeAware` - groups partition keys which belong to the same token range. Note that this strategy does not take shards into account and thus might be inefficient with the ScyllaDB storage backend.
- `replicasAware` - groups partition keys which belong to the same replica sets (same nodes). Note that this strategy does not take shards into account and thus might be inefficient with the ScyllaDB storage backend.

Usually the `tokenRangeAware` grouping strategy produces more, smaller groups, where each group contains keys which are stored close to each other on disk, which may result in fewer disk seeks in some cases. The `replicasAware` grouping strategy instead groups keys per replica set, which usually means fewer, bigger groups (i.e. fewer CQL requests).
This option takes effect only when `storage.cql.grouping.keys-allowed` is `true`. | String | replicasAware | MASKABLE |
| storage.cql.grouping.keys-limit | Maximum number of keys which can be grouped together into a single CQL query. If more keys are queried, they are split into separate CQL queries.
Note that for ScyllaDB this option should not exceed the maximum number of distinct partition key restrictions per query, which can be changed via the ScyllaDB configuration option `max-partition-key-restrictions-per-query` (https://enterprise.docs.scylladb.com/branch-2022.2/faq.html#how-can-i-change-the-maximum-number-of-in-restrictions). For AstraDB this limit is set to 20 and is usually fixed. However, you can ask customer support to change the default thresholds via the `partition_keys_in_select_failure_threshold` and `in_select_cartesian_product_failure_threshold` threshold configurations (https://docs.datastax.com/en/astra-serverless/docs/plan/planning.html#_cassandra_yaml).
Ensure that your storage backend allows at least as many `IN` selectors as the value set via this configuration.
This option takes effect only when `storage.cql.grouping.keys-allowed` is `true`. | Integer | 20 | MASKABLE |
| storage.cql.grouping.keys-min | Minimum number of keys to consider for grouping. Grouping will be skipped for any multi-key query which has fewer than this number of keys (i.e. a separate CQL query will be executed for each key in such a case).
This configuration should usually be left at `2`. It is useful to increase the value only in cases when queries with more keys should not be grouped, but instead be performed separately to increase parallelism at the expense of network overhead.
This option takes effect only when `storage.cql.grouping.keys-allowed` is `true`. | Integer | 2 | MASKABLE |
| storage.cql.grouping.slice-allowed | If `true` this allows multiple Slice queries which are allowed to be performed as non-range queries (i.e. direct equality operations) to be grouped together into a single CQL query via the `IN` operator. Note that currently only operations which fetch properties with Cardinality.SINGLE are allowed to be performed as non-range queries (edge fetching or properties with Cardinality SET or LIST won't be grouped together).
If this option is `false` then each Slice query will be executed in a separate asynchronous CQL query even when grouping is allowed. | Boolean | true | MASKABLE |
| storage.cql.grouping.slice-limit | Maximum number of Slice queries which can be grouped together into a single CQL query.
Note that for ScyllaDB this option should not exceed the maximum number of distinct clustering key restrictions per query, which can be changed via the ScyllaDB configuration option `max-partition-key-restrictions-per-query` (https://enterprise.docs.scylladb.com/branch-2022.2/faq.html#how-can-i-change-the-maximum-number-of-in-restrictions). For AstraDB this limit is set to 20 and is usually fixed. However, you can ask customer support to change the default thresholds via the `partition_keys_in_select_failure_threshold` and `in_select_cartesian_product_failure_threshold` threshold configurations (https://docs.datastax.com/en/astra-serverless/docs/plan/planning.html#_cassandra_yaml).
Ensure that your storage backend allows at least as many `IN` selectors as the value set via this configuration.
This option is used only when `storage.cql.grouping.slice-allowed` is `true`. | Integer | 20 | MASKABLE |
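
As a self-contained sketch of the semantics described above for `keys-min` and `keys-limit` (illustrative only, not JanusGraph's actual implementation): keys that fall into the same group are split into `IN` queries of at most `keys-limit` keys, and grouping is skipped when fewer than `keys-min` keys are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative chunking semantics only; not the actual JanusGraph code.
public class KeyGroupingSketch {

    static <K> List<List<K>> group(List<K> keysInSameGroup, int keysMin, int keysLimit) {
        if (keysInSameGroup.size() < keysMin) {
            // Too few keys: each key is queried separately (no IN grouping).
            List<List<K>> separate = new ArrayList<>();
            for (K key : keysInSameGroup) {
                separate.add(Arrays.asList(key));
            }
            return separate;
        }
        // Enough keys: split into chunks of at most keysLimit keys per CQL query.
        List<List<K>> chunks = new ArrayList<>();
        for (int i = 0; i < keysInSameGroup.size(); i += keysLimit) {
            chunks.add(keysInSameGroup.subList(i, Math.min(i + keysLimit, keysInSameGroup.size())));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // 45 keys with keys-limit = 20 -> three queries of 20, 20 and 5 keys.
        List<Integer> keys = new ArrayList<>();
        for (int i = 0; i < 45; i++) keys.add(i);
        group(keys, 2, 20).forEach(chunk -> System.out.println(chunk.size()));
    }
}
```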

### storage.cql.internal
Advanced configuration of the internal DataStax driver. Note that all available configurations will be composed in order; non-specified configurations will be skipped. By default only the base configuration is enabled (which has the smallest priority, meaning that you can overwrite any configuration used in the base programmatic configuration by using any other configuration type). The configurations are composed in the following order (sorted by priority in descending order): `file-configuration`, `resource-configuration`, `string-configuration`, `url-configuration`, `base-programmatic-configuration` (which is controlled by the `base-programmatic-configuration-enabled` property). Configurations with higher priority always overwrite configurations with lower priority, i.e. if the same configuration parameter is used in both `file-configuration` and `string-configuration`, the parameter from `file-configuration` will be used and the parameter from `string-configuration` will be ignored. See the available configuration options and configuration structure here: https://docs.datastax.com/en/developer/java-driver/4.13/manual/core/configuration/reference/
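
For example (a hedged sketch; `basic.request.timeout` is a standard DataStax driver option, but verify behaviour against your driver version), a driver setting can be overridden through the higher-priority `string-configuration` while the base programmatic configuration stays enabled:

```java
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

// Sketch: override a DataStax driver option via the internal string configuration.
// Because string-configuration has higher priority than the base programmatic
// configuration, this value wins if the same option is defined in both.
JanusGraph graph = JanusGraphFactory.build()
        .set("storage.backend", "cql")
        .set("storage.hostname", "127.0.0.1")
        .set("storage.cql.internal.string-configuration",
             "datastax-java-driver { basic.request.timeout = 15 seconds }")
        .open();
```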

@@ -29,7 +29,8 @@ public enum EntryMetaData {

TTL(Integer.class, false, data -> data instanceof Integer && ((Integer) data) >= 0L),
VISIBILITY(String.class, true, data -> data instanceof String && StringEncoding.isAsciiString((String) data)),
TIMESTAMP(Long.class, false, data -> data instanceof Long);
TIMESTAMP(Long.class, false, data -> data instanceof Long),
ROW_KEY(StaticBuffer.class, false, data -> data instanceof StaticBuffer);

public static final java.util.Map<EntryMetaData,Object> EMPTY_METADATA = Collections.emptyMap();

@@ -18,7 +18,9 @@

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -63,14 +65,17 @@ public static <T> T get(CompletableFuture<T> future){
public static <K,V> Map<K,V> unwrap(Map<K,CompletableFuture<V>> futureMap) throws Throwable{
Map<K, V> resultMap = new HashMap<>(futureMap.size());
Throwable firstException = null;
Set<Throwable> uniqueExceptions = null;
for(Map.Entry<K, CompletableFuture<V>> entry : futureMap.entrySet()){
try{
resultMap.put(entry.getKey(), entry.getValue().get());
} catch (Throwable throwable){
Throwable rootException = unwrapExecutionException(throwable);
if(firstException == null){
firstException = throwable;
} else {
firstException.addSuppressed(throwable);
firstException = rootException;
uniqueExceptions = new HashSet<>(1);
} else if(firstException != rootException && uniqueExceptions.add(rootException)){
firstException.addSuppressed(rootException);
}
}
}
@@ -84,14 +89,17 @@ public static <K,V> Map<K,V> unwrap(Map<K,CompletableFuture<V>> futureMap) throw
public static <K,V,R> Map<K,Map<V, R>> unwrapMapOfMaps(Map<K, Map<V, CompletableFuture<R>>> futureMap) throws Throwable{
Map<K, Map<V, R>> resultMap = new HashMap<>(futureMap.size());
Throwable firstException = null;
Set<Throwable> uniqueExceptions = null;
for(Map.Entry<K, Map<V, CompletableFuture<R>>> entry : futureMap.entrySet()){
try{
resultMap.put(entry.getKey(), unwrap(entry.getValue()));
} catch (Throwable throwable){
Throwable rootException = unwrapExecutionException(throwable);
if(firstException == null){
firstException = throwable;
} else {
firstException.addSuppressed(throwable);
firstException = rootException;
uniqueExceptions = new HashSet<>(1);
} else if(firstException != rootException && uniqueExceptions.add(rootException)){
firstException.addSuppressed(rootException);
}
}
}
@@ -104,14 +112,17 @@

public static <V> void awaitAll(Collection<CompletableFuture<V>> futureCollection) throws Throwable{
Throwable firstException = null;
Set<Throwable> uniqueExceptions = null;
for(CompletableFuture<V> future : futureCollection){
try{
future.get();
} catch (Throwable throwable){
Throwable rootException = unwrapExecutionException(throwable);
if(firstException == null){
firstException = throwable;
} else {
firstException.addSuppressed(throwable);
firstException = rootException;
uniqueExceptions = new HashSet<>(1);
} else if(firstException != rootException && uniqueExceptions.add(rootException)){
firstException.addSuppressed(rootException);
}
}
}
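
The loop above unwraps each failure to its root cause and attaches each distinct cause to the first exception only once. A stand-alone sketch of the same pattern using plain JDK types (not the JanusGraph helper itself):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Stand-alone sketch: several futures fail with the same root cause, and each
// get() wraps it in a fresh ExecutionException. Unwrapping to the root cause and
// tracking already-seen causes avoids attaching the same cause as a suppressed
// exception more than once (or to itself).
public class SuppressedDedupSketch {
    public static void main(String[] args) throws InterruptedException {
        RuntimeException sharedCause = new RuntimeException("backend unavailable");
        CompletableFuture<?>[] futures = new CompletableFuture<?>[3];
        for (int i = 0; i < futures.length; i++) {
            CompletableFuture<Object> future = new CompletableFuture<>();
            future.completeExceptionally(sharedCause);
            futures[i] = future;
        }

        Throwable firstException = null;
        Set<Throwable> uniqueExceptions = new HashSet<>();
        for (CompletableFuture<?> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                Throwable root = e.getCause() != null ? e.getCause() : e; // simplified unwrap
                if (firstException == null) {
                    firstException = root;
                } else if (firstException != root && uniqueExceptions.add(root)) {
                    firstException.addSuppressed(root);
                }
            }
        }
        // Prints 0: the shared cause is reported once and never suppressed under itself.
        System.out.println(firstException.getSuppressed().length);
    }
}
```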
@@ -654,6 +654,8 @@ public static MetaDataSerializer getSerializer(EntryMetaData meta) {
return LongSerializer.INSTANCE;
case VISIBILITY:
return ASCIIStringSerializer.INSTANCE;
case ROW_KEY:
return StaticBufferSerializer.INSTANCE;
default: throw new AssertionError("Unexpected meta data: " + meta);
}
}
@@ -736,4 +738,38 @@ public String read(byte[] data, int startPos) {
}
}

private enum StaticBufferSerializer implements MetaDataSerializer<StaticBuffer> {

INSTANCE;

private static final StaticBuffer EMPTY_STATIC_BUFFER = StaticArrayBuffer.of(new byte[0]);

@Override
public int getByteLength(StaticBuffer value) {
return value.length() + 4;
}

@Override
public void write(byte[] data, int startPos, StaticBuffer value) {
int length = value.length();
StaticArrayBuffer.putInt(data, startPos, length);
if(length > 0){
startPos+=4;
for(int i=0; i<length; i++){
data[startPos++] = value.getByte(i);
}
}
}

@Override
public StaticBuffer read(byte[] data, int startPos) {
int bufSize = StaticArrayBuffer.getInt(data, startPos);
if(bufSize == 0){
return EMPTY_STATIC_BUFFER;
}
startPos+=4;
return new StaticArrayBuffer(data, startPos, startPos+bufSize);
}
}

}
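
For reference, a stand-alone sketch of the same byte layout using plain JDK types instead of JanusGraph's `StaticBuffer`: a 4-byte length prefix followed by the row key's raw bytes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Stand-alone illustration of the layout used by the serializer above:
// a 4-byte big-endian length prefix followed by the buffer's raw bytes.
public class RowKeyLayoutSketch {

    static byte[] write(byte[] value) {
        return ByteBuffer.allocate(4 + value.length)
                .putInt(value.length) // length prefix
                .put(value)           // payload (may be empty)
                .array();
    }

    static byte[] read(byte[] data, int startPos) {
        ByteBuffer in = ByteBuffer.wrap(data, startPos, data.length - startPos);
        byte[] value = new byte[in.getInt()];
        in.get(value);
        return value;
    }

    public static void main(String[] args) {
        byte[] rowKey = {0x01, 0x02, 0x03};
        byte[] encoded = write(rowKey);
        System.out.println(encoded.length);                          // 7
        System.out.println(Arrays.equals(rowKey, read(encoded, 0))); // true
    }
}
```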

1 comment on commit 6d3ec8d

@github-actions


Benchmark

| Benchmark suite | Current: 6d3ec8d | Previous: 8e3816f | Ratio |
| ---- | ---- | ---- | ---- |
| org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete | 14841.2029223762 ms/op | 14532.494919968713 ms/op | 1.02 |
| org.janusgraph.GraphCentricQueryBenchmark.getVertices | 1407.964502325157 ms/op | 1341.4205490969866 ms/op | 1.05 |
| org.janusgraph.MgmtOlapJobBenchmark.runClearIndex | 221.89997733913043 ms/op | 219.50517400869566 ms/op | 1.01 |
| org.janusgraph.MgmtOlapJobBenchmark.runReindex | 478.9711819858586 ms/op | 474.74738472 ms/op | 1.01 |
| org.janusgraph.JanusGraphSpeedBenchmark.basicCount | 413.59232150927204 ms/op | 385.5766658693171 ms/op | 1.07 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 10842.465855100349 ms/op | 9837.973724257849 ms/op | 1.10 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps | 36261.26070261999 ms/op | 30990.078781266668 ms/op | 1.17 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch | 38373.35104530714 ms/op | 34130.417437145006 ms/op | 1.12 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching | 69154.98505896666 ms/op | 64924.88423363333 ms/op | 1.07 |
| org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex | 17752.56837082264 ms/op | 15213.595918878282 ms/op | 1.17 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion | 640.9295423311797 ms/op | 622.1791426031332 ms/op | 1.03 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch | 9574.138906590011 ms/op | 8743.251293119505 ms/op | 1.10 |
| org.janusgraph.CQLMultiQueryBenchmark.getNames | 17816.678904073575 ms/op | 15399.258113350808 ms/op | 1.16 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 12973.089301988444 ms/op | 12123.446650210828 ms/op | 1.07 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep | 701.905681583698 ms/op | 687.4349321580046 ms/op | 1.02 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex | 24222.458341530553 ms/op | 21767.370231805082 ms/op | 1.11 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage | 630.8417517077713 ms/op | 594.3546264197971 ms/op | 1.06 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 31782.230165439207 ms/op | 28281.21560570778 ms/op | 1.12 |
| org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection | 443.95956339818827 ms/op | 437.1551211838477 ms/op | 1.02 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch | 37103.35960490555 ms/op | 33032.19885374843 ms/op | 1.12 |
| org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames | 18109.004589783406 ms/op | 15388.736694382991 ms/op | 1.18 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps | 19386.233837131796 ms/op | 16535.70284782947 ms/op | 1.17 |
| org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts | 18257.012663244175 ms/op | 15568.310523091981 ms/op | 1.17 |

This comment was automatically generated by workflow using github-action-benchmark.
