Skip to content

Commit

Permalink
[HIVE-26336] Introduce connectTimeout as JDBC url parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
pan3793 committed Jul 15, 2022
1 parent b51dcfe commit cdf1872
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,20 @@
public class HiveAuthUtils {
private static final Logger LOG = LoggerFactory.getLogger(HiveAuthUtils.class);

public static TTransport getSocketTransport(String host, int port, int loginTimeout) throws TTransportException {
return new TSocket(new TConfiguration(),host, port, loginTimeout);
public static TTransport getSocketTransport(String host, int port, int connectTimeout, int socketTimeout)
throws TTransportException {
return new TSocket(new TConfiguration(),host, port, socketTimeout, connectTimeout);
}

public static TTransport getSSLSocket(String host, int port, int loginTimeout)
public static TTransport getSSLSocket(String host, int port, int connectTimeout, int socketTimeout)
throws TTransportException {
// The underlying SSLSocket object is bound to host:port with the given SO_TIMEOUT
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, loginTimeout);
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, socketTimeout);
tSSLSocket.setConnectTimeout(connectTimeout);
return getSSLSocketWithHttps(tSSLSocket);
}

public static TTransport getSSLSocket(String host, int port, int loginTimeout,
public static TTransport getSSLSocket(String host, int port, int connectTimeout, int socketTimeout,
String trustStorePath, String trustStorePassWord, String trustStoreType,
String trustStoreAlgorithm) throws TTransportException {
TSSLTransportFactory.TSSLTransportParameters params =
Expand All @@ -73,7 +75,8 @@ public static TTransport getSSLSocket(String host, int port, int loginTimeout,
params.requireClientAuth(true);
// The underlying SSLSocket object is bound to host:port with the given SO_TIMEOUT and
// SSLContext created with the given params
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, loginTimeout, params);
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, socketTimeout, params);
tSSLSocket.setConnectTimeout(connectTimeout);
return getSSLSocketWithHttps(tSSLSocket);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hive.jdbc;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.sql.DriverManager;
import java.util.HashMap;

import static org.junit.Assert.assertEquals;

public class TestJdbcTimeout {

private static MiniHS2 miniHS2 = null;

@BeforeClass
public static void beforeTest() throws Exception {
Class.forName(MiniHS2.getJdbcDriverName());
HiveConf conf = new HiveConf();
conf.setVar(HiveConf.ConfVars.HIVE_LOCK_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager");
miniHS2 = new MiniHS2.Builder().withConf(conf).build();
miniHS2.start(new HashMap<>());
}

@AfterClass
public static void afterTest() throws Exception {
if (miniHS2 != null && miniHS2.isStarted()) {
miniHS2.stop();
}
}

@Test
public void testConfigureTimeoutThroughJdbcUrl() throws Exception {
String url1 = miniHS2.getJdbcURL("default", "connectTimeout=20000");
try (HiveConnection conn = (HiveConnection) DriverManager.getConnection(url1)) {
assertEquals(20000, conn.getConnectTimeout());
assertEquals(0, conn.getSocketTimeout());
}
String url2 = miniHS2.getJdbcURL("default", "socketTimeout=10000");
try (HiveConnection conn = (HiveConnection) DriverManager.getConnection(url2)) {
assertEquals(0, conn.getConnectTimeout());
assertEquals(10000, conn.getSocketTimeout());
}
String url3 = miniHS2.getJdbcURL("default", "connectTimeout=20000;socketTimeout=10000");
try (HiveConnection conn = (HiveConnection) DriverManager.getConnection(url3)) {
assertEquals(20000, conn.getConnectTimeout());
assertEquals(10000, conn.getSocketTimeout());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void setUp() throws Exception {

@Test
public void testExecuteReturnWithInfoMessage() throws Exception {
TTransport transport = HiveAuthUtils.getSocketTransport(host, cliPort, 0);
TTransport transport = HiveAuthUtils.getSocketTransport(host, cliPort, 0, 0);
try {
transport.open();
TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private TCLIService.Client getClient(TTransport transport) throws Exception {
}

private TTransport getRawBinaryTransport() throws Exception {
return HiveAuthUtils.getSocketTransport(ThriftCLIServiceTest.host, ThriftCLIServiceTest.port, 0);
return HiveAuthUtils.getSocketTransport(ThriftCLIServiceTest.host, ThriftCLIServiceTest.port, 0, 0);
}

private static TTransport getHttpTransport() throws Exception {
Expand Down
67 changes: 39 additions & 28 deletions jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -172,7 +171,8 @@ public class HiveConnection implements java.sql.Connection {
private SQLWarning warningChain = null;
private TSessionHandle sessHandle = null;
private final List<TProtocolVersion> supportedProtocols = new LinkedList<TProtocolVersion>();
private int loginTimeout = 0;
private int connectTimeout = 0;
private int socketTimeout = 0;
private TProtocolVersion protocol;
private final int initFetchSize;
private int defaultFetchSize;
Expand Down Expand Up @@ -304,7 +304,7 @@ protected HiveConnection(String uri, Properties info,
// hive_conf_list -> hiveConfMap
// hive_var_list -> hiveVarMap
sessConfMap = connParams.getSessionVars();
setupLoginTimeout();
setupTimeout();
if (isKerberosAuthMode()) {
host = Utils.getCanonicalHostName(connParams.getHost());
} else if (isBrowserAuthMode() && !isHttpTransportMode()) {
Expand Down Expand Up @@ -509,6 +509,14 @@ private void openTransport() throws Exception {
logZkDiscoveryMessage("Connected to " + connParams.getHost() + ":" + connParams.getPort());
}

public int getConnectTimeout() {
return connectTimeout;
}

public int getSocketTimeout() {
return socketTimeout;
}

public String getConnectedUrl() {
return jdbcUriString;
}
Expand Down Expand Up @@ -759,9 +767,9 @@ protected boolean requestIsAborted(final HttpRequest request) {

// set the specified timeout (socketTimeout jdbc param) for http connection as well
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(loginTimeout * 1000)
.setConnectionRequestTimeout(loginTimeout * 1000)
.setSocketTimeout(loginTimeout * 1000).build();
.setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectTimeout)
.setSocketTimeout(socketTimeout).build();
httpClientBuilder.setDefaultRequestConfig(config);

// Configure http client for SSL
Expand Down Expand Up @@ -864,7 +872,7 @@ private TTransport createUnderlyingTransport() throws TTransportException {
JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);

if (sslTrustStore == null || sslTrustStore.isEmpty()) {
transport = HiveAuthUtils.getSSLSocket(host, port, loginTimeout);
transport = HiveAuthUtils.getSSLSocket(host, port, connectTimeout, socketTimeout);
} else {
String trustStoreType =
sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE_TYPE);
Expand All @@ -876,12 +884,12 @@ private TTransport createUnderlyingTransport() throws TTransportException {
if (trustStoreAlgorithm == null) {
trustStoreAlgorithm = "";
}
transport = HiveAuthUtils.getSSLSocket(host, port, loginTimeout,
transport = HiveAuthUtils.getSSLSocket(host, port, connectTimeout, socketTimeout,
sslTrustStore, sslTrustStorePassword, trustStoreType, trustStoreAlgorithm);
}
} else {
// get non-SSL socket transport
transport = HiveAuthUtils.getSocketTransport(host, port, loginTimeout);
transport = HiveAuthUtils.getSocketTransport(host, port, connectTimeout, socketTimeout);
}
return transport;
}
Expand Down Expand Up @@ -1302,10 +1310,7 @@ private boolean disableSSLValidation() {

private boolean isHttpTransportMode() {
String transportMode = sessConfMap.get(JdbcConnectionParams.TRANSPORT_MODE);
if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
return true;
}
return false;
return "http".equalsIgnoreCase(transportMode);
}

private void logZkDiscoveryMessage(String message) {
Expand All @@ -1329,21 +1334,27 @@ private String getSessionValue(String varName, String varDefault) {
return varValue;
}

// use socketTimeout from jdbc connection url. Thrift timeout needs to be in millis
private void setupLoginTimeout() {
String socketTimeoutStr = sessConfMap.getOrDefault(JdbcConnectionParams.SOCKET_TIMEOUT, "0");
long timeOut = 0;
try {
timeOut = Long.parseLong(socketTimeoutStr);
} catch (NumberFormatException e) {
LOG.info("Failed to parse socketTimeout of value " + socketTimeoutStr);
}
if (timeOut > Integer.MAX_VALUE) {
loginTimeout = Integer.MAX_VALUE;
} else if (timeOut < 0) {
loginTimeout = 0;
} else {
loginTimeout = (int) timeOut;
private void setupTimeout() {
if (sessConfMap.containsKey(JdbcConnectionParams.CONNECT_TIMEOUT)) {
long connectTimeoutMs = 0;

String loginTimeoutStr = sessConfMap.getOrDefault(JdbcConnectionParams.CONNECT_TIMEOUT, "0");
try {
connectTimeoutMs = Long.parseLong(loginTimeoutStr);
} catch (NumberFormatException e) {
LOG.info("Failed to parse connectTimeout of value " + loginTimeoutStr);
}
connectTimeout = (int) Math.max(0, Math.min(connectTimeoutMs, Integer.MAX_VALUE));
}
if (sessConfMap.containsKey(JdbcConnectionParams.SOCKET_TIMEOUT)) {
long socketTimeoutMs = 0;
String socketTimeoutStr = sessConfMap.get(JdbcConnectionParams.SOCKET_TIMEOUT);
try {
socketTimeoutMs = Long.parseLong(socketTimeoutStr);
} catch (NumberFormatException e) {
LOG.info("Failed to parse socketTimeout of value " + socketTimeoutStr);
}
socketTimeout = (int) Math.max(0, Math.min(socketTimeoutMs, Integer.MAX_VALUE));
}
}

Expand Down
2 changes: 2 additions & 0 deletions jdbc/src/java/org/apache/hive/jdbc/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ public static class JdbcConnectionParams {
static final String HTTP_COOKIE_PREFIX = "http.cookie.";
// Create external purge table by default
static final String CREATE_TABLE_AS_EXTERNAL = "hiveCreateAsExternalLegacy";

public static final String CONNECT_TIMEOUT = "connectTimeout";
public static final String SOCKET_TIMEOUT = "socketTimeout";

// We support ways to specify application name modeled after some existing DBs, since
Expand Down

0 comments on commit cdf1872

Please sign in to comment.