Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-4747 - Removed directory existence check in GetHDFS #2391

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,6 @@ public void onScheduled(ProcessContext context) throws IOException {
abstractOnScheduled(context);
// copy configuration values to pass them around cleanly
processorConfig = new ProcessorConfiguration(context);
final FileSystem fs = getFileSystem();
final Path dir = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
if (!fs.exists(dir)) {
throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist.");
}

// forget the state of the queue in case HDFS contents changed while this processor was turned off
queueLock.lock();
Expand Down Expand Up @@ -422,8 +417,16 @@ protected Set<Path> performListing(final ProcessContext context) throws IOExcept
if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) {
try {
final FileSystem hdfs = getFileSystem();
// get listing
listing = selectFiles(hdfs, new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()), null);
final Path directoryPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());

if (!hdfs.exists(directoryPath)) {
context.yield();
getLogger().warn("The directory {} does not exist.", new Object[]{directoryPath});
} else {
// get listing
listing = selectFiles(hdfs, directoryPath, null);
}

lastPollTime.set(System.currentTimeMillis());
} finally {
listingLock.unlock();
Expand All @@ -447,10 +450,6 @@ protected Set<Path> selectFiles(final FileSystem hdfs, final Path dir, Set<Path>
filesVisited = new HashSet<>();
}

if (!hdfs.exists(dir)) {
throw new IOException("Selection directory " + dir.toString() + " doesn't appear to exist!");
}

final Set<Path> files = new HashSet<>();

FileStatus[] fileStatuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(dir));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,17 @@ public void testGetFilesWithFilter() {
}
}

@Test
public void testDirectoryDoesNotExist() {
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHDFS.DIRECTORY, "does/not/exist/${now():format('yyyyMMdd')}");
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(0, flowFiles.size());
}

@Test
public void testAutomaticDecompression() throws IOException {
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
Expand Down