From 0379d6ede7f0918ac934a8cc8f520f8953b326ea Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 8 Jun 2020 16:31:31 +0800 Subject: [PATCH] [SPARK-31926][SQL][TESTS] Fix concurrency issue for ThriftCLIService to getPortNumber --- .../thriftserver/SharedThriftServer.scala | 19 +++++- .../ThriftServerWithSparkContextSuite.scala | 8 ++- .../cli/thrift/ThriftBinaryCLIService.java | 26 +++++--- .../service/cli/thrift/ThriftCLIService.java | 3 + .../cli/thrift/ThriftHttpCLIService.java | 65 ++++++++++--------- 5 files changed, 80 insertions(+), 41 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala index e002bc0117c8..9f24367193b8 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala @@ -33,6 +33,8 @@ trait SharedThriftServer extends SharedSparkSession { private var hiveServer2: HiveThriftServer2 = _ private var serverPort: Int = 0 + def mode: ServerMode.Value = ServerMode.binary + override def beforeAll(): Unit = { super.beforeAll() // Retries up to 3 times with different port numbers if the server fails to start @@ -53,11 +55,21 @@ trait SharedThriftServer extends SharedSparkSession { } } + protected def jdbcUri: String = if (mode == ServerMode.http) { + s"""jdbc:hive2://localhost:$serverPort/ + |default; + |transportMode=http; + |httpPath=cliservice + """.stripMargin.split("\n").mkString.trim + } else { + s"""jdbc:hive2://localhost:$serverPort""" + } + protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = { val user = System.getProperty("user.name") require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2") val connections = - fs.map { _ => DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") } + fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") } val statements = connections.map(_.createStatement()) try { @@ -74,6 +86,11 @@ trait SharedThriftServer extends SharedSparkSession { // Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any free port to use. // It's much more robust than set a random port generated by ourselves ahead sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0") + // Set the HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could randomly pick any free port to use. + // It's much more robust than set a random port generated by ourselves ahead + sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0") + sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString) + hiveServer2 = HiveThriftServer2.startWithContext(sqlContext) hiveServer2.getServices.asScala.foreach { case t: ThriftCLIService if t.getPortNumber != 0 => diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 3e1fce78ae71..dc6a1a207c76 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.thriftserver -class ThriftServerWithSparkContextSuite extends SharedThriftServer { +trait ThriftServerWithSparkContextSuite extends SharedThriftServer { test("SPARK-29911: Uncache cached tables when session closed") { val cacheManager = spark.sharedState.cacheManager @@ -42,3 +42,9 @@ class ThriftServerWithSparkContextSuite extends SharedThriftServer { } } } + + +class ThriftServerWithSparkContextInBinarySuite extends ThriftServerWithSparkContextSuite +class ThriftServerWithSparkContextInHttpSuite extends ThriftServerWithSparkContextSuite { + override def mode: ServerMode.Value = ServerMode.http +} diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index a7de9c0f3d0d..b3001627b3d4 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -19,6 +19,7 @@ package org.apache.hive.service.cli.thrift; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; @@ -29,6 +30,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; @@ -46,7 +48,7 @@ public ThriftBinaryCLIService(CLIService cliService) { } @Override - public void run() { + protected void initializeServer() { try { // Server thread pool String threadPoolName = "HiveServer2-Handler-Pool"; @@ -59,10 +61,8 @@ public void run() { TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory(); TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this); TServerSocket serverSocket = null; - List sslVersionBlacklist = new ArrayList(); - for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) { - sslVersionBlacklist.add(sslVersion); - } + List sslVersionBlacklist = + new ArrayList(Arrays.asList(hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(","))); if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) { serverSocket = HiveAuthUtils.getServerSocket(hiveHost, portNum); } else { @@ -84,9 +84,9 @@ public void run() { // Server args int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE); int requestTimeout = (int) hiveConf.getTimeVar( - HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS); + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS); int beBackoffSlotLength = (int) hiveConf.getTimeVar( - HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS); + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS); TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket) .processorFactory(processorFactory).transportFactory(transportFactory) .protocolFactory(new TBinaryProtocol.Factory()) @@ -101,11 +101,17 @@ public void run() { String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; LOG.info(msg); + } catch (Exception t) { + throw new ServiceException("Error initializing " + getName(), t); + } + } + + @Override + public void run() { + try { server.serve(); } catch (Throwable t) { - LOG.error( - "Error starting HiveServer2: could not start " - + ThriftBinaryCLIService.class.getSimpleName(), t); + LOG.error("Error starting HiveServer2: could not start " + getName(), t); System.exit(-1); } } diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index d41c3b493bb4..e46799a1c427 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -176,6 +176,7 @@ public synchronized void init(HiveConf hiveConf) { public synchronized void start() { super.start(); if (!isStarted && !isEmbedded) { + initializeServer(); new Thread(this).start(); isStarted = true; } @@ -670,6 +671,8 @@ public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq req) return resp; } + protected abstract void initializeServer(); + @Override public abstract void run(); diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 73d5f84476af..e75c197e6a6f 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.rpc.thrift.TCLIService; @@ -54,20 +55,15 @@ public ThriftHttpCLIService(CLIService cliService) { super(cliService, ThriftHttpCLIService.class.getSimpleName()); } - /** - * Configure Jetty to serve http requests. Example of a client connection URL: - * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ, - * e.g. http://gateway:port/hive2/servlets/thrifths2/ - */ @Override - public void run() { + protected void initializeServer() { try { // Server thread pool // Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests String threadPoolName = "HiveServer2-HttpHandler-Pool"; ThreadPoolExecutor executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, - workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), - new ThreadFactoryWithGarbageCleanup(threadPoolName)); + workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), + new ThreadFactoryWithGarbageCleanup(threadPoolName)); ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService); // HTTP Server @@ -82,10 +78,10 @@ public void run() { if (useSsl) { String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim(); String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf, - HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname); + HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname); if (keyStorePath.isEmpty()) { throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname - + " Not configured for SSL connection"); + + " Not configured for SSL connection"); } SslContextFactory sslContextFactory = new SslContextFactory.Server(); String[] excludedProtocols = hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(","); @@ -96,25 +92,25 @@ public void run() { sslContextFactory.setKeyStorePath(keyStorePath); sslContextFactory.setKeyStorePassword(keyStorePassword); connectionFactories = AbstractConnectionFactory.getFactories( - sslContextFactory, new HttpConnectionFactory()); + sslContextFactory, new HttpConnectionFactory()); } else { connectionFactories = new ConnectionFactory[] { new HttpConnectionFactory() }; } ServerConnector connector = new ServerConnector( - httpServer, - null, - // Call this full constructor to set this, which forces daemon threads: - new ScheduledExecutorScheduler("HiveServer2-HttpHandler-JettyScheduler", true), - null, - -1, - -1, - connectionFactories); + httpServer, + null, + // Call this full constructor to set this, which forces daemon threads: + new ScheduledExecutorScheduler("HiveServer2-HttpHandler-JettyScheduler", true), + null, + -1, + -1, + connectionFactories); connector.setPort(portNum); // Linux:yes, Windows:no connector.setReuseAddress(!Shell.WINDOWS); int maxIdleTime = (int) hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); connector.setIdleTimeout(maxIdleTime); httpServer.addConnector(connector); @@ -130,14 +126,14 @@ public void run() { UserGroupInformation httpUGI = cliService.getHttpUGI(); String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION); TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType, - serviceUGI, httpUGI, hiveAuthFactory); + serviceUGI, httpUGI, hiveAuthFactory); // Context handler final ServletContextHandler context = new ServletContextHandler( - ServletContextHandler.SESSIONS); + ServletContextHandler.SESSIONS); context.setContextPath("/"); String httpPath = getHttpPath(hiveConf - .getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); + .getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); httpServer.setHandler(context); context.addServlet(new ServletHolder(thriftHttpServlet), httpPath); @@ -147,15 +143,26 @@ public void run() { // In case HIVE_SERVER2_THRIFT_HTTP_PORT or hive.server2.thrift.http.port is configured with // 0 which represents any free port, we should set it to the actual one portNum = connector.getLocalPort(); - String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName - + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..." - + maxWorkerThreads + " worker threads"; + String msg = "Started " + getName() + " in " + schemeName + " mode on port " + portNum + + " path=" + httpPath + " with " + minWorkerThreads + "..." + maxWorkerThreads + + " worker threads"; LOG.info(msg); + } catch (Exception t) { + throw new ServiceException("Error initializing " + getName(), t); + } + } + + /** + * Configure Jetty to serve http requests. Example of a client connection URL: + * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ, + * e.g. http://gateway:port/hive2/servlets/thrifths2/ + */ + @Override + public void run() { + try { httpServer.join(); } catch (Throwable t) { - LOG.error( - "Error starting HiveServer2: could not start " - + ThriftHttpCLIService.class.getSimpleName(), t); + LOG.error("Error starting HiveServer2: could not start " + getName(), t); System.exit(-1); } }