Skip to content
Open
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@
<version>3.7</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>connect-utils</artifactId>
<version>0.3.1</version>
</dependency>
</dependencies>

<reporting>
Expand Down
11 changes: 9 additions & 2 deletions src/main/java/com/splunk/hecclient/Hec.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.security.KeyManagementException;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.errors.ConnectException;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
Expand Down Expand Up @@ -196,7 +197,7 @@ public static HecAckPoller createPoller(HecConfig config, PollerCallback callbac
public Hec(HecConfig config, CloseableHttpClient httpClient, Poller poller, LoadBalancerInf loadBalancer) {
for (int i = 0; i < config.getTotalChannels(); ) {
for (String uri : config.getUris()) {
Indexer indexer = new Indexer(uri, config.getToken(), httpClient, poller);
Indexer indexer = new Indexer(uri, httpClient, poller, config);
indexer.setKeepAlive(config.getHttpKeepAlive());
loadBalancer.add(indexer.getChannel().setTracking(config.getEnableChannelTracking()));
i++;
Expand Down Expand Up @@ -263,7 +264,13 @@ public final void close() {
*/
public static CloseableHttpClient createHttpClient(final HecConfig config) {
int poolSizePerDest = config.getMaxHttpConnectionPerChannel();

if (config.kerberosAuthEnabled()) {
try {
return new HttpClientBuilder().buildKerberosClient();
} catch (KeyStoreException | NoSuchAlgorithmException | KeyManagementException ex) {
throw new ConnectException("Unable to build Kerberos Client", ex);
}
}
// Code block for default client construction
if(!config.getHasCustomTrustStore() &&
StringUtils.isBlank(config.getTrustStorePath()) &&
Expand Down
34 changes: 34 additions & 0 deletions src/main/java/com/splunk/hecclient/HecConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public final class HecConfig {
private boolean hasCustomTrustStore = false;
private String trustStorePath;
private String trustStorePassword;
private String kerberosPrincipal;
private String kerberosUser;
private String kerberosKeytabLocation;

public HecConfig(List<String> uris, String token) {
this.uris = uris;
Expand Down Expand Up @@ -155,8 +158,39 @@ public HecConfig setHasCustomTrustStore(boolean hasStore) {
return this;
}

public String kerberosPrincipal() {
return kerberosPrincipal;
}

public HecConfig setKerberosPrincipal(String kerberosPrincipal) {
this.kerberosPrincipal = kerberosPrincipal;
return this;
}

public String kerberosUser() {
return kerberosUser;
}

public HecConfig setKerberosUser(String kerberosUser) {
this.kerberosUser = kerberosUser;
return this;
}

public String kerberosKeytabLocation() {
return kerberosKeytabLocation;
}

public HecConfig setKerberosKeytabLocation(String kerberosKeytabLocation) {
this.kerberosKeytabLocation = kerberosKeytabLocation;
return this;
}

public HecConfig setEnableChannelTracking(boolean trackChannel) {
enableChannelTracking = trackChannel;
return this;
}

public boolean kerberosAuthEnabled() {
return !kerberosPrincipal().isEmpty();
}
}
42 changes: 42 additions & 0 deletions src/main/java/com/splunk/hecclient/HttpClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,31 @@
*/
package com.splunk.hecclient;

import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.Lookup;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.auth.SPNegoSchemeFactory;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.ssl.SSLContextBuilder;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.Principal;
import java.security.cert.X509Certificate;

public final class HttpClientBuilder {
Expand Down Expand Up @@ -87,6 +100,35 @@ public CloseableHttpClient build() {
.build();
}

public CloseableHttpClient buildKerberosClient() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException {
org.apache.http.impl.client.HttpClientBuilder builder =
org.apache.http.impl.client.HttpClientBuilder.create();
Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create().
register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(true)).build();
builder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(new AuthScope(null, -1, null), new Credentials() {
@Override
public Principal getUserPrincipal() {
return null;
}
@Override
public String getPassword() {
return null;
}
});
builder.setDefaultCredentialsProvider(credentialsProvider);
SSLContextBuilder sslContextBuilderbuilder = new SSLContextBuilder();
sslContextBuilderbuilder.loadTrustMaterial(null, (chain, authType) -> true);
SSLConnectionSocketFactory sslsf = new
SSLConnectionSocketFactory(
sslContextBuilderbuilder.build(), NoopHostnameVerifier.INSTANCE);

builder.setSSLSocketFactory(sslsf);
CloseableHttpClient httpClient = builder.build();
return httpClient;
}

private SSLConnectionSocketFactory getSSLConnectionFactory() {
if (disableSSLCertVerification) {
return getUnsecureSSLConnectionSocketFactory();
Expand Down
91 changes: 80 additions & 11 deletions src/main/java/com/splunk/hecclient/Indexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package com.splunk.hecclient;

import com.splunk.kafka.connect.SplunkSinkConnectorConfig;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
Expand All @@ -25,15 +27,27 @@
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

final class Indexer implements IndexerInf {
private static final Logger log = LoggerFactory.getLogger(Indexer.class);

private HecConfig hecConfig;
private Configuration config;
private CloseableHttpClient httpClient;
private HttpContext context;
private String baseUrl;
Expand All @@ -47,14 +61,18 @@ final class Indexer implements IndexerInf {
private long backPressureThreshhold = 60 * 1000; // 1 min

// Indexer doesn't own client, ack poller
public Indexer(String baseUrl, String hecToken, CloseableHttpClient client, Poller poller) {
public Indexer(
String baseUrl,
CloseableHttpClient client,
Poller poller,
HecConfig config) {
this.httpClient = client;
this.baseUrl = baseUrl;
this.hecToken = hecToken;
this.hecToken = config.getToken();
this.poller = poller;
this.context = HttpClientContext.create();
backPressure = 0;

this.hecConfig = config;
channel = new HecChannel(this);

// Init headers
Expand Down Expand Up @@ -137,17 +155,68 @@ public boolean send(final EventBatch batch) {
@Override
public synchronized String executeHttpRequest(final HttpUriRequest req) {
CloseableHttpResponse resp;
try {
resp = httpClient.execute(req, context);
} catch (Exception ex) {
logBackPressure();
log.error("encountered io exception", ex);
throw new HecException("encountered exception when post data", ex);
if (hecConfig.kerberosAuthEnabled()) {
if (config == null) {
defineKerberosConfigs();
}
Set<Principal> princ = new HashSet<Principal>(1);
princ.add(new KerberosPrincipal(hecConfig.kerberosUser()));
Subject sub = new Subject(false, princ, new HashSet<Object>(), new HashSet<Object>());
try {
LoginContext lc = new LoginContext("", sub, null, config);
lc.login();
Subject serviceSubject = lc.getSubject();
resp = Subject.doAs(serviceSubject, new PrivilegedAction<CloseableHttpResponse>() {
@Override
public CloseableHttpResponse run() {
try {
return httpClient.execute(req, context);
} catch (IOException ex) {
logBackPressure();
throw new HecException("Encountered exception while posting data.", ex);
}
}
});
} catch (Exception le) {
throw new HecException(
"Encountered exception while authenticating via Kerberos.", le);
}
} else {
try {
resp = httpClient.execute(req, context);
} catch (Exception ex) {
logBackPressure();
throw new HecException("encountered exception when post data", ex);
}
}

return readAndCloseResponse(resp);
}

private void defineKerberosConfigs() {
config = new Configuration() {
@SuppressWarnings("serial")
@Override
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
return new AppConfigurationEntry[]{new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, new HashMap<String, Object>() {
{
put("useTicketCache", "false");
put("useKeyTab", "true");
put("keyTab", hecConfig.kerberosKeytabLocation());
//Krb5 in GSS API needs to be refreshed so it does not throw the error
//Specified version of key is not available
put("refreshKrb5Config", "true");
put("principal", hecConfig.kerberosPrincipal());
put("storeKey", "false");
put("doNotPrompt", "true");
put("isInitiator", "true");
put("debug", "true");
}
})};
}
};
}

private String readAndCloseResponse(CloseableHttpResponse resp) {
String respPayload;
HttpEntity entity = resp.getEntity();
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
package com.splunk.kafka.connect;

import com.splunk.kafka.connect.VersionUtils;

import io.confluent.connect.utils.validators.all.ConfigValidation;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
Expand Down Expand Up @@ -66,4 +70,13 @@ public String version() {
public ConfigDef config() {
return SplunkSinkConnectorConfig.conf();
}

@Override
public Config validate(Map<String, String> connectorConfigs) {
return new ConfigValidation(
config(),
connectorConfigs,
SplunkSinkConnectorConfig::validateKerberosConfigs
).validate();
}
}
Loading