@@ -21,6 +21,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.behavior.Restriction;
@@ -41,6 +42,7 @@
 import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.IOException;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -141,63 +143,70 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
         }
 
         // We need a FlowFile to report provenance correctly.
-        FlowFile flowFile = originalFlowFile != null ? originalFlowFile : session.create();
+        final FlowFile finalFlowFile = originalFlowFile != null ? originalFlowFile : session.create();
 
-        final String fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        final String fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(finalFlowFile).getValue();
 
         final FileSystem fileSystem = getFileSystem();
-        try {
-            // Check if the user has supplied a file or directory pattern
-            List<Path> pathList = Lists.newArrayList();
-            if (GLOB_MATCHER.reset(fileOrDirectoryName).find()) {
-                FileStatus[] fileStatuses = fileSystem.globStatus(new Path(fileOrDirectoryName));
-                if (fileStatuses != null) {
-                    for (FileStatus fileStatus : fileStatuses) {
-                        pathList.add(fileStatus.getPath());
-                    }
-                }
-            } else {
-                pathList.add(new Path(fileOrDirectoryName));
-            }
+        final UserGroupInformation ugi = getUserGroupInformation();
+
+        ugi.doAs((PrivilegedAction<Object>)() -> {
+            FlowFile flowFile = finalFlowFile;
+            try {
+                // Check if the user has supplied a file or directory pattern
+                List<Path> pathList = Lists.newArrayList();
+                if (GLOB_MATCHER.reset(fileOrDirectoryName).find()) {
+                    FileStatus[] fileStatuses = fileSystem.globStatus(new Path(fileOrDirectoryName));
+                    if (fileStatuses != null) {
+                        for (FileStatus fileStatus : fileStatuses) {
+                            pathList.add(fileStatus.getPath());
+                        }
+                    }
+                } else {
+                    pathList.add(new Path(fileOrDirectoryName));
+                }
 
-            int failedPath = 0;
-            for (Path path : pathList) {
-                if (fileSystem.exists(path)) {
-                    try {
-                        Map<String, String> attributes = Maps.newHashMapWithExpectedSize(2);
-                        attributes.put("hdfs.filename", path.getName());
-                        attributes.put("hdfs.path", path.getParent().toString());
-                        flowFile = session.putAllAttributes(flowFile, attributes);
-
-                        fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
-                        getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
-                        final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
-                        session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
-                    } catch (IOException ioe) {
-                        // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
-                        // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
-                        getLogger().warn("Failed to delete file or directory", ioe);
-
-                        Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
-                        // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
-                        attributes.put("hdfs.error.message", ioe.getMessage());
-
-                        session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), REL_FAILURE);
-                        failedPath++;
-                    }
-                }
-            }
+                int failedPath = 0;
+                for (Path path : pathList) {
+                    if (fileSystem.exists(path)) {
+                        try {
+                            Map<String, String> attributes = Maps.newHashMapWithExpectedSize(2);
+                            attributes.put("hdfs.filename", path.getName());
+                            attributes.put("hdfs.path", path.getParent().toString());
+                            flowFile = session.putAllAttributes(flowFile, attributes);
+
+                            fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
+                            getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
+                            final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+                            session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
+                        } catch (IOException ioe) {
+                            // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
+                            // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
+                            getLogger().warn("Failed to delete file or directory", ioe);
+
+                            Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
+                            // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
+                            attributes.put("hdfs.error.message", ioe.getMessage());
+
+                            session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), REL_FAILURE);
+                            failedPath++;
+                        }
+                    }
+                }
 
-            if (failedPath == 0) {
-                session.transfer(flowFile, DeleteHDFS.REL_SUCCESS);
-            } else {
-                // If any path has been failed to be deleted, remove the FlowFile as it's been cloned and sent to failure.
-                session.remove(flowFile);
-            }
-        } catch (IOException e) {
-            getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
-            session.transfer(flowFile, DeleteHDFS.REL_FAILURE);
-        }
+                if (failedPath == 0) {
+                    session.transfer(flowFile, DeleteHDFS.REL_SUCCESS);
+                } else {
+                    // If any path has been failed to be deleted, remove the FlowFile as it's been cloned and sent to failure.
+                    session.remove(flowFile);
+                }
+            } catch (IOException e) {
+                getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
+                session.transfer(flowFile, DeleteHDFS.REL_FAILURE);
+            }
+
+            return null;
+        });
 
     }
 }
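
The substance of this change is that every HDFS call in onTrigger now executes inside UserGroupInformation.doAs(...), so the delete runs as the processor's authenticated (typically Kerberos) identity rather than the NiFi JVM user. Two mechanical consequences show up in the diff: the FlowFile is first copied into final FlowFile finalFlowFile, because a Java lambda can only capture effectively-final locals and flowFile is reassigned inside the loop; and the lambda is typed as PrivilegedAction<Object>, whose contract requires a return value, hence the trailing return null;. The following is a minimal standalone sketch of the same pattern, not code from this PR; the principal, keytab, and path are illustrative placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.security.PrivilegedAction;

    public class DoAsDeleteSketch {

        public static void main(String[] args) throws IOException {
            final Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);

            // Illustrative principal and keytab; the NiFi processor instead obtains
            // its UGI from getUserGroupInformation() based on its Kerberos properties.
            final UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                    "nifi@EXAMPLE.COM", "/etc/security/keytabs/nifi.keytab");

            // A lambda may only capture effectively-final locals, which is why the
            // PR copies the FlowFile into "finalFlowFile" before entering doAs.
            final Path target = new Path("/tmp/example-to-delete");

            // PrivilegedAction<T> must return a T; for side-effect-only work the
            // body ends with "return null;", exactly as in the PR.
            ugi.doAs((PrivilegedAction<Object>) () -> {
                try {
                    final FileSystem fs = FileSystem.get(conf);
                    if (fs.exists(target)) {
                        // "true" deletes recursively, mirroring the RECURSIVE property.
                        fs.delete(target, true);
                    }
                } catch (IOException e) {
                    // The processor routes the FlowFile to REL_FAILURE here instead.
                    throw new UncheckedIOException(e);
                }
                return null;
            });
        }
    }

Wrapping the entire existing try/catch in doAs, rather than wrapping individual filesystem calls, keeps a single authentication context across the glob expansion, the exists check, and the delete. The cost is re-indenting the whole method body, which is why this hunk touches nearly every line while changing relatively little logic.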