Skip to content

Commit

Permalink
NIFI-12772 Expose REMOTE_POLL_BATCH_SIZE property for ListSFTP
Browse files Browse the repository at this point in the history
  • Loading branch information
tombrisland committed Feb 9, 2024
1 parent e93fb17 commit 826b907
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -124,13 +123,7 @@ protected List<FileInfo> performListing(final ProcessContext context, final Long
return listing;
}

final Iterator<FileInfo> itr = listing.iterator();
while (itr.hasNext()) {
final FileInfo next = itr.next();
if (next.getLastModifiedTime() < minTimestamp) {
itr.remove();
}
}
listing.removeIf(file -> file.getLastModifiedTime() < minTimestamp);

return listing;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
properties.add(SFTPTransfer.FILE_FILTER_REGEX);
properties.add(SFTPTransfer.PATH_FILTER_REGEX);
properties.add(SFTPTransfer.IGNORE_DOTTED_FILES);
properties.add(SFTPTransfer.REMOTE_POLL_BATCH_SIZE);
properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
properties.add(SFTPTransfer.HOST_KEY_FILE);
properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
Expand Down Expand Up @@ -176,7 +177,7 @@ private Predicate<FileInfo> createFileFilter(final ProcessContext context) {
final Long maxAge = context.getProperty(ListFile.MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);

return (attributes) -> {
if(attributes.isDirectory()) {
if (attributes.isDirectory()) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,6 @@
*/
package org.apache.nifi.processors.standard;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
Expand All @@ -45,6 +35,16 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestListSFTP {
Expand Down Expand Up @@ -94,7 +94,7 @@ public void testRunFileFound() {
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute( "filename");
runner.assertAllFlowFilesContainAttribute("filename");

final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername());
Expand Down Expand Up @@ -178,6 +178,19 @@ public void testRunFileNotFoundMinSizeFiltered() {
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0);
}

@Test
public void testRemotePollBatchSizeEnforced() {
runner.setProperty(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "1");

runner.run();
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);

runner.setProperty(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "2");

runner.run();
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 2);
}

@Test
public void testVerificationSuccessful() {
final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())
Expand Down

0 comments on commit 826b907

Please sign in to comment.