Skip to content

Commit

Permalink
FLUME-2631. End to End authentication in Flume
Browse files Browse the repository at this point in the history
(Johny Rufus via Hari)
  • Loading branch information
harishreedharan committed Mar 6, 2015
1 parent 3d03053 commit 542b169
Show file tree
Hide file tree
Showing 29 changed files with 1,087 additions and 672 deletions.
88 changes: 88 additions & 0 deletions flume-ng-auth/pom.xml
@@ -0,0 +1,88 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>flume-parent</artifactId>
<groupId>org.apache.flume</groupId>
<version>1.6.0-SNAPSHOT</version>
</parent>

<artifactId>flume-ng-auth</artifactId>
<name>Flume Auth</name>
<description>Flume Authentication</description>

<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>2.3.7</version>
<inherited>true</inherited>
<extensions>true</extensions>
</plugin>
</plugins>
</build>

<dependencies>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>${hadoop.common.artifact.id}</artifactId>
</dependency>

<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<version>${hadoop2.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

</dependencies>
</project>
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.api;

import java.util.Properties;

/**
* Factory class to construct Flume {@link RPCClient} implementations.
*/
public class SecureRpcClientFactory {

/**
* Return a secure {@linkplain org.apache.flume.api.RpcClient} that uses Thrift for communicating with
* the next hop.
* @param props
* @return - An {@linkplain org.apache.flume.api.RpcClient} which uses thrift configured with the
* given parameters.
*/
public static RpcClient getThriftInstance(Properties props) {
ThriftRpcClient client = new SecureThriftRpcClient();
client.configure(props);
return client;
}
}
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.api;

import org.apache.flume.FlumeException;
import org.apache.flume.auth.FlumeAuthenticationUtil;
import org.apache.flume.auth.FlumeAuthenticator;
import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.thrift.transport.*;

import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.Sasl;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SecureThriftRpcClient extends ThriftRpcClient {

private static final String CLIENT_PRINCIPAL = "client-principal";
private static final String CLIENT_KEYTAB = "client-keytab";
private static final String SERVER_PRINCIPAL = "server-principal";

private String serverPrincipal;
private FlumeAuthenticator privilegedExecutor;

@Override
protected void configure(Properties properties) throws FlumeException {
super.configure(properties);
serverPrincipal = properties.getProperty(SERVER_PRINCIPAL);
if (serverPrincipal == null || serverPrincipal.isEmpty()) {
throw new IllegalArgumentException("Flume in secure mode, but Flume config doesn't "
+ "specify a server principal to use for Kerberos auth.");
}
String clientPrincipal = properties.getProperty(CLIENT_PRINCIPAL);
String keytab = properties.getProperty(CLIENT_KEYTAB);
this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(clientPrincipal, keytab);
if(!privilegedExecutor.isAuthenticated()) {
throw new FlumeException("Authentication failed in Kerberos mode for " +
"principal " + clientPrincipal + " keytab " + keytab);
}
}

@Override
protected TTransport getTransport(TSocket tsocket) throws Exception {
Map<String, String> saslProperties = new HashMap<String, String>();
saslProperties.put(Sasl.QOP, "auth");
String[] names;
try {
names = FlumeAuthenticationUtil.splitKerberosName(serverPrincipal);
} catch (IOException e) {
throw new FlumeException(
"Error while trying to resolve Principal name - " + serverPrincipal, e);
}
return new UgiSaslClientTransport(
"GSSAPI", null, names[0], names[1], saslProperties, null, tsocket, privilegedExecutor);
}

/**
* This transport wraps the Sasl transports to set up the right UGI context for open().
*/
public static class UgiSaslClientTransport extends TSaslClientTransport {
PrivilegedExecutor privilegedExecutor;
public UgiSaslClientTransport(String mechanism, String authorizationId,
String protocol, String serverName, Map<String, String> props,
CallbackHandler cbh, TTransport transport, PrivilegedExecutor privilegedExecutor) throws IOException {
super(mechanism, authorizationId, protocol, serverName, props, cbh,
transport);
this.privilegedExecutor = privilegedExecutor;
}

// open the SASL transport with using the current UserGroupInformation
// This is needed to get the current login context stored
@Override
public void open() throws FlumeException {
try {
this.privilegedExecutor.execute(
new PrivilegedExceptionAction<Void>() {
public Void run() throws FlumeException {
try {
UgiSaslClientTransport.super.open();
} catch (TTransportException e) {
throw new FlumeException("Failed to open SASL transport", e);
}
return null;
}
});
} catch (InterruptedException e) {
throw new FlumeException(
"Interrupted while opening underlying transport", e);
} catch (Exception e) {
throw new FlumeException("Failed to open SASL transport", e);
}
}
}
}
@@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flume.auth;

import com.google.common.base.Preconditions;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;

import javax.security.auth.callback.CallbackHandler;
import java.io.IOException;

/**
* FlumeAuthentication utility class that provides methods to get an
* Authenticator. If proper credentials are provided KerberosAuthenticator is
* returned which can be used to execute as the authenticated principal ,
* or else a SimpleAuthenticator which executes without any authentication
*/
public class FlumeAuthenticationUtil {

private FlumeAuthenticationUtil() {}

private static KerberosAuthenticator kerbAuthenticator;

/**
* If principal and keytab are null, this method returns a SimpleAuthenticator
* which executes without authentication. If valid credentials are
* provided KerberosAuthenitcator is returned which can be used to execute as
* the authenticated principal. Invalid credentials result in
* IllegalArgumentException and Failure to authenticate results in SecurityException
*
* @param principal
* @param keytab
* @return FlumeAuthenticator
*
* @throws org.apache.flume.auth.SecurityException
*/
public synchronized static FlumeAuthenticator getAuthenticator(
String principal, String keytab) throws SecurityException {

if(principal == null && keytab == null) {
return SimpleAuthenticator.getSimpleAuthenticator();
}

Preconditions.checkArgument(principal != null,
"Principal can not be null when keytab is provided");
Preconditions.checkArgument(keytab != null,
"Keytab can not be null when Principal is provided");

if(kerbAuthenticator == null) {
kerbAuthenticator = new KerberosAuthenticator();
}
kerbAuthenticator.authenticate(principal, keytab);

return kerbAuthenticator;
}

/**
* Returns the standard SaslGssCallbackHandler from the hadoop common module
*
* @return CallbackHandler
*/
public static CallbackHandler getSaslGssCallbackHandler() {
return new SaslRpcServer.SaslGssCallbackHandler();
}

/**
* Resolves the principal using Hadoop common's SecurityUtil and splits
* the kerberos principal into three parts user name, host and kerberos realm
*
* @param principal
* @return String[] of username, hostname and kerberos realm
* @throws IOException
*/
public static String[] splitKerberosName(String principal) throws IOException {
String resolvedPrinc = SecurityUtil.getServerPrincipal(principal, "");
return SaslRpcServer.splitKerberosName(resolvedPrinc);
}
}






@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flume.auth;

/**
* FlumeAuthenticator extends on a PrivilegedExecutor providing capabilities to
* proxy as a different user
*/
public interface FlumeAuthenticator extends PrivilegedExecutor {
/**
* Returns the current instance if proxyUsername is null or
* returns the proxied Executor if proxyUserName is valid
* @param proxyUserName
* @return PrivilegedExecutor
*/
public PrivilegedExecutor proxyAs(String proxyUserName);

/**
* Returns true, if the underlying Authenticator was obtained by
* successful kerberos authentication
* @return boolean
*/
public boolean isAuthenticated();

/**
* For Authenticators backed by credentials, this method refreshes the
* credentials periodically
*/
public void startCredentialRefresher();
}

0 comments on commit 542b169

Please sign in to comment.