Skip to content

Commit

Permalink
Merge pull request #373 from Tecsisa/365-ftp-ssh
Browse files Browse the repository at this point in the history
FTP: Refactoring on SSH (jsch -> sshj) (previously "Secure FTP connection intermittently fails")
  • Loading branch information
raboof committed Jul 12, 2017
2 parents 7118c16 + 50ef97b commit a757056
Show file tree
Hide file tree
Showing 24 changed files with 250 additions and 207 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Expand Up @@ -97,7 +97,8 @@ lazy val ftp = project
.settings(
name := "akka-stream-alpakka-ftp",
Dependencies.Ftp,
parallelExecution in Test := false
parallelExecution in Test := false,
fork in Test := true
)

lazy val geode = project
Expand Down
Expand Up @@ -41,18 +41,14 @@ private[ftp] trait FtpBrowserGraphStage[FtpClient, S <: RemoteFileSettings] exte
case head +: tail =>
buffer = tail
push(out, head)
case _ => finalize()
case _ => complete(out)
}
def finalize() = try { disconnect() } finally { complete(out) }
} // end of onPull

override def onDownstreamFinish(): Unit =
try {
disconnect()
} finally {
matSuccess()
super.onDownstreamFinish()
}
override def onDownstreamFinish(): Unit = {
matSuccess()
super.onDownstreamFinish()
}
}
) // end of handler

Expand Down
Expand Up @@ -18,39 +18,33 @@ private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, S <: RemoteFileSett

protected[this] implicit val client = ftpClient()
protected[this] var handler: Option[ftpLike.Handler] = Option.empty[ftpLike.Handler]
protected[this] var isConnected: Boolean = false

override def preStart(): Unit = {
super.preStart()
try {
val tryConnect = ftpLike.connect(connectionSettings)
if (tryConnect.isSuccess) {
handler = tryConnect.toOption
isConnected = true
} else
tryConnect.failed.foreach { case NonFatal(t) => throw t }
doPreStart()
} catch {
case NonFatal(t) =>
disconnect()
matFailure(t)
failStage(t)
}
}

override def postStop(): Unit = {
disconnect()
matSuccess()
disconnect()
super.postStop()
}

protected[this] def doPreStart(): Unit

protected[this] def disconnect(): Unit =
if (isConnected) {
ftpLike.disconnect(handler.get)
isConnected = false
}
handler.foreach(ftpLike.disconnect)

protected[this] def matSuccess(): Boolean

Expand Down
5 changes: 2 additions & 3 deletions ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpLike.scala
Expand Up @@ -4,9 +4,8 @@
package akka.stream.alpakka.ftp
package impl

import com.jcraft.jsch.JSch
import net.schmizz.sshj.SSHClient
import org.apache.commons.net.ftp.FTPClient

import scala.collection.immutable
import scala.util.Try
import java.io.{InputStream, OutputStream}
Expand All @@ -31,5 +30,5 @@ protected[ftp] trait FtpLike[FtpClient, S <: RemoteFileSettings] {
object FtpLike {
// type class instances
implicit val ftpLikeInstance = new FtpLike[FTPClient, FtpFileSettings] with FtpOperations
implicit val sFtpLikeInstance = new FtpLike[JSch, SftpSettings] with SftpOperations
implicit val sFtpLikeInstance = new FtpLike[SSHClient, SftpSettings] with SftpOperations
}
Expand Up @@ -6,7 +6,7 @@ package akka.stream.alpakka.ftp.impl
import akka.stream.alpakka.ftp.FtpCredentials.{AnonFtpCredentials, NonAnonFtpCredentials}
import akka.stream.alpakka.ftp.{FtpFileSettings, RemoteFileSettings, SftpSettings}
import akka.stream.alpakka.ftp.RemoteFileSettings._
import com.jcraft.jsch.JSch
import net.schmizz.sshj.SSHClient
import org.apache.commons.net.ftp.FTPClient
import java.net.InetAddress

Expand Down Expand Up @@ -91,11 +91,11 @@ private[ftp] trait FtpsSource extends FtpSourceFactory[FTPClient] {
protected val ftpIOSinkName: String = FtpsIOSinkName
}

private[ftp] trait SftpSource extends FtpSourceFactory[JSch] {
private[ftp] trait SftpSource extends FtpSourceFactory[SSHClient] {
protected final val sFtpBrowserSourceName = "sFtpBrowserSource"
protected final val sFtpIOSourceName = "sFtpIOSource"
protected final val sFtpIOSinkName = "sFtpIOSink"
protected val ftpClient: () => JSch = () => new JSch
protected val ftpClient: () => SSHClient = () => new SSHClient
protected val ftpBrowserSourceName: String = sFtpBrowserSourceName
protected val ftpIOSourceName: String = sFtpIOSourceName
protected val ftpIOSinkName: String = sFtpIOSinkName
Expand Down Expand Up @@ -161,5 +161,5 @@ private[ftp] trait FtpsSourceParams extends FtpsSource with FtpsDefaultSettings

private[ftp] trait SftpSourceParams extends SftpSource with SftpDefaultSettings {
type S = SftpSettings
protected[this] val ftpLike: FtpLike[JSch, S] = FtpLike.sFtpLikeInstance
protected[this] val ftpLike: FtpLike[SSHClient, S] = FtpLike.sFtpLikeInstance
}
144 changes: 80 additions & 64 deletions ftp/src/main/scala/akka/stream/alpakka/ftp/impl/SftpOperations.scala
Expand Up @@ -4,92 +4,108 @@
package akka.stream.alpakka.ftp
package impl

import com.jcraft.jsch.{ChannelSftp, JSch}

import net.schmizz.sshj.SSHClient
import net.schmizz.sshj.sftp.{OpenMode, RemoteResourceInfo, SFTPClient}
import net.schmizz.sshj.transport.verification.PromiscuousVerifier
import net.schmizz.sshj.userauth.keyprovider.OpenSSHKeyFile
import net.schmizz.sshj.userauth.password.PasswordUtils
import net.schmizz.sshj.xfer.FilePermission
import scala.collection.immutable
import scala.util.Try
import scala.collection.JavaConverters._
import java.io.{InputStream, OutputStream}
import java.nio.file.Paths
import scala.collection.JavaConversions._
import java.nio.file.attribute.PosixFilePermissions
import java.nio.file.attribute.PosixFilePermission
import java.io.{File, IOException, InputStream, OutputStream}

import com.jcraft.jsch.ChannelSftp.{APPEND, OVERWRITE}
private[ftp] trait SftpOperations { _: FtpLike[SSHClient, SftpSettings] =>

private[ftp] trait SftpOperations { _: FtpLike[JSch, SftpSettings] =>
type Handler = SFTPClient

type Handler = ChannelSftp
def connect(connectionSettings: SftpSettings)(implicit ssh: SSHClient): Try[Handler] = Try {
import connectionSettings._

private def configureIdentity(sftpIdentity: SftpIdentity)(implicit ftpClient: JSch) = sftpIdentity match {
case identity: RawKeySftpIdentity =>
ftpClient.addIdentity(identity.name, identity.privateKey, identity.publicKey.orNull, identity.password.orNull)
case identity: KeyFileSftpIdentity =>
ftpClient.addIdentity(identity.privateKey, identity.publicKey.orNull, identity.password.orNull)
}
if (!strictHostKeyChecking)
ssh.addHostKeyVerifier(new PromiscuousVerifier)
else
knownHosts.foreach(path => ssh.loadKnownHosts(new File(path)))

ssh.connect(host.getHostAddress, port)

if (credentials.password != "" && sftpIdentity.isEmpty)
ssh.authPassword(credentials.username, credentials.password)

sftpIdentity.foreach(setIdentity(_, credentials.username))

def connect(connectionSettings: SftpSettings)(implicit ftpClient: JSch): Try[Handler] = Try {
connectionSettings.sftpIdentity.foreach(configureIdentity)
connectionSettings.knownHosts.foreach(ftpClient.setKnownHosts)
val session = ftpClient.getSession(
connectionSettings.credentials.username,
connectionSettings.host.getHostAddress,
connectionSettings.port
)
session.setPassword(connectionSettings.credentials.password)
val config = new java.util.Properties
config.setProperty("StrictHostKeyChecking", if (connectionSettings.strictHostKeyChecking) "yes" else "no")
config.putAll(connectionSettings.options)
session.setConfig(config)
session.connect()
val channel = session.openChannel("sftp").asInstanceOf[ChannelSftp]
channel.connect()
channel
ssh.newSFTPClient()
}

def disconnect(handler: Handler)(implicit ftpClient: JSch): Unit = {
val session = handler.getSession
if (session.isConnected) {
session.disconnect()
}
if (handler.isConnected) {
handler.disconnect()
}
def disconnect(handler: Handler)(implicit ssh: SSHClient): Unit = {
handler.close()
if (ssh.isConnected) ssh.disconnect()
}

def listFiles(basePath: String, handler: Handler): immutable.Seq[FtpFile] = {
val path = if (!basePath.isEmpty && basePath.head != '/') s"/$basePath" else basePath
import scala.collection.JavaConversions.iterableAsScalaIterable
val entries = handler.ls(path).toSeq.filter {
case entry: Handler#LsEntry => entry.getFilename != "." && entry.getFilename != ".."
} // TODO
entries.map {
case entry: Handler#LsEntry =>
FtpFile(
entry.getFilename,
Paths.get(s"$path/${entry.getFilename}").normalize.toString,
entry.getAttrs.isDir,
entry.getAttrs.getSize,
entry.getAttrs.getMTime * 1000L,
getPosixFilePermissions(entry.getAttrs.getPermissionsString)
)
val entries = handler.ls(path).asScala
entries.map { file =>
FtpFile(
file.getName,
file.getPath,
file.isDirectory,
file.getAttributes.getSize,
file.getAttributes.getMtime * 1000L,
getPosixFilePermissions(file)
)
}.toVector
}

private def getPosixFilePermissions(permissions: String) =
PosixFilePermissions
.fromString(
permissions.replace('s', '-').drop(1)
)
.asScala
.toSet
private def getPosixFilePermissions(file: RemoteResourceInfo) = {
import FilePermission._, PosixFilePermission._
file.getAttributes.getPermissions.asScala.collect {
case USR_R => OWNER_READ
case USR_W => OWNER_WRITE
case USR_X => OWNER_EXECUTE
case GRP_R => GROUP_READ
case GRP_W => GROUP_WRITE
case GRP_X => GROUP_EXECUTE
case OTH_R => OTHERS_READ
case OTH_W => OTHERS_WRITE
case OTH_X => OTHERS_EXECUTE
}.toSet
}

def listFiles(handler: Handler): immutable.Seq[FtpFile] = listFiles(".", handler) // TODO
def listFiles(handler: Handler): immutable.Seq[FtpFile] = listFiles(".", handler)

def retrieveFileInputStream(name: String, handler: Handler): Try[InputStream] = Try {
handler.get(name)
val remoteFile = handler.open(name, Set(OpenMode.READ).asJava)
val is = new remoteFile.RemoteFileInputStream()
Option(is).getOrElse(throw new IOException(s"$name: No such file or directory"))
}

def storeFileOutputStream(name: String, handler: Handler, append: Boolean): Try[OutputStream] = Try {
handler.put(name, if (append) APPEND else OVERWRITE)
import OpenMode._
val openModes = Set(WRITE, CREAT) ++ (if (append) Set(APPEND) else Set())
val remoteFile = handler.open(name, openModes.asJava)
val os = new remoteFile.RemoteFileOutputStream()
Option(os).getOrElse(throw new IOException(s"Could not write to $name"))
}

private[this] def setIdentity(identity: SftpIdentity, username: String)(implicit ssh: SSHClient) = {
def bats(array: Array[Byte]): String = new String(array, "UTF-8")

def initKey(f: OpenSSHKeyFile => Unit) = {
val key = new OpenSSHKeyFile
f(key)
ssh.authPublickey(username, key)
}

val passphrase =
identity.privateKeyFilePassphrase.map(pass => PasswordUtils.createOneOff(bats(pass).toCharArray)).orNull

identity match {
case id: RawKeySftpIdentity =>
initKey(_.init(bats(id.privateKey), id.publicKey.map(bats).orNull, passphrase))
case id: KeyFileSftpIdentity =>
initKey(_.init(new File(id.privateKey), passphrase))
}
}
}
Expand Up @@ -10,10 +10,10 @@ import akka.stream.alpakka.ftp.impl.{FtpLike, FtpSourceFactory}
import akka.stream.IOResult
import akka.stream.javadsl.Source
import akka.stream.javadsl.Sink
import akka.stream.scaladsl.{Source ScalaSource}
import akka.stream.scaladsl.{Sink ScalaSink}
import akka.stream.scaladsl.{Source => ScalaSource}
import akka.stream.scaladsl.{Sink => ScalaSink}
import akka.util.ByteString
import com.jcraft.jsch.JSch
import net.schmizz.sshj.SSHClient
import org.apache.commons.net.ftp.FTPClient
import java.util.concurrent.CompletionStage

Expand Down Expand Up @@ -189,4 +189,4 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] =>

object Ftp extends FtpApi[FTPClient] with FtpSourceParams
object Ftps extends FtpApi[FTPClient] with FtpsSourceParams
object Sftp extends FtpApi[JSch] with SftpSourceParams
object Sftp extends FtpApi[SSHClient] with SftpSourceParams

0 comments on commit a757056

Please sign in to comment.