Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import javax.net.SocketFactory;
Expand All @@ -50,9 +51,9 @@
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.Tuple;

/**
* This is a base class that is helpful when building processors interacting with HDFS.
Expand Down Expand Up @@ -132,15 +133,24 @@ public ValidationResult validate(String subject, String input, ValidationContext
.description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build();

private static final PropertyDescriptor KERBEROS_RENEWAL_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Renewal Period").required(false)
.description("Period of time which should pass before renewing the kerberos ticket").defaultValue("4 hours")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

protected static final List<PropertyDescriptor> properties;

private static final Object RESOURCES_LOCK = new Object();

private long ticketRenewalThresholdSeconds;
private long lastTicketRenewal;

static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(HADOOP_CONFIGURATION_RESOURCES);
props.add(KERBEROS_PRINCIPAL);
props.add(KERBEROS_KEYTAB);
props.add(KERBEROS_RENEWAL_PERIOD);
properties = Collections.unmodifiableList(props);
try {
NIFI_PROPERTIES = NiFiProperties.getInstance();
Expand All @@ -154,12 +164,12 @@ public ValidationResult validate(String subject, String input, ValidationContext
}

// variables shared by all threads of this processor
// Hadoop Configuration and FileSystem
private final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>();
// Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();

@Override
protected void init(ProcessorInitializationContext context) {
hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
hdfsResources.set(new HdfsResources(null, null, null));
}

@Override
Expand All @@ -173,8 +183,13 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@OnScheduled
public final void abstractOnScheduled(ProcessContext context) throws IOException {
try {
Tuple<Configuration, FileSystem> resources = hdfsResources.get();
if (resources.getKey() == null || resources.getValue() == null) {
// This value will be null when called from ListHDFS, because it overrides all of the default
// properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
if (context.getProperty(KERBEROS_RENEWAL_PERIOD).getValue() != null) {
ticketRenewalThresholdSeconds = context.getProperty(KERBEROS_RENEWAL_PERIOD).asTimePeriod(TimeUnit.SECONDS);
}
HdfsResources resources = hdfsResources.get();
if (resources.getConfiguration() == null) {
String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
String dir = context.getProperty(DIRECTORY_PROP_NAME).getValue();
dir = dir == null ? "/" : dir;
Expand All @@ -183,14 +198,14 @@ public final void abstractOnScheduled(ProcessContext context) throws IOException
}
} catch (IOException ex) {
getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
hdfsResources.set(new HdfsResources(null, null, null));
throw ex;
}
}

@OnStopped
public final void abstractOnStopped() {
hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
hdfsResources.set(new HdfsResources(null, null, null));
}

private static Configuration getConfigurationFromResources(String configResources) throws IOException {
Expand Down Expand Up @@ -224,7 +239,7 @@ private static Configuration getConfigurationFromResources(String configResource
/*
* Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
*/
Tuple<Configuration, FileSystem> resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
HdfsResources resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
// org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates
// later to do I/O. We need this class loader to be the NarClassLoader instead of the magical
// NarThreadContextClassLoader.
Expand All @@ -244,13 +259,15 @@ Tuple<Configuration, FileSystem> resetHDFSResources(String configResources, Stri
// If kerberos is enabled, create the file system as the kerberos principal
// -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time
FileSystem fs = null;
UserGroupInformation ugi = null;
synchronized (RESOURCES_LOCK) {
if (config.get("hadoop.security.authentication").equalsIgnoreCase("kerberos")) {
String principal = context.getProperty(KERBEROS_PRINCIPAL).getValue();
String keyTab = context.getProperty(KERBEROS_KEYTAB).getValue();
UserGroupInformation.setConfiguration(config);
UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
fs = getFileSystemAsUser(config, ugi);
lastTicketRenewal = System.currentTimeMillis() / 1000;
} else {
config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
config.set("hadoop.security.authentication", "simple");
Expand All @@ -260,7 +277,7 @@ Tuple<Configuration, FileSystem> resetHDFSResources(String configResources, Stri
config.set(disableCacheName, "true");
getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() });
return new Tuple<>(config, fs);
return new HdfsResources(config, fs, ugi);

} finally {
Thread.currentThread().setContextClassLoader(savedClassLoader);
Expand Down Expand Up @@ -392,10 +409,61 @@ public static String getPathDifference(final Path root, final Path child) {
}

protected Configuration getConfiguration() {
return hdfsResources.get().getKey();
return hdfsResources.get().getConfiguration();
}

protected FileSystem getFileSystem() {
return hdfsResources.get().getValue();
// if kerberos is enabled, check if the ticket should be renewed before returning the FS
if (getUserGroupInformation() != null && isTicketOld()) {
renewKerberosTicket(getUserGroupInformation());
}
return hdfsResources.get().getFileSystem();
}

protected UserGroupInformation getUserGroupInformation() {
return hdfsResources.get().getUserGroupInformation();
}

protected void renewKerberosTicket(UserGroupInformation ugi) {
try {
getLogger().info(String.format("Kerberos ticket age exceeds threshold [%d seconds], " +
"attempting to renew ticket for user [%s]",
ticketRenewalThresholdSeconds, ugi.getUserName()));
ugi.checkTGTAndReloginFromKeytab();
lastTicketRenewal = System.currentTimeMillis() / 1000;
getLogger().info("Kerberos ticket successfully renewed!");
} catch (IOException e) {
getLogger().error("Failed to renew Kerberos ticket\n" + e.getMessage());
throw new ProcessException("Unable to renew kerberos ticket\n" + e.getMessage());
}
}

protected boolean isTicketOld() {
return (System.currentTimeMillis() / 1000 - lastTicketRenewal) > ticketRenewalThresholdSeconds;
}


protected class HdfsResources {
private Configuration configuration;
private FileSystem fileSystem;
private UserGroupInformation userGroupInformation;

public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation) {
this.configuration = configuration;
this.fileSystem = fileSystem;
this.userGroupInformation = userGroupInformation;
}

public Configuration getConfiguration() {
return configuration;
}

public FileSystem getFileSystem() {
return fileSystem;
}

public UserGroupInformation getUserGroupInformation() {
return userGroupInformation;
}
}
}