Skip to content

Commit

Permalink
[SPARK-34373][SQL] HiveThriftServer2 startWithContext may hang with a…
Browse files Browse the repository at this point in the history
… race issue

### What changes were proposed in this pull request?

fix a race issue by interrupting the thread

### Why are the changes needed?

```
21:43:26.809 WARN org.apache.thrift.server.TThreadPoolServer: Transport error occurred during acceptance of message.
org.apache.thrift.transport.TTransportException: No underlying server socket.
at org.apache.thrift.transport.TServerSocket.acceptImpl(TServerSocket.java:126)
at org.apache.thrift.transport.TServerSocket.acceptImpl(TServerSocket.java:35)
at org.apache.thrift.transport.TServerTransport.acceException in thread "Thread-15" java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
at java.io.BufferedInputStream.read(BufferedInputStream.java:336)
at java.io.FilterInputStream.read(FilterInputStream.java:107)
at scala.sys.process.BasicIO$.loop$1(BasicIO.scala:238)
at scala.sys.process.BasicIO$.transferFullyImpl(BasicIO.scala:246)
at scala.sys.process.BasicIO$.transferFully(BasicIO.scala:227)
at scala.sys.process.BasicIO$.$anonfun$toStdOut$1(BasicIO.scala:221)
```
when the TServer try to `serve` after `stop`, it hangs with the log above forever
### Does this PR introduce _any_ user-facing change?

no
### How was this patch tested?

passing ci

Closes #31479 from yaooqinn/SPARK-34373.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
  • Loading branch information
yaooqinn authored and HyukjinKwon committed Feb 21, 2021
1 parent fadd0f5 commit 1fac706
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@
import org.apache.thrift.TException;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportFactory;


public class ThriftBinaryCLIService extends ThriftCLIService {

protected TServer server;

public ThriftBinaryCLIService(CLIService cliService) {
super(cliService, ThriftBinaryCLIService.class.getSimpleName());
}
Expand Down Expand Up @@ -111,6 +114,13 @@ protected void initializeServer() {
}
}

@Override
protected void stopServer() {
server.stop();
server = null;
LOG.info("Thrift server has stopped");
}

@Override
public TGetQueryIdResp GetQueryId(TGetQueryIdReq req) throws TException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
Expand All @@ -61,8 +60,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
protected int portNum;
protected InetAddress serverIPAddress;
protected String hiveHost;
protected TServer server;
protected org.eclipse.jetty.server.Server httpServer;
private Thread serverThread = null;

private boolean isStarted = false;
protected boolean isEmbedded = false;
Expand Down Expand Up @@ -177,26 +175,23 @@ public synchronized void start() {
super.start();
if (!isStarted && !isEmbedded) {
initializeServer();
new Thread(this).start();
serverThread = new Thread(this);
serverThread.setName(getName());
serverThread.start();
isStarted = true;
}
}

protected abstract void stopServer();

@Override
public synchronized void stop() {
if (isStarted && !isEmbedded) {
if(server != null) {
server.stop();
LOG.info("Thrift server has stopped");
}
if((httpServer != null) && httpServer.isStarted()) {
try {
httpServer.stop();
LOG.info("Http server has stopped");
} catch (Exception e) {
LOG.error("Error stopping Http server: ", e);
}
if (serverThread != null) {
serverThread.interrupt();
serverThread = null;
}
stopServer();
isStarted = false;
}
super.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@

public class ThriftHttpCLIService extends ThriftCLIService {

protected org.eclipse.jetty.server.Server httpServer;

public ThriftHttpCLIService(CLIService cliService) {
super(cliService, ThriftHttpCLIService.class.getSimpleName());
}
Expand Down Expand Up @@ -152,6 +154,19 @@ protected void initializeServer() {
}
}

@Override
protected void stopServer() {
if ((httpServer != null) && httpServer.isStarted()) {
try {
httpServer.stop();
httpServer = null;
LOG.info("Thrift HTTP server has been stopped");
} catch (Exception e) {
LOG.error("Error stopping HTTP server: ", e);
}
}
}

/**
* Configure Jetty to serve http requests. Example of a client connection URL:
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
Expand All @@ -162,10 +177,14 @@ public void run() {
try {
httpServer.join();
} catch (Throwable t) {
LOG.error(
"Error starting HiveServer2: could not start "
+ ThriftHttpCLIService.class.getSimpleName(), t);
System.exit(-1);
if (t instanceof InterruptedException) {
// This is likely a shutdown
LOG.info("Caught " + t.getClass().getSimpleName() + ". Shutting down thrift server.");
} else {
LOG.error("Error starting HiveServer2: could not start "
+ ThriftHttpCLIService.class.getSimpleName(), t);
System.exit(-1);
}
}
}

Expand Down

0 comments on commit 1fac706

Please sign in to comment.