@@ -31,6 +31,7 @@
import org.apache.nifi.annotation.documentation.SeeAlso ;
import org.apache.nifi.annotation.documentation.Tags ;
import org.apache.nifi.annotation.lifecycle.OnScheduled ;
import org.apache.nifi.components.AllowableValue ;
import org.apache.nifi.components.PropertyDescriptor ;
import org.apache.nifi.components.PropertyValue ;
import org.apache.nifi.components.ValidationContext ;
@@ -74,9 +75,15 @@
@SeeAlso (GetHDFS . class)
public class PutHDFS extends AbstractHadoopProcessor {
// Conflict-resolution constants used by the "Conflict Resolution Strategy" handling.
// NOTE(review): REPLACE_RESOLUTION, IGNORE_RESOLUTION and FAIL_RESOLUTION are declared twice in this
// span -- as String immediately below and as AllowableValue further down. This looks like the
// before/after lines of a change shown together; confirm which set is the intended one.
public static final String REPLACE_RESOLUTION = " replace" ;
public static final String IGNORE_RESOLUTION = " ignore" ;
public static final String FAIL_RESOLUTION = " fail" ;
// Plain string identifiers of the strategies. Each one is passed as the first two arguments
// of the matching AllowableValue below.
public static final String REPLACE = " replace" ;
public static final String IGNORE = " ignore" ;
public static final String FAIL = " fail" ;
public static final String APPEND = " append" ;
// AllowableValue wrappers for the strategies; the third constructor argument is the
// human-readable explanation of what the strategy does.
// presumably the constructor arguments are (value, displayName, description) -- verify against the
// AllowableValue API.
public static final AllowableValue REPLACE_RESOLUTION = new AllowableValue (REPLACE , REPLACE , " Replaces the existing file if any." );
public static final AllowableValue IGNORE_RESOLUTION = new AllowableValue (IGNORE , IGNORE , " Ignores the flow file and routes it to success." );
public static final AllowableValue FAIL_RESOLUTION = new AllowableValue (FAIL , FAIL , " Penalizes the flow file and routes it to failure." );
public static final AllowableValue APPEND_RESOLUTION = new AllowableValue (APPEND , APPEND , " Appends to the existing file if any, creates a new file otherwise." );
// Name of the buffer-size setting and the fallback value for it.
// NOTE(review): presumably a Hadoop configuration key with a default expressed in bytes -- confirm
// where these two constants are read.
public static final String BUFFER_SIZE_KEY = " io.file.buffer.size" ;
public static final int BUFFER_SIZE_DEFAULT = 4096 ;
@@ -101,8 +108,8 @@
.name(" Conflict Resolution Strategy" )
.description(" Indicates what should happen when a file with the same name already exists in the output directory" )
.required(true )
.defaultValue(FAIL_RESOLUTION )
.allowableValues(REPLACE_RESOLUTION , IGNORE_RESOLUTION , FAIL_RESOLUTION )
.defaultValue(FAIL_RESOLUTION . getValue() )
.allowableValues(REPLACE_RESOLUTION , IGNORE_RESOLUTION , FAIL_RESOLUTION , APPEND_RESOLUTION )
.build();
public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor .Builder ()
@@ -246,21 +253,23 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
changeOwner(context, hdfs, configuredRootDirPath);
}
final boolean destinationExists = hdfs. exists(copyFile);
// If destination file already exists, resolve that based on processor configuration
if (hdfs . exists(copyFile) ) {
if (destinationExists ) {
switch (conflictResponse) {
case REPLACE_RESOLUTION :
case REPLACE :
if (hdfs. delete(copyFile, false )) {
getLogger(). info(" deleted {} in order to replace with the contents of {}" ,
new Object []{copyFile, flowFile});
}
break ;
case IGNORE_RESOLUTION :
case IGNORE :
session. transfer(flowFile, REL_SUCCESS );
getLogger(). info(" transferring {} to success because file with same name already exists" ,
new Object []{flowFile});
return ;
case FAIL_RESOLUTION :
case FAIL :
flowFile = session. penalize(flowFile);
session. transfer(flowFile, REL_FAILURE );
getLogger(). warn(" penalizing {} and routing to failure because file with same name already exists" ,
@@ -280,7 +289,11 @@ public void process(InputStream in) throws IOException {
OutputStream fos = null ;
Path createdFile = null ;
try {
fos = hdfs. create(tempCopyFile, true , bufferSize, replication, blockSize);
if (conflictResponse. equals(APPEND_RESOLUTION . getValue()) && destinationExists) {
fos = hdfs. append(copyFile, bufferSize);
} else {
fos = hdfs. create(tempCopyFile, true , bufferSize, replication, blockSize);
}
if (codec != null ) {
fos = codec. createOutputStream(fos);
}
@@ -315,21 +328,24 @@ public void process(InputStream in) throws IOException {
final long millis = stopWatch. getDuration(TimeUnit . MILLISECONDS );
tempDotCopyFile = tempCopyFile;
boolean renamed = false ;
for (int i = 0 ; i < 10 ; i++ ) { // try to rename multiple times.
if (hdfs. rename(tempCopyFile, copyFile)) {
renamed = true ;
break ;// rename was successful
if (! conflictResponse. equals(APPEND_RESOLUTION . getValue())
|| (conflictResponse. equals(APPEND_RESOLUTION . getValue()) && ! destinationExists)) {
boolean renamed = false ;
for (int i = 0 ; i < 10 ; i++ ) { // try to rename multiple times.
if (hdfs. rename(tempCopyFile, copyFile)) {
renamed = true ;
break ;// rename was successful
}
Thread . sleep(200L );// try waiting to let whatever might cause rename failure to resolve
}
if (! renamed) {
hdfs. delete(tempCopyFile, false );
throw new ProcessException (" Copied file to HDFS but could not rename dot file " + tempCopyFile
+ " to its final filename" );
}
Thread . sleep(200L );// try waiting to let whatever might cause rename failure to resolve
}
if (! renamed) {
hdfs. delete(tempCopyFile, false );
throw new ProcessException (" Copied file to HDFS but could not rename dot file " + tempCopyFile
+ " to its final filename" );
}
changeOwner(context, hdfs, copyFile);
changeOwner(context, hdfs, copyFile);
}
getLogger(). info(" copied {} to HDFS at {} in {} milliseconds at a rate of {}" ,
new Object []{flowFile, copyFile, millis, dataRate});