Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-4954 LMDB: Fix GC and record counting for deletions #4955

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -500,13 +500,11 @@ public void flush() throws SailException {
}
if (activeTxn) {
if (!multiThreadingActive) {
tripleStore.commit();
filterUsedIdsInTripleStore();
}
handleRemovedIdsInValueStore();
valueStore.commit();
if (!multiThreadingActive) {
tripleStore.commit();
}
// do not set flag to false until _after_ commit is successfully completed.
storeTxnStarted.set(false);
}
Expand Down Expand Up @@ -604,9 +602,9 @@ private void startTransaction(boolean preferThreading) throws SailException {
Operation op = opQueue.remove();
if (op != null) {
if (op == COMMIT_TRANSACTION) {
tripleStore.commit();
filterUsedIdsInTripleStore();

tripleStore.commit();
nextTransactionAsync = false;
asyncTransactionFinished = true;
break;
Expand Down Expand Up @@ -712,23 +710,18 @@ private void addStatement(Resource subj, IRI pred, Value obj, boolean explicit,

private long removeStatements(long subj, long pred, long obj, boolean explicit, long[] contexts)
throws IOException {
long removeCount = 0;
long[] removeCount = { 0 };
for (long contextId : contexts) {
final Map<Long, Long> perContextCounts = new HashMap<>();
tripleStore.removeTriplesByContext(subj, pred, obj, contextId, explicit, quad -> {
perContextCounts.merge(quad[3], 1L, (c, one) -> c + one);
removeCount[0]++;
for (long id : quad) {
if (id != 0L) {
unusedIds.add(id);
}
}
});

for (Entry<Long, Long> entry : perContextCounts.entrySet()) {
removeCount += entry.getValue();
}
}
return removeCount;
return removeCount[0];
}

private long removeStatements(Resource subj, IRI pred, Value obj, boolean explicit, Resource... contexts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ final class LmdbUtil {
// Private constructor: instances of LmdbUtil cannot be created from outside this class.
private LmdbUtil() {
}

static void E(int rc) throws IOException {
static int E(int rc) throws IOException {
if (rc != MDB_SUCCESS && rc != MDB_NOTFOUND) {
throw new IOException(mdb_strerror(rc));
}
return rc;
}

static <T> T readTransaction(long env, Transaction<T> transaction) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.transaction;
import static org.eclipse.rdf4j.sail.lmdb.Varint.readListUnsigned;
import static org.eclipse.rdf4j.sail.lmdb.Varint.writeListUnsigned;
import static org.eclipse.rdf4j.sail.lmdb.Varint.writeUnsigned;
import static org.lwjgl.system.MemoryStack.stackPush;
import static org.lwjgl.system.MemoryUtil.NULL;
import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE;
import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST;
import static org.lwjgl.util.lmdb.LMDB.MDB_KEYEXIST;
import static org.lwjgl.util.lmdb.LMDB.MDB_LAST;
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOOVERWRITE;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOSYNC;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTFOUND;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTLS;
import static org.lwjgl.util.lmdb.LMDB.MDB_PREV;
import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE;
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
import static org.lwjgl.util.lmdb.LMDB.mdb_cmp;
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_close;
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_get;
Expand All @@ -47,6 +51,7 @@
import static org.lwjgl.util.lmdb.LMDB.mdb_put;
import static org.lwjgl.util.lmdb.LMDB.mdb_set_compare;
import static org.lwjgl.util.lmdb.LMDB.mdb_stat;
import static org.lwjgl.util.lmdb.LMDB.mdb_strerror;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_commit;
Expand Down Expand Up @@ -92,7 +97,7 @@
import org.slf4j.LoggerFactory;

/**
* LMDB-based indexed storage and retrieval of RDF statements. TripleStore stores statements in the form of four integer
* LMDB-based indexed storage and retrieval of RDF statements. TripleStore stores statements in the form of four long
* IDs. Each ID represents an RDF value that is stored in a {@link ValueStore}. The four IDs refer to the statement's
* subject, predicate, object and context. The ID <tt>0</tt> is used to represent the "null" context and doesn't map to
* an actual RDF value.
Expand Down Expand Up @@ -539,7 +544,7 @@ protected void bucketStart(double fraction, long[] lowerValues, long[] upperValu
* @throws IOException
*/
protected void filterUsedIds(Collection<Long> ids) throws IOException {
try (MemoryStack stack = stackPush()) {
readTransaction(env, (stack, txn) -> {
MDBVal maxKey = MDBVal.malloc(stack);
ByteBuffer maxKeyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH);
MDBVal keyData = MDBVal.malloc(stack);
Expand All @@ -559,7 +564,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
keyBuf.clear();
Varint.writeUnsigned(keyBuf, id);
keyData.mv_data(keyBuf.flip());
if (mdb_get(writeTxn, contextsDbi, keyData, valueData) == 0) {
if (mdb_get(txn, contextsDbi, keyData, valueData) == 0) {
it.remove();
}
}
Expand All @@ -580,7 +585,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {

long cursor = 0;
try {
E(mdb_cursor_open(writeTxn, dbi, pp));
E(mdb_cursor_open(txn, dbi, pp));
cursor = pp.get(0);

if (fullScan) {
Expand Down Expand Up @@ -626,7 +631,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
boolean exists = false;
while (!exists && rc == 0) {
if (mdb_cmp(writeTxn, dbi, keyData, maxKey) > 0) {
if (mdb_cmp(txn, dbi, keyData, maxKey) > 0) {
// id was not found
break;
} else if (!matcher.matches(keyData.mv_data())) {
Expand All @@ -649,7 +654,8 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
}
}
}
}
return null;
});
}

protected double cardinality(long subj, long pred, long obj, long context) throws IOException {
Expand Down Expand Up @@ -842,6 +848,7 @@ private boolean requiresResize() {

public boolean storeTriple(long subj, long pred, long obj, long context, boolean explicit) throws IOException {
TripleIndex mainIndex = indexes.get(0);
boolean stAdded;
try (MemoryStack stack = MemoryStack.stackPush()) {
MDBVal keyVal = MDBVal.malloc(stack);
// use calloc to get an empty data value
Expand All @@ -851,33 +858,34 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
keyBuf.flip();
keyVal.mv_data(keyBuf);

boolean foundExplicit = mdb_get(writeTxn, mainIndex.getDB(true), keyVal, dataVal) == 0;
boolean foundImplicit = !foundExplicit && mdb_get(writeTxn, mainIndex.getDB(false), keyVal, dataVal) == 0;

boolean stAdded = !(foundExplicit || foundImplicit);
if (stAdded || explicit && foundImplicit) {
if (recordCache == null) {
if (requiresResize()) {
// map is full, resize required
recordCache = new TxnRecordCache(dir);
logger.debug("resize of map size {} required while adding - initialize record cache", mapSize);
}
if (recordCache == null) {
if (requiresResize()) {
// map is full, resize required
recordCache = new TxnRecordCache(dir);
logger.debug("resize of map size {} required while adding - initialize record cache", mapSize);
}
if (recordCache != null) {
long quad[] = new long[] { subj, pred, obj, context };
if (explicit && foundImplicit) {
// remove implicit statement
recordCache.removeRecord(quad, false);
}
// put record in cache and return immediately
return recordCache.storeRecord(quad, explicit);
}
if (recordCache != null) {
long quad[] = new long[] { subj, pred, obj, context };
if (explicit) {
// remove implicit statement
recordCache.removeRecord(quad, false);
}
// put record in cache and return immediately
return recordCache.storeRecord(quad, explicit);
}

if (explicit && foundImplicit) {
E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal));
}
E(mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, 0));
int rc = mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, MDB_NOOVERWRITE);
if (rc != MDB_SUCCESS && rc != MDB_KEYEXIST) {
throw new IOException(mdb_strerror(rc));
}
stAdded = rc == MDB_SUCCESS;
boolean foundImplicit = false;
if (explicit && stAdded) {
foundImplicit = E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal)) == MDB_SUCCESS;
}

if (stAdded) {
for (int i = 1; i < indexes.size(); i++) {
TripleIndex index = indexes.get(i);
keyBuf.clear();
Expand All @@ -887,7 +895,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
// update buffer positions in MDBVal
keyVal.mv_data(keyBuf);

if (explicit && foundImplicit) {
if (foundImplicit) {
E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal));
}
E(mdb_put(writeTxn, index.getDB(explicit), keyVal, dataVal, 0));
Expand All @@ -897,9 +905,9 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
incrementContext(stack, context);
}
}

return stAdded;
}

return stAdded;
}

private void incrementContext(MemoryStack stack, long context) throws IOException {
Expand Down Expand Up @@ -991,6 +999,7 @@ public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]>
}
if (recordCache != null) {
recordCache.removeRecord(quad, explicit);
handler.accept(quad);
continue;
}

Expand All @@ -1005,6 +1014,7 @@ public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]>
}

decrementContext(stack, quad[CONTEXT_IDX]);
handler.accept(quad);
}
} finally {
it.close();
Expand Down Expand Up @@ -1284,24 +1294,22 @@ GroupMatcher createMatcher(long subj, long pred, long obj, long context) {
}

void toKey(ByteBuffer bb, long subj, long pred, long obj, long context) {
long[] values = new long[4];
for (int i = 0; i < fieldSeq.length; i++) {
switch (fieldSeq[i]) {
case 's':
values[i] = subj;
writeUnsigned(bb, subj);
break;
case 'p':
values[i] = pred;
writeUnsigned(bb, pred);
break;
case 'o':
values[i] = obj;
writeUnsigned(bb, obj);
break;
case 'c':
values[i] = context;
writeUnsigned(bb, context);
break;
}
}
writeListUnsigned(bb, values);
}

void keyToQuad(ByteBuffer key, long[] quad) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@
package org.eclipse.rdf4j.sail.lmdb;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig;
Expand Down Expand Up @@ -65,6 +70,25 @@ public void testInferredStmts() throws Exception {
}
}

/**
 * Checks the ID garbage-collection filter after a deletion: five explicit quads are stored, the two quads with
 * subject 1 and predicate 6 are removed (object and context are passed as -1, presumably acting as wildcards), and
 * every ID of the removed quads is collected. After the commit, {@code filterUsedIds} is expected to drop ID 1 from
 * the candidate set because it is still referenced by the remaining quads, leaving exactly 6, 7 and 8.
 */
@Test
public void testGc() throws Exception {
    tripleStore.startTransaction();

    // quads as { subject, predicate, object, context }
    long[][] quads = {
            { 1, 2, 3, 1 },
            { 1, 2, 4, 1 },
            { 1, 2, 5, 1 },
            { 1, 6, 7, 1 },
            { 1, 6, 7, 8 }
    };
    for (long[] q : quads) {
        tripleStore.storeTriple(q[0], q[1], q[2], q[3], true);
    }

    // gather every ID that occurred in a removed quad as a GC candidate
    Set<Long> candidateIds = new HashSet<>();
    tripleStore.removeTriplesByContext(1, 6, -1, -1, true,
            quad -> Arrays.stream(quad).boxed().forEach(candidateIds::add));
    tripleStore.commit();

    // only IDs that are no longer used by any stored quad may remain
    tripleStore.filterUsedIds(candidateIds);
    assertEquals(Arrays.asList(6L, 7L, 8L), candidateIds.stream().sorted().collect(Collectors.toList()));
}

@AfterEach
public void after() throws Exception {
tripleStore.close();
Expand Down
Loading