Skip to content

Commit

Permalink
Follow fixes to abortable rebalancing
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed May 8, 2013
1 parent 63beb58 commit f31e48f
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 112 deletions.
7 changes: 5 additions & 2 deletions src/java/voldemort/server/rebalance/Rebalancer.java
Expand Up @@ -133,6 +133,10 @@ public synchronized void releaseRebalancingPermit(int nodeId) {
* bridges have not been setup and we either miss a proxy put or return a
* null for get/getalls
*
* TODO:refactor The rollback logic here is too convoluted. Specifically,
* the independent updates to each key could be split up into their own
* methods.
*
* @param cluster Cluster metadata to change
* @param rebalancePartitionsInfo List of rebalance partitions info
* @param swapRO Boolean to indicate swapping of RO store
Expand Down Expand Up @@ -317,10 +321,9 @@ private void changeCluster(String clusterKey, final Cluster cluster) {
try {
metadataStore.writeLock.lock();
try {
// TODO why increment server 0 all the time?
VectorClock updatedVectorClock = ((VectorClock) metadataStore.get(clusterKey, null)
.get(0)
.getVersion()).incremented(0,
.getVersion()).incremented(metadataStore.getNodeId(),
System.currentTimeMillis());
metadataStore.put(clusterKey, Versioned.value((Object) cluster, updatedVectorClock));
} finally {
Expand Down
11 changes: 10 additions & 1 deletion src/java/voldemort/server/storage/StorageService.java
Expand Up @@ -1031,9 +1031,18 @@ protected void stopInner() {
lastException = e;
}
}

logger.info("Closed failure detector.");

// shut down the proxy put thread pool
this.proxyPutWorkerPool.shutdown();
try {
if(!this.proxyPutWorkerPool.awaitTermination(10, TimeUnit.SECONDS))
this.proxyPutWorkerPool.shutdownNow();
} catch(InterruptedException e) {
this.proxyPutWorkerPool.shutdownNow();
}
logger.info("Closed proxy put thread pool.");

/* If there is an exception, throw it */
if(lastException instanceof VoldemortException)
throw (VoldemortException) lastException;
Expand Down
55 changes: 28 additions & 27 deletions src/java/voldemort/store/rebalancing/AsyncProxyPutTask.java
Expand Up @@ -16,7 +16,6 @@

package voldemort.store.rebalancing;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import voldemort.cluster.Node;
Expand Down Expand Up @@ -52,6 +51,7 @@ public class AsyncProxyPutTask implements Runnable {
private final Versioned<byte[]> value;
private final byte[] transforms;
private final int destinationNode;
private final MetadataStore metadata;

AsyncProxyPutTask(RedirectingStore redirectingStore,
ByteArray key,
Expand All @@ -63,53 +63,54 @@ public class AsyncProxyPutTask implements Runnable {
this.transforms = transforms;
this.redirectingStore = redirectingStore;
this.destinationNode = destinationNode;
logger.setLevel(Level.TRACE);
this.metadata = redirectingStore.getMetadataStore();
}

@Override
public void run() {

MetadataStore metadata = redirectingStore.getMetadataStore();
Node donorNode = metadata.getCluster().getNodeById(destinationNode);
Node proxyNode = metadata.getCluster().getNodeById(destinationNode);
long startNs = System.nanoTime();
try {
// TODO there are no retries now if the node we want to write to is
// unavailable
redirectingStore.checkNodeAvailable(donorNode);
redirectingStore.checkNodeAvailable(proxyNode);
Store<ByteArray, byte[], byte[]> socketStore = redirectingStore.getRedirectingSocketStore(redirectingStore.getName(),
destinationNode);

socketStore.put(key, value, transforms);
redirectingStore.recordSuccess(donorNode, startNs);
redirectingStore.recordSuccess(proxyNode, startNs);
if(logger.isTraceEnabled()) {
logger.trace("Proxy write for store " + redirectingStore.getName() + " key "
+ ByteUtils.toBinaryString(key.get()) + " to destinationNode:"
+ destinationNode);
}
} catch(UnreachableStoreException e) {
redirectingStore.recordException(donorNode, startNs, e);
logger.error("Failed to reach proxy node " + donorNode, e);
redirectingStore.reporteProxyPutFailure();
redirectingStore.recordException(proxyNode, startNs, e);
logFailedProxyPutIfNeeded(e);
} catch(ObsoleteVersionException ove) {
// Proxy puts can get an OVE if somehow there are two stealers for
// the same donor and the other stealer's proxy put already got to
// the donor.. This will not result from online put winning, since
// we don't issue proxy puts if the donor is still a replica
if(logger.isTraceEnabled()) {
logger.trace("OVE in proxy put for destinationNode: " + destinationNode
+ " from node:" + metadata.getNodeId() + " on key "
+ ByteUtils.toHexString(key.get()) + " Version:"
+ value.getVersion(),
ove);
}
redirectingStore.reporteProxyPutFailure();
/*
* Proxy puts can get an OVE if somehow there are two stealers for
* the same proxy node and the other stealer's proxy put already got
* tothe proxy node.. This will not result from online put winning,
* since we don't issue proxy puts if the proxy node is still a
* replica
*/
logFailedProxyPutIfNeeded(ove);
} catch(Exception e) {
// Just log the key.. Not sure having values in the log is a good
// idea.
logger.error("Unexpected exception in proxy put for destinationNode: "
+ destinationNode + " from node:" + metadata.getNodeId() + " on key "
+ ByteUtils.toHexString(key.get()) + " Version:" + value.getVersion(), e);
redirectingStore.reporteProxyPutFailure();
logFailedProxyPutIfNeeded(e);
}
}

private void logFailedProxyPutIfNeeded(Exception e) {
redirectingStore.reportProxyPutFailure();
// only log OVE if trace debugging is on.
if(e instanceof ObsoleteVersionException && !logger.isTraceEnabled()) {
return;
}
logger.error("Exception in proxy put for proxyNode: " + destinationNode + " from node:"
+ metadata.getNodeId() + " on key " + ByteUtils.toHexString(key.get())
+ " Version:" + value.getVersion(), e);
}
}
}
3 changes: 2 additions & 1 deletion src/java/voldemort/store/rebalancing/ProxyPutStats.java
Expand Up @@ -53,6 +53,7 @@ public void reportProxyPutCompletion() {
}

public void reportProxyPutFailure() {
this.reportProxyPutCompletion();
this.numProxyPutFailures.incrementAndGet();
if(this.parent != null) {
this.parent.reportProxyPutFailure();
Expand All @@ -68,4 +69,4 @@ public long getNumProxyPutFailures() {
public long getNumPendingProxyPuts() {
return numPendingProxyPuts.get();
}
}
}

0 comments on commit f31e48f

Please sign in to comment.