Skip to content

Commit

Permalink
Use moving sum for GC duration.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed May 22, 2024
1 parent 6011a45 commit cb896ce
Showing 1 changed file with 46 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.management.NotificationEmitter;
import javax.management.openmbean.CompositeData;

import org.eclipse.rdf4j.common.io.FileUtil;
import org.eclipse.rdf4j.model.IRI;
Expand All @@ -42,6 +46,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sun.management.GarbageCollectionNotificationInfo;
import com.sun.management.GcInfo;

/**
* Model implementation that stores in a {@link LinkedHashModel} until more than 10KB statements are added and the
* estimated memory usage is more than the amount of free memory available. Once the threshold is cross this
Expand Down Expand Up @@ -75,6 +82,39 @@ abstract class MemoryOverflowModel extends AbstractModel implements AutoCloseabl

private final SimpleValueFactory vf = SimpleValueFactory.getInstance();

private static volatile boolean highGcLoad = false;
private static volatile long lastGcUpdate;
private static volatile long gcSum;
private static volatile List<GcInfo> gcInfos = new CopyOnWriteArrayList<>();
static {
List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
for (GarbageCollectorMXBean gcBean : gcBeans) {
NotificationEmitter emitter = (NotificationEmitter) gcBean;
emitter.addNotificationListener((notification, o) -> {
while (! gcInfos.isEmpty()) {
if (System.currentTimeMillis() - gcInfos.get(0).getEndTime() > 5000) {
gcSum -= gcInfos.remove(0).getDuration();
} else {
break;
}
}

// extract garbage collection information from notification.
GarbageCollectionNotificationInfo gcNotificationInfo = GarbageCollectionNotificationInfo.from((CompositeData) notification.getUserData());
GcInfo gcInfo = gcNotificationInfo.getGcInfo();
gcInfos.add(gcInfo);
gcSum += gcInfo.getDuration();
System.out.println("gcSum: " + gcSum);
if (gcSum > 1000) {
highGcLoad = true;
lastGcUpdate = System.currentTimeMillis();
} else if (System.currentTimeMillis() - lastGcUpdate > 10000) {
highGcLoad = false;
}
}, null, null);
}
}

public MemoryOverflowModel(boolean verifyAdditions) {
this.verifyAdditions = verifyAdditions;
memory = new LinkedHashModel(LARGE_BLOCK);
Expand Down Expand Up @@ -148,7 +188,7 @@ public boolean addAll(Collection<? extends Statement> c) {
if (buffer.size() >= 1024) {
ret |= getDelegate().addAll(buffer);
buffer.clear();
innerCheckMemoryOverflow();
checkMemoryOverflow();
}
}
if (!buffer.isEmpty()) {
Expand Down Expand Up @@ -266,85 +306,15 @@ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundEx
}
}

static class GcInfo {
long count;
long time;
}

private final Map<String, GcInfo> prevGcInfo = new ConcurrentHashMap<>();

private synchronized boolean highGcLoad() {
boolean highLoad = false;

// get all garbage collector MXBeans.
List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
for (GarbageCollectorMXBean gcBean : gcBeans) {
long count = gcBean.getCollectionCount();
long time = gcBean.getCollectionTime();

GcInfo prevInfo = prevGcInfo.get(gcBean.getName());
if (prevInfo != null) {
long countDiff = count - prevInfo.count;
long timeDiff = time - prevInfo.time;
if (countDiff != 0) {
double gcLoad = (double) timeDiff / countDiff;
// TODO find good threshold
if (gcLoad > 100) {
highLoad = true;
}
}
} else {
prevInfo = new GcInfo();
prevGcInfo.put(gcBean.getName(), prevInfo);
}
prevInfo.count = count;
prevInfo.time = time;
}
return highLoad;
}

private void checkMemoryOverflow() {
private synchronized void checkMemoryOverflow() {
if (disk == getDelegate()) {
return;
}

if (overflow) {
innerCheckMemoryOverflow();
}
int size = size() + 1;
if (size >= LARGE_BLOCK && size % LARGE_BLOCK == 0) {
if (highGcLoad()) {
logger.debug("syncing at {} triples due to high gc load.", size);
overflowToDisk();
} else {
innerCheckMemoryOverflow();
}
}
}

private synchronized void innerCheckMemoryOverflow() {
if (disk == null) {
// maximum heap size the JVM can allocate
long maxMemory = RUNTIME.maxMemory();

// total currently allocated JVM memory
long totalMemory = RUNTIME.totalMemory();

// amount of memory free in the currently allocated JVM memory
long freeMemory = RUNTIME.freeMemory();

// estimated memory used
long used = totalMemory - freeMemory;

// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
long freeToAllocateMemory = maxMemory - used;

// try to prevent OOM
overflow = freeToAllocateMemory < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING;
if (overflow) {
logger.debug("syncing at {} triples.", size());
overflowToDisk();
}
if (highGcLoad) {
logger.debug("syncing at {} triples due to gc load");
overflowToDisk();
System.gc();
}
}

Expand Down

0 comments on commit cb896ce

Please sign in to comment.