Skip to content

Commit

Permalink
fix: handle BerkeleyJE DB interruption [tp-tests]
Browse files Browse the repository at this point in the history
Co-authored-by: Pavel Ershov <owner.mad.epa@gmail.com>

Signed-off-by: Tiến Nguyễn Khắc <tien.nguyenkhac@icloud.com>
  • Loading branch information
tien committed May 3, 2024
1 parent 1c53402 commit 68976e5
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Put;
import com.sleepycat.je.ReadOptions;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.WriteOptions;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.PermanentBackendException;
import org.janusgraph.diskstorage.StaticBuffer;
Expand Down Expand Up @@ -60,10 +62,10 @@ public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {
public static Function<Integer, Integer> ttlConverter = ttl -> (int) Math.max(1, Duration.of(ttl, ChronoUnit.SECONDS).toHours());


private final Database db;
private volatile Database db;
private final String name;
private final BerkeleyJEStoreManager manager;
private boolean isOpen;
private volatile boolean isOpen;

public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m) {
db = data;
Expand Down Expand Up @@ -100,6 +102,10 @@ private static void closeCursor(StoreTransaction txh, Cursor cursor) {
((BerkeleyJETx) txh).closeCursor(cursor);
}

/**
 * Replaces the {@link Database} handle that this store operates on.
 * <p>
 * Only the {@code db} field is reassigned here; the previously held handle is not closed
 * by this method and no other field of the store is modified.
 *
 * @param db the database handle this store should use from now on
 */
public void reopen(final Database db) {
this.db = db;
}

@Override
public synchronized void close() throws BackendException {
try {
Expand Down Expand Up @@ -181,11 +187,16 @@ private KeyValueEntry getNextEntry() {
return null;
}
while (!selector.reachedLimit()) {
if (status == null) {
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
} else {
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
try {
if (status == null) {
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
} else {
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
}
} catch (ThreadInterruptedException e) {
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);

Check warning on line 197 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java#L196-L197

Added lines #L196 - L197 were not covered by tests
}

if (status != OperationStatus.SUCCESS) {
break;
}
Expand Down Expand Up @@ -237,13 +248,17 @@ public void insert(StaticBuffer key, StaticBuffer value, StoreTransaction txh, b
int convertedTtl = ttlConverter.apply(ttl);
writeOptions.setTTL(convertedTtl, TimeUnit.HOURS);
}
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
try {
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);

Check warning on line 257 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java#L257

Added line #L257 was not covered by tests
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
}
} catch (ThreadInterruptedException e) {
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);

Check warning on line 261 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java#L260-L261

Added lines #L260 - L261 were not covered by tests
}

if (status != OperationStatus.SUCCESS) {
Expand All @@ -261,6 +276,8 @@ public void delete(StaticBuffer key, StoreTransaction txh) throws BackendExcepti
if (status != OperationStatus.SUCCESS && status != OperationStatus.NOTFOUND) {
throw new PermanentBackendException("Could not remove: " + status);
}
} catch (ThreadInterruptedException e) {
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);

Check warning on line 280 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java#L279-L280

Added lines #L279 - L280 were not covered by tests
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
Expand All @@ -48,9 +49,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.janusgraph.diskstorage.configuration.ConfigOption.disallowEmpty;

Expand Down Expand Up @@ -88,19 +90,16 @@ public class BerkeleyJEStoreManager extends LocalStoreManager implements Ordered
ConfigOption.Type.MASKABLE, String.class,
IsolationLevel.REPEATABLE_READ.toString(), disallowEmpty(String.class));

private final Map<String, BerkeleyJEKeyValueStore> stores;
private final ConcurrentMap<String, BerkeleyJEKeyValueStore> stores;

protected Environment environment;
protected volatile Environment environment;
protected final StoreFeatures features;

public BerkeleyJEStoreManager(Configuration configuration) throws BackendException {
super(configuration);
stores = new HashMap<>();
stores = new ConcurrentHashMap<>();

int cachePercentage = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
initialize(cachePercentage, sharedCache, cacheMode);
initialize();

features = new StandardStoreFeatures.Builder()
.orderedScan(true)
Expand All @@ -111,14 +110,24 @@ public BerkeleyJEStoreManager(Configuration configuration) throws BackendExcepti
.scanTxConfig(GraphDatabaseConfiguration.buildGraphConfiguration()
.set(ISOLATION_LEVEL, IsolationLevel.READ_UNCOMMITTED.toString())
)
.supportsInterruption(false)
.supportsInterruption(true)
.cellTTL(true)
.optimisticLocking(false)
.build();
}

private void initialize(int cachePercent, final boolean sharedCache, final CacheMode cacheMode) throws BackendException {
private synchronized void initialize() throws BackendException {
try {
if (environment != null && environment.isValid()) {
return;

Check warning on line 122 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L122

Added line #L122 was not covered by tests
}

close(true);

int cachePercent = storageConfig.get(JVM_CACHE);
boolean sharedCache = storageConfig.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(storageConfig.get(CACHE_MODE), CacheMode.class);

EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setTransactional(transactional);
Expand All @@ -131,9 +140,13 @@ private void initialize(int cachePercent, final boolean sharedCache, final Cache
envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
}

//Open the environment
// Open the environment
environment = new Environment(directory, envConfig);

// Reopen any existing DB connections
for (String storeName : stores.keySet()) {
openDatabase(storeName, true);
}
} catch (DatabaseException e) {
throw new PermanentBackendException("Error during BerkeleyJE initialization: ", e);
}
Expand All @@ -150,8 +163,7 @@ public List<KeyRange> getLocalKeyPartition() throws BackendException {
throw new UnsupportedOperationException();
}

@Override
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
private BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg, boolean retryEnvironmentFailure) throws BackendException {
try {
Transaction tx = null;

Expand Down Expand Up @@ -182,15 +194,27 @@ public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws B
}

return btx;
} catch (EnvironmentFailureException e) {
initialize();

if (retryEnvironmentFailure) {
return beginTransaction(txCfg, false);
}

throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);

Check warning on line 204 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L204

Added line #L204 was not covered by tests
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
}

@Override
public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException {
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
// Delegates to the private two-argument overload with retryEnvironmentFailure = true:
// on an EnvironmentFailureException that overload calls initialize() and retries once
// (with the flag set to false) before giving up with a PermanentBackendException.
return beginTransaction(txCfg, true);
}

private BerkeleyJEKeyValueStore openDatabase(String name, boolean force, boolean retryEnvironmentFailure) throws BackendException {
Preconditions.checkNotNull(name);
if (stores.containsKey(name)) {
if (stores.containsKey(name) && !force) {
return stores.get(name);
}
try {
Expand All @@ -209,13 +233,34 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException
log.debug("Opened database {}", name);

BerkeleyJEKeyValueStore store = new BerkeleyJEKeyValueStore(name, db, this);
stores.put(name, store);
if (stores.containsKey(name)) {
stores.get(name).reopen(db);
} else {
stores.put(name, store);
}
return store;
} catch (EnvironmentFailureException e) {
initialize();

Check warning on line 243 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L242-L243

Added lines #L242 - L243 were not covered by tests

if (retryEnvironmentFailure) {
return openDatabase(name, force, false);

Check warning on line 246 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L246

Added line #L246 was not covered by tests
}

throw new PermanentBackendException("Could not open BerkeleyJE data store", e);

Check warning on line 249 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L249

Added line #L249 was not covered by tests
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not open BerkeleyJE data store", e);
}
}

/**
 * Opens the named store, delegating to the three-argument overload with
 * {@code retryEnvironmentFailure} set to {@code true}.
 *
 * @param name  name of the store to open
 * @param force when {@code true}, the three-argument overload does not short-circuit on a store
 *              that is already registered under {@code name}
 * @return the store returned by the three-argument overload
 * @throws BackendException if the three-argument overload fails to open the store
 */
private BerkeleyJEKeyValueStore openDatabase(String name, boolean force) throws BackendException {
return openDatabase(name, force, true);
}

/**
 * Opens the named store, delegating to the three-argument overload with {@code force = false}
 * (an already registered store is returned as-is) and {@code retryEnvironmentFailure = true}.
 *
 * @param name name of the store to open
 * @return the store registered under {@code name}, or a newly opened one
 * @throws BackendException if the store cannot be opened
 */
@Override
public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException {
return openDatabase(name, false, true);
}

@Override
public void mutateMany(Map<String, KVMutation> mutations, StoreTransaction txh) throws BackendException {
for (Map.Entry<String,KVMutation> mutation : mutations.entrySet()) {
Expand Down Expand Up @@ -252,11 +297,9 @@ void removeDatabase(BerkeleyJEKeyValueStore db) {
log.debug("Removed database {}", name);
}


@Override
public void close() throws BackendException {
public void close(boolean force) throws BackendException {
if (environment != null) {
if (!stores.isEmpty())
if (!force && !stores.isEmpty())
throw new IllegalStateException("Cannot shutdown manager since some databases are still open");
try {
// TODO this looks like a race condition
Expand All @@ -274,6 +317,11 @@ public void close() throws BackendException {

}

/**
 * Closes this manager in non-forced mode by delegating to {@code close(false)}.
 * <p>
 * In non-forced mode {@code close(boolean)} throws an {@link IllegalStateException} when an
 * environment exists and stores are still registered.
 *
 * @throws BackendException if {@code close(boolean)} fails
 */
@Override
public void close() throws BackendException {
close(false);
}

private static final Transaction NULL_TRANSACTION = null;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BaseTransactionConfig;
import org.janusgraph.diskstorage.PermanentBackendException;
Expand Down Expand Up @@ -60,16 +62,26 @@ Cursor openCursor(Database db) throws BackendException {
if (!isOpen) {
throw new PermanentBackendException("Transaction already closed");
}
Cursor cursor = db.openCursor(tx, null);
openCursors.add(cursor);
return cursor;

try {
Cursor cursor = db.openCursor(tx, null);
openCursors.add(cursor);
return cursor;
} catch (ThreadInterruptedException e) {
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);

Check warning on line 71 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java#L70-L71

Added lines #L70 - L71 were not covered by tests
}
}
}

void closeCursor(Cursor cursor) {
synchronized (openCursors) {
cursor.close();
openCursors.remove(cursor);
try {
cursor.close();
} catch (ThreadInterruptedException e) {
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);

Check warning on line 81 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java#L80-L81

Added lines #L80 - L81 were not covered by tests
} finally {
openCursors.remove(cursor);
}
}
}

Expand Down Expand Up @@ -98,6 +110,13 @@ public synchronized void rollback() throws BackendException {
closeOpenCursors();
tx.abort();
tx = null;
} catch (ThreadInterruptedException e) {
// Ignore to avoid issues when backend was interrupted
} catch (IllegalStateException e) {

Check warning on line 115 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java#L115

Added line #L115 was not covered by tests
// Ignore to avoid issues when backend was closed
if (!e.getMessage().equals("Database was closed.") && !e.getMessage().equals("Environment is closed.")) {
throw e;

Check warning on line 118 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java#L118

Added line #L118 was not covered by tests
}
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -114,6 +133,8 @@ public synchronized void commit() throws BackendException {
closeOpenCursors();
tx.commit();
tx = null;
} catch (ThreadInterruptedException e) {
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);

Check warning on line 137 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java#L136-L137

Added lines #L136 - L137 were not covered by tests
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2022 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.janusgraph;

import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphException;
import org.janusgraph.core.JanusGraphFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
 * Verifies that a BerkeleyJE-backed graph stays usable after a commit was attempted
 * on a thread whose interrupt flag was set.
 */
public class BerkeleyInterruptionTest {

    /**
     * Opens a graph in a temporary directory, expects a commit issued on an interrupted thread to
     * fail with a {@link JanusGraphException}, then expects a fresh vertex insert on the same graph
     * to succeed and the vertex count to be exactly one.
     */
    @Test
    public void interruptedEnvironmentShouldBeRestarted(@TempDir File dir) {
        final String graphUrl = "berkeleyje:" + dir.getAbsolutePath();

        try (JanusGraph graph = JanusGraphFactory.open(graphUrl)) {
            assertThrows(JanusGraphException.class, () -> commitWhileInterrupted(graph));

            // Keep the block-bodied lambda: a statement body is void-compatible only, so the
            // same assertDoesNotThrow overload is selected as before.
            assertDoesNotThrow(() -> {
                graph.traversal().addV().iterate();
            });

            final Long vertexCount = graph.traversal().V().count().next();
            assertEquals(1, vertexCount);
        }
    }

    /**
     * Adds a vertex inside an explicitly begun transaction, sets the current thread's interrupt
     * flag, and then commits; the commit is the call the test expects to throw.
     */
    private static void commitWhileInterrupted(JanusGraph graph) {
        final Transaction interruptedTx = graph.tx();
        final GraphTraversalSource source = interruptedTx.begin();

        source.addV().iterate();

        Thread.currentThread().interrupt();
        interruptedTx.commit();
    }
}

0 comments on commit 68976e5

Please sign in to comment.