diff --git a/bin/flume b/bin/flume
index de50bf71..d5573d60 100755
--- a/bin/flume
+++ b/bin/flume
@@ -89,7 +89,11 @@ if [ -z "$FLUME_HOME" ]; then
export FLUME_HOME="$CMDPATH"
fi
-JOPTS="-Dflume.log.dir=${FLUME_LOG_DIR:-${FLUME_HOME}/logs} "
+# Get any JOPTS that might exists from flume-env.sh
+if [ -z "$JOPTS" ]; then
+ export JOPTS=""
+fi
+JOPTS+="-Dflume.log.dir=${FLUME_LOG_DIR:-${FLUME_HOME}/logs} "
JOPTS+="-Dflume.log.file=${FLUME_LOGFILE:-flume.log} "
JOPTS+="-Dflume.root.logger=${FLUME_ROOT_LOGGER:-INFO,console} "
JOPTS+="-Dzookeeper.root.logger=${ZOOKEEPER_ROOT_LOGGER:-ERROR,console} "
diff --git a/bin/flume-env.sh.template b/bin/flume-env.sh.template
index da5db193..3deab0e8 100755
--- a/bin/flume-env.sh.template
+++ b/bin/flume-env.sh.template
@@ -19,4 +19,6 @@
# export JAVA_HOME=/usr/lib/jvm/java-6-sun
-# export FLUME_MASTER="localhost"
\ No newline at end of file
+# export FLUME_MASTER="localhost"
+
+# export JOPTS="-D..."
diff --git a/conf/flume-conf.xml b/conf/flume-conf.xml
index e4eb1b6d..1e6d54fb 100644
--- a/conf/flume-conf.xml
+++ b/conf/flume-conf.xml
@@ -260,5 +260,15 @@ configuration values placed in flume-site.xml. -->
/tmp/flume-zk
The base directory in which the ZBCS stores data.
+
+
+
+
+
+ flume.secure.transport
+ false
+ This turns on SSL security for data transfer
+
+
diff --git a/src/java/com/cloudera/flume/agent/ThriftMasterRPC.java b/src/java/com/cloudera/flume/agent/ThriftMasterRPC.java
index 25d6d955..4e44e7f8 100644
--- a/src/java/com/cloudera/flume/agent/ThriftMasterRPC.java
+++ b/src/java/com/cloudera/flume/agent/ThriftMasterRPC.java
@@ -24,6 +24,8 @@
import java.util.Map;
import java.util.Map.Entry;
+import javax.net.ssl.SSLSocketFactory;
+
import org.apache.log4j.Logger;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
@@ -78,20 +80,36 @@ public void open() throws IOException {
}
}
- protected synchronized FlumeClientServer.Iface open(String host, int port)
+ protected synchronized FlumeClientServer.Iface open(String host, int port, boolean secured)
throws IOException, TTransportException {
// single open only
Preconditions.checkState(masterClient == null, // && masterTransport ==
// null,
"client already open -- double open not allowed");
- TTransport masterTransport = new TSocket(host, port);
+ TTransport masterTransport = null;
+ if (secured) {
+ masterTransport = new TSocket(SSLSocketFactory.getDefault().
+ createSocket(host, port));
+ } else {
+ masterTransport = new TSocket(host, port);
+ }
TProtocol protocol = new TBinaryProtocol(masterTransport);
- masterTransport.open();
+
+ if(!masterTransport.isOpen()) {
+ masterTransport.open();
+ }
masterClient = new Client(protocol);
LOG.info("Connected to master at " + host + ":" + port);
return masterClient;
}
+ protected synchronized FlumeClientServer.Iface open(String host, int port)
+ throws IOException, TTransportException {
+ //get secured boolean flag from config
+ FlumeConfiguration conf = FlumeConfiguration.get();
+ boolean secured = conf.getIsSecureSSLTransport();
+ return open(host, port, secured);
+ }
protected synchronized FlumeClientServer.Iface ensureConnected()
throws TTransportException, IOException {
return (masterClient != null) ? masterClient : open(masterHostname,
diff --git a/src/java/com/cloudera/flume/conf/FlumeConfiguration.java b/src/java/com/cloudera/flume/conf/FlumeConfiguration.java
index abebe3e2..b4c208f9 100644
--- a/src/java/com/cloudera/flume/conf/FlumeConfiguration.java
+++ b/src/java/com/cloudera/flume/conf/FlumeConfiguration.java
@@ -103,6 +103,9 @@ protected FlumeConfiguration(boolean loadDefaults) {
// Default sink / source variables
static public final int DEFAULT_SCRIBE_SOURCE_PORT = 1463;
+
+ // Secure transport parameter
+ public static final String SECURED_SSL_TRANSPORT = "flume.secure.transport";
// Watch dog parameters
public final static String WATCHDOG_MAX_RESTARTS = "watchdog.restarts.max";
@@ -861,6 +864,13 @@ public long getThriftCloseMaxSleep() {
public int getScribeSourcePort() {
return getInt(SCRIBE_SOURCE_PORT, DEFAULT_SCRIBE_SOURCE_PORT);
}
+
+ /**
+ * Returns the boolean to secure data transport or not
+ */
+ public boolean getIsSecureSSLTransport() {
+ return getBoolean(SECURED_SSL_TRANSPORT, false);
+ }
/**
* Returns the default flow name for logical nodes.
diff --git a/src/java/com/cloudera/flume/handlers/thrift/ThriftEventSink.java b/src/java/com/cloudera/flume/handlers/thrift/ThriftEventSink.java
index aba93686..5842258b 100644
--- a/src/java/com/cloudera/flume/handlers/thrift/ThriftEventSink.java
+++ b/src/java/com/cloudera/flume/handlers/thrift/ThriftEventSink.java
@@ -20,6 +20,10 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -89,23 +93,38 @@ public void close() throws IOException {
}
}
- @Override
- public void open() throws IOException {
+ public void open(boolean secured) throws IOException {
try {
- if (nonblocking) {
- // non blocking must use "Framed transport"
- transport = new TSocket(host, port);
- stats = new TStatsTransport(transport);
- transport = new TFramedTransport(stats);
+ // TODO: will "TFramedTransport" handle nonblocking SSLSockets correctly?
+ if(secured) {
+ if (nonblocking) {
+ // non blocking must use "Framed transport"
+ transport = new TSocket(SSLSocketFactory.getDefault().createSocket(host, port));
+ stats = new TStatsTransport(transport);
+ transport = new TFramedTransport(stats);
+ } else {
+ transport = new TSocket(SSLSocketFactory.getDefault().createSocket(host, port));
+ stats = new TStatsTransport(transport);
+ transport = stats;
+ }
} else {
- transport = new TSocket(host, port);
- stats = new TStatsTransport(transport);
- transport = stats;
+ if (nonblocking) {
+ // non blocking must use "Framed transport"
+ transport = new TSocket(host, port);
+ stats = new TStatsTransport(transport);
+ transport = new TFramedTransport(stats);
+ } else {
+ transport = new TSocket(host, port);
+ stats = new TStatsTransport(transport);
+ transport = stats;
+ }
}
TProtocol protocol = new TBinaryProtocol(transport);
- transport.open();
+ if (!transport.isOpen()) {
+ transport.open();
+ }
client = new Client(protocol);
LOG.info("ThriftEventSink open on port " + port + " opened");
@@ -114,6 +133,13 @@ public void open() throws IOException {
+ port + " : " + e.getMessage());
}
}
+
+ @Override
+ public void open() throws IOException {
+ FlumeConfiguration conf = FlumeConfiguration.get();
+ boolean secured = conf.getIsSecureSSLTransport();
+ open(secured);
+ }
@Override
public ReportEvent getReport() {
diff --git a/src/java/org/apache/thrift/transport/TSaneServerSocket.java b/src/java/org/apache/thrift/transport/TSaneServerSocket.java
index c08f3ffe..1ef6cda0 100644
--- a/src/java/org/apache/thrift/transport/TSaneServerSocket.java
+++ b/src/java/org/apache/thrift/transport/TSaneServerSocket.java
@@ -43,9 +43,14 @@
import java.net.Socket;
import java.net.SocketException;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.cloudera.flume.conf.FlumeConfiguration;
+
/**
* Wrapper around ServerSocket for Thrift.
*
@@ -95,20 +100,36 @@ public TSaneServerSocket(InetSocketAddress bindAddr, int clientTimeout)
this.bindAddr = bindAddr;
}
- private void bind() throws TTransportException {
+ private void bind(boolean secured) throws TTransportException {
try {
// Make server socket
serverSocket_ = new ServerSocket();
+
+ if(secured) {
+ // TODO: figure out a better backlog number
+ serverSocket_ = (SSLServerSocket) SSLServerSocketFactory.getDefault().createServerSocket(bindAddr.getPort(), 1024, bindAddr.getAddress());
+ } else {
+ serverSocket_ = new ServerSocket();
+ }
// Prevent 2MSL delay problem on server restarts
serverSocket_.setReuseAddress(true);
- // Bind to listening port
- serverSocket_.bind(bindAddr);
+ // Bind to listening port if not bound
+ if (!serverSocket_.isBound()) {
+ serverSocket_.bind(bindAddr);
+ }
} catch (IOException ioe) {
serverSocket_ = null;
throw new TTransportException("Could not create ServerSocket on address "
+ bindAddr.toString() + ".");
}
}
+
+ private void bind() throws TTransportException {
+ //get secured boolean flag from config
+ FlumeConfiguration conf = FlumeConfiguration.get();
+ boolean secured = conf.getIsSecureSSLTransport();
+ bind(secured);
+ }
public void listen() throws TTransportException {
bind();
diff --git a/src/javatest/com/cloudera/flume/master/TestThriftServer.java b/src/javatest/com/cloudera/flume/master/TestThriftServer.java
index bc5d716a..d81dcd8c 100644
--- a/src/javatest/com/cloudera/flume/master/TestThriftServer.java
+++ b/src/javatest/com/cloudera/flume/master/TestThriftServer.java
@@ -18,9 +18,13 @@
package com.cloudera.flume.master;
+import java.io.IOException;
+import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
+import javax.net.ssl.SSLSocketFactory;
+
import junit.framework.TestCase;
import org.apache.log4j.Logger;
@@ -31,6 +35,7 @@
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.thrift.FlumeConfigData;
import com.cloudera.flume.conf.thrift.FlumeMasterAdminServer;
import com.cloudera.flume.conf.thrift.FlumeMasterCommand;
@@ -81,12 +86,18 @@ public boolean hasCmdId(long cmdid) throws TException {
}
}
- public void testMasterAdminServer() throws TException, InterruptedException {
+ public void testMasterAdminServer(boolean secured) throws TException, InterruptedException, UnknownHostException, IOException {
MyThriftServer server = new MyThriftServer();
server.serve();
// Try connection
- TTransport masterTransport = new TSocket("localhost", 56789);
+ TTransport masterTransport;
+ if (secured) {
+ masterTransport = new TSocket(SSLSocketFactory.getDefault().
+ createSocket("localhost", 56789));
+ } else {
+ masterTransport = new TSocket("localhost", 56789);
+ }
TProtocol protocol = new TBinaryProtocol(masterTransport);
masterTransport.open();
Client client = new Client(protocol);
@@ -105,6 +116,12 @@ public void testMasterAdminServer() throws TException, InterruptedException {
server.stop();
}
+ public void testMasterAdminServer() throws TException, InterruptedException, UnknownHostException, IOException {
+ //get secured boolean flag from config
+ FlumeConfiguration conf = FlumeConfiguration.get();
+ boolean secured = conf.getIsSecureSSLTransport();
+ testMasterAdminServer(secured);
+ }
public void testThriftServerOpenClose() throws TTransportException {
MyThriftServer server = new MyThriftServer();
for (int i = 0; i < 50; i++) {