Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use regex pattern when matching files #18

Merged
merged 1 commit into from Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -211,7 +211,7 @@ public String getFormatName() {
@Nullable
@Override
public Pattern getFilePattern() {
return null;
return Strings.isNullOrEmpty(fileRegex) ? null : Pattern.compile(fileRegex);
}

@Override
Expand Down
48 changes: 46 additions & 2 deletions src/test/java/io/cdap/plugin/batch/ETLFTPTestRun.java
Expand Up @@ -57,22 +57,27 @@ public class ETLFTPTestRun extends ETLBatchTestBase {
private static final String USER = "ftp";
private static final String PWD = "abcd";
private static final String TEST_STRING = "Hello World";
private static final String TEST_STRING_2 = "Goodnight Moon";
private static File folder;
private static File file2;
private static int port;
private static FakeFtpServer ftpServer;
private static final String PATH = "path";
private static final String IGNORE_NON_EXISTING_FOLDERS = "ignoreNonExistingFolders";
private static final String FILE_REGEX = "fileRegex";

@Before
public void init() throws Exception {
folder = TMP_FOLDER.newFolder();
File file = new File(folder, "sample");
file2 = new File(folder, "sample2");

ftpServer = new FakeFtpServer();
ftpServer.setServerControlPort(0);

FileSystem fileSystem = new UnixFakeFileSystem();
fileSystem.add(new FileEntry(file.getAbsolutePath(), TEST_STRING));
fileSystem.add(new FileEntry(file2.getAbsolutePath(), TEST_STRING_2));
ftpServer.setFileSystem(fileSystem);

ftpServer.addUserAccount(new UserAccount(USER, PWD, folder.getAbsolutePath()));
Expand Down Expand Up @@ -122,12 +127,51 @@ public void testFTPBatchSource() throws Exception {
DataSetManager<Table> outputManager = getDataset(outputDatasetName);
List<StructuredRecord> output = MockSink.readOutput(outputManager);

Assert.assertEquals("Expected records", 1, output.size());
Assert.assertEquals("Expected records", 2, output.size());
Set<String> outputValue = new HashSet<>();
for (StructuredRecord record : output) {
outputValue.add(record.get("body"));
}
Assert.assertTrue(outputValue.contains("Hello World"));
Assert.assertTrue(outputValue.contains(TEST_STRING));
Assert.assertTrue(outputValue.contains(TEST_STRING_2));
}

@Test
public void testFTPBatchSourceWithRegex() throws Exception {
ETLStage source = new ETLStage("source", new ETLPlugin(
"FTP",
BatchSource.PLUGIN_TYPE,
ImmutableMap.<String, String>builder()
.put(PATH, String.format("ftp://%s:%s@localhost:%d%s",
USER, PWD, port, folder.getAbsolutePath()))
.put(IGNORE_NON_EXISTING_FOLDERS, "false")
.put(FILE_REGEX, file2.getAbsolutePath())
.put("referenceName", "ftp")
.build(),
null));

String outputDatasetName = "testing-ftp-source-with-regex";
ETLStage sink = new ETLStage("sink", MockSink.getPlugin(outputDatasetName));

ETLBatchConfig etlConfig = ETLBatchConfig.builder()
.addStage(source)
.addStage(sink)
.addConnection(source.getName(), sink.getName())
.build();

AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlConfig);
ApplicationId appId = NamespaceId.DEFAULT.app("FTPBatchSource");

ApplicationManager appManager = deployApplication(appId, appRequest);

WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
workflowManager.startAndWaitForRun(ProgramRunStatus.COMPLETED, 30, TimeUnit.SECONDS);

DataSetManager<Table> outputManager = getDataset(outputDatasetName);
List<StructuredRecord> output = MockSink.readOutput(outputManager);

Assert.assertEquals("Expected records", 1, output.size());
Assert.assertEquals("Single file",TEST_STRING_2, output.get(0).get("body"));
}

private void testHelper(Map<String, String> properties, Map<String, String> runTimeProperties) throws Exception {
Expand Down