FLUME-2668. Document SecureThriftRpcClient/SecureRpcClientFactory in Flume Developer Guide

(Johny Rufus via Hari)
harishreedharan committed Apr 14, 2015
1 parent be4ae29 commit a508d95
Showing 1 changed file with 133 additions and 14 deletions.
147 changes: 133 additions & 14 deletions flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
@@ -277,6 +277,116 @@ properties:
request-timeout = 20000 # Must be >=1000 (default: 20000)
Secure RPC client - Thrift
''''''''''''''''''''''''''

As of Flume 1.6.0, the Thrift source and sink support Kerberos-based authentication.
The client needs to call the ``getThriftInstance`` method of ``SecureRpcClientFactory``
to obtain a ``SecureThriftRpcClient``. ``SecureThriftRpcClient`` extends
``ThriftRpcClient``, which implements the ``RpcClient`` interface. The Kerberos
authentication code resides in the ``flume-ng-auth`` module, which must be on the
classpath when using ``SecureRpcClientFactory``. Both the client
principal and the client keytab should be passed in as parameters through the
properties; they are the credentials the client uses to authenticate
against the Kerberos KDC. In addition, the server principal of the destination
Thrift source to which this client connects should also be provided.
The following example shows how to use ``SecureRpcClientFactory``
within a user's data-generating application:

.. code-block:: java

  import org.apache.flume.Event;
  import org.apache.flume.EventDeliveryException;
  import org.apache.flume.event.EventBuilder;
  import org.apache.flume.api.SecureRpcClientFactory;
  import org.apache.flume.api.RpcClientConfigurationConstants;
  import org.apache.flume.api.RpcClient;
  import java.nio.charset.Charset;
  import java.util.Properties;

  public class MyApp {
    public static void main(String[] args) {
      MySecureRpcClientFacade client = new MySecureRpcClientFacade();
      // Initialize client with the remote Flume agent's host, port
      Properties props = new Properties();
      props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
      props.setProperty("hosts", "h1");
      props.setProperty("hosts.h1", "client.example.org"+":"+ String.valueOf(41414));

      // Initialize client with the Kerberos authentication related properties
      props.setProperty("kerberos", "true");
      props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
      props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
      props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
      client.init(props);

      // Send 10 events to the remote Flume agent. That agent should be
      // configured to listen with a ThriftSource in Kerberos mode.
      String sampleData = "Hello Flume!";
      for (int i = 0; i < 10; i++) {
        client.sendDataToFlume(sampleData);
      }

      client.cleanUp();
    }
  }

  class MySecureRpcClientFacade {
    private RpcClient client;
    private Properties properties;

    public void init(Properties properties) {
      // Setup the RPC connection
      this.properties = properties;
      // Create the SecureThriftRpcClient instance by using SecureRpcClientFactory
      this.client = SecureRpcClientFactory.getThriftInstance(properties);
    }

    public void sendDataToFlume(String data) {
      // Create a Flume Event object that encapsulates the sample data
      Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

      // Send the event
      try {
        client.append(event);
      } catch (EventDeliveryException e) {
        // Clean up and recreate the client; the failed event is not resent
        client.close();
        client = null;
        client = SecureRpcClientFactory.getThriftInstance(properties);
      }
    }

    public void cleanUp() {
      // Close the RPC connection
      client.close();
    }
  }

The remote ``ThriftSource`` should be started in Kerberos mode.
Below is an example Flume agent configuration that is waiting for a connection
from MyApp:

.. code-block:: properties

  a1.channels = c1
  a1.sources = r1
  a1.sinks = k1

  a1.channels.c1.type = memory

  a1.sources.r1.channels = c1
  a1.sources.r1.type = thrift
  a1.sources.r1.bind = 0.0.0.0
  a1.sources.r1.port = 41414
  a1.sources.r1.kerberos = true
  a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
  a1.sources.r1.agent-keytab = /tmp/flume.keytab

  a1.sinks.k1.channel = c1
  a1.sinks.k1.type = logger

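
In the two examples above, the client's ``server-principal`` and the source's
``agent-principal`` carry the same value. The sketch below is a hypothetical
pre-flight check, not part of Flume, that an application could run before calling
``SecureRpcClientFactory.getThriftInstance``. The class name ``SecureClientProps``
and both method names are made up for illustration. Treating a missing ``kerberos``
key as "disabled", and expecting the two principals to be identical, are
assumptions of this sketch rather than documented Flume behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not a Flume class): sanity-checks the Kerberos-related
// client properties used in the MyApp example.
class SecureClientProps {

  // Property keys taken from the MyApp example.
  static final String[] REQUIRED_KEYS = {
      "client-principal", "client-keytab", "server-principal"
  };

  // Returns the required keys that are missing or blank when "kerberos" is
  // "true". Returns an empty list when kerberos is off (assumption: a missing
  // "kerberos" key means off) or when every key is present.
  static List<String> missingKeys(Properties props) {
    List<String> missing = new ArrayList<>();
    if (!Boolean.parseBoolean(props.getProperty("kerberos", "false"))) {
      return missing;
    }
    for (String key : REQUIRED_KEYS) {
      String value = props.getProperty(key);
      if (value == null || value.trim().isEmpty()) {
        missing.add(key);
      }
    }
    return missing;
  }

  // True when the client's server-principal equals the agent-principal
  // configured on the Thrift source (a1.sources.r1.agent-principal).
  static boolean matchesAgentPrincipal(Properties clientProps, String agentPrincipal) {
    return agentPrincipal != null
        && agentPrincipal.equals(clientProps.getProperty("server-principal"));
  }
}
```

An application might call ``SecureClientProps.missingKeys(props)`` right before
``client.init(props)`` and fail fast with a readable message if the returned list
is not empty, instead of waiting for an authentication error at connection time.
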
Failover Client
'''''''''''''''

@@ -459,20 +569,29 @@ full Agent. The following is an exhaustive list of configuration options:

Required properties are in **bold**.

=====================  ================  ======================================================================
Property Name          Default           Description
=====================  ================  ======================================================================
source.type            embedded          The only available source is the embedded source.
**channel.type**       --                Either ``memory`` or ``file`` which correspond
                                         to MemoryChannel and FileChannel respectively.
channel.*              --                Configuration options for the channel type requested,
                                         see MemoryChannel or FileChannel user guide for an exhaustive list.
**sinks**              --                List of sink names
**sink.type**          --                Property name must match a name in the list of sinks.
                                         Value must be ``avro``
sink.*                 --                Configuration options for the sink.
                                         See AvroSink user guide for an exhaustive list,
                                         however note AvroSink requires at least hostname and port.
**processor.type**     --                Either ``failover`` or ``load_balance`` which correspond
                                         to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively.
processor.*            --                Configuration options for the sink processor selected.
                                         See FailoverSinksProcessor and LoadBalancingSinkProcessor
                                         user guide for an exhaustive list.
source.interceptors    --                Space-separated list of interceptors
source.interceptors.*  --                Configuration options for individual interceptors
                                         specified in the source.interceptors property
=====================  ================  ======================================================================

Below is an example of how to use the agent:
