Skip to content

Commit

Permalink
Add possibility to specify back pressure class, add benchmark, add mo…
Browse files Browse the repository at this point in the history
…re tests

Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
  • Loading branch information
porunov committed May 14, 2023
1 parent 5b5f46f commit 291bcfc
Show file tree
Hide file tree
Showing 14 changed files with 746 additions and 98 deletions.
3 changes: 2 additions & 1 deletion docs/configs/janusgraph-cfg.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ CQL storage backend options
| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
| storage.cql.atomic-batch-mutate | True to use Cassandra atomic batch mutation, false to use non-atomic batches | Boolean | false | MASKABLE |
| storage.cql.back-pressure-limit | The maximum number of concurrent requests which are allowed to be processed by CQL driver. Any concurrent CQL requests which are above the provided limit are going to be back pressured using fair Semaphore. If no value is provided or the value is set to `0` then the value will be calculated based on CQL driver session provided parameters by using formula [advanced.connection.max-requests-per-connection * advanced.connection.pool.local.size * available_nodes_amount]. It's not recommended to use any value which is above this limit because it may result in CQL driver overload but it's suggested to have a lower value to keep the driver healthy under pressure. In situations when remote nodes connections are in use then the bigger value might be relevant as well to improve parallelism. In case the value `-1` is provided then the back pressure for CQL requests is turned off. In case the back pressure is turned off then it is advised to tune CQL driver for the ongoing workload. | Integer | (no default value) | MASKABLE |
| storage.cql.back-pressure-class | The implementation of `QueryBackPressure` to use. The full name of the class which extends `QueryBackPressure` which has either a public constructor with `Configuration janusGraphConfiguration` and `Integer backPressureLimit` arguments (preferred constructor) or a public constructor with `Configuration janusGraphConfiguration` argument (second preferred constructor) or a public parameterless constructor. Other accepted options are:<br> `semaphore` - fair semaphore based back pressure implementation of `back-pressure-limit` limit size (preferred implementation);<br> `semaphoreReleaseProtected` - fair semaphore based back pressure implementation of `back-pressure-limit` limit size with protected releasing logic (meant to be used for testing);<br> `passAll` - turned off back pressure (it is recommended to tune CQL driver for the ongoing workload when this implementation is used); | String | semaphore | MASKABLE |
| storage.cql.back-pressure-limit | The maximum number of concurrent requests which are allowed to be processed by CQL driver. If no value is provided or the value is set to `0` then the value will be calculated based on CQL driver session provided parameters by using formula [advanced.connection.max-requests-per-connection * advanced.connection.pool.local.size * available_nodes_amount]. It's not recommended to use any value which is above this limit because it may result in CQL driver overload but it's suggested to have a lower value to keep the driver healthy under pressure. In situations when remote nodes connections are in use then the bigger value might be relevant as well to improve parallelism. | Integer | (no default value) | MASKABLE |
| storage.cql.batch-statement-size | The number of statements in each batch | Integer | 20 | MASKABLE |
| storage.cql.compaction-strategy-class | The compaction strategy to use for JanusGraph tables | String | (no default value) | FIXED |
| storage.cql.compaction-strategy-options | Compaction strategy options. This list is interpreted as a map. It must have an even number of elements in [key,val,key,val,...] form. | String[] | (no default value) | FIXED |
Expand Down
1 change: 1 addition & 0 deletions janusgraph-benchmark/benchmark.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[{"unit":"ms/op","name":"org.janusgraph.BackPressureBenchmark.releaseBlocked","value":2754.2091194591017}]
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2023 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph;

import org.janusgraph.diskstorage.util.backpressure.SemaphoreQueryBackPressure;
import org.janusgraph.diskstorage.util.backpressure.PassAllQueryBackPressure;
import org.janusgraph.diskstorage.util.backpressure.QueryBackPressure;
import org.janusgraph.diskstorage.util.backpressure.SemaphoreProtectedReleaseQueryBackPressure;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import static org.janusgraph.util.system.ExecuteUtil.gracefulExecutorServiceShutdown;

/**
* Benchmark for different implementations of `QueryBackPressure`.
*/
@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class BackPressureBenchmark {

/**
* How many parallel threads which will try to acquire and release queries when the backPressure is reached.
*/
@Param({ "2000", "1000", "100", "10", "4", "2", "1" })
int threads;

/**
* `QueryBackPressure` size (ignored for `passAllBackPressure` type).
*/
@Param({ "50000", "10000", "1000", "100" })
int backPressure;

@Param({
"semaphoreReleaseProtectedBackPressureWithReleasesAwait",
"semaphoreReleaseProtectedBackPressureWithoutReleasesAwait",
"semaphoreBackPressure",
"passAllBackPressure"})
String type;

private QueryBackPressure queryBackPressure;
private ExecutorService queriesAcquireService;
private ExecutorService queriesReleaseService;
private boolean closeBackPressure;
private Semaphore acquireJobsSemaphore;

@Setup(Level.Invocation)
public void setup() {
acquireJobsSemaphore = new Semaphore(0);
queriesAcquireService = Executors.newFixedThreadPool(threads);
queriesReleaseService = Executors.newFixedThreadPool(threads);
switch (type){
case "semaphoreReleaseProtectedBackPressureWithReleasesAwait": {
queryBackPressure = new SemaphoreProtectedReleaseQueryBackPressure(backPressure);
closeBackPressure = true;
break;
}
case "semaphoreReleaseProtectedBackPressureWithoutReleasesAwait": {
queryBackPressure = new SemaphoreProtectedReleaseQueryBackPressure(backPressure);
closeBackPressure = false;
break;
}
case "semaphoreBackPressure": {
queryBackPressure = new SemaphoreQueryBackPressure(backPressure);
closeBackPressure = false;
break;
}
case "passAllBackPressure": {
queryBackPressure = new PassAllQueryBackPressure();
closeBackPressure = false;
break;
}
default: throw new IllegalArgumentException("No implementation found to type = "+type);
}

for(int j=0; j<backPressure; j++){
queryBackPressure.acquireBeforeQuery();
acquireJobsSemaphore.release();
}

for(int i = 0; i< threads; i++){
queriesAcquireService.submit(() -> {
queryBackPressure.acquireBeforeQuery();
acquireJobsSemaphore.release();
});
}
}

@Benchmark
public void releaseBlocked() {
for(int i = 0; i< threads; i++){
queriesReleaseService.submit(() -> {
try {
acquireJobsSemaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);

Check warning on line 121 in janusgraph-benchmark/src/main/java/org/janusgraph/BackPressureBenchmark.java

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

janusgraph-benchmark/src/main/java/org/janusgraph/BackPressureBenchmark.java#L121

Avoid throwing raw exception types.
}
queryBackPressure.releaseAfterQuery();
});
}

gracefulExecutorServiceShutdown(queriesReleaseService, Long.MAX_VALUE);

if(closeBackPressure){
queryBackPressure.close();
}
}

@TearDown(Level.Invocation)
public void clearResources() {
gracefulExecutorServiceShutdown(queriesAcquireService, Long.MAX_VALUE);
queryBackPressure.close();
System.gc();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public static void main(String[] args) throws RunnerException, IOException, Inte
builder.include(".*Benchmark");
builder.exclude("StaticArrayEntryListBenchmark");
builder.exclude("VertexCacheBenchmark");
builder.exclude("BackPressureBenchmark");
builder.exclude("CQL.*Benchmark");
}
Collection<RunResult> results = new Runner(builder.build()).run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

package org.janusgraph.diskstorage.util.backpressure;

/**
* Query back pressure implementation to pass all acquires and releases
*/
public class PassAllQueryBackPressure implements QueryBackPressure{
@Override
public void acquireBeforeQuery() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2023 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.diskstorage.util.backpressure;

import org.janusgraph.core.JanusGraphException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

import static org.janusgraph.util.system.ExecuteUtil.gracefulExecutorServiceShutdown;

/**
* Query back pressure implementation which uses Semaphore to control back pressure and has protection
* in place to not generate more `permits` than `backPressureLimit`.<br>
*
* This implementation is similar to {@link SemaphoreQueryBackPressure } with the exception that `releaseAfterQuery`
* calls are asynchronous (non-blocking) and protected against generating more `permits` than `backPressureLimit`.
* This comes with additional overhead of using a separate thread to process any new `release` calls
* which means that all calls to `releaseAfterQuery` will be processed in sequential order one by one.
* The first time logic registers that an attempt to add a new permit could potentially result in a bigger amount of
* `permits` than `backPressureLimit`, it logs a warning. Subsequent calls to `releaseAfterQuery` will not log such
* warning anymore.
*/
public class SemaphoreProtectedReleaseQueryBackPressure implements QueryBackPressure{

private static final Logger log = LoggerFactory.getLogger(SemaphoreProtectedReleaseQueryBackPressure.class);

private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final Runnable releaseNonBlocking;
private final Semaphore semaphore;
private volatile boolean hadWarningLogged;

public SemaphoreProtectedReleaseQueryBackPressure(final int backPressureLimit) {
this.semaphore = new Semaphore(backPressureLimit, true);
this.releaseNonBlocking = () -> {
// ensure we never add more permits than `backPressureLimit`
// (even if `releaseAfterQuery()` is called more times than `acquireBeforeQuery()`);
if(semaphore.availablePermits()<backPressureLimit){
semaphore.release();
} else if(!hadWarningLogged){
log.warn("`releaseAfterQuery` is called more than once for some of the `acquireBeforeQuery` calls. " +
"This is a sign that the logic using this `QueryBackPressure` may not properly handle special " +
"(potentially exceptional) cases. {} will not trigger more releases than {}. This warning will " +
"be logged only once and it will be ignored for other `releaseAfterQuery` calls which attempt " +
"to add more permits than the configured limit.",
SemaphoreProtectedReleaseQueryBackPressure.class.getSimpleName(), backPressureLimit);
hadWarningLogged = true;
}
};
}

@Override
public void acquireBeforeQuery() {
try {
semaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new JanusGraphException(e);
}
}

@Override
public void releaseAfterQuery(){
executorService.execute(releaseNonBlocking);
}

@Override
public void close() {
gracefulExecutorServiceShutdown(executorService, Long.MAX_VALUE);
}

int availablePermits(){
return semaphore.availablePermits();
}

boolean hadWarningLogged(){
return hadWarningLogged;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,31 @@

import org.janusgraph.core.JanusGraphException;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

import static org.janusgraph.util.system.ExecuteUtil.gracefulExecutorServiceShutdown;

public class SemaphoreQueryBackPressure implements QueryBackPressure{

private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final Runnable releaseNonBlocking;
private final Semaphore semaphore;
/**
* Query back pressure implementation which uses Semaphore to control back pressure.<br>
*
* Warning: This implementation assumes that for each `acquireBeforeQuery` call there will be exactly
* one `releaseAfterQuery` call. This implementation uses `backPressureLimit` as a starting `permits` amount
* of the Semaphore. Each time `releaseAfterQuery` is called it will add a new `permit` even if the
* current total amount of permits is already grater then `backPressureLimit`.
* In case you assume that the logic where `SemaphoreQueryBackPressure` is used might be affected by
* any bug which may call `releaseAfterQuery` more than once for a single `acquireBeforeQuery` call then
* it's suggested to use {@link SemaphoreProtectedReleaseQueryBackPressure } which has a tiny overhead
* for `releaseAfterQuery` calls but protects those calls from the possible side effects of calling
* `releaseAfterQuery` more than once for any `acquireBeforeQuery` call.
*/
public class SemaphoreQueryBackPressure extends Semaphore implements QueryBackPressure{

public SemaphoreQueryBackPressure(final int backPressureLimit) {
this.semaphore = new Semaphore(backPressureLimit, true);
this.releaseNonBlocking = () -> {
// ensure we never add more permits than `backPressureLimit`
// (even if `releaseAfterQuery()` is called more times than `acquireBeforeQuery()`);
if(semaphore.availablePermits()<backPressureLimit){
semaphore.release();
}
};
super(backPressureLimit, true);
}

@Override
public void acquireBeforeQuery() {
try {
semaphore.acquire();
acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new JanusGraphException(e);
Expand All @@ -51,15 +49,11 @@ public void acquireBeforeQuery() {

@Override
public void releaseAfterQuery(){
executorService.execute(releaseNonBlocking);
release();
}

@Override
public void close() {
gracefulExecutorServiceShutdown(executorService, Long.MAX_VALUE);
}

int availablePermits(){
return semaphore.availablePermits();
// ignored
}
}
Loading

0 comments on commit 291bcfc

Please sign in to comment.