Skip to content

Commit

Permalink
HCOLL-119 reveting Romans changes as they broke tests, but slowly int…
Browse files Browse the repository at this point in the history
…roducing them back in to find where the build breaks. Checking in here, as this set works but not all of Romans changes are currently included, they will be added soon.
  • Loading branch information
Rob Austin committed Aug 11, 2014
1 parent f3d279c commit d0f5449
Showing 1 changed file with 88 additions and 35 deletions.
Expand Up @@ -25,18 +25,19 @@
import net.openhft.lang.model.Byteable;
import net.openhft.lang.model.DataValueClasses;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReferenceArray;

import static net.openhft.collections.AbstractVanillaSharedHashMap.Hasher.hash;
import static net.openhft.collections.Replica.EntryResolver;
import static net.openhft.lang.collection.DirectBitSet.NOT_FOUND;

Expand Down Expand Up @@ -119,13 +120,8 @@ public VanillaSharedReplicatedHashMap(@NotNull SharedHashMapBuilder builder,
this.eventListener = modificationDelegator;
}

/**
* this is used to iterate over all the modification iterators
*
* @return
*/
int assignedModIterBitSetSizeInBytes() {
return (int) align64((long) Math.ceil(127 + RESERVED_MOD_ITER / 8));
private int assignedModIterBitSetSizeInBytes() {
return (int) align64((127 + RESERVED_MOD_ITER) / 8);
}

@Override
Expand Down Expand Up @@ -386,8 +382,7 @@ class Segment extends VanillaSharedHashMap<K, V>.Segment {
@Override
void createHashLookups(long start) {
hashLookupLiveAndDeleted = createMultiMap(start);

start += sizeOfMultiMap();
start += align64(sizeOfMultiMap() + sizeOfMultiMapBitSet());
hashLookupLiveOnly = createMultiMap(start);
}

Expand Down Expand Up @@ -880,42 +875,40 @@ public void accept(int hash, int pos) {
}
}

@Override
public Entry<K, V> getEntry(long pos) {
long offset = offsetFromPos(pos);
NativeBytes entry = entry(offset);
entry.readStopBit();
K key = entry.readInstance(kClass, null);
long timestamp = entry.readLong();
entry.skip(2L); // identifier and isDeleted flag
V value = readValue(entry, null);
return new TimestampTrackingEntry(key, value, timestamp);
}

/**
* removes all the entries
*/
@Override
void clear() {

// we have to make sure that every calls notifies on remove,
// so that the replicators can pick it up
for (K k : keySet()) {
VanillaSharedReplicatedHashMap.this.remove(k);
}

}
}

void visit(IntIntMultiMap.EntryConsumer entryConsumer) {
hashLookupLiveOnly.forEach(entryConsumer);
class TimestampTrackingEntry extends SimpleEntry<K, V> {
long timestamp;
public TimestampTrackingEntry(K key, V value, long timestamp) {
super(key, value);
this.timestamp = timestamp;
}

/**
* returns a null value if the entry has been deleted
*
* @param pos
* @return a null value if the entry has been deleted
*/
@Nullable
public Entry<K, V> getEntry(long pos) {
long offset = offsetFromPos(pos);
NativeBytes entry = entry(offset);
entry.readStopBit();
K key = entry.readInstance(kClass, null); //todo: readUsing?

// skip timestamp and id
entry.skip(10);

V value = readValue(entry, null); //todo: reusable container
return new WriteThroughEntry(key, value);
@Override
public V setValue(V value) {
long newTimestamp = timestamp = timeProvider.currentTimeMillis();
put(getKey(), value, localIdentifier, newTimestamp);
return super.setValue(value);
}
}

Expand Down Expand Up @@ -1074,6 +1067,66 @@ public void readExternalEntry(@NotNull Bytes source) {
}
}

class EntryIterator extends VanillaSharedHashMap<K, V>.EntryIterator {
@Override
void removePresent(VanillaSharedHashMap.Segment seg, int pos) {
@SuppressWarnings("unchecked")
Segment segment = (Segment) seg;

final long offset = segment.offsetFromPos(pos);
final NativeBytes entry = segment.entry(offset);
final long limit = entry.limit();

final long keyLen = entry.readStopBit();
long keyPosition = entry.position();
entry.skip(keyLen);
long timestamp = entry.readLong();
entry.position(keyPosition);
if (timestamp > ((TimestampTrackingEntry) returnedEntry).timestamp) {
// The entry was updated after being returned from iterator.next()
// Check that it is still the entry with the same key
K key = returnedEntry.getKey();
DirectBytes returnedKeyBytes = getKeyAsBytes(key);
if (returnedKeyBytes.remaining() != keyLen || !entry.startsWith(returnedKeyBytes)) {
// The case:
// 1. iterator.next() - thread 1
// 2. map.put() which cause relocation of the key, returned above - thread 2
// OR map.remove() which remove this key - thread 2
// 3. map.put() which place a new key on the `pos` in current segment - thread 3
// 4. iterator.remove() - thread 1
VanillaSharedReplicatedHashMap.this.remove(key);
return;
}
}
entry.limit(entry.position() + keyLen);
final int segmentHash = hasher.segmentHash(hash(entry));
entry.limit(limit);

segment.hashLookupLiveOnly.remove(segmentHash, pos);
segment.decrementSize();

entry.skip(keyLen);
entry.writeLong(timeProvider.currentTimeMillis());
entry.writeByte(localIdentifier);
entry.writeBoolean(true);

segment.notifyRemoved(offset, returnedEntry.getKey(), returnedEntry.getValue(), pos);
}
}

class EntrySet extends VanillaSharedHashMap<K, V>.EntrySet {
@NotNull
@Override
public Iterator<Entry<K, V>> iterator() {
return new EntryIterator();
}
}

@NotNull
@Override
public Set<Entry<K, V>> entrySet() {
return new EntrySet();
}

/**
* receive an update from the map, via the SharedMapEventListener and delegates the changes to the currently active
Expand Down

0 comments on commit d0f5449

Please sign in to comment.