From 8434d54b1fda626118090d1d206f2c62f2f39e85 Mon Sep 17 00:00:00 2001
From: WilliamNouet
Date: Mon, 6 Mar 2017 13:21:48 -0500
Subject: [PATCH] Add Kerberos Support to Kite

---
 .../nifi-kite-processors/pom.xml              |  16 ++-
 .../kite/AbstractKiteProcessor.java           | 110 ++++++++++++++++--
 .../processors/kite/ConvertCSVToAvro.java     |  23 ++--
 .../processors/kite/ConvertJSONToAvro.java    |  17 +--
 .../processors/kite/StoreInKiteDataset.java   | 104 +++++++++--------
 5 files changed, 190 insertions(+), 80 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
index 74289fdd6e93..8a32a29f96ac 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
@@ -40,7 +40,21 @@
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-utils</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hadoop-utils</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
+
         <dependency>
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
index f90c089244cb..2cf4be373c54 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
@@ -44,8 +44,31 @@
 import org.kitesdk.data.URIBuilder;
 import org.kitesdk.data.spi.DefaultConfiguration;
 
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicReference;
+
 abstract class AbstractKiteProcessor extends AbstractProcessor {
 
+    protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
+        return new KerberosProperties(kerberosConfigFile);
+    }
+
+    protected KerberosProperties kerberosProperties;
+    private volatile File kerberosConfigFile = null;
+    private String kerberosPrincipal;
+    private String kerberosKeytab;
+    private Configuration configuration;
+    private static UserGroupInformation ugi;
+    private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
+
     private static final Splitter COMMA = Splitter.on(',').trimResults();
 
     protected static final Validator FILES_EXIST = new Validator() {
         @Override
@@ -128,7 +151,8 @@ protected static Schema getSchema(String uriOrLiteral, Configuration conf) {
         } else {
             // try to open the file
             Path schemaPath = new Path(uri);
-            FileSystem fs = schemaPath.getFileSystem(conf);
+            FileSystem fs;
+            fs = getFileSystemAsUser(conf, ugi);
             try (InputStream in = fs.open(schemaPath)) {
                 return parseSchema(uri, in);
             }
@@ -183,11 +207,9 @@ public ValidationResult validate(String subject, String uri, ValidationContext context)
         }
     };
 
-    protected static final List<PropertyDescriptor> ABSTRACT_KITE_PROPS = ImmutableList.<PropertyDescriptor>builder()
-            .add(CONF_XML_FILES)
-            .build();
+    protected List<PropertyDescriptor> ABSTRACT_KITE_PROPS;
 
-    static List<PropertyDescriptor> getProperties() {
+    protected List<PropertyDescriptor> getProperties() {
         return ABSTRACT_KITE_PROPS;
     }
 
@@ -198,6 +220,79 @@ protected void setDefaultConfiguration(ProcessContext context)
                 context.getProperty(CONF_XML_FILES).getValue()));
     }
 
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        hdfsResources.set(new HdfsResources(null, null));
+
+        kerberosConfigFile = context.getKerberosConfigurationFile();
+        kerberosProperties = getKerberosProperties(kerberosConfigFile);
+
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(CONF_XML_FILES);
+        props.add(kerberosProperties.getKerberosPrincipal());
+        props.add(kerberosProperties.getKerberosKeytab());
+        ABSTRACT_KITE_PROPS = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ABSTRACT_KITE_PROPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws Exception {
+        configuration = getConfiguration(context.getProperty(CONF_XML_FILES).getValue());
+        if (SecurityUtil.isSecurityEnabled(configuration)) {
+            kerberosPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
+            kerberosKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
+            ugi = SecurityUtil.loginKerberos(configuration, kerberosPrincipal, kerberosKeytab);
+        } else {
+            configuration.set("ipc.client.fallback-to-simple-auth-allowed", "true");
+            configuration.set("hadoop.security.authentication", "simple");
+            ugi = SecurityUtil.loginSimple(configuration);
+        }
+        hdfsResources.set(new HdfsResources(configuration, ugi));
+    }
+
+    protected static FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
+        try {
+            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+                @Override
+                public FileSystem run() throws Exception {
+                    return FileSystem.get(config);
+                }
+            });
+        } catch (InterruptedException e) {
+            throw new IOException("Unable to create file system: " + e.getMessage());
+        }
+    }
+
+    protected static class HdfsResources {
+        private final Configuration configuration;
+        private final UserGroupInformation userGroupInformation;
+
+        public HdfsResources(Configuration configuration, UserGroupInformation userGroupInformation) {
+            this.configuration = configuration;
+            this.userGroupInformation = userGroupInformation;
+        }
+
+        public Configuration getConfiguration() {
+            return configuration;
+        }
+
+        public UserGroupInformation getUserGroupInformation() {
+            return userGroupInformation;
+        }
+    }
+
+    protected UserGroupInformation getUserGroupInformation() {
+        UserGroupInformation userGroupInformation = hdfsResources.get().getUserGroupInformation();
+
+        return userGroupInformation;
+    }
+
     protected static Configuration getConfiguration(String configFiles) {
         Configuration conf = DefaultConfiguration.get();
@@ -215,9 +310,4 @@ protected static Configuration getConfiguration(String configFiles) {
 
         return conf;
     }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return ABSTRACT_KITE_PROPS;
-    }
 }
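Note for readers new to nifi-hadoop-utils: the file above boils down to "log in once per scheduling cycle, cache the UserGroupInformation, and route every Hadoop call through doAs()". The sketch below shows that pattern standalone, using plain Hadoop security APIs; the class name, config path, principal, and keytab are invented placeholders, and the login branch is roughly (not exactly) what SecurityUtil.loginKerberos/loginSimple do.

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KerberosLoginSketch {

        public static void main(String[] args) throws Exception {
            // Placeholder resource; the processors build this from CONF_XML_FILES instead.
            final Configuration conf = new Configuration();
            conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));

            final UserGroupInformation ugi;
            if ("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) {
                // Keytab login; principal and keytab path are invented placeholders.
                UserGroupInformation.setConfiguration(conf);
                ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                        "nifi@EXAMPLE.COM", "/etc/security/keytabs/nifi.keytab");
            } else {
                // Simple-auth fallback: run as the current OS user.
                ugi = UserGroupInformation.getCurrentUser();
            }

            // Mirror of getFileSystemAsUser(): the FileSystem handle is obtained
            // inside doAs() so it carries the logged-in user's credentials.
            FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                @Override
                public FileSystem run() throws IOException {
                    return FileSystem.get(conf);
                }
            });
            System.out.println("Logged in as " + ugi.getShortUserName()
                    + ", home dir " + fs.getHomeDirectory());
        }
    }
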
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index 970291659abd..d419e9cef5c4 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@ -160,18 +160,6 @@ public ValidationResult validate(String subject, String input, ValidationContext context)
             .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
             .build();
 
-    private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.<PropertyDescriptor>builder()
-            .addAll(AbstractKiteProcessor.getProperties())
-            .add(SCHEMA)
-            .add(CHARSET)
-            .add(DELIMITER)
-            .add(QUOTE)
-            .add(ESCAPE)
-            .add(HAS_HEADER)
-            .add(LINES_TO_SKIP)
-            .add(COMPRESSION_TYPE)
-            .build();
-
     private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship>builder()
             .add(SUCCESS)
             .add(FAILURE)
@@ -180,7 +168,16 @@ public ValidationResult validate(String subject, String input, ValidationContext context)
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return PROPERTIES;
+        List<PropertyDescriptor> props = new ArrayList<>(ABSTRACT_KITE_PROPS);
+        props.add(SCHEMA);
+        props.add(CHARSET);
+        props.add(DELIMITER);
+        props.add(QUOTE);
+        props.add(ESCAPE);
+        props.add(HAS_HEADER);
+        props.add(LINES_TO_SKIP);
+        props.add(COMPRESSION_TYPE);
+        return props;
     }
 
     @Override
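A note on the property-list rework in this file and the two diffs that follow: the shared descriptor list can no longer be a static ImmutableList, because the Kerberos principal and keytab descriptors come from KerberosProperties, which is only constructed in init() once the ProcessorInitializationContext supplies the Kerberos configuration file. Each subclass therefore copies ABSTRACT_KITE_PROPS and appends its own descriptors at request time. The combined list is rebuilt on every getSupportedPropertyDescriptors() call; caching it in an instance field after init() would be a cheap follow-up if the allocation ever matters.
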
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
index 1127a2de5428..c94e4dda86a7 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
@@ -80,13 +80,6 @@ public class ConvertJSONToAvro extends AbstractKiteConvertProcessor {
             .required(true)
             .build();
 
-    private static final List<PropertyDescriptor> PROPERTIES
-            = ImmutableList.<PropertyDescriptor>builder()
-            .addAll(AbstractKiteProcessor.getProperties())
-            .add(SCHEMA)
-            .add(COMPRESSION_TYPE)
-            .build();
-
     private static final Set<Relationship> RELATIONSHIPS
             = ImmutableSet.<Relationship>builder()
             .add(SUCCESS)
@@ -99,7 +92,10 @@ public ConvertJSONToAvro() {
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return PROPERTIES;
+        List<PropertyDescriptor> props = new ArrayList<>(ABSTRACT_KITE_PROPS);
+        props.add(SCHEMA);
+        props.add(COMPRESSION_TYPE);
+        return props;
     }
 
     @Override
@@ -107,6 +103,11 @@ public Set<Relationship> getRelationships() {
         return RELATIONSHIPS;
     }
 
+    @OnScheduled
+    public void OnScheduled(ProcessContext context) throws IOException {
+        super.setDefaultConfiguration(context);
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session)
             throws ProcessException {
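UserGroupInformation.doAs() has two shapes, and the StoreInKiteDataset diff below depends on the difference: PrivilegedExceptionAction<T> lets checked exceptions escape run() (doAs rethrows them as IOException or InterruptedException, which is what getFileSystemAsUser() above leans on), while PrivilegedAction<T> declares none, which is why the onTrigger() rewrite uses a PrivilegedAction<Object> that returns null and keeps all error handling and session transfers inside run(). A compact illustration of both shapes follows; the class and method names are invented for the example.

    import java.io.IOException;
    import java.security.PrivilegedAction;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsShapes {

        // Shape 1: checked exceptions escape run(); doAs rethrows them, so the
        // caller declares and handles IOException/InterruptedException normally.
        static int readFirstByte(UserGroupInformation ugi, final Configuration conf,
                final Path path) throws IOException, InterruptedException {
            return ugi.doAs(new PrivilegedExceptionAction<Integer>() {
                @Override
                public Integer run() throws IOException {
                    try (FSDataInputStream in = FileSystem.get(conf).open(path)) {
                        return in.read();
                    }
                }
            });
        }

        // Shape 2: PrivilegedAction declares no checked exceptions, so everything
        // is handled inside run() and a placeholder value is returned -- the same
        // structure the onTrigger() rewrite below uses.
        static void touchQuietly(UserGroupInformation ugi, final Configuration conf) {
            ugi.doAs(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        FileSystem.get(conf).getStatus();
                    } catch (IOException e) {
                        System.err.println("Hadoop call failed inside doAs: " + e.getMessage());
                    }
                    return null;
                }
            });
        }
    }
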
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
index 1986f0bf0ecd..3996c6d8edcf 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
@@ -50,6 +50,9 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
+import java.security.PrivilegedAction;
+import org.apache.hadoop.security.UserGroupInformation;
+
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"kite", "avro", "parquet", "hadoop", "hive", "hdfs", "hbase"})
 @CapabilityDescription("Stores Avro records in a Kite dataset")
@@ -79,12 +82,6 @@ public class StoreInKiteDataset extends AbstractKiteProcessor {
             .required(true)
             .build();
 
-    private static final List<PropertyDescriptor> PROPERTIES
-            = ImmutableList.<PropertyDescriptor>builder()
-            .addAll(AbstractKiteProcessor.getProperties())
-            .add(KITE_DATASET_URI)
-            .build();
-
     private static final Set<Relationship> RELATIONSHIPS
             = ImmutableSet.<Relationship>builder()
             .add(SUCCESS)
@@ -94,7 +91,9 @@ public class StoreInKiteDataset extends AbstractKiteProcessor {
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return PROPERTIES;
+        List<PropertyDescriptor> props = new ArrayList<>(ABSTRACT_KITE_PROPS);
+        props.add(KITE_DATASET_URI);
+        return props;
     }
 
     @Override
@@ -110,54 +109,63 @@ public void onTrigger(ProcessContext context, final ProcessSession session)
             return;
         }
 
-        final View<Record> target = load(context, flowFile);
-        final Schema schema = target.getDataset().getDescriptor().getSchema();
-
-        try {
-            StopWatch timer = new StopWatch(true);
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(InputStream in) throws IOException {
-                    try (DataFileStream<Record> stream = new DataFileStream<>(
-                            in, AvroUtil.newDatumReader(schema, Record.class))) {
-                        IncompatibleSchemaException.check(
-                                SchemaValidationUtil.canRead(stream.getSchema(), schema),
-                                "Incompatible file schema %s, expected %s",
-                                stream.getSchema(), schema);
-
-                        long written = 0L;
-                        try (DatasetWriter<Record> writer = target.newWriter()) {
-                            for (Record record : stream) {
-                                writer.write(record);
-                                written += 1;
-                            }
-                        } finally {
-                            session.adjustCounter("Stored records", written,
-                                    true /* cannot roll back the write */);
-                        }
-                    }
-                }
-            });
-            timer.stop();
-
-            session.getProvenanceReporter().send(flowFile,
-                    target.getUri().toString(),
-                    timer.getDuration(TimeUnit.MILLISECONDS),
-                    true /* cannot roll back the write */);
-
-            session.transfer(flowFile, SUCCESS);
-
-        } catch (ProcessException | DatasetIOException e) {
-            getLogger().error("Failed to read FlowFile", e);
-            session.transfer(flowFile, FAILURE);
-
-        } catch (ValidationException e) {
-            getLogger().error(e.getMessage());
-            getLogger().debug("Incompatible schema error", e);
-            session.transfer(flowFile, INCOMPATIBLE);
-        }
+        final UserGroupInformation ugi = getUserGroupInformation();
+
+        ugi.doAs(new PrivilegedAction<Object>() {
+            @Override
+            public Object run() {
+                final View<Record> target = load(context, flowFile);
+                final Schema schema = target.getDataset().getDescriptor().getSchema();
+
+                try {
+                    StopWatch timer = new StopWatch(true);
+                    session.read(flowFile, new InputStreamCallback() {
+                        @Override
+                        public void process(InputStream in) throws IOException {
+                            try (DataFileStream<Record> stream = new DataFileStream<>(
+                                    in, AvroUtil.newDatumReader(schema, Record.class))) {
+                                IncompatibleSchemaException.check(
+                                        SchemaValidationUtil.canRead(stream.getSchema(), schema),
+                                        "Incompatible file schema %s, expected %s",
+                                        stream.getSchema(), schema);
+
+                                long written = 0L;
+                                try (DatasetWriter<Record> writer = target.newWriter()) {
+                                    for (Record record : stream) {
+                                        writer.write(record);
+                                        written += 1;
+                                    }
+                                } finally {
+                                    session.adjustCounter("Stored records", written,
+                                            true /* cannot roll back the write */);
+                                }
+                            }
+                        }
+                    });
+
+                    timer.stop();
+
+                    session.getProvenanceReporter().send(flowFile,
+                            target.getUri().toString(),
+                            timer.getDuration(TimeUnit.MILLISECONDS),
+                            true /* cannot roll back the write */);
+
+                    session.transfer(flowFile, SUCCESS);
+
+                } catch (ProcessException | DatasetIOException e) {
+                    getLogger().error("Failed to read FlowFile", e);
+                    session.transfer(flowFile, FAILURE);
+
+                } catch (ValidationException e) {
+                    getLogger().error(e.getMessage());
+                    getLogger().debug("Incompatible schema error", e);
+                    session.transfer(flowFile, INCOMPATIBLE);
+                }
+                return null;
+            }
+        });
     }
-
+
     private View<Record> load(ProcessContext context, FlowFile file) {
         String uri = context.getProperty(KITE_DATASET_URI)
                 .evaluateAttributeExpressions(file)