Skip to content

Commit

Permalink
better handling of errors and resizing in LMDB
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad committed Jun 5, 2024
1 parent 6a618c5 commit 9cb0c52
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT;
import static org.lwjgl.util.lmdb.LMDB.MDB_SET;
import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE;
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_close;
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_get;
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_open;
Expand All @@ -24,6 +25,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.locks.StampedLock;

import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
import org.lwjgl.PointerBuffer;
import org.lwjgl.system.MemoryStack;
Expand Down Expand Up @@ -92,7 +94,7 @@ public long[] next() {
try {
if (txnRefVersion != txnRef.version()) {
// cursor must be renewed
mdb_cursor_renew(txn, cursor);
E(mdb_cursor_renew(txn, cursor));
if (fetchNext) {
// cursor must be positioned on last item, reuse minKeyBuf if available
if (minKeyBuf == null) {
Expand All @@ -102,10 +104,10 @@ public long[] next() {
Varint.writeUnsigned(minKeyBuf, record[0]);
minKeyBuf.flip();
keyData.mv_data(minKeyBuf);
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_SET);
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET));
if (lastResult != 0) {
// use MDB_SET_RANGE if key was deleted
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
}
if (lastResult != 0) {
closeInternal(false);
Expand All @@ -117,27 +119,29 @@ public long[] next() {
}

if (fetchNext) {
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
fetchNext = false;
} else {
if (minKeyBuf != null) {
// set cursor to min key
keyData.mv_data(minKeyBuf);
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
} else {
// set cursor to first item
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
}
}

while (lastResult == 0) {
while (lastResult == MDB_SUCCESS) {
record[0] = Varint.readUnsigned(keyData.mv_data());
// fetch next value
fetchNext = true;
return record;
}
closeInternal(false);
return null;
} catch (IOException e) {
throw new SailException(e);
} finally {
txnLock.unlockRead(stamp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTFOUND;
import static org.lwjgl.util.lmdb.LMDB.MDB_SET;
import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE;
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
import static org.lwjgl.util.lmdb.LMDB.mdb_cmp;
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_close;
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_get;
Expand All @@ -25,6 +26,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.locks.StampedLock;

import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.lmdb.TripleStore.TripleIndex;
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
import org.eclipse.rdf4j.sail.lmdb.Varint.GroupMatcher;
Expand Down Expand Up @@ -136,10 +138,10 @@ public long[] next() {
index.toKey(minKeyBuf, quad[0], quad[1], quad[2], quad[3]);
minKeyBuf.flip();
keyData.mv_data(minKeyBuf);
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_SET);
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET));
if (lastResult != 0) {
// use MDB_SET_RANGE if key was deleted
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
}
if (lastResult != 0) {
closeInternal(false);
Expand All @@ -151,26 +153,26 @@ public long[] next() {
}

if (fetchNext) {
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
fetchNext = false;
} else {
if (minKeyBuf != null) {
// set cursor to min key
keyData.mv_data(minKeyBuf);
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
} else {
// set cursor to first item
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
}
}

while (lastResult == 0) {
while (lastResult == MDB_SUCCESS) {
// if (maxKey != null && TripleStore.COMPARATOR.compare(keyData.mv_data(), maxKey.mv_data()) > 0) {
if (maxKey != null && mdb_cmp(txn, dbi, keyData, maxKey) > 0) {
lastResult = MDB_NOTFOUND;
} else if (groupMatcher != null && !groupMatcher.matches(keyData.mv_data())) {
// value doesn't match search key/mask, fetch next value
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
lastResult = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
} else {
// Matching value found
index.keyToQuad(keyData.mv_data(), quad);
Expand All @@ -181,6 +183,8 @@ public long[] next() {
}
closeInternal(false);
return null;
} catch (IOException e) {
throw new SailException(e);
} finally {
txnLock.unlockRead(stamp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ public void approveAll(Set<Statement> approved, Set<Resource> approvedContexts)
if (tripleStoreException != null) {
throw wrapTripleStoreException();
}
Thread.yield();
}

} else {
Expand Down Expand Up @@ -704,7 +705,7 @@ private void startTransaction(boolean preferThreading) throws SailException {
} else {
tripleStore.startTransaction();
}
valueStore.startTransaction();
valueStore.startTransaction(true);
} catch (Exception e) {
storeTxnStarted.set(false);
throw new SailException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ final class LmdbUtil {
* Percentage free space in an LMDB db before automatically resizing the map. Default is 80%.
*/
@SuppressWarnings("StaticNonFinalField")
public static int PERCENTAGE_FULL_TRIGGERS_RESIZE = 80;
public static int PERCENTAGE_FULL_TRIGGERS_RESIZE = 80;

private LmdbUtil() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private synchronized boolean update(Object element, boolean add) throws IOExcept
keyVal.mv_data(keyBuf);

if (add) {
if (mdb_put(factory.writeTxn, dbi, keyVal, dataVal, MDB_NOOVERWRITE) == MDB_SUCCESS) {
if (E(mdb_put(factory.writeTxn, dbi, keyVal, dataVal, MDB_NOOVERWRITE)) == MDB_SUCCESS) {
size++;
return true;
}
Expand Down Expand Up @@ -226,16 +226,16 @@ private T computeNext() throws IOException {

try (MemoryStack stack = MemoryStack.stackPush()) {
keyData.mv_data(stack.bytes(write(current)));
if (mdb_cursor_get(cursor, keyData, valueData, MDB_SET) != 0) {
if (mdb_cursor_get(cursor, keyData, valueData, MDB_SET) != MDB_SUCCESS) {
// use MDB_SET_RANGE if key was deleted
if (mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE) == 0) {
if (mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE) == MDB_SUCCESS) {
return read(keyData.mv_data());
}
}
}
}

if (mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT) == 0) {
if (mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT) == MDB_SUCCESS) {
return read(keyData.mv_data());
}
close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
keyBuf.clear();
Varint.writeUnsigned(keyBuf, id);
keyData.mv_data(keyBuf.flip());
if (mdb_get(txn, contextsDbi, keyData, valueData) == 0) {
if (E(mdb_get(txn, contextsDbi, keyData, valueData)) == MDB_SUCCESS) {
it.remove();
}
}
Expand All @@ -587,15 +587,15 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {

if (fullScan) {
long[] quad = new long[4];
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_FIRST);
while (rc == 0 && !ids.isEmpty()) {
int rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_FIRST));
while (rc == MDB_SUCCESS && !ids.isEmpty()) {
index.keyToQuad(keyData.mv_data(), quad);
ids.remove(quad[0]);
ids.remove(quad[1]);
ids.remove(quad[2]);
ids.remove(quad[3]);

rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
}
} else {
for (Iterator<Long> it = ids.iterator(); it.hasNext();) {
Expand Down Expand Up @@ -625,15 +625,15 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {

// set cursor to min key
keyData.mv_data(keyBuf);
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
int rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
boolean exists = false;
while (!exists && rc == 0) {
if (mdb_cmp(txn, dbi, keyData, maxKey) > 0) {
// id was not found
break;
} else if (!matcher.matches(keyData.mv_data())) {
// value doesn't match search key/mask, fetch next value
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
} else {
exists = true;
}
Expand Down Expand Up @@ -708,7 +708,7 @@ protected double cardinality(long subj, long pred, long obj, long context) throw

// set cursor to min key
keyData.mv_data(keyBuf);
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
int rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
if (rc != 0 || mdb_cmp(txn, dbi, keyData, maxKey) >= 0) {
break;
} else {
Expand All @@ -717,13 +717,13 @@ protected double cardinality(long subj, long pred, long obj, long context) throw

// set cursor to max key
keyData.mv_data(maxKeyBuf);
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
if (rc != 0) {
// directly go to last value
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_LAST);
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_LAST));
} else {
// go to previous value of selected key
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_PREV);
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_PREV));
}
if (rc == 0) {
Varint.readListUnsigned(keyData.mv_data(), s.maxValues);
Expand All @@ -747,8 +747,8 @@ protected double cardinality(long subj, long pred, long obj, long context) throw
keyData.mv_data(keyBuf);

int currentSamplesCount = 0;
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
while (rc == 0 && currentSamplesCount < s.MAX_SAMPLES_PER_BUCKET) {
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE));
while (rc == MDB_SUCCESS && currentSamplesCount < s.MAX_SAMPLES_PER_BUCKET) {
if (mdb_cmp(txn, dbi, keyData, maxKey) >= 0) {
endOfRange = true;
break;
Expand Down Expand Up @@ -776,7 +776,7 @@ protected double cardinality(long subj, long pred, long obj, long context) throw
}
}
}
rc = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
rc = E(mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT));
if (rc != 0) {
// no more elements are available
endOfRange = true;
Expand Down Expand Up @@ -862,6 +862,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
logger.debug("resize of map size {} required while adding - initialize record cache", mapSize);
}
}

if (recordCache != null) {
long quad[] = new long[] { subj, pred, obj, context };
if (explicit) {
Expand All @@ -872,7 +873,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
return recordCache.storeRecord(quad, explicit);
}

int rc = mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, MDB_NOOVERWRITE);
int rc = E(mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, MDB_NOOVERWRITE));
if (rc != MDB_SUCCESS && rc != MDB_KEYEXIST) {
throw new IOException(mdb_strerror(rc));
}
Expand All @@ -884,6 +885,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean

if (stAdded) {
for (int i = 1; i < indexes.size(); i++) {

TripleIndex index = indexes.get(i);
keyBuf.clear();
index.toKey(keyBuf, subj, pred, obj, context);
Expand Down Expand Up @@ -918,7 +920,7 @@ private void incrementContext(MemoryStack stack, long context) throws IOExceptio
idVal.mv_data(bb);
MDBVal dataVal = MDBVal.calloc(stack);
long newCount = 1;
if (mdb_get(writeTxn, contextsDbi, idVal, dataVal) == 0) {
if (E(mdb_get(writeTxn, contextsDbi, idVal, dataVal)) == MDB_SUCCESS) {
// update count
newCount = Varint.readUnsigned(dataVal.mv_data()) + 1;
}
Expand All @@ -942,7 +944,7 @@ private boolean decrementContext(MemoryStack stack, long context) throws IOExcep
bb.flip();
idVal.mv_data(bb);
MDBVal dataVal = MDBVal.calloc(stack);
if (mdb_get(writeTxn, contextsDbi, idVal, dataVal) == 0) {
if (E(mdb_get(writeTxn, contextsDbi, idVal, dataVal)) == MDB_SUCCESS) {
// update count
long newCount = Varint.readUnsigned(dataVal.mv_data()) - 1;
if (newCount <= 0) {
Expand All @@ -953,7 +955,7 @@ private boolean decrementContext(MemoryStack stack, long context) throws IOExcep
ByteBuffer countBb = stack.malloc(Varint.calcLengthUnsigned(newCount));
Varint.writeUnsigned(countBb, newCount);
dataVal.mv_data(countBb.flip());
mdb_put(writeTxn, contextsDbi, idVal, dataVal, 0);
E(mdb_put(writeTxn, contextsDbi, idVal, dataVal, 0));
}
}
return false;
Expand All @@ -980,7 +982,7 @@ public void removeTriplesByContext(long subj, long pred, long obj, long context,
}

public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]> handler) throws IOException {
try (MemoryStack stack = MemoryStack.stackPush()) {
try (it; MemoryStack stack = MemoryStack.stackPush()) {
MDBVal keyValue = MDBVal.callocStack(stack);
ByteBuffer keyBuf = stack.malloc(MAX_KEY_LENGTH);

Expand Down Expand Up @@ -1013,8 +1015,6 @@ public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]>
decrementContext(stack, quad[CONTEXT_IDX]);
handler.accept(quad);
}
} finally {
it.close();
}
}

Expand Down
Loading

0 comments on commit 9cb0c52

Please sign in to comment.