Skip to content

Commit

Permalink
[ZEPPELIN-2502] RemoteInterpreterServer hang forever during shutdown
Browse files Browse the repository at this point in the history
### What is this PR for?
There is the chance to have a RemoteServerInterpreter hang forever during shutdown

### What type of PR is it?
[Bug Fix]

### What is the Jira issue?
[ZEPPELIN-2502]

### How should this be tested?
Unit test provided for the fix.

### Questions:
* Is there breaking changes for older versions?
* Does this needs documentation?

Author: andrea <andrea.peruffo1982@gmail.com>

Closes #2322 from andreaTP/processHang and squashes the following commits:

e58483e [andrea] [ZEPPELIN-2502] RemoteInterpreterServer hang forever during shutdown

(cherry picked from commit c7c9aa1)
Signed-off-by: Lee moon soo <moon@apache.org>
  • Loading branch information
andreaTP authored and Leemoonsoo committed May 20, 2017
1 parent 9cccd86 commit 95bf617
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,15 +330,18 @@ public void onMetaInfosReceived(Map<String, String> infos) {
/**
* Wait for eventQueue becomes empty
*/
public void waitForEventQueueBecomesEmpty() {
public void waitForEventQueueBecomesEmpty(long atMost) {
long startTime = System.currentTimeMillis();
synchronized (eventQueue) {
while (!eventQueue.isEmpty()) {
while (!eventQueue.isEmpty() && (System.currentTimeMillis() - startTime) < atMost) {
try {
eventQueue.wait(100);
} catch (InterruptedException e) {
// ignore exception
}
}
if (!eventQueue.isEmpty())
eventQueue.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public class RemoteInterpreterServer
private Map<String, Object> remoteWorksResponsePool;
private ZeppelinRemoteWorksController remoteWorksController;

private final long DEFAULT_SHUTDOWN_TIMEOUT = 2000;

public RemoteInterpreterServer(int port) throws TTransportException {
this.port = port;

Expand All @@ -98,7 +100,7 @@ public void run() {

@Override
public void shutdown() throws TException {
eventClient.waitForEventQueueBecomesEmpty();
eventClient.waitForEventQueueBecomesEmpty(DEFAULT_SHUTDOWN_TIMEOUT);
if (interpreterGroup != null) {
interpreterGroup.close();
}
Expand All @@ -110,7 +112,8 @@ public void shutdown() throws TException {
// this case, need to force kill the process

long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < 2000 && server.isServing()) {
while (System.currentTimeMillis() - startTime < DEFAULT_SHUTDOWN_TIMEOUT &&
server.isServing()) {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer;
Expand Down Expand Up @@ -72,5 +75,65 @@ public void testStartStop() throws InterruptedException, IOException, TException
assertEquals(false, running);
}

class ShutdownRun implements Runnable {
private RemoteInterpreterServer serv = null;
public ShutdownRun(RemoteInterpreterServer serv) {
this.serv = serv;
}
@Override
public void run() {
try {
serv.shutdown();
} catch (Exception ex) {};
}
};

@Test
public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
RemoteInterpreterServer server = new RemoteInterpreterServer(
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
assertEquals(false, server.isRunning());

server.start();
long startTime = System.currentTimeMillis();
boolean running = false;

while (System.currentTimeMillis() - startTime < 10 * 1000) {
if (server.isRunning()) {
running = true;
break;
} else {
Thread.sleep(200);
}
}

assertEquals(true, running);
assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", server.getPort()));

//just send an event on the client queue
server.eventClient.onAppStatusUpdate("","","","");

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

Runnable task = new ShutdownRun(server);

executor.schedule(task, 0, TimeUnit.MILLISECONDS);

while (System.currentTimeMillis() - startTime < 10 * 1000) {
if (server.isRunning()) {
Thread.sleep(200);
} else {
running = false;
break;
}
}

executor.shutdown();

//cleanup environment for next tests
server.shutdown();

assertEquals(false, running);
}

}

0 comments on commit 95bf617

Please sign in to comment.