Skip to content

Commit

Permalink
adding SSL sockets to flume for secure data transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
kimvogt committed Aug 20, 2010
1 parent 6802fc8 commit e456e59
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 21 deletions.
6 changes: 5 additions & 1 deletion bin/flume
Expand Up @@ -89,7 +89,11 @@ if [ -z "$FLUME_HOME" ]; then
export FLUME_HOME="$CMDPATH"
fi

JOPTS="-Dflume.log.dir=${FLUME_LOG_DIR:-${FLUME_HOME}/logs} "
# Get any JOPTS that might exists from flume-env.sh
if [ -z "$JOPTS" ]; then
export JOPTS=""
fi
JOPTS+="-Dflume.log.dir=${FLUME_LOG_DIR:-${FLUME_HOME}/logs} "
JOPTS+="-Dflume.log.file=${FLUME_LOGFILE:-flume.log} "
JOPTS+="-Dflume.root.logger=${FLUME_ROOT_LOGGER:-INFO,console} "
JOPTS+="-Dzookeeper.root.logger=${ZOOKEEPER_ROOT_LOGGER:-ERROR,console} "
Expand Down
4 changes: 3 additions & 1 deletion bin/flume-env.sh.template
Expand Up @@ -19,4 +19,6 @@

# export JAVA_HOME=/usr/lib/jvm/java-6-sun

# export FLUME_MASTER="localhost"
# export FLUME_MASTER="localhost"

# export JOPTS="-D..."
10 changes: 10 additions & 0 deletions conf/flume-conf.xml
Expand Up @@ -260,5 +260,15 @@ configuration values placed in flume-site.xml. -->
<value>/tmp/flume-zk</value>
<description>The base directory in which the ZBCS stores data.</description>
</property>

<!-- ================================================== -->
<!-- Secure transport flag ============================ -->
<!-- ================================================== -->
<property>
<name>flume.secure.transport</name>
<value>false</value>
<description>This turns on SSL security for data transfer</description>
</property>


</configuration>
24 changes: 21 additions & 3 deletions src/java/com/cloudera/flume/agent/ThriftMasterRPC.java
Expand Up @@ -24,6 +24,8 @@
import java.util.Map;
import java.util.Map.Entry;

import javax.net.ssl.SSLSocketFactory;

import org.apache.log4j.Logger;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
Expand Down Expand Up @@ -78,20 +80,36 @@ public void open() throws IOException {
}
}

protected synchronized FlumeClientServer.Iface open(String host, int port)
protected synchronized FlumeClientServer.Iface open(String host, int port, boolean secured)
throws IOException, TTransportException {
// single open only
Preconditions.checkState(masterClient == null, // && masterTransport ==
// null,
"client already open -- double open not allowed");
TTransport masterTransport = new TSocket(host, port);
TTransport masterTransport = null;
if (secured) {
masterTransport = new TSocket(SSLSocketFactory.getDefault().
createSocket(host, port));
} else {
masterTransport = new TSocket(host, port);
}
TProtocol protocol = new TBinaryProtocol(masterTransport);
masterTransport.open();

if(!masterTransport.isOpen()) {
masterTransport.open();
}
masterClient = new Client(protocol);
LOG.info("Connected to master at " + host + ":" + port);
return masterClient;
}

protected synchronized FlumeClientServer.Iface open(String host, int port)
throws IOException, TTransportException {
//get secured boolean flag from config
FlumeConfiguration conf = FlumeConfiguration.get();
boolean secured = conf.getIsSecureSSLTransport();
return open(host, port, secured);
}
protected synchronized FlumeClientServer.Iface ensureConnected()
throws TTransportException, IOException {
return (masterClient != null) ? masterClient : open(masterHostname,
Expand Down
10 changes: 10 additions & 0 deletions src/java/com/cloudera/flume/conf/FlumeConfiguration.java
Expand Up @@ -103,6 +103,9 @@ protected FlumeConfiguration(boolean loadDefaults) {

// Default sink / source variables
static public final int DEFAULT_SCRIBE_SOURCE_PORT = 1463;

// Secure transport parameter
public static final String SECURED_SSL_TRANSPORT = "flume.secure.transport";

// Watch dog parameters
public final static String WATCHDOG_MAX_RESTARTS = "watchdog.restarts.max";
Expand Down Expand Up @@ -861,6 +864,13 @@ public long getThriftCloseMaxSleep() {
public int getScribeSourcePort() {
return getInt(SCRIBE_SOURCE_PORT, DEFAULT_SCRIBE_SOURCE_PORT);
}

/**
* Returns the boolean to secure data transport or not
*/
public boolean getIsSecureSSLTransport() {
return getBoolean(SECURED_SSL_TRANSPORT, false);
}

/**
* Returns the default flow name for logical nodes.
Expand Down
48 changes: 37 additions & 11 deletions src/java/com/cloudera/flume/handlers/thrift/ThriftEventSink.java
Expand Up @@ -20,6 +20,10 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SSLSocketFactory;

import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
Expand Down Expand Up @@ -89,23 +93,38 @@ public void close() throws IOException {
}
}

@Override
public void open() throws IOException {
public void open(boolean secured) throws IOException {

try {
if (nonblocking) {
// non blocking must use "Framed transport"
transport = new TSocket(host, port);
stats = new TStatsTransport(transport);
transport = new TFramedTransport(stats);
// TODO: will "TFramedTransport" handle nonblocking SSLSockets correctly?
if(secured) {
if (nonblocking) {
// non blocking must use "Framed transport"
transport = new TSocket(SSLSocketFactory.getDefault().createSocket(host, port));
stats = new TStatsTransport(transport);
transport = new TFramedTransport(stats);
} else {
transport = new TSocket(SSLSocketFactory.getDefault().createSocket(host, port));
stats = new TStatsTransport(transport);
transport = stats;
}
} else {
transport = new TSocket(host, port);
stats = new TStatsTransport(transport);
transport = stats;
if (nonblocking) {
// non blocking must use "Framed transport"
transport = new TSocket(host, port);
stats = new TStatsTransport(transport);
transport = new TFramedTransport(stats);
} else {
transport = new TSocket(host, port);
stats = new TStatsTransport(transport);
transport = stats;
}
}

TProtocol protocol = new TBinaryProtocol(transport);
transport.open();
if (!transport.isOpen()) {
transport.open();
}
client = new Client(protocol);
LOG.info("ThriftEventSink open on port " + port + " opened");

Expand All @@ -114,6 +133,13 @@ public void open() throws IOException {
+ port + " : " + e.getMessage());
}
}

@Override
public void open() throws IOException {
FlumeConfiguration conf = FlumeConfiguration.get();
boolean secured = conf.getIsSecureSSLTransport();
open(secured);
}

@Override
public ReportEvent getReport() {
Expand Down
27 changes: 24 additions & 3 deletions src/java/org/apache/thrift/transport/TSaneServerSocket.java
Expand Up @@ -43,9 +43,14 @@
import java.net.Socket;
import java.net.SocketException;

import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.SSLServerSocketFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.flume.conf.FlumeConfiguration;

/**
* Wrapper around ServerSocket for Thrift.
*
Expand Down Expand Up @@ -95,20 +100,36 @@ public TSaneServerSocket(InetSocketAddress bindAddr, int clientTimeout)
this.bindAddr = bindAddr;
}

private void bind() throws TTransportException {
private void bind(boolean secured) throws TTransportException {
try {
// Make server socket
serverSocket_ = new ServerSocket();

if(secured) {
// TODO: figure out a better backlog number
serverSocket_ = (SSLServerSocket) SSLServerSocketFactory.getDefault().createServerSocket(bindAddr.getPort(), 1024, bindAddr.getAddress());
} else {
serverSocket_ = new ServerSocket();
}
// Prevent 2MSL delay problem on server restarts
serverSocket_.setReuseAddress(true);
// Bind to listening port
serverSocket_.bind(bindAddr);
// Bind to listening port if not bound
if (!serverSocket_.isBound()) {
serverSocket_.bind(bindAddr);
}
} catch (IOException ioe) {
serverSocket_ = null;
throw new TTransportException("Could not create ServerSocket on address "
+ bindAddr.toString() + ".");
}
}

private void bind() throws TTransportException {
//get secured boolean flag from config
FlumeConfiguration conf = FlumeConfiguration.get();
boolean secured = conf.getIsSecureSSLTransport();
bind(secured);
}

public void listen() throws TTransportException {
bind();
Expand Down
21 changes: 19 additions & 2 deletions src/javatest/com/cloudera/flume/master/TestThriftServer.java
Expand Up @@ -18,9 +18,13 @@

package com.cloudera.flume.master;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

import javax.net.ssl.SSLSocketFactory;

import junit.framework.TestCase;

import org.apache.log4j.Logger;
Expand All @@ -31,6 +35,7 @@
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.thrift.FlumeConfigData;
import com.cloudera.flume.conf.thrift.FlumeMasterAdminServer;
import com.cloudera.flume.conf.thrift.FlumeMasterCommand;
Expand Down Expand Up @@ -81,12 +86,18 @@ public boolean hasCmdId(long cmdid) throws TException {
}
}

public void testMasterAdminServer() throws TException, InterruptedException {
public void testMasterAdminServer(boolean secured) throws TException, InterruptedException, UnknownHostException, IOException {
MyThriftServer server = new MyThriftServer();
server.serve();

// Try connection
TTransport masterTransport = new TSocket("localhost", 56789);
TTransport masterTransport;
if (secured) {
masterTransport = new TSocket(SSLSocketFactory.getDefault().
createSocket("localhost", 56789));
} else {
masterTransport = new TSocket("localhost", 56789);
}
TProtocol protocol = new TBinaryProtocol(masterTransport);
masterTransport.open();
Client client = new Client(protocol);
Expand All @@ -105,6 +116,12 @@ public void testMasterAdminServer() throws TException, InterruptedException {
server.stop();
}

public void testMasterAdminServer() throws TException, InterruptedException, UnknownHostException, IOException {
//get secured boolean flag from config
FlumeConfiguration conf = FlumeConfiguration.get();
boolean secured = conf.getIsSecureSSLTransport();
testMasterAdminServer(secured);
}
public void testThriftServerOpenClose() throws TTransportException {
MyThriftServer server = new MyThriftServer();
for (int i = 0; i < 50; i++) {
Expand Down

0 comments on commit e456e59

Please sign in to comment.