Skip to content

Commit

Permalink
added in a fix for net.openhft.chronicle.map.ReplicateProxyMapUpdated…
Browse files Browse the repository at this point in the history
…ViaSharedMemoryTest.testReplicateProxyMapUpdatedViaSharedMemory

Conflicts:
	src/main/java/net/openhft/chronicle/map/ChannelProvider.java
	src/main/java/net/openhft/chronicle/map/Replica.java
	src/main/java/net/openhft/chronicle/map/ReplicatedChronicleMap.java
  • Loading branch information
Rob Austin authored and leventov committed Jul 25, 2015
1 parent dedaa11 commit e3fc5c9
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 80 deletions.
41 changes: 31 additions & 10 deletions src/main/java/net/openhft/chronicle/map/ChannelProvider.java
Expand Up @@ -41,7 +41,6 @@
import static java.nio.ByteBuffer.wrap; import static java.nio.ByteBuffer.wrap;
import static net.openhft.chronicle.map.Replica.EntryExternalizable; import static net.openhft.chronicle.map.Replica.EntryExternalizable;
import static net.openhft.chronicle.map.Replica.ModificationIterator; import static net.openhft.chronicle.map.Replica.ModificationIterator;
import static net.openhft.chronicle.map.Replica.ModificationNotifier.NOP;


/** /**
* @author Rob Austin. * @author Rob Austin.
Expand Down Expand Up @@ -150,6 +149,8 @@ public void readExternalEntry(
} }
} }
}; };


private final byte localIdentifier; private final byte localIdentifier;
final Replica asReplica = new Replica() { final Replica asReplica = new Replica() {
@Override @Override
Expand All @@ -159,14 +160,18 @@ public byte identifier() {


@Override @Override
public ModificationIterator acquireModificationIterator( public ModificationIterator acquireModificationIterator(
final byte remoteIdentifier, final ModificationNotifier notifier) { final byte remoteIdentifier) {

channelDataLock.writeLock().lock(); channelDataLock.writeLock().lock();
try { try {
final ModificationIterator result = modificationIterator.get(remoteIdentifier); final ModificationIterator result = modificationIterator.get(remoteIdentifier);
if (result != null) if (result != null)
return result; return result;


final ModificationIterator result0 = new ModificationIterator() { final ModificationIterator result0 = new ModificationIterator() {

volatile Replica.ModificationNotifier notifier0;

@Override @Override
public boolean hasNext() { public boolean hasNext() {
channelDataReadLock(); channelDataReadLock();
Expand All @@ -176,7 +181,7 @@ public boolean hasNext() {
for (Replica chronicleChannel : chronicleChannelMap.values()) { for (Replica chronicleChannel : chronicleChannelMap.values()) {
final ModificationIterator modificationIterator = final ModificationIterator modificationIterator =
chronicleChannel.acquireModificationIterator( chronicleChannel.acquireModificationIterator(
remoteIdentifier, notifier); remoteIdentifier);
if (modificationIterator.hasNext()) if (modificationIterator.hasNext())
return true; return true;
} }
Expand All @@ -191,12 +196,11 @@ public boolean nextEntry(@NotNull EntryCallback callback,
final int na) { final int na) {
channelDataReadLock(); channelDataReadLock();
try { try {

for (Map.Entry<Integer, Replica> chronicleChannel : chronicleChannelMap.entrySet()) { for (Map.Entry<Integer, Replica> chronicleChannel : chronicleChannelMap.entrySet()) {


final ModificationIterator modificationIterator = final ModificationIterator modificationIterator =
chronicleChannel.getValue() chronicleChannel.getValue()
.acquireModificationIterator(remoteIdentifier, notifier); .acquireModificationIterator(remoteIdentifier);
if (modificationIterator if (modificationIterator
.nextEntry(callback, chronicleChannel.getKey())) .nextEntry(callback, chronicleChannel.getKey()))
return true; return true;
Expand All @@ -213,14 +217,24 @@ public void dirtyEntries(long fromTimeStamp) {
try { try {
for (Replica chronicleChannel : chronicleChannelMap.values()) { for (Replica chronicleChannel : chronicleChannelMap.values()) {


chronicleChannel.acquireModificationIterator(remoteIdentifier, notifier) chronicleChannel.acquireModificationIterator(remoteIdentifier)
.dirtyEntries(fromTimeStamp); .dirtyEntries(fromTimeStamp);
notifier.onChange(); notifier0.onChange();
} }
} finally { } finally {
channelDataLock.readLock().unlock(); channelDataLock.readLock().unlock();
} }
} }

@Override
public void setModificationNotifier(
@NotNull ModificationNotifier modificationNotifier) {
for (Replica chronicleChannel : chronicleChannelMap.values()) {
chronicleChannel.acquireModificationIterator(remoteIdentifier)
.setModificationNotifier(modificationNotifier);
}
notifier0 = modificationNotifier;
}
}; };


modificationIterator.set((int) remoteIdentifier, result0); modificationIterator.set((int) remoteIdentifier, result0);
Expand Down Expand Up @@ -344,7 +358,7 @@ private void onBootstrapMessage(Bytes bytes) {


// this could be null if one node has a chronicle channel before the other // this could be null if one node has a chronicle channel before the other
if (chronicleChannels[chronicleChannel] != null) { if (chronicleChannels[chronicleChannel] != null) {
chronicleChannels[chronicleChannel].acquireModificationIterator(remoteIdentifier, NOP) chronicleChannels[chronicleChannel].acquireModificationIterator(remoteIdentifier)
.dirtyEntries(lastModificationTime); .dirtyEntries(lastModificationTime);
} }
} }
Expand Down Expand Up @@ -448,14 +462,16 @@ public byte identifier() {


@Override @Override
public ModificationIterator acquireModificationIterator( public ModificationIterator acquireModificationIterator(
final byte remoteIdentifier, final ModificationNotifier modificationNotifier) { final byte remoteIdentifier) {
final ModificationIterator result = systemModificationIterator.get(remoteIdentifier); final ModificationIterator result = systemModificationIterator.get(remoteIdentifier);


if (result != null) if (result != null)
return result; return result;


final PayloadProvider iterator = new PayloadProvider() { final PayloadProvider iterator = new PayloadProvider() {

final Queue<Bytes> payloads = new LinkedTransferQueue<Bytes>(); final Queue<Bytes> payloads = new LinkedTransferQueue<Bytes>();
ModificationNotifier modificationNotifier0;


@Override @Override
public boolean hasNext() { public boolean hasNext() {
Expand All @@ -476,14 +492,19 @@ public void dirtyEntries(long fromTimeStamp) {
// do nothing // do nothing
} }


@Override
public void setModificationNotifier(@NotNull ModificationNotifier modificationNotifier) {
modificationNotifier0 = modificationNotifier;
}

@Override @Override
public void addPayload(Bytes bytes) { public void addPayload(Bytes bytes) {
if (bytes.remaining() == 0) if (bytes.remaining() == 0)
return; return;
payloads.add(bytes); payloads.add(bytes);
// notifies that a change has been made, this will nudge the OP_WRITE // notifies that a change has been made, this will nudge the OP_WRITE
// selector to push this update out over the nio socket // selector to push this update out over the nio socket
modificationNotifier.onChange(); modificationNotifier0.onChange();
} }
}; };


Expand Down
18 changes: 9 additions & 9 deletions src/main/java/net/openhft/chronicle/map/Replica.java
Expand Up @@ -51,14 +51,11 @@ public interface Replica extends Closeable {
* remote node, this weak associated is bound using the {@code identifier}. * remote node, this weak associated is bound using the {@code identifier}.
* *
* @param remoteIdentifier the identifier of the remote node * @param remoteIdentifier the identifier of the remote node
* @param modificationNotifier called when ever there is a change applied to the modification
* iterator
* @return the ModificationIterator dedicated for replication to the remote node with the given * @return the ModificationIterator dedicated for replication to the remote node with the given
* identifier * identifier
* @see #identifier() * @see #identifier()
*/ */
ModificationIterator acquireModificationIterator(byte remoteIdentifier, ModificationIterator acquireModificationIterator(byte remoteIdentifier);
ModificationNotifier modificationNotifier);


/** /**
* Returns the timestamp of the last change from the specified remote node, already replicated * Returns the timestamp of the last change from the specified remote node, already replicated
Expand All @@ -76,11 +73,6 @@ ModificationIterator acquireModificationIterator(byte remoteIdentifier,
* notifies when there is a changed to the modification iterator * notifies when there is a changed to the modification iterator
*/ */
interface ModificationNotifier { interface ModificationNotifier {
public static final ModificationNotifier NOP = new ModificationNotifier() {
@Override
public void onChange() {
}
};


/** /**
* called when ever there is a change applied to the modification iterator * called when ever there is a change applied to the modification iterator
Expand Down Expand Up @@ -120,6 +112,14 @@ interface ModificationIterator {
* @param fromTimeStamp the timestamp from which all entries should be dirty * @param fromTimeStamp the timestamp from which all entries should be dirty
*/ */
void dirtyEntries(long fromTimeStamp); void dirtyEntries(long fromTimeStamp);

/**
* the {@code modificationNotifier} is called when ever there is a change applied to the
* modification iterator
*
* @param modificationNotifier gets notified when a change occurs
*/
void setModificationNotifier(@NotNull final ModificationNotifier modificationNotifier);
} }


/** /**
Expand Down
31 changes: 17 additions & 14 deletions src/main/java/net/openhft/chronicle/map/ReplicatedChronicleMap.java
Expand Up @@ -242,8 +242,7 @@ public byte identifier() {
} }


@Override @Override
public Replica.ModificationIterator acquireModificationIterator( public ModificationIterator acquireModificationIterator(byte remoteIdentifier) {
byte remoteIdentifier, @NotNull final ModificationNotifier modificationNotifier) {
ModificationIterator modificationIterator = modificationIterators.get(remoteIdentifier); ModificationIterator modificationIterator = modificationIterators.get(remoteIdentifier);
if (modificationIterator != null) if (modificationIterator != null)
return modificationIterator; return modificationIterator;
Expand All @@ -259,7 +258,7 @@ public Replica.ModificationIterator acquireModificationIterator(
modIterBitSetSizeInBytes()); modIterBitSetSizeInBytes());


final ModificationIterator newModificationIterator = new ModificationIterator( final ModificationIterator newModificationIterator = new ModificationIterator(
bytes, modificationNotifier); bytes);


modificationIterators.set(remoteIdentifier, newModificationIterator); modificationIterators.set(remoteIdentifier, newModificationIterator);
modIterSet.set(remoteIdentifier); modIterSet.set(remoteIdentifier);
Expand All @@ -271,7 +270,7 @@ public void raiseChange(long segmentIndex, long pos) {
for (long next = modIterSet.nextSetBit(0L); next > 0L; for (long next = modIterSet.nextSetBit(0L); next > 0L;
next = modIterSet.nextSetBit(next + 1L)) { next = modIterSet.nextSetBit(next + 1L)) {
try { try {
modificationIterators.get((int) next).raiseChange(segmentIndex, pos); acquireModificationIterator((byte) next).raiseChange(segmentIndex, pos);
} catch (Exception e) { } catch (Exception e) {
LOG.error("", e); LOG.error("", e);
} }
Expand All @@ -282,7 +281,7 @@ public void dropChange(long segmentIndex, long pos) {
for (long next = modIterSet.nextSetBit(0L); next > 0L; for (long next = modIterSet.nextSetBit(0L); next > 0L;
next = modIterSet.nextSetBit(next + 1L)) { next = modIterSet.nextSetBit(next + 1L)) {
try { try {
modificationIterators.get((int) next).dropChange(segmentIndex, pos); acquireModificationIterator((byte) next).dropChange(segmentIndex, pos);
} catch (Exception e) { } catch (Exception e) {
LOG.error("", e); LOG.error("", e);
} }
Expand All @@ -293,7 +292,8 @@ public void moveChange(long segmentIndex, long oldPos, long newPos) {
for (long next = modIterSet.nextSetBit(0L); next > 0L; for (long next = modIterSet.nextSetBit(0L); next > 0L;
next = modIterSet.nextSetBit(next + 1L)) { next = modIterSet.nextSetBit(next + 1L)) {
try { try {
ModificationIterator modificationIterator = modificationIterators.get((int) next); ModificationIterator modificationIterator =
acquireModificationIterator((byte) next);
if (modificationIterator.dropChange(segmentIndex, oldPos)) if (modificationIterator.dropChange(segmentIndex, oldPos))
modificationIterator.raiseChange(segmentIndex, newPos); modificationIterator.raiseChange(segmentIndex, newPos);
} catch (Exception e) { } catch (Exception e) {
Expand Down Expand Up @@ -483,7 +483,7 @@ public void readExternalEntry(@NotNull Bytes source) {
* @author Rob Austin. * @author Rob Austin.
*/ */
class ModificationIterator implements Replica.ModificationIterator { class ModificationIterator implements Replica.ModificationIterator {
private final ModificationNotifier modificationNotifier; private ModificationNotifier modificationNotifier;
private final SingleThreadedDirectBitSet changesForUpdates; private final SingleThreadedDirectBitSet changesForUpdates;
// to getVolatile when reading changes bits, because we iterate when without lock. // to getVolatile when reading changes bits, because we iterate when without lock.
// hardly this is needed on x86, probably on other architectures too. // hardly this is needed on x86, probably on other architectures too.
Expand All @@ -495,20 +495,23 @@ class ModificationIterator implements Replica.ModificationIterator {
// records the current position of the cursor in the bitset // records the current position of the cursor in the bitset
private volatile long position = -1L; private volatile long position = -1L;


/** public ModificationIterator(@NotNull final Bytes bytes) {
* @param bytes the back the bitset, used to mark which entries have changed
* @param modificationNotifier called when ever there is a change applied
*/
public ModificationIterator(@NotNull final Bytes bytes,
@NotNull final ModificationNotifier modificationNotifier) {
this.modificationNotifier = modificationNotifier;
long bitsPerSegment = bitsPerSegmentInModIterBitSet(); long bitsPerSegment = bitsPerSegmentInModIterBitSet();
segmentIndexShift = Long.numberOfTrailingZeros(bitsPerSegment); segmentIndexShift = Long.numberOfTrailingZeros(bitsPerSegment);
posMask = bitsPerSegment - 1L; posMask = bitsPerSegment - 1L;
changesForUpdates = new SingleThreadedDirectBitSet(bytes); changesForUpdates = new SingleThreadedDirectBitSet(bytes);
changesForIteration = new ATSDirectBitSet(bytes); changesForIteration = new ATSDirectBitSet(bytes);
} }


public ModificationIterator(@NotNull final Bytes bytes, ModificationNotifier notifier) {
this(bytes);
setModificationNotifier(notifier);
}

public void setModificationNotifier(ModificationNotifier modificationNotifier) {
this.modificationNotifier = modificationNotifier;
}

/** /**
* used to merge multiple segments and positions into a single index used by the bit map * used to merge multiple segments and positions into a single index used by the bit map
* *
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/net/openhft/chronicle/map/TcpReplicator.java
Expand Up @@ -647,7 +647,9 @@ private void doHandShaking(@NotNull final SelectionKey key,
"remoteIdentifier=" + remoteIdentifier); "remoteIdentifier=" + remoteIdentifier);


attached.remoteModificationIterator = attached.remoteModificationIterator =
replica.acquireModificationIterator(remoteIdentifier, attached); replica.acquireModificationIterator(remoteIdentifier);

attached.remoteModificationIterator.setModificationNotifier(attached);


writer.writeRemoteBootstrapTimestamp(replica.lastModificationTime(remoteIdentifier)); writer.writeRemoteBootstrapTimestamp(replica.lastModificationTime(remoteIdentifier));


Expand Down
4 changes: 2 additions & 2 deletions src/main/java/net/openhft/chronicle/map/UdpReplicator.java
Expand Up @@ -49,8 +49,8 @@ public UdpReplicator(@NotNull final Replica replica,
super(replicationConfig, replica.identifier()); super(replicationConfig, replica.identifier());


Replica.ModificationIterator modificationIterator = replica.acquireModificationIterator( Replica.ModificationIterator modificationIterator = replica.acquireModificationIterator(
ChronicleMapBuilder.UDP_REPLICATION_MODIFICATION_ITERATOR_ID, this); ChronicleMapBuilder.UDP_REPLICATION_MODIFICATION_ITERATOR_ID);

modificationIterator.setModificationNotifier(this);
setReader(new UdpSocketChannelEntryReader(replicationConfig.udpBufferSize(), entryExternalizable)); setReader(new UdpSocketChannelEntryReader(replicationConfig.udpBufferSize(), entryExternalizable));


setWriter(new UdpSocketChannelEntryWriter(replicationConfig.udpBufferSize(), setWriter(new UdpSocketChannelEntryWriter(replicationConfig.udpBufferSize(),
Expand Down
44 changes: 0 additions & 44 deletions src/test/java/net/openhft/chronicle/TestReplication.java
Expand Up @@ -67,50 +67,6 @@ public void testAllDataGetsReplicated() throws InterruptedException {


private ReplicationHub hubOnServer1; private ReplicationHub hubOnServer1;


@Test
public void testAllDataGetsReplicated2() throws InterruptedException, IOException, ExecutionException {

try
// server 1 with identifier = 1
{
ChronicleMapBuilder<Short, Short> builder =
ChronicleMapBuilder.of(Short.class, Short.class)
.entries(1000);

byte identifier = (byte) 1;

TcpTransportAndNetworkConfig tcpConfig = TcpTransportAndNetworkConfig
.of(8086);

hubOnServer1 = ReplicationHub.builder()
.tcpTransportAndNetwork(tcpConfig)
.maxNumberOfChannels(SIZE)
.createWithId(identifier);

for (short channel1 = 3; channel1 < SIZE; channel1++) {
ChronicleMap<Short, Short> map = builder.instance()
.replicatedViaChannel(hubOnServer1.createChannel(channel1)).create();

for (int i = 0; i < 10; i++) {
map.put((short) i, (short) i);
}

maps.put(channel1, map);

System.out.println("" + channel1);


}

} catch (Exception e) {
e.printStackTrace();
}

for (; ; ) {
Thread.sleep(1000);
}

}




} }
Expand Down

0 comments on commit e3fc5c9

Please sign in to comment.