Skip to content

Commit

Permalink
Add ReplicableEntry.raiseChangedFor(), raiseChangedForAllExcept(), im…
Browse files Browse the repository at this point in the history
…prove MapRemoteOperations
  • Loading branch information
leventov committed Dec 30, 2015
1 parent 2c1721e commit 04adb46
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 107 deletions.
Expand Up @@ -47,6 +47,16 @@ default void raiseChanged() {
d().raiseChanged(); d().raiseChanged();
} }


@Override
default void raiseChangedFor(byte remoteIdentifier) {
d().raiseChangedFor(remoteIdentifier);
}

@Override
default void raiseChangedForAllExcept(byte remoteIdentifier) {
d().raiseChangedForAllExcept(remoteIdentifier);
}

@Override @Override
default boolean isChanged() { default boolean isChanged() {
return d().isChanged(); return d().isChanged();
Expand Down
Expand Up @@ -25,13 +25,26 @@
* @param <K> the key type of accessed {@link ChronicleHash} * @param <K> the key type of accessed {@link ChronicleHash}
*/ */
public interface RemoteOperationContext<K> extends HashContext<K> { public interface RemoteOperationContext<K> extends HashContext<K> {


/**
* {@link ReplicableEntry#originIdentifier()} of the replicated entry.
*/
byte remoteIdentifier(); byte remoteIdentifier();


/** /**
* @see ReplicableEntry#originTimestamp() * {@link ReplicableEntry#originTimestamp()} of the replicated entry.
*/ */
long remoteTimestamp(); long remoteTimestamp();


/**
* Returns the identifier of the current Chronicle Node (this context object is obtained on).
*/
byte currentNodeIdentifier(); byte currentNodeIdentifier();

/**
* Returns the identifier of the node, from which current replication event came from, or -1 if
* unknown or not applicable (the current replication event came not from another node, but,
* e. g., applied manually).
*/
byte remoteNodeIdentifier();
} }
Expand Up @@ -72,21 +72,37 @@ public interface ReplicableEntry {
void updateOrigin(byte newIdentifier, long newTimestamp); void updateOrigin(byte newIdentifier, long newTimestamp);


/** /**
* Suppress the entry, if it was scheduled to be replicated over to remote Chronicle nodes. * Suppress the entry, if it was scheduled to be replicated over to all remote Chronicle nodes.
*/ */
void dropChanged(); void dropChanged();


/** /**
* Propagate the entry, schedule it to be replicated over to remote Chronicle nodes. * Propagate the entry, schedule it to be replicated over to all remote Chronicle nodes.
*/ */
void raiseChanged(); void raiseChanged();


/**
* Propagate the entry, schedule it to be replicated over to the remote Chronicle nodes with
* the specified identifier.
*
* @param remoteIdentifier the identifier of the node to replicate this entry to
*/
void raiseChangedFor(byte remoteIdentifier);

/**
* Propagate the entry, schedule it to be replicated over to all remote Chronicle nodes, except
* the node with the specified identifier.
*
* @param remoteIdentifier the identifier of the node not to replicate this entry to
*/
void raiseChangedForAllExcept(byte remoteIdentifier);

/** /**
* Check is the entry is scheduled to be replicated to the remote Chronicle nodes, to which * Check is the entry is scheduled to be replicated to the remote Chronicle nodes, to which
* the connection is currently established. * the connection is currently established.
* *
* @return {@code true} is the entry is "dirty" locally, i. e. should be replicated to remote * @return {@code true} is the entry is "dirty" locally, i. e. should be replicated to any of
* Chronicle nodes, {@code false} otherwise * remote Chronicle nodes, {@code false} otherwise
*/ */
boolean isChanged(); boolean isChanged();


Expand Down
20 changes: 18 additions & 2 deletions src/main/java/net/openhft/chronicle/map/Replica.java
Expand Up @@ -30,9 +30,25 @@ public interface Replica extends Closeable {


interface QueryContext<K, V> { interface QueryContext<K, V> {


void remotePut(Data<V> newValue, byte remoteIdentifier, long timestamp); /**
* @param newValue the value of the replicated entry
* @param remoteEntryIdentifier origin identifier of the replicated entry
* @param remoteEntryTimestamp timestamp of the replicated entry
* @param remoteNodeIdentifier identifier of the remote node this replication came from,
* or -1 if unknown/not applicable
*/
void remotePut(
Data<V> newValue,
byte remoteEntryIdentifier, long remoteEntryTimestamp, byte remoteNodeIdentifier);


void remoteRemove(byte remoteIdentifier, long timestamp); /**
* @param remoteEntryIdentifier origin identifier of the replicated entry
* @param remoteEntryTimestamp timestamp of the replicated entry
* @param remoteNodeIdentifier identifier of the remote node this replication came from,
* or -1 if unknown/not applicable
*/
void remoteRemove(
byte remoteEntryIdentifier, long remoteEntryTimestamp, byte remoteNodeIdentifier);
} }


/** /**
Expand Down
32 changes: 21 additions & 11 deletions src/main/java/net/openhft/chronicle/map/ReplicatedChronicleMap.java
Expand Up @@ -334,19 +334,30 @@ public ModificationIterator acquireModificationIterator(byte remoteIdentifier) {
} }


public void raiseChange(long tierIndex, long pos) { public void raiseChange(long tierIndex, long pos) {
// -1 is invalid remoteIdentifier => raise change for all
raiseChangeForAllExcept(tierIndex, pos, (byte) -1);
}

public void raiseChangeFor(long tierIndex, long pos, byte remoteIdentifier) {
acquireModificationIterator(remoteIdentifier).raiseChange0(tierIndex, pos);
}

public void raiseChangeForAllExcept(long tierIndex, long pos, byte remoteIdentifier) {
if (tierIndex <= actualSegments) { if (tierIndex <= actualSegments) {
long segmentIndex = tierIndex - 1; long segmentIndex = tierIndex - 1;
long offsetToTierBitSet = segmentIndex * tierModIterBitSetOuterSize; long offsetToTierBitSet = segmentIndex * tierModIterBitSetOuterSize;
for (ModificationIterator modificationIterator : assignedModificationIterators) { for (ModificationIterator it : assignedModificationIterators) {
modificationIterator.raiseChangeInSegment(offsetToTierBitSet, pos); if (it.remoteIdentifier != remoteIdentifier)
it.raiseChangeInSegment(offsetToTierBitSet, pos);
} }
} else { } else {
long extraTierIndex = tierIndex - 1 - actualSegments; long extraTierIndex = tierIndex - 1 - actualSegments;
int bulkIndex = (int) (extraTierIndex >> log2TiersInBulk); int bulkIndex = (int) (extraTierIndex >> log2TiersInBulk);
long offsetToTierBitSet = long offsetToTierBitSet =
(extraTierIndex & (tiersInBulk - 1)) * tierModIterBitSetOuterSize; (extraTierIndex & (tiersInBulk - 1)) * tierModIterBitSetOuterSize;
for (ModificationIterator modificationIterator : assignedModificationIterators) { for (ModificationIterator it : assignedModificationIterators) {
modificationIterator.raiseChangeInTierBulk(bulkIndex, offsetToTierBitSet, pos); if (it.remoteIdentifier != remoteIdentifier)
it.raiseChangeInTierBulk(bulkIndex, offsetToTierBitSet, pos);
} }
} }
} }
Expand Down Expand Up @@ -545,7 +556,7 @@ public void readExternalEntry(@NotNull Bytes source, byte remoteNodeIdentifier)
} else { } else {
assert hunk == ENTRY_HUNK; assert hunk == ENTRY_HUNK;
try (CompiledReplicatedMapQueryContext<K, V, R> remoteOpContext = mapContext()) { try (CompiledReplicatedMapQueryContext<K, V, R> remoteOpContext = mapContext()) {
remoteOpContext.processReplicatedEvent(source); remoteOpContext.processReplicatedEvent(remoteNodeIdentifier, source);
} }
} }
} }
Expand Down Expand Up @@ -858,7 +869,7 @@ public void dirtyEntries(long fromTimeStamp) {
if (bootstrapOnlyLocalEntries) { if (bootstrapOnlyLocalEntries) {
if (e.originIdentifier() == localIdentifier && if (e.originIdentifier() == localIdentifier &&
e.originTimestamp() >= fromTimeStamp) { e.originTimestamp() >= fromTimeStamp) {
raiseChangeDuringBootstrap(c); raiseChange0(c.tierIndex(), c.pos());
} }
} else { } else {
// TODO currently, all entries, originating not from the current node, // TODO currently, all entries, originating not from the current node,
Expand All @@ -867,26 +878,25 @@ public void dirtyEntries(long fromTimeStamp) {
// reconnecting vs. different Map start-up // reconnecting vs. different Map start-up
if (e.originIdentifier() != localIdentifier || if (e.originIdentifier() != localIdentifier ||
e.originTimestamp() >= fromTimeStamp) { e.originTimestamp() >= fromTimeStamp) {
raiseChangeDuringBootstrap(c); raiseChange0(c.tierIndex(), c.pos());
} }
} }
}); });
} }
} }
} }


private void raiseChangeDuringBootstrap(CompiledReplicatedMapIterationContext<K, V, R> c) { void raiseChange0(long tierIndex, long pos) {
long tierIndex = c.tierIndex();
if (tierIndex <= actualSegments) { if (tierIndex <= actualSegments) {
long segmentIndex = tierIndex - 1; long segmentIndex = tierIndex - 1;
long offsetToTierBitSet = segmentIndex * tierModIterBitSetOuterSize; long offsetToTierBitSet = segmentIndex * tierModIterBitSetOuterSize;
raiseChangeInSegment(offsetToTierBitSet, c.pos()); raiseChangeInSegment(offsetToTierBitSet, pos);
} else { } else {
long extraTierIndex = tierIndex - 1 - actualSegments; long extraTierIndex = tierIndex - 1 - actualSegments;
int bulkIndex = (int) (extraTierIndex >> log2TiersInBulk); int bulkIndex = (int) (extraTierIndex >> log2TiersInBulk);
long offsetToTierBitSet = long offsetToTierBitSet =
(extraTierIndex & (tiersInBulk - 1)) * tierModIterBitSetOuterSize; (extraTierIndex & (tiersInBulk - 1)) * tierModIterBitSetOuterSize;
raiseChangeInTierBulk(bulkIndex, offsetToTierBitSet, c.pos()); raiseChangeInTierBulk(bulkIndex, offsetToTierBitSet, pos);
} }
} }
} }
Expand Down
Expand Up @@ -119,6 +119,20 @@ public void raiseChanged() {
ru.raiseChange(); ru.raiseChange();
} }


@Override
public void raiseChangedFor(byte remoteIdentifier) {
checkOnEachPublicOperation.checkOnEachPublicOperation();
s.innerUpdateLock.lock();
ru.raiseChangeFor(remoteIdentifier);
}

@Override
public void raiseChangedForAllExcept(byte remoteIdentifier) {
checkOnEachPublicOperation.checkOnEachPublicOperation();
s.innerUpdateLock.lock();
ru.raiseChangeForAllExcept(remoteIdentifier);
}

@Override @Override
public boolean isChanged() { public boolean isChanged() {
checkOnEachPublicOperation.checkOnEachPublicOperation(); checkOnEachPublicOperation.checkOnEachPublicOperation();
Expand Down
Expand Up @@ -48,10 +48,10 @@ public Data<V> dummyZeroValue() {
return dummyValue; return dummyValue;
} }


public void processReplicatedEvent(Bytes replicatedInputBytes) { public void processReplicatedEvent(byte remoteNodeIdentifier, Bytes replicatedInputBytes) {
long timestamp = replicatedInputBytes.readStopBit(); long timestamp = replicatedInputBytes.readStopBit();
byte identifier = replicatedInputBytes.readByte(); byte identifier = replicatedInputBytes.readByte();
ru.initReplicationUpdate(identifier, timestamp); ru.initReplicationUpdate(identifier, timestamp, remoteNodeIdentifier);


boolean isDeleted = replicatedInputBytes.readBoolean(); boolean isDeleted = replicatedInputBytes.readBoolean();
long keySize = mh.m().keySizeMarshaller.readSize(replicatedInputBytes); long keySize = mh.m().keySizeMarshaller.readSize(replicatedInputBytes);
Expand All @@ -73,15 +73,18 @@ public void processReplicatedEvent(Bytes replicatedInputBytes) {
} }


@Override @Override
public void remotePut(Data<V> newValue, byte remoteIdentifier, long timestamp) { public void remotePut(
ru.initReplicationUpdate(remoteIdentifier, timestamp); Data<V> newValue,
byte remoteEntryIdentifier, long remoteEntryTimestamp, byte remoteNodeIdentifier) {
ru.initReplicationUpdate(remoteEntryIdentifier, remoteEntryTimestamp, remoteNodeIdentifier);
s.innerUpdateLock.lock(); s.innerUpdateLock.lock();
mh.m().remoteOperations.put(this, newValue); mh.m().remoteOperations.put(this, newValue);
} }


@Override @Override
public void remoteRemove(byte remoteIdentifier, long timestamp) { public void remoteRemove(
ru.initReplicationUpdate(remoteIdentifier, timestamp); byte remoteEntryIdentifier, long remoteEntryTimestamp, byte remoteNodeIdentifier) {
ru.initReplicationUpdate(remoteEntryIdentifier, remoteEntryTimestamp, remoteNodeIdentifier);
s.innerWriteLock.lock(); s.innerWriteLock.lock();
mh.m().remoteOperations.remove(this); mh.m().remoteOperations.remove(this);
} }
Expand Down
Expand Up @@ -34,14 +34,18 @@ public abstract class ReplicationUpdate<K> implements RemoteOperationContext<K>


@Stage("ReplicationUpdate") public byte innerRemoteIdentifier = (byte) 0; @Stage("ReplicationUpdate") public byte innerRemoteIdentifier = (byte) 0;
@Stage("ReplicationUpdate") public long innerRemoteTimestamp; @Stage("ReplicationUpdate") public long innerRemoteTimestamp;
@Stage("ReplicationUpdate") public byte innerRemoteNodeIdentifier;


public abstract boolean replicationUpdateInit(); public abstract boolean replicationUpdateInit();


public void initReplicationUpdate(byte identifier, long timestamp) { public void initReplicationUpdate(byte identifier, long timestamp, byte remoteNodeIdentifier) {
innerRemoteTimestamp = timestamp; innerRemoteTimestamp = timestamp;
if (identifier == 0) if (identifier == 0)
throw new IllegalStateException("identifier can't be 0"); throw new IllegalStateException("identifier can't be 0");
innerRemoteIdentifier = identifier; innerRemoteIdentifier = identifier;
if (remoteNodeIdentifier == 0)
throw new IllegalStateException("remote node identifier can't be 0");
innerRemoteNodeIdentifier = remoteNodeIdentifier;
} }


public void dropChange() { public void dropChange() {
Expand All @@ -62,6 +66,14 @@ public void raiseChange() {
mh.m().raiseChange(s.tierIndex, e.pos); mh.m().raiseChange(s.tierIndex, e.pos);
} }


public void raiseChangeFor(byte remoteIdentifier) {
mh.m().raiseChangeFor(s.tierIndex, e.pos, remoteIdentifier);
}

public void raiseChangeForAllExcept(byte remoteIdentifier) {
mh.m().raiseChangeForAllExcept(s.tierIndex, e.pos, remoteIdentifier);
}

public boolean changed() { public boolean changed() {
return mh.m().isChanged(s.tierIndex, e.pos); return mh.m().isChanged(s.tierIndex, e.pos);
} }
Expand All @@ -78,6 +90,12 @@ public byte remoteIdentifier() {
return innerRemoteIdentifier; return innerRemoteIdentifier;
} }


@Override
public byte remoteNodeIdentifier() {
checkOnEachPublicOperation.checkOnEachPublicOperation();
return innerRemoteNodeIdentifier;
}

@Override @Override
public byte currentNodeIdentifier() { public byte currentNodeIdentifier() {
return mh.m().identifier(); return mh.m().identifier();
Expand Down

0 comments on commit 04adb46

Please sign in to comment.