From 925ea04c26b4eb59bcf06e3ea2ab2bb4d55b7b6d Mon Sep 17 00:00:00 2001 From: "puspendu.banerjee@gmail.com" Date: Wed, 17 Feb 2016 01:33:29 -0600 Subject: [PATCH 1/2] Enhancement for NIFI-1045 : Add "backup suffix" Conflict Resolution Strategy to PutHDFS / PutFile --- .../nifi/processors/hadoop/PutHDFS.java | 20 ++++++++++++- .../nifi/processors/hadoop/PutHDFSTest.java | 29 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index f7f4d03aecd5..40b6668b96bb 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -71,6 +71,7 @@ public class PutHDFS extends AbstractHadoopProcessor { public static final String REPLACE_RESOLUTION = "replace"; public static final String IGNORE_RESOLUTION = "ignore"; public static final String FAIL_RESOLUTION = "fail"; + public static final String BACKUP_SUFFIX_RESOLUTION = "backupWithSuffix"; public static final String BUFFER_SIZE_KEY = "io.file.buffer.size"; public static final int BUFFER_SIZE_DEFAULT = 4096; @@ -101,8 +102,16 @@ public class PutHDFS extends AbstractHadoopProcessor { .description("Indicates what should happen when a file with the same name already exists in the output directory") .required(true) .defaultValue(FAIL_RESOLUTION) - .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION) + .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION, BACKUP_SUFFIX_RESOLUTION) .build(); + public static final PropertyDescriptor BACKUP_SUFFIX = new PropertyDescriptor.Builder() + .name("Backup With Suffix") + .description("Which Suffix to 
use for Backup, when a file with the same name already exists in the output directory" + + " and Backup_Suffix conflict resolution strategy has been selected.") + .expressionLanguageSupported(true) + .defaultValue("${now:toDate(\"yyyy-MM-ddTHH-mm-ss.SSS'Z'\")}") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder() .name("Block Size") @@ -162,6 +171,7 @@ public class PutHDFS extends AbstractHadoopProcessor { props.add(REMOTE_OWNER); props.add(REMOTE_GROUP); props.add(COMPRESSION_CODEC); + props.add(BACKUP_SUFFIX); localProperties = Collections.unmodifiableList(props); } @@ -263,6 +273,14 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{flowFile}); return; + case BACKUP_SUFFIX_RESOLUTION: + final String suffix=context.getProperty(BACKUP_SUFFIX).evaluateAttributeExpressions(flowFile).getValue(); + final Path dst=copyFile.suffix(suffix); + if (hdfs.rename(copyFile, dst)) { + getLogger().info("moved {} to {} in order to keep backup", + new Object[]{copyFile, dst}); + } + break; default: break; } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index c524a44f0920..e045eea4cdb5 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -196,6 +196,35 @@ public void testPutFile() throws IOException { assertTrue(fs.exists(new Path("target/test-classes/randombytes-1"))); } + @Test + public void 
testPutFileWithBackupSuffix() throws IOException { + // Refer to comment in the BeforeClass method for an explanation + assumeTrue(isNotWindows()); + + TestRunner runner = TestRunners.newTestRunner(PutHDFS.class); + runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); + runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.BACKUP_SUFFIX_RESOLUTION); + runner.setValidateExpressionUsage(false); + runner.setProperty(PutHDFS.BACKUP_SUFFIX, "2016-03-16T08-11-05"); + for(int i=0;i<2;i++){ + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { + Map attributes = new HashMap(); + attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); + runner.enqueue(fis, attributes); + runner.run(); + } + } + + Configuration config = new Configuration(); + FileSystem fs = FileSystem.get(config); + + List failedFlowFiles = runner + .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build()); + assertTrue(failedFlowFiles.isEmpty()); + + assertTrue(fs.exists(new Path("target/test-classes/randombytes-1"))); + } + @Test public void testPutFileWithException() throws IOException { // Refer to comment in the BeforeClass method for an explanation From 11322b37571a250cb2966f40fb14c9ddb4656da8 Mon Sep 17 00:00:00 2001 From: "puspendu.banerjee@gmail.com" Date: Thu, 18 Feb 2016 16:24:26 -0600 Subject: [PATCH 2/2] Added Best-Effort conflict resolution mechanism & modified to incorporate review comments by renaming the new file being put instead of the existing one. 
--- .../nifi/processors/hadoop/PutHDFS.java | 84 +++++++++++-------- 1 file changed, 50 insertions(+), 34 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 40b6668b96bb..fdc9dc221895 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -232,14 +232,14 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro final CompressionCodec codec = getCompressionCodec(context, configuration); - final String filename = codec != null + String filename = codec != null ? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension() : flowFile.getAttribute(CoreAttributes.FILENAME.key()); Path tempDotCopyFile = null; try { - final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename); - final Path copyFile = new Path(configuredRootDirPath, filename); + Path tempCopyFile = new Path(configuredRootDirPath, "." 
+ filename); + Path copyFile = new Path(configuredRootDirPath, filename); // Create destination directory if it does not exist try { @@ -254,38 +254,54 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro } // If destination file already exists, resolve that based on processor configuration - if (hdfs.exists(copyFile)) { - switch (conflictResponse) { - case REPLACE_RESOLUTION: - if (hdfs.delete(copyFile, false)) { - getLogger().info("deleted {} in order to replace with the contents of {}", - new Object[]{copyFile, flowFile}); - } - break; - case IGNORE_RESOLUTION: - session.transfer(flowFile, REL_SUCCESS); - getLogger().info("transferring {} to success because file with same name already exists", - new Object[]{flowFile}); - return; - case FAIL_RESOLUTION: - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - getLogger().warn("penalizing {} and routing to failure because file with same name already exists", - new Object[]{flowFile}); - return; - case BACKUP_SUFFIX_RESOLUTION: - final String suffix=context.getProperty(BACKUP_SUFFIX).evaluateAttributeExpressions(flowFile).getValue(); - final Path dst=copyFile.suffix(suffix); - if (hdfs.rename(copyFile, dst)) { - getLogger().info("moved {} to {} in order to keep backup", - new Object[]{copyFile, dst}); - } - break; - default: - break; + boolean redoConflictCheck = false; + //If we need to re-do the conflict check, do it no more than 10 times (an arbitrarily chosen limit). NOTE(review): redoConflictCheck starts false, so the loop below never executes and conflict resolution is skipped entirely — initialize it to true or use a do/while. 
+ for(int i=0;redoConflictCheck && i<10;i++){ + if (hdfs.exists(copyFile)) { + switch (conflictResponse) { + case REPLACE_RESOLUTION: + if (hdfs.delete(copyFile, false)) { + getLogger().info("deleted {} in order to replace with the contents of {}", + new Object[]{copyFile, flowFile}); + } + break; + case IGNORE_RESOLUTION: + session.transfer(flowFile, REL_SUCCESS); + getLogger().info("transferring {} to success because file with same name already exists", + new Object[]{flowFile}); + return; + case FAIL_RESOLUTION: + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + getLogger().warn("penalizing {} and routing to failure because file with same name already exists", + new Object[]{flowFile}); + return; + case BACKUP_SUFFIX_RESOLUTION: + final String suffix=context.getProperty(BACKUP_SUFFIX).evaluateAttributeExpressions(flowFile).getValue(); + int extOffset=filename.lastIndexOf('.'); + //File Extension Should be intact, so manipulate filename + filename=new StringBuilder(filename).insert(extOffset<0?filename.length():extOffset, suffix).toString(); + Path updatedCopyFilePath=new Path(configuredRootDirPath, filename); + getLogger().info("Resolving conflict by renaming {} to {}{} in order to keep backup", + new Object[]{copyFile, updatedCopyFilePath}); + tempCopyFile = new Path(configuredRootDirPath, "." + filename); + copyFile = updatedCopyFilePath; + //Still Conflict may happen while writing with new filename. So we will re-do conflict check + redoConflictCheck=true; + break; + default: + break; + } + }else{ + redoConflictCheck=false; } } + //Check if we still require a conflict check, means conflict has not been resolved even with best effort. 
+ if(redoConflictCheck){ + throw new ProcessException("Conflict Could not be resolved"); + } + final Path finalTempCopyFile=tempCopyFile; // Write FlowFile to temp file on HDFS final StopWatch stopWatch = new StopWatch(true); session.read(flowFile, new InputStreamCallback() { @@ -295,11 +311,11 @@ public void process(InputStream in) throws IOException { OutputStream fos = null; Path createdFile = null; try { - fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize); + fos = hdfs.create(finalTempCopyFile, true, bufferSize, replication, blockSize); if (codec != null) { fos = codec.createOutputStream(fos); } - createdFile = tempCopyFile; + createdFile = finalTempCopyFile; BufferedInputStream bis = new BufferedInputStream(in); StreamUtils.copy(bis, fos); bis = null;