Skip to content

Commit

Permalink
Group CQL keys into a single query
Browse files Browse the repository at this point in the history
Fixes #3863

Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
  • Loading branch information
porunov committed Jul 6, 2023
1 parent 16846ad commit 4ffdd10
Show file tree
Hide file tree
Showing 21 changed files with 1,009 additions and 175 deletions.
23 changes: 23 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,29 @@ and optimize this method execution to execute all the slice queries for all the
(for example, by using asynchronous programming, slice queries grouping, multi-threaded execution, or any other technique which
is efficient for the respective storage adapter).

##### CQL storage backend now groups multiple keys and multiple slice queries together for the same token ranges

Starting from JanusGraph 1.0.0 CQL storage implementation now groups different keys and slice queries together to
minimize the amount of CQL queries sent to the storage backend.
This often comes with better performance characteristics due to less disk seeks on the storage backend size and often less network overhead.
However, in some rare cases it might degrade performance due to imbalanced queries execution.
Moreover, keys grouping requires fetching an additional `key` column (i.e. vertex id) per each fetched data column (i.e. per each fetched property or edge).
An additional `key` column retrieval is often smaller than the separate query network overhead (unless very large `String` vertex ids are used),
but for some Serverless storage deployments this additional `key` bytes might be charged by the storage providers.
In most cases proper grouping of queries and / or keys together bring only benefits for CQL queries execution, but in
case the previous behaviour is desired than the grouping can be disabled by setting `cql.slice-grouping-allowed = false` and
`cql.keys-grouping-allowed = false`.

It is recommended to use the below new configurations to configure grouping limits which brings the most benefit for your deployment:
- `cql.slice-grouping-limit` - maximum amount of slice queries grouped together. As for now only queries which are fetching multiple properties with cardinality `SINGLE` can be grouped. Any other query will be executed separately.
- `cql.keys-grouping-limit` - maximum amount of keys (vertex ids) grouped together. Only keys which are placed on the same tokens range can be grouped together (i.e. only keys which touch the same partition).

By default `cql.keys-grouping-limit` is set to `100`, but it's recommended to increase this value depending on the use case.
Notice, ScyllaDB storage backend has a limit on distinct clustering key restrictions per query which is set to `100` by default.
To be able to use more than `100` for `cql.keys-grouping-limit` or `cql.slice-grouping-limit` it is required first to ensure
that a bigger amount of distinct clustering key restrictions is allowed on the storage backend. On ScyllaDB it's possible
to configure this restriction using [max-partition-key-restrictions-per-query](https://enterprise.docs.scylladb.com/branch-2022.2/faq.html#how-can-i-change-the-maximum-number-of-in-restrictions) configuration option.

##### Removal of deprecated classes/methods/functionalities

###### Methods
Expand Down
7 changes: 7 additions & 0 deletions docs/configs/janusgraph-cfg.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,12 @@ CQL storage backend options
| storage.cql.gc-grace-seconds | The number of seconds before tombstones (deletion markers) are eligible for garbage-collection. | Integer | (no default value) | FIXED |
| storage.cql.heartbeat-interval | The connection heartbeat interval in milliseconds. | Long | (no default value) | MASKABLE |
| storage.cql.heartbeat-timeout | How long the driver waits for the response (in milliseconds) to a heartbeat. | Long | (no default value) | MASKABLE |
| storage.cql.keys-grouping-allowed | If `true` this allows multiple same-partition keys for the same queries to be grouped together into a single CQL query via `IN` operator.
Notice, that any CQL query grouped with more than 1 key will require to return a row key for any column fetched.
If this option if `false` then each key will be executed in a separate asynchronous CQL query even when multiple keys from the same partition are queried. | Boolean | true | MASKABLE |
| storage.cql.keys-grouping-limit | Maximum amount of the same-partition keys to be grouped together into a single CQL query. If more keys for the same partition are queried, they are going to be grouped into a separate CQL queries.
Notice, for ScyllaDB this option should not exceed the maximum number of distinct clustering key restrictions per query which can be changed by ScyllaDB configuration option `max-partition-key-restrictions-per-query` (https://enterprise.docs.scylladb.com/branch-2022.2/faq.html#how-can-i-change-the-maximum-number-of-in-restrictions).
This option takes effect only when `storage.cql.keys-grouping-allowed` is `true`. | Integer | 100 | MASKABLE |
| storage.cql.keyspace | The name of JanusGraph's keyspace. It will be created if it does not exist. | String | janusgraph | LOCAL |
| storage.cql.local-datacenter | The name of the local or closest Cassandra datacenter. This value will be passed into CqlSessionBuilder.withLocalDatacenter. | String | datacenter1 | MASKABLE |
| storage.cql.local-max-connections-per-host | The maximum number of connections that can be created per host for local datacenter | Integer | 1 | MASKABLE |
Expand All @@ -464,6 +470,7 @@ CQL storage backend options
| storage.cql.slice-grouping-allowed | If `true` this allows multiple Slice queries which are allowed to be performed as non-range queries (i.e. direct equality operation) to be grouped together into a single CQL query via `IN` operator. Notice, currently only operations to fetch properties with Cardinality.SINGLE are allowed to be performed as non-range queries (edges fetching or properties with Cardinality SET or LIST won't be grouped together).
If this option if `false` then each Slice query will be executed in a separate asynchronous CQL query even when grouping is allowed. | Boolean | true | MASKABLE |
| storage.cql.slice-grouping-limit | Maximum amount of grouped together slice queries into a single CQL query.
Notice, for ScyllaDB this option should not exceed the maximum number of distinct clustering key restrictions per query which can be changed by ScyllaDB configuration option `max-partition-key-restrictions-per-query` (https://enterprise.docs.scylladb.com/branch-2022.2/faq.html#how-can-i-change-the-maximum-number-of-in-restrictions).
This option is used only when `storage.cql.slice-grouping-allowed` is `true`. | Integer | 100 | MASKABLE |
| storage.cql.speculative-retry | The speculative retry policy. One of: NONE, ALWAYS, <X>percentile, <N>ms. | String | (no default value) | FIXED |
| storage.cql.ttl-enabled | Whether TTL should be enabled or not. Must be turned off if the storage does not support TTL. Amazon Keyspace, for example, does not support TTL by default unless otherwise enabled. | Boolean | true | LOCAL |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public enum EntryMetaData {

TTL(Integer.class, false, data -> data instanceof Integer && ((Integer) data) >= 0L),
VISIBILITY(String.class, true, data -> data instanceof String && StringEncoding.isAsciiString((String) data)),
TIMESTAMP(Long.class, false, data -> data instanceof Long);
TIMESTAMP(Long.class, false, data -> data instanceof Long),
ROW_KEY(StaticBuffer.class, false, data -> data instanceof StaticBuffer);

public static final java.util.Map<EntryMetaData,Object> EMPTY_METADATA = Collections.emptyMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -63,14 +65,17 @@ public static <T> T get(CompletableFuture<T> future){
public static <K,V> Map<K,V> unwrap(Map<K,CompletableFuture<V>> futureMap) throws Throwable{
Map<K, V> resultMap = new HashMap<>(futureMap.size());
Throwable firstException = null;
Set<Throwable> uniqueExceptions = null;
for(Map.Entry<K, CompletableFuture<V>> entry : futureMap.entrySet()){
try{
resultMap.put(entry.getKey(), entry.getValue().get());
} catch (Throwable throwable){
Throwable rootException = unwrapExecutionException(throwable);

Check warning on line 73 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java#L73

Added line #L73 was not covered by tests
if(firstException == null){
firstException = throwable;
} else {
firstException.addSuppressed(throwable);
firstException = rootException;
uniqueExceptions = new HashSet<>(1);

Check warning on line 76 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java#L75-L76

Added lines #L75 - L76 were not covered by tests
} else if(firstException != rootException && uniqueExceptions.add(rootException)){

Check warning on line 77 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java#L77

Use equals() to compare object references.
firstException.addSuppressed(rootException);

Check warning on line 78 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java#L78

Added line #L78 was not covered by tests
}
}
}
Expand All @@ -84,14 +89,17 @@ public static <K,V> Map<K,V> unwrap(Map<K,CompletableFuture<V>> futureMap) throw
public static <K,V,R> Map<K,Map<V, R>> unwrapMapOfMaps(Map<K, Map<V, CompletableFuture<R>>> futureMap) throws Throwable{
Map<K, Map<V, R>> resultMap = new HashMap<>(futureMap.size());
Throwable firstException = null;
Set<Throwable> uniqueExceptions = null;
for(Map.Entry<K, Map<V, CompletableFuture<R>>> entry : futureMap.entrySet()){
try{
resultMap.put(entry.getKey(), unwrap(entry.getValue()));
} catch (Throwable throwable){
Throwable rootException = unwrapExecutionException(throwable);

Check warning on line 97 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java#L97

Added line #L97 was not covered by tests
if(firstException == null){
firstException = throwable;
} else {
firstException.addSuppressed(throwable);
firstException = rootException;
uniqueExceptions = new HashSet<>(1);

Check warning on line 100 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java#L99-L100

Added lines #L99 - L100 were not covered by tests
} else if(firstException != rootException && uniqueExceptions.add(rootException)){
firstException.addSuppressed(rootException);

Check warning on line 102 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java#L102

Added line #L102 was not covered by tests
}
}
}
Expand All @@ -104,14 +112,17 @@ public static <K,V,R> Map<K,Map<V, R>> unwrapMapOfMaps(Map<K, Map<V, Completable

public static <V> void awaitAll(Collection<CompletableFuture<V>> futureCollection) throws Throwable{
Throwable firstException = null;
Set<Throwable> uniqueExceptions = null;
for(CompletableFuture<V> future : futureCollection){
try{
future.get();
} catch (Throwable throwable){
Throwable rootException = unwrapExecutionException(throwable);

Check warning on line 120 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java#L120

Added line #L120 was not covered by tests
if(firstException == null){
firstException = throwable;
} else {
firstException.addSuppressed(throwable);
firstException = rootException;
uniqueExceptions = new HashSet<>(1);

Check warning on line 123 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java#L122-L123

Added lines #L122 - L123 were not covered by tests
} else if(firstException != rootException && uniqueExceptions.add(rootException)){
firstException.addSuppressed(rootException);

Check warning on line 125 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/CompletableFutureUtil.java#L125

Added line #L125 was not covered by tests
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,8 @@ public static MetaDataSerializer getSerializer(EntryMetaData meta) {
return LongSerializer.INSTANCE;
case VISIBILITY:
return ASCIIStringSerializer.INSTANCE;
case ROW_KEY:
return StaticBufferSerializer.INSTANCE;
default: throw new AssertionError("Unexpected meta data: " + meta);
}
}
Expand Down Expand Up @@ -736,4 +738,38 @@ public String read(byte[] data, int startPos) {
}
}

private enum StaticBufferSerializer implements MetaDataSerializer<StaticBuffer> {

INSTANCE;

private static final StaticBuffer EMPTY_STATIC_BUFFER = StaticArrayBuffer.of(new byte[0]);

@Override
public int getByteLength(StaticBuffer value) {
return value.length() + 4;
}

@Override
public void write(byte[] data, int startPos, StaticBuffer value) {
int length = value.length();
StaticArrayBuffer.putInt(data, startPos, length);
if(length > 0){
startPos+=4;

Check warning on line 757 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java#L757

Avoid reassigning parameters such as 'startPos'
for(int i=0; i<length; i++){
data[startPos++] = value.getByte(i);
}
}
}

@Override
public StaticBuffer read(byte[] data, int startPos) {
int bufSize = StaticArrayBuffer.getInt(data, startPos);
if(bufSize == 0){
return EMPTY_STATIC_BUFFER;

Check warning on line 768 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/StaticArrayEntryList.java#L768

Added line #L768 was not covered by tests
}
startPos+=4;
return new StaticArrayBuffer(data, startPos, startPos+bufSize);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
import io.vavr.Tuple3;
import org.janusgraph.diskstorage.EntryMetaData;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.util.StaticArrayBuffer;
import org.janusgraph.diskstorage.util.StaticArrayEntry.GetColVal;

import java.nio.ByteBuffer;

public class CQLColValGetter implements GetColVal<Tuple3<StaticBuffer, StaticBuffer, Row>, StaticBuffer> {

private static final StaticArrayBuffer EMPTY_KEY = StaticArrayBuffer.of(new byte[0]);

private final EntryMetaData[] schema;

CQLColValGetter(final EntryMetaData[] schema) {
Expand Down Expand Up @@ -50,6 +55,9 @@ public Object getMetaData(final Tuple3<StaticBuffer, StaticBuffer, Row> tuple, f
return tuple._3.getLong(CQLKeyColumnValueStore.WRITETIME_COLUMN_NAME);
case TTL:
return tuple._3.getInt(CQLKeyColumnValueStore.TTL_COLUMN_NAME);
case ROW_KEY:
ByteBuffer rawKey = tuple._3.getByteBuffer(CQLKeyColumnValueStore.KEY_COLUMN_NAME);
return rawKey == null ? EMPTY_KEY : StaticArrayBuffer.of(rawKey);
default:
throw new UnsupportedOperationException("Unsupported meta data: " + metaData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,10 @@ public interface CQLConfigOptions {
ConfigOption.Type.LOCAL,
String.class);

String SCYLLA_DB_MAX_IN_CONFIG_MSG = "Notice, for ScyllaDB this option should not exceed the maximum number of distinct " +
"clustering key restrictions per query which can be changed by ScyllaDB configuration option `max-partition-key-restrictions-per-query` " +
"(https://enterprise.docs.scylladb.com/branch-2022.2/faq.html#how-can-i-change-the-maximum-number-of-in-restrictions).";

ConfigOption<Boolean> SLICE_GROUPING_ALLOWED = new ConfigOption<>(
CQL_NS,
"slice-grouping-allowed",
Expand All @@ -768,8 +772,26 @@ public interface CQLConfigOptions {
ConfigOption<Integer> SLICE_GROUPING_LIMIT = new ConfigOption<>(
CQL_NS,
"slice-grouping-limit",
"Maximum amount of grouped together slice queries into a single CQL query.\n" +
"This option is used only when `"+SLICE_GROUPING_ALLOWED.toStringWithoutRoot()+"` is `true`.",
"Maximum amount of grouped together slice queries into a single CQL query.\n" + SCYLLA_DB_MAX_IN_CONFIG_MSG +
"\nThis option is used only when `"+SLICE_GROUPING_ALLOWED.toStringWithoutRoot()+"` is `true`.",
ConfigOption.Type.MASKABLE,
100);

ConfigOption<Boolean> KEYS_GROUPING_ALLOWED = new ConfigOption<>(
CQL_NS,
"keys-grouping-allowed",
"If `true` this allows multiple same-partition keys for the same queries to be grouped together into a single CQL query via `IN` operator.\n" +
"Notice, that any CQL query grouped with more than 1 key will require to return a row key for any column fetched.\n" +
"If this option if `false` then each key will be executed in a separate asynchronous CQL query even when multiple keys from the same partition are queried.",
ConfigOption.Type.MASKABLE,
true);

ConfigOption<Integer> KEYS_GROUPING_LIMIT = new ConfigOption<>(
CQL_NS,
"keys-grouping-limit",
"Maximum amount of the same-partition keys to be grouped together into a single CQL query. If more keys for the same partition are queried, " +
"they are going to be grouped into a separate CQL queries.\n" + SCYLLA_DB_MAX_IN_CONFIG_MSG +
"\nThis option takes effect only when `"+KEYS_GROUPING_ALLOWED.toStringWithoutRoot()+"` is `true`.",
ConfigOption.Type.MASKABLE,
100);
}
Loading

0 comments on commit 4ffdd10

Please sign in to comment.