Skip to content

Commit

Permalink
Handle interruption for BerkeleyJE backend [tp-tests]
Browse files Browse the repository at this point in the history
Fixes #2120

Signed-off-by: Pavel Ershov <owner.mad.epa@gmail.com>
  • Loading branch information
mad authored and li-boxuan committed Oct 6, 2023
1 parent f23abbe commit cdea0d7
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Put;
import com.sleepycat.je.ReadOptions;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.WriteOptions;
import org.janusgraph.diskstorage.BackendException;
Expand All @@ -48,8 +49,11 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static org.janusgraph.diskstorage.berkeleyje.BerkeleyJEStoreManager.convertThreadInterruptedException;

public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {

private static final Logger log = LoggerFactory.getLogger(BerkeleyJEKeyValueStore.class);
Expand All @@ -60,21 +64,21 @@ public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {
public static Function<Integer, Integer> ttlConverter = ttl -> (int) Math.max(1, Duration.of(ttl, ChronoUnit.SECONDS).toHours());


private final Database db;
private final AtomicReference<Database> db = new AtomicReference<>();
private final String name;
private final BerkeleyJEStoreManager manager;
private boolean isOpen;

public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m) {
db = data;
db.set(data);
name = n;
manager = m;
isOpen = true;
}

public DatabaseConfig getConfiguration() throws BackendException {
try {
return db.getConfig();
return db.get().getConfig();
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -92,18 +96,24 @@ private static Transaction getTransaction(StoreTransaction txh) {

private Cursor openCursor(StoreTransaction txh) throws BackendException {
Preconditions.checkArgument(txh!=null);
return ((BerkeleyJETx) txh).openCursor(db);
return ((BerkeleyJETx) txh).openCursor(db.get());
}

private static void closeCursor(StoreTransaction txh, Cursor cursor) {
Preconditions.checkArgument(txh!=null);
((BerkeleyJETx) txh).closeCursor(cursor);
}

void reopen(final Database db) {
this.db.set(db);
}

@Override
public synchronized void close() throws BackendException {
try {
if(isOpen) db.close();
if (isOpen) db.get().close();
} catch (ThreadInterruptedException ignored) {
// environment will be closed
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -120,7 +130,7 @@ public StaticBuffer get(StaticBuffer key, StoreTransaction txh) throws BackendEx

log.trace("db={}, op=get, tx={}", name, txh);

OperationResult result = db.get(tx, databaseKey, data, Get.SEARCH, getReadOptions(txh));
OperationResult result = db.get().get(tx, databaseKey, data, Get.SEARCH, getReadOptions(txh));

if (result != null) {
return getBuffer(data);
Expand Down Expand Up @@ -153,6 +163,7 @@ public RecordIterator<KeyValueEntry> getSlice(KVQuery query, StoreTransaction tx
final DatabaseEntry foundKey = keyStart.as(ENTRY_FACTORY);
final DatabaseEntry foundData = new DatabaseEntry();
final Cursor cursor = openCursor(txh);
final ReadOptions readOptions = getReadOptions(txh);

return new RecordIterator<KeyValueEntry>() {
private OperationStatus status;
Expand Down Expand Up @@ -182,9 +193,9 @@ private KeyValueEntry getNextEntry() {
}
while (!selector.reachedLimit()) {
if (status == null) {
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
status = get(Get.SEARCH_GTE, readOptions);
} else {
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
status = get(Get.NEXT, readOptions);
}
if (status != OperationStatus.SUCCESS) {
break;
Expand All @@ -203,6 +214,16 @@ private KeyValueEntry getNextEntry() {
return null;
}

private OperationStatus get(Get get, ReadOptions readOptions) {
try {
return cursor.get(foundKey, foundData, get, readOptions) == null
? OperationStatus.NOTFOUND
: OperationStatus.SUCCESS;
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
}
}

@Override
public void close() {
closeCursor(txh, cursor);
Expand Down Expand Up @@ -237,13 +258,17 @@ public void insert(StaticBuffer key, StaticBuffer value, StoreTransaction txh, b
int convertedTtl = ttlConverter.apply(ttl);
writeOptions.setTTL(convertedTtl, TimeUnit.HOURS);
}
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
try {
if (allowOverwrite) {
OperationResult result = db.get().put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.get().put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
}
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
}

if (status != OperationStatus.SUCCESS) {
Expand All @@ -257,10 +282,12 @@ public void delete(StaticBuffer key, StoreTransaction txh) throws BackendExcepti
Transaction tx = getTransaction(txh);
try {
log.trace("db={}, op=delete, tx={}", name, txh);
OperationStatus status = db.delete(tx, key.as(ENTRY_FACTORY));
OperationStatus status = db.get().delete(tx, key.as(ENTRY_FACTORY));
if (status != OperationStatus.SUCCESS && status != OperationStatus.NOTFOUND) {
throw new PermanentBackendException("Could not remove: " + status);
}
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package org.janusgraph.diskstorage.berkeleyje;


import com.google.common.base.Preconditions;
import com.sleepycat.je.CacheMode;
import com.sleepycat.je.Database;
Expand All @@ -23,8 +22,10 @@
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BaseTransactionConfig;
import org.janusgraph.diskstorage.PermanentBackendException;
Expand All @@ -51,6 +52,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.janusgraph.diskstorage.configuration.ConfigOption.disallowEmpty;

Expand Down Expand Up @@ -90,17 +92,14 @@ public class BerkeleyJEStoreManager extends LocalStoreManager implements Ordered

private final Map<String, BerkeleyJEKeyValueStore> stores;

protected Environment environment;
protected AtomicReference<Environment> environment = new AtomicReference<>();
protected final StoreFeatures features;

public BerkeleyJEStoreManager(Configuration configuration) throws BackendException {
super(configuration);
stores = new HashMap<>();

int cachePercentage = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
initialize(cachePercentage, sharedCache, cacheMode);
initialize(configuration);

features = new StandardStoreFeatures.Builder()
.orderedScan(true)
Expand All @@ -117,7 +116,10 @@ public BerkeleyJEStoreManager(Configuration configuration) throws BackendExcepti
.build();
}

private void initialize(int cachePercent, final boolean sharedCache, final CacheMode cacheMode) throws BackendException {
private void initialize(Configuration configuration) throws BackendException {
int cachePercent = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
try {
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
Expand All @@ -132,7 +134,7 @@ private void initialize(int cachePercent, final boolean sharedCache, final Cache
}

//Open the environment
environment = new Environment(directory, envConfig);
environment.set(new Environment(directory, envConfig));

} catch (DatabaseException e) {
throw new PermanentBackendException("Error during BerkeleyJE initialization: ", e);
Expand All @@ -152,39 +154,56 @@ public List<KeyRange> getLocalKeyPartition() throws BackendException {

@Override
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
try {
Transaction tx = null;
boolean interrupted = false;
do {
try {
Transaction tx = null;

Configuration effectiveCfg =
Configuration effectiveCfg =
new MergedConfiguration(txCfg.getCustomOptions(), getStorageConfig());

if (transactional) {
TransactionConfig txnConfig = new TransactionConfig();
ConfigOption.getEnumValue(effectiveCfg.get(ISOLATION_LEVEL), IsolationLevel.class).configure(txnConfig);
tx = environment.beginTransaction(null, txnConfig);
} else {
if (txCfg instanceof TransactionConfiguration) {
if (!((TransactionConfiguration) txCfg).isSingleThreaded()) {
// Non-transactional cursors can't shared between threads, more info ThreadLocker.checkState
throw new PermanentBackendException("BerkeleyJE does not support non-transactional for multi threaded tx");
if (transactional) {
TransactionConfig txnConfig = new TransactionConfig();
ConfigOption.getEnumValue(effectiveCfg.get(ISOLATION_LEVEL), IsolationLevel.class).configure(txnConfig);
tx = environment.get().beginTransaction(null, txnConfig);
} else {
if (txCfg instanceof TransactionConfiguration) {
if (!((TransactionConfiguration) txCfg).isSingleThreaded()) {
// Non-transactional cursors can't shared between threads, more info ThreadLocker.checkState
throw new PermanentBackendException("BerkeleyJE does not support non-transactional for multi threaded tx");
}
}
}
}
BerkeleyJETx btx =
new BerkeleyJETx(
tx,
ConfigOption.getEnumValue(effectiveCfg.get(LOCK_MODE), LockMode.class),
ConfigOption.getEnumValue(effectiveCfg.get(CACHE_MODE), CacheMode.class),
txCfg);

if (log.isTraceEnabled()) {
log.trace("Berkeley tx created", new TransactionBegin(btx.toString()));
}
BerkeleyJETx btx =
new BerkeleyJETx(
tx,
ConfigOption.getEnumValue(effectiveCfg.get(LOCK_MODE), LockMode.class),
ConfigOption.getEnumValue(effectiveCfg.get(CACHE_MODE), CacheMode.class),
txCfg);

if (log.isTraceEnabled()) {
log.trace("Berkeley tx created", new TransactionBegin(btx.toString()));
}

return btx;
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
return btx;
} catch (ThreadInterruptedException e) {
log.error("BerkeleyJE backend is interrupted! Try to recreate environment", e);
environment.get().close();
initialize(storageConfig);
for (Map.Entry<String, BerkeleyJEKeyValueStore> entry : stores.entrySet()) {
final String name = entry.getKey();
final BerkeleyJEKeyValueStore store = entry.getValue();
store.reopen(openDb(name));
}
if (!interrupted) {
interrupted = true;
} else {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
} while (true);
}

@Override
Expand All @@ -194,19 +213,8 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException
return stores.get(name);
}
try {
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setReadOnly(false);
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(transactional);
dbConfig.setKeyPrefixing(true);

if (batchLoading) {
dbConfig.setDeferredWrite(true);
}

Database db = environment.openDatabase(null, name, dbConfig);

log.debug("Opened database {}", name);
Database db = openDb(name);
log.trace("Opened database {}", name);

BerkeleyJEKeyValueStore store = new BerkeleyJEKeyValueStore(name, db, this);
stores.put(name, store);
Expand All @@ -216,6 +224,20 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException
}
}

private Database openDb(String name) {
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setReadOnly(false);
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(transactional);
dbConfig.setKeyPrefixing(true);

if (batchLoading) {
dbConfig.setDeferredWrite(true);
}

return environment.get().openDatabase(null, name, dbConfig);
}

@Override
public void mutateMany(Map<String, KVMutation> mutations, StoreTransaction txh) throws BackendException {
for (Map.Entry<String,KVMutation> mutation : mutations.entrySet()) {
Expand Down Expand Up @@ -266,7 +288,7 @@ public void close() throws BackendException {
//Ignore
}
try {
environment.close();
environment.get().close();
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not close BerkeleyJE database", e);
}
Expand All @@ -282,8 +304,8 @@ public void clearStorage() throws BackendException {
throw new IllegalStateException("Cannot delete store, since database is open: " + stores.keySet());
}

for (final String db : environment.getDatabaseNames()) {
environment.removeDatabase(NULL_TRANSACTION, db);
for (final String db : environment.get().getDatabaseNames()) {
environment.get().removeDatabase(NULL_TRANSACTION, db);
log.debug("Removed database {} (clearStorage)", db);
}
close();
Expand All @@ -292,7 +314,7 @@ public void clearStorage() throws BackendException {

@Override
public boolean exists() throws BackendException {
return !environment.getDatabaseNames().isEmpty();
return !environment.get().getDatabaseNames().isEmpty();
}

@Override
Expand Down Expand Up @@ -335,4 +357,10 @@ private TransactionBegin(String msg) {
super(msg);
}
}

static TraversalInterruptedException convertThreadInterruptedException(final ThreadInterruptedException e) {
final TraversalInterruptedException ex = new TraversalInterruptedException();
ex.initCause(e);
return ex;
}
}
Loading

1 comment on commit cdea0d7

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: cdea0d7 Previous: 68f49a1 Ratio
org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete 18136.403304074436 ms/op 14750.912757292574 ms/op 1.23
org.janusgraph.GraphCentricQueryBenchmark.getVertices 1632.4888138782544 ms/op 1346.1948471823591 ms/op 1.21
org.janusgraph.MgmtOlapJobBenchmark.runClearIndex 223.13205506956518 ms/op 221.0470166869565 ms/op 1.01
org.janusgraph.MgmtOlapJobBenchmark.runReindex 518.043557269697 ms/op 463.2053476121212 ms/op 1.12
org.janusgraph.JanusGraphSpeedBenchmark.basicCount 525.1134952189349 ms/op 473.90261975756744 ms/op 1.11
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 11983.315509123086 ms/op 8776.908210320456 ms/op 1.37
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps 40901.21219325143 ms/op 29926.15626700794 ms/op 1.37
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch 41128.595248785714 ms/op 32526.844515654997 ms/op 1.26
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching 83294.91181863332 ms/op 59550.41166966667 ms/op 1.40
org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex 20606.768417814874 ms/op 15006.601503888596 ms/op 1.37
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion 737.0977162164081 ms/op 610.4456304918209 ms/op 1.21
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch 10988.653636041785 ms/op 8351.21971247489 ms/op 1.32
org.janusgraph.CQLMultiQueryBenchmark.getNames 20115.314952276112 ms/op 14737.422644903432 ms/op 1.36
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 14173.940602120054 ms/op 10903.708326936881 ms/op 1.30
org.janusgraph.CQLMultiQueryBenchmark.getLabels 17336.388793878683 ms/op 13305.68422999396 ms/op 1.30
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep 830.4330184538484 ms/op 672.9530191288869 ms/op 1.23
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex 28553.117843711905 ms/op 21151.69997790227 ms/op 1.35
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage 731.3033683791041 ms/op 566.2220666199714 ms/op 1.29
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 34180.69097613428 ms/op 25822.55861829052 ms/op 1.32
org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection 498.10206398957234 ms/op 411.4887692411499 ms/op 1.21
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch 38788.181389508085 ms/op 29794.881563733332 ms/op 1.30
org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames 20314.746245311442 ms/op 14451.119747276667 ms/op 1.41
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps 21359.775590334546 ms/op 16086.213028835715 ms/op 1.33
org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts 20102.574859667857 ms/op 14981.480702516508 ms/op 1.34

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.