Skip to content

Commit

Permalink
GH-4554 Fix performance of memory overflow
Browse files Browse the repository at this point in the history
- do not verify additions in Changeset via isEmpty()
- proactively close MemoryOverflowModel in Changeset by using AutoClosable interface
- use GarbageCollectorMXBean to monitor GC load
- do not isolate transactions in LmdbStore when used in MemoryOverflowModel
  • Loading branch information
kenwenzel committed May 23, 2023
1 parent 221ab7e commit 4f67b34
Show file tree
Hide file tree
Showing 12 changed files with 348 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,25 @@ public void close() throws SailException {
refbacks = null;
prepend = null;
observed = null;
approved = null;
try {
if (approved instanceof AutoCloseable) {
try {
((AutoCloseable) approved).close();
} catch (Exception e) {
throw new SailException(e);
}
}
approved = null;
} finally {
if (deprecated instanceof AutoCloseable) {
try {
((AutoCloseable) deprecated).close();
} catch (Exception e) {
throw new SailException(e);
}
}
deprecated = null;
}
deprecated = null;
approvedContexts = null;
deprecatedContexts = null;
Expand Down Expand Up @@ -416,7 +434,7 @@ public void approve(Statement statement) {
approved = createEmptyModel();
}
approved.add(statement);
approvedEmpty = approved == null || approved.isEmpty();
approvedEmpty = false;
if (statement.getContext() != null) {
if (approvedContexts == null) {
approvedContexts = new HashSet<>();
Expand Down Expand Up @@ -447,7 +465,7 @@ public void deprecate(Statement statement) {
deprecated = createEmptyModel();
}
deprecated.add(statement);
deprecatedEmpty = deprecated == null || deprecated.isEmpty();
deprecatedEmpty = false;
Resource ctx = statement.getContext();
if (approvedContexts != null && approvedContexts.contains(ctx)
&& !approved.contains(null, null, null, ctx)) {
Expand Down Expand Up @@ -885,7 +903,7 @@ public void approveAll(Set<Statement> approve, Set<Resource> approveContexts) {
approved = createEmptyModel();
}
approved.addAll(approve);
approvedEmpty = approved == null || approved.isEmpty();
approvedEmpty = false;

if (approveContexts != null) {
if (approvedContexts == null) {
Expand All @@ -912,7 +930,7 @@ public void deprecateAll(Set<Statement> deprecate) {
deprecated = createEmptyModel();
}
deprecated.addAll(deprecate);
deprecatedEmpty = deprecated == null || deprecated.isEmpty();
deprecatedEmpty = false;

for (Statement statement : deprecate) {
Resource ctx = statement.getContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ CloseableIteration<? extends Statement, SailException> createStatementIterator(
}
}

public void setTransactionIsolation(boolean transactionIsolation) {
this.tripleStore.setTransactionIsolation(transactionIsolation);
}

private final class LmdbSailSource extends BackingSailSource {

private final boolean explicit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,15 @@ protected void initializeInternal() throws SailException {
FileUtils.writeStringToFile(versionFile, VERSION, StandardCharsets.UTF_8);
}
backingStore = new LmdbSailStore(dataDir, config);
this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel() {
this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel(false) {
@Override
protected SailStore createSailStore(File dataDir) throws IOException, SailException {
// Model can't fit into memory, use another LmdbSailStore to store delta
return new LmdbSailStore(dataDir, config);
LmdbStoreConfig overflowConfig = new LmdbStoreConfig();
LmdbSailStore store = new LmdbSailStore(dataDir, overflowConfig);
// does not need to isolate transactions and therefore can optimize autogrow and others
store.setTransactionIsolation(false);
return store;
}
}) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.eclipse.rdf4j.common.io.FileUtil;
import org.eclipse.rdf4j.model.IRI;
Expand All @@ -41,16 +45,16 @@
* estimated memory usage is more than the amount of free memory available. Once the threshold is cross this
* implementation seamlessly changes to a disk based {@link SailSourceModel}.
*/
abstract class MemoryOverflowModel extends AbstractModel {
abstract class MemoryOverflowModel extends AbstractModel implements AutoCloseable {

private static final long serialVersionUID = 4119844228099208169L;

private static final Runtime RUNTIME = Runtime.getRuntime();

private static final int LARGE_BLOCK = 10000;

// To reduce the chance of OOM we will always overflow once we get close to running out of memory even if we think
// we have space for one more block. The limit is currently set at 32 MB
// To reduce the chance of OOM we will always overflow once we get close to running out of memory.
// The limit is currently set at 32 MB
private static final int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = 32 * 1024 * 1024;

final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class);
Expand All @@ -63,27 +67,17 @@ abstract class MemoryOverflowModel extends AbstractModel {

transient SailSourceModel disk;

private long baseline = 0;
private final boolean verifyAdditions;

private long maxBlockSize = 0;
private final SimpleValueFactory vf = SimpleValueFactory.getInstance();

SimpleValueFactory vf = SimpleValueFactory.getInstance();

public MemoryOverflowModel() {
public MemoryOverflowModel(boolean verifyAdditions) {
this.verifyAdditions = verifyAdditions;
memory = new LinkedHashModel(LARGE_BLOCK);
}

public MemoryOverflowModel(Model model) {
this(model.getNamespaces());
addAll(model);
}

public MemoryOverflowModel(Set<Namespace> namespaces, Collection<? extends Statement> c) {
this(namespaces);
addAll(c);
}

public MemoryOverflowModel(Set<Namespace> namespaces) {
public MemoryOverflowModel(Set<Namespace> namespaces, boolean verifyAdditions) {
this.verifyAdditions = verifyAdditions;
memory = new LinkedHashModel(namespaces, LARGE_BLOCK);
}

Expand Down Expand Up @@ -227,40 +221,71 @@ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundEx
}
}

static class GcInfo {
long count;
long time;
}

private final Map<String, GcInfo> prevGcInfo = new ConcurrentHashMap<>();

private synchronized boolean highGcLoad() {
boolean highLoad = false;

// get all garbage collector MXBeans.
List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
for (GarbageCollectorMXBean gcBean : gcBeans) {
long count = gcBean.getCollectionCount();
long time = gcBean.getCollectionTime();

GcInfo prevInfo = prevGcInfo.get(gcBean.getName());
if (prevInfo != null) {
long countDiff = count - prevInfo.count;
long timeDiff = time - prevInfo.time;
if (countDiff != 0) {
double gcLoad = (double) timeDiff / countDiff;
// TODO find good threshold
if (gcLoad > 30) {
highLoad = true;
}
}
} else {
prevInfo = new GcInfo();
prevGcInfo.put(gcBean.getName(), prevInfo);
}
prevInfo.count = count;
prevInfo.time = time;
}
return highLoad;
}

private synchronized void checkMemoryOverflow() {
if (disk == null) {
int size = size();
if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
// maximum heap size the JVM can allocate
long maxMemory = RUNTIME.maxMemory();
boolean overflow = highGcLoad();
if (!overflow) {
// maximum heap size the JVM can allocate
long maxMemory = RUNTIME.maxMemory();

// total currently allocated JVM memory
long totalMemory = RUNTIME.totalMemory();
// total currently allocated JVM memory
long totalMemory = RUNTIME.totalMemory();

// amount of memory free in the currently allocated JVM memory
long freeMemory = RUNTIME.freeMemory();
// amount of memory free in the currently allocated JVM memory
long freeMemory = RUNTIME.freeMemory();

// estimated memory used
long used = totalMemory - freeMemory;
// estimated memory used
long used = totalMemory - freeMemory;

// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
long freeToAllocateMemory = maxMemory - used;
// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
long freeToAllocateMemory = maxMemory - used;

if (baseline > 0) {
long blockSize = used - baseline;
if (blockSize > maxBlockSize) {
maxBlockSize = blockSize;
}

// Sync if either the estimated size of the next block is larger than remaining memory, or
// if less than 15% of the heap is still free (this last condition to avoid GC overhead limit)
if (freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING ||
freeToAllocateMemory < Math.min(0.15 * maxMemory, maxBlockSize)) {
logger.debug("syncing at {} triples. max block size: {}", size, maxBlockSize);
overflowToDisk();
}
// try to prevent OOM
overflow = freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING;
}
if (overflow) {
logger.debug("syncing at {} triples.", size);
overflowToDisk();
}
baseline = used;
}
}
}
Expand All @@ -271,26 +296,7 @@ private synchronized void overflowToDisk() {
dataDir = Files.createTempDirectory("model").toFile();
logger.debug("memory overflow using temp directory {}", dataDir);
store = createSailStore(dataDir);
disk = new SailSourceModel(store) {

@Override
protected void finalize() throws Throwable {
logger.debug("finalizing {}", dataDir);
if (disk == this) {
try {
store.close();
} catch (SailException e) {
logger.error(e.toString(), e);
} finally {
FileUtil.deleteDir(dataDir);
dataDir = null;
store = null;
disk = null;
}
}
super.finalize();
}
};
disk = new SailSourceModel(store, verifyAdditions);
disk.addAll(memory);
memory = new LinkedHashModel(memory.getNamespaces(), LARGE_BLOCK);
logger.debug("overflow synced to disk");
Expand All @@ -299,4 +305,22 @@ protected void finalize() throws Throwable {
logger.error("Error while writing to overflow directory " + path, e);
}
}

@Override
public void close() throws IOException {
if (disk != null) {
logger.debug("closing {}", dataDir);
disk.close();
try {
store.close();
} catch (SailException e) {
logger.error(e.toString(), e);
} finally {
FileUtil.deleteDir(dataDir);
dataDir = null;
store = null;
disk = null;
}
}
}
}

0 comments on commit 4f67b34

Please sign in to comment.