NIFI-4323 Wrapped Get/ListHDFS hadoop operations in ugi.doAs calls
NIFI-3472 Removed explicit relogin code from HDFS/Hive/HBase components and updated SecurityUtil.loginKerberos to use UGI.loginUserFromKeytab. This brings those components in line with daemon-process-style usage, made possible by NiFi's InstanceClassLoader isolation. Relogin (on ticket expiry/connection failure) can now be handled implicitly by hadoop-client code.
jtstorck committed Dec 23, 2017
1 parent 14d2291 commit 760de56
Showing 10 changed files with 26 additions and 146 deletions.
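
The pattern this commit moves to, shown as a condensed standalone sketch rather than the NiFi code itself (the principal, keytab path, and HDFS path below are placeholders): log in once via UGI.loginUserFromKeytab so hadoop-client retains the keytab and can relogin on its own, then run each FileSystem call inside ugi.doAs.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

public class KerberosDoAsSketch {

    public static void main(String[] args) throws IOException, InterruptedException {
        final Configuration config = new Configuration();
        config.set("hadoop.security.authentication", "kerberos");

        // Log in as the static login user so hadoop-client can relogin from the
        // keytab by itself when the TGT expires (the behavior this commit relies on).
        UserGroupInformation.setConfiguration(config);
        UserGroupInformation.loginUserFromKeytab("nifi@EXAMPLE.COM", "/etc/security/keytabs/nifi.keytab");
        final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

        // Wrap each Hadoop operation in doAs so it executes as the logged-in user.
        final boolean exists = ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> {
            final FileSystem fs = FileSystem.get(config);
            return fs.exists(new Path("/tmp/example.txt"));
        });
        System.out.println("exists = " + exists);
    }
}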
@@ -19,7 +19,6 @@
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.logging.ComponentLog;

import java.io.IOException;

@@ -51,7 +50,8 @@ public static synchronized UserGroupInformation loginKerberos(final Configuratio
Validate.notNull(keyTab);

UserGroupInformation.setConfiguration(config);
return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(), keyTab.trim());
UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim());
return UserGroupInformation.getCurrentUser();
}

/**
@@ -85,32 +85,4 @@ public static boolean isSecurityEnabled(final Configuration config) {
Validate.notNull(config);
return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
}

/**
* Start a thread that periodically attempts to renew the current Kerberos user's ticket.
*
* Callers of this method should store the reference to the KerberosTicketRenewer and call stop() to stop the thread.
*
* @param id
* The unique identifier to use for the thread, can be the class name that started the thread
* (i.e. PutHDFS, etc)
* @param ugi
* The current Kerberos user.
* @param renewalPeriod
* The amount of time between attempting renewals.
* @param logger
* The logger to use with in the renewer
*
* @return the KerberosTicketRenewer Runnable
*/
public static KerberosTicketRenewer startTicketRenewalThread(final String id, final UserGroupInformation ugi, final long renewalPeriod, final ComponentLog logger) {
final KerberosTicketRenewer renewer = new KerberosTicketRenewer(ugi, renewalPeriod, logger);

final Thread t = new Thread(renewer);
t.setName("Kerberos Ticket Renewal [" + id + "]");
t.start();

return renewer;
}

}
@@ -28,7 +28,6 @@
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.hadoop.KerberosProperties;
@@ -37,7 +36,6 @@
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import javax.net.SocketFactory;
@@ -54,7 +52,6 @@
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
@@ -89,15 +86,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
.defaultValue(CompressionType.NONE.toString())
.build();

public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder()
.name("Kerberos Relogin Period").required(false)
.description("Period of time which should pass before attempting a kerberos relogin")
.defaultValue("4 hours")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();

public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder()
.name("Additional Classpath Resources")
.description("A comma-separated list of paths to files and/or directories that will be added to the classpath. When specifying a " +
@@ -111,8 +99,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {

private static final Object RESOURCES_LOCK = new Object();

private long kerberosReloginThreshold;
private long lastKerberosReloginTime;
protected KerberosProperties kerberosProperties;
protected List<PropertyDescriptor> properties;
private volatile File kerberosConfigFile = null;
@@ -135,7 +121,6 @@ protected void init(ProcessorInitializationContext context) {
props.add(HADOOP_CONFIGURATION_RESOURCES);
props.add(kerberosProperties.getKerberosPrincipal());
props.add(kerberosProperties.getKerberosKeytab());
props.add(KERBEROS_RELOGIN_PERIOD);
props.add(ADDITIONAL_CLASSPATH_RESOURCES);
properties = Collections.unmodifiableList(props);
}
@@ -195,10 +180,6 @@ public final void abstractOnScheduled(ProcessContext context) throws IOException
try {
// This value will be null when called from ListHDFS, because it overrides all of the default
// properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
PropertyValue reloginPeriod = context.getProperty(KERBEROS_RELOGIN_PERIOD).evaluateAttributeExpressions();
if (reloginPeriod.getValue() != null) {
kerberosReloginThreshold = reloginPeriod.asTimePeriod(TimeUnit.SECONDS);
}
HdfsResources resources = hdfsResources.get();
if (resources.getConfiguration() == null) {
final String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
@@ -274,7 +255,6 @@ HdfsResources resetHDFSResources(String configResources, ProcessContext context)
String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
ugi = SecurityUtil.loginKerberos(config, principal, keyTab);
fs = getFileSystemAsUser(config, ugi);
lastKerberosReloginTime = System.currentTimeMillis() / 1000;
} else {
config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
config.set("hadoop.security.authentication", "simple");
@@ -403,44 +383,11 @@ protected Configuration getConfiguration() {
}

protected FileSystem getFileSystem() {
// trigger Relogin if necessary
getUserGroupInformation();
return hdfsResources.get().getFileSystem();
}

protected UserGroupInformation getUserGroupInformation() {
// if kerberos is enabled, check if the ticket should be renewed before returning
UserGroupInformation userGroupInformation = hdfsResources.get().getUserGroupInformation();
if (userGroupInformation != null && isTicketOld()) {
tryKerberosRelogin(userGroupInformation);
}
return userGroupInformation;
}

protected void tryKerberosRelogin(UserGroupInformation ugi) {
try {
getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " +
"attempting to renew ticket for user {}", new Object[]{
kerberosReloginThreshold, ugi.getUserName()});
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
ugi.checkTGTAndReloginFromKeytab();
return null;
});
lastKerberosReloginTime = System.currentTimeMillis() / 1000;
getLogger().info("Kerberos relogin successful or ticket still valid");
} catch (IOException e) {
// Most likely case of this happening is ticket is expired and error getting a new one,
// meaning dfs operations would fail
getLogger().error("Kerberos relogin failed", e);
throw new ProcessException("Unable to renew kerberos ticket", e);
} catch (InterruptedException e) {
getLogger().error("Interrupted while attempting Kerberos relogin", e);
throw new ProcessException("Unable to renew kerberos ticket", e);
}
}

protected boolean isTicketOld() {
return (System.currentTimeMillis() / 1000 - lastKerberosReloginTime) > kerberosReloginThreshold;
return hdfsResources.get().getUserGroupInformation();
}

static protected class HdfsResources {
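
The Kerberos branch above builds the FileSystem through getFileSystemAsUser(config, ugi), whose body is not part of this diff. A plausible sketch of such a helper, under the assumption that it only defers FileSystem.get to doAs (names and details here are illustrative, not taken from the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

// Hypothetical helper: creates the FileSystem while running as the given UGI.
final class FileSystemAsUserSketch {
    static FileSystem getFileSystemAsUser(final Configuration config, final UserGroupInformation ugi) throws IOException {
        try {
            return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while creating FileSystem as " + ugi.getUserName(), e);
        }
    }
}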
@@ -19,6 +19,7 @@
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -50,6 +51,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -295,6 +297,11 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
context.yield();
getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e});
return;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
context.yield();
getLogger().warn("Interrupted while retrieving files", e);
return;
}
}

@@ -342,13 +349,13 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
for (final Path file : files) {
try {
if (!hdfs.exists(file)) {
if (!getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(file))) {
continue; // if file is no longer there then move on
}
final String originalFilename = file.getName();
final String relativePath = getPathDifference(rootDir, file);

stream = hdfs.open(file, bufferSize);
stream = getUserGroupInformation().doAs((PrivilegedExceptionAction<FSDataInputStream>) () -> hdfs.open(file, bufferSize));

final String outputFilename;
// Check if we should infer compression codec
@@ -374,7 +381,7 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath);
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);

if (!keepSourceFiles && !hdfs.delete(file, false)) {
if (!keepSourceFiles && !getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.delete(file, false))) {
getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
new Object[]{file});
session.remove(flowFile);
@@ -406,7 +413,7 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
* @return null if POLLING_INTERVAL has not lapsed. Will return an empty set if no files were found on HDFS that matched the configured filters
* @throws java.io.IOException ex
*/
protected Set<Path> performListing(final ProcessContext context) throws IOException {
protected Set<Path> performListing(final ProcessContext context) throws IOException, InterruptedException {

final long pollingIntervalMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
final long nextPollTime = lastPollTime.get() + pollingIntervalMillis;
@@ -435,7 +442,7 @@ protected Set<Path> performListing(final ProcessContext context) throws IOExcept
* @return files to process
* @throws java.io.IOException ex
*/
protected Set<Path> selectFiles(final FileSystem hdfs, final Path dir, Set<Path> filesVisited) throws IOException {
protected Set<Path> selectFiles(final FileSystem hdfs, final Path dir, Set<Path> filesVisited) throws IOException, InterruptedException {
if (null == filesVisited) {
filesVisited = new HashSet<>();
}
@@ -446,7 +453,8 @@ protected Set<Path> selectFiles(final FileSystem hdfs, final Path dir, Set<Path>

final Set<Path> files = new HashSet<>();

for (final FileStatus file : hdfs.listStatus(dir)) {
FileStatus[] fileStatuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(dir));
for (final FileStatus file : fileStatuses) {
if (files.size() >= MAX_WORKING_QUEUE_SIZE) {
// no need to make the files set larger than what we would queue anyway
break;
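
The GetHDFS hunks above route exists, open, delete, and listStatus through the processor's UGI. A condensed standalone sketch of that wrapping, assuming the hdfs, ugi, and dir arguments are already configured (this is not the NiFi processor code):

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

final class DoAsHdfsCallsSketch {

    // List a directory and open each regular file, with every HDFS call inside ugi.doAs.
    static void readDirectory(final FileSystem hdfs, final UserGroupInformation ugi, final Path dir)
            throws IOException, InterruptedException {
        final FileStatus[] statuses = ugi.doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(dir));
        for (final FileStatus status : statuses) {
            if (!status.isFile()) {
                continue;
            }
            final Path file = status.getPath();
            if (!ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(file))) {
                continue; // file disappeared between listing and read
            }
            try (FSDataInputStream in = ugi.doAs((PrivilegedExceptionAction<FSDataInputStream>) () -> hdfs.open(file))) {
                final int firstByte = in.read(); // consume the stream as needed
                // Callers that cannot propagate InterruptedException should catch it,
                // restore the interrupt flag, and yield, as the onTrigger hunks above do.
            }
        }
    }
}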
@@ -49,6 +49,7 @@

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -343,6 +344,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
} catch (final IOException | IllegalArgumentException e) {
getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
return;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
getLogger().error("Interrupted while performing listing of HDFS", e);
return;
}

final Set<FileStatus> listable = determineListable(statuses, context);
Expand Down Expand Up @@ -381,11 +386,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
}

private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException {
private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException, InterruptedException {
final Set<FileStatus> statusSet = new HashSet<>();

getLogger().debug("Fetching listing for {}", new Object[] {path});
final FileStatus[] statuses = hdfs.listStatus(path, filter);
final FileStatus[] statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path, filter));

for ( final FileStatus status : statuses ) {
if ( status.isDirectory() ) {
@@ -144,13 +144,11 @@ public void testKerberosOptionsWithEL() throws Exception {
// initialize the runner with EL for the kerberos properties
runner.setProperty(AbstractHadoopProcessor.HADOOP_CONFIGURATION_RESOURCES, "${variableHadoopConfigResources}");
runner.setProperty(kerberosProperties.getKerberosPrincipal(), "${variablePrincipal}");
runner.setProperty(AbstractHadoopProcessor.KERBEROS_RELOGIN_PERIOD, "${variableReloginPeriod}");
runner.setProperty(kerberosProperties.getKerberosKeytab(), "${variableKeytab}");

// add variables for all the kerberos properties except for the keytab
runner.setVariable("variableHadoopConfigResources", "src/test/resources/core-site-security.xml");
runner.setVariable("variablePrincipal", "principal");
runner.setVariable("variableReloginPeriod", "4m");
// test that the config is not valid, since the EL for keytab will return nothing, no keytab
runner.assertNotValid();

@@ -35,7 +35,6 @@
import java.security.PrivilegedExceptionAction;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -46,7 +45,6 @@ public class GetHDFSSequenceFileTest {
private Configuration configuration;
private FileSystem fileSystem;
private UserGroupInformation userGroupInformation;
private boolean isTicketOld;
private boolean reloginTried;

@Before
@@ -57,7 +55,6 @@ public void setup() throws IOException {
hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, userGroupInformation);
getHDFSSequenceFile = new TestableGetHDFSSequenceFile();
getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class);
isTicketOld = false;
reloginTried = false;
init();
}
@@ -68,7 +65,8 @@ private void init() throws IOException {
getHDFSSequenceFile.onScheduled(context);
}

private void getFlowFilesWithUgi() throws Exception {
@Test
public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws Exception {
SequenceFileReader reader = mock(SequenceFileReader.class);
Path file = mock(Path.class);
getHDFSSequenceFile.getFlowFiles(configuration, fileSystem, reader, file);
@@ -77,21 +75,9 @@ private void getFlowFilesWithUgi() throws Exception {
verify(userGroupInformation).doAs(privilegedExceptionActionArgumentCaptor.capture());
privilegedExceptionActionArgumentCaptor.getValue().run();
verify(reader).readSequenceFile(file, configuration, fileSystem);
}

@Test
public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws Exception {
getFlowFilesWithUgi();
assertFalse(reloginTried);
}

@Test
public void getFlowFilesWithUgiAndOldTicketShouldCallDoAsAndRelogin() throws Exception {
isTicketOld = true;
getFlowFilesWithUgi();
assertTrue(reloginTried);
}

@Test
public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception {
hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, null);
@@ -117,15 +103,5 @@ public void onScheduled(ProcessContext context) throws IOException {
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return kerberosProperties;
}

@Override
protected boolean isTicketOld() {
return isTicketOld;
}

@Override
protected void tryKerberosRelogin(UserGroupInformation ugi) {
reloginTried = true;
}
}
}
@@ -269,9 +269,6 @@ public void onConfigured(final ConfigurationContext context) throws Initializati
*/
@OnDisabled
public void shutdown() {

hiveConfigurator.stopRenewer();

try {
dataSource.close();
} catch (final SQLException e) {
@@ -865,7 +865,6 @@ public void cleanup() {
}

ugi = null;
hiveConfigurator.stopRenewer();
}

private void setupHeartBeatTimer(int heartbeatInterval) {
(Diffs for the remaining two changed files are not shown here.)
