Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FTP Connector #3

Merged
merged 1 commit into from
Nov 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
lazy val alpakka = project
.in(file("."))
.enablePlugins(PublishUnidoc)
.aggregate(amqp, cassandra, docs, files, mqtt, s3)
.aggregate(amqp, cassandra, docs, files, mqtt, s3, ftp)

lazy val amqp = project
.enablePlugins(AutomateHeaderPlugin)
Expand Down Expand Up @@ -44,6 +44,14 @@ lazy val s3 = project
Dependencies.S3
)

lazy val ftp = project
.enablePlugins(AutomateHeaderPlugin)
.settings(
name := "akka-stream-alpakka-ftp",
Dependencies.Ftp,
parallelExecution in Test := false
)

lazy val docs = project
.enablePlugins(ParadoxPlugin, NoPublish)
.disablePlugins(BintrayPlugin)
Expand All @@ -59,4 +67,4 @@ lazy val docs = project
"scaladoc.akka.base_url" -> s"http://doc.akka.io/api/akka/${Dependencies.AkkaVersion}",
"scaladoc.akka.stream.alpakka.base_url" -> s"http://doc.akka.io/api/alpakka/${version.value}"
)
)
)
1 change: 1 addition & 0 deletions docs/src/main/paradox/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [Cassandra Connector](cassandra.md)
* [File Connectors](file.md)
* [MQTT Connector](mqtt.md)
* [FTP Connector](ftp.md)
* [External Connectors](external-connectors.md)

@@@
Expand Down
96 changes: 96 additions & 0 deletions docs/src/main/paradox/ftp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# FTP Connector

The FTP connector provides Akka Stream sources to connect to FTP, FTPs and SFTP servers. Currently, two kinds of sources are provided:

* one for browsing or traversing the server recursively and,
* another for retrieving files as a stream of bytes.

## Artifacts

sbt
: @@@vars
```scala
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-ftp" % "$version$"
```
@@@

Maven
: @@@vars
```xml
<dependency>
<groupId>com.lightbend.akka</groupId>
<artifactId>akka-stream-alpakka-ftp_$scala.binaryVersion$</artifactId>
<version>$version$</version>
</dependency>
```
@@@

Gradle
: @@@vars
```gradle
dependencies {
compile group: "com.lightbend.akka", name: "akka-stream-alpakka-ftp_$scala.binaryVersion$", version: "$version$"
}
```
@@@

## Usage

### Configuring the connection settings

In order to establish a connection with the remote server, you need to provide a specialized version of a @scaladoc[RemoteFileSettings](akka.stream.alpakka.ftp.RemoteFileSettings) instance. It's specialized as it depends on the kind of server you're connecting to: FTP, FTPs or SFTP.

Scala
: @@snip (../../../../ftp/src/test/scala/akka/stream/alpakka/ftp/BaseFtpSpec.scala) { #create-settings }

Java
: @@snip (../../../../ftp/src/test/java/akka/stream/alpakka/ftp/FtpSourceTest.java) { #create-settings }

The configuration above will create an anonymous connection with a remote FTP server in passive mode. For both FTPs and SFTP servers, you will need to provide the specialized versions of these settings: @scaladoc[FtpsSettings](akka.stream.alpakka.ftp.RemoteFileSettings$$FtpsSettings) or @scaladoc[SftpSettings](akka.stream.alpakka.ftp.RemoteFileSettings$$SftpSettings)
respectively.

For non-anonymous connection, please provide an instance of @scaladoc[NonAnonFtpCredentials](akka.stream.alpakka.ftp.FtpCredentials$$NonAnonFtpCredentials) instead.

### Traversing a remote FTP folder recursively

In order to traverse a remote folder recursively, you need to use the `ls` method in the FTP API:

Scala
: @@snip (../../../../ftp/src/test/scala/akka/stream/alpakka/ftp/BaseFtpSpec.scala) { #traversing }

Java
: @@snip (../../../../ftp/src/test/java/akka/stream/alpakka/ftp/FtpSourceTest.java) { #traversing }

This source will emit @scaladoc[FtpFile](akka.stream.alpakka.ftp.FtpFile) elements with no significant materialization.

For both FTPs and SFTP servers, you will need to use the `FTPs` and `SFTP` API respectively.

### Retrieving files

In order to retrieve a remote file as a stream of bytes, you need to use the `fromPath` method in the FTP API:

Scala
: @@snip (../../../../ftp/src/test/scala/akka/stream/alpakka/ftp/BaseFtpSpec.scala) { #retrieving }

Java
: @@snip (../../../../ftp/src/test/java/akka/stream/alpakka/ftp/FtpSourceTest.java) { #retrieving }

This souce will emit @scaladoc[ByteString](akka.util.ByteString) elements and materializes to @scaladoc[Future](scala.concurrent.Future) in Scala API and @extref[CompletionStage](java-api:java/util/concurrent/CompletionStage) in Java API of @scaladoc[IOResult](akka.stream.IOResult) when the stream finishes.

For both FTPs and SFTP servers, you will need to use the `FTPs` and `SFTP` API respectively.

### Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to browse the code, edit and run it in sbt.

Scala
: ```
sbt
> ftp/test
```

Java
: ```
sbt
> ftp/test
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.ftp
package impl

import akka.stream.stage.{ GraphStage, OutHandler }
import akka.stream.{ Attributes, Outlet, SourceShape }
import akka.stream.impl.Stages.DefaultAttributes.IODispatcher

private[ftp] trait FtpBrowserGraphStage[FtpClient] extends GraphStage[SourceShape[FtpFile]] {

def name: String

def basePath: String

def connectionSettings: RemoteFileSettings

implicit def ftpClient: FtpClient

val ftpLike: FtpLike[FtpClient]

val shape: SourceShape[FtpFile] = SourceShape(Outlet[FtpFile](s"$name.out"))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as the blocking logic is done already in this trait I think the IODispatcher should be specified using initialAttributes here rather than when creating the flows in FtpApi (and possibly miss out on it)

override def initialAttributes: Attributes =
super.initialAttributes and Attributes.name(name) and IODispatcher

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johanandren Please, can you confirm that this is what you mean regarding initial attributes?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly!

def createLogic(inheritedAttributes: Attributes) = {
val logic = new FtpGraphStageLogic[FtpFile, FtpClient](shape, ftpLike, connectionSettings) {

private[this] var buffer: Seq[FtpFile] = Seq.empty[FtpFile]

setHandler(out, new OutHandler {
def onPull(): Unit = {
fillBuffer()
buffer match {
case Seq() =>
finalize()
case head +: Seq() =>
if (!head.isDirectory)
push(out, head)
finalize()
case head +: tail =>
buffer = tail
if (!head.isDirectory)
push(out, head)
else
onPull()
}
def finalize() =
try {
disconnect()
} finally {
complete(out)
}
} // end of onPull

override def onDownstreamFinish(): Unit =
try {
disconnect()
} finally {
matSuccess()
super.onDownstreamFinish()
}
}) // end of handler

protected[this] def doPreStart(): Unit =
buffer = initBuffer(basePath)

override protected[this] def matSuccess() = true

override protected[this] def matFailure(t: Throwable) = true

private[this] def initBuffer(basePath: String) =
getFilesFromPath(basePath)

private[this] def fillBuffer() =
buffer match {
case Seq() => // Nothing to do
case head +: tail =>
if (head.isDirectory) {
buffer = getFilesFromPath(head.path) ++ tail
}
}

private[this] def getFilesFromPath(basePath: String) =
if (basePath.isEmpty)
ftpLike.listFiles(handler.get)
else
ftpLike.listFiles(basePath, handler.get)

} // end of stage logic

logic
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.ftp
package impl

import akka.stream.stage.GraphStageLogic
import akka.stream.{ Outlet, Shape }
import scala.util.control.NonFatal

private[ftp] abstract class FtpGraphStageLogic[T, FtpClient](
val shape: Shape,
val ftpLike: FtpLike[FtpClient],
val connectionSettings: RemoteFileSettings
)(implicit ftpClient: FtpClient)
extends GraphStageLogic(shape) {

protected[this] val out = shape.outlets.head.asInstanceOf[Outlet[T]]
protected[this] var handler: Option[ftpLike.Handler] = Option.empty[ftpLike.Handler]
protected[this] var isConnected: Boolean = false

override def preStart(): Unit = {
super.preStart()
try {
val tryConnect = ftpLike.connect(connectionSettings)
if (tryConnect.isSuccess) {
handler = tryConnect.toOption
isConnected = true
} else
tryConnect.failed.foreach { case NonFatal(t) => throw t }
doPreStart()
} catch {
case NonFatal(t) =>
disconnect()
matFailure(t)
failStage(t)
}
}

override def postStop(): Unit = {
disconnect()
matSuccess()
super.postStop()
}

protected[this] def doPreStart(): Unit

protected[this] def disconnect(): Unit =
if (isConnected) {
ftpLike.disconnect(handler.get)
isConnected = false
}

protected[this] def matSuccess(): Boolean

protected[this] def matFailure(t: Throwable): Boolean

}
Loading