From 29da9effe88e48ced329fdae27bf805247ec29eb Mon Sep 17 00:00:00 2001 From: Abhinay Nagpal Date: Wed, 24 Apr 2013 11:38:47 -0700 Subject: [PATCH] Remove redundant callbacks --- .../protocol/admin/StreamingClient.java | 127 ++++++++---------- 1 file changed, 54 insertions(+), 73 deletions(-) diff --git a/src/java/voldemort/client/protocol/admin/StreamingClient.java b/src/java/voldemort/client/protocol/admin/StreamingClient.java index 50430d2c40..e64b1467d5 100644 --- a/src/java/voldemort/client/protocol/admin/StreamingClient.java +++ b/src/java/voldemort/client/protocol/admin/StreamingClient.java @@ -51,21 +51,19 @@ /** * - * @author anagpal * - * The streaming API allows for send events into voldemort stores in the - * async fashion. All the partition and replication logic will be taken - * care of internally. + * The streaming API allows for send events into voldemort stores in the async + * fashion. All the partition and replication logic will be taken care of + * internally. * - * The users is expected to provide two callbacks, one for performing - * period checkpoints and one for recovering the streaming process from - * the last checkpoint. + * The users is expected to provide two callbacks, one for performing period + * checkpoints and one for recovering the streaming process from the last + * checkpoint. * - * NOTE: The API is not thread safe, since if multiple threads use this - * API we cannot make any guarantees about correctness of the - * checkpointing mechanism. + * NOTE: The API is not thread safe, since if multiple threads use this API we + * cannot make any guarantees about correctness of the checkpointing mechanism. * - * Right now we expect this to used by a single thread per data source + * Right now we expect this to used by a single thread per data source * */ public class StreamingClient { @@ -91,9 +89,6 @@ public class StreamingClient { // Every batch size we commit private static int CHECKPOINT_COMMIT_SIZE; - // TODO - // provide knobs to tune this - private static int TIME_COMMIT_SIZE = 30; // we have to throttle to a certain qps private static int THROTTLE_QPS; private int entriesProcessed; @@ -519,12 +514,9 @@ public synchronized void streamingPut(ByteArray key, Versioned value, St } - int secondsTime = calendar.get(Calendar.SECOND); - if(entriesProcessed == CHECKPOINT_COMMIT_SIZE || secondsTime % TIME_COMMIT_SIZE == 0) { + if(entriesProcessed == CHECKPOINT_COMMIT_SIZE) { entriesProcessed = 0; - commitToVoldemort(); - } throttler.maybeThrottle(1); @@ -557,6 +549,8 @@ private void commitToVoldemort(List storeNamesToCommit) { if(logger.isDebugEnabled()) { logger.debug("Trying to commit to Voldemort"); } + + boolean hasError = false; for(Node node: nodesToStream) { for(String store: storeNamesToCommit) { @@ -576,66 +570,55 @@ private void commitToVoldemort(List storeNamesToCommit) { VAdminProto.UpdatePartitionEntriesResponse.Builder updateResponse = ProtoUtils.readToBuilder(inputStream, VAdminProto.UpdatePartitionEntriesResponse.newBuilder()); if(updateResponse.hasError()) { - logger.warn("Invoking the Recovery Callback"); - Future future = streamingresults.submit(recoveryCallback); - try { - future.get(); - - } catch(InterruptedException e1) { - MARKED_BAD = true; - logger.error("Recovery Callback failed"); - e1.printStackTrace(); - throw new VoldemortException("Recovery Callback failed"); - } catch(ExecutionException e1) { - MARKED_BAD = true; - logger.error("Recovery Callback failed"); - e1.printStackTrace(); - throw new VoldemortException("Recovery Callback failed"); - } - } else { - if(logger.isDebugEnabled()) { - logger.debug("Commit successful"); - logger.debug("calling checkpoint callback"); - } - Future future = streamingresults.submit(checkpointCallback); - try { - future.get(); - - } catch(InterruptedException e1) { - - logger.warn("Checkpoint callback failed!"); - e1.printStackTrace(); - } catch(ExecutionException e1) { - logger.warn("Checkpoint callback failed!"); - e1.printStackTrace(); - } + hasError = true; } } catch(IOException e) { - - logger.warn("Invoking the Recovery Callback"); - Future future = streamingresults.submit(recoveryCallback); - try { - future.get(); - - } catch(InterruptedException e1) { - MARKED_BAD = true; - logger.error("Recovery Callback failed"); - e1.printStackTrace(); - throw new VoldemortException("Recovery Callback failed"); - } catch(ExecutionException e1) { - MARKED_BAD = true; - logger.error("Recovery Callback failed"); - e1.printStackTrace(); - throw new VoldemortException("Recovery Callback failed"); - } - e.printStackTrace(); + hasError = true; } } } + // remove redundant callbacks + if(hasError) { + + logger.warn("Invoking the Recovery Callback"); + Future future = streamingresults.submit(recoveryCallback); + try { + future.get(); + + } catch(InterruptedException e1) { + MARKED_BAD = true; + logger.error("Recovery Callback failed"); + e1.printStackTrace(); + throw new VoldemortException("Recovery Callback failed"); + } catch(ExecutionException e1) { + MARKED_BAD = true; + logger.error("Recovery Callback failed"); + e1.printStackTrace(); + throw new VoldemortException("Recovery Callback failed"); + } + } else { + if(logger.isDebugEnabled()) { + logger.debug("Commit successful"); + logger.debug("calling checkpoint callback"); + } + Future future = streamingresults.submit(checkpointCallback); + try { + future.get(); + + } catch(InterruptedException e1) { + + logger.warn("Checkpoint callback failed!"); + e1.printStackTrace(); + } catch(ExecutionException e1) { + logger.warn("Checkpoint callback failed!"); + e1.printStackTrace(); + } + } + } /** @@ -654,11 +637,9 @@ public synchronized void closeStreamingSessions(Callable resetCheckpointCallback future.get(); } catch(InterruptedException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); + logger.warn("Reset check point interrupted" + e1); } catch(ExecutionException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); + logger.warn("Reset check point interrupted" + e1); } }