Skip to content

Commit

Permalink
Adding proxy write tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Apr 18, 2013
1 parent fb8f1a8 commit 927b02c
Show file tree
Hide file tree
Showing 6 changed files with 686 additions and 39 deletions.
33 changes: 27 additions & 6 deletions src/java/voldemort/store/rebalancing/RedirectingStore.java
Expand Up @@ -39,6 +39,7 @@
import voldemort.store.metadata.MetadataStore.VoldemortState;
import voldemort.store.socket.SocketStoreFactory;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.Time;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Version;
Expand Down Expand Up @@ -107,7 +108,8 @@ private RebalancePartitionsInfo redirectingKey(ByteArray key) {
@Override
public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws VoldemortException {
RebalancePartitionsInfo stealInfo = redirectingKey(key);

logger.info("BEGIN GET from stealer:" + metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()));
/**
* If I am rebalancing for this key, try to do remote get(), put it
* locally first to get the correct version ignoring any
Expand All @@ -119,14 +121,16 @@ public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws Vold
if(stealInfo != null) {
proxyGetAndLocalPut(key, stealInfo.getDonorId(), transforms);
}

logger.info("END GET from stealer:" + metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()));
return getInnerStore().get(key, transforms);
}

@Override
public List<Version> getVersions(ByteArray key) {
RebalancePartitionsInfo stealInfo = redirectingKey(key);

logger.info("BEGIN GETVERSIONS from stealer:" + metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()));
/**
* If I am rebalancing for this key, try to do remote get(), put it
* locally first to get the correct version ignoring any
Expand All @@ -138,7 +142,8 @@ public List<Version> getVersions(ByteArray key) {
if(stealInfo != null) {
proxyGetAndLocalPut(key, stealInfo.getDonorId(), null);
}

logger.info("END GETVERSIONS from stealer:" + metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()));
return getInnerStore().getVersions(key);
}

Expand Down Expand Up @@ -167,7 +172,8 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
throws VoldemortException {
RebalancePartitionsInfo stealInfo = redirectingKey(key);

logger.info("BEGIN PUT from stealer:" + metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()) + " value " + value);
/**
* If I am rebalancing for this key, try to do remote get() , put it
* locally first to get the correct version ignoring any
Expand All @@ -181,6 +187,8 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
proxyPut(key, value, transforms, stealInfo.getDonorId());
// TODO if I fail though for some reason, the aborting rebalance can
// surface phantom writes
logger.info("END PUT from stealer:" + metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()) + " value " + value);
getInnerStore().put(key, value, transforms);
}

Expand Down Expand Up @@ -245,6 +253,8 @@ private List<Versioned<byte[]>> proxyGet(ByteArray key, int donorNodeId, byte[]
private void proxyPut(ByteArray key, Versioned<byte[]> value, byte[] transforms, int donorNodeId) {
Node donorNode = metadata.getCluster().getNodeById(donorNodeId);
checkNodeAvailable(donorNode);
logger.info("BEGIN PROXY PUT for donor: " + donorNodeId + " from stealer:"
+ metadata.getNodeId() + " key " + ByteUtils.toHexString(key.get()));
long startNs = System.nanoTime();
try {
Store<ByteArray, byte[], byte[]> redirectingStore = getRedirectingSocketStore(getName(),
Expand All @@ -254,7 +264,14 @@ private void proxyPut(ByteArray key, Versioned<byte[]> value, byte[] transforms,
} catch(UnreachableStoreException e) {
recordException(donorNode, startNs, e);
throw new ProxyUnreachableException("Failed to reach proxy node " + donorNode, e);
} catch(ObsoleteVersionException ove) {
logger.error("OVE in proxy put for donor: " + donorNodeId + " from stealer:"
+ metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()),
ove);
}
logger.info("END PROXY PUT for donor: " + donorNodeId + " from stealer:"
+ metadata.getNodeId() + " key " + ByteUtils.toHexString(key.get()));
}

private void checkNodeAvailable(Node donorNode) {
Expand Down Expand Up @@ -328,14 +345,18 @@ private List<Versioned<byte[]>> proxyGetAndLocalPut(ByteArray key,
int donorId,
byte[] transforms)
throws VoldemortException {
logger.info("BEGIN PROXY GET LOCAL PUT for donor: " + donorId + " from stealer:"
+ metadata.getNodeId() + " key " + ByteUtils.toHexString(key.get()));
List<Versioned<byte[]>> proxyValues = proxyGet(key, donorId, transforms);
for(Versioned<byte[]> proxyValue: proxyValues) {
try {
getInnerStore().put(key, proxyValue, null);
} catch(ObsoleteVersionException e) {
// ignore these
logger.info("OVE in proxy get local put", e);
}
}
logger.info("END PROXY GET LOCAL PUT for donor: " + donorId + " from stealer:"
+ metadata.getNodeId() + " key " + ByteUtils.toHexString(key.get()));
return proxyValues;
}

Expand Down
Expand Up @@ -113,6 +113,9 @@ public List<Node> getNodes(ByteArray key, Operation op) {
if(clientZoneNodes != null && clientZoneNodes.size() > 0)
nodes.addAll(clientZoneNodes);
// ...followed by other zones sorted by proximity list
// NOTE : its imperative that the proximity list does not contain the
// client zone. If this happens, we will add those nodes twice to the
// list
for(int index = 0; index < zoneProximityList.size(); index++) {
List<Node> zoneNodes = zoneIdToNode.get(zoneProximityList.get(index));
if(zoneNodes != null && zoneNodes.size() > 0) {
Expand Down
6 changes: 2 additions & 4 deletions test/common/voldemort/ServerTestUtils.java
Expand Up @@ -347,7 +347,7 @@ public static Cluster getLocalZonedCluster(int numberOfNodes,
for(int i = 0; i < numberOfZones; i++) {
LinkedList<Integer> proximityList = Lists.newLinkedList();
int zoneId = i + 1;
for(int j = 0; j < numberOfZones; j++) {
for(int j = 0; j < numberOfZones - 1; j++) {
proximityList.add(zoneId % numberOfZones);
zoneId++;
}
Expand Down Expand Up @@ -405,7 +405,7 @@ public static Cluster getLocalCluster(int numberOfNodes,
for(int i = 0; i < numberOfZones; i++) {
LinkedList<Integer> proximityList = Lists.newLinkedList();
int zoneId = i + 1;
for(int j = 0; j < numberOfZones; j++) {
for(int j = 0; j < numberOfZones - 1; j++) {
proximityList.add(zoneId % numberOfZones);
zoneId++;
}
Expand Down Expand Up @@ -840,7 +840,6 @@ public static boolean waitForAsyncOperationOnServer(VoldemortServer server,
matchingOperationIds = service.getMatchingAsyncOperationList(asyncOperationPattern,
true);
if(matchingOperationIds.size() > 0) {
System.err.println(">>" + matchingOperationIds);
break;
}
}
Expand All @@ -849,7 +848,6 @@ public static boolean waitForAsyncOperationOnServer(VoldemortServer server,
List<Integer> completedOps = new ArrayList<Integer>(matchingOperationIds.size());
for(Integer op: matchingOperationIds) {
if(service.isComplete(op)) {
System.err.println("Operation " + op + " is complete");
completedOps.add(op);
}
}
Expand Down

0 comments on commit 927b02c

Please sign in to comment.