Skip to content

Commit

Permalink
Clean up of TODOs and cruft code that had been introduced.
Browse files Browse the repository at this point in the history
  • Loading branch information
jayjwylie committed Jan 15, 2013
1 parent 526deff commit efe9f21
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 113 deletions.
Expand Up @@ -71,7 +71,6 @@ public PerformParallelDeleteRequests(PD pipelineData,
this.hintedHandoff = hintedHandoff;
}

// TODO: This is almost identical to PerformParallelPutRequests.execute
@Override
public void execute(final Pipeline pipeline) {
List<Node> nodes = pipelineData.getNodes();
Expand Down Expand Up @@ -159,8 +158,6 @@ public void execute(final Pipeline pipeline) {
}
}

// TODO: except for start time, this is same as
// PerformParallelPutRequests.waitForResponses
private void waitForResponses(long startTimeNs,
CountDownLatch latch,
final Map<Integer, Response<ByteArray, Object>> responses,
Expand All @@ -179,8 +176,6 @@ private void waitForResponses(long startTimeNs,
}
}

// TODO: except for two lines, this is same as
// PerformParallelPutRequests.processResponses
private void processResponses(final Map<Integer, Response<ByteArray, Object>> responses,
final Pipeline pipeline) {
for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
Expand All @@ -197,7 +192,6 @@ private void processResponses(final Map<Integer, Response<ByteArray, Object>> re
failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
pipelineData.getZoneResponses().add(response.getNode().getZoneId());

// TODO: Are the next two lines necessary!?!?!? YES, they are.
Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
pipelineData.getResponses().add(rCast);

Expand All @@ -206,8 +200,6 @@ private void processResponses(final Map<Integer, Response<ByteArray, Object>> re
}
}

// TODO: Almost identical to PerformParallelPutRequests.Callback. Anyway to
// refactor into common code?
public class Callback implements NonblockingStoreCallback {

final Pipeline pipeline;
Expand Down Expand Up @@ -240,7 +232,6 @@ public void requestComplete(Object result, long requestTime) {
requestTime);
responses.put(node.getId(), response);

// TODO: Must move heavy-weight ops out of callback
if(enableHintedHandoff && pipeline.isFinished()
&& response.getValue() instanceof UnreachableStoreException) {
Slop slop = new Slop(pipelineData.getStoreName(),
Expand All @@ -251,6 +242,7 @@ public void requestComplete(Object result, long requestTime) {
node.getId(),
new Date());
pipelineData.addFailedNode(node);
// TODO: Should not have blocking operation in callback
hintedHandoff.sendHintSerial(node, version, slop);
}

Expand All @@ -261,7 +253,6 @@ public void requestComplete(Object result, long requestTime) {
logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block "
+ " for " + blocksLatch.getCount() + " more ");

// TODO: Must move heavy-weight ops out of callback
// Note errors that come in after the pipeline has finished.
// These will *not* get a chance to be called in the loop of
// responses below.
Expand All @@ -273,6 +264,8 @@ public void requestComplete(Object result, long requestTime) {
+ pipeline.getOperation().getSimpleName() + " call on node "
+ node.getId() + ", store '" + pipelineData.getStoreName() + "'");
} else {
// TODO: Should not have operation that acquires locks and
// may do blocking operations in callback
handleResponseError(response, pipeline, failureDetector);
}
}
Expand Down
Expand Up @@ -181,7 +181,6 @@ public void requestComplete(Object result, long requestTime) {
responses.put(node.getId(), response);
attemptsLatch.countDown();

// TODO: Must move heavy-weight ops out of callback
// Note errors that come in after the pipeline has finished.
// These will *not* get a chance to be called in the loop of
// responses below.
Expand All @@ -192,6 +191,8 @@ public void requestComplete(Object result, long requestTime) {
+ pipeline.getOperation().getSimpleName() + " call on node "
+ node.getId() + ", store '" + pipelineData.getStoreName() + "'");
} else {
// TODO: Should not have operation that acquires locks and
// may do blocking operations in callback
handleResponseError(response, pipeline, failureDetector);
}

Expand Down
Expand Up @@ -215,6 +215,7 @@ private void processResponses(final Map<Integer, Response<ByteArray, Object>> re
pipelineData.incrementSuccesses();
failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
pipelineData.getZoneResponses().add(response.getNode().getZoneId());

responses.remove(responseEntry.getKey());
}
}
Expand Down Expand Up @@ -261,7 +262,6 @@ public void requestComplete(Object result, long requestTime) {
+ requestTime + " ms on node " + node.getId() + "(" + node.getHost()
+ ")");

// TODO: Must move heavy-weight ops out of callback
if(isHintedHandoffEnabled() && pipeline.isFinished()) {
if(response.getValue() instanceof UnreachableStoreException) {
Slop slop = new Slop(pipelineData.getStoreName(),
Expand All @@ -272,16 +272,10 @@ public void requestComplete(Object result, long requestTime) {
node.getId(),
new Date());
pipelineData.addFailedNode(node);
// TODO: Should not have blocking operation in callback
hintedHandoff.sendHintSerial(node, versionedCopy.getVersion(), slop);
}
}
/*-
if(isHintedHandoffEnabled() && pipeline.isFinished()) {
if(response.getValue() instanceof UnreachableStoreException) {
new Thread(new DoHintedHandoff());
}
}
*/

attemptsLatch.countDown();
blocksLatch.countDown();
Expand All @@ -290,7 +284,6 @@ public void requestComplete(Object result, long requestTime) {
logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block "
+ " for " + blocksLatch.getCount() + " more ");

// TODO: Must move heavy-weight ops out of callback
// Note errors that come in after the pipeline has finished.
// These will *not* get a chance to be called in the loop of
// responses below.
Expand All @@ -302,99 +295,12 @@ public void requestComplete(Object result, long requestTime) {
+ pipeline.getOperation().getSimpleName() + " call on node "
+ node.getId() + ", store '" + pipelineData.getStoreName() + "'");
} else {
handleResponseError(response, pipeline, failureDetector);
}
}
/*-
if(pipeline.isFinished() && response.getValue() instanceof Exception
&& !(response.getValue() instanceof ObsoleteVersionException)) {
new Thread(new DoErrorHandling(response));
}
*/

/*-
if(pipeline.isFinished() && response.getValue() instanceof Exception) {
logger.error("OMG DoExceptionHandling.run() invoked!");
new Thread(new DoExceptionHandling(response));
}
*/
}

public class DoHintedHandoff implements Runnable {

DoHintedHandoff() {}

@Override
public void run() {
// TODO: remove logger.error...
logger.error("OMG DoHintedHandoff.run() invoked!");
Slop slop = new Slop(pipelineData.getStoreName(),
Slop.Operation.PUT,
key,
versionedCopy.getValue(),
transforms,
node.getId(),
new Date());
pipelineData.addFailedNode(node);
hintedHandoff.sendHintSerial(node, versionedCopy.getVersion(), slop);
}
}

public class DoErrorHandling implements Runnable {

Response<ByteArray, Object> response;

DoErrorHandling(Response<ByteArray, Object> response) {
this.response = response;
}

@Override
public void run() {
// TODO: remove logger.error...
logger.error("OMG DoErrorHandling.run() invoked!");
if(response.getValue() instanceof InvalidMetadataException) {
pipelineData.reportException((InvalidMetadataException) response.getValue());
logger.warn("Received invalid metadata problem after a successful "
+ pipeline.getOperation().getSimpleName() + " call on node "
+ node.getId() + ", store '" + pipelineData.getStoreName() + "'");
} else {
// TODO: Should not have operation that acquires locks and
// may do blocking operations in callback
handleResponseError(response, pipeline, failureDetector);
}
}
}

public class DoExceptionHandling implements Runnable {

Response<ByteArray, Object> response;

DoExceptionHandling(Response<ByteArray, Object> response) {
this.response = response;
}

@Override
public void run() {
// TODO: remove logger.error...
logger.error("OMG DoExceptionHandling.run() invoked!");
if(response.getValue() instanceof UnreachableStoreException
&& isHintedHandoffEnabled()) {
Slop slop = new Slop(pipelineData.getStoreName(),
Slop.Operation.PUT,
key,
versionedCopy.getValue(),
transforms,
node.getId(),
new Date());
pipelineData.addFailedNode(node);
hintedHandoff.sendHintSerial(node, versionedCopy.getVersion(), slop);
} else if(response.getValue() instanceof InvalidMetadataException) {
pipelineData.reportException((InvalidMetadataException) response.getValue());
logger.warn("Received invalid metadata problem after a successful "
+ pipeline.getOperation().getSimpleName() + " call on node "
+ node.getId() + ", store '" + pipelineData.getStoreName() + "'");
} else {
handleResponseError(response, pipeline, failureDetector);
}
}
}
}
}
Expand Up @@ -118,8 +118,6 @@ else if(pipeline.getOperation() == Operation.GET_VERSIONS)
+ pipeline.getOperation());
}

// TODO: Wait on attemptsLatch and processiong responses could be
// refactored to use PerformParallelPut.waitForResponses.
try {
attemptsLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
} catch(InterruptedException e) {
Expand All @@ -136,7 +134,6 @@ else if(pipeline.getOperation() == Operation.GET_VERSIONS)
failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
pipelineData.getZoneResponses().add(response.getNode().getZoneId());

// TODO: Are the next two lines necessary!?!?!? YES, they are.
Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
pipelineData.getResponses().add(rCast);
}
Expand Down Expand Up @@ -232,7 +229,6 @@ public void requestComplete(Object result, long requestTime) {
responses.put(node.getId(), response);
attemptsLatch.countDown();

// TODO: Must move heavy-weight ops out of callback
// Note errors that come in after the pipeline has finished.
// These will *not* get a chance to be called in the loop of
// responses below.
Expand All @@ -243,6 +239,8 @@ public void requestComplete(Object result, long requestTime) {
+ pipeline.getOperation().getSimpleName() + " call on node "
+ node.getId() + ", store '" + pipelineData.getStoreName() + "'");
} else {
// TODO: Should not have operation that acquires locks and
// may do blocking operations in callback
handleResponseError(response, pipeline, failureDetector);
}
}
Expand Down

0 comments on commit efe9f21

Please sign in to comment.