diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java index b1ea0e1b5479..f3b2025b5b86 100644 --- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java +++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java @@ -52,6 +52,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.smb.util.HostnameAndShareFlowFileFilter; +import org.apache.nifi.util.StringUtils; import java.io.OutputStream; import java.net.URI; @@ -81,6 +82,9 @@ @SeeAlso({GetSmbFile.class, ListSmb.class, FetchSmb.class}) @ReadsAttributes({@ReadsAttribute(attribute = "filename", description = "The filename to use when writing the FlowFile to the network folder.")}) public class PutSmbFile extends AbstractProcessor { + + public static final char PATH_SEPARATOR = '/'; + public static final String SHARE_ACCESS_NONE = "none"; public static final String SHARE_ACCESS_READ = "read"; public static final String SHARE_ACCESS_READDELETE = "read, delete"; @@ -170,7 +174,7 @@ public class PutSmbFile extends AbstractProcessor { .build(); public static final PropertyDescriptor RENAME_SUFFIX = new PropertyDescriptor.Builder() .name("Temporary Suffix") - .description("A temporary suffix which will be apended to the filename while it's transfering. After the transfer is complete, the suffix will be removed.") + .description("A temporary suffix that is appended to the filename while it is being transferred. After the transfer is complete, the suffix will be removed.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -270,19 +274,16 @@ SMBClient initSmbClient(final ProcessContext context) { } private void createMissingDirectoriesRecursively(ComponentLog logger, DiskShare share, String pathToCreate) { - List paths = new ArrayList<>(); + int index = 0; - java.io.File file = new java.io.File(pathToCreate); - paths.add(file.getPath()); + while (index < pathToCreate.length()) { + index = pathToCreate.indexOf(PATH_SEPARATOR, index); - while (file.getParent() != null) { - String parent = file.getParent(); - paths.add(parent); - file = new java.io.File(parent); - } + if (index == -1) { + index = pathToCreate.length(); + } - Collections.reverse(paths); - for (String path : paths) { + String path = pathToCreate.substring(0, index++); if (!share.folderExists(path)) { logger.debug("Creating folder {}", path); share.mkdir(path); @@ -292,6 +293,16 @@ private void createMissingDirectoriesRecursively(ComponentLog logger, DiskShare } } + String normalizePath(String path) { + if (path == null) { + return null; + } + + return path.replace('\\', PATH_SEPARATOR) + .replaceAll("/+", "/") + .replaceAll("^/|/$", ""); + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); @@ -328,34 +339,37 @@ public void onTrigger(final ProcessContext context, final ProcessSession session try { final long processingStartTime = System.nanoTime(); - final String destinationDirectory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); - final String destinationFilename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); - String destinationFullPath; + final String destinationDirectory = normalizePath(directory); + + final String destinationFullPath; // build destination path for the flowfile - if (destinationDirectory == null || destinationDirectory.isBlank()) { - destinationFullPath = destinationFilename; + if (StringUtils.isBlank(destinationDirectory)) { + destinationFullPath = filename; } else { - destinationFullPath = new java.io.File(destinationDirectory, destinationFilename).getPath(); + destinationFullPath = String.format("%s%c%s", destinationDirectory, PATH_SEPARATOR, filename); } // handle missing directory - final String destinationFileParentDirectory = new java.io.File(destinationFullPath).getParent(); final Boolean createMissingDirectories = context.getProperty(CREATE_DIRS).asBoolean(); - if (!createMissingDirectories && !share.folderExists(destinationFileParentDirectory)) { - logger.warn("Penalizing {} and routing to failure as configured because the destination directory ({}) doesn't exist", flowFile, destinationFileParentDirectory); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - continue; - } else if (!share.folderExists(destinationFileParentDirectory)) { - try { - createMissingDirectoriesRecursively(logger, share, destinationFileParentDirectory); - } catch (Exception e) { - logger.error("Penalizing {} and routing to failure because failed to create missing destination directories ({})", flowFile, destinationFileParentDirectory, e); + if (StringUtils.isNotBlank(destinationDirectory) && !share.folderExists(destinationDirectory)) { + if (!createMissingDirectories) { + logger.warn("Penalizing {} and routing to failure as configured because the destination directory ({}) doesn't exist", flowFile, destinationDirectory); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); continue; + } else { + try { + createMissingDirectoriesRecursively(logger, share, destinationDirectory); + } catch (Exception e) { + logger.error("Penalizing {} and routing to failure because failed to create missing destination directories ({})", flowFile, destinationDirectory, e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + continue; + } } } @@ -376,16 +390,18 @@ public void onTrigger(final ProcessContext context, final ProcessSession session // handle temporary suffix final String renameSuffixValue = context.getProperty(RENAME_SUFFIX).getValue(); - final boolean renameSuffix = renameSuffixValue != null && !renameSuffixValue.isBlank(); - StringBuilder finalDestinationFullPath = new StringBuilder(destinationFullPath); + final boolean renameSuffix = StringUtils.isNotBlank(renameSuffixValue); + final String transferDestinationFullPath; if (renameSuffix) { - finalDestinationFullPath.append(renameSuffixValue); + transferDestinationFullPath = destinationFullPath + renameSuffixValue; + } else { + transferDestinationFullPath = destinationFullPath; } // handle the transfer try ( File shareDestinationFile = share.openFile( - finalDestinationFullPath.toString(), + transferDestinationFullPath, EnumSet.of(AccessMask.GENERIC_WRITE), EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL), sharedAccess, @@ -403,16 +419,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session // handle the rename if (renameSuffix) { try (DiskEntry fileDiskEntry = share.open( - finalDestinationFullPath.toString(), + transferDestinationFullPath, EnumSet.of(AccessMask.DELETE, AccessMask.GENERIC_WRITE), EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL), sharedAccess, SMB2CreateDisposition.FILE_OPEN, EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH))) { - // normalize path slashes for the network share - destinationFullPath = destinationFullPath.replace("/", "\\"); - // rename the file on the share and replace it in case it exists fileDiskEntry.rename(destinationFullPath, true); } catch (Exception e) { diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java index d73eacbfa449..92e45a3a2eea 100644 --- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java +++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java @@ -49,6 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anySet; @@ -75,7 +76,7 @@ public class PutSmbFileTest { private static final String HOSTNAME = "smbhostname"; private static final String SHARE = "smbshare"; - private static final String DIRECTORY = "smbdirectory\\subdir"; + private static final String DIRECTORY = "smbdirectory"; private static final String DOMAIN = "mydomain"; private static final String USERNAME = "myusername"; private static final String PASSWORD = "mypassword"; @@ -129,13 +130,15 @@ private void setupSmbProcessor() throws IOException { } private void testDirectoryCreation(String dirFlag, int times) throws IOException { - when(diskShare.folderExists(DIRECTORY)).thenReturn(false); + when(diskShare.folderExists(any())).thenReturn(false); + testRunner.setProperty(PutSmbFile.DIRECTORY, "smbdirectory/subdir"); testRunner.setProperty(PutSmbFile.CREATE_DIRS, dirFlag); testRunner.enqueue("data"); testRunner.run(); - verify(diskShare, times(times)).mkdir(DIRECTORY); + verify(diskShare, times(times)).mkdir("smbdirectory"); + verify(diskShare, times(times)).mkdir("smbdirectory/subdir"); } private Set testOpenFileShareAccess() throws IOException { @@ -289,25 +292,28 @@ public void testAnonymousAuth() throws IOException { @Test public void testDirExistsWithoutCreate() throws IOException { testDirectoryCreation("false", 0); + + testRunner.assertAllFlowFilesTransferred(PutSmbFile.REL_FAILURE); } @Test public void testDirExistsWithCreate() throws IOException { testDirectoryCreation("true", 1); + + testRunner.assertAllFlowFilesTransferred(PutSmbFile.REL_SUCCESS); } @Test public void testDirectoriesCreatedWhenDontExists() throws IOException { - final String directory = new java.io.File("a\\b\\c\\b\\e").getPath(); - final int count = directory.split(java.util.regex.Pattern.quote(java.io.File.separator)).length; - when(diskShare.folderExists(DIRECTORY)).thenReturn(false); + final String directory = "a\\b/c/b\\e"; + when(diskShare.folderExists(any())).thenReturn(false); testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true"); testRunner.setProperty(PutSmbFile.DIRECTORY, directory); testRunner.enqueue("data"); testRunner.run(); - verify(diskShare, times(count)).mkdir( + verify(diskShare, times(5)).mkdir( any(String.class) ); } @@ -443,7 +449,7 @@ public void testTemporarySuffixIsSetRenameIsCalled() throws IOException { assertTrue(initialFilename.getValue().endsWith(suffix), "Suffix is not present and it should be"); assertTrue(!finalFilename.getValue().endsWith(suffix), "Suffix is present and it shouldn't be"); - assertTrue(replace.getValue(), "Replace flag shold be true"); + assertTrue(replace.getValue(), "Replace flag should be true"); } @Test @@ -458,4 +464,23 @@ public void testConnectionError() throws IOException { testRunner.assertAllFlowFilesTransferred(PutSmbFile.REL_FAILURE, 3); } + + @Test + void testNormalizePath() { + PutSmbFile processor = new PutSmbFile(); + + assertNull(processor.normalizePath(null)); + + assertEquals("", processor.normalizePath("/")); + assertEquals("", processor.normalizePath("\\")); + + assertEquals("d1/d2", processor.normalizePath("d1/d2")); + assertEquals("d1/d2", processor.normalizePath("/d1/d2/")); + assertEquals("d1/d2", processor.normalizePath("//d1//d2//")); + + assertEquals("d1/d2", processor.normalizePath("d1\\d2")); + assertEquals("d1/d2", processor.normalizePath("\\d1\\d2\\")); + assertEquals("d1/d2", processor.normalizePath("\\\\d1\\\\d2\\\\")); + } + }