diff --git a/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java b/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java index cc544d12ef..0e5db30ee6 100644 --- a/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java +++ b/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java @@ -165,12 +165,12 @@ public void requestComplete(Object result, long requestTime) { + "} Start registering Slop(node:" + node.getId() + ",host:" + node.getHost() + ")"); - hintedHandoff.sendHintSerial(node, - versionedCopy.getVersion(), - slop); + hintedHandoff.sendHintParallel(node, + versionedCopy.getVersion(), + slop); if(logger.isDebugEnabled()) logger.debug("PUT {key:" + key - + "} Finished registering Slop(node:" + + "} Sent out request to register Slop(node: " + node.getId() + ",host:" + node.getHost() + ")"); } diff --git a/src/java/voldemort/store/slop/HintedHandoff.java b/src/java/voldemort/store/slop/HintedHandoff.java index 12b477486c..0ea8adf712 100644 --- a/src/java/voldemort/store/slop/HintedHandoff.java +++ b/src/java/voldemort/store/slop/HintedHandoff.java @@ -17,6 +17,7 @@ package voldemort.store.slop; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -99,12 +100,9 @@ public HintedHandoff(FailureDetector failureDetector, * @see #sendHintSerial(voldemort.cluster.Node, * voldemort.versioning.Version, Slop) */ - public void sendHintParallel(final Node failedNode, final Version version, final Slop slop) { - boolean slopAsyncSent = false; - final ByteArray slopKey = slop.makeKey(); - Versioned slopVersioned = new Versioned(slopSerializer.toBytes(slop), - version); - List nodes = handoffStrategy.routeHint(failedNode); + public void sendHintParallel(Node failedNode, Version version, Slop slop) { + List nodes = new LinkedList(); + nodes.addAll(handoffStrategy.routeHint(failedNode)); if(logger.isTraceEnabled()) { List nodeIds = new ArrayList(); for(Node node: nodes) { @@ -112,83 +110,75 @@ public void sendHintParallel(final Node failedNode, final Version version, final } logger.debug("Hint preference list: " + nodeIds.toString()); } - for(final Node node: nodes) { - int nodeId = node.getId(); - - if(!failedNodes.contains(node) && failureDetector.isAvailable(node)) { - if(logger.isDebugEnabled()) - logger.debug("Sending an async hint to " + nodeId); - NonblockingStore nonblockingStore = nonblockingSlopStores.get(nodeId); - Utils.notNull(nonblockingStore); - final long startNs = System.nanoTime(); - - if(logger.isDebugEnabled()) - logger.debug("Slop attempt to write " + slop.getKey() + " for " + failedNode - + " to node " + node); - - NonblockingStoreCallback callback = new NonblockingStoreCallback() { + sendOneAsyncHint(slop.makeKey(), new Versioned(slopSerializer.toBytes(slop), + version), nodes); + } - public void requestComplete(Object result, long requestTime) { - logger.debug("Got response for async hint request"); - Response response = new Response(node, - slopKey, - result, - requestTime); - if(response.getValue() instanceof Exception) { - if(response.getValue() instanceof ObsoleteVersionException) { - // Ignore + private void sendOneAsyncHint(final ByteArray slopKey, + final Versioned slopVersioned, + final List routeNodes) { + Node nodeToHostHint = null; + while(routeNodes.size() > 0) { + nodeToHostHint = routeNodes.remove(0); + if(!failedNodes.contains(nodeToHostHint) && failureDetector.isAvailable(nodeToHostHint)) { + break; + } else { + nodeToHostHint = null; + } + } + if(nodeToHostHint == null) { + logger.error("trying to send an async hint but used up all nodes"); + return; + } + final Node node = nodeToHostHint; + int nodeId = node.getId(); - // TODO: Treating ObsoleteVersionException as - // "success", but there is no logger.debug to - // note that the slop was written, nor is there - // a failureDetector.recordSuccess invocation. - } else { - // Use the blocking approach - if(!failedNodes.contains(node)) - failedNodes.add(node); - if(response.getValue() instanceof UnreachableStoreException) { - UnreachableStoreException use = (UnreachableStoreException) response.getValue(); + NonblockingStore nonblockingStore = nonblockingSlopStores.get(nodeId); + Utils.notNull(nonblockingStore); - if(logger.isDebugEnabled()) { - logger.debug("Write of key " + slop.getKey() + " for " - + failedNode + " to node " + node - + " failed due to unreachable: " - + use.getMessage()); - } + final Long startNs = System.nanoTime(); + NonblockingStoreCallback callback = new NonblockingStoreCallback() { - failureDetector.recordException(node, - (System.nanoTime() - startNs) - / Time.NS_PER_MS, - use); - } - sendHintSerial(failedNode, version, slop); - } - return; + @Override + public void requestComplete(Object result, long requestTime) { + Slop slop = null; + boolean loggerDebugEnabled = logger.isDebugEnabled(); + if(loggerDebugEnabled) { + slop = slopSerializer.toObject(slopVersioned.getValue()); + } + Response response = new Response(node, + slopKey, + result, + requestTime); + if(response.getValue() instanceof Exception + && !(response.getValue() instanceof ObsoleteVersionException)) { + if(!failedNodes.contains(node)) + failedNodes.add(node); + if(response.getValue() instanceof UnreachableStoreException) { + UnreachableStoreException use = (UnreachableStoreException) response.getValue(); + + if(loggerDebugEnabled) { + logger.debug("Write of key " + slop.getKey() + " for " + + slop.getNodeId() + " to node " + node + + " failed due to unreachable: " + use.getMessage()); } - if(logger.isDebugEnabled()) - logger.debug("Slop write of key " + slop.getKey() + " for " - + failedNode + " to node " + node + " succeeded in " - + (System.nanoTime() - startNs) + " ns"); + failureDetector.recordException(node, (System.nanoTime() - startNs) + / Time.NS_PER_MS, use); + } + sendOneAsyncHint(slopKey, slopVersioned, routeNodes); + } - failureDetector.recordSuccess(node, (System.nanoTime() - startNs) - / Time.NS_PER_MS); + if(loggerDebugEnabled) + logger.debug("Slop write of key " + slop.getKey() + " for node " + + slop.getNodeId() + " to node " + node + " succeeded in " + + (System.nanoTime() - startNs) + " ns"); - } - }; + failureDetector.recordSuccess(node, (System.nanoTime() - startNs) / Time.NS_PER_MS); - nonblockingStore.submitPutRequest(slopKey, slopVersioned, null, callback, timeoutMs); - slopAsyncSent = true; - break; - } else { - if(logger.isDebugEnabled()) { - logger.debug("Skipping node " + nodeId); - } } - } - if(logger.isDebugEnabled() && !slopAsyncSent) { - logger.warn("Skipped all nodes. Did not send hint for key: " + slop.getKey()); - } + }; + nonblockingStore.submitPutRequest(slopKey, slopVersioned, null, callback, timeoutMs); } /** @@ -201,6 +191,7 @@ public void requestComplete(Object result, long requestTime) { * @param slop The hint * @return True if persisted on another node, false otherwise */ + @Deprecated public boolean sendHintSerial(Node failedNode, Version version, Slop slop) { boolean persisted = false; for(Node node: handoffStrategy.routeHint(failedNode)) {