Skip to content

Commit

Permalink
INT-3737: (S)FTP Outbound Gateway Streaming GET
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-3737

Support returning an `InputStream` from a GET operation.

This can be used in conjuction with a `<file:splitter/>` to stream a text file.

The user is responsible for releasing the session when the download is complete.

The session object is stored in the message header and `RemoteFileUtils.closeSession()` should be called
to clean up and close the session.

INT-3737: Polishing and Docs

Fix `CachedSession.close()` bug

Preventing `Caused by: java.io.IOException: Previous raw read was not finalized`
when `Session` is returned to the pool, but `readingRaw` hasn't been finalized
because the real `close()` isn't invoked in case of normal return to pool.
  • Loading branch information
garyrussell authored and artembilan committed Jun 19, 2015
1 parent 5e9624f commit 65d2024
Show file tree
Hide file tree
Showing 15 changed files with 266 additions and 41 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,6 +35,8 @@ public abstract class FileHeaders {

public static final String REMOTE_FILE = PREFIX + "remoteFile";

public static final String REMOTE_SESSION = PREFIX + "remoteSession";

public static final String RENAME_TO = PREFIX + "renameTo";

}
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2014 the original author or authors.
* Copyright 2013-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,6 +98,14 @@ public RemoteFileTemplate(SessionFactory<F> sessionFactory) {
this.sessionFactory = sessionFactory;
}

/**
* @return this template's {@link SessionFactory}.
* @since 4.2
*/
public SessionFactory<F> getSessionFactory() {
return sessionFactory;
}

/**
* Determine whether the remote directory should automatically be created when
* sending files to the remote system.
Expand Down
Expand Up @@ -40,12 +40,14 @@
import org.springframework.integration.file.filters.FileListFilter;
import org.springframework.integration.file.remote.AbstractFileInfo;
import org.springframework.integration.file.remote.RemoteFileTemplate;
import org.springframework.integration.file.remote.RemoteFileUtils;
import org.springframework.integration.file.remote.SessionCallback;
import org.springframework.integration.file.remote.session.Session;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
Expand Down Expand Up @@ -170,7 +172,12 @@ public static enum Option {
/**
* Recursive (ls, mget)
*/
RECURSIVE("-R");
RECURSIVE("-R"),

/**
* Streaming 'get' (returns InputStream); user must call {@link RemoteFileUtils#closeSession(Session)}.
*/
STREAM("-stream");

private String option;

Expand Down Expand Up @@ -364,7 +371,7 @@ protected void doInit() {
Command.GET.equals(this.command)) {
Assert.isNull(this.filter, "Filters are not supported with the rm and get commands");
}
if (Command.GET.equals(this.command)
if ((Command.GET.equals(this.command) && !options.contains(Option.STREAM))
|| Command.MGET.equals(this.command)) {
Assert.notNull(this.localDirectoryExpression, "localDirectory must not be null");
if (this.localDirectoryExpression instanceof LiteralExpression) {
Expand Down Expand Up @@ -449,19 +456,37 @@ private Object doGet(final Message<?> requestMessage) {
final String remoteFilePath = this.fileNameProcessor.processMessage(requestMessage);
final String remoteFilename = this.getRemoteFilename(remoteFilePath);
final String remoteDir = this.getRemoteDirectory(remoteFilePath, remoteFilename);
File payload = this.remoteFileTemplate.execute(new SessionCallback<F, File>() {
Session<F> session = null;
Object payload;
if (this.options.contains(Option.STREAM)) {
session = this.remoteFileTemplate.getSessionFactory().getSession();
try {
payload = session.readRaw(remoteFilePath);
}
catch (IOException e) {
throw new MessageHandlingException(requestMessage, "Failed to get the remote file ["
+ remoteFilePath
+ "] as a stream", e);
}
}
else {
payload = this.remoteFileTemplate.execute(new SessionCallback<F, File>() {

@Override
public File doInSession(Session<F> session) throws IOException {
return AbstractRemoteFileOutboundGateway.this.get(requestMessage, session, remoteDir, remoteFilePath,
remoteFilename, true);
@Override
public File doInSession(Session<F> session) throws IOException {
return AbstractRemoteFileOutboundGateway.this.get(requestMessage, session, remoteDir, remoteFilePath,
remoteFilename, true);

}
});
return this.getMessageBuilderFactory().withPayload(payload)
}
});
}
AbstractIntegrationMessageBuilder<Object> builder = this.getMessageBuilderFactory().withPayload(payload)
.setHeader(FileHeaders.REMOTE_DIRECTORY, remoteDir)
.setHeader(FileHeaders.REMOTE_FILE, remoteFilename)
.build();
.setHeader(FileHeaders.REMOTE_FILE, remoteFilename);
if (session != null) {
builder.setHeader(FileHeaders.REMOTE_SESSION, session);
}
return builder.build();
}

private Object doMget(final Message<?> requestMessage) {
Expand Down
Expand Up @@ -190,7 +190,15 @@ public synchronized void close() {
else if (this.dirty) {
this.targetSession.close();
}
pool.releaseItem(targetSession);
if (this.targetSession.isOpen()) {
try {
this.targetSession.finalizeRaw();
}
catch (IOException e) {
//No-op in this context
}
}
pool.releaseItem(this.targetSession);
released = true;
}
}
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.util.StringUtils;

/**
* The {@link AbstractMessageSplitter} implementation to split the {@link File}
Expand Down Expand Up @@ -147,11 +148,11 @@ else if (payload instanceof InputStream) {
else {
reader = new InputStreamReader((InputStream) payload, this.charset);
}
filePath = ":stream:";
filePath = buildPathFromMessage(message, ":stream:");
}
else if (payload instanceof Reader) {
reader = (Reader) payload;
filePath = ":reader:";
filePath = buildPathFromMessage(message, ":reader:");
}
else {
return message;
Expand Down Expand Up @@ -242,7 +243,6 @@ public Object next() {
}
}


@Override
protected boolean willAddHeaders(Message<?> message) {
Object payload = message.getPayload();
Expand All @@ -268,6 +268,17 @@ else if (message.getPayload() instanceof String) {
}
}

private String buildPathFromMessage(Message<?> message, String defaultPath) {
String remoteDir = (String) message.getHeaders().get(FileHeaders.REMOTE_DIRECTORY);
String remoteFile = (String) message.getHeaders().get(FileHeaders.REMOTE_FILE);
if (StringUtils.hasText(remoteDir) && StringUtils.hasText(remoteFile)) {
return remoteDir + remoteFile;
}
else {
return defaultPath;
}
}

public static class FileMarker implements Serializable {

private static final long serialVersionUID = 8514605438145748406L;
Expand Down
Expand Up @@ -144,6 +144,9 @@ public void append(InputStream inputStream, String path) throws IOException {
@Override
public void close() {
try {
if (this.readingRaw.get()) {
finalizeRaw();
}
this.client.disconnect();
}
catch (Exception e) {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.commons.net.ftp.FTPFile;
import org.apache.ftpserver.FtpServer;
import org.apache.ftpserver.FtpServerFactory;
import org.apache.ftpserver.ftplet.Authentication;
Expand All @@ -40,6 +41,8 @@

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.test.util.SocketUtils;

Expand Down Expand Up @@ -149,13 +152,14 @@ public String getTargetLocalDirectoryName() {
}

@Bean
public DefaultFtpSessionFactory ftpSessionFactory() {
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
factory.setHost("localhost");
factory.setPort(this.ftpPort);
factory.setUsername("foo");
factory.setPassword("foo");
return factory;

return new CachingSessionFactory<FTPFile>(factory);
}

@PostConstruct
Expand Down
@@ -1,12 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration/ftp
http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/ftp http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">

<bean id="ftpServer" class="org.springframework.integration.ftp.TestFtpServer">
<constructor-arg value="FtpServerOutboundTests"/>
Expand Down Expand Up @@ -104,6 +105,25 @@
remote-directory="ftpTarget"
reply-channel="output"/>

<int-ftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="inboundGetStream"
command="get"
command-options="-stream"
expression="payload"
remote-directory="ftpTarget"
reply-channel="stream" />

<int:chain input-channel="stream">
<int-file:splitter markers="true" />
<int:payload-type-router resolution-required="false" default-output-channel="output">
<int:mapping type="org.springframework.integration.file.splitter.FileSplitter$FileMarker"
channel="markers" />
</int:payload-type-router>
</int:chain>

<int:service-activator input-channel="markers"
expression="payload.mark.toString().equals('END') ? headers['file_remoteSession'].close() : null"/>

<int:channel id="appending" />

<int-ftp:outbound-channel-adapter id="appender"
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2013 the original author or authors.
* Copyright 2013-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,9 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand All @@ -32,6 +34,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.net.ftp.FTPFile;
import org.hamcrest.Matchers;
Expand All @@ -48,10 +51,11 @@
import org.springframework.integration.file.remote.RemoteFileTemplate;
import org.springframework.integration.file.remote.SessionCallback;
import org.springframework.integration.file.remote.session.Session;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.TestFtpServer;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.ftp.session.FtpRemoteFileTemplate;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
Expand All @@ -74,7 +78,7 @@ public class FtpServerOutboundTests {
private TestFtpServer ftpServer;

@Autowired
private DefaultFtpSessionFactory ftpSessionFactory;
private SessionFactory<FTPFile> ftpSessionFactory;

@Autowired
private PollableChannel output;
Expand Down Expand Up @@ -112,6 +116,9 @@ public class FtpServerOutboundTests {
@Autowired
private DirectChannel failing;

@Autowired
private DirectChannel inboundGetStream;

@Before
public void setup() {
this.ftpServer.recursiveDelete(ftpServer.getTargetLocalDirectory());
Expand Down Expand Up @@ -342,6 +349,33 @@ public void testInt3412FileMode() {

}

@Test
public void testStream() {
String dir = "ftpSource/";
this.inboundGetStream.send(new GenericMessage<Object>(dir + "ftpSource1.txt"));
Message<?> result = this.output.receive(1000);
assertNotNull(result);
assertEquals("source1", result.getPayload());
assertEquals("ftpSource/", result.getHeaders().get(FileHeaders.REMOTE_DIRECTORY));
assertEquals("ftpSource1.txt", result.getHeaders().get(FileHeaders.REMOTE_FILE));

Session<?> session = (Session<?>) result.getHeaders().get(FileHeaders.REMOTE_SESSION);
// Returned to cache
assertTrue(session.isOpen());
// Raw reading is finished
assertFalse(TestUtils.getPropertyValue(session, "targetSession.readingRaw", AtomicBoolean.class).get());

// Check that we can use the same session from cache to read another remote InputStream
this.inboundGetStream.send(new GenericMessage<Object>(dir + "ftpSource2.txt"));
result = this.output.receive(1000);
assertNotNull(result);
assertEquals("source2", result.getPayload());
assertEquals("ftpSource/", result.getHeaders().get(FileHeaders.REMOTE_DIRECTORY));
assertEquals("ftpSource2.txt", result.getHeaders().get(FileHeaders.REMOTE_FILE));
assertSame(TestUtils.getPropertyValue(session, "targetSession"),
TestUtils.getPropertyValue(result.getHeaders().get(FileHeaders.REMOTE_SESSION), "targetSession"));
}

private void assertLength6(FtpRemoteFileTemplate template) {
FTPFile[] files = template.execute(new SessionCallback<FTPFile, FTPFile[]>() {

Expand Down
Expand Up @@ -61,7 +61,7 @@ public class FtpRemoteFileTemplateTests {
private TestFtpServer ftpServer;

@Autowired
private DefaultFtpSessionFactory sessionFactory;
private SessionFactory<FTPFile> sessionFactory;

@Before
@After
Expand Down

0 comments on commit 65d2024

Please sign in to comment.