Skip to content

Commit

Permalink
using treemap to store clocks in VectorClock
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjiewu committed May 8, 2013
1 parent 8fd7ea6 commit 3a5e6f6
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 34 deletions.
87 changes: 53 additions & 34 deletions src/java/voldemort/versioning/VectorClock.java
Expand Up @@ -19,15 +19,17 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;

import voldemort.annotations.concurrency.NotThreadsafe;
import voldemort.utils.ByteUtils;
import voldemort.utils.Utils;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
* A vector of the number of writes mastered by each node. The vector is stored
Expand All @@ -45,7 +47,7 @@ public class VectorClock implements Version, Serializable {
private static final int MAX_NUMBER_OF_VERSIONS = Short.MAX_VALUE;

/* A map of versions keyed by nodeId */
private final Map<Short, Long> versionMap;
private final TreeMap<Short, Long> versionMap;

/*
* The time of the last update on the server on which the update was
Expand All @@ -61,7 +63,7 @@ public VectorClock() {
}

public VectorClock(long timestamp) {
this.versionMap = new HashMap<Short, Long>(0);
this.versionMap = new TreeMap<Short, Long>();
this.timestamp = timestamp;
}

Expand All @@ -72,14 +74,20 @@ public VectorClock(long timestamp) {
*/
@Deprecated
public VectorClock(List<ClockEntry> versions, long timestamp) {
this.versionMap = new HashMap<Short, Long>(0);
this.versionMap = new TreeMap<Short, Long>();
this.timestamp = timestamp;
for(ClockEntry clockEntry: versions) {
this.versionMap.put(clockEntry.getNodeId(), clockEntry.getVersion());
}
}

public VectorClock(Map<Short, Long> versionMap, long timestamp) {
/**
* Only used for cloning
*
* @param versionMap
* @param timestamp
*/
private VectorClock(TreeMap<Short, Long> versionMap, long timestamp) {
this.versionMap = Utils.notNull(versionMap);
this.timestamp = timestamp;
}
Expand Down Expand Up @@ -114,7 +122,7 @@ public VectorClock(byte[] bytes, int offset) {
throw new IllegalArgumentException("Too few bytes: expected at least " + minimumBytes
+ " but found only " + bytes.length + ".");

this.versionMap = new HashMap<Short, Long>(numEntries);
this.versionMap = new TreeMap<Short, Long>();
int index = 3 + offset;
for(int i = 0; i < numEntries; i++) {
short nodeId = ByteUtils.readShort(bytes, index);
Expand All @@ -141,12 +149,11 @@ public int toBytes(byte[] buf, int offset) {
offset++;

int clockEntrySize = ByteUtils.SIZE_OF_SHORT + versionSize;
for(Map.Entry<Short, Long> entry: versionMap.entrySet()) {
ByteUtils.writeShort(buf, entry.getKey(), offset);
ByteUtils.writeBytes(buf,
entry.getValue(),
offset + ByteUtils.SIZE_OF_SHORT,
versionSize);
SortedSet<Short> nodeIds = versionMap.navigableKeySet();
for(Short nodeId: nodeIds) {
Long version = versionMap.get(nodeId);
ByteUtils.writeShort(buf, nodeId, offset);
ByteUtils.writeBytes(buf, version, offset + ByteUtils.SIZE_OF_SHORT, versionSize);
offset += clockEntrySize;
}
ByteUtils.writeLong(buf, this.timestamp, offset);
Expand Down Expand Up @@ -186,8 +193,7 @@ public void incrementVersion(int node, long time) {
}

/**
* List Get new vector clock based on this clock but incremented on index
* nodeId
* Get new vector clock based on this clock but incremented on index nodeId
*
* @param nodeId The id of the node to increment
* @return A vector clock equal on each element execept that indexed by
Expand All @@ -201,7 +207,7 @@ public VectorClock incremented(int nodeId, long time) {

@Override
public VectorClock clone() {
return new VectorClock(Maps.newHashMap(versionMap), this.timestamp);
return new VectorClock(Maps.newTreeMap(versionMap), this.timestamp);
}

@Override
Expand Down Expand Up @@ -273,10 +279,13 @@ public Occurred compare(Version v) {
}

/**
* Compare two VectorClocks, the outcomes will be one of the following: --
* v1 <= v2 : BEFORE (implies v2 AFTER or BEFORE v1)<br>
* v1 > v2 => AFTER (implies v2 BEFORE v1)<br>
* v1 <> v2 => CONCURRENTLY (implies v2 CONCURRENTLY to v1)
* Compare two VectorClocks, the outcomes will be one of the following: <br>
* -- Clock 1 is BEFORE clock 2, if there exists an nodeId such that
* c1(nodeId) <= c2(nodeId) and there does not exist another nodeId such
* that c1(nodeId) > c2(nodeId). <br>
* -- Clock 1 is CONCURRENT to clock 2 if there exists an nodeId, nodeId2
* such that c1(nodeId) < c2(nodeId) and c1(nodeId2) > c2(nodeId2)<br>
* -- Clock 1 is AFTER clock 2 otherwise
*
* @param v1 The first VectorClock
* @param v2 The second VectorClock
Expand All @@ -288,24 +297,34 @@ public static Occurred compare(VectorClock v1, VectorClock v2) {
boolean v1Bigger = false;
boolean v2Bigger = false;

Map<Short, Long> v1MapCopy = Maps.newHashMap(v1.versionMap);
for(Map.Entry<Short, Long> v2Entry: v2.versionMap.entrySet()) {
Long v1Version = v1MapCopy.remove(v2Entry.getKey());
Long v2Version = v2Entry.getValue();
if(v1Version == null) {
v2Bigger = true;
SortedSet<Short> v1Nodes = v1.versionMap.navigableKeySet();
SortedSet<Short> v2Nodes = v2.versionMap.navigableKeySet();
// get clocks(nodeIds) that both v1 and v2 has
SortedSet<Short> commonNodes = Sets.newTreeSet(v1Nodes);
commonNodes.retainAll(v2Nodes);
// if v1 has more nodes than common nodes
// v1 has clocks that v2 does not
if(v1Nodes.size() > commonNodes.size()) {
v1Bigger = true;
}
// if v2 has more nodes than common nodes
// v2 has clocks that v1 does not
if(v2Nodes.size() > commonNodes.size()) {
v2Bigger = true;
}
// compare the common parts
for(Short nodeId: commonNodes) {
// no need to compare more
if(v1Bigger && v2Bigger) {
break;
}
long v1Version = v1.versionMap.get(nodeId);
long v2Version = v2.versionMap.get(nodeId);
if(v1Version > v2Version) {
v1Bigger = true;
} else if(v1Version < v2Version) {
v2Bigger = true;
} else if(v1Version > v2Version) {
v1Bigger = true;
}
// two clockEntry (of same nodeId) with the same version
// will not make either version bigger
}
// if there are clockEntry not touched in v1MapCopy then v1 has bigger
// clocks
if(v1MapCopy.size() > 0) {
v1Bigger = true;
}

/*
Expand Down
37 changes: 37 additions & 0 deletions test/unit/voldemort/versioning/VectorClockTest.java
Expand Up @@ -55,6 +55,8 @@ public void testComparisons() {
getClock(1).compare(getClock(2)) == Occurred.CONCURRENTLY);
assertTrue("Clocks with different events should be concurrent.",
getClock(1, 1, 2).compare(getClock(1, 1, 3)) == Occurred.CONCURRENTLY);
assertTrue("Clocks with different events should be concurrent.",
getClock(1, 2, 3, 3).compare(getClock(1, 1, 2, 3)) == Occurred.CONCURRENTLY);
assertTrue(getClock(2, 2).compare(getClock(1, 2, 2, 3)) == Occurred.BEFORE
&& getClock(1, 2, 2, 3).compare(getClock(2, 2)) == Occurred.AFTER);
}
Expand Down Expand Up @@ -104,6 +106,41 @@ public void testSerialization() {
new VectorClock(clock.toBytes()));
}

@Test
public void testSerializationBackwardCompatibility() {
assertEquals("The empty clock serializes incorrectly.",
getClock(),
new VectorClock(getClock().toBytes()));
VectorClock clock = getClock(1, 1, 2, 3, 4, 4, 6);
// Old Vector Clock would serialize to this:
// 0 5 1 0 1 2 0 2 1 0 3 1 0 4 2 0 6 1 [timestamp]
byte[] knownSerializedHead = { 0, 5, 1, 0, 1, 2, 0, 2, 1, 0, 3, 1, 0, 4, 2, 0, 6, 1 };
byte[] serialized = clock.toBytes();
for(int index = 0; index < knownSerializedHead.length; index++) {
assertEquals("byte at index " + index + " is not equal",
knownSerializedHead[index],
serialized[index]);
}
}

/**
* Pre-condition: timestamp is ignored in determine vector clock equality
*/
@Test
public void testDeserializationBackwardCompatibility() {
assertEquals("The empty clock serializes incorrectly.",
getClock(),
new VectorClock(getClock().toBytes()));
VectorClock clock = getClock(1, 1, 2, 3, 4, 4, 6);
// Old Vector Clock would serialize to this:
// 0 5; 1; 0 1, 2; 0 2, 1; 0 3, 1; 0 4, 2; 0 6, 1; [timestamp=random]
byte[] knownSerialized = { 0, 5, 1, 0, 1, 2, 0, 2, 1, 0, 3, 1, 0, 4, 2, 0, 6, 1, 0, 0, 1,
0x3e, 0x7b, (byte) 0x8c, (byte) 0x9d, 0x19 };
assertEquals("vector clock does not deserialize correctly on given byte array",
clock,
new VectorClock(knownSerialized));
}

@Test
public void testSerializationWraps() {
VectorClock clock = getClock(1, 1, 2, 3, 3, 6);
Expand Down

0 comments on commit 3a5e6f6

Please sign in to comment.