From e456e599a9a0c4cc394371c0461dde888a7f4b5e Mon Sep 17 00:00:00 2001 From: kimvogt Date: Thu, 19 Aug 2010 17:01:03 -0700 Subject: [PATCH] adding SSL sockets to flume for secure data transfer --- bin/flume | 6 ++- bin/flume-env.sh.template | 4 +- conf/flume-conf.xml | 10 ++++ .../cloudera/flume/agent/ThriftMasterRPC.java | 24 ++++++++-- .../flume/conf/FlumeConfiguration.java | 10 ++++ .../handlers/thrift/ThriftEventSink.java | 48 ++++++++++++++----- .../thrift/transport/TSaneServerSocket.java | 27 +++++++++-- .../flume/master/TestThriftServer.java | 21 +++++++- 8 files changed, 129 insertions(+), 21 deletions(-) diff --git a/bin/flume b/bin/flume index de50bf71..d5573d60 100755 --- a/bin/flume +++ b/bin/flume @@ -89,7 +89,11 @@ if [ -z "$FLUME_HOME" ]; then export FLUME_HOME="$CMDPATH" fi -JOPTS="-Dflume.log.dir=${FLUME_LOG_DIR:-${FLUME_HOME}/logs} " +# Get any JOPTS that might exists from flume-env.sh +if [ -z "$JOPTS" ]; then + export JOPTS="" +fi +JOPTS+="-Dflume.log.dir=${FLUME_LOG_DIR:-${FLUME_HOME}/logs} " JOPTS+="-Dflume.log.file=${FLUME_LOGFILE:-flume.log} " JOPTS+="-Dflume.root.logger=${FLUME_ROOT_LOGGER:-INFO,console} " JOPTS+="-Dzookeeper.root.logger=${ZOOKEEPER_ROOT_LOGGER:-ERROR,console} " diff --git a/bin/flume-env.sh.template b/bin/flume-env.sh.template index da5db193..3deab0e8 100755 --- a/bin/flume-env.sh.template +++ b/bin/flume-env.sh.template @@ -19,4 +19,6 @@ # export JAVA_HOME=/usr/lib/jvm/java-6-sun -# export FLUME_MASTER="localhost" \ No newline at end of file +# export FLUME_MASTER="localhost" + +# export JOPTS="-D..." diff --git a/conf/flume-conf.xml b/conf/flume-conf.xml index e4eb1b6d..1e6d54fb 100644 --- a/conf/flume-conf.xml +++ b/conf/flume-conf.xml @@ -260,5 +260,15 @@ configuration values placed in flume-site.xml. --> /tmp/flume-zk The base directory in which the ZBCS stores data. + + + + + + flume.secure.transport + false + This turns on SSL security for data transfer + + diff --git a/src/java/com/cloudera/flume/agent/ThriftMasterRPC.java b/src/java/com/cloudera/flume/agent/ThriftMasterRPC.java index 25d6d955..4e44e7f8 100644 --- a/src/java/com/cloudera/flume/agent/ThriftMasterRPC.java +++ b/src/java/com/cloudera/flume/agent/ThriftMasterRPC.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Map.Entry; +import javax.net.ssl.SSLSocketFactory; + import org.apache.log4j.Logger; import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; @@ -78,20 +80,36 @@ public void open() throws IOException { } } - protected synchronized FlumeClientServer.Iface open(String host, int port) + protected synchronized FlumeClientServer.Iface open(String host, int port, boolean secured) throws IOException, TTransportException { // single open only Preconditions.checkState(masterClient == null, // && masterTransport == // null, "client already open -- double open not allowed"); - TTransport masterTransport = new TSocket(host, port); + TTransport masterTransport = null; + if (secured) { + masterTransport = new TSocket(SSLSocketFactory.getDefault(). + createSocket(host, port)); + } else { + masterTransport = new TSocket(host, port); + } TProtocol protocol = new TBinaryProtocol(masterTransport); - masterTransport.open(); + + if(!masterTransport.isOpen()) { + masterTransport.open(); + } masterClient = new Client(protocol); LOG.info("Connected to master at " + host + ":" + port); return masterClient; } + protected synchronized FlumeClientServer.Iface open(String host, int port) + throws IOException, TTransportException { + //get secured boolean flag from config + FlumeConfiguration conf = FlumeConfiguration.get(); + boolean secured = conf.getIsSecureSSLTransport(); + return open(host, port, secured); + } protected synchronized FlumeClientServer.Iface ensureConnected() throws TTransportException, IOException { return (masterClient != null) ? masterClient : open(masterHostname, diff --git a/src/java/com/cloudera/flume/conf/FlumeConfiguration.java b/src/java/com/cloudera/flume/conf/FlumeConfiguration.java index abebe3e2..b4c208f9 100644 --- a/src/java/com/cloudera/flume/conf/FlumeConfiguration.java +++ b/src/java/com/cloudera/flume/conf/FlumeConfiguration.java @@ -103,6 +103,9 @@ protected FlumeConfiguration(boolean loadDefaults) { // Default sink / source variables static public final int DEFAULT_SCRIBE_SOURCE_PORT = 1463; + + // Secure transport parameter + public static final String SECURED_SSL_TRANSPORT = "flume.secure.transport"; // Watch dog parameters public final static String WATCHDOG_MAX_RESTARTS = "watchdog.restarts.max"; @@ -861,6 +864,13 @@ public long getThriftCloseMaxSleep() { public int getScribeSourcePort() { return getInt(SCRIBE_SOURCE_PORT, DEFAULT_SCRIBE_SOURCE_PORT); } + + /** + * Returns the boolean to secure data transport or not + */ + public boolean getIsSecureSSLTransport() { + return getBoolean(SECURED_SSL_TRANSPORT, false); + } /** * Returns the default flow name for logical nodes. diff --git a/src/java/com/cloudera/flume/handlers/thrift/ThriftEventSink.java b/src/java/com/cloudera/flume/handlers/thrift/ThriftEventSink.java index aba93686..5842258b 100644 --- a/src/java/com/cloudera/flume/handlers/thrift/ThriftEventSink.java +++ b/src/java/com/cloudera/flume/handlers/thrift/ThriftEventSink.java @@ -20,6 +20,10 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLServerSocketFactory; +import javax.net.ssl.SSLSocketFactory; + import org.apache.log4j.Logger; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; @@ -89,23 +93,38 @@ public void close() throws IOException { } } - @Override - public void open() throws IOException { + public void open(boolean secured) throws IOException { try { - if (nonblocking) { - // non blocking must use "Framed transport" - transport = new TSocket(host, port); - stats = new TStatsTransport(transport); - transport = new TFramedTransport(stats); + // TODO: will "TFramedTransport" handle nonblocking SSLSockets correctly? + if(secured) { + if (nonblocking) { + // non blocking must use "Framed transport" + transport = new TSocket(SSLSocketFactory.getDefault().createSocket(host, port)); + stats = new TStatsTransport(transport); + transport = new TFramedTransport(stats); + } else { + transport = new TSocket(SSLSocketFactory.getDefault().createSocket(host, port)); + stats = new TStatsTransport(transport); + transport = stats; + } } else { - transport = new TSocket(host, port); - stats = new TStatsTransport(transport); - transport = stats; + if (nonblocking) { + // non blocking must use "Framed transport" + transport = new TSocket(host, port); + stats = new TStatsTransport(transport); + transport = new TFramedTransport(stats); + } else { + transport = new TSocket(host, port); + stats = new TStatsTransport(transport); + transport = stats; + } } TProtocol protocol = new TBinaryProtocol(transport); - transport.open(); + if (!transport.isOpen()) { + transport.open(); + } client = new Client(protocol); LOG.info("ThriftEventSink open on port " + port + " opened"); @@ -114,6 +133,13 @@ public void open() throws IOException { + port + " : " + e.getMessage()); } } + + @Override + public void open() throws IOException { + FlumeConfiguration conf = FlumeConfiguration.get(); + boolean secured = conf.getIsSecureSSLTransport(); + open(secured); + } @Override public ReportEvent getReport() { diff --git a/src/java/org/apache/thrift/transport/TSaneServerSocket.java b/src/java/org/apache/thrift/transport/TSaneServerSocket.java index c08f3ffe..1ef6cda0 100644 --- a/src/java/org/apache/thrift/transport/TSaneServerSocket.java +++ b/src/java/org/apache/thrift/transport/TSaneServerSocket.java @@ -43,9 +43,14 @@ import java.net.Socket; import java.net.SocketException; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLServerSocketFactory; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.cloudera.flume.conf.FlumeConfiguration; + /** * Wrapper around ServerSocket for Thrift. * @@ -95,20 +100,36 @@ public TSaneServerSocket(InetSocketAddress bindAddr, int clientTimeout) this.bindAddr = bindAddr; } - private void bind() throws TTransportException { + private void bind(boolean secured) throws TTransportException { try { // Make server socket serverSocket_ = new ServerSocket(); + + if(secured) { + // TODO: figure out a better backlog number + serverSocket_ = (SSLServerSocket) SSLServerSocketFactory.getDefault().createServerSocket(bindAddr.getPort(), 1024, bindAddr.getAddress()); + } else { + serverSocket_ = new ServerSocket(); + } // Prevent 2MSL delay problem on server restarts serverSocket_.setReuseAddress(true); - // Bind to listening port - serverSocket_.bind(bindAddr); + // Bind to listening port if not bound + if (!serverSocket_.isBound()) { + serverSocket_.bind(bindAddr); + } } catch (IOException ioe) { serverSocket_ = null; throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + "."); } } + + private void bind() throws TTransportException { + //get secured boolean flag from config + FlumeConfiguration conf = FlumeConfiguration.get(); + boolean secured = conf.getIsSecureSSLTransport(); + bind(secured); + } public void listen() throws TTransportException { bind(); diff --git a/src/javatest/com/cloudera/flume/master/TestThriftServer.java b/src/javatest/com/cloudera/flume/master/TestThriftServer.java index bc5d716a..d81dcd8c 100644 --- a/src/javatest/com/cloudera/flume/master/TestThriftServer.java +++ b/src/javatest/com/cloudera/flume/master/TestThriftServer.java @@ -18,9 +18,13 @@ package com.cloudera.flume.master; +import java.io.IOException; +import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; +import javax.net.ssl.SSLSocketFactory; + import junit.framework.TestCase; import org.apache.log4j.Logger; @@ -31,6 +35,7 @@ import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import com.cloudera.flume.conf.FlumeConfiguration; import com.cloudera.flume.conf.thrift.FlumeConfigData; import com.cloudera.flume.conf.thrift.FlumeMasterAdminServer; import com.cloudera.flume.conf.thrift.FlumeMasterCommand; @@ -81,12 +86,18 @@ public boolean hasCmdId(long cmdid) throws TException { } } - public void testMasterAdminServer() throws TException, InterruptedException { + public void testMasterAdminServer(boolean secured) throws TException, InterruptedException, UnknownHostException, IOException { MyThriftServer server = new MyThriftServer(); server.serve(); // Try connection - TTransport masterTransport = new TSocket("localhost", 56789); + TTransport masterTransport; + if (secured) { + masterTransport = new TSocket(SSLSocketFactory.getDefault(). + createSocket("localhost", 56789)); + } else { + masterTransport = new TSocket("localhost", 56789); + } TProtocol protocol = new TBinaryProtocol(masterTransport); masterTransport.open(); Client client = new Client(protocol); @@ -105,6 +116,12 @@ public void testMasterAdminServer() throws TException, InterruptedException { server.stop(); } + public void testMasterAdminServer() throws TException, InterruptedException, UnknownHostException, IOException { + //get secured boolean flag from config + FlumeConfiguration conf = FlumeConfiguration.get(); + boolean secured = conf.getIsSecureSSLTransport(); + testMasterAdminServer(secured); + } public void testThriftServerOpenClose() throws TTransportException { MyThriftServer server = new MyThriftServer(); for (int i = 0; i < 50; i++) {