From 542b1695033d330eb00ae81713fdc838b88332b6 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Thu, 5 Mar 2015 23:19:13 -0800 Subject: [PATCH] FLUME-2631. End to End authentication in Flume (Johny Rufus via Hari) --- flume-ng-auth/pom.xml | 88 +++++++ .../flume/api/SecureRpcClientFactory.java | 40 +++ .../flume/api/SecureThriftRpcClient.java | 113 +++++++++ .../flume/auth/FlumeAuthenticationUtil.java | 99 ++++++++ .../apache/flume/auth/FlumeAuthenticator.java | 45 ++++ .../flume/auth/KerberosAuthenticator.java | 233 ++++++++++++++++++ .../apache/flume/auth/PrivilegedExecutor.java | 52 ++++ .../apache/flume/auth/SecurityException.java | 40 +++ .../flume/auth/SimpleAuthenticator.java | 88 +++++++ .../org/apache/flume/auth/UGIExecutor.java | 80 ++++++ .../flume/auth/TestFlumeAuthenticator.java | 53 ++-- flume-ng-core/pom.xml | 5 + .../org/apache/flume/sink/ThriftSink.java | 14 +- .../org/apache/flume/source/ThriftSource.java | 67 ++++- flume-ng-dist/pom.xml | 4 + flume-ng-dist/src/main/assembly/bin.xml | 1 + flume-ng-dist/src/main/assembly/src.xml | 1 + .../api/RpcClientConfigurationConstants.java | 2 + .../org/apache/flume/api/ThriftRpcClient.java | 30 ++- flume-ng-sinks/flume-dataset-sink/pom.xml | 7 - .../apache/flume/sink/kite/DatasetSink.java | 39 +-- .../apache/flume/sink/kite/KerberosUtil.java | 187 -------------- .../apache/flume/sink/hdfs/BucketWriter.java | 37 +-- .../apache/flume/sink/hdfs/HDFSEventSink.java | 229 ++--------------- .../flume/sink/hdfs/TestBucketWriter.java | 28 ++- .../flume/sink/hdfs/TestHDFSEventSink.java | 2 +- .../apache/flume/sink/hbase/HBaseSink.java | 34 +-- .../sink/hbase/HBaseSinkSecurityManager.java | 134 ---------- pom.xml | 7 + 29 files changed, 1087 insertions(+), 672 deletions(-) create mode 100644 flume-ng-auth/pom.xml create mode 100644 flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java create mode 100644 flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java create mode 100644 flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java create mode 100644 flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java create mode 100644 flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java create mode 100644 flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java create mode 100644 flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java create mode 100644 flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java create mode 100644 flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java rename flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java => flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java (67%) delete mode 100644 flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java diff --git a/flume-ng-auth/pom.xml b/flume-ng-auth/pom.xml new file mode 100644 index 0000000000..292731dcfb --- /dev/null +++ b/flume-ng-auth/pom.xml @@ -0,0 +1,88 @@ + + + 4.0.0 + + + flume-parent + org.apache.flume + 1.6.0-SNAPSHOT + + + flume-ng-auth + Flume Auth + Flume Authentication + + + + + org.apache.rat + apache-rat-plugin + + + org.apache.felix + maven-bundle-plugin + 2.3.7 + true + true + + + + + + + + junit + junit + test + + + + org.slf4j + slf4j-api + + + + org.slf4j 
+ slf4j-log4j12 + + + + org.apache.hadoop + ${hadoop.common.artifact.id} + + + + org.apache.flume + flume-ng-sdk + + + + org.apache.hadoop + hadoop-minikdc + ${hadoop2.version} + test + + + + com.google.guava + guava + + + + diff --git a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java new file mode 100644 index 0000000000..c976458204 --- /dev/null +++ b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.api; + +import java.util.Properties; + +/** + * Factory class to construct Flume {@link RPCClient} implementations. + */ +public class SecureRpcClientFactory { + + /** + * Return a secure {@linkplain org.apache.flume.api.RpcClient} that uses Thrift for communicating with + * the next hop. + * @param props + * @return - An {@linkplain org.apache.flume.api.RpcClient} which uses thrift configured with the + * given parameters. + */ + public static RpcClient getThriftInstance(Properties props) { + ThriftRpcClient client = new SecureThriftRpcClient(); + client.configure(props); + return client; + } +} diff --git a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java new file mode 100644 index 0000000000..7316e1b54a --- /dev/null +++ b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.flume.api;
+
+import org.apache.flume.FlumeException;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.FlumeAuthenticator;
+import org.apache.flume.auth.PrivilegedExecutor;
+import org.apache.thrift.transport.*;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class SecureThriftRpcClient extends ThriftRpcClient {
+
+  private static final String CLIENT_PRINCIPAL = "client-principal";
+  private static final String CLIENT_KEYTAB = "client-keytab";
+  private static final String SERVER_PRINCIPAL = "server-principal";
+
+  private String serverPrincipal;
+  private FlumeAuthenticator privilegedExecutor;
+
+  @Override
+  protected void configure(Properties properties) throws FlumeException {
+    super.configure(properties);
+    serverPrincipal = properties.getProperty(SERVER_PRINCIPAL);
+    if (serverPrincipal == null || serverPrincipal.isEmpty()) {
+      throw new IllegalArgumentException("Flume is in secure mode, but the Flume config "
+          + "doesn't specify a server principal to use for Kerberos auth.");
+    }
+    String clientPrincipal = properties.getProperty(CLIENT_PRINCIPAL);
+    String keytab = properties.getProperty(CLIENT_KEYTAB);
+    this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(clientPrincipal, keytab);
+    if (!privilegedExecutor.isAuthenticated()) {
+      throw new FlumeException("Authentication failed in Kerberos mode for "
+          + "principal " + clientPrincipal + " keytab " + keytab);
+    }
+  }
+
+  @Override
+  protected TTransport getTransport(TSocket tsocket) throws Exception {
+    Map<String, String> saslProperties = new HashMap<String, String>();
+    saslProperties.put(Sasl.QOP, "auth");
+    String[] names;
+    try {
+      names = FlumeAuthenticationUtil.splitKerberosName(serverPrincipal);
+    } catch (IOException e) {
+      throw new FlumeException(
+          "Error while trying to resolve Principal name - " + serverPrincipal, e);
+    }
+    return new UgiSaslClientTransport(
+        "GSSAPI", null, names[0], names[1], saslProperties, null, tsocket, privilegedExecutor);
+  }
+
+  /**
+   * This transport wraps the SASL transport to set up the right UGI context
+   * for open().
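+   * <p>
+   * The wrapping matters because open() performs the GSSAPI handshake:
+   * running it inside PrivilegedExecutor.execute() makes the handshake use
+   * the Kerberos credentials of the logged-in (or proxied) user rather than
+   * the JVM-wide default Subject.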
+   */
+  public static class UgiSaslClientTransport extends TSaslClientTransport {
+    PrivilegedExecutor privilegedExecutor;
+
+    public UgiSaslClientTransport(String mechanism, String authorizationId,
+        String protocol, String serverName, Map<String, String> props,
+        CallbackHandler cbh, TTransport transport,
+        PrivilegedExecutor privilegedExecutor) throws IOException {
+      super(mechanism, authorizationId, protocol, serverName, props, cbh,
+          transport);
+      this.privilegedExecutor = privilegedExecutor;
+    }
+
+    // Open the SASL transport using the current UserGroupInformation.
+    // This is needed so that open() runs with the stored login context.
+    @Override
+    public void open() throws FlumeException {
+      try {
+        this.privilegedExecutor.execute(
+            new PrivilegedExceptionAction<Void>() {
+              public Void run() throws FlumeException {
+                try {
+                  UgiSaslClientTransport.super.open();
+                } catch (TTransportException e) {
+                  throw new FlumeException("Failed to open SASL transport", e);
+                }
+                return null;
+              }
+            });
+      } catch (InterruptedException e) {
+        throw new FlumeException(
+            "Interrupted while opening underlying transport", e);
+      } catch (Exception e) {
+        throw new FlumeException("Failed to open SASL transport", e);
+      }
+    }
+  }
+}
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java
new file mode 100644
index 0000000000..02afc0d1e9
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+
+import javax.security.auth.callback.CallbackHandler;
+import java.io.IOException;
+
+/**
+ * Flume authentication utility class that provides methods to get an
+ * Authenticator. If proper credentials are provided, a KerberosAuthenticator
+ * is returned, which can be used to execute as the authenticated principal;
+ * otherwise a SimpleAuthenticator, which executes without any authentication,
+ * is returned.
+ */
+public class FlumeAuthenticationUtil {
+
+  private FlumeAuthenticationUtil() {}
+
+  private static KerberosAuthenticator kerbAuthenticator;
+
+  /**
+   * If principal and keytab are null, this method returns a SimpleAuthenticator
+   * which executes without authentication. If valid credentials are
+   * provided, a KerberosAuthenticator is returned, which can be used to execute
+   * as the authenticated principal.
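+   * <p>
+   * Typical usage (a sketch; the principal and keytab values are
+   * illustrative, not defaults shipped with Flume):
+   * <pre>{@code
+   * FlumeAuthenticator auth = FlumeAuthenticationUtil.getAuthenticator(
+   *     "flume/_HOST@EXAMPLE.COM", "/etc/security/keytabs/flume.keytab");
+   * PrivilegedExecutor executor = auth.proxyAs("alice");
+   * }</pre>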
+   * Invalid credentials result in an IllegalArgumentException; failure to
+   * authenticate results in a SecurityException.
+   *
+   * @param principal
+   * @param keytab
+   * @return FlumeAuthenticator
+   *
+   * @throws org.apache.flume.auth.SecurityException
+   */
+  public static synchronized FlumeAuthenticator getAuthenticator(
+      String principal, String keytab) throws SecurityException {
+
+    if (principal == null && keytab == null) {
+      return SimpleAuthenticator.getSimpleAuthenticator();
+    }
+
+    Preconditions.checkArgument(principal != null,
+        "Principal cannot be null when keytab is provided");
+    Preconditions.checkArgument(keytab != null,
+        "Keytab cannot be null when Principal is provided");
+
+    if (kerbAuthenticator == null) {
+      kerbAuthenticator = new KerberosAuthenticator();
+    }
+    kerbAuthenticator.authenticate(principal, keytab);
+
+    return kerbAuthenticator;
+  }
+
+  /**
+   * Returns the standard SaslGssCallbackHandler from the Hadoop common module.
+   *
+   * @return CallbackHandler
+   */
+  public static CallbackHandler getSaslGssCallbackHandler() {
+    return new SaslRpcServer.SaslGssCallbackHandler();
+  }
+
+  /**
+   * Resolves the principal using Hadoop common's SecurityUtil and splits
+   * the Kerberos principal into its three parts: user name, host, and
+   * Kerberos realm.
+   *
+   * @param principal
+   * @return String[] of user name, host name, and Kerberos realm
+   * @throws IOException
+   */
+  public static String[] splitKerberosName(String principal) throws IOException {
+    String resolvedPrinc = SecurityUtil.getServerPrincipal(principal, "");
+    return SaslRpcServer.splitKerberosName(resolvedPrinc);
+  }
+}
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java
new file mode 100644
index 0000000000..dbe241d722
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+/**
+ * FlumeAuthenticator extends PrivilegedExecutor, adding the ability to
+ * proxy as a different user.
+ */
+public interface FlumeAuthenticator extends PrivilegedExecutor {
+  /**
+   * Returns the current instance if proxyUserName is null, or
+   * the proxied executor if proxyUserName is valid.
+   * @param proxyUserName
+   * @return PrivilegedExecutor
+   */
+  public PrivilegedExecutor proxyAs(String proxyUserName);
+
+  /**
+   * Returns true if the underlying Authenticator was obtained by
+   * successful Kerberos authentication.
+   * @return boolean
+   */
+  public boolean isAuthenticated();
+
+  /**
+   * For Authenticators backed by credentials, this method refreshes the
+   * credentials periodically.
+   */
+  public void startCredentialRefresher();
+}
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java
new file mode 100644
index 0000000000..324404601a
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.flume.auth; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; + +import com.google.common.base.Preconditions; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION; + +/** + * A kerberos authenticator, which authenticates using the supplied principal + * and keytab and executes with authenticated privileges + */ +class KerberosAuthenticator implements FlumeAuthenticator { + + private static final Logger LOG = LoggerFactory + .getLogger(KerberosAuthenticator.class); + + private volatile UserGroupInformation ugi; + private volatile PrivilegedExecutor privilegedExecutor; + private Map proxyCache = new HashMap(); + + + @Override + public T execute(PrivilegedAction action) { + return privilegedExecutor.execute(action); + } + + @Override + public T execute(PrivilegedExceptionAction action) throws Exception { + return privilegedExecutor.execute(action); + } + + @Override + public synchronized PrivilegedExecutor proxyAs(String proxyUserName) { + if(proxyUserName == null || proxyUserName.isEmpty()) { + return this; + } + if(proxyCache.get(proxyUserName) == null) { + UserGroupInformation proxyUgi; + proxyUgi = UserGroupInformation.createProxyUser(proxyUserName, ugi); + printUGI(proxyUgi); + proxyCache.put(proxyUserName, new UGIExecutor(proxyUgi)); + } + return proxyCache.get(proxyUserName); + } + + @Override + public boolean isAuthenticated() { + return true; + } + + /** + * When valid principal and keytab are provided and if authentication has + * not yet been done for this object, this method authenticates the + * credentials and populates the ugi. In case of null or invalid credentials + * IllegalArgumentException is thrown. In case of failure to authenticate, + * SecurityException is thrown. If authentication has already happened on + * this KerberosAuthenticator object, then this method checks to see if the current + * credentials passed are same as the validated credentials. If not, it throws + * an exception as this authenticator can represent only one Principal. 
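+   * <p>
+   * An illustrative sketch (the principal and keytab values are examples
+   * only):
+   * <pre>{@code
+   * KerberosAuthenticator auth = new KerberosAuthenticator();
+   * auth.authenticate("flume/_HOST@EXAMPLE.COM",
+   *     "/etc/security/keytabs/flume.keytab");
+   * // A later call with a different principal fails, since this
+   * // authenticator can represent only one principal per agent JVM.
+   * }</pre>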
+ * + * @param principal + * @param keytab + */ + public synchronized void authenticate(String principal, String keytab) { + // sanity checking + + Preconditions.checkArgument(principal != null && !principal.isEmpty(), + "Invalid Kerberos principal: " + String.valueOf(principal)); + Preconditions.checkArgument(keytab != null && !keytab.isEmpty(), + "Invalid Kerberos keytab: " + String.valueOf(keytab)); + File keytabFile = new File(keytab); + Preconditions.checkArgument(keytabFile.isFile() && keytabFile.canRead(), + "Keytab is not a readable file: " + String.valueOf(keytab)); + + + // resolve the requested principal + String resolvedPrincipal; + try { + // resolves _HOST pattern using standard Hadoop search/replace + // via DNS lookup when 2nd argument is empty + resolvedPrincipal = SecurityUtil.getServerPrincipal(principal, ""); + } catch (IOException e) { + throw new IllegalArgumentException("Host lookup error resolving kerberos principal (" + + principal + "). Exception follows.", e); + } + Preconditions.checkNotNull(resolvedPrincipal, + "Resolved Principal must not be null"); + + + // be cruel and unusual when user tries to login as multiple principals + // this isn't really valid with a reconfigure but this should be rare + // enough to warrant a restart of the agent JVM + // TODO: find a way to interrogate the entire current config state, + // since we don't have to be unnecessarily protective if they switch all + // HDFS sinks to use a different principal all at once. + + Preconditions.checkState(ugi == null || ugi.getUserName().equals(resolvedPrincipal), + "Cannot use multiple kerberos principals in the same agent. " + + " Must restart agent to use new principal or keytab. " + + "Previous = %s, New = %s", ugi, resolvedPrincipal); + + + // enable the kerberos mode of UGI, before doing anything else + if(!UserGroupInformation.isSecurityEnabled()) { + Configuration conf = new Configuration(false); + conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(conf); + } + + // We are interested in currently logged in user with kerberos creds + UserGroupInformation curUser = null; + try { + curUser = UserGroupInformation.getLoginUser(); + if(curUser != null && !curUser.hasKerberosCredentials()) { + curUser = null; + } + } catch (IOException e) { + LOG.warn("User unexpectedly had no active login. Continuing with " + + "authentication", e); + } + + /* + * if ugi is not null, + * if ugi matches currently logged in kerberos user, we are good + * else we are logged out, so relogin our ugi + * else if ugi is null, login and populate state + */ + try { + if (ugi != null) { + if (curUser != null && curUser.getUserName().equals(ugi.getUserName())) { + LOG.debug("Using existing principal login: {}", ugi); + } else { + LOG.info("Attempting kerberos Re-login as principal ({}) " + , new Object[] { ugi.getUserName() } ); + ugi.reloginFromKeytab(); + } + } else { + LOG.info("Attempting kerberos login as principal ({}) from keytab " + + "file ({})", new Object[] { resolvedPrincipal, keytab } ); + UserGroupInformation.loginUserFromKeytab(resolvedPrincipal, keytab); + this.ugi = UserGroupInformation.getLoginUser(); + this.privilegedExecutor = new UGIExecutor(this.ugi); + } + } catch (IOException e) { + throw new SecurityException("Authentication error while attempting to " + + "login as kerberos principal (" + resolvedPrincipal + ") using " + + "keytab (" + keytab + "). 
Exception follows.", e);
+    }
+
+    printUGI(this.ugi);
+  }
+
+  private void printUGI(UserGroupInformation ugi) {
+    if (ugi != null) {
+      // dump login information
+      AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
+      LOG.info("\n{} \nUser: {} \nAuth method: {} \nKeytab: {} \n",
+          new Object[]{ authMethod.equals(AuthenticationMethod.PROXY) ?
+              "Proxy as: " : "Logged as: ", ugi.getUserName(), authMethod,
+              ugi.isFromKeytab() }
+      );
+    }
+  }
+
+  /**
+   * startCredentialRefresher should be used only for long-running
+   * components such as the Thrift source. For all privileged methods that
+   * use a UGI, the credentials are checked automatically and refreshed
+   * before the privileged method is executed in the UGIExecutor.
+   */
+  @Override
+  public void startCredentialRefresher() {
+    final int checkTGTIntervalSecs = 120;
+    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+    scheduler.scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          ugi.checkTGTAndReloginFromKeytab();
+        } catch (IOException e) {
+          LOG.warn("Error occurred during checkTGTAndReloginFromKeytab() for user "
+              + ugi.getUserName(), e);
+        }
+      }
+    }, checkTGTIntervalSecs, checkTGTIntervalSecs, TimeUnit.SECONDS);
+  }
+
+  @VisibleForTesting
+  String getUserName() {
+    if (ugi != null) {
+      return ugi.getUserName();
+    } else {
+      return null;
+    }
+  }
+}
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java
new file mode 100644
index 0000000000..0aa321a70c
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * PrivilegedExecutor provides the ability to execute a PrivilegedAction
+ * or a PrivilegedExceptionAction.
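+ * <p>
+ * For example, given an executor obtained from FlumeAuthenticationUtil
+ * (a sketch only; assumes Hadoop's UserGroupInformation is on the
+ * classpath):
+ * <pre>{@code
+ * String name = executor.execute(new PrivilegedExceptionAction<String>() {
+ *   public String run() throws Exception {
+ *     return UserGroupInformation.getCurrentUser().getUserName();
+ *   }
+ * });
+ * }</pre>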
+ * Implementations of this interface can choose to execute in normal mode
+ * or in secure authenticated mode.
+ */
+public interface PrivilegedExecutor {
+  /**
+   * This method is used to execute a privileged action; the implementation
+   * can choose to execute the action using the appropriate privileges.
+   *
+   * @param action A PrivilegedExceptionAction to perform as the desired user
+   * @param <T> The return type of the action
+   * @return T the T value returned by action.run()
+   * @throws Exception
+   */
+  public <T> T execute(PrivilegedExceptionAction<T> action) throws Exception;
+
+  /**
+   * This method is used to execute a privileged action; the implementation
+   * can choose to execute the action using the appropriate privileges.
+   *
+   * @param action A PrivilegedAction to perform as the desired user
+   * @param <T> The return type of the action
+   * @return T the T value returned by action.run()
+   */
+  public <T> T execute(PrivilegedAction<T> action);
+}
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java
new file mode 100644
index 0000000000..5760481c17
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+/**
+ * SecurityException thrown by the Flume security module.
+ */
+public class SecurityException extends RuntimeException {
+  public SecurityException(String message) {
+    super(message);
+  }
+
+  public SecurityException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public SecurityException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java
new file mode 100644
index 0000000000..f7b5beac06
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.auth; + +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.Map; + +/** + * A no-op authenticator, which does not authenticate and executes + * without any authenticated privileges + */ +class SimpleAuthenticator implements FlumeAuthenticator { + private SimpleAuthenticator() {} + + private static class SimpleAuthenticatorHolder { + public static SimpleAuthenticator authenticator = new SimpleAuthenticator(); + } + + public static SimpleAuthenticator getSimpleAuthenticator() { + return SimpleAuthenticatorHolder.authenticator; + } + + private Map proxyCache = + new HashMap(); + + + @Override + public T execute(PrivilegedExceptionAction action) + throws Exception { + return action.run(); + } + + @Override + public T execute(PrivilegedAction action) { + return action.run(); + } + + @Override + public synchronized PrivilegedExecutor proxyAs(String proxyUserName) { + if(proxyUserName == null || proxyUserName.isEmpty()) { + return this; + } + if(proxyCache.get(proxyUserName) == null) { + UserGroupInformation proxyUgi; + try { + proxyUgi = UserGroupInformation.createProxyUser(proxyUserName, + UserGroupInformation.getCurrentUser()); + } catch (IOException e) { + throw new SecurityException("Unable to create proxy User", e); + } + proxyCache.put(proxyUserName, new UGIExecutor(proxyUgi)); + } + return proxyCache.get(proxyUserName); + } + + @Override + public boolean isAuthenticated() { + return false; + } + + @Override + public void startCredentialRefresher() { + // no-op + } + +} + + diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java new file mode 100644 index 0000000000..a5aeef2eda --- /dev/null +++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flume.auth; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; + +class UGIExecutor implements PrivilegedExecutor { + private UserGroupInformation ugi; + + UGIExecutor(UserGroupInformation ugi) { + this.ugi = ugi; + } + + @Override + public T execute(PrivilegedAction action) { + ensureValidAuth(); + return ugi.doAs(action); + } + + @Override + public T execute(PrivilegedExceptionAction action) throws Exception { + ensureValidAuth(); + try { + return ugi.doAs(action); + } catch (IOException ex) { + throw new SecurityException("Privileged action failed", ex); + } catch (InterruptedException ex) { + Thread.interrupted(); + throw new SecurityException(ex); + } + } + + private void ensureValidAuth() { + reloginUGI(ugi); + if(ugi.getAuthenticationMethod().equals(AuthenticationMethod.PROXY)) { + reloginUGI(ugi.getRealUser()); + } + } + + private void reloginUGI(UserGroupInformation ugi) { + try { + if(ugi.hasKerberosCredentials()) { + ugi.checkTGTAndReloginFromKeytab(); + } + } catch (IOException e) { + throw new SecurityException("Error trying to relogin from keytab for user " + + ugi.getUserName(), e); + } + } + + @VisibleForTesting + String getUserName() { + if(ugi != null) { + return ugi.getUserName(); + } else { + return null; + } + } +} diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java similarity index 67% rename from flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java rename to flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java index f53ef73597..45ba2b0a78 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java +++ b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java @@ -15,21 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flume.sink.kite; +package org.apache.flume.auth; import java.io.File; import java.io.IOException; -import java.net.URL; import java.util.Properties; -import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.minikdc.MiniKdc; -import org.apache.hadoop.security.UserGroupInformation; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import static org.junit.Assert.*; -public class TestKerberosUtil { +public class TestFlumeAuthenticator { private static MiniKdc kdc; private static File workDir; @@ -41,12 +39,8 @@ public class TestKerberosUtil { @BeforeClass public static void startMiniKdc() throws Exception { - URL resource = Thread.currentThread() - .getContextClassLoader().getResource("enable-kerberos.xml"); - Configuration.addDefaultResource("enable-kerberos.xml"); - workDir = new File(System.getProperty("test.dir", "target"), - TestKerberosUtil.class.getSimpleName()); + TestFlumeAuthenticator.class.getSimpleName()); flumeKeytab = new File(workDir, "flume.keytab"); aliceKeytab = new File(workDir, "alice.keytab"); conf = MiniKdc.createConf(); @@ -72,9 +66,10 @@ public static void stopMiniKdc() { public void testNullLogin() throws IOException { String principal = null; String keytab = null; - UserGroupInformation expResult = UserGroupInformation.getCurrentUser(); - UserGroupInformation result = KerberosUtil.login(principal, keytab); - assertEquals(expResult, result); + + FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator( + principal, keytab); + assertFalse(authenticator.isAuthenticated()); } @Test @@ -83,21 +78,29 @@ public void testFlumeLogin() throws IOException { String keytab = flumeKeytab.getAbsolutePath(); String expResult = principal; - String result = KerberosUtil.login(principal, keytab).getUserName(); + FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator( + principal, keytab); + assertTrue(authenticator.isAuthenticated()); + + String result = ((KerberosAuthenticator)authenticator).getUserName(); assertEquals("Initial login failed", expResult, result); - result = KerberosUtil.login(principal, keytab).getUserName(); + authenticator = FlumeAuthenticationUtil.getAuthenticator( + principal, keytab); + result = ((KerberosAuthenticator)authenticator).getUserName(); assertEquals("Re-login failed", expResult, result); principal = alicePrincipal; keytab = aliceKeytab.getAbsolutePath(); try { - result = KerberosUtil.login(principal, keytab).getUserName(); + authenticator = FlumeAuthenticationUtil.getAuthenticator( + principal, keytab); + result = ((KerberosAuthenticator)authenticator).getUserName(); fail("Login should have failed with a new principal: " + result); - } catch (KerberosUtil.SecurityException ex) { + } catch (Exception ex) { assertTrue("Login with a new principal failed, but for an unexpected " + "reason: " + ex.getMessage(), - ex.getMessage().contains("Cannot use multiple Kerberos principals: ")); + ex.getMessage().contains("Cannot use multiple kerberos principals")); } } @@ -105,16 +108,20 @@ public void testFlumeLogin() throws IOException { public void testProxyAs() throws IOException { String username = "alice"; - UserGroupInformation login = UserGroupInformation.getCurrentUser(); String expResult = username; - String result = KerberosUtil.proxyAs(username, login).getUserName(); + FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator( + null, null); + String result = ((UGIExecutor)(authenticator.proxyAs(username))).getUserName(); 
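+    // With no principal/keytab, a SimpleAuthenticator is returned; proxyAs()
+    // still impersonates through a plain Hadoop proxy UGI, so the user name
+    // should match.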
assertEquals("Proxy as didn't generate the expected username", expResult, result); - login = KerberosUtil.login(flumePrincipal, flumeKeytab.getAbsolutePath()); + authenticator = FlumeAuthenticationUtil.getAuthenticator( + flumePrincipal, flumeKeytab.getAbsolutePath()); + + String login = ((KerberosAuthenticator)authenticator).getUserName(); assertEquals("Login succeeded, but the principal doesn't match", - flumePrincipal, login.getUserName()); + flumePrincipal, login); - result = KerberosUtil.proxyAs(username, login).getUserName(); + result = ((UGIExecutor)(authenticator.proxyAs(username))).getUserName(); assertEquals("Proxy as didn't generate the expected username", expResult, result); } diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml index 89924142cf..fe34c0344d 100644 --- a/flume-ng-core/pom.xml +++ b/flume-ng-core/pom.xml @@ -263,6 +263,11 @@ limitations under the License. flume-ng-configuration + + org.apache.flume + flume-ng-auth + + org.slf4j slf4j-api diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java index baa60d0cf1..32021d37df 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java @@ -23,6 +23,8 @@ import org.apache.flume.api.RpcClient; import org.apache.flume.api.RpcClientConfigurationConstants; import org.apache.flume.api.RpcClientFactory; +import org.apache.flume.api.SecureRpcClientFactory; + /** *

* A {@link org.apache.flume.Sink} implementation that can send events to an RPC server (such as @@ -102,12 +104,18 @@ public class ThriftSink extends AbstractRpcSink { @Override protected RpcClient initializeRpcClient(Properties props) { - props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, - RpcClientFactory.ClientType.THRIFT.name()); // Only one thread is enough, since only one sink thread processes // transactions at any given time. Each sink owns its own Rpc client. props.setProperty(RpcClientConfigurationConstants .CONFIG_CONNECTION_POOL_SIZE, String.valueOf(1)); - return RpcClientFactory.getInstance(props); + boolean enableKerberos = Boolean.parseBoolean(props.getProperty( + RpcClientConfigurationConstants.KERBEROS_KEY, "false")); + if(enableKerberos) { + return SecureRpcClientFactory.getThriftInstance(props); + } else { + props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, + RpcClientFactory.ClientType.THRIFT.name()); + return RpcClientFactory.getInstance(props); + } } } diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java index 06bb604a2f..1d8bb333b4 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java @@ -26,6 +26,8 @@ import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; import org.apache.flume.FlumeException; +import org.apache.flume.auth.FlumeAuthenticationUtil; +import org.apache.flume.auth.FlumeAuthenticator; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SourceCounter; @@ -45,12 +47,16 @@ import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TSSLTransportFactory; +import org.apache.thrift.transport.TTransportFactory; +import org.apache.thrift.transport.TSaslServerTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLServerSocket; +import javax.security.sasl.Sasl; import java.io.FileInputStream; +import java.io.IOException; import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -60,10 +66,13 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.security.PrivilegedAction; public class ThriftSource extends AbstractSource implements Configurable, EventDrivenSource { @@ -97,6 +106,10 @@ public class ThriftSource extends AbstractSource implements Configurable, private static final String KEYMANAGER_TYPE = "keymanager-type"; private static final String EXCLUDE_PROTOCOLS = "exclude-protocols"; + private static final String KERBEROS_KEY = "kerberos"; + private static final String AGENT_PRINCIPAL = "agent-principal"; + private static final String AGENT_KEYTAB = "agent-keytab"; + private Integer port; private String bindAddress; private int maxThreads = 0; @@ -110,6 +123,9 @@ public class ThriftSource extends AbstractSource implements Configurable, private String keyManagerType; private final List excludeProtocols = new LinkedList(); private boolean enableSsl = false; + private boolean 
enableKerberos = false; + private String principal; + private FlumeAuthenticator flumeAuth; @Override public void configure(Context context) { @@ -171,6 +187,18 @@ public void configure(Context context) { "Thrift source configured with invalid keystore: " + keystore, ex); } } + + principal = context.getString(AGENT_PRINCIPAL); + String keytab = context.getString(AGENT_KEYTAB); + enableKerberos = context.getBoolean(KERBEROS_KEY, false); + this.flumeAuth = FlumeAuthenticationUtil.getAuthenticator(principal, keytab); + if(enableKerberos) { + if(!flumeAuth.isAuthenticated()) { + throw new FlumeException("Authentication failed in Kerberos mode for " + + "principal " + principal + " keytab " + keytab); + } + flumeAuth.startCredentialRefresher(); + } } @Override @@ -195,7 +223,15 @@ public void start() { servingExecutor.submit(new Runnable() { @Override public void run() { - server.serve(); + flumeAuth.execute( + new PrivilegedAction() { + @Override + public Object run() { + server.serve(); + return null; + } + } + ); } }); @@ -263,7 +299,7 @@ private TProtocolFactory getProtocolFactory() { } private TServer getTThreadedSelectorServer() { - if(enableSsl) { + if(enableSsl || enableKerberos) { return null; } Class serverClass; @@ -277,6 +313,7 @@ private TServer getTThreadedSelectorServer() { TServerTransport serverTransport = new TNonblockingServerSocket( new InetSocketAddress(bindAddress, port)); + ExecutorService sourceService; ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat( "Flume Thrift IPC Thread %d").build(); @@ -328,14 +365,35 @@ private void populateServerParams(TServer.AbstractServerArgs args) { args.protocolFactory(getProtocolFactory()); //populate the transportFactory - args.inputTransportFactory(new TFastFramedTransport.Factory()); - args.outputTransportFactory(new TFastFramedTransport.Factory()); + if(enableKerberos) { + args.transportFactory(getSASLTransportFactory()); + } else { + args.transportFactory(new TFastFramedTransport.Factory()); + } // populate the Processor args.processor(new ThriftSourceProtocol .Processor(new ThriftSourceHandler())); } + private TTransportFactory getSASLTransportFactory() { + String[] names; + try { + names = FlumeAuthenticationUtil.splitKerberosName(principal); + } catch (IOException e) { + throw new FlumeException( + "Error while trying to resolve Principal name - " + principal, e); + } + Map saslProperties = new HashMap(); + saslProperties.put(Sasl.QOP, "auth"); + TSaslServerTransport.Factory saslTransportFactory = + new TSaslServerTransport.Factory(); + saslTransportFactory.addServerDefinition( + "GSSAPI", names[0], names[1], saslProperties, + FlumeAuthenticationUtil.getSaslGssCallbackHandler()); + return saslTransportFactory; + } + @Override public void stop() { if(server != null && server.isServing()) { @@ -402,5 +460,4 @@ public Status appendBatch(List events) throws TException { return Status.OK; } } - } diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml index a083fe2e0e..9f7c4f63ec 100644 --- a/flume-ng-dist/pom.xml +++ b/flume-ng-dist/pom.xml @@ -202,6 +202,10 @@ org.apache.flume flume-tools + + org.apache.flume + flume-ng-auth + diff --git a/flume-ng-dist/src/main/assembly/bin.xml b/flume-ng-dist/src/main/assembly/bin.xml index 5aa7cc6574..a61180d9cd 100644 --- a/flume-ng-dist/src/main/assembly/bin.xml +++ b/flume-ng-dist/src/main/assembly/bin.xml @@ -68,6 +68,7 @@ flume-ng-clients/** flume-ng-embedded-agent/** flume-tools/** + flume-ng-auth/** **/target/** **/.classpath **/.project diff --git 
a/flume-ng-dist/src/main/assembly/src.xml b/flume-ng-dist/src/main/assembly/src.xml index b1e79a2279..e5f4156bf8 100644 --- a/flume-ng-dist/src/main/assembly/src.xml +++ b/flume-ng-dist/src/main/assembly/src.xml @@ -49,6 +49,7 @@ org.apache.flume:flume-ng-clients org.apache.flume:flume-ng-embedded-agent org.apache.flume:flume-tools + org.apache.flume:flume-ng-auth diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java index 33a2330b01..343e07b1d5 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java @@ -145,6 +145,8 @@ public final class RpcClientConfigurationConstants { public static final String CONFIG_TRUSTSTORE_TYPE = "truststore-type"; public static final String CONFIG_EXCLUDE_PROTOCOLS = "exclude-protocols"; + public static final String KERBEROS_KEY = "kerberos"; + /** * Configuration constants for the NettyAvroRpcClient * NioClientSocketChannelFactory diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java index 4f75a2b464..5c4cc4192a 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java @@ -28,6 +28,7 @@ import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.transport.TFastFramedTransport; import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +74,7 @@ public class ThriftRpcClient extends AbstractRpcClient { public static final String CONFIG_PROTOCOL = "protocol"; public static final String BINARY_PROTOCOL = "binary"; public static final String COMPACT_PROTOCOL = "compact"; - + private int batchSize; private long requestTimeout; private final Lock stateLock; @@ -83,7 +84,6 @@ public class ThriftRpcClient extends AbstractRpcClient { private ConnectionPoolManager connectionManager; private final ExecutorService callTimeoutPool; private final AtomicLong threadCounter; - private int connectionPoolSize; private final Random random = new Random(); private String protocol; @@ -95,7 +95,6 @@ public class ThriftRpcClient extends AbstractRpcClient { private static final String TRUSTMANAGER_TYPE = "trustmanager-type"; private final List excludeProtocols = new LinkedList(); - public ThriftRpcClient() { stateLock = new ReentrantLock(true); connState = State.INIT; @@ -319,7 +318,7 @@ protected void configure(Properties properties) throws FlumeException { requestTimeout = RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS; } - connectionPoolSize = Integer.parseInt(properties.getProperty( + int connectionPoolSize = Integer.parseInt(properties.getProperty( RpcClientConfigurationConstants.CONFIG_CONNECTION_POOL_SIZE, String.valueOf(RpcClientConfigurationConstants .DEFAULT_CONNECTION_POOL_SIZE))); @@ -352,6 +351,7 @@ protected void configure(Properties properties) throws FlumeException { } } } + connectionManager = new ConnectionPoolManager(connectionPoolSize); connState = State.READY; } catch (Throwable ex) { @@ -372,33 +372,41 @@ private static enum State { INIT, READY, DEAD } + protected TTransport getTransport(TSocket tsocket) throws Exception { + return new TFastFramedTransport(tsocket); + } + /** * 
Wrapper around a client and transport, so we can clean up when this * client gets closed. */ private class ClientWrapper { public final ThriftSourceProtocol.Client client; - public final TFastFramedTransport transport; + public final TTransport transport; private final int hashCode; public ClientWrapper() throws Exception{ TSocket tsocket; if(enableSsl) { - // JDK6's factory doesn't appear to pass the protocol onto the Socket properly so we have - // to do some magic to make sure that happens. Not an issue in JDK7 - // Lifted from thrift-0.9.1 to make the SSLContext - SSLContext sslContext = createSSLContext(truststore, truststorePassword, trustManagerType, truststoreType); + // JDK6's factory doesn't appear to pass the protocol onto the Socket + // properly so we have to do some magic to make sure that happens. + // Not an issue in JDK7 Lifted from thrift-0.9.1 to make the SSLContext + SSLContext sslContext = createSSLContext(truststore, truststorePassword, + trustManagerType, truststoreType); // Create the factory from it SSLSocketFactory sslSockFactory = sslContext.getSocketFactory(); // Create the TSocket from that - tsocket = createSSLSocket(sslSockFactory, hostname, port, 120000, excludeProtocols); + tsocket = createSSLSocket( + sslSockFactory, hostname, port, 120000, excludeProtocols); } else { tsocket = new TSocket(hostname, port); } - transport = new TFastFramedTransport(tsocket); + + transport = getTransport(tsocket); + // The transport is already open for SSL as part of TSSLTransportFactory.getClientSocket if(!transport.isOpen()) { transport.open(); diff --git a/flume-ng-sinks/flume-dataset-sink/pom.xml b/flume-ng-sinks/flume-dataset-sink/pom.xml index ad3f603537..92f7021655 100644 --- a/flume-ng-sinks/flume-dataset-sink/pom.xml +++ b/flume-ng-sinks/flume-dataset-sink/pom.xml @@ -149,13 +149,6 @@ limitations under the License. 
test - - org.apache.hadoop - hadoop-minikdc - ${hadoop2.version} - test - - org.slf4j slf4j-log4j12 diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java index fd9f99113a..a9f42b8992 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java @@ -17,6 +17,8 @@ */ package org.apache.flume.sink.kite; +import org.apache.flume.auth.FlumeAuthenticationUtil; +import org.apache.flume.auth.PrivilegedExecutor; import org.apache.flume.sink.kite.parser.EntityParserFactory; import org.apache.flume.sink.kite.parser.EntityParser; import org.apache.flume.sink.kite.policy.FailurePolicy; @@ -25,8 +27,9 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; + import java.net.URI; -import java.security.PrivilegedExceptionAction; +import java.security.PrivilegedAction; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.avro.Schema; @@ -40,7 +43,6 @@ import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; -import org.apache.hadoop.security.UserGroupInformation; import org.kitesdk.data.Dataset; import org.kitesdk.data.DatasetDescriptor; import org.kitesdk.data.DatasetIOException; @@ -72,7 +74,7 @@ public class DatasetSink extends AbstractSink implements Configurable { private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class); private Context context = null; - private UserGroupInformation login = null; + private PrivilegedExecutor privilegedExecutor; private String datasetName = null; private URI datasetUri = null; @@ -159,15 +161,12 @@ protected List allowedFormats() { public void configure(Context context) { this.context = context; - // initialize login credentials - this.login = KerberosUtil.login( - context.getString(AUTH_PRINCIPAL), - context.getString(AUTH_KEYTAB)); - String effectiveUser - = context.getString(AUTH_PROXY_USER); - if (effectiveUser != null) { - this.login = KerberosUtil.proxyAs(effectiveUser, login); - } + String principal = context.getString(AUTH_PRINCIPAL); + String keytab = context.getString(AUTH_KEYTAB); + String effectiveUser = context.getString(AUTH_PROXY_USER); + + this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator( + principal, keytab).proxyAs(effectiveUser); // Get the dataset URI and name from the context String datasetURI = context.getString(CONFIG_KITE_DATASET_URI); @@ -395,13 +394,15 @@ void createWriter() throws EventDeliveryException { // reset the commited flag whenver a new writer is created committedBatch = false; try { - View view = KerberosUtil.runPrivileged(login, - new PrivilegedExceptionAction>() { - @Override - public Dataset run() { - return Datasets.load(datasetUri); - } - }); + View view; + + view = privilegedExecutor.execute( + new PrivilegedAction>() { + @Override + public Dataset run() { + return Datasets.load(datasetUri); + } + }); DatasetDescriptor descriptor = view.getDataset().getDescriptor(); Format format = descriptor.getFormat(); diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java deleted file mode 100644 
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java
deleted file mode 100644
index c0dbffbd64..0000000000
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import java.io.File;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.kitesdk.data.DatasetException;
-import org.kitesdk.data.DatasetIOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KerberosUtil {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KerberosUtil.class);
-
-  public static class SecurityException extends RuntimeException {
-    private SecurityException(String message) {
-      super(message);
-    }
-
-    private SecurityException(String message, Throwable cause) {
-      super(message, cause);
-    }
-
-    private SecurityException(Throwable cause) {
-      super(cause);
-    }
-  }
-
-  public static UserGroupInformation proxyAs(String username,
-      UserGroupInformation login) {
-    Preconditions.checkArgument(username != null && !username.isEmpty(),
-        "Invalid username: " + String.valueOf(username));
-    Preconditions.checkArgument(login != null,
-        "Cannot proxy without an authenticated user");
-
-    // hadoop impersonation works with or without kerberos security
-    return UserGroupInformation.createProxyUser(username, login);
-  }
-
-  /**
-   * Static synchronized method for static Kerberos login. <br/>
-   * Static synchronized due to a thundering herd problem when multiple Sinks
-   * attempt to log in using the same principal at the same time with the
-   * intention of impersonating different users (or even the same user).
-   * If this is not controlled, MIT Kerberos v5 believes it is seeing a replay
-   * attach and it returns:
-   * <pre>Request is a replay (34) - PROCESS_TGS</pre>
-   * In addition, since the underlying Hadoop APIs we are using for
-   * impersonation are static, we define this method as static as well.
-   *
-   * @param principal
-   *          Fully-qualified principal to use for authentication.
-   * @param keytab
-   *          Location of keytab file containing credentials for principal.
-   * @return Logged-in user
-   * @throws SecurityException
-   *           if login fails.
-   * @throws IllegalArgumentException
-   *           if the principal or the keytab is not usable
-   */
-  public static synchronized UserGroupInformation login(String principal,
-      String keytab) {
-    // If the principal or keytab isn't set, get the current (Linux) user
-    if (principal == null || keytab == null) {
-      try {
-        return UserGroupInformation.getCurrentUser();
-      } catch (IOException ex) {
-        LOG.error("Can't get current user: {}", ex.getMessage());
-        throw new RuntimeException(ex);
-      }
-    }
-
-    // resolve the requested principal, if it is present
-    String finalPrincipal = null;
-    if (principal != null && !principal.isEmpty()) {
-      try {
-        // resolves _HOST pattern using standard Hadoop search/replace
-        // via DNS lookup when 2nd argument is empty
-        finalPrincipal = SecurityUtil.getServerPrincipal(principal, "");
-      } catch (IOException e) {
-        throw new SecurityException(
-            "Failed to resolve Kerberos principal", e);
-      }
-    }
-
-    // check if there is a user already logged in
-    UserGroupInformation currentUser = null;
-    try {
-      currentUser = UserGroupInformation.getLoginUser();
-    } catch (IOException e) {
-      // not a big deal but this shouldn't typically happen because it will
-      // generally fall back to the UNIX user
-      LOG.debug("Unable to get login user before Kerberos auth attempt", e);
-    }
-
-    // if the current user is valid (matches the given principal and has a TGT)
-    // then use it
-    if (currentUser != null && currentUser.hasKerberosCredentials()) {
-      if (finalPrincipal == null ||
-          finalPrincipal.equals(currentUser.getUserName())) {
-        LOG.debug("Using existing login for {}: {}",
-            finalPrincipal, currentUser);
-        return currentUser;
-      } else {
-        // be cruel and unusual when user tries to login as multiple principals
-        // this isn't really valid with a reconfigure but this should be rare
-        // enough to warrant a restart of the agent JVM
-        // TODO: find a way to interrogate the entire current config state,
-        // since we don't have to be unnecessarily protective if they switch all
-        // HDFS sinks to use a different principal all at once.
-        throw new SecurityException(
-            "Cannot use multiple Kerberos principals: " + finalPrincipal +
-            " would replace " + currentUser.getUserName());
-      }
-    }
-
-    // prepare for a new login
-    Preconditions.checkArgument(principal != null && !principal.isEmpty(),
-        "Invalid Kerberos principal: " + String.valueOf(principal));
-    Preconditions.checkNotNull(finalPrincipal,
-        "Resolved principal must not be null");
-    Preconditions.checkArgument(keytab != null && !keytab.isEmpty(),
-        "Invalid Kerberos keytab: " + String.valueOf(keytab));
-    File keytabFile = new File(keytab);
-    Preconditions.checkArgument(keytabFile.isFile() && keytabFile.canRead(),
-        "Keytab is not a readable file: " + String.valueOf(keytab));
-
-    try {
-      // attempt static kerberos login
-      LOG.debug("Logging in as {} with {}", finalPrincipal, keytab);
-      UserGroupInformation.loginUserFromKeytab(principal, keytab);
-      return UserGroupInformation.getLoginUser();
-    } catch (IOException e) {
-      throw new SecurityException("Kerberos login failed", e);
-    }
-  }
-
-  /**
-   * Allow methods to act with the privileges of a login.
-   *
-   * If the login is null, the current privileges will be used.
-   *
-   * @param <T> The return type of the action
-   * @param login UserGroupInformation credentials to use for action
-   * @param action A PrivilegedExceptionAction to perform as another user
-   * @return the T value returned by action.run()
-   */
-  public static <T> T runPrivileged(UserGroupInformation login,
-      PrivilegedExceptionAction<T> action) {
-    try {
-      if (login == null) {
-        return action.run();
-      } else {
-        return login.doAs(action);
-      }
-    } catch (IOException ex) {
-      throw new DatasetIOException("Privileged action failed", ex);
-    } catch (InterruptedException ex) {
-      Thread.interrupted();
-      throw new DatasetException(ex);
-    } catch (Exception ex) {
-      throw Throwables.propagate(ex);
-    }
-  }
-}
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 62f4eee875..6b97de6e12 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -38,6 +38,7 @@
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.SystemClock;
+import org.apache.flume.auth.PrivilegedExecutor;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
 import org.apache.hadoop.conf.Configuration;
@@ -45,7 +46,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +75,7 @@ class BucketWriter {
   private final CompressionCodec codeC;
   private final CompressionType compType;
   private final ScheduledExecutorService timedRollerPool;
-  private final UserGroupInformation user;
+  private final PrivilegedExecutor proxyUser;
 
   private final AtomicLong fileExtensionCounter;
 
@@ -120,7 +120,7 @@ class BucketWriter {
       Context context, String filePath, String fileName, String inUsePrefix,
       String inUseSuffix, String fileSuffix, CompressionCodec codeC,
       CompressionType compType, HDFSWriter writer,
-      ScheduledExecutorService timedRollerPool, UserGroupInformation user,
+      ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser,
       SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
       String onCloseCallbackPath, long callTimeout,
       ExecutorService callTimeoutPool, long retryInterval,
@@ -138,7 +138,7 @@ class BucketWriter {
     this.compType = compType;
     this.writer = writer;
     this.timedRollerPool = timedRollerPool;
-    this.user = user;
+    this.proxyUser = proxyUser;
     this.sinkCounter = sinkCounter;
     this.idleTimeout = idleTimeout;
     this.onCloseCallback = onCloseCallback;
@@ -165,33 +165,6 @@ void setMockStream(HDFSWriter dataWriter) {
     this.writer = dataWriter;
   }
 
-  /**
-   * Allow methods to act as another user (typically used for HDFS Kerberos)
-   * @param <T>
-   * @param action
-   * @return
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
-      throws IOException, InterruptedException {
-
-    if (user != null) {
-      return user.doAs(action);
-    } else {
-      try {
-        return action.run();
-      } catch (IOException ex) {
-        throw ex;
-      } catch (InterruptedException ex) {
-        throw ex;
-      } catch (RuntimeException ex) {
-        throw ex;
-      } catch (Exception ex) {
-        throw new RuntimeException("Unexpected exception.", ex);
-      }
-    }
-  }
 
   /**
    * Clear the class counters
@@ -700,7 +673,7 @@ private <T> T callWithTimeout(final CallRunner<T> callRunner)
     Future<T> future = callTimeoutPool.submit(new Callable<T>() {
       @Override
       public T call() throws Exception {
-        return runPrivileged(new PrivilegedExceptionAction<T>() {
+        return proxyUser.execute(new PrivilegedExceptionAction<T>() {
           @Override
           public T run() throws Exception {
             return callRunner.call();
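Note: BucketWriter keeps its call-timeout plumbing and only swaps the privilege
wrapper, so every HDFS operation stays both impersonated and time-bounded. A
self-contained sketch of that idiom under the same assumptions as the earlier
example (the demo class, the null credentials, and the timed work are
illustrative, not part of the patch):

    import java.security.PrivilegedExceptionAction;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    import org.apache.flume.auth.FlumeAuthenticationUtil;
    import org.apache.flume.auth.PrivilegedExecutor;

    // Illustrative stand-alone version of the BucketWriter idiom: privileged
    // work is submitted to a pool so the caller can bound it with a timeout.
    public class PrivilegedCallDemo {
      public static void main(String[] args) throws Exception {
        final PrivilegedExecutor proxyUser =
            FlumeAuthenticationUtil.getAuthenticator(null, null).proxyAs(null);
        ExecutorService callTimeoutPool = Executors.newSingleThreadExecutor();

        Future<Long> future = callTimeoutPool.submit(new Callable<Long>() {
          @Override
          public Long call() throws Exception {
            // execute(PrivilegedExceptionAction) propagates checked
            // exceptions, hence call() is declared to throw Exception.
            return proxyUser.execute(new PrivilegedExceptionAction<Long>() {
              @Override
              public Long run() throws Exception {
                return System.currentTimeMillis(); // real code does HDFS I/O
              }
            });
          }
        });

        System.out.println("result = " + future.get(30, TimeUnit.SECONDS));
        callTimeoutPool.shutdown();
      }
    }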
context.getString("hdfs.kerberosKeytab", ""); - proxyUserName = context.getString("hdfs.proxyUser", ""); + String kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal"); + String kerbKeytab = context.getString("hdfs.kerberosKeytab"); + String proxyUser = context.getString("hdfs.proxyUser"); tryCount = context.getInteger("hdfs.closeTries", defaultTryCount); if(tryCount <= 0) { LOG.warn("Retry count value : " + tryCount + " is not " + @@ -269,9 +257,13 @@ public void configure(Context context) { + " when fileType is: " + fileType); } - if (!authenticate()) { - LOG.error("Failed to authenticate!"); - } + // get the appropriate executor + this.privExecutor = FlumeAuthenticationUtil.getAuthenticator( + kerbConfPrincipal, kerbKeytab).proxyAs(proxyUser); + + + + needRounding = context.getBoolean("hdfs.round", false); if(needRounding) { @@ -482,7 +474,7 @@ private BucketWriter initializeBucketWriter(String realPath, rollSize, rollCount, batchSize, context, realPath, realName, inUsePrefix, inUseSuffix, suffix, codeC, compType, hdfsWriter, timedRollerPool, - proxyTicket, sinkCounter, idleTimeout, closeCallback, + privExecutor, sinkCounter, idleTimeout, closeCallback, lookupPath, callTimeout, callTimeoutPool, retryInterval, tryCount); if(mockFs != null) { @@ -551,197 +543,6 @@ public void start() { super.start(); } - private boolean authenticate() { - - // logic for kerberos login - boolean useSecurity = UserGroupInformation.isSecurityEnabled(); - - LOG.info("Hadoop Security enabled: " + useSecurity); - - if (useSecurity) { - - // sanity checking - if (kerbConfPrincipal.isEmpty()) { - LOG.error("Hadoop running in secure mode, but Flume config doesn't " - + "specify a principal to use for Kerberos auth."); - return false; - } - if (kerbKeytab.isEmpty()) { - LOG.error("Hadoop running in secure mode, but Flume config doesn't " - + "specify a keytab to use for Kerberos auth."); - return false; - } else { - //If keytab is specified, user should want it take effect. - //HDFSEventSink will halt when keytab file is non-exist or unreadable - File kfile = new File(kerbKeytab); - if (!(kfile.isFile() && kfile.canRead())) { - throw new IllegalArgumentException("The keyTab file: " - + kerbKeytab + " is nonexistent or can't read. " - + "Please specify a readable keytab file for Kerberos auth."); - } - } - - String principal; - try { - // resolves _HOST pattern using standard Hadoop search/replace - // via DNS lookup when 2nd argument is empty - principal = SecurityUtil.getServerPrincipal(kerbConfPrincipal, ""); - } catch (IOException e) { - LOG.error("Host lookup error resolving kerberos principal (" - + kerbConfPrincipal + "). Exception follows.", e); - return false; - } - - Preconditions.checkNotNull(principal, "Principal must not be null"); - KerberosUser prevUser = staticLogin.get(); - KerberosUser newUser = new KerberosUser(principal, kerbKeytab); - - // be cruel and unusual when user tries to login as multiple principals - // this isn't really valid with a reconfigure but this should be rare - // enough to warrant a restart of the agent JVM - // TODO: find a way to interrogate the entire current config state, - // since we don't have to be unnecessarily protective if they switch all - // HDFS sinks to use a different principal all at once. - Preconditions.checkState(prevUser == null || prevUser.equals(newUser), - "Cannot use multiple kerberos principals in the same agent. " + - " Must restart agent to use new principal or keytab. 
" + - "Previous = %s, New = %s", prevUser, newUser); - - // attempt to use cached credential if the user is the same - // this is polite and should avoid flooding the KDC with auth requests - UserGroupInformation curUser = null; - if (prevUser != null && prevUser.equals(newUser)) { - try { - curUser = UserGroupInformation.getLoginUser(); - } catch (IOException e) { - LOG.warn("User unexpectedly had no active login. Continuing with " + - "authentication", e); - } - } - - if (curUser == null || !curUser.getUserName().equals(principal)) { - try { - // static login - kerberosLogin(this, principal, kerbKeytab); - } catch (IOException e) { - LOG.error("Authentication or file read error while attempting to " - + "login as kerberos principal (" + principal + ") using " - + "keytab (" + kerbKeytab + "). Exception follows.", e); - return false; - } - } else { - LOG.debug("{}: Using existing principal login: {}", this, curUser); - } - - // we supposedly got through this unscathed... so store the static user - staticLogin.set(newUser); - } - - // hadoop impersonation works with or without kerberos security - proxyTicket = null; - if (!proxyUserName.isEmpty()) { - try { - proxyTicket = UserGroupInformation.createProxyUser( - proxyUserName, UserGroupInformation.getLoginUser()); - } catch (IOException e) { - LOG.error("Unable to login as proxy user. Exception follows.", e); - return false; - } - } - - UserGroupInformation ugi = null; - if (proxyTicket != null) { - ugi = proxyTicket; - } else if (useSecurity) { - try { - ugi = UserGroupInformation.getLoginUser(); - } catch (IOException e) { - LOG.error("Unexpected error: Unable to get authenticated user after " + - "apparent successful login! Exception follows.", e); - return false; - } - } - - if (ugi != null) { - // dump login information - AuthenticationMethod authMethod = ugi.getAuthenticationMethod(); - LOG.info("Auth method: {}", authMethod); - LOG.info(" User name: {}", ugi.getUserName()); - LOG.info(" Using keytab: {}", ugi.isFromKeytab()); - if (authMethod == AuthenticationMethod.PROXY) { - UserGroupInformation superUser; - try { - superUser = UserGroupInformation.getLoginUser(); - LOG.info(" Superuser auth: {}", superUser.getAuthenticationMethod()); - LOG.info(" Superuser name: {}", superUser.getUserName()); - LOG.info(" Superuser using keytab: {}", superUser.isFromKeytab()); - } catch (IOException e) { - LOG.error("Unexpected error: unknown superuser impersonating proxy.", - e); - return false; - } - } - - LOG.info("Logged in as user {}", ugi.getUserName()); - - return true; - } - - return true; - } - - /** - * Static synchronized method for static Kerberos login.
-   * Static synchronized due to a thundering herd problem when multiple Sinks
-   * attempt to log in using the same principal at the same time with the
-   * intention of impersonating different users (or even the same user).
-   * If this is not controlled, MIT Kerberos v5 believes it is seeing a replay
-   * attach and it returns:
-   * <pre>Request is a replay (34) - PROCESS_TGS</pre>
-   * In addition, since the underlying Hadoop APIs we are using for
-   * impersonation are static, we define this method as static as well.
-   *
-   * @param principal
-   *          Fully-qualified principal to use for authentication.
-   * @param keytab
-   *          Location of keytab file containing credentials for principal.
-   * @return Logged-in user
-   * @throws IOException
-   *           if login fails.
-   */
-  private static synchronized UserGroupInformation kerberosLogin(
-      HDFSEventSink sink, String principal, String keytab) throws IOException {
-
-    // if we are the 2nd user thru the lock, the login should already be
-    // available statically if login was successful
-    UserGroupInformation curUser = null;
-    try {
-      curUser = UserGroupInformation.getLoginUser();
-    } catch (IOException e) {
-      // not a big deal but this shouldn't typically happen because it will
-      // generally fall back to the UNIX user
-      LOG.debug("Unable to get login user before Kerberos auth attempt.", e);
-    }
-
-    // we already have logged in successfully
-    if (curUser != null && curUser.getUserName().equals(principal)) {
-      LOG.debug("{}: Using existing principal ({}): {}",
-          new Object[]{sink, principal, curUser});
-
-      // no principal found
-    } else {
-
-      LOG.info("{}: Attempting kerberos login as principal ({}) from keytab " +
-          "file ({})", new Object[]{sink, principal, keytab});
-
-      // attempt static kerberos login
-      UserGroupInformation.loginUserFromKeytab(principal, keytab);
-      curUser = UserGroupInformation.getLoginUser();
-    }
-
-    return curUser;
-  }
-
   @Override
   public String toString() {
     return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() +
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 7c74b162e7..2581f73992 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -29,6 +29,8 @@
 import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.PrivilegedExecutor;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.hadoop.conf.Configuration;
@@ -53,10 +55,12 @@ public class TestBucketWriter {
   private Context ctx = new Context();
 
   private static ScheduledExecutorService timedRollerPool;
+  private static PrivilegedExecutor proxy;
 
   @BeforeClass
   public static void setup() {
     timedRollerPool = Executors.newSingleThreadScheduledExecutor();
+    proxy = FlumeAuthenticationUtil.getAuthenticator(null, null).proxyAs(null);
   }
 
   @AfterClass
@@ -72,7 +76,7 @@ public void testEventCountingRoller() throws IOException, InterruptedException {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-        hdfsWriter, timedRollerPool, null,
+        hdfsWriter, timedRollerPool, proxy,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
         0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -97,7 +101,7 @@ public void testSizeRoller() throws IOException, InterruptedException {
     BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx,
         "/tmp", "file", "", ".tmp", null, null,
         SequenceFile.CompressionType.NONE, hdfsWriter,timedRollerPool,
-        null, new SinkCounter("test-bucket-writer-" +
+        proxy, new SinkCounter("test-bucket-writer-" +
         System.currentTimeMillis()),0, null, null, 30000,
         Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -124,7 +128,7 @@ public void testIntervalRoller() throws IOException, InterruptedException {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-        hdfsWriter, timedRollerPool, null,
+        hdfsWriter, timedRollerPool, proxy,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
         0, new HDFSEventSink.WriterCallback() {
       @Override
@@ -147,7 +151,7 @@ public void run(String filePath) {
 
     bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-        hdfsWriter, timedRollerPool, null,
+        hdfsWriter, timedRollerPool, proxy,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
         0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -230,7 +234,7 @@ public void append(Event e) throws IOException {
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         path, name, "", ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        timedRollerPool, null, new SinkCounter("test-bucket-writer-"
+        timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
             + System.currentTimeMillis()), 0, null, null, 30000,
         Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -255,7 +259,7 @@ public void testFileSuffixNotGiven() throws IOException, InterruptedException {
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        timedRollerPool, null, new SinkCounter("test-bucket-writer-"
+        timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
            + System.currentTimeMillis()), 0, null, null, 30000,
        Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -283,7 +287,7 @@ public void testFileSuffixGiven() throws IOException, InterruptedException {
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        timedRollerPool, null, new SinkCounter(
+        timedRollerPool, proxy, new SinkCounter(
             "test-bucket-writer-" + System.currentTimeMillis()), 0, null, null,
         30000, Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -316,7 +320,7 @@ public void testFileSuffixCompressed()
         0, ctx, "/tmp", "file", "", ".tmp", suffix,
         HDFSEventSink.getCodec("gzip"), SequenceFile.CompressionType.BLOCK, hdfsWriter,
-        timedRollerPool, null, new SinkCounter("test-bucket-writer-"
+        timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
            + System.currentTimeMillis()), 0, null, null, 30000,
        Executors.newSingleThreadExecutor(), 0, 0
     );
 
@@ -348,7 +352,7 @@ public void testInUsePrefix() throws IOException, InterruptedException {
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp", "file", PREFIX, ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        timedRollerPool, null, new SinkCounter(
+        timedRollerPool, proxy, new SinkCounter(
             "test-bucket-writer-" + System.currentTimeMillis()), 0, null, null,
         30000, Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -368,7 +372,7 @@ public void testInUseSuffix() throws IOException, InterruptedException {
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        timedRollerPool, null, new SinkCounter(
+        timedRollerPool, proxy, new SinkCounter(
             "test-bucket-writer-" + System.currentTimeMillis()), 0, null, null,
         30000, Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -388,7 +392,7 @@ public void testCallbackOnClose() throws IOException, InterruptedException {
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE,
-        hdfsWriter, timedRollerPool, null,
+        hdfsWriter, timedRollerPool, proxy,
         new SinkCounter(
             "test-bucket-writer-" + System.currentTimeMillis()), 0,
         new HDFSEventSink.WriterCallback() {
@@ -442,7 +446,7 @@ public void SequenceFileRenameRetryCoreTest(int numberOfRetriesRequired, boolean
     BucketWriter bucketWriter = new BucketWriter(0, 0, 1, 1, ctx,
         hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null, null,
         new MockDataStream(mockFs),
-        timedRollerPool, null,
+        timedRollerPool, proxy,
         new SinkCounter(
             "test-bucket-writer-" + System.currentTimeMillis()), 0, null, null,
         30000, Executors.newSingleThreadExecutor(), 1,
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 1b7a364716..23862eb44e 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -276,7 +276,7 @@ public void testKerbFileAccess() throws InterruptedException,
       Assert.fail("no exception thrown");
     } catch (IllegalArgumentException expected) {
       Assert.assertTrue(expected.getMessage().contains(
-          "is nonexistent or can't read."));
+          "Keytab is not a readable file"));
     } finally {
       //turn security off
       conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index 5de0bd56ef..e659ada928 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -36,6 +36,8 @@
 import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
 import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.PrivilegedExecutor;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
@@ -54,7 +56,6 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import java.security.PrivilegedExceptionAction;
-import org.apache.hadoop.hbase.security.User;
 
 
 /**
@@ -103,11 +104,11 @@ public class HBaseSink extends AbstractSink implements Configurable {
   private Context serializerContext;
   private String kerberosPrincipal;
   private String kerberosKeytab;
-  private User hbaseUser;
   private boolean enableWal = true;
   private boolean batchIncrements = false;
   private Method refGetFamilyMap = null;
   private SinkCounter sinkCounter;
+  private PrivilegedExecutor privilegedExecutor;
 
   // Internal hooks used for unit testing.
   private DebugIncrementsCallback debugIncrCallback = null;
@@ -132,17 +133,14 @@ public void start(){
     Preconditions.checkArgument(table == null, "Please call stop " +
         "before calling start on an old instance.");
     try {
-      if (HBaseSinkSecurityManager.isSecurityEnabled(config)) {
-        hbaseUser = HBaseSinkSecurityManager.login(config, null,
-            kerberosPrincipal, kerberosKeytab);
-      }
+      privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab);
     } catch (Exception ex) {
       sinkCounter.incrementConnectionFailedCount();
       throw new FlumeException("Failed to login to HBase using "
           + "provided credentials.", ex);
     }
     try {
-      table = runPrivileged(new PrivilegedExceptionAction<HTable>() {
+      table = privilegedExecutor.execute(new PrivilegedExceptionAction<HTable>() {
         @Override
         public HTable run() throws Exception {
           HTable table = new HTable(config, tableName);
@@ -160,7 +158,7 @@ public HTable run() throws Exception {
           " from HBase", e);
     }
     try {
-      if (!runPrivileged(new PrivilegedExceptionAction<Boolean>() {
+      if (!privilegedExecutor.execute(new PrivilegedExceptionAction<Boolean>() {
         @Override
         public Boolean run() throws IOException {
           return table.getTableDescriptor().hasFamily(columnFamily);
@@ -233,8 +231,8 @@ public void configure(Context context){
       logger.error("Could not instantiate event serializer." , e);
       Throwables.propagate(e);
     }
-    kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB, "");
-    kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL, "");
+    kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB);
+    kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL);
     enableWal = context.getBoolean(HBaseSinkConfigurationConstants
       .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL);
 
@@ -371,7 +369,7 @@ public Status process() throws EventDeliveryException {
 
   private void putEventsAndCommit(final List<Row> actions,
       final List<Increment> incs, Transaction txn) throws Exception {
-    runPrivileged(new PrivilegedExceptionAction<Void>() {
+    privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
         for (Row r : actions) {
@@ -388,7 +386,7 @@ public Void run() throws Exception {
       }
     });
 
-    runPrivileged(new PrivilegedExceptionAction<Void>() {
+    privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
 
@@ -416,18 +414,6 @@ public Void run() throws Exception {
     sinkCounter.addToEventDrainSuccessCount(actions.size());
   }
 
-  private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
-      throws Exception {
-    if(hbaseUser != null) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Calling runAs as hbase user: " + hbaseUser.getName());
-      }
-      return hbaseUser.runAs(action);
-    } else {
-      return action.run();
-    }
-  }
-
   /**
    * The method getFamilyMap() is no longer available in Hbase 0.96.
    * We must use reflection to determine which version we may use.
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java
deleted file mode 100644
index 762fce98ee..0000000000
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.sink.hbase;
-
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import org.apache.flume.FlumeException;
-import org.apache.flume.sink.hdfs.KerberosUser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class to handle logging into HBase with the credentials passed in.
- */
-public class HBaseSinkSecurityManager {
-
-  /*
-   * volatile for safe publication. Since this is updated only by
-   * a single thread (configuration) and read later by the sink threads,
-   * this can just be volatile, no need of Atomic reference.
-   */
-  private volatile static KerberosUser loggedInUser;
-  private static final String FLUME_KEYTAB_KEY = "flume.keytab.key";
-  private static final String FLUME_PRINCIPAL_KEY = "flume.principal.key";
-  private static final Logger LOG =
-      LoggerFactory.getLogger(HBaseSinkSecurityManager.class);
-
-  /**
-   * Checks if security is enabled for the HBase cluster.
-   *
-   * @return - true if security is enabled on the HBase cluster and
-   * the underlying HDFS cluster.
-   */
-  public static boolean isSecurityEnabled(Configuration conf) {
-    return User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf);
-  }
-
-  /**
-   * Login the user using the configuration, and the hostname specified to use
-   * for logging in.
-   *
-   * @param conf - Configuration to use for logging the user in.
-   * @param hostname - The hostname to use for logging the user in. If no
-   * hostname is specified (null or empty string), the canonical hostname for
-   * the address returned by {@linkplain InetAddress#getLocalHost()} will be
-   * used.
-   * @return The logged in HBase {@linkplain User}.
-   * @throws IOException if login failed, or hostname lookup failed.
-   */
-  public static synchronized User login(Configuration conf, String hostname,
-      String kerberosPrincipal, String kerberosKeytab) throws IOException {
-    if (kerberosPrincipal.isEmpty()) {
-      String msg = "Login failed, since kerberos principal was not specified.";
-      LOG.error(msg);
-      throw new IllegalArgumentException(msg);
-    }
-    if (kerberosKeytab.isEmpty()) {
-      String msg = "Login failed, since kerberos keytab was not specified.";
-      LOG.error(msg);
-      throw new IllegalArgumentException(msg);
-    } else {
-      //If keytab is specified, user should want it take effect.
-      //HDFSEventSink will halt when keytab file is non-exist or unreadable
-      File kfile = new File(kerberosKeytab);
-      if (!(kfile.isFile() && kfile.canRead())) {
-        throw new IllegalArgumentException("The keyTab file: "
-            + kerberosKeytab + " is nonexistent or can't read. "
-            + "Please specify a readable keytab file for Kerberos auth.");
-      }
-    }
-    String principal = kerberosPrincipal;
-    try {
-      // resolves _HOST pattern using standard Hadoop search/replace
-      // via DNS lookup when 2nd argument is empty
-      principal = SecurityUtil.getServerPrincipal(kerberosPrincipal, "");
-    } catch (IOException e) {
-      LOG.error("Host lookup error resolving kerberos principal ("
-          + kerberosPrincipal + "). Exception follows.", e);
-      throw e;
-    }
-    Preconditions.checkNotNull(principal, "Principal must not be null");
-    KerberosUser newUser = new KerberosUser(principal, kerberosKeytab);
-    //The HDFS Sink does not allow login credentials to change.
-    //To be uniform, we will do the same thing here.
-    User hbaseUser = null;
-    boolean loggedIn = false;
-    if (loggedInUser != null) {
-      Preconditions.checkArgument(newUser.equals(loggedInUser),
-          "Cannot switch kerberos credentials during a reconfiguration. "
-          + "Please restart the agent to set the new credentials.");
-      try {
-        hbaseUser = User.create(UserGroupInformation.getLoginUser());
-        loggedIn = true;
-      } catch (IOException ex) {
-        LOG.warn("Previous login does not exist, "
-            + "will authenticate against KDC");
-      }
-    }
-    if (!loggedIn) {
-      if (hostname == null || hostname.isEmpty()) {
-        hostname = InetAddress.getLocalHost().getCanonicalHostName();
-      }
-      conf.set(FLUME_KEYTAB_KEY, kerberosKeytab);
-      conf.set(FLUME_PRINCIPAL_KEY, principal);
-      User.login(conf, FLUME_KEYTAB_KEY, FLUME_PRINCIPAL_KEY, hostname);
-      hbaseUser = User.create(UserGroupInformation.getLoginUser());
-      loggedInUser = newUser;
-      //TODO: Set the loggedInUser to the current user.
-      LOG.info("Logged into HBase as user: " + hbaseUser.getName());
-    }
-    return hbaseUser;
-  }
-}
diff --git a/pom.xml b/pom.xml
index 3e405585d9..aad8be6ad8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@ limitations under the License.
     <module>flume-ng-sdk</module>
    <module>flume-ng-tests</module>
     <module>flume-tools</module>
+    <module>flume-ng-auth</module>
   </modules>
 
@@ -1225,6 +1226,12 @@ limitations under the License.
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-auth</artifactId>
+        <version>1.6.0-SNAPSHOT</version>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.flume.flume-ng-clients</groupId>
         <artifactId>flume-ng-log4jappender</artifactId>
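Note: the pom changes wire the new module into the reactor and into
dependencyManagement, so downstream modules pick up flume-ng-auth instead of
carrying their own UGI code. A compact check of the no-credential path, modelled
on the TestBucketWriter setup above (the test class and assertion are
illustrative; the expectation that a null principal/keytab yields a plain
pass-through executor is inferred from how the tests in this patch use it):

    import java.security.PrivilegedAction;

    import org.apache.flume.auth.FlumeAuthenticationUtil;
    import org.apache.flume.auth.PrivilegedExecutor;
    import org.junit.Assert;
    import org.junit.Test;

    public class ExampleNoCredentialTest {
      @Test
      public void executesWithoutKerberosCredentials() {
        // Same call the converted tests make: no principal, no keytab,
        // no proxy user.
        PrivilegedExecutor executor =
            FlumeAuthenticationUtil.getAuthenticator(null, null).proxyAs(null);

        String result = executor.execute(new PrivilegedAction<String>() {
          @Override
          public String run() {
            return "ran";
          }
        });

        Assert.assertEquals("ran", result);
      }
    }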