Skip to content

Commit

Permalink
deprecate send hint serial
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjiewu committed May 16, 2013
1 parent c1400bd commit 204615f
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 77 deletions.
Expand Up @@ -165,12 +165,12 @@ public void requestComplete(Object result, long requestTime) {
+ "} Start registering Slop(node:"
+ node.getId() + ",host:" + node.getHost()
+ ")");
hintedHandoff.sendHintSerial(node,
versionedCopy.getVersion(),
slop);
hintedHandoff.sendHintParallel(node,
versionedCopy.getVersion(),
slop);
if(logger.isDebugEnabled())
logger.debug("PUT {key:" + key
+ "} Finished registering Slop(node:"
+ "} Sent out request to register Slop(node: "
+ node.getId() + ",host:" + node.getHost()
+ ")");
}
Expand Down
137 changes: 64 additions & 73 deletions src/java/voldemort/store/slop/HintedHandoff.java
Expand Up @@ -17,6 +17,7 @@
package voldemort.store.slop;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -99,96 +100,85 @@ public HintedHandoff(FailureDetector failureDetector,
* @see #sendHintSerial(voldemort.cluster.Node,
* voldemort.versioning.Version, Slop)
*/
public void sendHintParallel(final Node failedNode, final Version version, final Slop slop) {
boolean slopAsyncSent = false;
final ByteArray slopKey = slop.makeKey();
Versioned<byte[]> slopVersioned = new Versioned<byte[]>(slopSerializer.toBytes(slop),
version);
List<Node> nodes = handoffStrategy.routeHint(failedNode);
public void sendHintParallel(Node failedNode, Version version, Slop slop) {
List<Node> nodes = new LinkedList<Node>();
nodes.addAll(handoffStrategy.routeHint(failedNode));
if(logger.isTraceEnabled()) {
List<Integer> nodeIds = new ArrayList<Integer>();
for(Node node: nodes) {
nodeIds.add(node.getId());
}
logger.debug("Hint preference list: " + nodeIds.toString());
}
for(final Node node: nodes) {
int nodeId = node.getId();

if(!failedNodes.contains(node) && failureDetector.isAvailable(node)) {
if(logger.isDebugEnabled())
logger.debug("Sending an async hint to " + nodeId);
NonblockingStore nonblockingStore = nonblockingSlopStores.get(nodeId);
Utils.notNull(nonblockingStore);
final long startNs = System.nanoTime();

if(logger.isDebugEnabled())
logger.debug("Slop attempt to write " + slop.getKey() + " for " + failedNode
+ " to node " + node);

NonblockingStoreCallback callback = new NonblockingStoreCallback() {
sendOneAsyncHint(slop.makeKey(), new Versioned<byte[]>(slopSerializer.toBytes(slop),
version), nodes);
}

public void requestComplete(Object result, long requestTime) {
logger.debug("Got response for async hint request");
Response<ByteArray, Object> response = new Response<ByteArray, Object>(node,
slopKey,
result,
requestTime);
if(response.getValue() instanceof Exception) {
if(response.getValue() instanceof ObsoleteVersionException) {
// Ignore
private void sendOneAsyncHint(final ByteArray slopKey,
final Versioned<byte[]> slopVersioned,
final List<Node> routeNodes) {
Node nodeToHostHint = null;
while(routeNodes.size() > 0) {
nodeToHostHint = routeNodes.remove(0);
if(!failedNodes.contains(nodeToHostHint) && failureDetector.isAvailable(nodeToHostHint)) {
break;
} else {
nodeToHostHint = null;
}
}
if(nodeToHostHint == null) {
logger.error("trying to send an async hint but used up all nodes");
return;
}
final Node node = nodeToHostHint;
int nodeId = node.getId();

// TODO: Treating ObsoleteVersionException as
// "success", but there is no logger.debug to
// note that the slop was written, nor is there
// a failureDetector.recordSuccess invocation.
} else {
// Use the blocking approach
if(!failedNodes.contains(node))
failedNodes.add(node);
if(response.getValue() instanceof UnreachableStoreException) {
UnreachableStoreException use = (UnreachableStoreException) response.getValue();
NonblockingStore nonblockingStore = nonblockingSlopStores.get(nodeId);
Utils.notNull(nonblockingStore);

if(logger.isDebugEnabled()) {
logger.debug("Write of key " + slop.getKey() + " for "
+ failedNode + " to node " + node
+ " failed due to unreachable: "
+ use.getMessage());
}
final Long startNs = System.nanoTime();
NonblockingStoreCallback callback = new NonblockingStoreCallback() {

failureDetector.recordException(node,
(System.nanoTime() - startNs)
/ Time.NS_PER_MS,
use);
}
sendHintSerial(failedNode, version, slop);
}
return;
@Override
public void requestComplete(Object result, long requestTime) {
Slop slop = null;
boolean loggerDebugEnabled = logger.isDebugEnabled();
if(loggerDebugEnabled) {
slop = slopSerializer.toObject(slopVersioned.getValue());
}
Response<ByteArray, Object> response = new Response<ByteArray, Object>(node,
slopKey,
result,
requestTime);
if(response.getValue() instanceof Exception
&& !(response.getValue() instanceof ObsoleteVersionException)) {
if(!failedNodes.contains(node))
failedNodes.add(node);
if(response.getValue() instanceof UnreachableStoreException) {
UnreachableStoreException use = (UnreachableStoreException) response.getValue();

if(loggerDebugEnabled) {
logger.debug("Write of key " + slop.getKey() + " for "
+ slop.getNodeId() + " to node " + node
+ " failed due to unreachable: " + use.getMessage());
}

if(logger.isDebugEnabled())
logger.debug("Slop write of key " + slop.getKey() + " for "
+ failedNode + " to node " + node + " succeeded in "
+ (System.nanoTime() - startNs) + " ns");
failureDetector.recordException(node, (System.nanoTime() - startNs)
/ Time.NS_PER_MS, use);
}
sendOneAsyncHint(slopKey, slopVersioned, routeNodes);
}

failureDetector.recordSuccess(node, (System.nanoTime() - startNs)
/ Time.NS_PER_MS);
if(loggerDebugEnabled)
logger.debug("Slop write of key " + slop.getKey() + " for node "
+ slop.getNodeId() + " to node " + node + " succeeded in "
+ (System.nanoTime() - startNs) + " ns");

}
};
failureDetector.recordSuccess(node, (System.nanoTime() - startNs) / Time.NS_PER_MS);

nonblockingStore.submitPutRequest(slopKey, slopVersioned, null, callback, timeoutMs);
slopAsyncSent = true;
break;
} else {
if(logger.isDebugEnabled()) {
logger.debug("Skipping node " + nodeId);
}
}
}
if(logger.isDebugEnabled() && !slopAsyncSent) {
logger.warn("Skipped all nodes. Did not send hint for key: " + slop.getKey());
}
};
nonblockingStore.submitPutRequest(slopKey, slopVersioned, null, callback, timeoutMs);
}

/**
Expand All @@ -201,6 +191,7 @@ public void requestComplete(Object result, long requestTime) {
* @param slop The hint
* @return True if persisted on another node, false otherwise
*/
@Deprecated
public boolean sendHintSerial(Node failedNode, Version version, Slop slop) {
boolean persisted = false;
for(Node node: handoffStrategy.routeHint(failedNode)) {
Expand Down

0 comments on commit 204615f

Please sign in to comment.