diff --git a/src/java/voldemort/store/routed/action/PerformParallelDeleteRequests.java b/src/java/voldemort/store/routed/action/PerformParallelDeleteRequests.java index 7f06455eee..68ba8d4252 100644 --- a/src/java/voldemort/store/routed/action/PerformParallelDeleteRequests.java +++ b/src/java/voldemort/store/routed/action/PerformParallelDeleteRequests.java @@ -197,7 +197,7 @@ private void processResponses(final Map> re failureDetector.recordSuccess(response.getNode(), response.getRequestTime()); pipelineData.getZoneResponses().add(response.getNode().getZoneId()); - // TODO: Are the next two lines necessary!?!?!? + // TODO: Are the next two lines necessary!?!?!? YES, they are. Response rCast = Utils.uncheckedCast(response); pipelineData.getResponses().add(rCast); diff --git a/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java b/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java index 93910f64aa..b8a4be9d88 100644 --- a/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java +++ b/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java @@ -275,6 +275,13 @@ public void requestComplete(Object result, long requestTime) { hintedHandoff.sendHintSerial(node, versionedCopy.getVersion(), slop); } } + /*- + if(isHintedHandoffEnabled() && pipeline.isFinished()) { + if(response.getValue() instanceof UnreachableStoreException) { + new Thread(new DoHintedHandoff()); + } + } + */ attemptsLatch.countDown(); blocksLatch.countDown(); @@ -298,6 +305,96 @@ public void requestComplete(Object result, long requestTime) { handleResponseError(response, pipeline, failureDetector); } } + /*- + if(pipeline.isFinished() && response.getValue() instanceof Exception + && !(response.getValue() instanceof ObsoleteVersionException)) { + new Thread(new DoErrorHandling(response)); + } + */ + + /*- + if(pipeline.isFinished() && response.getValue() instanceof Exception) { + logger.error("OMG DoExceptionHandling.run() invoked!"); + new Thread(new DoExceptionHandling(response)); + } + */ + } + + public class DoHintedHandoff implements Runnable { + + DoHintedHandoff() {} + + @Override + public void run() { + // TODO: remove logger.error... + logger.error("OMG DoHintedHandoff.run() invoked!"); + Slop slop = new Slop(pipelineData.getStoreName(), + Slop.Operation.PUT, + key, + versionedCopy.getValue(), + transforms, + node.getId(), + new Date()); + pipelineData.addFailedNode(node); + hintedHandoff.sendHintSerial(node, versionedCopy.getVersion(), slop); + } + } + + public class DoErrorHandling implements Runnable { + + Response response; + + DoErrorHandling(Response response) { + this.response = response; + } + + @Override + public void run() { + // TODO: remove logger.error... + logger.error("OMG DoErrorHandling.run() invoked!"); + if(response.getValue() instanceof InvalidMetadataException) { + pipelineData.reportException((InvalidMetadataException) response.getValue()); + logger.warn("Received invalid metadata problem after a successful " + + pipeline.getOperation().getSimpleName() + " call on node " + + node.getId() + ", store '" + pipelineData.getStoreName() + "'"); + } else { + handleResponseError(response, pipeline, failureDetector); + } + } + } + + public class DoExceptionHandling implements Runnable { + + Response response; + + DoExceptionHandling(Response response) { + this.response = response; + } + + @Override + public void run() { + // TODO: remove logger.error... + logger.error("OMG DoExceptionHandling.run() invoked!"); + if(response.getValue() instanceof UnreachableStoreException + && isHintedHandoffEnabled()) { + Slop slop = new Slop(pipelineData.getStoreName(), + Slop.Operation.PUT, + key, + versionedCopy.getValue(), + transforms, + node.getId(), + new Date()); + pipelineData.addFailedNode(node); + hintedHandoff.sendHintSerial(node, versionedCopy.getVersion(), slop); + } else if(response.getValue() instanceof InvalidMetadataException) { + pipelineData.reportException((InvalidMetadataException) response.getValue()); + logger.warn("Received invalid metadata problem after a successful " + + pipeline.getOperation().getSimpleName() + " call on node " + + node.getId() + ", store '" + pipelineData.getStoreName() + "'"); + } else { + handleResponseError(response, pipeline, failureDetector); + } + } } } } diff --git a/src/java/voldemort/store/routed/action/PerformParallelRequests.java b/src/java/voldemort/store/routed/action/PerformParallelRequests.java index 222d6d4576..2721b38c67 100644 --- a/src/java/voldemort/store/routed/action/PerformParallelRequests.java +++ b/src/java/voldemort/store/routed/action/PerformParallelRequests.java @@ -136,7 +136,7 @@ else if(pipeline.getOperation() == Operation.GET_VERSIONS) failureDetector.recordSuccess(response.getNode(), response.getRequestTime()); pipelineData.getZoneResponses().add(response.getNode().getZoneId()); - // TODO: What about these two operations!? ARe they needed? + // TODO: Are the next two lines necessary!?!?!? YES, they are. Response rCast = Utils.uncheckedCast(response); pipelineData.getResponses().add(rCast); }