Navigation Menu

Skip to content

Commit

Permalink
HCOLL-352 Changed garbage-free Chronicle-Map iterations API from old …
Browse files Browse the repository at this point in the history
…to new Contexts; Add SegmentContexts API
  • Loading branch information
leventov committed Jul 24, 2015
1 parent 912a3cc commit 9fe1dab
Show file tree
Hide file tree
Showing 32 changed files with 196 additions and 231 deletions.
6 changes: 4 additions & 2 deletions src/main/java/net/openhft/chronicle/hash/ChronicleHash.java
Expand Up @@ -33,8 +33,8 @@
* This interface defines operations common to {@link ChronicleMap} and {@link ChronicleSet},
* related to off-heap memory management and file-mapping. Not usable by itself.
*/
public interface ChronicleHash<K, C extends KeyContext<K>, EQC extends ExternalHashQueryContext<K>>
extends Closeable {
public interface ChronicleHash<K, C extends HashEntry<K>, SC extends HashSegmentContext<K, ?>,
EQC extends ExternalHashQueryContext<K>> extends Closeable {
/**
* Returns the file this hash container mapped to, i. e. when it is created by
* {@link ChronicleHashBuilder#create()} call, or {@code null} if it is purely in-memory,
Expand Down Expand Up @@ -81,6 +81,8 @@ public interface ChronicleHash<K, C extends KeyContext<K>, EQC extends ExternalH
* @return the context to perform operations with the key
*/
@NotNull EQC queryContext(Data<K> key);

SC segmentContext(int segmentIndex);

/**
* Checks the given predicate on each entry in this {@code ChronicleHash} until all entries
Expand Down
Expand Up @@ -65,7 +65,7 @@
* @param <B> the concrete builder type, i. e. {@link ChronicleMapBuilder}
* or {@link ChronicleSetBuilder}
*/
public interface ChronicleHashBuilder<K, H extends ChronicleHash<K, ?, ?>,
public interface ChronicleHashBuilder<K, H extends ChronicleHash<K, ?, ?, ?>,
B extends ChronicleHashBuilder<K, H, B>> extends Cloneable {

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/net/openhft/chronicle/hash/HashContext.java
Expand Up @@ -27,5 +27,5 @@ public interface HashContext<K> {
/**
* Returns the accessed {@code ChronicleHash}.
*/
ChronicleHash<K, ?, ?> hash();
ChronicleHash<K, ?, ?, ?> hash();
}
33 changes: 33 additions & 0 deletions src/main/java/net/openhft/chronicle/hash/HashSegmentContext.java
@@ -0,0 +1,33 @@
/*
* Copyright 2015 Higher Frequency Trading
*
* http://www.higherfrequencytrading.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.openhft.chronicle.hash;

import net.openhft.chronicle.core.io.Closeable;

import java.util.function.Consumer;
import java.util.function.Predicate;

/**
 * Context of a single segment of a {@code ChronicleHash}, providing bulk iteration over the
 * segment's entries. Extends {@link Closeable}: a segment context must be closed after use
 * (callers elsewhere in this change acquire it via try-with-resources).
 *
 * @param <K> the key type of the accessed {@code ChronicleHash}
 * @param <E> the type of entries presented to the iteration callbacks
 */
public interface HashSegmentContext<K, E extends HashEntry<K>> extends HashContext<K>, Closeable {

/**
 * Performs the given action on each entry of this segment.
 *
 * @param action the action to perform on each entry
 */
void forEachSegmentEntry(Consumer<? super E> action);

/**
 * Evaluates the given predicate on entries of this segment until all entries are processed
 * or the predicate returns {@code false} for some entry.
 *
 * @param predicate the predicate to evaluate on each entry
 * @return {@code true} if the predicate returned {@code true} for every checked entry,
 *         {@code false} if the iteration was interrupted by a {@code false} result
 */
boolean forEachSegmentEntryWhile(Predicate<? super E> predicate);

/**
 * Returns the number of entries currently stored in this segment.
 */
long size();
}
Expand Up @@ -22,7 +22,7 @@
import net.openhft.chronicle.hash.serialization.internal.SerializationBuilder;

//TODO remove this temporary interface
public interface ChronicleHashBuilderImpl<K, H extends ChronicleHash<K, ?, ?>,
public interface ChronicleHashBuilderImpl<K, H extends ChronicleHash<K, ?, ?, ?>,
B extends ChronicleHashBuilder<K, H, B>> extends ChronicleHashBuilder<K, H, B> {

SerializationBuilder<K> keyBuilder();
Expand Down
Expand Up @@ -16,9 +16,7 @@

package net.openhft.chronicle.hash.impl;

import net.openhft.chronicle.hash.ChronicleHash;
import net.openhft.chronicle.hash.ExternalHashQueryContext;
import net.openhft.chronicle.hash.KeyContext;
import net.openhft.chronicle.hash.*;
import net.openhft.chronicle.hash.impl.hashlookup.HashLookup;
import net.openhft.chronicle.hash.impl.util.BuildVersion;
import net.openhft.chronicle.hash.serialization.BytesReader;
Expand All @@ -44,8 +42,9 @@
import static net.openhft.lang.MemoryUnit.*;

public abstract class VanillaChronicleHash<K, KI, MKI extends MetaBytesInterop<K, ? super KI>,
C extends KeyContext<K>, ECQ extends ExternalHashQueryContext<K>>
implements ChronicleHash<K, C, ECQ>, Serializable {
C extends HashEntry<K>, SC extends HashSegmentContext<K, ?>,
ECQ extends ExternalHashQueryContext<K>>
implements ChronicleHash<K, C, SC, ECQ>, Serializable {

private static final long serialVersionUID = 0L;

Expand Down
Expand Up @@ -21,5 +21,5 @@
import net.openhft.chronicle.hash.serialization.internal.MetaBytesInterop;

public interface VanillaChronicleHashHolder<K, KI, MKI extends MetaBytesInterop<K, ? super KI>> {
VanillaChronicleHash<K, KI, MKI, ?, ?> h();
VanillaChronicleHash<K, KI, MKI, ?, ?, ?> h();
}
Expand Up @@ -360,7 +360,7 @@ boolean segmentInit() {
}

void initSegment() {
VanillaChronicleHash<?, ?, ?, ?, ?> h = hh.h();
VanillaChronicleHash<?, ?, ?, ?, ?, ?> h = hh.h();
long hashLookupOffset = h.segmentOffset(segmentIndex);
long freeListOffset = hashLookupOffset + h.segmentHashLookupOuterSize;
freeListBytes.storePositionAndSize(h.ms, freeListOffset, h.segmentFreeListInnerSize);
Expand All @@ -375,7 +375,7 @@ void closeSegment() {

//TODO refactor/optimize
public long alloc(int chunks) {
VanillaChronicleHash<?, ?, ?, ?, ?> h = hh.h();
VanillaChronicleHash<?, ?, ?, ?, ?, ?> h = hh.h();
if (chunks > h.maxChunksPerEntry)
throw new IllegalArgumentException("Entry is too large: requires " + chunks +
" entry size chucks, " + h.maxChunksPerEntry + " is maximum.");
Expand Down
Expand Up @@ -27,6 +27,7 @@
import net.openhft.sg.StageRef;
import net.openhft.sg.Staged;

import java.util.function.Consumer;
import java.util.function.Predicate;

@Staged
Expand All @@ -50,7 +51,7 @@ protected void initEntryRemovedOnThisIteration(boolean entryRemovedOnThisIterati
this.entryRemovedOnThisIteration = entryRemovedOnThisIteration;
}

public boolean forEachRemoving(Predicate<? super E> action) {
public boolean forEachSegmentEntryWhile(Predicate<? super E> action) {
s.innerUpdateLock.lock();
try {
long size = s.size();
Expand Down Expand Up @@ -86,6 +87,13 @@ public boolean forEachRemoving(Predicate<? super E> action) {
initEntryRemovedOnThisIteration(false);
}
}

/**
 * Performs the given action on each entry of the segment. Implemented on top of
 * {@link #forEachSegmentEntryWhile} by wrapping the consumer in a predicate that
 * always returns {@code true}, i.e. never interrupts the iteration.
 */
public void forEachSegmentEntry(Consumer<? super E> action) {
forEachSegmentEntryWhile(e -> {
action.accept(e);
return true;
});
}

public void checkEntryNotRemovedOnThisIteration() {
if (entryRemovedOnThisIterationInit())
Expand Down
28 changes: 12 additions & 16 deletions src/main/java/net/openhft/chronicle/map/AbstractChronicleMap.java
Expand Up @@ -80,7 +80,7 @@ default void putAll(File fromFile) throws IOException {

@Override
default boolean containsValue(Object value) {
return !forEachEntryWhile(c -> !c.valueEqualTo((V) value));
return !forEachEntryWhile(c -> !c.value().equals(c.context().wrapValueAsData((V) value)));
}

@Override
Expand All @@ -90,7 +90,7 @@ default boolean isEmpty() {

@Override
default void forEach(BiConsumer<? super K, ? super V> action) {
forEachEntry(c -> action.accept(c.key(), c.get()));
forEachEntry(c -> action.accept(c.key().get(), c.value().get()));
}

@Override
Expand Down Expand Up @@ -147,7 +147,7 @@ public boolean contains(Object v) {

@Override
public void forEach(java.util.function.Consumer<? super V> action) {
AbstractChronicleMap.this.forEachEntry(c -> action.accept(c.get()));
AbstractChronicleMap.this.forEachEntry(c -> action.accept(c.value().get()));
}
};
}
Expand Down Expand Up @@ -181,7 +181,7 @@ public boolean contains(Object k) {

@Override
public void forEach(java.util.function.Consumer<? super K> action) {
AbstractChronicleMap.this.forEachEntry(c -> action.accept(c.key()));
AbstractChronicleMap.this.forEachEntry(c -> action.accept(c.key().get()));
}
};
}
Expand All @@ -198,9 +198,9 @@ default boolean mapEquals(Object o) {

try {
return forEachEntryWhile(c -> {
K k = c.key();
K k = c.key().get();
V v = (V) m.get(k instanceof CharSequence ? k.toString() : k);
return v != null && c.valueEqualTo(v);
return v != null && c.value().equals(c.context().wrapValueAsData(v));
});
} catch (ClassCastException unused) {
return false;
Expand Down Expand Up @@ -247,25 +247,21 @@ default Set<Entry<K, V>> newEntrySet() {


@Override
default void forEachEntry(final Consumer<? super MapKeyContext<K, V>> action) {
/**
 * Performs the given action on each map entry. Delegates to {@link #forEachEntryWhile}
 * with a predicate that always returns {@code true}, so the iteration is never
 * interrupted.
 */
default void forEachEntry(final Consumer<? super MapEntry<K, V>> action) {
forEachEntryWhile(c -> {
action.accept(c);
return true;
});
}

IterationContextInterface<K, V> iterationContext();

@Override
default boolean forEachEntryWhile(final Predicate<? super MapKeyContext<K, V>> action) {
default boolean forEachEntryWhile(final Predicate<? super MapEntry<K, V>> action) {
boolean interrupt = false;
iteration:
try (IterationContextInterface<K, V> c = iterationContext()) {
for (int segmentIndex = actualSegments() - 1; segmentIndex >= 0; segmentIndex--) {
c.initTheSegmentIndex(segmentIndex);
if (!c.forEachRemoving(e -> action.test(c.deprecatedMapKeyContextOnIteration()))) {
for (int i = actualSegments() - 1; i >= 0; i--) {
try (MapSegmentContext<K, V, ?> c = segmentContext(i)) {
if (!c.forEachSegmentEntryWhile(action)) {
interrupt = true;
break iteration;
break;
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/net/openhft/chronicle/map/ChronicleMap.java
Expand Up @@ -30,7 +30,6 @@
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.Object;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

Expand All @@ -54,9 +53,9 @@
* @param <K> the map key type
* @param <V> the map value type
*/
public interface ChronicleMap<K, V>
extends ConcurrentMap<K, V>,
ChronicleHash<K, MapKeyContext<K, V>, ExternalMapQueryContext<K, V, ?>> {
public interface ChronicleMap<K, V> extends ConcurrentMap<K, V>,
ChronicleHash<K, MapEntry<K, V>, MapSegmentContext<K, V, ?>,
ExternalMapQueryContext<K, V, ?>> {

/**
* Returns the value to which the specified key is mapped, or {@code null} if this map contains
Expand Down
35 changes: 16 additions & 19 deletions src/main/java/net/openhft/chronicle/map/ChronicleMapIterator.java
Expand Up @@ -16,24 +16,22 @@

package net.openhft.chronicle.map;

import net.openhft.chronicle.map.impl.IterationContextInterface;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.Consumer;

/**
* Very inefficient (esp. if segments are large), but CORRECT implementation
*/
abstract class ChronicleMapIterator<K, V, E> implements Iterator<E> {
abstract class ChronicleMapIterator<K, V, E> implements Iterator<E>, Consumer<MapEntry<K, V>> {

final AbstractChronicleMap<K, V> map;
private final Thread ownerThread = Thread.currentThread();
private final Queue<E> entryBuffer = new ArrayDeque<>();
private int segmentIndex;
IterationContextInterface<K, V> context;
E returned;

ChronicleMapIterator(AbstractChronicleMap<K, V> map) {
Expand All @@ -54,24 +52,23 @@ private void fillEntryBuffer() {
while (true) {
if (segmentIndex < 0)
return;
try (IterationContextInterface<K, V> c = map.iterationContext()) {
context = c;
c.initTheSegmentIndex(segmentIndex);

try (MapSegmentContext<K, V, ?> c = map.segmentContext(segmentIndex)) {
segmentIndex--;
if (c.size() == 0)
continue;
c.forEachRemoving(e -> {
entryBuffer.add(read());
return true;
});
c.forEachSegmentEntry(this);
return;
} finally {
context = null;
}
}
}

abstract E read();
/**
 * {@link Consumer} callback invoked for each entry during segment iteration (see
 * {@code fillEntryBuffer}): converts the entry into an on-heap element via
 * {@code read(entry)} and buffers it, so the buffered element remains usable after
 * the segment context is closed.
 */
@Override
public void accept(MapEntry<K, V> e) {
entryBuffer.add(read(e));
}

abstract E read(MapEntry<K, V> entry);

@Override
public boolean hasNext() {
Expand Down Expand Up @@ -108,9 +105,9 @@ static class OfEntries<K, V> extends ChronicleMapIterator<K, V, Entry<K, V>> {
}

@Override
Entry<K, V> read() {
K key = context.key().getUsing(null);
V value = context.value().getUsing(null);
Entry<K, V> read(MapEntry<K, V> entry) {
// getUsing(null) supplies no reuse target, so fresh key/value objects are produced;
// presumably this detaches the returned Entry from the (closeable) segment context —
// TODO confirm getUsing semantics against Data's documentation.
K key = entry.key().getUsing(null);
V value = entry.value().getUsing(null);
return new WriteThroughEntry<>(map, key, value);
}

Expand All @@ -127,8 +124,8 @@ static class OfKeys<K, V> extends ChronicleMapIterator<K, V, K> {
}

@Override
K read() {
return context.key().getUsing(null);
K read(MapEntry<K, V> entry) {
// getUsing(null): no reuse target, so a fresh key object is produced per call.
return entry.key().getUsing(null);
}

@Override
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/net/openhft/chronicle/map/MapSegmentContext.java
@@ -0,0 +1,25 @@
/*
* Copyright 2015 Higher Frequency Trading
*
* http://www.higherfrequencytrading.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.openhft.chronicle.map;

import net.openhft.chronicle.hash.HashSegmentContext;

/**
 * Context of a single {@code ChronicleMap} segment: combines per-segment entry iteration
 * from {@link HashSegmentContext} (entries typed as {@link MapEntry}) with the map-level
 * operations of {@link MapContext}.
 *
 * @param <K> the map key type
 * @param <V> the map value type
 * @param <R> type parameter forwarded to {@link MapContext}; its semantics are not visible
 *            in this file — see {@code MapContext} (TODO confirm)
 */
public interface MapSegmentContext<K, V, R>
extends HashSegmentContext<K, MapEntry<K, V>>, MapContext<K, V, R> {
}
Expand Up @@ -18,7 +18,6 @@

package net.openhft.chronicle.map;

import net.openhft.chronicle.hash.KeyContext;
import net.openhft.chronicle.hash.replication.AbstractReplication;
import net.openhft.chronicle.hash.replication.ReplicableEntry;
import net.openhft.chronicle.hash.replication.TimeProvider;
Expand Down Expand Up @@ -218,7 +217,7 @@ public void onHeaderCreated() {

@Override
public void clear() {
forEachEntry(KeyContext::remove);
forEachEntry(c -> c.context().remove(c));
}

void addCloseable(Closeable closeable) {
Expand Down

0 comments on commit 9fe1dab

Please sign in to comment.