Skip to content

Commit

Permalink
Replication updates with local identifier shouldn't use the same time…
Browse files Browse the repository at this point in the history
…stamp, because this allows two different updates which replace each other on different nodes, and don't converge. Extend default TimeProvider precision to nanoseconds, to exclude possibility of 'microsecond overflow' on frequent changes to the same entry
  • Loading branch information
leventov committed Dec 26, 2015
1 parent 0ead523 commit 1743f1c
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 19 deletions.
Expand Up @@ -396,7 +396,7 @@ public interface ChronicleHashBuilder<K, H extends ChronicleHash<K, ?, ?, ?>,
* replication consensus protocol (conflicting data updates resolution). * replication consensus protocol (conflicting data updates resolution).
* *
* <p>Default time provider uses system time ({@link System#currentTimeMillis()}) in * <p>Default time provider uses system time ({@link System#currentTimeMillis()}) in
* <i>microsecond</i> precision. * <i>nanosecond</i> precision.
* *
* @param timeProvider a new time provider for replication needs * @param timeProvider a new time provider for replication needs
* @return this builder back * @return this builder back
Expand Down
Expand Up @@ -49,7 +49,7 @@ public static AcceptanceDecision decideOnRemoteModification(
long originTimestamp = entry.originTimestamp(); long originTimestamp = entry.originTimestamp();
boolean shouldAccept = remoteTimestamp > originTimestamp || boolean shouldAccept = remoteTimestamp > originTimestamp ||
(remoteTimestamp == originTimestamp && (remoteTimestamp == originTimestamp &&
context.remoteIdentifier() <= entry.originIdentifier()); context.remoteIdentifier() < entry.originIdentifier());
return shouldAccept ? AcceptanceDecision.ACCEPT : AcceptanceDecision.DISCARD; return shouldAccept ? AcceptanceDecision.ACCEPT : AcceptanceDecision.DISCARD;
} }


Expand Down
Expand Up @@ -184,7 +184,7 @@ enum ChecksumEntries {YES, NO, IF_PERSISTED}
private boolean removeReturnsNull = false; private boolean removeReturnsNull = false;


// replication // replication
private TimeProvider timeProvider = MicrosecondPrecisionSystemTimeProvider.instance(); private TimeProvider timeProvider = NanosecondPrecisionSystemTimeProvider.instance();
/** /**
* Default timeout is 1 minute. Even loopback tests converge often in the course of seconds, * Default timeout is 1 minute. Even loopback tests converge often in the course of seconds,
* let alone WAN replication over many nodes might take tens of seconds. * let alone WAN replication over many nodes might take tens of seconds.
Expand Down
Expand Up @@ -21,31 +21,31 @@
import java.io.ObjectStreamException; import java.io.ObjectStreamException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;


final class MicrosecondPrecisionSystemTimeProvider extends TimeProvider { final class NanosecondPrecisionSystemTimeProvider extends TimeProvider {
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


private static final MicrosecondPrecisionSystemTimeProvider INSTANCE = private static final NanosecondPrecisionSystemTimeProvider INSTANCE =
new MicrosecondPrecisionSystemTimeProvider(); new NanosecondPrecisionSystemTimeProvider();


public static MicrosecondPrecisionSystemTimeProvider instance() { public static NanosecondPrecisionSystemTimeProvider instance() {
return INSTANCE; return INSTANCE;
} }


private MicrosecondPrecisionSystemTimeProvider() {} private NanosecondPrecisionSystemTimeProvider() {}


@Override @Override
public long currentTime() { public long currentTime() {
return MILLISECONDS.toMicros(System.currentTimeMillis()); return MILLISECONDS.toNanos(System.currentTimeMillis());
} }


@Override @Override
public long systemTimeIntervalBetween( public long systemTimeIntervalBetween(
long earlierTimeMicros, long laterTimeMicros, TimeUnit systemTimeIntervalUnit) { long earlierTimeNanos, long laterTimeNanos, TimeUnit systemTimeIntervalUnit) {
long intervalMicros = laterTimeMicros - earlierTimeMicros; long intervalNanos = laterTimeNanos - earlierTimeNanos;
return systemTimeIntervalUnit.convert(intervalMicros, MICROSECONDS); return systemTimeIntervalUnit.convert(intervalNanos, NANOSECONDS);
} }


@Override @Override
Expand Down
Expand Up @@ -128,12 +128,7 @@ public boolean isChanged() {
public void updatedReplicationStateOnPresentEntry() { public void updatedReplicationStateOnPresentEntry() {
if (!ru.replicationUpdateInit()) { if (!ru.replicationUpdateInit()) {
s.innerWriteLock.lock(); s.innerWriteLock.lock();
long timestamp; long timestamp = Math.max(timestamp() + 1, mh.m().timeProvider.currentTime());
if (identifier() != mh.m().identifier()) {
timestamp = Math.max(timestamp() + 1, mh.m().timeProvider.currentTime());
} else {
timestamp = mh.m().timeProvider.currentTime();
}
updateReplicationState(mh.m().identifier(), timestamp); updateReplicationState(mh.m().identifier(), timestamp);
} }
} }
Expand Down

0 comments on commit 1743f1c

Please sign in to comment.