Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FTP Connector #3

Merged
merged 1 commit into from
Nov 29, 2016
Merged

FTP Connector #3

merged 1 commit into from
Nov 29, 2016

Conversation

juanjovazquez
Copy link

@juanjovazquez juanjovazquez commented Oct 20, 2016

Please, check if this implementation might be enough for a first version of the connector. My team is already using it on a real project so this will be tested from the beginning.

closes #4

/cc @patriknw @ktoso @2m

Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is looking good. Let us know if there is something specific you would like us to take an extra look at. Then adding javadsl and Paradox.

import scala.util.Try

/**
* @author Juan José Vázquez Delgado
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def connect(
connectionSettings: FtpConnectionSettings
)(implicit ftpClient: FTPClient): Try[Handler] = Try {
if (ftpClient != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why those null checks? isn't that a program error?

/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.ftp
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have refactored the package conventions after you started this. We now use scaladsl and javadsl. See the other connectors in master.

/**
* @author Juan José Vázquez Delgado
*/
trait SFtp {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think about what classes should be public and what is internal. Keep the public API as small as possible. Mark internal things with private[alpakka], perhaps move to an impl package also.

@juanjovazquez
Copy link
Author

@patriknw Still working on this. I plan to cut a first version this week with at least the basic operations for listing and retrieving files as streams. After that, it could be a good idea going to merge and let others to contribute for improving it. FTP is an old pal and has a lot of nuances and different use cases. Thanks for your comments!.

connectionSettings: RemoteFileSettings
): Source[FtpFile, CompletionStage[java.lang.Long]] = {
import scala.compat.java8.FutureConverters._
implicit val ec = scala.compat.java8.FutureConverters.globalExecutionContext // TODO
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@patriknw Any idea about how could I avoid this execution context import?. I need to map the Scala Future[Long] to the Java CompletionStage<java.lang.Long> and the longs conversion is asking me for an ExecutionContext. Should I use a wrapper as in IOResult?.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GraphStage has materializer.executionContext

@@ -44,6 +44,15 @@ lazy val s3 = project
Dependencies.S3
)

lazy val ftp = project
.in(file("ftp"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not necessary, when name of the module is the same as the directory.

Copy link
Member

@2m 2m left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a couple of nitpicks in the docs. Otherwise LGTM.

sbt
: @@@vars
```scala
libraryDependencies += "com.typesafe.akka" %% "akka-stream-alpakka-ftp" % "$version$"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/typesafe/lightbend

: @@@vars
```xml
<dependency>
<groupId>com.typesafe.akka</groupId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/typesafe/lightbend

: @@@vars
```gradle
dependencies {
compile group: "com.typesafe.akka", name: "akka-stream-alpakka-ftp_$scala.binaryVersion$", version: "$version$"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/typesafe/lightbend


### Configuring the connection settings

In order to establish a connection with the remote server, you need to provide an specialized version of a @scaladoc[RemoteFileSettings](akka.stream.alpakka.ftp.RemoteFileSettings) instance. It's specialized as it depends on the kind of server you're connecting to: FTP, FTPs or SFTP.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... you need to provide an specialized ...

Java
: @@snip (../../../../ftp/src/test/java/akka/stream/alpakka/ftp/FtpSourceTest.java) { #create-settings }

The configuration above will create an anonymous connection with a remote FTP server in passive mode. For both FTPs and SFTP servers, you will need to provide the specialized versions of these settings: @scaladoc[FtpsSettings](akka.stream.alpakka.ftp.RemoteFileSettings.FtpsSettings) or @scaladoc[SFtpSettings](akka.stream.alpakka.ftp.RemoteFileSettings.SFtpSettings)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not properly link. Use akka.stream.alpakka.ftp.RemoteFileSettings$$FtpsSettings and akka.stream.alpakka.ftp.RemoteFileSettings$$SFtpSettings

The configuration above will create an anonymous connection with a remote FTP server in passive mode. For both FTPs and SFTP servers, you will need to provide the specialized versions of these settings: @scaladoc[FtpsSettings](akka.stream.alpakka.ftp.RemoteFileSettings.FtpsSettings) or @scaladoc[SFtpSettings](akka.stream.alpakka.ftp.RemoteFileSettings.SFtpSettings)
respectively.

For non-anonymous connection, please provide an instance of [NonAnonFtpCredentials](akka.stream.alpakka.ftp.FtpCredentials.NonAnonFtpCredentials) instead.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing @scaladoc before [NonAnonFtpC...

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks promising, I had a few questions/opinions I have added.

import scala.concurrent.{ Future, Promise }

private[ftp] trait FtpBrowserGraphStage[FtpClient]
extends GraphStageWithMaterializedValue[SourceShape[FtpFile], Future[Done]] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the stage just fails for errors, and completes the same thing can be achieved by using ftpSource.watchTermination(), is there some special circumstance that the Future[Done] materialized value is needed for?

Copy link
Author

@juanjovazquez juanjovazquez Nov 25, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. The only purpose of Future[Done] is to check termination. Should I change it and not materialize?.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future[Done] is great for sinks, but not really needed for sources

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect. I'll change this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

val ftpLike: FtpLike[FtpClient]

val shape = SourceShape(Outlet[FtpFile](s"$name.out"))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as the blocking logic is done already in this trait I think the IODispatcher should be specified using initialAttributes here rather than when creating the flows in FtpApi (and possibly miss out on it)


val logic = new FtpGraphStageLogic[FtpFile, FtpClient](shape, ftpLike, connectionSettings) {

private var currentFiles: Stream[FtpFile] = Stream.empty[FtpFile]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I seem to recall that Stream leaks memory because of memoization, are you sure this is not happening here (keeping each file in memory even after emitting it downstream)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, take into account that the propagated element here is an FtpFile that is a mere descriptor for the physical file. The physical file is got through other Source, the FtpSourceIOGraphStage in the form of stream of ByteString. So, IMHO, the scala.collection.immutable.Stream should not be a problem here, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you list a directory with 50 000 files, it would (if I'm right about memoization) keep all of them in memory until the stream completes which isn't very nice.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow!. I'll check this right away. Thanks.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally changed the Stream for a strict sequence as I'm quite sure that we had memoization as you mentioned. Thanks!.

protected val ftpIOSourceName: String = FtpsIOSourceName
}

private[ftp] trait SFtpSource { _: FtpSourceFactory[_] =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if FTP is cased as Ftp shouldn't SFTP be Sftp rather than SFtp?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense, but I find ugly Sftp personally. I suppose this is a matter of taste. Other opinions?.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't care super much about which one of them, it was more about consistency, both should follow the same pattern, no?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accepted. I changed the naming. Thanks!

@juanjovazquez juanjovazquez force-pushed the wip-ftp-support branch 2 times, most recently from 1f664ca to 29143af Compare November 25, 2016 15:39

override def initialAttributes: Attributes =
super.initialAttributes and Attributes.name(name) and IODispatcher

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johanandren Please, can you confirm that this is what you mean regarding initial attributes?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly!

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@johanandren johanandren merged commit a2a20e8 into akka:master Nov 29, 2016
@johanandren
Copy link
Member

Thanks @juanjovazquez !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

FTP connector
4 participants