Skip to content

Commit

Permalink
Remove redundant callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
abh1nay committed Apr 24, 2013
1 parent 9991605 commit 29da9ef
Showing 1 changed file with 54 additions and 73 deletions.
127 changes: 54 additions & 73 deletions src/java/voldemort/client/protocol/admin/StreamingClient.java
Expand Up @@ -51,21 +51,19 @@

/**
*
* @author anagpal
*
* The streaming API allows for send events into voldemort stores in the
* async fashion. All the partition and replication logic will be taken
* care of internally.
* The streaming API allows for send events into voldemort stores in the async
* fashion. All the partition and replication logic will be taken care of
* internally.
*
* The users is expected to provide two callbacks, one for performing
* period checkpoints and one for recovering the streaming process from
* the last checkpoint.
* The users is expected to provide two callbacks, one for performing period
* checkpoints and one for recovering the streaming process from the last
* checkpoint.
*
* NOTE: The API is not thread safe, since if multiple threads use this
* API we cannot make any guarantees about correctness of the
* checkpointing mechanism.
* NOTE: The API is not thread safe, since if multiple threads use this API we
* cannot make any guarantees about correctness of the checkpointing mechanism.
*
* Right now we expect this to used by a single thread per data source
* Right now we expect this to used by a single thread per data source
*
*/
public class StreamingClient {
Expand All @@ -91,9 +89,6 @@ public class StreamingClient {
// Every batch size we commit
private static int CHECKPOINT_COMMIT_SIZE;

// TODO
// provide knobs to tune this
private static int TIME_COMMIT_SIZE = 30;
// we have to throttle to a certain qps
private static int THROTTLE_QPS;
private int entriesProcessed;
Expand Down Expand Up @@ -519,12 +514,9 @@ public synchronized void streamingPut(ByteArray key, Versioned<byte[]> value, St

}

int secondsTime = calendar.get(Calendar.SECOND);
if(entriesProcessed == CHECKPOINT_COMMIT_SIZE || secondsTime % TIME_COMMIT_SIZE == 0) {
if(entriesProcessed == CHECKPOINT_COMMIT_SIZE) {
entriesProcessed = 0;

commitToVoldemort();

}

throttler.maybeThrottle(1);
Expand Down Expand Up @@ -557,6 +549,8 @@ private void commitToVoldemort(List<String> storeNamesToCommit) {
if(logger.isDebugEnabled()) {
logger.debug("Trying to commit to Voldemort");
}

boolean hasError = false;
for(Node node: nodesToStream) {

for(String store: storeNamesToCommit) {
Expand All @@ -576,66 +570,55 @@ private void commitToVoldemort(List<String> storeNamesToCommit) {
VAdminProto.UpdatePartitionEntriesResponse.Builder updateResponse = ProtoUtils.readToBuilder(inputStream,
VAdminProto.UpdatePartitionEntriesResponse.newBuilder());
if(updateResponse.hasError()) {
logger.warn("Invoking the Recovery Callback");
Future future = streamingresults.submit(recoveryCallback);
try {
future.get();

} catch(InterruptedException e1) {
MARKED_BAD = true;
logger.error("Recovery Callback failed");
e1.printStackTrace();
throw new VoldemortException("Recovery Callback failed");
} catch(ExecutionException e1) {
MARKED_BAD = true;
logger.error("Recovery Callback failed");
e1.printStackTrace();
throw new VoldemortException("Recovery Callback failed");
}
} else {
if(logger.isDebugEnabled()) {
logger.debug("Commit successful");
logger.debug("calling checkpoint callback");
}
Future future = streamingresults.submit(checkpointCallback);
try {
future.get();

} catch(InterruptedException e1) {

logger.warn("Checkpoint callback failed!");
e1.printStackTrace();
} catch(ExecutionException e1) {
logger.warn("Checkpoint callback failed!");
e1.printStackTrace();
}
hasError = true;
}

} catch(IOException e) {

logger.warn("Invoking the Recovery Callback");
Future future = streamingresults.submit(recoveryCallback);
try {
future.get();

} catch(InterruptedException e1) {
MARKED_BAD = true;
logger.error("Recovery Callback failed");
e1.printStackTrace();
throw new VoldemortException("Recovery Callback failed");
} catch(ExecutionException e1) {
MARKED_BAD = true;
logger.error("Recovery Callback failed");
e1.printStackTrace();
throw new VoldemortException("Recovery Callback failed");
}

e.printStackTrace();
hasError = true;
}
}

}

// remove redundant callbacks
if(hasError) {

logger.warn("Invoking the Recovery Callback");
Future future = streamingresults.submit(recoveryCallback);
try {
future.get();

} catch(InterruptedException e1) {
MARKED_BAD = true;
logger.error("Recovery Callback failed");
e1.printStackTrace();
throw new VoldemortException("Recovery Callback failed");
} catch(ExecutionException e1) {
MARKED_BAD = true;
logger.error("Recovery Callback failed");
e1.printStackTrace();
throw new VoldemortException("Recovery Callback failed");
}
} else {
if(logger.isDebugEnabled()) {
logger.debug("Commit successful");
logger.debug("calling checkpoint callback");
}
Future future = streamingresults.submit(checkpointCallback);
try {
future.get();

} catch(InterruptedException e1) {

logger.warn("Checkpoint callback failed!");
e1.printStackTrace();
} catch(ExecutionException e1) {
logger.warn("Checkpoint callback failed!");
e1.printStackTrace();
}
}

}

/**
Expand All @@ -654,11 +637,9 @@ public synchronized void closeStreamingSessions(Callable resetCheckpointCallback
future.get();

} catch(InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
logger.warn("Reset check point interrupted" + e1);
} catch(ExecutionException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
logger.warn("Reset check point interrupted" + e1);
}
}

Expand Down

0 comments on commit 29da9ef

Please sign in to comment.