Skip to content

Commit

Permalink
[SPARK-31926][SQL][TESTS] Fix concurrency issue for ThriftCLIService …
Browse files Browse the repository at this point in the history
…to getPortNumber
  • Loading branch information
yaooqinn committed Jun 8, 2020
1 parent 9d5b5d0 commit 0379d6e
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 41 deletions.
Expand Up @@ -33,6 +33,8 @@ trait SharedThriftServer extends SharedSparkSession {
private var hiveServer2: HiveThriftServer2 = _
private var serverPort: Int = 0

def mode: ServerMode.Value = ServerMode.binary

override def beforeAll(): Unit = {
super.beforeAll()
// Retries up to 3 times with different port numbers if the server fails to start
Expand All @@ -53,11 +55,21 @@ trait SharedThriftServer extends SharedSparkSession {
}
}

protected def jdbcUri: String = if (mode == ServerMode.http) {
s"""jdbc:hive2://localhost:$serverPort/
|default;
|transportMode=http;
|httpPath=cliservice
""".stripMargin.split("\n").mkString.trim
} else {
s"""jdbc:hive2://localhost:$serverPort"""
}

protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = {
val user = System.getProperty("user.name")
require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2")
val connections =
fs.map { _ => DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") }
fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") }
val statements = connections.map(_.createStatement())

try {
Expand All @@ -74,6 +86,11 @@ trait SharedThriftServer extends SharedSparkSession {
// Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any free port to use.
// It's much more robust than set a random port generated by ourselves ahead
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
// Set the HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could randomly pick any free port to use.
// It's much more robust than set a random port generated by ourselves ahead
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString)

hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
hiveServer2.getServices.asScala.foreach {
case t: ThriftCLIService if t.getPortNumber != 0 =>
Expand Down
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.hive.thriftserver

class ThriftServerWithSparkContextSuite extends SharedThriftServer {
trait ThriftServerWithSparkContextSuite extends SharedThriftServer {

test("SPARK-29911: Uncache cached tables when session closed") {
val cacheManager = spark.sharedState.cacheManager
Expand All @@ -42,3 +42,9 @@ class ThriftServerWithSparkContextSuite extends SharedThriftServer {
}
}
}


class ThriftServerWithSparkContextInBinarySuite extends ThriftServerWithSparkContextSuite
class ThriftServerWithSparkContextInHttpSuite extends ThriftServerWithSparkContextSuite {
override def mode: ServerMode.Value = ServerMode.http
}
Expand Up @@ -19,6 +19,7 @@
package org.apache.hive.service.cli.thrift;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
Expand All @@ -29,6 +30,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
Expand All @@ -46,7 +48,7 @@ public ThriftBinaryCLIService(CLIService cliService) {
}

@Override
public void run() {
protected void initializeServer() {
try {
// Server thread pool
String threadPoolName = "HiveServer2-Handler-Pool";
Expand All @@ -59,10 +61,8 @@ public void run() {
TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
TServerSocket serverSocket = null;
List<String> sslVersionBlacklist = new ArrayList<String>();
for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
sslVersionBlacklist.add(sslVersion);
}
List<String> sslVersionBlacklist =
new ArrayList(Arrays.asList(hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")));
if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
serverSocket = HiveAuthUtils.getServerSocket(hiveHost, portNum);
} else {
Expand All @@ -84,9 +84,9 @@ public void run() {
// Server args
int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
int requestTimeout = (int) hiveConf.getTimeVar(
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
int beBackoffSlotLength = (int) hiveConf.getTimeVar(
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
.processorFactory(processorFactory).transportFactory(transportFactory)
.protocolFactory(new TBinaryProtocol.Factory())
Expand All @@ -101,11 +101,17 @@ public void run() {
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
+ portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
LOG.info(msg);
} catch (Exception t) {
throw new ServiceException("Error initializing " + getName(), t);
}
}

@Override
public void run() {
try {
server.serve();
} catch (Throwable t) {
LOG.error(
"Error starting HiveServer2: could not start "
+ ThriftBinaryCLIService.class.getSimpleName(), t);
LOG.error("Error starting HiveServer2: could not start " + getName(), t);
System.exit(-1);
}
}
Expand Down
Expand Up @@ -176,6 +176,7 @@ public synchronized void init(HiveConf hiveConf) {
public synchronized void start() {
super.start();
if (!isStarted && !isEmbedded) {
initializeServer();
new Thread(this).start();
isStarted = true;
}
Expand Down Expand Up @@ -670,6 +671,8 @@ public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq req)
return resp;
}

protected abstract void initializeServer();

@Override
public abstract void run();

Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.rpc.thrift.TCLIService;
Expand All @@ -54,20 +55,15 @@ public ThriftHttpCLIService(CLIService cliService) {
super(cliService, ThriftHttpCLIService.class.getSimpleName());
}

/**
* Configure Jetty to serve http requests. Example of a client connection URL:
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
* e.g. http://gateway:port/hive2/servlets/thrifths2/
*/
@Override
public void run() {
protected void initializeServer() {
try {
// Server thread pool
// Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
String threadPoolName = "HiveServer2-HttpHandler-Pool";
ThreadPoolExecutor executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactoryWithGarbageCleanup(threadPoolName));
workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactoryWithGarbageCleanup(threadPoolName));
ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);

// HTTP Server
Expand All @@ -82,10 +78,10 @@ public void run() {
if (useSsl) {
String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
if (keyStorePath.isEmpty()) {
throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+ " Not configured for SSL connection");
+ " Not configured for SSL connection");
}
SslContextFactory sslContextFactory = new SslContextFactory.Server();
String[] excludedProtocols = hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",");
Expand All @@ -96,25 +92,25 @@ public void run() {
sslContextFactory.setKeyStorePath(keyStorePath);
sslContextFactory.setKeyStorePassword(keyStorePassword);
connectionFactories = AbstractConnectionFactory.getFactories(
sslContextFactory, new HttpConnectionFactory());
sslContextFactory, new HttpConnectionFactory());
} else {
connectionFactories = new ConnectionFactory[] { new HttpConnectionFactory() };
}
ServerConnector connector = new ServerConnector(
httpServer,
null,
// Call this full constructor to set this, which forces daemon threads:
new ScheduledExecutorScheduler("HiveServer2-HttpHandler-JettyScheduler", true),
null,
-1,
-1,
connectionFactories);
httpServer,
null,
// Call this full constructor to set this, which forces daemon threads:
new ScheduledExecutorScheduler("HiveServer2-HttpHandler-JettyScheduler", true),
null,
-1,
-1,
connectionFactories);

connector.setPort(portNum);
// Linux:yes, Windows:no
connector.setReuseAddress(!Shell.WINDOWS);
int maxIdleTime = (int) hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME,
TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS);
connector.setIdleTimeout(maxIdleTime);

httpServer.addConnector(connector);
Expand All @@ -130,14 +126,14 @@ public void run() {
UserGroupInformation httpUGI = cliService.getHttpUGI();
String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType,
serviceUGI, httpUGI, hiveAuthFactory);
serviceUGI, httpUGI, hiveAuthFactory);

// Context handler
final ServletContextHandler context = new ServletContextHandler(
ServletContextHandler.SESSIONS);
ServletContextHandler.SESSIONS);
context.setContextPath("/");
String httpPath = getHttpPath(hiveConf
.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
httpServer.setHandler(context);
context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);

Expand All @@ -147,15 +143,26 @@ public void run() {
// In case HIVE_SERVER2_THRIFT_HTTP_PORT or hive.server2.thrift.http.port is configured with
// 0 which represents any free port, we should set it to the actual one
portNum = connector.getLocalPort();
String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName
+ " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
+ maxWorkerThreads + " worker threads";
String msg = "Started " + getName() + " in " + schemeName + " mode on port " + portNum
+ " path=" + httpPath + " with " + minWorkerThreads + "..." + maxWorkerThreads
+ " worker threads";
LOG.info(msg);
} catch (Exception t) {
throw new ServiceException("Error initializing " + getName(), t);
}
}

/**
* Configure Jetty to serve http requests. Example of a client connection URL:
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
* e.g. http://gateway:port/hive2/servlets/thrifths2/
*/
@Override
public void run() {
try {
httpServer.join();
} catch (Throwable t) {
LOG.error(
"Error starting HiveServer2: could not start "
+ ThriftHttpCLIService.class.getSimpleName(), t);
LOG.error("Error starting HiveServer2: could not start " + getName(), t);
System.exit(-1);
}
}
Expand Down

0 comments on commit 0379d6e

Please sign in to comment.