Skip to content

Commit

Permalink
GH-4899 Introduce MapDB3 backed queue to the MapDb3CollectionFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
JervenBolleman committed Feb 6, 2024
1 parent 3996cd8 commit 85dcb36
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
package org.eclipse.rdf4j.collection.factory.mapdb;

import java.util.AbstractMap;
import java.util.AbstractQueue;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.function.BiConsumer;
Expand All @@ -41,11 +43,12 @@
import org.mapdb.HTreeMap;
import org.mapdb.Serializer;
import org.mapdb.serializer.SerializerJava;
import org.mapdb.serializer.SerializerLong;

public class MapDb3CollectionFactory implements CollectionFactory {
// The size 16 seems like a nice starting value but others could well
// be better.
private static final int SWITCH_TO_DISK_BASED_SET_AT_SIZE = 16;
private static final int DEFAULT_SWITCH_TO_DISK_BASED_SET_AT_SIZE = 16;
protected volatile DB db;
protected volatile long colectionId = 0;
protected final long iterationCacheSyncThreshold;
Expand All @@ -54,6 +57,63 @@ public class MapDb3CollectionFactory implements CollectionFactory {
// So I am not going to worry about this.
private static final boolean ON_32_BIT_VM = "32".equals(System.getProperty("sun.arch.data.model"));

/**
 * A FIFO queue that keeps its elements in the supplied map, keyed by a monotonically increasing sequence number.
 * <p>
 * Elements with sequence numbers in the range {@code [head, tail)} are the current contents of the queue. The
 * enclosing factory's {@code db} is committed every time {@code tail} or {@code head} reaches a multiple of
 * {@code iterationCacheSyncThreshold}.
 * <p>
 * {@code null} elements are rejected because {@link #peek()} and {@link #poll()} use {@code null} to signal an
 * empty queue.
 *
 * @param <T> the type of the elements held in the queue.
 */
private final class MapDb3BackedQueue<T> extends AbstractQueue<T> {
	private final Map<Long, T> m;
	// Sequence number under which the next offered element is stored.
	private long tail;
	// Sequence number of the current first element; equal to tail when the queue is empty.
	private long head;

	private MapDb3BackedQueue(Map<Long, T> m) {
		this.m = m;
	}

	/**
	 * Appends the element to the end of the queue.
	 *
	 * @param arg0 the element to add, must not be {@code null}.
	 * @return always {@code true}.
	 * @throws NullPointerException if {@code arg0} is {@code null}.
	 */
	@Override
	public boolean offer(T arg0) {
		// Reject null before touching tail so that a refused element cannot leave a hole in the sequence.
		if (arg0 == null) {
			throw new NullPointerException("null elements are not supported");
		}
		m.put(tail++, arg0);
		if (tail % iterationCacheSyncThreshold == 0) {
			db.commit();
		}
		return true;
	}

	/**
	 * @return the first element without removing it, or {@code null} if the queue is empty.
	 */
	@Override
	public T peek() {
		if (head >= tail) {
			return null;
		}
		return m.get(head);
	}

	/**
	 * @return the first element after removing it, or {@code null} if the queue is empty.
	 */
	@Override
	public T poll() {
		// Polling an empty queue must not advance head, otherwise head would overtake tail and elements offered
		// afterwards would never be returned.
		if (head >= tail) {
			return null;
		}
		T r = m.remove(head++);
		if (head % iterationCacheSyncThreshold == 0) {
			db.commit();
		}
		return r;
	}

	/**
	 * Returns an iterator that walks the sequence numbers from the head as it was when the iterator was created up
	 * to the current tail. The iterator does not remove elements.
	 */
	@Override
	public Iterator<T> iterator() {
		return new Iterator<>() {
			long at = head;

			@Override
			public boolean hasNext() {
				return at < tail;
			}

			@Override
			public T next() {
				if (at >= tail) {
					throw new NoSuchElementException();
				}
				return m.get(at++);
			}

		};
	}

	/**
	 * @return the number of elements in the queue, capped at {@link Integer#MAX_VALUE}.
	 */
	@Override
	public int size() {
		return (int) Math.min(tail - head, Integer.MAX_VALUE);
	}
}

protected static final class RDF4jMapDB3Exception extends RDF4JException {

private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -112,7 +172,7 @@ public Set<BindingSet> createSetOfBindingSets(Supplier<MutableBindingSet> create
init();
Serializer<BindingSet> serializer = createBindingSetSerializer(create, getHas, getget, getSet);
MemoryTillSizeXSet<BindingSet> set = new MemoryTillSizeXSet<>(colectionId++,
delegate.createSetOfBindingSets(), serializer);
delegate.createSetOfBindingSets(), serializer, DEFAULT_SWITCH_TO_DISK_BASED_SET_AT_SIZE);
return new CommitingSet<>(set, iterationCacheSyncThreshold, db);
} else {
return delegate.createSetOfBindingSets();
Expand All @@ -124,7 +184,8 @@ public <T> Set<T> createSet() {
if (iterationCacheSyncThreshold > 0) {
init();
Serializer<T> serializer = createAnySerializer();
MemoryTillSizeXSet<T> set = new MemoryTillSizeXSet<T>(colectionId++, delegate.createSet(), serializer);
MemoryTillSizeXSet<T> set = new MemoryTillSizeXSet<T>(colectionId++, delegate.createSet(), serializer,
DEFAULT_SWITCH_TO_DISK_BASED_SET_AT_SIZE);
return new CommitingSet<T>(set, iterationCacheSyncThreshold, db);
} else {
return delegate.createSet();
Expand All @@ -136,7 +197,8 @@ public Set<Value> createValueSet() {
if (iterationCacheSyncThreshold > 0) {
init();
Serializer<Value> serializer = createValueSerializer();
Set<Value> set = new MemoryTillSizeXSet<>(colectionId++, delegate.createValueSet(), serializer);
Set<Value> set = new MemoryTillSizeXSet<>(colectionId++, delegate.createValueSet(), serializer,
DEFAULT_SWITCH_TO_DISK_BASED_SET_AT_SIZE);
return new CommitingSet<Value>(set, iterationCacheSyncThreshold, db);
} else {
return delegate.createValueSet();
Expand Down Expand Up @@ -173,12 +235,28 @@ public <V> Map<Value, V> createValueKeyedMap() {

/**
 * Creates a queue for arbitrary elements.
 * <p>
 * When {@code iterationCacheSyncThreshold} is not positive the delegate's queue is returned unchanged. Otherwise
 * the delegate's queue is wrapped in a {@link MemoryTillSizeXQueue} with a switch-over size of 128 that moves to
 * a {@link MapDb3BackedQueue} stored in a freshly created map of {@code db}.
 *
 * @param <T> the type of the elements held in the queue.
 * @return a new, empty queue.
 */
@Override
public <T> Queue<T> createQueue() {
	if (iterationCacheSyncThreshold <= 0) {
		return delegate.createQueue();
	}

	init();
	Serializer<T> elementSerializer = createAnySerializer();
	Map<Long, T> backingMap = db
			.hashMap(Long.toHexString(colectionId++), new SerializerLong(), elementSerializer)
			.create();

	return new MemoryTillSizeXQueue<>(delegate.createQueue(), 128, () -> new MapDb3BackedQueue<>(backingMap));
}

/**
 * Creates a queue for {@link Value} elements.
 * <p>
 * When {@code iterationCacheSyncThreshold} is not positive the delegate's value queue is returned unchanged.
 * Otherwise the delegate's value queue is wrapped in a {@link MemoryTillSizeXQueue} with a switch-over size of
 * 128 that moves to a {@link MapDb3BackedQueue} stored in a freshly created map of {@code db}.
 *
 * @return a new, empty queue of values.
 */
@Override
public Queue<Value> createValueQueue() {
	if (iterationCacheSyncThreshold > 0) {
		init();
		Serializer<Value> s = createValueSerializer();
		Map<Long, Value> m = db.hashMap(Long.toHexString(colectionId++), new SerializerLong(), s).create();
		// Use the delegate's value-specific queue as the initial queue so that both branches of this method
		// hand out the same kind of queue from the delegate.
		return new MemoryTillSizeXQueue<>(delegate.createValueQueue(), 128, () -> new MapDb3BackedQueue<>(m));
	} else {
		return delegate.createValueQueue();
	}
}

@Override
Expand Down Expand Up @@ -303,17 +381,19 @@ protected class MemoryTillSizeXSet<V> extends AbstractSet<V> {
private Set<V> wrapped;
private final long setName;
private final Serializer<V> valueSerializer;
private final long switchToDiskAtSize;

public MemoryTillSizeXSet(long setName, Set<V> wrapped, Serializer<V> valueSerializer) {
public MemoryTillSizeXSet(long setName, Set<V> wrapped, Serializer<V> valueSerializer, long switchToSize) {
super();
this.setName = setName;
this.wrapped = wrapped;
this.valueSerializer = valueSerializer;
this.switchToDiskAtSize = switchToSize;
}

@Override
public boolean add(V e) {
if (wrapped instanceof HashSet && wrapped.size() > SWITCH_TO_DISK_BASED_SET_AT_SIZE) {
if (wrapped instanceof HashSet && wrapped.size() > switchToDiskAtSize) {
Set<V> disk = db.hashSet(Long.toHexString(setName), valueSerializer).create();
disk.addAll(wrapped);
wrapped = disk;
Expand All @@ -323,7 +403,7 @@ public boolean add(V e) {

@Override
public boolean addAll(Collection<? extends V> arg0) {
if (wrapped instanceof HashSet && arg0.size() > SWITCH_TO_DISK_BASED_SET_AT_SIZE) {
if (wrapped instanceof HashSet && arg0.size() > switchToDiskAtSize) {
Set<V> disk = db.hashSet(Long.toHexString(setName), valueSerializer).create();
disk.addAll(wrapped);
wrapped = disk;
Expand Down Expand Up @@ -383,6 +463,55 @@ public int size() {

}

/**
 * A queue that starts out on the queue handed to the constructor and, once that queue has grown beyond the
 * configured size, moves its contents into a queue obtained from the supplier. The switch is only attempted while
 * the current queue is not a {@link MapDb3BackedQueue}.
 *
 * @param <V> the type of the elements held in the queue.
 */
protected class MemoryTillSizeXQueue<V> extends AbstractQueue<V> {
	private Queue<V> wrapped;
	private final long switchToDiskAtSize;
	private final Supplier<Queue<V>> supplier;

	/**
	 * @param wrapped      the queue used until the switch-over size is exceeded.
	 * @param switchToSize the size the current queue must exceed before the contents are moved.
	 * @param supplier     provides the queue that takes over once the switch-over size is exceeded.
	 */
	public MemoryTillSizeXQueue(Queue<V> wrapped, long switchToSize, Supplier<Queue<V>> supplier) {
		super();
		this.wrapped = wrapped;
		this.switchToDiskAtSize = switchToSize;
		this.supplier = supplier;
	}

	/**
	 * Adds the element to the current queue, first moving the existing contents to the supplied queue when the
	 * switch-over size has been exceeded.
	 */
	@Override
	public boolean offer(V e) {
		if (mustSwitch()) {
			switchToSuppliedQueue();
		}
		return wrapped.offer(e);
	}

	// True while the current queue is not yet MapDb3 backed and holds more elements than the switch-over size.
	private boolean mustSwitch() {
		if (wrapped instanceof MapDb3BackedQueue) {
			return false;
		}
		return wrapped.size() > switchToDiskAtSize;
	}

	// Copies everything from the current queue into a queue from the supplier and continues on that one.
	private void switchToSuppliedQueue() {
		Queue<V> replacement = supplier.get();
		replacement.addAll(wrapped);
		wrapped = replacement;
	}

	@Override
	public V poll() {
		return wrapped.poll();
	}

	@Override
	public V peek() {
		return wrapped.peek();
	}

	@Override
	public Iterator<V> iterator() {
		return wrapped.iterator();
	}

	@Override
	public int size() {
		return wrapped.size();
	}

}

/**
* These methods should be overridden in case a store can deliver a better serialization protocol.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.collection.factory.mapdb;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;

import org.junit.jupiter.api.Test;

/**
 * Tests for the queues handed out by {@link MapDb3CollectionFactory}.
 * <p>
 * Each test offers 1024 elements to a queue created by a factory constructed with the argument {@code 1}.
 */
public class MapDb3CollectionFactoryTest {

	/**
	 * Offers a run of elements and checks that peek and poll return them in insertion order, that peek does not
	 * consume the element, and that the size is tracked correctly.
	 */
	@Test
	void queuOfferAnd() {
		try (MapDb3CollectionFactory mapDb3CollectionFactory = new MapDb3CollectionFactory(1)) {
			Queue<String> q = mapDb3CollectionFactory.createQueue();
			int size = 1024;
			for (int i = 0; i < size; i++) {
				assertTrue(q.offer(Integer.toString(i)));
			}
			assertEquals(size, q.size());
			for (int i = 0; i < size; i++) {
				// assertEquals takes (expected, actual); keeping that order gives correct failure messages.
				String expected = Integer.toString(i);
				String p = q.peek();
				assertEquals(expected, p);
				// A second peek must see the same element because peek does not remove it.
				String p2 = q.peek();
				assertEquals(expected, p2);
				String s = q.poll();
				assertEquals(expected, s);
			}
			assertEquals(0, q.size());
		}
	}

	/**
	 * Checks that the iterator returns all elements in insertion order without consuming them and that it throws
	 * {@link NoSuchElementException} once it is exhausted.
	 */
	@Test
	void iterator() {
		try (MapDb3CollectionFactory mapDb3CollectionFactory = new MapDb3CollectionFactory(1)) {
			Queue<String> q = mapDb3CollectionFactory.createQueue();
			int size = 1024;
			for (int i = 0; i < size; i++) {
				assertTrue(q.offer(Integer.toString(i)));
			}
			assertEquals(size, q.size());
			Iterator<String> iter = q.iterator();
			for (int i = 0; i < size; i++) {
				assertTrue(iter.hasNext());
				assertEquals(Integer.toString(i), iter.next());
			}
			assertFalse(iter.hasNext());
			// Iterating must not have removed anything from the queue.
			assertEquals(size, q.size());
			try {
				iter.next();
				fail();
			} catch (NoSuchElementException e) {
				assertNotNull(e);
			}
		}
	}
}

0 comments on commit 85dcb36

Please sign in to comment.