Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
FLUME-2631. End to End authentication in Flume
(Johny Rufus via Hari)
- Loading branch information
1 parent
3d03053
commit 542b169
Showing
29 changed files
with
1,087 additions
and
672 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
<!-- | ||
Licensed to the Apache Software Foundation (ASF) under one or more | ||
contributor license agreements. See the NOTICE file distributed with | ||
this work for additional information regarding copyright ownership. | ||
The ASF licenses this file to You under the Apache License, Version 2.0 | ||
(the "License"); you may not use this file except in compliance with | ||
the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
--> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<parent> | ||
<artifactId>flume-parent</artifactId> | ||
<groupId>org.apache.flume</groupId> | ||
<version>1.6.0-SNAPSHOT</version> | ||
</parent> | ||
|
||
<artifactId>flume-ng-auth</artifactId> | ||
<name>Flume Auth</name> | ||
<description>Flume Authentication</description> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.rat</groupId> | ||
<artifactId>apache-rat-plugin</artifactId> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.apache.felix</groupId> | ||
<artifactId>maven-bundle-plugin</artifactId> | ||
<version>2.3.7</version> | ||
<inherited>true</inherited> | ||
<extensions>true</extensions> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
|
||
<dependencies> | ||
|
||
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.slf4j</groupId> | ||
<artifactId>slf4j-api</artifactId> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.slf4j</groupId> | ||
<artifactId>slf4j-log4j12</artifactId> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.hadoop</groupId> | ||
<artifactId>${hadoop.common.artifact.id}</artifactId> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.flume</groupId> | ||
<artifactId>flume-ng-sdk</artifactId> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.hadoop</groupId> | ||
<artifactId>hadoop-minikdc</artifactId> | ||
<version>${hadoop2.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>com.google.guava</groupId> | ||
<artifactId>guava</artifactId> | ||
</dependency> | ||
|
||
</dependencies> | ||
</project> |
40 changes: 40 additions & 0 deletions
40
flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.flume.api; | ||
|
||
import java.util.Properties; | ||
|
||
/** | ||
* Factory class to construct Flume {@link RPCClient} implementations. | ||
*/ | ||
public class SecureRpcClientFactory { | ||
|
||
/** | ||
* Return a secure {@linkplain org.apache.flume.api.RpcClient} that uses Thrift for communicating with | ||
* the next hop. | ||
* @param props | ||
* @return - An {@linkplain org.apache.flume.api.RpcClient} which uses thrift configured with the | ||
* given parameters. | ||
*/ | ||
public static RpcClient getThriftInstance(Properties props) { | ||
ThriftRpcClient client = new SecureThriftRpcClient(); | ||
client.configure(props); | ||
return client; | ||
} | ||
} |
113 changes: 113 additions & 0 deletions
113
flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.flume.api; | ||
|
||
import org.apache.flume.FlumeException; | ||
import org.apache.flume.auth.FlumeAuthenticationUtil; | ||
import org.apache.flume.auth.FlumeAuthenticator; | ||
import org.apache.flume.auth.PrivilegedExecutor; | ||
import org.apache.thrift.transport.*; | ||
|
||
import javax.security.auth.callback.CallbackHandler; | ||
import javax.security.sasl.Sasl; | ||
import java.io.IOException; | ||
import java.security.PrivilegedExceptionAction; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Properties; | ||
|
||
public class SecureThriftRpcClient extends ThriftRpcClient { | ||
|
||
private static final String CLIENT_PRINCIPAL = "client-principal"; | ||
private static final String CLIENT_KEYTAB = "client-keytab"; | ||
private static final String SERVER_PRINCIPAL = "server-principal"; | ||
|
||
private String serverPrincipal; | ||
private FlumeAuthenticator privilegedExecutor; | ||
|
||
@Override | ||
protected void configure(Properties properties) throws FlumeException { | ||
super.configure(properties); | ||
serverPrincipal = properties.getProperty(SERVER_PRINCIPAL); | ||
if (serverPrincipal == null || serverPrincipal.isEmpty()) { | ||
throw new IllegalArgumentException("Flume in secure mode, but Flume config doesn't " | ||
+ "specify a server principal to use for Kerberos auth."); | ||
} | ||
String clientPrincipal = properties.getProperty(CLIENT_PRINCIPAL); | ||
String keytab = properties.getProperty(CLIENT_KEYTAB); | ||
this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(clientPrincipal, keytab); | ||
if(!privilegedExecutor.isAuthenticated()) { | ||
throw new FlumeException("Authentication failed in Kerberos mode for " + | ||
"principal " + clientPrincipal + " keytab " + keytab); | ||
} | ||
} | ||
|
||
@Override | ||
protected TTransport getTransport(TSocket tsocket) throws Exception { | ||
Map<String, String> saslProperties = new HashMap<String, String>(); | ||
saslProperties.put(Sasl.QOP, "auth"); | ||
String[] names; | ||
try { | ||
names = FlumeAuthenticationUtil.splitKerberosName(serverPrincipal); | ||
} catch (IOException e) { | ||
throw new FlumeException( | ||
"Error while trying to resolve Principal name - " + serverPrincipal, e); | ||
} | ||
return new UgiSaslClientTransport( | ||
"GSSAPI", null, names[0], names[1], saslProperties, null, tsocket, privilegedExecutor); | ||
} | ||
|
||
/** | ||
* This transport wraps the Sasl transports to set up the right UGI context for open(). | ||
*/ | ||
public static class UgiSaslClientTransport extends TSaslClientTransport { | ||
PrivilegedExecutor privilegedExecutor; | ||
public UgiSaslClientTransport(String mechanism, String authorizationId, | ||
String protocol, String serverName, Map<String, String> props, | ||
CallbackHandler cbh, TTransport transport, PrivilegedExecutor privilegedExecutor) throws IOException { | ||
super(mechanism, authorizationId, protocol, serverName, props, cbh, | ||
transport); | ||
this.privilegedExecutor = privilegedExecutor; | ||
} | ||
|
||
// open the SASL transport with using the current UserGroupInformation | ||
// This is needed to get the current login context stored | ||
@Override | ||
public void open() throws FlumeException { | ||
try { | ||
this.privilegedExecutor.execute( | ||
new PrivilegedExceptionAction<Void>() { | ||
public Void run() throws FlumeException { | ||
try { | ||
UgiSaslClientTransport.super.open(); | ||
} catch (TTransportException e) { | ||
throw new FlumeException("Failed to open SASL transport", e); | ||
} | ||
return null; | ||
} | ||
}); | ||
} catch (InterruptedException e) { | ||
throw new FlumeException( | ||
"Interrupted while opening underlying transport", e); | ||
} catch (Exception e) { | ||
throw new FlumeException("Failed to open SASL transport", e); | ||
} | ||
} | ||
} | ||
} |
99 changes: 99 additions & 0 deletions
99
flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.flume.auth; | ||
|
||
import com.google.common.base.Preconditions; | ||
import org.apache.hadoop.security.SaslRpcServer; | ||
import org.apache.hadoop.security.SecurityUtil; | ||
|
||
import javax.security.auth.callback.CallbackHandler; | ||
import java.io.IOException; | ||
|
||
/** | ||
* FlumeAuthentication utility class that provides methods to get an | ||
* Authenticator. If proper credentials are provided KerberosAuthenticator is | ||
* returned which can be used to execute as the authenticated principal , | ||
* or else a SimpleAuthenticator which executes without any authentication | ||
*/ | ||
public class FlumeAuthenticationUtil { | ||
|
||
private FlumeAuthenticationUtil() {} | ||
|
||
private static KerberosAuthenticator kerbAuthenticator; | ||
|
||
/** | ||
* If principal and keytab are null, this method returns a SimpleAuthenticator | ||
* which executes without authentication. If valid credentials are | ||
* provided KerberosAuthenitcator is returned which can be used to execute as | ||
* the authenticated principal. Invalid credentials result in | ||
* IllegalArgumentException and Failure to authenticate results in SecurityException | ||
* | ||
* @param principal | ||
* @param keytab | ||
* @return FlumeAuthenticator | ||
* | ||
* @throws org.apache.flume.auth.SecurityException | ||
*/ | ||
public synchronized static FlumeAuthenticator getAuthenticator( | ||
String principal, String keytab) throws SecurityException { | ||
|
||
if(principal == null && keytab == null) { | ||
return SimpleAuthenticator.getSimpleAuthenticator(); | ||
} | ||
|
||
Preconditions.checkArgument(principal != null, | ||
"Principal can not be null when keytab is provided"); | ||
Preconditions.checkArgument(keytab != null, | ||
"Keytab can not be null when Principal is provided"); | ||
|
||
if(kerbAuthenticator == null) { | ||
kerbAuthenticator = new KerberosAuthenticator(); | ||
} | ||
kerbAuthenticator.authenticate(principal, keytab); | ||
|
||
return kerbAuthenticator; | ||
} | ||
|
||
/** | ||
* Returns the standard SaslGssCallbackHandler from the hadoop common module | ||
* | ||
* @return CallbackHandler | ||
*/ | ||
public static CallbackHandler getSaslGssCallbackHandler() { | ||
return new SaslRpcServer.SaslGssCallbackHandler(); | ||
} | ||
|
||
/** | ||
* Resolves the principal using Hadoop common's SecurityUtil and splits | ||
* the kerberos principal into three parts user name, host and kerberos realm | ||
* | ||
* @param principal | ||
* @return String[] of username, hostname and kerberos realm | ||
* @throws IOException | ||
*/ | ||
public static String[] splitKerberosName(String principal) throws IOException { | ||
String resolvedPrinc = SecurityUtil.getServerPrincipal(principal, ""); | ||
return SaslRpcServer.splitKerberosName(resolvedPrinc); | ||
} | ||
} | ||
|
||
|
||
|
||
|
||
|
||
|
45 changes: 45 additions & 0 deletions
45
flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.flume.auth; | ||
|
||
/** | ||
* FlumeAuthenticator extends on a PrivilegedExecutor providing capabilities to | ||
* proxy as a different user | ||
*/ | ||
public interface FlumeAuthenticator extends PrivilegedExecutor { | ||
/** | ||
* Returns the current instance if proxyUsername is null or | ||
* returns the proxied Executor if proxyUserName is valid | ||
* @param proxyUserName | ||
* @return PrivilegedExecutor | ||
*/ | ||
public PrivilegedExecutor proxyAs(String proxyUserName); | ||
|
||
/** | ||
* Returns true, if the underlying Authenticator was obtained by | ||
* successful kerberos authentication | ||
* @return boolean | ||
*/ | ||
public boolean isAuthenticated(); | ||
|
||
/** | ||
* For Authenticators backed by credentials, this method refreshes the | ||
* credentials periodically | ||
*/ | ||
public void startCredentialRefresher(); | ||
} |
Oops, something went wrong.