Skip to content

Commit

Permalink
Revert "Handle interruption for BerkeleyJE backend [tp-tests]"
Browse files Browse the repository at this point in the history
This reverts commit cdea0d7.

Signed-off-by: Boxuan Li <liboxuan@connect.hku.hk>
  • Loading branch information
li-boxuan committed Oct 7, 2023
1 parent cdea0d7 commit bef739e
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Put;
import com.sleepycat.je.ReadOptions;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.WriteOptions;
import org.janusgraph.diskstorage.BackendException;
Expand All @@ -49,11 +48,8 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static org.janusgraph.diskstorage.berkeleyje.BerkeleyJEStoreManager.convertThreadInterruptedException;

public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {

private static final Logger log = LoggerFactory.getLogger(BerkeleyJEKeyValueStore.class);
Expand All @@ -64,21 +60,21 @@ public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {
public static Function<Integer, Integer> ttlConverter = ttl -> (int) Math.max(1, Duration.of(ttl, ChronoUnit.SECONDS).toHours());


private final AtomicReference<Database> db = new AtomicReference<>();
private final Database db;
private final String name;
private final BerkeleyJEStoreManager manager;
private boolean isOpen;

public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m) {
db.set(data);
db = data;
name = n;
manager = m;
isOpen = true;
}

public DatabaseConfig getConfiguration() throws BackendException {
try {
return db.get().getConfig();
return db.getConfig();
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -96,24 +92,18 @@ private static Transaction getTransaction(StoreTransaction txh) {

private Cursor openCursor(StoreTransaction txh) throws BackendException {
Preconditions.checkArgument(txh!=null);
return ((BerkeleyJETx) txh).openCursor(db.get());
return ((BerkeleyJETx) txh).openCursor(db);
}

private static void closeCursor(StoreTransaction txh, Cursor cursor) {
Preconditions.checkArgument(txh!=null);
((BerkeleyJETx) txh).closeCursor(cursor);
}

void reopen(final Database db) {
this.db.set(db);
}

@Override
public synchronized void close() throws BackendException {
try {
if (isOpen) db.get().close();
} catch (ThreadInterruptedException ignored) {
// environment will be closed
if(isOpen) db.close();
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -130,7 +120,7 @@ public StaticBuffer get(StaticBuffer key, StoreTransaction txh) throws BackendEx

log.trace("db={}, op=get, tx={}", name, txh);

OperationResult result = db.get().get(tx, databaseKey, data, Get.SEARCH, getReadOptions(txh));
OperationResult result = db.get(tx, databaseKey, data, Get.SEARCH, getReadOptions(txh));

if (result != null) {
return getBuffer(data);
Expand Down Expand Up @@ -163,7 +153,6 @@ public RecordIterator<KeyValueEntry> getSlice(KVQuery query, StoreTransaction tx
final DatabaseEntry foundKey = keyStart.as(ENTRY_FACTORY);
final DatabaseEntry foundData = new DatabaseEntry();
final Cursor cursor = openCursor(txh);
final ReadOptions readOptions = getReadOptions(txh);

return new RecordIterator<KeyValueEntry>() {
private OperationStatus status;
Expand Down Expand Up @@ -193,9 +182,9 @@ private KeyValueEntry getNextEntry() {
}
while (!selector.reachedLimit()) {
if (status == null) {
status = get(Get.SEARCH_GTE, readOptions);
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
} else {
status = get(Get.NEXT, readOptions);
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
}
if (status != OperationStatus.SUCCESS) {
break;
Expand All @@ -214,16 +203,6 @@ private KeyValueEntry getNextEntry() {
return null;
}

private OperationStatus get(Get get, ReadOptions readOptions) {
try {
return cursor.get(foundKey, foundData, get, readOptions) == null
? OperationStatus.NOTFOUND
: OperationStatus.SUCCESS;
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
}
}

@Override
public void close() {
closeCursor(txh, cursor);
Expand Down Expand Up @@ -258,17 +237,13 @@ public void insert(StaticBuffer key, StaticBuffer value, StoreTransaction txh, b
int convertedTtl = ttlConverter.apply(ttl);
writeOptions.setTTL(convertedTtl, TimeUnit.HOURS);
}
try {
if (allowOverwrite) {
OperationResult result = db.get().put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.get().put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
}
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
}

if (status != OperationStatus.SUCCESS) {
Expand All @@ -282,12 +257,10 @@ public void delete(StaticBuffer key, StoreTransaction txh) throws BackendExcepti
Transaction tx = getTransaction(txh);
try {
log.trace("db={}, op=delete, tx={}", name, txh);
OperationStatus status = db.get().delete(tx, key.as(ENTRY_FACTORY));
OperationStatus status = db.delete(tx, key.as(ENTRY_FACTORY));
if (status != OperationStatus.SUCCESS && status != OperationStatus.NOTFOUND) {
throw new PermanentBackendException("Could not remove: " + status);
}
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.janusgraph.diskstorage.berkeleyje;


import com.google.common.base.Preconditions;
import com.sleepycat.je.CacheMode;
import com.sleepycat.je.Database;
Expand All @@ -22,10 +23,8 @@
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BaseTransactionConfig;
import org.janusgraph.diskstorage.PermanentBackendException;
Expand All @@ -52,7 +51,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.janusgraph.diskstorage.configuration.ConfigOption.disallowEmpty;

Expand Down Expand Up @@ -92,14 +90,17 @@ public class BerkeleyJEStoreManager extends LocalStoreManager implements Ordered

private final Map<String, BerkeleyJEKeyValueStore> stores;

protected AtomicReference<Environment> environment = new AtomicReference<>();
protected Environment environment;
protected final StoreFeatures features;

public BerkeleyJEStoreManager(Configuration configuration) throws BackendException {
super(configuration);
stores = new HashMap<>();

initialize(configuration);
int cachePercentage = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
initialize(cachePercentage, sharedCache, cacheMode);

features = new StandardStoreFeatures.Builder()
.orderedScan(true)
Expand All @@ -116,10 +117,7 @@ public BerkeleyJEStoreManager(Configuration configuration) throws BackendExcepti
.build();
}

private void initialize(Configuration configuration) throws BackendException {
int cachePercent = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
private void initialize(int cachePercent, final boolean sharedCache, final CacheMode cacheMode) throws BackendException {
try {
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
Expand All @@ -134,7 +132,7 @@ private void initialize(Configuration configuration) throws BackendException {
}

//Open the environment
environment.set(new Environment(directory, envConfig));
environment = new Environment(directory, envConfig);

} catch (DatabaseException e) {
throw new PermanentBackendException("Error during BerkeleyJE initialization: ", e);
Expand All @@ -154,56 +152,39 @@ public List<KeyRange> getLocalKeyPartition() throws BackendException {

@Override
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
boolean interrupted = false;
do {
try {
Transaction tx = null;
try {
Transaction tx = null;

Configuration effectiveCfg =
Configuration effectiveCfg =
new MergedConfiguration(txCfg.getCustomOptions(), getStorageConfig());

if (transactional) {
TransactionConfig txnConfig = new TransactionConfig();
ConfigOption.getEnumValue(effectiveCfg.get(ISOLATION_LEVEL), IsolationLevel.class).configure(txnConfig);
tx = environment.get().beginTransaction(null, txnConfig);
} else {
if (txCfg instanceof TransactionConfiguration) {
if (!((TransactionConfiguration) txCfg).isSingleThreaded()) {
// Non-transactional cursors can't shared between threads, more info ThreadLocker.checkState
throw new PermanentBackendException("BerkeleyJE does not support non-transactional for multi threaded tx");
}
if (transactional) {
TransactionConfig txnConfig = new TransactionConfig();
ConfigOption.getEnumValue(effectiveCfg.get(ISOLATION_LEVEL), IsolationLevel.class).configure(txnConfig);
tx = environment.beginTransaction(null, txnConfig);
} else {
if (txCfg instanceof TransactionConfiguration) {
if (!((TransactionConfiguration) txCfg).isSingleThreaded()) {
// Non-transactional cursors can't shared between threads, more info ThreadLocker.checkState
throw new PermanentBackendException("BerkeleyJE does not support non-transactional for multi threaded tx");
}
}
BerkeleyJETx btx =
new BerkeleyJETx(
tx,
ConfigOption.getEnumValue(effectiveCfg.get(LOCK_MODE), LockMode.class),
ConfigOption.getEnumValue(effectiveCfg.get(CACHE_MODE), CacheMode.class),
txCfg);

if (log.isTraceEnabled()) {
log.trace("Berkeley tx created", new TransactionBegin(btx.toString()));
}

return btx;
} catch (ThreadInterruptedException e) {
log.error("BerkeleyJE backend is interrupted! Try to recreate environment", e);
environment.get().close();
initialize(storageConfig);
for (Map.Entry<String, BerkeleyJEKeyValueStore> entry : stores.entrySet()) {
final String name = entry.getKey();
final BerkeleyJEKeyValueStore store = entry.getValue();
store.reopen(openDb(name));
}
if (!interrupted) {
interrupted = true;
} else {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
} while (true);
BerkeleyJETx btx =
new BerkeleyJETx(
tx,
ConfigOption.getEnumValue(effectiveCfg.get(LOCK_MODE), LockMode.class),
ConfigOption.getEnumValue(effectiveCfg.get(CACHE_MODE), CacheMode.class),
txCfg);

if (log.isTraceEnabled()) {
log.trace("Berkeley tx created", new TransactionBegin(btx.toString()));
}

return btx;
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
}

@Override
Expand All @@ -213,8 +194,19 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException
return stores.get(name);
}
try {
Database db = openDb(name);
log.trace("Opened database {}", name);
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setReadOnly(false);
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(transactional);
dbConfig.setKeyPrefixing(true);

if (batchLoading) {
dbConfig.setDeferredWrite(true);
}

Database db = environment.openDatabase(null, name, dbConfig);

log.debug("Opened database {}", name);

BerkeleyJEKeyValueStore store = new BerkeleyJEKeyValueStore(name, db, this);
stores.put(name, store);
Expand All @@ -224,20 +216,6 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException
}
}

private Database openDb(String name) {
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setReadOnly(false);
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(transactional);
dbConfig.setKeyPrefixing(true);

if (batchLoading) {
dbConfig.setDeferredWrite(true);
}

return environment.get().openDatabase(null, name, dbConfig);
}

@Override
public void mutateMany(Map<String, KVMutation> mutations, StoreTransaction txh) throws BackendException {
for (Map.Entry<String,KVMutation> mutation : mutations.entrySet()) {
Expand Down Expand Up @@ -288,7 +266,7 @@ public void close() throws BackendException {
//Ignore
}
try {
environment.get().close();
environment.close();
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not close BerkeleyJE database", e);
}
Expand All @@ -304,8 +282,8 @@ public void clearStorage() throws BackendException {
throw new IllegalStateException("Cannot delete store, since database is open: " + stores.keySet());
}

for (final String db : environment.get().getDatabaseNames()) {
environment.get().removeDatabase(NULL_TRANSACTION, db);
for (final String db : environment.getDatabaseNames()) {
environment.removeDatabase(NULL_TRANSACTION, db);
log.debug("Removed database {} (clearStorage)", db);
}
close();
Expand All @@ -314,7 +292,7 @@ public void clearStorage() throws BackendException {

@Override
public boolean exists() throws BackendException {
return !environment.get().getDatabaseNames().isEmpty();
return !environment.getDatabaseNames().isEmpty();
}

@Override
Expand Down Expand Up @@ -357,10 +335,4 @@ private TransactionBegin(String msg) {
super(msg);
}
}

static TraversalInterruptedException convertThreadInterruptedException(final ThreadInterruptedException e) {
final TraversalInterruptedException ex = new TraversalInterruptedException();
ex.initCause(e);
return ex;
}
}
Loading

1 comment on commit bef739e

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: bef739e Previous: 68f49a1 Ratio
org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete 14927.050626726417 ms/op 14750.912757292574 ms/op 1.01
org.janusgraph.GraphCentricQueryBenchmark.getVertices 1347.8267181504064 ms/op 1346.1948471823591 ms/op 1.00
org.janusgraph.MgmtOlapJobBenchmark.runClearIndex 219.61766866956518 ms/op 221.0470166869565 ms/op 0.99
org.janusgraph.MgmtOlapJobBenchmark.runReindex 467.7944938136364 ms/op 463.2053476121212 ms/op 1.01
org.janusgraph.JanusGraphSpeedBenchmark.basicCount 442.92279841359596 ms/op 473.90261975756744 ms/op 0.93
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 8122.791841978639 ms/op 8776.908210320456 ms/op 0.93
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps 28353.030081150788 ms/op 29926.15626700794 ms/op 0.95
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch 29206.459477536668 ms/op 32526.844515654997 ms/op 0.90
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching 57942.172594933334 ms/op 59550.41166966667 ms/op 0.97
org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex 14376.526274325874 ms/op 15006.601503888596 ms/op 0.96
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion 583.9551961363666 ms/op 610.4456304918209 ms/op 0.96
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch 7992.781591453918 ms/op 8351.21971247489 ms/op 0.96
org.janusgraph.CQLMultiQueryBenchmark.getNames 13913.942935956797 ms/op 14737.422644903432 ms/op 0.94
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 10198.03170366322 ms/op 10903.708326936881 ms/op 0.94
org.janusgraph.CQLMultiQueryBenchmark.getLabels 12480.611843284973 ms/op 13305.68422999396 ms/op 0.94
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep 653.5421063555981 ms/op 672.9530191288869 ms/op 0.97
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex 20557.118673283636 ms/op 21151.69997790227 ms/op 0.97
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage 545.4782293646509 ms/op 566.2220666199714 ms/op 0.96
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 25965.150806526668 ms/op 25822.55861829052 ms/op 1.01
org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection 386.6965789145318 ms/op 411.4887692411499 ms/op 0.94
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch 29274.286155585265 ms/op 29794.881563733332 ms/op 0.98
org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames 13916.855363086077 ms/op 14451.119747276667 ms/op 0.96
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps 14975.576886961428 ms/op 16086.213028835715 ms/op 0.93
org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts 14572.852567779657 ms/op 14981.480702516508 ms/op 0.97

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.