Skip to content

Commit

Permalink
Better Runtime failure mgmt (#1671)
Browse files Browse the repository at this point in the history
* Retooled management of Runtime failures

* Removed extra code
  • Loading branch information
srkukarni authored and sijie committed Apr 27, 2018
1 parent 66c8428 commit f307080
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 68 deletions.
Expand Up @@ -565,7 +565,7 @@ void runCmd() throws Exception {
instanceConfig,
userCodeFile,
containerFactory,
null);
0);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
}
Expand Down
Expand Up @@ -88,8 +88,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
@Getter(AccessLevel.PACKAGE)
private Table<ByteBuf, ByteBuf> stateTable;

@Getter
private Exception failureException;
private JavaInstance javaInstance;
private AtomicBoolean running = new AtomicBoolean(true);

Expand Down Expand Up @@ -229,10 +227,7 @@ public void run() {
}
} catch (Exception ex) {
log.error("Uncaught exception in Java Instance", ex);
if (running.get()) {
failureException = ex;
throw new RuntimeException(ex);
}
throw new RuntimeException(ex);
} finally {
close();
}
Expand Down
Expand Up @@ -189,7 +189,7 @@ public void start() throws Exception {
instanceConfig,
jarFile,
containerFactory,
null);
0);

server = ServerBuilder.forPort(port)
.addService(new InstanceControlImpl(runtimeSpawner))
Expand Down
Expand Up @@ -54,7 +54,8 @@ class ProcessRuntime implements Runtime {
@Getter
private List<String> processArgs;
private int instancePort;
private Exception startupException;
@Getter
private Exception deathException;
private ManagedChannel channel;
private InstanceControlGrpc.InstanceControlFutureStub stub;

Expand Down Expand Up @@ -227,8 +228,8 @@ public CompletableFuture<FunctionStatus> getFunctionStatus() {
public void onFailure(Throwable throwable) {
FunctionStatus.Builder builder = FunctionStatus.newBuilder();
builder.setRunning(false);
if (startupException != null) {
builder.setFailureException(startupException.getMessage());
if (deathException != null) {
builder.setFailureException(deathException.getMessage());
} else {
builder.setFailureException(throwable.getMessage());
}
Expand Down Expand Up @@ -280,69 +281,62 @@ private int findAvailablePort() {
}

private void startProcess() {
startupException = null;
deathException = null;
try {
ProcessBuilder processBuilder = new ProcessBuilder(processArgs);
log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command()));
process = processBuilder.start();
} catch (Exception ex) {
log.error("Starting process failed", ex);
startupException = ex;
deathException = ex;
return;
}
try {
int exitValue = process.exitValue();
log.error("Instance Process quit unexpectedly with return value " + exitValue);
tryExtractingDeathException();
} catch (IllegalThreadStateException ex) {
log.info("Started process successfully");
}
}

@Override
public boolean isAlive() {
return process != null && process.isAlive();
if (process == null) {
return false;
}
if (!process.isAlive()) {
if (deathException == null) {
tryExtractingDeathException();
}
return false;
}
FunctionStatus status;
try {
status = getFunctionStatus().get();
} catch (Exception ex) {
return false;
}
if (!status.getRunning()) {
if (status.getFailureException() != null && !status.getFailureException().isEmpty()) {
deathException = new Exception(status.getFailureException());
}
return false;
}
return true;
}

@Override
public Exception getDeathException() {
if (isAlive()) return null;
if (startupException != null) return startupException;
private void tryExtractingDeathException() {
InputStream errorStream = process.getErrorStream();
try {
byte[] errorBytes = new byte[errorStream.available()];
errorStream.read(errorBytes);
String errorMessage = new String(errorBytes);
startupException = new RuntimeException(errorMessage);
deathException = new RuntimeException(errorMessage);
log.error("Extracted Process death exception", deathException);
} catch (Exception ex) {
startupException = ex;
deathException = ex;
log.error("Error extracting Process death exception", deathException);
}
return startupException;
}

/**
 * Ad-hoc entry point that queries a function instance's status over gRPC and logs it.
 *
 * <p>Opens a plaintext channel to {@code 127.0.0.1} on the given port, issues a single
 * {@code getFunctionStatus} call, blocks until it completes, and logs the outcome.
 *
 * @param args {@code args[0]} must be the port the instance's gRPC server listens on
 * @throws IllegalArgumentException if the port argument is missing or is not an integer
 * @throws ExecutionException if the status call fails; the cause is the reported failure
 * @throws InterruptedException if the thread is interrupted while waiting for the reply
 */
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // Fail with a clear message instead of an ArrayIndexOutOfBoundsException.
    if (args == null || args.length < 1) {
        throw new IllegalArgumentException("Expected the instance port as the first argument");
    }
    int port = Integer.parseInt(args[0]);

    ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", port)
            .usePlaintext(true)
            .build();
    try {
        InstanceControlFutureStub stub = InstanceControlGrpc.newFutureStub(channel);
        ListenableFuture<FunctionStatus> response = stub.getFunctionStatus(Empty.newBuilder().build());

        // Bridge the Guava ListenableFuture into a CompletableFuture we can block on.
        CompletableFuture<FunctionStatus> future = new CompletableFuture<>();
        Futures.addCallback(response, new FutureCallback<FunctionStatus>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("GetFunctionStatus:", throwable);
                future.completeExceptionally(throwable);
            }

            @Override
            public void onSuccess(InstanceCommunication.FunctionStatus t) {
                log.info("GetFunctionStatus: {}", t);
                future.complete(t);
            }
        });
        FunctionStatus status = future.get();

        log.info("Function Status : {}", status);
    } finally {
        // The channel was previously never shut down; release it on success and failure alike.
        channel.shutdown();
    }
}
}
Expand Up @@ -43,12 +43,13 @@ public class RuntimeSpawner implements AutoCloseable {
private Runtime runtime;
private Timer processLivenessCheckTimer;
private int numRestarts;
private Long instanceLivenessCheckFreqMs;
private long instanceLivenessCheckFreqMs;
private Exception runtimeDeathException;


public RuntimeSpawner(InstanceConfig instanceConfig,
String codeFile,
RuntimeFactory containerFactory, Long instanceLivenessCheckFreqMs) {
RuntimeFactory containerFactory, long instanceLivenessCheckFreqMs) {
this.instanceConfig = instanceConfig;
this.runtimeFactory = containerFactory;
this.codeFile = codeFile;
Expand All @@ -63,14 +64,15 @@ public void start() throws Exception {
runtime.start();

// monitor function runtime to make sure it is running. If not, restart the function runtime
if (instanceLivenessCheckFreqMs != null) {
if (instanceLivenessCheckFreqMs > 0) {
processLivenessCheckTimer = new Timer();
processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (!runtime.isAlive()) {
log.error("Function Container is dead with exception", runtime.getDeathException());
log.error("Restarting...");
runtimeDeathException = runtime.getDeathException();
runtime.start();
numRestarts++;
}
Expand All @@ -89,8 +91,8 @@ public CompletableFuture<FunctionStatus> getFunctionStatus() {
return runtime.getFunctionStatus().thenApply(f -> {
FunctionStatus.Builder builder = FunctionStatus.newBuilder();
builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(instanceConfig.getInstanceId());
if (runtime.getDeathException() != null) {
builder.setFailureException(runtime.getDeathException().getMessage());
if (runtimeDeathException != null) {
builder.setFailureException(runtimeDeathException.getMessage());
}
return builder.build();
});
Expand Down
Expand Up @@ -80,14 +80,6 @@ public void start() {
public void uncaughtException(Thread t, Throwable e) {
startupException = new Exception(e);
log.error("Error occured in java instance:", e);
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
//ignore
}
// restart
start();

}
});
this.fnThread.start();
Expand Down Expand Up @@ -117,13 +109,14 @@ public void stop() {

@Override
public CompletableFuture<FunctionStatus> getFunctionStatus() {
FunctionStatus.Builder functionStatusBuilder = javaInstanceRunnable.getFunctionStatus();
if (javaInstanceRunnable.getFailureException() != null) {
if (!isAlive()) {
FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
functionStatusBuilder.setFailureException(javaInstanceRunnable.getFailureException().getMessage());
} else {
functionStatusBuilder.setRunning(true);
functionStatusBuilder.setFailureException(getDeathException().getMessage());
return CompletableFuture.completedFuture(functionStatusBuilder.build());
}
FunctionStatus.Builder functionStatusBuilder = javaInstanceRunnable.getFunctionStatus();
functionStatusBuilder.setRunning(true);
return CompletableFuture.completedFuture(functionStatusBuilder.build());
}

Expand All @@ -147,8 +140,6 @@ public Exception getDeathException() {
return null;
} else if (null != startupException) {
return startupException;
} else if (null != javaInstanceRunnable){
return javaInstanceRunnable.getFailureException();
} else {
return null;
}
Expand Down

0 comments on commit f307080

Please sign in to comment.