Skip to content

Commit

Permalink
On the way to HCOLL-360: Rename HashLookup to CompactOffHeapLinearHashTable and make it not a stage; extracted KeySearch from HashQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
leventov committed Aug 3, 2015
1 parent 7c10ea4 commit ff99f5c
Show file tree
Hide file tree
Showing 23 changed files with 255 additions and 259 deletions.
Expand Up @@ -14,20 +14,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package net.openhft.chronicle.hash.impl.stage.entry;
package net.openhft.chronicle.hash.impl;

import net.openhft.chronicle.hash.impl.VanillaChronicleHashHolder;
import net.openhft.lang.Maths;
import net.openhft.sg.Stage;
import net.openhft.sg.StageRef;
import net.openhft.sg.Staged;

import static net.openhft.lang.MemoryUnit.BITS;
import static net.openhft.lang.MemoryUnit.BYTES;
import static net.openhft.lang.io.NativeBytes.UNSAFE;

@Staged
public class HashLookup {
public class CompactOffHeapLinearHashTable {
// to fit 64 bits per slot.
public static final int MAX_SEGMENT_CHUNKS = 1 << 30;
public static final int MAX_SEGMENT_ENTRIES = 1 << 29;
Expand Down Expand Up @@ -71,25 +66,16 @@ public static long mask(int bits) {
public static final long UNSET_KEY = 0L;
public static final long UNSET_ENTRY = 0L;

@Stage("SegmentHashLookup") long address = -1;
@Stage("SegmentHashLookup") long capacityMask;
@Stage("SegmentHashLookup") int hashLookupEntrySize;
@Stage("SegmentHashLookup") long capacityMask2;
@Stage("SegmentHashLookup") int keyBits;
@Stage("SegmentHashLookup") long keyMask;
@Stage("SegmentHashLookup") long valueMask;
@Stage("SegmentHashLookup") long entryMask;

public void initSegmentHashLookup(
long address, long capacity, int entrySize, int keyBits, int valueBits) {
innerInitSegmentHashLookup(address, capacity, entrySize, keyBits, valueBits);
}

@Stage("SegmentHashLookup")
private void innerInitSegmentHashLookup(
long address, long capacity, int entrySize, int keyBits, int valueBits) {
this.address = address;
private final long capacityMask;
private final int hashLookupEntrySize;
private final long capacityMask2;
private final int keyBits;
private final long keyMask;
private final long valueMask;
private final long entryMask;

CompactOffHeapLinearHashTable(long capacity, int entrySize, int keyBits, int valueBits) {
this.capacityMask = capacity - 1L;

this.hashLookupEntrySize = entrySize;
Expand All @@ -100,16 +86,10 @@ private void innerInitSegmentHashLookup(
this.valueMask = mask(valueBits);
this.entryMask = mask(keyBits + valueBits);
}

@StageRef VanillaChronicleHashHolder<?, ?, ?> hh;
@StageRef SegmentStages s;

public void initSegmentHashLookup() {
long hashLookupOffset = hh.h().segmentOffset(s.segmentIndex);
innerInitSegmentHashLookup(hh.h().ms.address() + hashLookupOffset,
hh.h().segmentHashLookupCapacity, hh.h().segmentHashLookupEntrySize,
hh.h().segmentHashLookupKeyBits, hh.h().segmentHashLookupValueBits);

/**
 * Convenience constructor: derives the table geometry (capacity, entry size, and the
 * key/value bit widths) from the precomputed segment-hash-lookup fields of the given
 * hash instance, then delegates to the primary constructor.
 */
CompactOffHeapLinearHashTable(VanillaChronicleHash hash) {
    this(hash.segmentHashLookupCapacity, hash.segmentHashLookupEntrySize,
            hash.segmentHashLookupKeyBits, hash.segmentHashLookupValueBits);
}

long indexToPos(long index) {
Expand Down Expand Up @@ -152,49 +132,44 @@ public long stepBack(long pos) {
return (pos -= hashLookupEntrySize) >= 0 ? pos : capacityMask2;
}

public long readEntry(long pos) {
return UNSAFE.getLong(address + pos);
public long readEntry(long addr, long pos) {
return UNSAFE.getLong(addr + pos);
}

public void writeEntry(long pos, long prevEntry, long key, long value) {
public void writeEntryVolatile(long addr, long pos, long prevEntry, long key, long value) {
long entry = (prevEntry & ~entryMask) | entry(key, value);
UNSAFE.putLong(address + pos, entry);
}

public void writeEntryVolatile(long pos, long prevEntry, long key, long value) {
long entry = (prevEntry & ~entryMask) | entry(key, value);
UNSAFE.putLongVolatile(null, address + pos, entry);
UNSAFE.putLongVolatile(null, addr + pos, entry);
}

public void putValueVolatile(long pos, long value) {
public void putValueVolatile(long addr, long pos, long value) {
checkValueForPut(value);
long currentEntry = readEntry(pos);
writeEntryVolatile(pos, currentEntry, key(currentEntry), value);
long currentEntry = readEntry(addr, pos);
writeEntryVolatile(addr, pos, currentEntry, key(currentEntry), value);
}

void writeEntry(long pos, long prevEntry, long anotherEntry) {
void writeEntry(long addr, long pos, long prevEntry, long anotherEntry) {
long entry = (prevEntry & ~entryMask) | (anotherEntry & entryMask);
UNSAFE.putLong(address + pos, entry);
UNSAFE.putLong(addr + pos, entry);
}

void clearEntry(long pos, long prevEntry) {
void clearEntry(long addr, long pos, long prevEntry) {
long entry = (prevEntry & ~entryMask);
UNSAFE.putLong(address + pos, entry);
UNSAFE.putLong(addr + pos, entry);
}

public void clearHashLookup() {
UNSAFE.setMemory(address, capacityMask2 + hashLookupEntrySize, (byte) 0);
public void clearHashLookup(long addr) {
UNSAFE.setMemory(addr, capacityMask2 + hashLookupEntrySize, (byte) 0);
}

/**
* Returns the "insert" position in terms of a subsequent putValue()
*/
public long remove(long posToRemove) {
long entryToRemove = readEntry(posToRemove);
public long remove(long addr, long posToRemove) {
long entryToRemove = readEntry(addr, posToRemove);
long posToShift = posToRemove;
while (true) {
posToShift = step(posToShift);
long entryToShift = readEntry(posToShift);
long entryToShift = readEntry(addr, posToShift);
if (empty(entryToShift))
break;
long insertPos = hlPos(key(entryToShift));
Expand All @@ -209,31 +184,12 @@ public long remove(long posToRemove) {
if ((cond1 && cond2) ||
// chain wrapped around capacity
(posToShift < insertPos && (cond1 || cond2))) {
writeEntry(posToRemove, entryToRemove, entryToShift);
writeEntry(addr, posToRemove, entryToRemove, entryToShift);
posToRemove = posToShift;
entryToRemove = entryToShift;
}
}
clearEntry(posToRemove, entryToRemove);
clearEntry(addr, posToRemove, entryToRemove);
return posToRemove;
}

/**
 * Primitive-specialized consumer of (key, value) pairs read from the hash lookup table.
 * A dedicated {@code long}-based interface avoids the boxing a
 * {@code BiConsumer<Long, Long>} would incur on every entry.
 */
@FunctionalInterface
interface EntryConsumer {
    /**
     * Accepts one decoded hash lookup entry.
     *
     * @param key   the entry's key bits
     * @param value the entry's value bits
     */
    void accept(long key, long value);
}

/**
 * Renders all non-empty hash lookup entries as {@code {key=value,key=value,...}},
 * in table-iteration order (see {@link #forEach}).
 */
String hashLookupToString() {
    StringBuilder out = new StringBuilder();
    out.append('{');
    // forEach skips empty slots, so only present entries are appended
    forEach((k, v) -> out.append(k).append('=').append(v).append(','));
    return out.append('}').toString();
}

/**
 * Invokes {@code action} for every non-empty entry in the table, scanning slots
 * sequentially from position 0 to the last slot ({@code capacityMask2}).
 *
 * @param action callback receiving each entry's decoded key and value bits
 */
void forEach(EntryConsumer action) {
    long pos = 0L;
    while (pos <= capacityMask2) {
        long e = readEntry(pos);
        if (!empty(e)) {
            action.accept(key(e), value(e));
        }
        pos += hashLookupEntrySize;
    }
}
}
Expand Up @@ -20,7 +20,6 @@
import net.openhft.chronicle.hash.ExternalHashQueryContext;
import net.openhft.chronicle.hash.HashEntry;
import net.openhft.chronicle.hash.HashSegmentContext;
import net.openhft.chronicle.hash.impl.stage.entry.HashLookup;
import net.openhft.chronicle.hash.impl.util.BuildVersion;
import net.openhft.chronicle.hash.serialization.BytesReader;
import net.openhft.chronicle.hash.serialization.SizeMarshaller;
Expand All @@ -40,9 +39,13 @@
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.nio.channels.FileChannel;
import java.util.List;

import static java.lang.Long.numberOfTrailingZeros;
import static java.lang.Math.max;
import static net.openhft.chronicle.hash.impl.CompactOffHeapLinearHashTable.entrySize;
import static net.openhft.chronicle.hash.impl.CompactOffHeapLinearHashTable.keyBits;
import static net.openhft.chronicle.hash.impl.CompactOffHeapLinearHashTable.valueBits;
import static net.openhft.lang.MemoryUnit.*;

public abstract class VanillaChronicleHash<K, KI, MKI extends MetaBytesInterop<K, ? super KI>,
Expand Down Expand Up @@ -107,6 +110,8 @@ public abstract class VanillaChronicleHash<K, KI, MKI extends MetaBytesInterop<K
public transient long headerSize;
transient long segmentHeadersOffset;
transient long segmentsOffset;

public transient CompactOffHeapLinearHashTable hashLookup;

@SuppressWarnings("deprecation")
public VanillaChronicleHash(ChronicleMapBuilder<K, ?> builder, boolean replicated) {
Expand Down Expand Up @@ -134,12 +139,11 @@ public VanillaChronicleHash(ChronicleMapBuilder<K, ?> builder, boolean replicate
// Precomputed offsets and sizes for fast Context init
segmentHeaderSize = builder.segmentHeaderSize(replicated);

segmentHashLookupValueBits = HashLookup.valueBits(actualChunksPerSegment);
segmentHashLookupKeyBits =
HashLookup.keyBits(entriesPerSegment, segmentHashLookupValueBits);
segmentHashLookupValueBits = valueBits(actualChunksPerSegment);
segmentHashLookupKeyBits = keyBits(entriesPerSegment, segmentHashLookupValueBits);
segmentHashLookupEntrySize =
HashLookup.entrySize(segmentHashLookupKeyBits, segmentHashLookupValueBits);
segmentHashLookupCapacity = HashLookup.capacityFor(entriesPerSegment);
entrySize(segmentHashLookupKeyBits, segmentHashLookupValueBits);
segmentHashLookupCapacity = CompactOffHeapLinearHashTable.capacityFor(entriesPerSegment);
segmentHashLookupInnerSize = segmentHashLookupCapacity * segmentHashLookupEntrySize;
segmentHashLookupOuterSize = CACHE_LINES.align(segmentHashLookupInnerSize, BYTES);

Expand Down Expand Up @@ -183,6 +187,7 @@ public void initTransients() {
private void ownInitTransients() {
keyReaderProvider = Provider.of((Class) originalKeyReader.getClass());
keyInteropProvider = Provider.of((Class) originalKeyInterop.getClass());
hashLookup = new CompactOffHeapLinearHashTable(this);
}

public final void createMappedStoreAndSegments(BytesStore bytesStore) throws IOException {
Expand Down
Expand Up @@ -33,7 +33,7 @@
@Staged
public abstract class HashEntryStages<K> implements HashEntry<K> {

@StageRef VanillaChronicleHashHolder<?, ?, ?> hh;
@StageRef public VanillaChronicleHashHolder<?, ?, ?> hh;
@StageRef public SegmentStages s;
@StageRef public CheckOnEachPublicOperation checkOnEachPublicOperation;
@StageRef public HashLookupPos hlp;
Expand Down
Expand Up @@ -16,7 +16,10 @@

package net.openhft.chronicle.hash.impl.stage.entry;

import net.openhft.chronicle.hash.impl.CompactOffHeapLinearHashTable;
import net.openhft.chronicle.hash.impl.VanillaChronicleHashHolder;
import net.openhft.sg.Stage;
import net.openhft.sg.StageRef;
import net.openhft.sg.Staged;

@Staged
Expand All @@ -36,4 +39,13 @@ public void setHashLookupPos(long hashLookupPos) {
}

public abstract void closeHashLookupPos();

@StageRef VanillaChronicleHashHolder<?, ?, ?> hh;
@StageRef SegmentStages s;

/**
 * Replaces the value bits of the hash lookup entry at the current {@code hashLookupPos}
 * with {@code newValue}, using a volatile write. The value is range-checked before the
 * write is attempted.
 *
 * @param newValue the new value bits to store at the current lookup position
 */
public void putValueVolatile(long newValue) {
    CompactOffHeapLinearHashTable ht = hh.h().hashLookup;
    ht.checkValueForPut(newValue);
    ht.putValueVolatile(s.segmentBase, hashLookupPos, newValue);
}
}
Expand Up @@ -21,7 +21,7 @@
import net.openhft.chronicle.hash.impl.*;
import net.openhft.chronicle.hash.impl.stage.hash.Chaining;
import net.openhft.chronicle.hash.impl.stage.hash.CheckOnEachPublicOperation;
import net.openhft.chronicle.hash.impl.stage.query.HashQuery;
import net.openhft.chronicle.hash.impl.stage.query.KeySearch;
import net.openhft.chronicle.hash.locks.InterProcessLock;
import net.openhft.lang.collection.DirectBitSet;
import net.openhft.lang.collection.SingleThreadedDirectBitSet;
Expand Down Expand Up @@ -269,8 +269,8 @@ public void readUnlockAndDecrementCount() {
private void linkToSegmentContextsChain() {
SegmentStages innermostContextOnThisSegment = rootContextOnThisSegment;
while (true) {
Data key = ((HashQuery) (Object) innermostContextOnThisSegment).inputKey;
if (Objects.equals(key, ((HashQuery) (Object) this).inputKey)) {
Data key = ((KeySearch) (Object) innermostContextOnThisSegment).inputKey;
if (Objects.equals(key, ((KeySearch) (Object) this).inputKey)) {
throw new IllegalStateException("Nested same-thread contexts cannot access " +
"the same key " + key);
}
Expand Down Expand Up @@ -324,14 +324,10 @@ public void setLocalLockState(LocalLockState newState) {
localLockState = newState;
}

// TODO their isHeldByCurrentThread should be _context_ local
// i. e. if outer same-thread context hold the lock, isHeld() should be false, but
// lock() op could be shallow
@StageRef public ReadLock innerReadLock;
@StageRef public UpdateLock innerUpdateLock;
@StageRef public WriteLock innerWriteLock;


@NotNull
@Override
public InterProcessLock readLock() {
Expand All @@ -352,21 +348,20 @@ public InterProcessLock writeLock() {
checkOnEachPublicOperation.checkOnEachPublicOperation();
return innerWriteLock;
}


@StageRef public HashLookup hashLookup;

@Stage("Segment") MultiStoreBytes freeListBytes = new MultiStoreBytes();
@Stage("Segment") public SingleThreadedDirectBitSet freeList = new SingleThreadedDirectBitSet();
@Stage("Segment") long entrySpaceOffset = 0;

@Stage("Segment") public long segmentBase;

// Segment-stage init predicate: entrySpaceOffset defaults to 0 and is presumably
// assigned a positive value once the segment layout is resolved — a positive offset
// therefore marks the stage as initialised.
boolean segmentInit() {
    return 0L < entrySpaceOffset;
}

void initSegment() {
VanillaChronicleHash<?, ?, ?, ?, ?, ?> h = hh.h();
long hashLookupOffset = h.segmentOffset(segmentIndex);
segmentBase = hh.h().ms.address() + hashLookupOffset;
long freeListOffset = hashLookupOffset + h.segmentHashLookupOuterSize;
freeListBytes.storePositionAndSize(h.ms, freeListOffset, h.segmentFreeListInnerSize);
freeList.reuse(freeListBytes);
Expand Down Expand Up @@ -432,7 +427,7 @@ public void updateNextPosToSearchFrom(long allocated, int chunks) {

public void clearSegment() {
innerWriteLock.lock();
hashLookup.clearHashLookup();
hh.h().hashLookup.clearHashLookup(segmentBase);
freeList.clear();
nextPosToSearchFrom(0L);
entries(0L);
Expand Down
Expand Up @@ -17,8 +17,9 @@
package net.openhft.chronicle.hash.impl.stage.iter;

import net.openhft.chronicle.hash.HashEntry;
import net.openhft.chronicle.hash.impl.CompactOffHeapLinearHashTable;
import net.openhft.chronicle.hash.impl.VanillaChronicleHashHolder;
import net.openhft.chronicle.hash.impl.stage.entry.HashEntryStages;
import net.openhft.chronicle.hash.impl.stage.entry.HashLookup;
import net.openhft.chronicle.hash.impl.stage.entry.HashLookupPos;
import net.openhft.chronicle.hash.impl.stage.entry.SegmentStages;
import net.openhft.chronicle.hash.impl.stage.hash.CheckOnEachPublicOperation;
Expand All @@ -33,7 +34,7 @@ public abstract class HashSegmentIteration<K, E extends HashEntry<K>> implements

@StageRef public SegmentStages s;
@StageRef HashEntryStages<K> e;
@StageRef protected HashLookup hashLookup;
@StageRef VanillaChronicleHashHolder<?, ?, ?> hh;
@StageRef public CheckOnEachPublicOperation checkOnEachPublicOperation;
@StageRef protected HashLookupPos hlp;

Expand All @@ -57,13 +58,14 @@ public boolean forEachSegmentEntryWhile(Predicate<? super E> action) {
return true;
boolean interrupted = false;
long startPos = 0L;
while (!hashLookup.empty(hashLookup.readEntry(startPos))) {
CompactOffHeapLinearHashTable hashLookup = hh.h().hashLookup;
while (!hashLookup.empty(hashLookup.readEntry(s.segmentBase, startPos))) {
startPos = hashLookup.step(startPos);
}
hlp.initHashLookupPos(startPos);
do {
hlp.setHashLookupPos(hashLookup.step(hlp.hashLookupPos));
long entry = hashLookup.readEntry(hlp.hashLookupPos);
long entry = hashLookup.readEntry(s.segmentBase, hlp.hashLookupPos);
if (!hashLookup.empty(entry)) {
e.readExistingEntry(hashLookup.value(entry));
if (entryIsPresent()) {
Expand Down Expand Up @@ -105,10 +107,10 @@ public void doRemove() {
s.innerWriteLock.lock();
try {
// this condition means some other entry has taken the place of the removed one
if (hashLookup.remove(hlp.hashLookupPos) != hlp.hashLookupPos) {
if (hh.h().hashLookup.remove(s.segmentBase, hlp.hashLookupPos) != hlp.hashLookupPos) {
// if so, should make step back, to compensate step forward on the next iteration,
// to consume the shifted entry
hlp.setHashLookupPos(hashLookup.stepBack(hlp.hashLookupPos));
hlp.setHashLookupPos(hh.h().hashLookup.stepBack(hlp.hashLookupPos));
}
e.innerRemoveEntryExceptHashLookupUpdate();
} finally {
Expand Down

0 comments on commit ff99f5c

Please sign in to comment.