From 38e8de070ef4ae263983e6556e3f8775aa8c6746 Mon Sep 17 00:00:00 2001 From: Davide Date: Thu, 13 Jul 2017 16:03:54 +0200 Subject: [PATCH 1/3] I needed to put some attributes on REMOTE_GROUP and REMOTE_OWNER, in order to achieve it i put expressionLanguageSupported(true) on the PropertyDescriptor of REMOTE_GROUP and REMOTE_OWNER Signed-off-by: Davide --- .../main/java/org/apache/nifi/processors/hadoop/PutHDFS.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 69b5b77da519..f97394d2673f 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -145,6 +145,7 @@ public class PutHDFS extends AbstractHadoopProcessor { .description( "Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder() @@ -152,6 +153,7 @@ public class PutHDFS extends AbstractHadoopProcessor { .description( "Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); private static final Set relationships; From 87eef242414e7cff4fb9c9ba1d3c18ea11d6bc09 Mon Sep 17 00:00:00 2001 From: Davide Date: Thu, 20 Jul 2017 12:04:25 +0200 Subject: [PATCH 2/3] I needed to put some attributes on REMOTE_GROUP and REMOTE_OWNER, in order to achieve it i put expressionLanguageSupported(true) on the PropertyDescriptor of REMOTE_GROUP and REMOTE_OWNER Signed-off-by: Davide --- .../org/apache/nifi/processors/hadoop/PutHDFS.java | 13 ++++++++----- .../apache/nifi/processors/hadoop/PutHDFSTest.java | 3 ++- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index f97394d2673f..0a632107ee8e 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -261,7 +261,7 @@ public Object run() { if (!hdfs.mkdirs(configuredRootDirPath)) { throw new IOException(configuredRootDirPath.toString() + " could not be created"); } - changeOwner(context, hdfs, configuredRootDirPath); + changeOwner(context, hdfs, configuredRootDirPath,flowFile); } final boolean destinationExists = hdfs.exists(copyFile); @@ -354,7 +354,7 @@ public void process(InputStream in) throws IOException { + " to its final filename"); } - changeOwner(context, hdfs, copyFile); + changeOwner(context, hdfs, copyFile,flowFile); } getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}", @@ -388,11 +388,14 @@ public void process(InputStream in) throws IOException { }); } - protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) { + protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) { try { // Change owner and group of file if configured to do so - String owner = context.getProperty(REMOTE_OWNER).getValue(); - String group = context.getProperty(REMOTE_GROUP).getValue(); +// String owner = context.getProperty(REMOTE_OWNER).getValue(); +// String group = context.getProperty(REMOTE_GROUP).getValue(); + String owner = context.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue(); + String group = context.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue(); + if (owner != null || group != null) { hdfs.setOwner(name, owner, group); } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index 482ba6971801..2d3ad791b45d 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; @@ -268,7 +269,7 @@ public void testPutFileWithException() throws IOException { final KerberosProperties testKerberosProperties = kerberosProperties; TestRunner runner = TestRunners.newTestRunner(new PutHDFS() { @Override - protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name) { + protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name, FlowFile flowFile) { throw new ProcessException("Forcing Exception to get thrown in order to verify proper handling"); } From dd2219b6e2f42c002461602d0e7867ca3a66bdf4 Mon Sep 17 00:00:00 2001 From: Davide Date: Fri, 21 Jul 2017 09:08:46 +0200 Subject: [PATCH 3/3] I needed to put some attributes on REMOTE_GROUP and REMOTE_OWNER, in order to achieve it i put expressionLanguageSupported(true) on the PropertyDescriptor of REMOTE_GROUP and REMOTE_OWNER Signed-off-by: Davide --- .../java/org/apache/nifi/processors/hadoop/PutHDFS.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 0a632107ee8e..41ddf597d7f7 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -261,7 +261,7 @@ public Object run() { if (!hdfs.mkdirs(configuredRootDirPath)) { throw new IOException(configuredRootDirPath.toString() + " could not be created"); } - changeOwner(context, hdfs, configuredRootDirPath,flowFile); + changeOwner(context, hdfs, configuredRootDirPath, flowFile); } final boolean destinationExists = hdfs.exists(copyFile); @@ -354,7 +354,7 @@ public void process(InputStream in) throws IOException { + " to its final filename"); } - changeOwner(context, hdfs, copyFile,flowFile); + changeOwner(context, hdfs, copyFile, flowFile); } getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}", @@ -391,11 +391,12 @@ public void process(InputStream in) throws IOException { protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) { try { // Change owner and group of file if configured to do so -// String owner = context.getProperty(REMOTE_OWNER).getValue(); -// String group = context.getProperty(REMOTE_GROUP).getValue(); String owner = context.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue(); String group = context.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue(); + owner = owner == null || owner.isEmpty() ? null : owner; + group = group == null || group.isEmpty() ? null : group; + if (owner != null || group != null) { hdfs.setOwner(name, owner, group); }