Skip to content

Commit

Permalink
[LIVY-466][RSC] Fix RSCDriver exception during RPC shutdown
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

During RSCDriver's shutdown, it will first shutdown RPC server, and then all the RPC clients. When RPC client is closed, it will register a timeout to avoid orphaned RSCDriver, but this is not necessary during RSCDriver's shutdown, so here fixing this issue. The details can be seen in [JIRA](https://issues.apache.org/jira/browse/LIVY-466).

## How was this patch tested?

Local verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #90 from jerryshao/LIVY-466.
  • Loading branch information
jerryshao committed May 3, 2018
1 parent 9d381bd commit e3f45a0
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import io.netty.channel.ChannelHandler.Sharable;
Expand Down Expand Up @@ -92,6 +93,7 @@ public class RSCDriver extends BaseProtocol {
protected final RSCConf livyConf;

private final AtomicReference<ScheduledFuture<?>> idleTimeout;
private final AtomicBoolean inShutdown;

public RSCDriver(SparkConf conf, RSCConf livyConf) throws Exception {
Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwx------");
Expand All @@ -110,6 +112,7 @@ public RSCDriver(SparkConf conf, RSCConf livyConf) throws Exception {
this.activeJobs = new ConcurrentHashMap<>();
this.bypassJobs = new ConcurrentLinkedDeque<>();
this.idleTimeout = new AtomicReference<>();
this.inShutdown = new AtomicBoolean(false);
}

private synchronized void shutdown() {
Expand Down Expand Up @@ -217,7 +220,9 @@ private void registerClient(final Rpc client) {
@Override
public void onSuccess(Void unused) {
clients.remove(client);
setupIdleTimeout();
if (!inShutdown.get()) {
setupIdleTimeout();
}
}
});
LOG.debug("Registered new connection from {}.", client.getChannel());
Expand Down Expand Up @@ -304,6 +309,7 @@ protected void shutdownContext() {
}

private void shutdownServer() {
inShutdown.compareAndSet(false, true);
if (server != null) {
server.close();
}
Expand Down

0 comments on commit e3f45a0

Please sign in to comment.