Skip to content

Commit

Permalink
FLUME-2109. HTTPS support in HTTP Source.
Browse files Browse the repository at this point in the history
(Ashish Paliwal via Hari Shreedharan)
  • Loading branch information
harishreedharan committed Aug 2, 2013
1 parent 5b5470b commit e256610
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 10 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.bio.SocketConnector;
import org.mortbay.jetty.security.SslSocketConnector;
import org.mortbay.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,16 +89,46 @@ public class HTTPSource extends AbstractSource implements
private HTTPSourceHandler handler;
private SourceCounter sourceCounter;

// SSL configuration variable
private volatile Integer sslPort;
private volatile String keyStorePath;
private volatile String keyStorePassword;
private volatile Boolean sslEnabled;


@Override
public void configure(Context context) {
try {
// SSL related config
sslEnabled = context.getBoolean(HTTPSourceConfigurationConstants.SSL_ENABLED, false);

port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT);
host = context.getString(HTTPSourceConfigurationConstants.CONFIG_BIND,
HTTPSourceConfigurationConstants.DEFAULT_BIND);
checkHostAndPort();

Preconditions.checkState(host != null && !host.isEmpty(),
"HTTPSource hostname specified is empty");
// verify port only if its not ssl
if(!sslEnabled) {
Preconditions.checkNotNull(port, "HTTPSource requires a port number to be"
+ " specified");
}

String handlerClassName = context.getString(
HTTPSourceConfigurationConstants.CONFIG_HANDLER,
HTTPSourceConfigurationConstants.DEFAULT_HANDLER).trim();

if(sslEnabled) {
LOG.debug("SSL configuration enabled");
sslPort = context.getInteger(HTTPSourceConfigurationConstants.SSL_PORT);
Preconditions.checkArgument(sslPort != null && sslPort > 0, "SSL Port cannot be null or less than 0" );
keyStorePath = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE);
Preconditions.checkArgument(keyStorePath != null && !keyStorePath.isEmpty(),
"Keystore is required for SSL Conifguration" );
keyStorePassword = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD);
Preconditions.checkArgument(keyStorePassword != null, "Keystore password is required for SSL Configuration");
}

@SuppressWarnings("unchecked")
Class<? extends HTTPSourceHandler> clazz =
(Class<? extends HTTPSourceHandler>)
Expand Down Expand Up @@ -139,10 +170,25 @@ public void start() {
+ " before I started one."
+ "Will not attempt to start.");
srv = new Server();
SocketConnector connector = new SocketConnector();
connector.setPort(port);
connector.setHost(host);
srv.setConnectors(new Connector[] { connector });

// Connector Array
Connector[] connectors = new Connector[1];


if(sslEnabled) {
SslSocketConnector sslSocketConnector = new SslSocketConnector();
sslSocketConnector.setKeystore(keyStorePath);
sslSocketConnector.setKeyPassword(keyStorePassword);
sslSocketConnector.setPort(sslPort);
connectors[0] = sslSocketConnector;
} else {
SocketConnector connector = new SocketConnector();
connector.setPort(port);
connector.setHost(host);
connectors[0] = connector;
}

srv.setConnectors(connectors);
try {
org.mortbay.jetty.servlet.Context root =
new org.mortbay.jetty.servlet.Context(
Expand Down
Expand Up @@ -34,4 +34,9 @@ public class HTTPSourceConfigurationConstants {
public static final String DEFAULT_HANDLER =
"org.apache.flume.source.http.JSONHandler";

public static final String SSL_PORT = "sslPort";
public static final String SSL_KEYSTORE = "keystore";
public static final String SSL_KEYSTORE_PASSWORD = "keystorePassword";
public static final String SSL_ENABLED = "enableSSL";

}
Expand Up @@ -22,29 +22,30 @@
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import junit.framework.Assert;
import org.apache.flume.Channel;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.*;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.JSONEvent;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import javax.net.ssl.*;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.ServerSocket;
import java.net.URL;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -58,8 +59,12 @@
public class TestHTTPSource {

private static HTTPSource source;
private static HTTPSource httpsSource;
// private static Channel httpsChannel;

private static Channel channel;
private static int selectedPort;
private static int sslPort;
DefaultHttpClient httpClient;
HttpPost postRequest;

Expand All @@ -77,9 +82,13 @@ public static void setUpClass() throws Exception {
source = new HTTPSource();
channel = new MemoryChannel();

httpsSource = new HTTPSource();
// httpsChannel = new MemoryChannel();

Context ctx = new Context();
ctx.put("capacity", "100");
Configurables.configure(channel, ctx);
// Configurables.configure(httpsChannel, ctx);

List<Channel> channels = new ArrayList<Channel>(1);
channels.add(channel);
Expand All @@ -90,19 +99,43 @@ public static void setUpClass() throws Exception {
source.setChannelProcessor(new ChannelProcessor(rcs));

channel.start();

// Channel for HTTPS source
// List<Channel> sslChannels = new ArrayList<Channel>(1);
// channels.add(httpsChannel);
//
// ChannelSelector sslRcs = new ReplicatingChannelSelector();
// rcs.setChannels(sslChannels);

httpsSource.setChannelProcessor(new ChannelProcessor(rcs));
// httpsChannel.start();

// HTTP context
Context context = new Context();

context.put("port", String.valueOf(selectedPort));
context.put("host", "0.0.0.0");

// SSL context props
Context sslContext = new Context();
sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true");
sslPort = findFreePort();
sslContext.put(HTTPSourceConfigurationConstants.SSL_PORT, String.valueOf(sslPort));
sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD, "password");
sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE, "src/test/resources/jettykeystore");

Configurables.configure(source, context);
Configurables.configure(httpsSource, sslContext);
source.start();
httpsSource.start();
}

@AfterClass
public static void tearDownClass() throws Exception {
source.stop();
channel.stop();
httpsSource.stop();
// httpsChannel.stop();
}

@Before
Expand Down Expand Up @@ -268,6 +301,73 @@ private ResultWrapper putWithEncoding(String encoding, int n)
return new ResultWrapper(resp, events);
}

@Test
public void testHttps() throws Exception {
Type listType = new TypeToken<List<JSONEvent>>() {
}.getType();
List<JSONEvent> events = Lists.newArrayList();
Random rand = new Random();
for (int i = 0; i < 10; i++) {
Map<String, String> input = Maps.newHashMap();
for (int j = 0; j < 10; j++) {
input.put(String.valueOf(i) + String.valueOf(j), String.valueOf(i));
}
JSONEvent e = new JSONEvent();
e.setHeaders(input);
e.setBody(String.valueOf(rand.nextGaussian()).getBytes("UTF-8"));
events.add(e);
}
Gson gson = new Gson();
String json = gson.toJson(events, listType);
HttpsURLConnection httpsURLConnection = null;
try {
TrustManager[] trustAllCerts = {new X509TrustManager() {
@Override
public void checkClientTrusted(
java.security.cert.X509Certificate[] x509Certificates, String s)
throws CertificateException {
// noop
}

@Override
public void checkServerTrusted(
java.security.cert.X509Certificate[] x509Certificates, String s)
throws CertificateException {
// noop
}

public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return null;
}
}};
SSLContext sc = SSLContext.getInstance("SSL");

HostnameVerifier hv = new HostnameVerifier() {
public boolean verify(String arg0, SSLSession arg1) {
return true;
}
};
sc.init(null, trustAllCerts, new SecureRandom());
HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
HttpsURLConnection.setDefaultHostnameVerifier(
SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
URL sslUrl = new URL("https://0.0.0.0:" + sslPort);
httpsURLConnection = (HttpsURLConnection) sslUrl.openConnection();
httpsURLConnection.setDoInput(true);
httpsURLConnection.setDoOutput(true);
httpsURLConnection.setRequestMethod("POST");
httpsURLConnection.getOutputStream().write(json.getBytes());

int statusCode = httpsURLConnection.getResponseCode();
Assert.assertEquals(200, statusCode);
} catch (Exception exception) {
Assert.fail("Exception not expected");
exception.printStackTrace();
} finally {
httpsURLConnection.disconnect();
}
}

private void takeWithEncoding(String encoding, int n, List<JSONEvent> events)
throws Exception{
Transaction tx = channel.getTransaction();
Expand Down
Binary file added flume-ng-core/src/test/resources/jettykeystore
Binary file not shown.
4 changes: 4 additions & 0 deletions flume-ng-doc/sphinx/FlumeUserGuide.rst
Expand Up @@ -1223,6 +1223,10 @@ selector.type replicating replicating or mul
selector.* Depends on the selector.type value
interceptors -- Space-separated list of interceptors
interceptors.*
enableSSL false Set the property true, to enable SSL
sslPort The port to be used for SSL
keystore Location of the keystore includng keystore file name
keystorePassword Keystore password
==================================================================================================================================

For example, a http source for agent named a1:
Expand Down

0 comments on commit e256610

Please sign in to comment.