Skip to content

Commit

Permalink
Add assertions to prevent negative TTLs in default service scheduler.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jan 8, 2018
1 parent 03e470d commit 0e156b1
Showing 1 changed file with 11 additions and 17 deletions.
Expand Up @@ -38,6 +38,7 @@
import java.util.Queue;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

Expand All @@ -49,7 +50,7 @@ public class DefaultServiceExecutor implements ServiceExecutor {
private final Queue<Runnable> tasks = new LinkedList<>();
private final List<ScheduledTask> scheduledTasks = new ArrayList<>();
private final List<ScheduledTask> complete = new ArrayList<>();
private final Map<String, InternalOperation> operations = new HashMap<>();
private final Map<String, Function<Commit<byte[]>, byte[]>> operations = new HashMap<>();
private OperationType operationType;
private long timestamp;

Expand Down Expand Up @@ -104,7 +105,7 @@ private void checkOperation(OperationType type, String message) {
public void handle(OperationId operationId, Function<Commit<byte[]>, byte[]> callback) {
checkNotNull(operationId, "operationId cannot be null");
checkNotNull(callback, "callback cannot be null");
operations.put(operationId.id(), new InternalOperation(operationId, callback));
operations.put(operationId.id(), callback);
log.debug("Registered operation callback {}", operationId);
}

Expand All @@ -116,15 +117,15 @@ public byte[] apply(Commit<byte[]> commit) {
this.timestamp = commit.wallClockTime().unixTimestamp();

// Look up the registered callback for the operation.
InternalOperation operation = operations.get(commit.operation().id());
Function<Commit<byte[]>, byte[]> operation = operations.get(commit.operation().id());

if (operation == null) {
throw new IllegalStateException("Unknown state machine operation: " + commit.operation());
} else {
// Execute the operation. If the operation return value is a Future, await the result,
// otherwise immediately complete the execution future.
try {
return operation.callback.apply(commit);
return operation.apply(commit);
} catch (Exception e) {
log.warn("State machine operation failed: {}", e.getMessage());
throw new PrimitiveException.ServiceException();
Expand All @@ -151,36 +152,29 @@ private void runTasks() {
@Override
public void execute(Runnable callback) {
checkOperation(OperationType.COMMAND, "callbacks can only be scheduled during command execution");
checkNotNull(callback, "callback cannot be null");
tasks.add(callback);
}

@Override
public Scheduled schedule(Duration delay, Runnable callback) {
checkOperation(OperationType.COMMAND, "callbacks can only be scheduled during command execution");
checkArgument(!delay.isNegative(), "delay cannot be negative");
checkNotNull(callback, "callback cannot be null");
log.trace("Scheduled callback {} with delay {}", callback, delay);
return new ScheduledTask(callback, delay.toMillis()).schedule();
}

@Override
public Scheduled schedule(Duration initialDelay, Duration interval, Runnable callback) {
checkOperation(OperationType.COMMAND, "callbacks can only be scheduled during command execution");
checkArgument(!initialDelay.isNegative(), "initialDelay cannot be negative");
checkArgument(!interval.isNegative(), "interval cannot be negative");
checkNotNull(callback, "callback cannot be null");
log.trace("Scheduled repeating callback {} with initial delay {} and interval {}", callback, initialDelay, interval);
return new ScheduledTask(callback, initialDelay.toMillis(), interval.toMillis()).schedule();
}

/**
* Internal operation container.
*/
private static class InternalOperation {
private final OperationId operation;
private final Function<Commit<byte[]>, byte[]> callback;

public InternalOperation(OperationId operation, Function<Commit<byte[]>, byte[]> callback) {
this.operation = operation;
this.callback = callback;
}
}

/**
* Scheduled task.
*/
Expand Down

0 comments on commit 0e156b1

Please sign in to comment.