Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,15 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.PermissionDeniedException;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -261,20 +257,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
boolean closeConnection = false;
try {
// Pull data from remote system.
final InputStream in;
try {
in = transfer.getInputStream(filename, flowFile);

flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
StreamUtils.copy(in, out);
}
});

if (!transfer.flush(flowFile)) {
throw new IOException("completePendingCommand returned false, file transfer failed");
}
flowFile = transfer.getRemoteFile(filename, flowFile, session);

} catch (final FileNotFoundException e) {
closeConnection = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -189,12 +188,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
try {
FlowFile flowFile = session.create();
final StopWatch stopWatch = new StopWatch(false);
try (final InputStream in = transfer.getInputStream(file.getFullPathFileName())) {
stopWatch.start();
flowFile = session.importFrom(in, flowFile);
stopWatch.stop();
}
transfer.flush();
stopWatch.start();
flowFile = transfer.getRemoteFile(file.getFullPathFileName(), flowFile, session);
stopWatch.stop();
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", hostname);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Proxy;
Expand Down Expand Up @@ -50,10 +51,13 @@
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.stream.io.StreamUtils;

public class FTPTransfer implements FileTransfer {

Expand Down Expand Up @@ -314,35 +318,39 @@ private FileInfo newFileInfo(final FTPFile file, String path) {
}

@Override
public InputStream getInputStream(String remoteFileName) throws IOException {
return getInputStream(remoteFileName, null);
}

@Override
public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
final FTPClient client = getClient(flowFile);
InputStream in = client.retrieveFileStream(remoteFileName);
if (in == null) {
final String response = client.getReplyString();
// FTPClient doesn't throw exception if file not found.
// Instead, response string will contain: "550 Can't open <absolute_path>: No such file or directory"
if (response != null && response.trim().endsWith("No such file or directory")){
throw new FileNotFoundException(response);
public FlowFile getRemoteFile(final String remoteFileName, final FlowFile origFlowFile, final ProcessSession session) throws ProcessException, IOException {
final FTPClient client = getClient(origFlowFile);
InputStream in = null;
FlowFile resultFlowFile = null;
try {
in = client.retrieveFileStream(remoteFileName);
if (in == null) {
final String response = client.getReplyString();
// FTPClient doesn't throw exception if file not found.
// Instead, response string will contain: "550 Can't open <absolute_path>: No such file or directory"
if (response != null && response.trim().endsWith("No such file or directory")) {
throw new FileNotFoundException(response);
}
throw new IOException(response);
}
final InputStream remoteIn = in;
resultFlowFile = session.write(origFlowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
StreamUtils.copy(remoteIn, out);
}
});
client.completePendingCommand();
return resultFlowFile;
} finally {
if(in != null){
try{
in.close();
}catch(final IOException ioe){
//do nothing
}
}
throw new IOException(response);
}
return in;
}

@Override
public void flush() throws IOException {
final FTPClient client = getClient(null);
client.completePendingCommand();
}

@Override
public boolean flush(final FlowFile flowFile) throws IOException {
return getClient(flowFile).completePendingCommand();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

public interface FileTransfer extends Closeable {
Expand All @@ -34,13 +36,7 @@ public interface FileTransfer extends Closeable {

List<FileInfo> getListing() throws IOException;

InputStream getInputStream(String remoteFileName) throws IOException;

InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException;

void flush() throws IOException;

boolean flush(FlowFile flowFile) throws IOException;
FlowFile getRemoteFile(String remoteFileName, FlowFile flowFile, ProcessSession session) throws ProcessException, IOException;

FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,23 @@
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.stream.io.StreamUtils;

import javax.net.SocketFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Proxy;
import java.net.Socket;
Expand Down Expand Up @@ -346,19 +351,22 @@ private void appendPermission(final StringBuilder builder, final Set<FilePermiss
}

@Override
public InputStream getInputStream(final String remoteFileName) throws IOException {
return getInputStream(remoteFileName, null);
}

@Override
public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
final SFTPClient sftpClient = getSFTPClient(flowFile);
public FlowFile getRemoteFile(final String remoteFileName, final FlowFile origFlowFile, final ProcessSession session) throws ProcessException, IOException {
final SFTPClient sftpClient = getSFTPClient(origFlowFile);
RemoteFile rf = null;
RemoteFile.ReadAheadRemoteFileInputStream rfis = null;
FlowFile resultFlowFile = null;
try {
// The client has 'get' methods for downloading a file, but they don't offer a way to get access to an InputStream so
// this code is what the SFTPTransfer Downloader does to get a stream for the remote file contents
final RemoteFile rf = sftpClient.open(remoteFileName);
final RemoteFile.ReadAheadRemoteFileInputStream rfis = rf.new ReadAheadRemoteFileInputStream(16);
return rfis;
rf = sftpClient.open(remoteFileName);
rfis = rf.new ReadAheadRemoteFileInputStream(16);
final InputStream in = rfis;
resultFlowFile = session.write(origFlowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
StreamUtils.copy(in, out);
}
});
return resultFlowFile;
} catch (final SFTPException e) {
switch (e.getStatusCode()) {
case NO_SUCH_FILE:
Expand All @@ -368,19 +376,24 @@ public InputStream getInputStream(final String remoteFileName, final FlowFile fl
default:
throw new IOException("Failed to obtain file content for " + remoteFileName, e);
}
} finally {
if(rf != null){
try{
rf.close();
}catch(final IOException ioe){
//do nothing
}
}
if(rfis != null){
try{
rfis.close();
}catch(final IOException ioe){
//do nothing
}
}
}
}

@Override
public void flush() throws IOException {
// nothing needed here
}

@Override
public boolean flush(final FlowFile flowFile) throws IOException {
return true;
}

@Override
public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException {
final SFTPClient sftpClient = getSFTPClient(flowFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,21 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.PermissionDeniedException;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
Expand Down Expand Up @@ -285,12 +290,7 @@ public List<FileInfo> getListing() throws IOException {
}

@Override
public InputStream getInputStream(final String remoteFileName) throws IOException {
return getInputStream(remoteFileName, null);
}

@Override
public InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException {
public FlowFile getRemoteFile(String remoteFileName, FlowFile flowFile, ProcessSession session) throws ProcessException, IOException {
if (!allowAccess) {
throw new PermissionDeniedException("test permission denied");
}
Expand All @@ -299,17 +299,14 @@ public InputStream getInputStream(String remoteFileName, FlowFile flowFile) thro
if (content == null) {
throw new FileNotFoundException();
}

return new ByteArrayInputStream(content);
}

@Override
public void flush() throws IOException {
}

@Override
public boolean flush(FlowFile flowFile) throws IOException {
return true;
final InputStream in = new ByteArrayInputStream(content);
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
StreamUtils.copy(in, out);
}
});
return flowFile;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,29 +310,6 @@ public void testGetListingWhenRemotePathDoesNotExist() throws IOException {
}
}

@Test
public void testGetInputStream() throws IOException {
final String filename = "./" + DIR_2 + "/" + FILE_1;
final Map<PropertyDescriptor, String> properties = createBaseProperties();

try(final SFTPTransfer transfer = createSFTPTransfer(properties);
final InputStream in = transfer.getInputStream(filename)) {
final String content = IOUtils.toString(in, StandardCharsets.UTF_8);
assertEquals("dir2 file1", content);
}
}

@Test(expected = FileNotFoundException.class)
public void testGetInputStreamWhenFileDoesNotExist() throws IOException {
final String filename = "./" + DIR_2 + "/DOES-NOT-EXIST";
final Map<PropertyDescriptor, String> properties = createBaseProperties();

try(final SFTPTransfer transfer = createSFTPTransfer(properties);
final InputStream in = transfer.getInputStream(filename)) {
IOUtils.toString(in, StandardCharsets.UTF_8);
}
}

@Test
public void testDeleteFileWithoutPath() throws IOException {
final Map<PropertyDescriptor, String> properties = createBaseProperties();
Expand Down