Skip to content

Commit

Permalink
FLUME-2343. Add Kerberos and user impersonation support to Dataset Sink.
Browse files Browse the repository at this point in the history
(Ryan Blue via Hari Shreedharan)
  • Loading branch information
harishreedharan committed Mar 12, 2014
1 parent 63d26c1 commit 96b090b
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 12 deletions.
24 changes: 14 additions & 10 deletions flume-ng-doc/sphinx/FlumeUserGuide.rst
Expand Up @@ -2047,16 +2047,20 @@ Note 2: In some cases, file rolling may occur slightly after the roll interval
has been exceeded. However, this delay will not exceed 5 seconds. In most
cases, the delay is negligible.

===================== ======= ===========================================================
Property Name Default Description
===================== ======= ===========================================================
**channel** --
**type** -- Must be org.apache.flume.sink.kite.DatasetSink
**kite.repo.uri** -- URI of the repository to open
**kite.dataset.name** -- Name of the Dataset where records will be written
kite.batchSize 100 Number of records to process in each batch
kite.rollInterval 30 Maximum wait time (seconds) before data files are released
===================== ======= ===========================================================
======================= ======= ===========================================================
Property Name Default Description
======================= ======= ===========================================================
**channel** --
**type** -- Must be org.apache.flume.sink.kite.DatasetSink
**kite.repo.uri** -- URI of the repository to open
**kite.dataset.name** -- Name of the Dataset where records will be written
kite.batchSize 100 Number of records to process in each batch
kite.rollInterval 30 Maximum wait time (seconds) before data files are released
auth.kerberosPrincipal -- Kerberos user principal for secure authentication to HDFS
auth.kerberosKeytab -- Kerberos keytab location (local FS) for the principal
auth.proxyUser -- The effective user for HDFS actions, if different from
the kerberos principal
======================= ======= ===========================================================

Custom Sink
~~~~~~~~~~~
Expand Down
Expand Up @@ -29,6 +29,7 @@
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
Expand All @@ -47,6 +48,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetRepositories;
import org.kitesdk.data.DatasetWriter;
Expand All @@ -69,8 +71,10 @@ public class DatasetSink extends AbstractSink implements Configurable {
private String repositoryURI = null;
private String datasetName = null;
private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE;

private Dataset<Object> targetDataset = null;
private DatasetWriter<Object> writer = null;
private UserGroupInformation login = null;
private SinkCounter counter = null;

// for rolling files at a given interval
Expand Down Expand Up @@ -130,14 +134,30 @@ protected List<String> allowedFormats() {

@Override
public void configure(Context context) {
// initialize login credentials
this.login = KerberosUtil.login(
context.getString(DatasetSinkConstants.AUTH_PRINCIPAL),
context.getString(DatasetSinkConstants.AUTH_KEYTAB));
String effectiveUser =
context.getString(DatasetSinkConstants.AUTH_PROXY_USER);
if (effectiveUser != null) {
this.login = KerberosUtil.proxyAs(effectiveUser, login);
}

this.repositoryURI = context.getString(
DatasetSinkConstants.CONFIG_KITE_REPO_URI);
Preconditions.checkNotNull(repositoryURI, "Repository URI is missing");
this.datasetName = context.getString(
DatasetSinkConstants.CONFIG_KITE_DATASET_NAME);
Preconditions.checkNotNull(datasetName, "Dataset name is missing");
this.targetDataset = DatasetRepositories.open(repositoryURI)
.load(datasetName);

this.targetDataset = KerberosUtil.runPrivileged(login,
new PrivilegedExceptionAction<Dataset<Object>>() {
@Override
public Dataset<Object> run() {
return DatasetRepositories.open(repositoryURI).load(datasetName);
}
});

String formatName = targetDataset.getDescriptor().getFormat().getName();
Preconditions.checkArgument(allowedFormats().contains(formatName),
Expand Down
Expand Up @@ -48,4 +48,10 @@ public class DatasetSinkConstants {
"flume.avro.schema.literal";
public static final String AVRO_SCHEMA_URL_HEADER = "flume.avro.schema.url";

/**
* Hadoop authentication settings
*/
public static final String AUTH_PROXY_USER = "auth.proxyUser";
public static final String AUTH_PRINCIPAL = "auth.kerberosPrincipal";
public static final String AUTH_KEYTAB = "auth.kerberosKeytab";
}
@@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flume.sink.kite;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KerberosUtil {

  private static final Logger LOG = LoggerFactory.getLogger(KerberosUtil.class);

  /**
   * Thrown when Kerberos principal resolution or login fails, or when an
   * attempt is made to log in as a second, conflicting principal.
   */
  public static class SecurityException extends RuntimeException {
    private SecurityException(String message) {
      super(message);
    }

    private SecurityException(String message, Throwable cause) {
      super(message, cause);
    }

    private SecurityException(Throwable cause) {
      super(cause);
    }
  }

  /**
   * Creates a proxy-user {@link UserGroupInformation} that will perform
   * actions as {@code username}, authorized by the given {@code login}.
   *
   * @param username the effective user to impersonate; must be non-empty
   * @param login the authenticated user authorizing the impersonation
   * @return a proxy UserGroupInformation for {@code username}
   * @throws IllegalArgumentException if username is empty or login is null
   */
  public static UserGroupInformation proxyAs(String username,
      UserGroupInformation login) {
    Preconditions.checkArgument(username != null && !username.isEmpty(),
        "Invalid username: " + String.valueOf(username));
    Preconditions.checkArgument(login != null,
        "Cannot proxy without an authenticated user");

    // hadoop impersonation works with or without kerberos security
    return UserGroupInformation.createProxyUser(username, login);
  }

  /**
   * Static synchronized method for static Kerberos login. <br/>
   * Static synchronized due to a thundering herd problem when multiple Sinks
   * attempt to log in using the same principal at the same time with the
   * intention of impersonating different users (or even the same user).
   * If this is not controlled, MIT Kerberos v5 believes it is seeing a replay
   * attack and it returns:
   * <blockquote>Request is a replay (34) - PROCESS_TGS</blockquote>
   * In addition, since the underlying Hadoop APIs we are using for
   * impersonation are static, we define this method as static as well.
   *
   * @param principal
   *          Fully-qualified principal to use for authentication.
   * @param keytab
   *          Location of keytab file containing credentials for principal.
   * @return Logged-in user
   * @throws SecurityException
   *           if login fails.
   * @throws IllegalArgumentException
   *           if the principal or the keytab is not usable
   */
  public static synchronized UserGroupInformation login(String principal,
      String keytab) {
    // resolve the requested principal, if it is present
    String finalPrincipal = null;
    if (principal != null && !principal.isEmpty()) {
      try {
        // resolves _HOST pattern using standard Hadoop search/replace
        // via DNS lookup when 2nd argument is empty
        finalPrincipal = SecurityUtil.getServerPrincipal(principal, "");
      } catch (IOException e) {
        throw new SecurityException(
            "Failed to resolve Kerberos principal", e);
      }
    }

    // check if there is a user already logged in
    UserGroupInformation currentUser = null;
    try {
      currentUser = UserGroupInformation.getLoginUser();
    } catch (IOException e) {
      // not a big deal but this shouldn't typically happen because it will
      // generally fall back to the UNIX user
      LOG.debug("Unable to get login user before Kerberos auth attempt", e);
    }

    // if the current user is valid (matches the given principal) then use it
    if (currentUser != null) {
      if (finalPrincipal == null ||
          finalPrincipal.equals(currentUser.getUserName())) {
        LOG.debug("Using existing login for {}: {}",
            finalPrincipal, currentUser);
        return currentUser;
      } else {
        // be cruel and unusual when user tries to login as multiple principals
        // this isn't really valid with a reconfigure but this should be rare
        // enough to warrant a restart of the agent JVM
        // TODO: find a way to interrogate the entire current config state,
        // since we don't have to be unnecessarily protective if they switch all
        // HDFS sinks to use a different principal all at once.
        throw new SecurityException(
            "Cannot use multiple Kerberos principals: " + finalPrincipal +
            " would replace " + currentUser.getUserName());
      }
    }

    // prepare for a new login
    Preconditions.checkArgument(principal != null && !principal.isEmpty(),
        "Invalid Kerberos principal: " + String.valueOf(principal));
    Preconditions.checkNotNull(finalPrincipal,
        "Resolved principal must not be null");
    Preconditions.checkArgument(keytab != null && !keytab.isEmpty(),
        "Invalid Kerberos keytab: " + String.valueOf(keytab));
    File keytabFile = new File(keytab);
    Preconditions.checkArgument(keytabFile.isFile() && keytabFile.canRead(),
        "Keytab is not a readable file: " + String.valueOf(keytab));

    try {
      // attempt static kerberos login
      LOG.debug("Logging in as {} with {}", finalPrincipal, keytab);
      // use the resolved principal: loginUserFromKeytab does not perform
      // _HOST substitution itself, and the duplicate-login check above
      // compares against the resolved name
      UserGroupInformation.loginUserFromKeytab(finalPrincipal, keytab);
      return UserGroupInformation.getLoginUser();
    } catch (IOException e) {
      throw new SecurityException("Kerberos login failed", e);
    }
  }

  /**
   * Allow methods to act with the privileges of a login.
   *
   * If the login is null, the current privileges will be used.
   *
   * @param <T> The return type of the action
   * @param login UserGroupInformation credentials to use for action
   * @param action A PrivilegedExceptionAction to perform as another user
   * @return the T value returned by action.run()
   * @throws DatasetIOException if the action throws an IOException
   * @throws DatasetException if the thread is interrupted while running
   *           the action (the interrupt status is restored first)
   */
  public static <T> T runPrivileged(UserGroupInformation login,
      PrivilegedExceptionAction<T> action) {
    try {
      if (login == null) {
        return action.run();
      } else {
        return login.doAs(action);
      }
    } catch (IOException ex) {
      throw new DatasetIOException("Privileged action failed", ex);
    } catch (InterruptedException ex) {
      // restore the interrupt status for callers up the stack;
      // Thread.interrupted() would have *cleared* it instead
      Thread.currentThread().interrupt();
      throw new DatasetException(ex);
    } catch (Exception ex) {
      throw Throwables.propagate(ex);
    }
  }
}

0 comments on commit 96b090b

Please sign in to comment.