37
37
import java .sql .*;
38
38
import java .util .*;
39
39
import java .util .Map .Entry ;
40
- import java .util .concurrent .TimeUnit ;
41
40
import java .util .concurrent .locks .ReentrantLock ;
42
41
import javax .net .ssl .KeyManagerFactory ;
43
42
import javax .net .ssl .SSLContext ;
51
50
import org .apache .http .NoHttpResponseException ;
52
51
import org .apache .http .client .CookieStore ;
53
52
import org .apache .http .client .ServiceUnavailableRetryStrategy ;
53
+ import org .apache .http .client .config .RequestConfig ;
54
54
import org .apache .http .config .Registry ;
55
55
import org .apache .http .config .RegistryBuilder ;
56
56
import org .apache .http .conn .socket .ConnectionSocketFactory ;
@@ -95,7 +95,8 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable {
95
95
private SQLWarning warningChain = null ;
96
96
private TSessionHandle sessHandle = null ;
97
97
private final List <TProtocolVersion > supportedProtocols = new LinkedList <>();
98
- private int loginTimeout = 0 ;
98
+ private int connectTimeout = 0 ;
99
+ private int socketTimeout = 0 ;
99
100
private TProtocolVersion protocol ;
100
101
private int fetchSize = KyuubiStatement .DEFAULT_FETCH_SIZE ;
101
102
private String initFile = null ;
@@ -123,13 +124,12 @@ public static List<JdbcConnectionParams> getAllUrls(String zookeeperBasedHS2Url)
123
124
}
124
125
125
126
public KyuubiConnection (String uri , Properties info ) throws SQLException {
126
- setupLoginTimeout ( );
127
+ isBeeLineMode = Boolean . parseBoolean ( info . getProperty ( BEELINE_MODE_PROPERTY ) );
127
128
try {
128
129
connParams = Utils .parseURL (uri , info );
129
130
} catch (ZooKeeperHiveClientException e ) {
130
131
throw new KyuubiSQLException (e );
131
132
}
132
- isBeeLineMode = Boolean .parseBoolean (info .getProperty (BEELINE_MODE_PROPERTY ));
133
133
jdbcUriString = connParams .getJdbcUriString ();
134
134
// JDBC URL: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list
135
135
// each list: <key1>=<val1>;<key2>=<val2> and so on
@@ -140,6 +140,8 @@ public KyuubiConnection(String uri, Properties info) throws SQLException {
140
140
port = connParams .getPort ();
141
141
sessConfMap = connParams .getSessionVars ();
142
142
143
+ setupTimeout ();
144
+
143
145
if (sessConfMap .containsKey (FETCH_SIZE )) {
144
146
fetchSize = Integer .parseInt (sessConfMap .get (FETCH_SIZE ));
145
147
}
@@ -153,17 +155,7 @@ public KyuubiConnection(String uri, Properties info) throws SQLException {
153
155
}
154
156
155
157
// add supported protocols
156
- supportedProtocols .add (TProtocolVersion .HIVE_CLI_SERVICE_PROTOCOL_V1 );
157
- supportedProtocols .add (TProtocolVersion .HIVE_CLI_SERVICE_PROTOCOL_V2 );
158
- supportedProtocols .add (TProtocolVersion .HIVE_CLI_SERVICE_PROTOCOL_V3 );
159
- supportedProtocols .add (TProtocolVersion .HIVE_CLI_SERVICE_PROTOCOL_V4 );
160
- supportedProtocols .add (TProtocolVersion .HIVE_CLI_SERVICE_PROTOCOL_V5 );
161
- supportedProtocols .add (TProtocolVersion .HIVE_CLI_SERVICE_PROTOCOL_V6 );
162
- supportedProtocols .add (TProtocolVersion .HIVE_CLI_SERVICE_PROTOCOL_V7 );
163
- supportedProtocols .add (TProtocolVersion .HIVE_CLI_SERVICE_PROTOCOL_V8 );
164
- supportedProtocols .add (TProtocolVersion .HIVE_CLI_SERVICE_PROTOCOL_V9 );
165
- supportedProtocols .add (TProtocolVersion .HIVE_CLI_SERVICE_PROTOCOL_V10 );
166
- supportedProtocols .add (TProtocolVersion .HIVE_CLI_SERVICE_PROTOCOL_V11 );
158
+ Collections .addAll (supportedProtocols , TProtocolVersion .values ());
167
159
168
160
int maxRetries = 1 ;
169
161
try {
@@ -471,6 +463,15 @@ private CloseableHttpClient getHttpClient(Boolean useSsl) throws SQLException {
471
463
customCookies );
472
464
}
473
465
HttpClientBuilder httpClientBuilder = HttpClientBuilder .create ();
466
+
467
+ // Set timeout
468
+ RequestConfig config =
469
+ RequestConfig .custom ()
470
+ .setConnectTimeout (connectTimeout )
471
+ .setSocketTimeout (socketTimeout )
472
+ .build ();
473
+ httpClientBuilder .setDefaultRequestConfig (config );
474
+
474
475
// Configure http client for cookie based authentication
475
476
if (isCookieEnabled ) {
476
477
// Create a http client with a retry mechanism when the server returns a status code of 401.
@@ -576,15 +577,15 @@ private TTransport createUnderlyingTransport() throws TTransportException {
576
577
String sslTrustStorePassword = sessConfMap .get (SSL_TRUST_STORE_PASSWORD );
577
578
578
579
if (sslTrustStore == null || sslTrustStore .isEmpty ()) {
579
- transport = ThriftUtils .getSSLSocket (host , port , loginTimeout );
580
+ transport = ThriftUtils .getSSLSocket (host , port , connectTimeout , socketTimeout );
580
581
} else {
581
582
transport =
582
583
ThriftUtils .getSSLSocket (
583
- host , port , loginTimeout , sslTrustStore , sslTrustStorePassword );
584
+ host , port , connectTimeout , socketTimeout , sslTrustStore , sslTrustStorePassword );
584
585
}
585
586
} else {
586
587
// get non-SSL socket transport
587
- transport = ThriftUtils .getSocketTransport (host , port , loginTimeout );
588
+ transport = ThriftUtils .getSocketTransport (host , port , connectTimeout , socketTimeout );
588
589
}
589
590
return transport ;
590
591
}
@@ -856,13 +857,24 @@ private String getSessionValue(String varName, String varDefault) {
856
857
return varValue ;
857
858
}
858
859
859
- // copy loginTimeout from driver manager. Thrift timeout needs to be in millis
860
- private void setupLoginTimeout () {
861
- long timeOut = TimeUnit .SECONDS .toMillis (DriverManager .getLoginTimeout ());
862
- if (timeOut > Integer .MAX_VALUE ) {
863
- loginTimeout = Integer .MAX_VALUE ;
864
- } else {
865
- loginTimeout = (int ) timeOut ;
860
+ private void setupTimeout () {
861
+ if (sessConfMap .containsKey (CONNECT_TIMEOUT )) {
862
+ String connectTimeoutStr = sessConfMap .get (CONNECT_TIMEOUT );
863
+ try {
864
+ long connectTimeoutMs = Long .parseLong (connectTimeoutStr );
865
+ connectTimeout = (int ) Math .max (0 , Math .min (connectTimeoutMs , Integer .MAX_VALUE ));
866
+ } catch (NumberFormatException e ) {
867
+ LOG .info ("Failed to parse connectTimeout of value " + connectTimeoutStr );
868
+ }
869
+ }
870
+ if (sessConfMap .containsKey (SOCKET_TIMEOUT )) {
871
+ String socketTimeoutStr = sessConfMap .get (SOCKET_TIMEOUT );
872
+ try {
873
+ long socketTimeoutMs = Long .parseLong (socketTimeoutStr );
874
+ socketTimeout = (int ) Math .max (0 , Math .min (socketTimeoutMs , Integer .MAX_VALUE ));
875
+ } catch (NumberFormatException e ) {
876
+ LOG .info ("Failed to parse socketTimeout of value " + socketTimeoutStr );
877
+ }
866
878
}
867
879
}
868
880
0 commit comments