Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.smb.util.HostnameAndShareFlowFileFilter;
import org.apache.nifi.util.StringUtils;

import java.io.OutputStream;
import java.net.URI;
Expand Down Expand Up @@ -81,6 +82,9 @@
@SeeAlso({GetSmbFile.class, ListSmb.class, FetchSmb.class})
@ReadsAttributes({@ReadsAttribute(attribute = "filename", description = "The filename to use when writing the FlowFile to the network folder.")})
public class PutSmbFile extends AbstractProcessor {

public static final char PATH_SEPARATOR = '/';

public static final String SHARE_ACCESS_NONE = "none";
public static final String SHARE_ACCESS_READ = "read";
public static final String SHARE_ACCESS_READDELETE = "read, delete";
Expand Down Expand Up @@ -170,7 +174,7 @@ public class PutSmbFile extends AbstractProcessor {
.build();
public static final PropertyDescriptor RENAME_SUFFIX = new PropertyDescriptor.Builder()
.name("Temporary Suffix")
.description("A temporary suffix which will be apended to the filename while it's transfering. After the transfer is complete, the suffix will be removed.")
.description("A temporary suffix that is appended to the filename while it is being transferred. After the transfer is complete, the suffix will be removed.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
Expand Down Expand Up @@ -270,19 +274,16 @@ SMBClient initSmbClient(final ProcessContext context) {
}

private void createMissingDirectoriesRecursively(ComponentLog logger, DiskShare share, String pathToCreate) {
List<String> paths = new ArrayList<>();
int index = 0;

java.io.File file = new java.io.File(pathToCreate);
paths.add(file.getPath());
while (index < pathToCreate.length()) {
index = pathToCreate.indexOf(PATH_SEPARATOR, index);

while (file.getParent() != null) {
String parent = file.getParent();
paths.add(parent);
file = new java.io.File(parent);
}
if (index == -1) {
index = pathToCreate.length();
}

Collections.reverse(paths);
for (String path : paths) {
String path = pathToCreate.substring(0, index++);
if (!share.folderExists(path)) {
logger.debug("Creating folder {}", path);
share.mkdir(path);
Expand All @@ -292,6 +293,16 @@ private void createMissingDirectoriesRecursively(ComponentLog logger, DiskShare
}
}

String normalizePath(String path) {
if (path == null) {
return null;
}

return path.replace('\\', PATH_SEPARATOR)
.replaceAll("/+", "/")
.replaceAll("^/|/$", "");
}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
Expand Down Expand Up @@ -328,34 +339,37 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
try {
final long processingStartTime = System.nanoTime();

final String destinationDirectory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
final String destinationFilename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());

String destinationFullPath;
final String destinationDirectory = normalizePath(directory);

final String destinationFullPath;

// build destination path for the flowfile
if (destinationDirectory == null || destinationDirectory.isBlank()) {
destinationFullPath = destinationFilename;
if (StringUtils.isBlank(destinationDirectory)) {
destinationFullPath = filename;
} else {
destinationFullPath = new java.io.File(destinationDirectory, destinationFilename).getPath();
destinationFullPath = String.format("%s%c%s", destinationDirectory, PATH_SEPARATOR, filename);
}

// handle missing directory
final String destinationFileParentDirectory = new java.io.File(destinationFullPath).getParent();
final Boolean createMissingDirectories = context.getProperty(CREATE_DIRS).asBoolean();
if (!createMissingDirectories && !share.folderExists(destinationFileParentDirectory)) {
logger.warn("Penalizing {} and routing to failure as configured because the destination directory ({}) doesn't exist", flowFile, destinationFileParentDirectory);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
continue;
} else if (!share.folderExists(destinationFileParentDirectory)) {
try {
createMissingDirectoriesRecursively(logger, share, destinationFileParentDirectory);
} catch (Exception e) {
logger.error("Penalizing {} and routing to failure because failed to create missing destination directories ({})", flowFile, destinationFileParentDirectory, e);
if (StringUtils.isNotBlank(destinationDirectory) && !share.folderExists(destinationDirectory)) {
if (!createMissingDirectories) {
logger.warn("Penalizing {} and routing to failure as configured because the destination directory ({}) doesn't exist", flowFile, destinationDirectory);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
continue;
} else {
try {
createMissingDirectoriesRecursively(logger, share, destinationDirectory);
} catch (Exception e) {
logger.error("Penalizing {} and routing to failure because failed to create missing destination directories ({})", flowFile, destinationDirectory, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
continue;
}
}
}

Expand All @@ -376,16 +390,18 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

// handle temporary suffix
final String renameSuffixValue = context.getProperty(RENAME_SUFFIX).getValue();
final boolean renameSuffix = renameSuffixValue != null && !renameSuffixValue.isBlank();
StringBuilder finalDestinationFullPath = new StringBuilder(destinationFullPath);
final boolean renameSuffix = StringUtils.isNotBlank(renameSuffixValue);
final String transferDestinationFullPath;
if (renameSuffix) {
finalDestinationFullPath.append(renameSuffixValue);
transferDestinationFullPath = destinationFullPath + renameSuffixValue;
} else {
transferDestinationFullPath = destinationFullPath;
}

// handle the transfer
try (
File shareDestinationFile = share.openFile(
finalDestinationFullPath.toString(),
transferDestinationFullPath,
EnumSet.of(AccessMask.GENERIC_WRITE),
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
sharedAccess,
Expand All @@ -403,16 +419,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
// handle the rename
if (renameSuffix) {
try (DiskEntry fileDiskEntry = share.open(
finalDestinationFullPath.toString(),
transferDestinationFullPath,
EnumSet.of(AccessMask.DELETE, AccessMask.GENERIC_WRITE),
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
sharedAccess,
SMB2CreateDisposition.FILE_OPEN,
EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH))) {

// normalize path slashes for the network share
destinationFullPath = destinationFullPath.replace("/", "\\");

// rename the file on the share and replace it in case it exists
fileDiskEntry.rename(destinationFullPath, true);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anySet;
Expand All @@ -75,7 +76,7 @@ public class PutSmbFileTest {

private static final String HOSTNAME = "smbhostname";
private static final String SHARE = "smbshare";
private static final String DIRECTORY = "smbdirectory\\subdir";
private static final String DIRECTORY = "smbdirectory";
private static final String DOMAIN = "mydomain";
private static final String USERNAME = "myusername";
private static final String PASSWORD = "mypassword";
Expand Down Expand Up @@ -129,13 +130,15 @@ private void setupSmbProcessor() throws IOException {
}

private void testDirectoryCreation(String dirFlag, int times) throws IOException {
when(diskShare.folderExists(DIRECTORY)).thenReturn(false);
when(diskShare.folderExists(any())).thenReturn(false);

testRunner.setProperty(PutSmbFile.DIRECTORY, "smbdirectory/subdir");
testRunner.setProperty(PutSmbFile.CREATE_DIRS, dirFlag);
testRunner.enqueue("data");
testRunner.run();

verify(diskShare, times(times)).mkdir(DIRECTORY);
verify(diskShare, times(times)).mkdir("smbdirectory");
verify(diskShare, times(times)).mkdir("smbdirectory/subdir");
}

private Set<SMB2ShareAccess> testOpenFileShareAccess() throws IOException {
Expand Down Expand Up @@ -289,25 +292,28 @@ public void testAnonymousAuth() throws IOException {
@Test
public void testDirExistsWithoutCreate() throws IOException {
testDirectoryCreation("false", 0);

testRunner.assertAllFlowFilesTransferred(PutSmbFile.REL_FAILURE);
}

@Test
public void testDirExistsWithCreate() throws IOException {
testDirectoryCreation("true", 1);

testRunner.assertAllFlowFilesTransferred(PutSmbFile.REL_SUCCESS);
}

@Test
public void testDirectoriesCreatedWhenDontExists() throws IOException {
final String directory = new java.io.File("a\\b\\c\\b\\e").getPath();
final int count = directory.split(java.util.regex.Pattern.quote(java.io.File.separator)).length;
when(diskShare.folderExists(DIRECTORY)).thenReturn(false);
final String directory = "a\\b/c/b\\e";
when(diskShare.folderExists(any())).thenReturn(false);

testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true");
testRunner.setProperty(PutSmbFile.DIRECTORY, directory);
testRunner.enqueue("data");
testRunner.run();

verify(diskShare, times(count)).mkdir(
verify(diskShare, times(5)).mkdir(
any(String.class)
);
}
Expand Down Expand Up @@ -443,7 +449,7 @@ public void testTemporarySuffixIsSetRenameIsCalled() throws IOException {

assertTrue(initialFilename.getValue().endsWith(suffix), "Suffix is not present and it should be");
assertTrue(!finalFilename.getValue().endsWith(suffix), "Suffix is present and it shouldn't be");
assertTrue(replace.getValue(), "Replace flag shold be true");
assertTrue(replace.getValue(), "Replace flag should be true");
}

@Test
Expand All @@ -458,4 +464,23 @@ public void testConnectionError() throws IOException {

testRunner.assertAllFlowFilesTransferred(PutSmbFile.REL_FAILURE, 3);
}

@Test
void testNormalizePath() {
PutSmbFile processor = new PutSmbFile();

assertNull(processor.normalizePath(null));

assertEquals("", processor.normalizePath("/"));
assertEquals("", processor.normalizePath("\\"));

assertEquals("d1/d2", processor.normalizePath("d1/d2"));
assertEquals("d1/d2", processor.normalizePath("/d1/d2/"));
assertEquals("d1/d2", processor.normalizePath("//d1//d2//"));

assertEquals("d1/d2", processor.normalizePath("d1\\d2"));
assertEquals("d1/d2", processor.normalizePath("\\d1\\d2\\"));
assertEquals("d1/d2", processor.normalizePath("\\\\d1\\\\d2\\\\"));
}

}
Loading