-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
CQLStoreManager.java
374 lines (323 loc) · 18.3 KB
/
CQLStoreManager.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
// Copyright 2017 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.janusgraph.diskstorage.cql;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import io.vavr.Tuple;
import io.vavr.collection.Array;
import io.vavr.collection.HashMap;
import io.vavr.collection.Iterator;
import io.vavr.collection.Seq;
import io.vavr.concurrent.Future;
import org.janusgraph.core.JanusGraphException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BaseTransactionConfig;
import org.janusgraph.diskstorage.PermanentBackendException;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.StoreMetaData.Container;
import org.janusgraph.diskstorage.common.DistributedStoreManager;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.cql.builder.CQLExecutorServiceBuilder;
import org.janusgraph.diskstorage.cql.builder.CQLMutateManyFunctionBuilder;
import org.janusgraph.diskstorage.cql.builder.CQLProgrammaticConfigurationLoaderBuilder;
import org.janusgraph.diskstorage.cql.builder.CQLSessionBuilder;
import org.janusgraph.diskstorage.cql.builder.CQLStoreFeaturesBuilder;
import org.janusgraph.diskstorage.cql.builder.CQLStoreFeaturesWrapper;
import org.janusgraph.diskstorage.cql.function.mutate.CQLMutateManyFunction;
import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
import org.janusgraph.diskstorage.keycolumnvalue.KeyRange;
import org.janusgraph.diskstorage.keycolumnvalue.StoreFeatures;
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
import org.janusgraph.diskstorage.util.backpressure.QueryBackPressure;
import org.janusgraph.diskstorage.util.backpressure.builder.QueryBackPressureBuilder;
import org.janusgraph.hadoop.CqlHadoopStoreManager;
import org.janusgraph.util.datastructures.ExceptionWrapper;
import org.janusgraph.util.stats.MetricManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.truncate;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createKeyspace;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.dropKeyspace;
import static io.vavr.API.$;
import static io.vavr.API.Case;
import static io.vavr.API.Match;
import static org.janusgraph.diskstorage.cql.CQLConfigOptions.BACK_PRESSURE_CLASS;
import static org.janusgraph.diskstorage.cql.CQLConfigOptions.BACK_PRESSURE_LIMIT;
import static org.janusgraph.diskstorage.cql.CQLConfigOptions.EXECUTOR_SERVICE_MAX_SHUTDOWN_WAIT_TIME;
import static org.janusgraph.diskstorage.cql.CQLConfigOptions.INIT_WAIT_TIME;
import static org.janusgraph.diskstorage.cql.CQLConfigOptions.KEYSPACE;
import static org.janusgraph.diskstorage.cql.CQLConfigOptions.NETWORK_TOPOLOGY_REPLICATION_STRATEGY;
import static org.janusgraph.diskstorage.cql.CQLConfigOptions.REPLICATION_FACTOR;
import static org.janusgraph.diskstorage.cql.CQLConfigOptions.REPLICATION_OPTIONS;
import static org.janusgraph.diskstorage.cql.CQLConfigOptions.REPLICATION_STRATEGY;
import static org.janusgraph.diskstorage.cql.CQLConfigOptions.SIMPLE_REPLICATION_STRATEGY;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.BASIC_METRICS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.DROP_ON_CLEAR;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.GRAPH_NAME;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.METRICS_JMX_ENABLED;
import static org.janusgraph.util.system.ExecuteUtil.executeWithCatching;
import static org.janusgraph.util.system.ExecuteUtil.gracefulExecutorServiceShutdown;
import static org.janusgraph.util.system.ExecuteUtil.throwIfException;
/**
* This class creates {@link CQLKeyColumnValueStore CQLKeyColumnValueStores} and handles Cassandra-backed allocation of vertex IDs for JanusGraph (when so
* configured).
*/
public class CQLStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager {
// Class-wide logger.
private static final Logger LOGGER = LoggerFactory.getLogger(CQLStoreManager.class);
// String constants "LOCAL_QUORUM" and "QUORUM"; not referenced in this class itself
// (presumably Cassandra consistency level names used by callers - confirm).
public static final String CONSISTENCY_LOCAL_QUORUM = "LOCAL_QUORUM";
public static final String CONSISTENCY_QUORUM = "QUORUM";
// Port value handed to the DistributedStoreManager super constructor.
private static final int DEFAULT_PORT = 9042;
// Default builders used by the single-argument constructor.
protected static final CQLSessionBuilder DEFAULT_CQL_SESSION_BUILDER = new CQLSessionBuilder();
protected static final CQLProgrammaticConfigurationLoaderBuilder DEFAULT_PROGRAMMATIC_CONFIGURATION_LOADER_BUILDER = new CQLProgrammaticConfigurationLoaderBuilder();
protected static final CQLMutateManyFunctionBuilder DEFAULT_MUTATE_MANY_FUNCTION_BUILDER = new CQLMutateManyFunctionBuilder();
protected static final CQLStoreFeaturesBuilder DEFAULT_STORE_FEATURES_BUILDER = new CQLStoreFeaturesBuilder();
// Keyspace name resolved once in the constructor via determineKeyspaceName(Configuration).
private final String keyspace;
// Built by CQLExecutorServiceBuilder in the constructor; shut down in close().
final ExecutorService executorService;
// Value of EXECUTOR_SERVICE_MAX_SHUTDOWN_WAIT_TIME; passed to gracefulExecutorServiceShutdown in close().
private final long threadPoolShutdownMaxWaitTime;
// Delegate that mutateMany(...) forwards to.
private final CQLMutateManyFunction executeManyFunction;
// Driver session created by the session builder in the constructor and closed in close().
private CqlSession session;
// Store features obtained from the CQLStoreFeaturesWrapper built in the constructor.
private final StoreFeatures storeFeatures;
// Stores opened through openDatabase(...), keyed by store name (a ConcurrentHashMap).
private final Map<String, CQLKeyColumnValueStore> openStores;
// Deployment obtained from the CQLStoreFeaturesWrapper built in the constructor.
private final Deployment deployment;
// Back pressure instance built by QueryBackPressureBuilder; closed in close() when non-null.
private final QueryBackPressure queriesBackPressure;
/**
 * Creates a {@link CQLStoreManager} from a JanusGraph {@link Configuration}, delegating to the
 * full constructor with the default builders declared on this class.
 *
 * @param configuration Graph configuration
 * @throws BackendException throws {@link PermanentBackendException} in case CQL connection cannot be initialized or
 *                          CQLStoreManager cannot be initialized
 */
public CQLStoreManager(final Configuration configuration) throws BackendException {
    this(
        configuration,
        DEFAULT_MUTATE_MANY_FUNCTION_BUILDER,
        DEFAULT_STORE_FEATURES_BUILDER,
        DEFAULT_CQL_SESSION_BUILDER,
        DEFAULT_PROGRAMMATIC_CONFIGURATION_LOADER_BUILDER
    );
}
/**
 * Constructor for the {@link CQLStoreManager} given a JanusGraph {@link Configuration}.
 * <p>
 * The session is created first; every later initialization step runs inside a guarded section. If any
 * of those steps fails, {@link #close()} is invoked to release what was already created and the failure
 * is rethrown wrapped in a {@link PermanentBackendException}. A failure raised by that cleanup is attached
 * to the original failure as a suppressed exception so the root cause is never lost.
 *
 * @param configuration Graph configuration
 * @param mutateManyFunctionBuilder Builder for mutate many function with or without executor service
 * @param storeFeaturesBuilder Builder for store features function with {@link DistributedStoreManager.Deployment}
 * @param sessionBuilder Builder for {@link CqlSession}
 * @param baseConfigurationLoaderBuilder Builder for {@link CqlSession} main configuration. It's not guaranteed to be used if it's disabled or if other configuration types are provided with higher priority.
 * @throws BackendException throws {@link PermanentBackendException} in case CQL connection cannot be initialized or
 *                          CQLStoreManager cannot be initialized
 */
public CQLStoreManager(final Configuration configuration, final CQLMutateManyFunctionBuilder mutateManyFunctionBuilder,
                       final CQLStoreFeaturesBuilder storeFeaturesBuilder, CQLSessionBuilder sessionBuilder,
                       CQLProgrammaticConfigurationLoaderBuilder baseConfigurationLoaderBuilder) throws BackendException {
    super(configuration, DEFAULT_PORT);
    this.keyspace = determineKeyspaceName(configuration);
    this.openStores = new ConcurrentHashMap<>();
    this.session = sessionBuilder.build(getStorageConfig(), hostnames, port, connectionTimeoutMS, baseConfigurationLoaderBuilder);
    try{
        this.threadPoolShutdownMaxWaitTime = configuration.get(EXECUTOR_SERVICE_MAX_SHUTDOWN_WAIT_TIME);
        initializeJmxMetrics();
        initializeKeyspace();
        int backPressureLimit = getBackPressureLimit(configuration, session);
        queriesBackPressure = QueryBackPressureBuilder.build(configuration, configuration.get(BACK_PRESSURE_CLASS), backPressureLimit);
        this.executeManyFunction = mutateManyFunctionBuilder
            .build(session, configuration, times, assignTimestamp, openStores, this::sleepAfterWrite, queriesBackPressure);
        CQLStoreFeaturesWrapper storeFeaturesWrapper = storeFeaturesBuilder.build(session, configuration, hostnames);
        deployment = storeFeaturesWrapper.getDeployment();
        storeFeatures = storeFeaturesWrapper.getStoreFeatures();
        this.executorService = CQLExecutorServiceBuilder.buildExecutorService(configuration);
    } catch (Throwable throwable){
        // Release the already opened session (and anything else created so far). A failure during this
        // cleanup must not replace the original initialization failure, so it is recorded as suppressed.
        try {
            close();
        } catch (Throwable closeFailure) {
            // addSuppressed rejects self-suppression, hence the identity check.
            if (closeFailure != throwable) {
                throwable.addSuppressed(closeFailure);
            }
        }
        throw new PermanentBackendException("Couldn't initialize CQLStoreManager", throwable);
    }
}
/**
 * Resolves the back pressure limit to use.
 * <p>
 * A configured {@code BACK_PRESSURE_LIMIT} is used as is unless it is absent or equal to {@code 0};
 * in both of those cases the limit is computed by {@link #getDefaultBackPressureLimit(CqlSession)}.
 *
 * @param configuration configuration that may contain {@code BACK_PRESSURE_LIMIT}
 * @param session session used to compute the default limit
 * @return the configured non-zero limit, otherwise the computed default
 */
private static int getBackPressureLimit(final Configuration configuration, final CqlSession session){
    if (!configuration.has(BACK_PRESSURE_LIMIT)) {
        return getDefaultBackPressureLimit(session);
    }
    final int configuredLimit = configuration.get(BACK_PRESSURE_LIMIT);
    return configuredLimit == 0 ? getDefaultBackPressureLimit(session) : configuredLimit;
}
/**
 * Computes the default back pressure limit as
 * {@code CONNECTION_MAX_REQUESTS * CONNECTION_POOL_LOCAL_SIZE * number of nodes in the session metadata},
 * reading both driver options from the session's default execution profile.
 * <p>
 * If the product overflows an {@code int}, a warning is logged and {@link Integer#MAX_VALUE} is returned.
 *
 * @param session session whose driver configuration and node metadata are read
 * @return the computed limit, or {@link Integer#MAX_VALUE} on overflow
 */
static int getDefaultBackPressureLimit(final CqlSession session){
    final DriverExecutionProfile defaultProfile = session.getContext().getConfig().getDefaultProfile();
    final int maxRequestsPerConnection = defaultProfile.getInt(DefaultDriverOption.CONNECTION_MAX_REQUESTS);
    final int localPoolSize = defaultProfile.getInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE);
    final int nodeCount = session.getMetadata().getNodes().size();
    try {
        final int requestsPerNode = Math.multiplyExact(maxRequestsPerConnection, localPoolSize);
        return Math.multiplyExact(nodeCount, requestsPerNode);
    } catch (ArithmeticException overflowException) {
        final StringBuilder warning = new StringBuilder();
        warning.append("Default back pressure limit calculation is overflowed via the formula ");
        warning.append("[").append(DefaultDriverOption.CONNECTION_MAX_REQUESTS.getPath());
        warning.append("(").append(maxRequestsPerConnection).append(") * ");
        warning.append(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE.getPath());
        warning.append("(").append(localPoolSize);
        warning.append(") * currentAvailableNodesAmount(").append(nodeCount).append(")]. ");
        warning.append("Thus, the default back pressure limit is going to be ").append(Integer.MAX_VALUE).append(". ");
        warning.append("It is possible to disable back pressure using configuration parameter `");
        warning.append(BACK_PRESSURE_LIMIT.toStringWithoutRoot()).append("=-1`.");
        LOGGER.warn(warning.toString());
        return Integer.MAX_VALUE;
    }
}
/**
 * Registers the session's metric registry with the {@link MetricManager} registry.
 * <p>
 * Nothing happens unless both {@code METRICS_JMX_ENABLED} and {@code BASIC_METRICS} are set in the
 * storage configuration and the session exposes metrics.
 */
private void initializeJmxMetrics() {
    final Configuration storageConfiguration = getStorageConfig();
    final boolean metricsRequested =
        storageConfiguration.get(METRICS_JMX_ENABLED) && storageConfiguration.get(BASIC_METRICS);
    if (!metricsRequested) {
        return;
    }
    session.getMetrics().ifPresent(
        sessionMetrics -> MetricManager.INSTANCE.getRegistry().registerAll(sessionMetrics.getRegistry()));
}
/**
 * Removes every metric name found in the session's metric registry from the {@link MetricManager} registry.
 * <p>
 * Guarded by the same conditions as the registration: {@code METRICS_JMX_ENABLED} and
 * {@code BASIC_METRICS} must both be set and the session must expose metrics.
 */
private void clearJmxMetrics() {
    final Configuration storageConfiguration = getStorageConfig();
    final boolean metricsRequested =
        storageConfiguration.get(METRICS_JMX_ENABLED) && storageConfiguration.get(BASIC_METRICS);
    if (!metricsRequested) {
        return;
    }
    session.getMetrics().ifPresent(sessionMetrics -> {
        for (final String metricName : sessionMetrics.getRegistry().getNames()) {
            MetricManager.INSTANCE.getRegistry().remove(metricName);
        }
    });
}
/**
 * Creates the configured keyspace if the session metadata does not already list it.
 * <p>
 * The replication map is built from {@code REPLICATION_STRATEGY}: the simple strategy uses
 * {@code REPLICATION_FACTOR}; the network topology strategy reads {@code REPLICATION_OPTIONS} as
 * consecutive (name, integer) pairs. After issuing the CREATE KEYSPACE statement the method sleeps for
 * {@code INIT_WAIT_TIME} when that option is set to a positive value.
 *
 * @throws JanusGraphException if the thread is interrupted during the post-creation wait; the thread's
 *                             interrupt status is restored before the exception is thrown
 */
void initializeKeyspace(){
    // if the keyspace already exists, just return
    if (this.session.getMetadata().getKeyspace(this.keyspace).isPresent()) {
        return;
    }
    final Configuration configuration = getStorageConfig();
    // Setting replication strategy based on value reading from the configuration: either "SimpleStrategy" or "NetworkTopologyStrategy"
    final Map<String, Object> replication = Match(configuration.get(REPLICATION_STRATEGY)).of(
        Case($(SIMPLE_REPLICATION_STRATEGY), strategy -> HashMap.<String, Object> of("class", strategy, "replication_factor", configuration.get(REPLICATION_FACTOR))),
        Case($(NETWORK_TOPOLOGY_REPLICATION_STRATEGY),
            strategy -> HashMap.<String, Object> of("class", strategy)
                .merge(Array.of(configuration.get(REPLICATION_OPTIONS))
                    .grouped(2)
                    .toMap(array -> Tuple.of(array.get(0), Integer.parseInt(array.get(1)))))))
        .toJavaMap();
    session.execute(createKeyspace(this.keyspace)
        .ifNotExists()
        .withReplicationOptions(replication)
        .build());
    if (configuration.has(INIT_WAIT_TIME) && configuration.get(INIT_WAIT_TIME) > 0) {
        try {
            Thread.sleep(configuration.get(INIT_WAIT_TIME));
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; restore it so that code further
            // up the stack can still observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            throw new JanusGraphException("Interrupted while waiting for keyspace initialization to complete", e);
        }
    }
}
/**
 * @return the executor service held by this manager
 */
public ExecutorService getExecutorService() {
    return this.executorService;
}
/**
 * @return the {@link CqlSession} held by this manager
 */
public CqlSession getSession() {
    return session;
}
/**
 * @return the keyspace name resolved when this manager was constructed
 */
public String getKeyspaceName() {
    return keyspace;
}
/**
 * Reads the {@code compression} option from the metadata of the given table.
 *
 * @param name table name
 * @return the option value cast to a string map (unchecked cast)
 * @throws BackendException if the keyspace or the table is unknown
 */
@VisibleForTesting
Map<String, String> getCompressionOptions(final String name) throws BackendException {
    return (Map<String, String>) getTableMetadata(name)
        .getOptions()
        .get(CqlIdentifier.fromCql("compression"));
}
/**
 * Reads the {@code gc_grace_seconds} option from the metadata of the given table.
 *
 * @param name table name
 * @return the option value cast to {@link Integer}
 * @throws BackendException if the keyspace or the table is unknown
 */
@VisibleForTesting
Integer getGcGraceSeconds(final String name) throws BackendException {
    return (Integer) getTableMetadata(name)
        .getOptions()
        .get(CqlIdentifier.fromCql("gc_grace_seconds"));
}
/**
 * Reads the {@code speculative_retry} option from the metadata of the given table.
 *
 * @param name table name
 * @return the option value cast to {@link String}
 * @throws BackendException if the keyspace or the table is unknown
 */
@VisibleForTesting
String getSpeculativeRetry(final String name) throws BackendException {
    return (String) getTableMetadata(name)
        .getOptions()
        .get(CqlIdentifier.fromCql("speculative_retry"));
}
/**
 * Looks up the metadata of a table inside this manager's keyspace using the session metadata.
 *
 * @param name table name
 * @return metadata of the requested table
 * @throws BackendException a {@link PermanentBackendException} if either the keyspace or the table
 *                          is not present in the session metadata
 */
@VisibleForTesting
TableMetadata getTableMetadata(final String name) throws BackendException {
    return session.getMetadata()
        .getKeyspace(keyspace)
        .orElseThrow(() -> new PermanentBackendException(String.format("Unknown keyspace '%s'", keyspace)))
        .getTable(name)
        .orElseThrow(() -> new PermanentBackendException(String.format("Unknown table '%s'", name)));
}
/**
 * Releases the resources held by this manager.
 * <p>
 * JMX metrics are cleared, the session is closed and, when present, the back pressure instance is
 * closed; each step goes through {@code executeWithCatching} with a shared {@link ExceptionWrapper},
 * which is handed to {@code throwIfException} afterwards. The executor service shutdown runs in a
 * {@code finally} block, so it is attempted regardless of the outcome of the earlier steps.
 *
 * @throws BackendException declared by the override; presumably raised by {@code throwIfException}
 *                          when a step failed - confirm against ExecuteUtil
 */
@Override
public void close() throws BackendException {
    try {
        final ExceptionWrapper collectedFailures = new ExceptionWrapper();
        executeWithCatching(this::clearJmxMetrics, collectedFailures);
        executeWithCatching(session::close, collectedFailures);
        if (queriesBackPressure != null) {
            executeWithCatching(queriesBackPressure::close, collectedFailures);
        }
        throwIfException(collectedFailures);
    } finally {
        gracefulExecutorServiceShutdown(executorService, threadPoolShutdownMaxWaitTime);
    }
}
/**
 * @return the simple class name of this manager and the keyspace name, joined by a dot
 */
@Override
public String getName() {
    return getClass().getSimpleName() + "." + keyspace;
}
/**
 * @return the deployment obtained from the store features wrapper during construction
 */
@Override
public Deployment getDeployment() {
    return deployment;
}
/**
 * @return the store features obtained from the store features wrapper during construction
 */
@Override
public StoreFeatures getFeatures() {
    return storeFeatures;
}
/**
 * Returns the store registered under {@code name}, creating and registering a new
 * {@link CQLKeyColumnValueStore} when none is registered yet. The new store is handed a callback
 * that removes its entry from the map of open stores.
 *
 * @param name store name, used as the key in the map of open stores
 * @param metaData not used by this implementation
 * @return the existing or newly created store
 * @throws BackendException declared by the overridden method
 */
@Override
public KeyColumnValueStore openDatabase(final String name, final Container metaData) throws BackendException {
    return openStores.computeIfAbsent(
        name,
        storeName -> new CQLKeyColumnValueStore(this, storeName, getStorageConfig(), () -> openStores.remove(storeName)));
}
/**
 * Creates a new {@link CQLTransaction} for the given transaction configuration.
 *
 * @param config transaction configuration passed to the new transaction
 * @return the new transaction
 * @throws BackendException declared by the overridden method
 */
@Override
public StoreTransaction beginTransaction(final BaseTransactionConfig config) throws BackendException {
    final StoreTransaction transaction = new CQLTransaction(config);
    return transaction;
}
/**
 * Clears the data held in this manager's keyspace.
 * <p>
 * When {@code DROP_ON_CLEAR} is set the whole keyspace is dropped. Otherwise, if the keyspace exists,
 * a TRUNCATE statement is issued asynchronously for every table listed in the keyspace metadata and the
 * method blocks until all of them have completed. If the keyspace does not exist, this is only logged.
 *
 * @throws BackendException a {@link PermanentBackendException} if any of the TRUNCATE statements failed
 */
@Override
public void clearStorage() throws BackendException {
    if (this.storageConfig.get(DROP_ON_CLEAR)) {
        this.session.execute(dropKeyspace(this.keyspace).build());
    } else if (this.exists()) {
        final Future<Seq<AsyncResultSet>> result = Future.sequence(
            Iterator.ofAll(this.session.getMetadata().getKeyspace(this.keyspace).get().getTables().values())
                .map(table -> Future.fromJavaFuture(this.session.executeAsync(truncate(this.keyspace, table.getName().toString()).build())
                    .toCompletableFuture())));
        result.await();
        // await() only blocks until completion and never throws, so a failed truncation has to be
        // detected explicitly; otherwise the caller would believe the storage was cleared.
        if (result.isFailure()) {
            throw new PermanentBackendException(
                String.format("Failed to truncate tables of keyspace '%s'", this.keyspace),
                result.getCause().get());
        }
    } else {
        LOGGER.info("Keyspace {} does not exist in the cluster", this.keyspace);
    }
}
/**
 * @return {@code true} if the session metadata contains this manager's keyspace
 * @throws BackendException declared by the overridden method
 */
@Override
public boolean exists() throws BackendException {
    return this.session.getMetadata().getKeyspace(keyspace).isPresent();
}
/**
 * Not supported by this store manager.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public List<KeyRange> getLocalKeyPartition() throws BackendException {
throw new UnsupportedOperationException();
}
/**
 * Applies the given mutations by delegating to the mutate-many function built in the constructor.
 * Anything thrown by the delegate is converted with {@code CQLKeyColumnValueStore.EXCEPTION_MAPPER}
 * and rethrown.
 *
 * @param mutations mutations to apply, passed through unchanged to the delegate
 * @param txh transaction handle, passed through unchanged to the delegate
 * @throws BackendException the mapped form of any failure raised by the delegate
 */
@Override
public void mutateMany(final Map<String, Map<StaticBuffer, KCVMutation>> mutations, final StoreTransaction txh) throws BackendException {
    try {
        this.executeManyFunction.mutateMany(mutations, txh);
    } catch (Throwable failure) {
        throw CQLKeyColumnValueStore.EXCEPTION_MAPPER.apply(failure);
    }
}
/**
 * Determines the keyspace name from the configuration.
 * <p>
 * {@code GRAPH_NAME} is used only when {@code KEYSPACE} is not set and {@code GRAPH_NAME} is set;
 * in every other case the value of {@code KEYSPACE} is returned.
 *
 * @param config configuration to read from
 * @return the keyspace name
 */
public static String determineKeyspaceName(Configuration config) {
    if (config.has(KEYSPACE) || !config.has(GRAPH_NAME)) {
        return config.get(KEYSPACE);
    }
    return config.get(GRAPH_NAME);
}
/**
 * @return a new {@link CqlHadoopStoreManager} created from this manager's session
 */
@Override
public Object getHadoopManager() {
    return new CqlHadoopStoreManager(session);
}
/**
 * @return the back pressure instance built in the constructor
 */
public QueryBackPressure getQueriesBackPressure() {
    return this.queriesBackPressure;
}
}