Skip to content

Commit

Permalink
fix cross-close the underlying rocksdb session pool (apache#598)
Browse files Browse the repository at this point in the history
fix: apache#597
Change-Id: I8b185cd7f81a9a04bc6fd971490ae887fd4ddbb5
  • Loading branch information
javeme authored and zhoney committed Jul 9, 2019
1 parent 011ac85 commit 9a4c543
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 28 deletions.
Expand Up @@ -465,6 +465,11 @@ public Transaction tx() {

@Override
public void close() throws HugeException {
if (this.closed()) {
return;
}

LOG.info("Close graph {}", this);
this.taskManager.closeScheduler(this);
try {
this.closeTx();
Expand Down
Expand Up @@ -41,6 +41,9 @@ public RocksDBSessions(HugeConfig config, String database, String store) {

public abstract String property(String property);

public abstract RocksDBSessions copy(HugeConfig config,
String database, String store);

@Override
public abstract Session session();

Expand Down
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
Expand Down Expand Up @@ -63,13 +64,14 @@

public class RocksDBStdSessions extends RocksDBSessions {

private final Map<String, ColumnFamilyHandle> cfs = new HashMap<>();

private final RocksDB rocksdb;
private final SstFileManager sstFileManager;

public RocksDBStdSessions(HugeConfig config, String dataPath,
String walPath, String database, String store)
private final Map<String, ColumnFamilyHandle> cfs;
private final AtomicInteger refCount;

public RocksDBStdSessions(HugeConfig config, String database, String store,
String dataPath, String walPath)
throws RocksDBException {
super(config, database, store);

Expand All @@ -86,10 +88,13 @@ public RocksDBStdSessions(HugeConfig config, String dataPath,
* Don't merge old CFs, we expect a clear DB when using this one
*/
this.rocksdb = RocksDB.open(options, dataPath);

this.cfs = new HashMap<>();
this.refCount = new AtomicInteger(1);
}

public RocksDBStdSessions(HugeConfig config, String dataPath,
String walPath, String database, String store,
public RocksDBStdSessions(HugeConfig config, String database, String store,
String dataPath, String walPath,
List<String> cfNames) throws RocksDBException {
super(config, database, store);

Expand Down Expand Up @@ -121,13 +126,28 @@ public RocksDBStdSessions(HugeConfig config, String dataPath,
"Expect same size of cf-handles and cf-names");

// Collect CF Handles
this.cfs = new HashMap<>();
for (int i = 0; i < cfs.size(); i++) {
this.cfs.put(cfs.get(i), cfhs.get(i));
}

this.refCount = new AtomicInteger(1);

ingestExternalFile();
}

private RocksDBStdSessions(HugeConfig config, String database, String store,
RocksDBStdSessions origin) {
super(config, database, store);

this.rocksdb = origin.rocksdb;
this.sstFileManager = origin.sstFileManager;
this.cfs = origin.cfs;
this.refCount = origin.refCount;

this.refCount.incrementAndGet();
}

@Override
public void open() throws Exception {
// pass
Expand Down Expand Up @@ -182,6 +202,12 @@ public String property(String property) {
}
}

@Override
public RocksDBSessions copy(HugeConfig config,
String database, String store) {
return new RocksDBStdSessions(config, database, store, this);
}

@Override
public final Session session() {
return (Session) super.getOrNewSession();
Expand All @@ -198,6 +224,11 @@ protected final Session newSession() {
protected synchronized void doClose() {
this.checkValid();

if (this.refCount.decrementAndGet() > 0) {
return;
}
assert this.refCount.get() == 0;

for (ColumnFamilyHandle cf : this.cfs.values()) {
cf.close();
}
Expand Down
Expand Up @@ -92,7 +92,7 @@ public RocksDBStore(final BackendStoreProvider provider,
private void registerMetaHandlers() {
this.registerMetaHandler("metrics", (session, meta, args) -> {
List<RocksDBSessions> dbs = new ArrayList<>();
dbs.add(sessions);
dbs.add(this.sessions);
dbs.addAll(tableDBMapping().values());

RocksDBMetrics metrics = new RocksDBMetrics(dbs, session);
Expand Down Expand Up @@ -157,7 +157,8 @@ public synchronized void open(HugeConfig config) {
// Open tables with optimized disk
Map<String, String> disks = config.getMap(RocksDBOptions.DATA_DISKS);
if (!disks.isEmpty()) {
this.parseTableDiskMapping(disks);
String dataPath = config.get(RocksDBOptions.DATA_PATH);
this.parseTableDiskMapping(disks, dataPath);
for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
String table = this.table(e.getKey()).table();
String disk = e.getValue();
Expand All @@ -181,10 +182,11 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
sessions = this.openSessionPool(config, dataPath,
walPath, tableNames);
} catch (RocksDBException e) {
if (dbs.containsKey(dataPath)) {
RocksDBSessions origin = dbs.get(dataPath);
if (origin != null) {
if (e.getMessage().contains("No locks available")) {
// Open twice, but we should support keyspace
sessions = dbs.get(dataPath);
sessions = origin.copy(config, this.database, this.store);
}
}

Expand Down Expand Up @@ -222,6 +224,7 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
}

if (sessions != null) {
// May override the original session pool
dbs.put(dataPath, sessions);
sessions.session();
LOG.debug("Store opened: {}", dataPath);
Expand All @@ -235,12 +238,11 @@ protected RocksDBSessions openSessionPool(HugeConfig config,
List<String> tableNames)
throws RocksDBException {
if (tableNames == null) {
return new RocksDBStdSessions(config, dataPath, walPath,
this.database, this.store);
return new RocksDBStdSessions(config, this.database, this.store,
dataPath, walPath);
} else {
return new RocksDBStdSessions(config, dataPath, walPath,
this.database, this.store,
tableNames);
return new RocksDBStdSessions(config, this.database, this.store,
dataPath, walPath, tableNames);
}
}

Expand Down Expand Up @@ -363,7 +365,7 @@ public void clear() {

private void dropTable(RocksDBSessions db, String table) {
try {
this.sessions.dropTable(table);
db.dropTable(table);
} catch (BackendException e) {
if (e.getMessage().contains("is not opened")) {
return;
Expand Down Expand Up @@ -446,12 +448,15 @@ private void checkOpened() {
this.database, this.provider.type());
}

private void parseTableDiskMapping(Map<String, String> disks) {
private void parseTableDiskMapping(Map<String, String> disks,
String dataPath) {
this.tableDiskMapping.clear();
for (Map.Entry<String, String> disk : disks.entrySet()) {
// The format of `disk` like: `graph/vertex: /path/to/disk1`
String name = disk.getKey();
String path = disk.getValue();
E.checkArgument(!dataPath.equals(path), "Invalid disk path" +
"(can't be the same as data_path): '%s'", path);
E.checkArgument(!name.isEmpty() && !path.isEmpty(),
"Invalid disk format: '%s', expect `NAME:PATH`",
disk);
Expand Down
Expand Up @@ -49,8 +49,8 @@ public class RocksDBSstSessions extends RocksDBSessions {
private final String dataPath;
private final Map<String, SstFileWriter> tables;

public RocksDBSstSessions(HugeConfig config, String dataPath,
String database, String store) {
public RocksDBSstSessions(HugeConfig config, String database, String store,
String dataPath) {
super(config, database, store);

this.dataPath = dataPath;
Expand All @@ -71,6 +71,14 @@ public RocksDBSstSessions(HugeConfig config, String dataPath,
}
}

private RocksDBSstSessions(HugeConfig config, String database, String store,
RocksDBSstSessions origin) {
super(config, database, store);

this.dataPath = origin.dataPath;
this.tables = origin.tables;
}

@Override
public void open() throws Exception {
// pass
Expand Down Expand Up @@ -110,6 +118,13 @@ public String property(String property) {
throw new NotSupportException("RocksDBSstStore property()");
}

@Override
public RocksDBSessions copy(HugeConfig config,
String database, String store) {
return new RocksDBSstSessions(config, database, store, this);
}


private SstFileWriter table(String table) {
SstFileWriter sst = this.tables.get(table);
if (sst == null) {
Expand Down
Expand Up @@ -44,11 +44,11 @@ protected RocksDBSessions openSessionPool(HugeConfig config,
List<String> tableNames)
throws RocksDBException {
if (tableNames == null) {
return new RocksDBSstSessions(config, dataPath, this.database(),
this.store());
return new RocksDBSstSessions(config, this.database(),
this.store(), dataPath);
} else {
return new RocksDBSstSessions(config, dataPath, this.database(),
this.store(), tableNames);
return new RocksDBSstSessions(config, this.database(), this.store(),
dataPath, tableNames);
}
}

Expand Down
Expand Up @@ -2190,6 +2190,9 @@ public void testRemoveEdgesOfSuperVertex() {
Assert.assertTrue(e.getMessage().contains(
"Edges size has reached tx capacity"));
});

// Clear all
graph.truncateBackend();
}

@Test
Expand Down
Expand Up @@ -108,7 +108,8 @@ public void testCreateGraphsWithSameName() {
g1.clearBackend();
g2.clearBackend();
g3.clearBackend();
destoryGraphs(ImmutableList.of(g1));

destoryGraphs(ImmutableList.of(g1, g2, g3));
}

@Test
Expand Down
Expand Up @@ -697,6 +697,8 @@ public void testAddVertexWithCustomizeNumberIdStrategy() {
.create();
graph.addVertex(T.label, "programmer", T.id, 123456, "name", "marko",
"age", 18, "city", "Beijing");
graph.addVertex(T.label, "programmer", T.id, 61695499031416832L,
"name", "marko", "age", 18, "city", "Beijing");
graph.tx().commit();

List<Vertex> vertices = graph.traversal().V(123456).toList();
Expand All @@ -707,6 +709,16 @@ public void testAddVertexWithCustomizeNumberIdStrategy() {
assertContains(vertices,
T.label, "programmer", "name", "marko",
"age", 18, "city", "Beijing");

vertices = graph.traversal().V(61695499031416832L).toList();
Assert.assertEquals(1, vertices.size());
id = vertices.get(0).id();
Assert.assertEquals(IdGenerator.LongId.class, id.getClass());
Assert.assertEquals(61695499031416832L,
((IdGenerator.LongId) id).asLong());
assertContains(vertices,
T.label, "programmer", "name", "marko",
"age", 18, "city", "Beijing");
}

@Test
Expand Down
Expand Up @@ -111,8 +111,8 @@ protected static long l(byte[] bytes) {

private static RocksDBSessions open(String table) throws RocksDBException {
HugeConfig config = FakeObjects.newConfig();
RocksDBSessions rocks = new RocksDBStdSessions(config, DB_PATH, DB_PATH,
"db", "store");
RocksDBSessions rocks = new RocksDBStdSessions(config, "db", "store",
DB_PATH, DB_PATH);
rocks.createTable(table);
return rocks;
}
Expand Down
5 changes: 3 additions & 2 deletions hugegraph-test/src/main/resources/hugegraph.properties
Expand Up @@ -22,8 +22,9 @@ cassandra.connect_timeout=30
cassandra.read_timeout=120

# rocksdb backend config
#rocksdb.data_path=
#rocksdb.wal_path=
rocksdb.data_path=rocksdb-data
rocksdb.wal_path=rocksdb-data
rocksdb.data_disks=[graph/secondary_index:rocksdb-index]

# hbase backend config
hbase.hosts=localhost
Expand Down

0 comments on commit 9a4c543

Please sign in to comment.