Skip to content

Commit

Permalink
HIVE-26670: Track every single HTTP request between beeline and hs2
Browse files Browse the repository at this point in the history
  • Loading branch information
abstractdog committed Oct 28, 2022
1 parent 16ce755 commit 7579d5f
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 9 deletions.
6 changes: 5 additions & 1 deletion beeline/src/main/resources/beeline-log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@ appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n

# list of all loggers
loggers = HiveConnection
loggers = HiveConnection, HiveJDBC

# HiveConnection logs useful info for dynamic service discovery
logger.HiveConnection.name = org.apache.hive.jdbc.HiveConnection
logger.HiveConnection.level = INFO

# All jdbc classes can be useful for connection issues, connection tracking
logger.HiveJDBC.name = org.apache.hive.jdbc
logger.HiveJDBC.level = INFO

# root logger
rootLogger.level = ${sys:hive.log.level}
rootLogger.appenderRefs = root
Expand Down
2 changes: 2 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,6 @@ public class Constants {
public static final Pattern COMPACTION_POOLS_PATTERN = Pattern.compile("hive\\.compactor\\.worker\\.(.*)\\.threads");
public static final String HIVE_COMPACTOR_WORKER_POOL = "hive.compactor.worker.pool";

public static final String HTTP_HEADER_REQUEST_TRACK = "Request-Track";
public static final String TIME_POSTFIX_REQUEST_TRACK = "_TIME";
}
27 changes: 24 additions & 3 deletions jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@
import org.apache.hive.service.rpc.thrift.TSessionHandle;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.NoHttpResponseException;
import org.apache.http.HttpStatus;
Expand All @@ -144,13 +143,15 @@
import org.apache.http.protocol.HttpContext;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.Args;
import org.apache.thrift.TBaseHelper;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Supplier;

/**
* HiveConnection.
Expand Down Expand Up @@ -559,7 +560,7 @@ private CloseableHttpClient getHttpClient(Boolean useSsl) throws SQLException {
CookieStore cookieStore = isCookieEnabled ? new BasicCookieStore() : null;
HttpClientBuilder httpClientBuilder = null;
// Request interceptor for any request pre-processing logic
HttpRequestInterceptor requestInterceptor;
HttpRequestInterceptorBase requestInterceptor;
Map<String, String> additionalHttpHeaders = new HashMap<String, String>();
Map<String, String> customCookies = new HashMap<String, String>();

Expand Down Expand Up @@ -752,8 +753,12 @@ protected boolean requestIsAborted(final HttpRequest request) {
httpClientBuilder
.setRedirectStrategy(new HiveJdbcSamlRedirectStrategy(browserClient));
}

requestInterceptor.setRequestTrackingEnabled(isRequestTrackingEnabled());

// Add the request interceptor to the client builder
httpClientBuilder.addInterceptorFirst(requestInterceptor);
httpClientBuilder.addInterceptorFirst(requestInterceptor.sessionId(getSessionId()));
httpClientBuilder.addInterceptorLast(new HttpDefaultResponseInterceptor());

// Add an interceptor to add in an XSRF header
httpClientBuilder.addInterceptorLast(new XsrfHttpRequestInterceptor());
Expand Down Expand Up @@ -813,6 +818,22 @@ RegistryBuilder.<ConnectionSocketFactory> create().register("https", socketFacto
return httpClientBuilder.build();
}

/**
 * Whether per-request track headers should be attached to outgoing HTTP
 * requests, driven by the "requestTrack" JDBC connection parameter.
 *
 * @return true iff the session config contains requestTrack=true
 */
private boolean isRequestTrackingEnabled() {
  // parseBoolean returns a primitive (no needless Boolean boxing) and, like
  // Boolean.valueOf, yields false for a null/absent parameter value.
  return Boolean.parseBoolean(sessConfMap.get(JdbcConnectionParams.JDBC_PARAM_REQUEST_TRACK));
}

/**
 * Returns a supplier that renders the current session handle's GUID as a
 * compact hex string for use in request track headers. A supplier is used
 * (rather than a value) because the session handle may not exist yet when
 * the HTTP client is built; "NO_SESSION" is reported in that case.
 */
private Supplier<String> getSessionId() {
  return () -> {
    if (sessHandle == null) {
      return "NO_SESSION";
    }
    StringBuilder sb = new StringBuilder();
    TBaseHelper.toString(sessHandle.getSessionId().bufferForGuid(), sb);
    // TBaseHelper may insert whitespace between bytes; strip it all.
    return sb.toString().replaceAll("\\s", "");
  };
}

private String getJWT() {
String jwtCredential = getJWTStringFromSession();
if (jwtCredential == null || jwtCredential.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hive.jdbc;

import java.io.IOException;

import org.apache.hadoop.hive.conf.Constants;
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.protocol.HttpContext;

/**
 * Response-side half of JDBC request tracking: the request interceptor stores
 * the track id and request start time on the HttpContext; this interceptor
 * reads them back, logs the elapsed round-trip time, and clears both
 * attributes so the (possibly reused) context starts clean.
 */
public class HttpDefaultResponseInterceptor extends HttpResponseInterceptorBase {

  @Override
  public void process(HttpResponse response, HttpContext context) throws HttpException, IOException {
    String trackHeader = (String) context.getAttribute(Constants.HTTP_HEADER_REQUEST_TRACK);
    if (trackHeader == null) {
      // Tracking disabled, or this response does not belong to a tracked request.
      return;
    }
    // Use the shared constant instead of a hard-coded "_TIME" so the request
    // and response interceptors can never disagree on the attribute name.
    Object startTime = context.getAttribute(trackHeader + Constants.TIME_POSTFIX_REQUEST_TRACK);
    if (startTime instanceof Long) {
      long elapsed = System.currentTimeMillis() - (Long) startTime;
      LOG.info("Response to {} in {} ms", trackHeader, elapsed);
    } else {
      // Defensive: avoid an unboxing NPE if the start-time attribute is missing.
      LOG.info("Response to {} (start time attribute unavailable)", trackHeader);
    }
    context.removeAttribute(Constants.HTTP_HEADER_REQUEST_TRACK);
    context.removeAttribute(trackHeader + Constants.TIME_POSTFIX_REQUEST_TRACK);
  }
}
36 changes: 34 additions & 2 deletions jdbc/src/java/org/apache/hive/jdbc/HttpRequestInterceptorBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,32 @@
package org.apache.hive.jdbc;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.http.Header;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.client.CookieStore;
import org.apache.http.protocol.HttpContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HttpRequestInterceptorBase implements HttpRequestInterceptor {
protected final Logger LOG = LoggerFactory.getLogger(getClass());

CookieStore cookieStore;
boolean isCookieEnabled;
String cookieName;
boolean isSSL;
Map<String, String> additionalHeaders;
Map<String, String> customCookies;
private Supplier<String> sessionId = null;
private boolean requestTrackingEnabled;
private final AtomicLong requestTrackCounter = new AtomicLong();

// Abstract function to add HttpAuth Header
protected abstract void addHttpAuthHeader(HttpRequest httpRequest, HttpContext httpContext)
Expand All @@ -45,7 +56,7 @@ public HttpRequestInterceptorBase(CookieStore cs, String cn, boolean isSSL,
this.isCookieEnabled = (cs != null);
this.cookieName = cn;
this.isSSL = isSSL;
this.additionalHeaders = additionalHeaders;
this.additionalHeaders = additionalHeaders == null ? new HashMap<>() : additionalHeaders;
this.customCookies = customCookies;
}

Expand Down Expand Up @@ -77,6 +88,14 @@ public void process(HttpRequest httpRequest, HttpContext httpContext)
if (isCookieEnabled) {
httpContext.setAttribute(Utils.HIVE_SERVER2_RETRY_KEY, Utils.HIVE_SERVER2_CONST_FALSE);
}

if (requestTrackingEnabled) {
String trackHeader = getNewTrackHeader();
LOG.info("{}:{}", Constants.HTTP_HEADER_REQUEST_TRACK, trackHeader);
additionalHeaders.put(Constants.HTTP_HEADER_REQUEST_TRACK, trackHeader);
httpContext.setAttribute(Constants.HTTP_HEADER_REQUEST_TRACK, trackHeader);
httpContext.setAttribute(trackHeader + Constants.TIME_POSTFIX_REQUEST_TRACK, System.currentTimeMillis());
}
// Insert the additional http headers
if (additionalHeaders != null) {
for (Map.Entry<String, String> entry : additionalHeaders.entrySet()) {
Expand All @@ -99,7 +118,20 @@ public void process(HttpRequest httpRequest, HttpContext httpContext)
httpRequest.addHeader("Cookie", cookieHeaderKeyValues);
}
} catch (Exception e) {
throw new HttpException(e.getMessage(), e);
throw new RuntimeException(e);
}
}

/**
 * Builds the next request track id in the form
 * HIVE_&lt;sessionId&gt;_&lt;20-digit zero-padded sequence number&gt;.
 */
protected String getNewTrackHeader() {
  String session = sessionId.get();
  long sequence = requestTrackCounter.incrementAndGet();
  return String.format("HIVE_%s_%020d", session, sequence);
}

/**
 * Sets the supplier used to obtain the session id embedded in request track
 * headers.
 *
 * @param sessionId lazily evaluated session id source
 * @return this interceptor, to allow fluent chaining at the call site
 */
public HttpRequestInterceptor sessionId(Supplier<String> sessionId) {
  this.sessionId = sessionId;
  return this;
}

/**
 * Enables or disables attaching a unique track header to every HTTP request;
 * driven by the "requestTrack" JDBC connection parameter.
 */
public void setRequestTrackingEnabled(boolean requestTrackingEnabled) {
  this.requestTrackingEnabled = requestTrackingEnabled;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hive.jdbc;

import org.apache.http.HttpResponseInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Common base class for HTTP response interceptors used by the Hive JDBC
 * driver. Currently only supplies a per-subclass SLF4J logger.
 */
public abstract class HttpResponseInterceptorBase implements HttpResponseInterceptor {
  // Named after the concrete subclass so log lines identify the actual interceptor.
  protected final Logger LOG = LoggerFactory.getLogger(getClass());
}
2 changes: 2 additions & 0 deletions jdbc/src/java/org/apache/hive/jdbc/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ public static class JdbcConnectionParams {
static final String DEFAULT_COOKIE_NAMES_HS2 = "hive.server2.auth";
// The http header prefix for additional headers which have to be appended to the request
static final String HTTP_HEADER_PREFIX = "http.header.";
// Request tracking
static final String JDBC_PARAM_REQUEST_TRACK = "requestTrack";
// Set the fetchSize
static final String FETCH_SIZE = "fetchSize";
static final String INIT_FILE = "initFile";
Expand Down
21 changes: 19 additions & 2 deletions jdbc/src/test/org/apache/hive/jdbc/TestHiveConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@

package org.apache.hive.jdbc;

import java.io.IOException;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;

public class TestHiveConnection {

private static final String EXISTING_TOKEN = "ExistingToken";
Expand Down Expand Up @@ -57,4 +59,19 @@ public void testIfGettingDelegationTokenFromCredentialWorks() throws IOException
String tokenStr = fetcher.getTokenFromCredential(creds, EXISTING_TOKEN);
Assert.assertEquals("Token string form is not as expected.", EXPECTED_TOKEN_STRING_FORM, tokenStr);
}

@Test
public void testHiveConnectionParameters() throws SQLException, ZooKeeperHiveClientException {
  // requestTrack=true must survive URL parsing as an ordinary session variable.
  final String url = "jdbc:hive2://hello.host:10002/default;"
      + "transportMode=http;httpPath=cliservice;socketTimeout=60;requestTrack=true;";
  JdbcConnectionParams params = Utils.parseURL(url);

  Assert.assertEquals("hello.host", params.getHost());
  Assert.assertEquals(10002, params.getPort());
  Assert.assertEquals("default", params.getDbName());

  Assert.assertEquals("http", params.getSessionVars().get(JdbcConnectionParams.TRANSPORT_MODE));
  Assert.assertEquals("cliservice", params.getSessionVars().get(JdbcConnectionParams.HTTP_PATH));
  Assert.assertEquals("60", params.getSessionVars().get(JdbcConnectionParams.SOCKET_TIMEOUT));
  Assert.assertEquals("true", params.getSessionVars().get(JdbcConnectionParams.JDBC_PARAM_REQUEST_TRACK));
}
}
51 changes: 51 additions & 0 deletions jdbc/src/test/org/apache/hive/jdbc/TestHttpRequestInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hive.jdbc;

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.hive.conf.Constants;
import org.apache.http.HttpException;
import org.apache.http.impl.client.BasicCookieStore;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.protocol.BasicHttpContext;
import org.junit.Assert;
import org.junit.Test;

/**
 * Unit test for the request-tracking behavior of {@link HttpRequestInterceptorBase}:
 * when tracking is enabled, processing a request must record a deterministic
 * track header (HIVE_&lt;sessionId&gt;_&lt;20-digit counter&gt;) among the additional headers.
 */
public class TestHttpRequestInterceptor {

  @Test
  public void testRequestTrackHeader() throws HttpException, IOException {
    // Any concrete subclass works here; basic-auth is the simplest to construct.
    HttpRequestInterceptorBase interceptor = new HttpBasicAuthInterceptor("user", "pass",
        new BasicCookieStore(), "cookieName", false, new HashMap<>(), new HashMap<>());
    interceptor.setRequestTrackingEnabled(true);
    interceptor.sessionId(() -> "sessionId");

    interceptor.process(new BasicHttpRequest("POST", "uri"), new BasicHttpContext());

    Assert.assertTrue(interceptor.additionalHeaders.containsKey(Constants.HTTP_HEADER_REQUEST_TRACK));
    Assert.assertEquals("HIVE_sessionId_00000000000000000001",
        interceptor.additionalHeaders.get(Constants.HTTP_HEADER_REQUEST_TRACK));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.security.SecureRandom;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
Expand All @@ -45,6 +44,7 @@
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;

import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
Expand Down Expand Up @@ -153,6 +153,8 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
String clientIpAddress;
boolean requireNewCookie = false;

logTrackingHeaderIfAny(request);

try {
if (hiveConf.getBoolean(ConfVars.HIVE_SERVER2_XSRF_FILTER_ENABLED.varname,false)){
boolean continueProcessing = Utils.doXsrfFilter(request,response,null,null);
Expand Down Expand Up @@ -310,6 +312,13 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
}
}

/**
 * Logs the client-supplied request track header (if present) so a request can
 * be correlated between beeline/JDBC client logs and HS2 logs.
 */
private void logTrackingHeaderIfAny(HttpServletRequest request) {
  // Fetch the header once instead of calling getHeader twice.
  String requestTrackHeader = request.getHeader(Constants.HTTP_HEADER_REQUEST_TRACK);
  if (requestTrackHeader != null) {
    LOG.info("{}:{}", Constants.HTTP_HEADER_REQUEST_TRACK, requestTrackHeader);
  }
}

private String validateJWT(HttpServletRequest request, HttpServletResponse response)
throws HttpAuthenticationException {
Preconditions.checkState(jwtValidator != null, "JWT validator should have been set");
Expand Down

0 comments on commit 7579d5f

Please sign in to comment.