Skip to content

Commit

Permalink
Group CQL keys into a single query
Browse files Browse the repository at this point in the history
Fixes #3863

Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
  • Loading branch information
porunov committed Jul 5, 2023
1 parent 16846ad commit 3438649
Show file tree
Hide file tree
Showing 22 changed files with 1,127 additions and 96 deletions.
10 changes: 10 additions & 0 deletions docs/configs/janusgraph-cfg.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,16 @@ CQL storage backend options
| storage.cql.gc-grace-seconds | The number of seconds before tombstones (deletion markers) are eligible for garbage-collection. | Integer | (no default value) | FIXED |
| storage.cql.heartbeat-interval | The connection heartbeat interval in milliseconds. | Long | (no default value) | MASKABLE |
| storage.cql.heartbeat-timeout | How long the driver waits for the response (in milliseconds) to a heartbeat. | Long | (no default value) | MASKABLE |
| storage.cql.keys-grouping-allowed | If `true` this allows multiple same-partition keys for the same queries to be grouped together into a single CQL query via `IN` operator.
Notice, that any CQL query grouped with more than 1 key will require to return a row key for any column fetched.
If this option if `false` then each key will be executed in a separate asynchronous CQL query even when multiple keys from the same partition are queried. | Boolean | true | MASKABLE |
| storage.cql.keys-grouping-distribution-size | Preferred groups distribution amount for keys grouping. This distribution controls preferred amount of same-partition groups to be used when performing a multi-key query.
There will be at least this amount of groups used when keys amount is more or equal to the provided value. Each group will be no grater than `storage.cql.keys-grouping-limit` in size. This option could be equal to the replication factor so that same-partition CQL queries could be load-balanced to multiple different nodes serving same partitions. However, a different value could be more optimal in cases when the underlying CQL cluster is unbalanced or when load balancing strategy can't ensure optimal CQL queries distribution.
If this option is not set or it's value is -1 than the used value is equal to number of processors multiplied by 2.

This option takes effect only when `storage.cql.keys-grouping-allowed` is `true`. | Integer | (no default value) | MASKABLE |
| storage.cql.keys-grouping-limit | Maximum amount of the same-partition keys to be grouped together into a single CQL query. If more keys for the same partition are queried, they are going to be grouped into separate CQL queries. When this option is set to `0` then this option is treated as unlimited.
This option takes effect only when `storage.cql.keys-grouping-allowed` is `true`. | Integer | 1000 | MASKABLE |
| storage.cql.keyspace | The name of JanusGraph's keyspace. It will be created if it does not exist. | String | janusgraph | LOCAL |
| storage.cql.local-datacenter | The name of the local or closest Cassandra datacenter. This value will be passed into CqlSessionBuilder.withLocalDatacenter. | String | datacenter1 | MASKABLE |
| storage.cql.local-max-connections-per-host | The maximum number of connections that can be created per host for local datacenter | Integer | 1 | MASKABLE |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public enum EntryMetaData {

TTL(Integer.class, false, data -> data instanceof Integer && ((Integer) data) >= 0L),
VISIBILITY(String.class, true, data -> data instanceof String && StringEncoding.isAsciiString((String) data)),
TIMESTAMP(Long.class, false, data -> data instanceof Long);
TIMESTAMP(Long.class, false, data -> data instanceof Long),
ROW_KEY(StaticBuffer.class, false, data -> data instanceof StaticBuffer);

public static final java.util.Map<EntryMetaData,Object> EMPTY_METADATA = Collections.emptyMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,20 @@ public interface GetColVal<E,D> {

}

public interface GetKeyColVal<E,D> {

D getKey(E element);

D getColumn(E element);

D getValue(E element);

EntryMetaData[] getMetaSchema(E element);

Object getMetaData(E element, EntryMetaData meta);

}

public static final EntryMetaData[] EMPTY_SCHEMA = new EntryMetaData[0];

public static final GetColVal<Entry,StaticBuffer> ENTRY_GETTER = new GetColVal<Entry, StaticBuffer>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,8 @@ public static MetaDataSerializer getSerializer(EntryMetaData meta) {
return LongSerializer.INSTANCE;
case VISIBILITY:
return ASCIIStringSerializer.INSTANCE;
case ROW_KEY:
return StaticBufferSerializer.INSTANCE;
default: throw new AssertionError("Unexpected meta data: " + meta);
}
}
Expand Down Expand Up @@ -736,4 +738,38 @@ public String read(byte[] data, int startPos) {
}
}

private enum StaticBufferSerializer implements MetaDataSerializer<StaticBuffer> {

INSTANCE;

private static final StaticBuffer EMPTY_STATIC_BUFFER = StaticArrayBuffer.of(new byte[0]);

@Override
public int getByteLength(StaticBuffer value) {
return value.length() + 4;
}

@Override
public void write(byte[] data, int startPos, StaticBuffer value) {
int length = value.length();
StaticArrayBuffer.putInt(data, startPos, length);
if(length > 0){
startPos+=4;
for(int i=0; i<length; i++){
data[startPos++] = value.getByte(i);
}
}
}

@Override
public StaticBuffer read(byte[] data, int startPos) {
int bufSize = StaticArrayBuffer.getInt(data, startPos);
if(bufSize == 0){
return EMPTY_STATIC_BUFFER;
}
startPos+=4;
return new StaticArrayBuffer(data, startPos, startPos+bufSize);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class ImplicitKey extends EmptyRelationType implements SystemRelationType
public static final ImplicitKey TTL = new ImplicitKey(7,Token.makeSystemName("ttl"), Duration.class);

public static final Map<EntryMetaData,ImplicitKey> MetaData2ImplicitKey = Collections.unmodifiableMap(
new HashMap<EntryMetaData, ImplicitKey>(3){{
new HashMap<EntryMetaData, ImplicitKey>(4){{
put(EntryMetaData.TIMESTAMP,TIMESTAMP);
put(EntryMetaData.TTL,TTL);
put(EntryMetaData.VISIBILITY,VISIBILITY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
import io.vavr.Tuple3;
import org.janusgraph.diskstorage.EntryMetaData;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.util.StaticArrayBuffer;
import org.janusgraph.diskstorage.util.StaticArrayEntry.GetColVal;

import java.nio.ByteBuffer;

public class CQLColValGetter implements GetColVal<Tuple3<StaticBuffer, StaticBuffer, Row>, StaticBuffer> {

private static final StaticArrayBuffer EMPTY_KEY = StaticArrayBuffer.of(new byte[0]);

private final EntryMetaData[] schema;

CQLColValGetter(final EntryMetaData[] schema) {
Expand Down Expand Up @@ -50,6 +55,9 @@ public Object getMetaData(final Tuple3<StaticBuffer, StaticBuffer, Row> tuple, f
return tuple._3.getLong(CQLKeyColumnValueStore.WRITETIME_COLUMN_NAME);
case TTL:
return tuple._3.getInt(CQLKeyColumnValueStore.TTL_COLUMN_NAME);
case ROW_KEY:
ByteBuffer rawKey = tuple._3.getByteBuffer(CQLKeyColumnValueStore.KEY_COLUMN_NAME);
return rawKey == null ? EMPTY_KEY : StaticArrayBuffer.of(rawKey);
default:
throw new UnsupportedOperationException("Unsupported meta data: " + metaData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,4 +772,37 @@ public interface CQLConfigOptions {
"This option is used only when `"+SLICE_GROUPING_ALLOWED.toStringWithoutRoot()+"` is `true`.",
ConfigOption.Type.MASKABLE,
100);

ConfigOption<Boolean> KEYS_GROUPING_ALLOWED = new ConfigOption<>(
CQL_NS,
"keys-grouping-allowed",
"If `true` this allows multiple same-partition keys for the same queries to be grouped together into a single CQL query via `IN` operator.\n" +
"Notice, that any CQL query grouped with more than 1 key will require to return a row key for any column fetched.\n" +
"If this option if `false` then each key will be executed in a separate asynchronous CQL query even when multiple keys from the same partition are queried.",
ConfigOption.Type.MASKABLE,
true);

ConfigOption<Integer> KEYS_GROUPING_LIMIT = new ConfigOption<>(
CQL_NS,
"keys-grouping-limit",
"Maximum amount of the same-partition keys to be grouped together into a single CQL query. If more keys for the same partition are queried, " +
"they are going to be grouped into separate CQL queries. When this option is set to `0` then this option is treated as unlimited." +
"\nThis option takes effect only when `"+KEYS_GROUPING_ALLOWED.toStringWithoutRoot()+"` is `true`.",
ConfigOption.Type.MASKABLE,
1000);

ConfigOption<Integer> KEYS_GROUPING_PREFERRED_DISTRIBUTION_SIZE = new ConfigOption<>(
CQL_NS,
"keys-grouping-distribution-size",
"Preferred groups distribution amount for keys grouping. This distribution controls preferred amount of same-partition groups to be " +
"used when performing a multi-key query.\n" +
"There will be at least this amount of groups used when keys amount is more or equal to the provided value. Each group will be " +
"no grater than `" + KEYS_GROUPING_LIMIT.toStringWithoutRoot() + "` in size. " +
"This option could be equal to the replication factor so that same-partition CQL queries could be load-balanced to " +
"multiple different nodes serving same partitions. However, a different value could be more optimal in cases " +
"when the underlying CQL cluster is unbalanced or when load balancing strategy can't " +
"ensure optimal CQL queries distribution.\n" +
"If this option is not set or it's value is -1 than the used value is equal to number of processors multiplied by 2.\n" +
"\nThis option takes effect only when `"+KEYS_GROUPING_ALLOWED.toStringWithoutRoot()+"` is `true`.",
ConfigOption.Type.MASKABLE, Integer.class);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2023 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.diskstorage.cql;

import com.datastax.oss.driver.api.core.cql.Row;
import io.vavr.Tuple4;
import org.janusgraph.diskstorage.EntryMetaData;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.util.StaticArrayEntry;

//TODO: remove it
public class CQLKeyColValGetter implements StaticArrayEntry.GetKeyColVal<Tuple4<StaticBuffer, StaticBuffer, StaticBuffer, Row>, StaticBuffer> {

private final EntryMetaData[] schema;

CQLKeyColValGetter(final EntryMetaData[] schema) {
this.schema = schema;
}

@Override
public StaticBuffer getKey(Tuple4<StaticBuffer, StaticBuffer, StaticBuffer, Row> tuple) {
return tuple._1;
}

@Override
public StaticBuffer getColumn(final Tuple4<StaticBuffer, StaticBuffer, StaticBuffer, Row> tuple) {
return tuple._2;
}

@Override
public StaticBuffer getValue(final Tuple4<StaticBuffer, StaticBuffer, StaticBuffer, Row> tuple) {
return tuple._3;
}

@Override
public EntryMetaData[] getMetaSchema(final Tuple4<StaticBuffer, StaticBuffer, StaticBuffer, Row> tuple) {
return this.schema;
}

@Override
public Object getMetaData(final Tuple4<StaticBuffer, StaticBuffer, StaticBuffer, Row> tuple, final EntryMetaData metaData) {
switch (metaData) {
case TIMESTAMP:
return tuple._4.getLong(CQLKeyColumnValueStore.WRITETIME_COLUMN_NAME);
case TTL:
return tuple._4.getInt(CQLKeyColumnValueStore.TTL_COLUMN_NAME);
default:
throw new UnsupportedOperationException("Unsupported meta data: " + metaData);
}
}
}
Loading

0 comments on commit 3438649

Please sign in to comment.