Skip to content

Commit

Permalink
Fix messages parsing #1
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarlakxen committed Mar 17, 2017
1 parent c4d0811 commit d42508f
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 76 deletions.
9 changes: 5 additions & 4 deletions build.sbt
Expand Up @@ -39,9 +39,10 @@ scalacOptions in Test ++= Seq("-Yrangepos")
val akkaV = "2.4.17"
val mockFtpServerV = "2.7.1"
val slf4JV = "1.7.21"
val logbackV = "1.2.1"
val dockerTestKitV = "0.9.0"
val spec2V = "3.8.8"
val logbackV = "1.2.2"
val commonsNetV = "3.6"
val dockerTestKitV = "0.9.1"
val spec2V = "3.8.9"
val jUnitV = "4.12"

libraryDependencies ++= Seq(
Expand All @@ -51,10 +52,10 @@ libraryDependencies ++= Seq(
// --- Testing ---
"ch.qos.logback" % "logback-classic" % logbackV % "test",
"org.mockftpserver" % "MockFtpServer" % mockFtpServerV % "test",
"commons-net" % "commons-net" % commonsNetV % "test",
"com.whisk" %% "docker-testkit-specs2" % dockerTestKitV % "test",
"com.whisk" %% "docker-testkit-impl-spotify" % dockerTestKitV % "test",
"com.typesafe.akka" %% "akka-stream-testkit" % akkaV % "test",
"org.specs2" %% "specs2-core" % spec2V % "test",
"org.specs2" %% "specs2-mock" % spec2V % "test",
"org.specs2" %% "specs2-junit" % spec2V % "test"
)
6 changes: 2 additions & 4 deletions project/plugins.sbt
@@ -1,5 +1,3 @@
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.3")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.3")

addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")

addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3")
addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")
17 changes: 8 additions & 9 deletions src/main/scala/com/github/jarlakxen/reactive/ftp/FtpClient.scala
Expand Up @@ -10,7 +10,7 @@ import akka.util.ByteString
object FtpClient {

def apply()(implicit system: ActorSystem): ActorRef = system.actorOf(Props(classOf[client.FtpProtocolManager]))

trait ConnectionMode
case object ActiveMode extends ConnectionMode
case object PassiveMode extends ConnectionMode
Expand Down Expand Up @@ -41,19 +41,18 @@ object FtpClient {
case class DirListing(files: List[EntryInfo])
case object DirFail


trait EntryInfo {
def name: String
def size: Long
def user: String
def group: String
def mode: String
def name: String
def size: Long
def user: String
def group: String
def mode: String
}
case class FileInfo(name: String, size: Long, user: String, group: String, mode: String) extends EntryInfo
case class DirInfo(name: String, size: Long, user: String, group: String, mode: String) extends EntryInfo

val ResponsePattern = "(\\d+) (.*)\r\n".r
val MultilineResponsePattern = "(\\d+)\\-(.*)\r?\n?.*\r?\n?".r
val SingleLineResponsePattern = "(\\d+) (.*)$".r
val MultilineResponsePattern = "(\\d+)\\-(.*)$".r
val ListPattern = "([drwsx\\-]+)\\s+(\\d+)\\s+(\\w+)\\s+(\\w+)\\s+(\\d+)\\s+(\\w{3})\\s+(\\d+)\\s+([\\d:]+)\\s+([\\w\\.]+)\r?".r
val DefaultFtpPort = 21

Expand Down
Expand Up @@ -23,21 +23,10 @@ class FtpConnection(val addr: InetSocketAddress) extends FSM[FtpConnection.State

when(ReceivePlain) {
case Event(Tcp.Received(data), ctx: PlainData) => {
data.utf8String match {
case FtpClient.ResponsePattern(rawCode, rawMessage) => {
log.debug(s"Received $rawCode $rawMessage")
context.parent ! Response(rawCode.toInt, rawMessage)
stay()
}
case FtpClient.MultilineResponsePattern(rawCode, rawMessage) => {
log.debug(s"Received multiline $rawCode $rawMessage")
goto(ReceiveMulti) using MultiData(ctx.connection, new StringBuffer(rawMessage))
}
case other => {
log.error(s"Unexpected message $other.")
stay()
}
extractMessage(data.utf8String.lines.toList).foreach{
case (code, message) => context.parent ! Response(code, message)
}
stay()
}
case Event(Request(line), ctx: PlainData) => {
log.debug(s"Sending $line")
Expand All @@ -50,35 +39,38 @@ class FtpConnection(val addr: InetSocketAddress) extends FSM[FtpConnection.State
stay()
}
}
}

object FtpConnection {
import FtpClient._

private[FtpConnection] def extractMessage(dataLines: List[String]): List[(Int, String)] = {

when(ReceiveMulti) {
case Event(Tcp.Received(data), ctx: MultiData) => {
data.utf8String match {
case FtpClient.ResponsePattern(rawCode, rawMessage) => {
log.debug(s"Received EOL $rawCode $rawMessage")
val bytes = rawMessage.getBytes
ctx.data.append(rawMessage)
context.parent ! Response(rawCode.toInt, ctx.data.toString)
goto(ReceivePlain) using PlainData(ctx.connection)
}
case rawMessage: String => {
ctx.data.append(rawMessage)
stay()
}
def extractMultiLineData(accMsg: String, multiLineData: List[String]): (String, List[String]) =
multiLineData match {
case MultilineResponsePattern(_, msg) :: tail => extractMultiLineData(accMsg + "\n" + msg, tail)
case SingleLineResponsePattern(code, msg) :: tail => (accMsg + "\n" + msg, tail)
case Nil => (accMsg, Nil)
}

dataLines match {
case SingleLineResponsePattern(code, msg) :: tail =>
(code.toInt, msg) :: extractMessage(tail)

case MultilineResponsePattern(code, msg) :: tail =>
val (multiLineMessage, rest) = extractMultiLineData(msg, tail)
(code.toInt, multiLineMessage) :: extractMessage(rest)

case Nil => Nil
}
}
}

object FtpConnection {

case class Request(line: String)
case class Response(code: Int, line: String)

trait Data
case object Uninitialized extends Data
case class PlainData(connection: ActorRef) extends Data
case class MultiData(connection: ActorRef, data: StringBuffer) extends Data

trait State
case object Connecting extends State
Expand Down
Expand Up @@ -7,6 +7,8 @@ import scala.concurrent.duration._
import com.github.jarlakxen.reactive.ftp.FtpClient
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.stream.scaladsl.Tcp._
import scala.concurrent.Future
import scala.util._

class FtpProtocolManager extends FSM[FtpProtocolManager.State, FtpProtocolManager.Data] with Stash {
Expand Down Expand Up @@ -87,13 +89,21 @@ class FtpProtocolManager extends FSM[FtpProtocolManager.State, FtpProtocolManage
log.debug(s"Connecting to ${addr.getHostString}:${addr.getPort} for transfer data.")
}

stay() using ctx.copy(dataAddr = address)
self ! StartTransfer

stay() using ctx.copy(socketFlow = address.map(Tcp().outgoingConnection(_)))
}
case Event(FtpConnection.Response(code, message), TransferContext(_, path, Some(addr), replayTo)) if code == 150 => {
case Event(StartTransfer, TransferContext(_, path, Some(socketFlow), replayTo)) => {
log.debug("Sending incoming transfer file to parent.")
replayTo ! FtpClient.DownloadInProgress(Source.repeat(ByteString.empty).via(Tcp().outgoingConnection(addr)))
replayTo ! FtpClient.DownloadInProgress(Source.repeat(ByteString.empty).via(socketFlow))
stay()
}

case Event(FtpConnection.Response(code, message), ctx: TransferContext) if code == 150 => {
log.debug("Accepted data connection.")
stay()
}

case Event(FtpConnection.Response(code, message), ctx: TransferContext) if code == 226 => {
log.debug("Closing data connection.")
ctx.replayTo ! FtpClient.DownloadSuccess
Expand All @@ -111,7 +121,7 @@ class FtpProtocolManager extends FSM[FtpProtocolManager.State, FtpProtocolManage
ctx.connection ! FtpConnection.Request("QUIT")
stay()
}
case Event(FtpConnection.Response(code, message), ctx: DisconnectContext) if code == 221=> {
case Event(FtpConnection.Response(code, message), ctx: DisconnectContext) if code == 221 => {
ctx.replayTo ! FtpClient.Disconnected
goto(Idle) using Uninitialized
}
Expand All @@ -138,10 +148,13 @@ class FtpProtocolManager extends FSM[FtpProtocolManager.State, FtpProtocolManage
log.debug(s"Connecting to $addr for transfer data.")
}

stay() using ctx.copy(dataAddr = address)
self ! StartTransfer

stay() using ctx.copy(socketFlow = address.map(Tcp().outgoingConnection(_)))
}
case Event(FtpConnection.Response(code, message), TransferContext(_, path, Some(addr), _)) if code == 150 => {
Source.repeat(ByteString.empty).via(Tcp().outgoingConnection(addr)).toMat(Sink.fold(ByteString.empty)(_ ++ _))(Keep.right).run().andThen {

case Event(StartTransfer, TransferContext(_, path, Some(socketFlow), _)) => {
Source.repeat(ByteString.empty).via(socketFlow).toMat(Sink.fold(ByteString.empty)(_ ++ _))(Keep.right).run().andThen {
case Success(data) =>
log.debug("Processing incoming data.")
self ! TransferBytes(data)
Expand All @@ -150,6 +163,12 @@ class FtpProtocolManager extends FSM[FtpProtocolManager.State, FtpProtocolManage
}
stay()
}

case Event(FtpConnection.Response(code, message), ctx: TransferContext) if code == 150 => {
log.debug("Accepted data connection.")
stay()
}

case Event(FtpConnection.Response(code, message), ctx: TransferContext) if code == 226 => {
log.debug("Closing data connection.")
stay()
Expand All @@ -167,7 +186,8 @@ class FtpProtocolManager extends FSM[FtpProtocolManager.State, FtpProtocolManage
unstashAll()
goto(Active) using ConnectionContext(ctx.connection)
}
case Event(_, _) => {
case Event(res, _) => {
log.debug(s"Stashing: $res")
stash()
stay()
}
Expand Down Expand Up @@ -198,9 +218,10 @@ object FtpProtocolManager {
case class Initializing(commander: ActorRef, authentication: Option[FtpClient.Authentication]) extends Data
case class AuthenticationContext(commander: ActorRef, connection: ActorRef, authentication: FtpClient.Authentication) extends Data
case class ConnectionContext(connection: ActorRef) extends Data
case class TransferContext(connection: ActorRef, path: String, dataAddr: Option[InetSocketAddress] = None, replayTo: ActorRef) extends Data
case class TransferContext(connection: ActorRef, path: String, socketFlow: Option[Flow[ByteString, ByteString, Future[OutgoingConnection]]] = None, replayTo: ActorRef) extends Data
case class DisconnectContext(connection: ActorRef, replayTo: ActorRef) extends Data
case class TransferBytes(data: ByteString)
case object StartTransfer

trait State
case object Idle extends State
Expand Down
Empty file.
1 change: 1 addition & 0 deletions src/test/resources/ftpusers/test1/file2.txt
@@ -0,0 +1 @@
something
4 changes: 2 additions & 2 deletions src/test/resources/logback.xml
@@ -1,13 +1,13 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level %logger{36} - %msg%n</pattern>
<pattern>%date{ISO8601} %-5level %logger{50}.%M\(%line\) - %msg%n</pattern>
</encoder>
</appender>

<logger name="com.github.jarlakxen.reactive" level="DEBUG" />

<root level="INFO">
<root level="DEBUG">
<appender-ref ref="STDOUT" />
</root>
</configuration>
3 changes: 3 additions & 0 deletions src/test/resources/passwd/pureftpd.passwd
@@ -0,0 +1,3 @@
test1:$1$0Eqjzkw0$5dn80JVACWO/BeNT07jmd.:1000:1000::/home/ftpusers/test1/./::::::::::::
test2:$1$xHp8dxr0$sP1ct9E9tqNTJ.0eABzNO.:1000:1000::/home/ftpusers/test2/./::::::::::::
test3:$1$VWRxrtS0$eu0j0SlkDlPGDdEGdZRvN.:1000:1000::/home/ftpusers/test3/./::::::::::::
Expand Up @@ -4,6 +4,7 @@ import scala.concurrent.Await
import scala.concurrent.duration.DurationInt

import org.junit.runner.RunWith
import org.specs2.concurrent.ExecutionEnv
import org.specs2.mutable.SpecificationLike
import org.specs2.runner.JUnitRunner
import org.specs2.specification.AfterAll
Expand All @@ -24,57 +25,66 @@ import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import akka.util.ByteString

import com.github.jarlakxen.reactive.ftp._

@RunWith(classOf[JUnitRunner])
class FtpProtocolManagerSpec extends TestKit(ActorSystem("FtpProtocolManagerSpec")) with ImplicitSender with SpecificationLike with AfterAll {
class FtpProtocolManagerSpec(implicit ee: ExecutionEnv) extends TestKit(ActorSystem("FtpProtocolManagerSpec")) with DockerFTPSpec with ImplicitSender with SpecificationLike with AfterAll {
import FtpProtocolManagerSpec._
import FtpClient._
sequential

implicit val materializer = ActorMaterializer()

override def afterAll(): Unit = {
super.afterAll()
TestKit.shutdownActorSystem(system)
}

"FtpProtocolManager" >> {
"connect and disconnect" in new FTPContext("user", "password") {

"connect and disconnect" in {
val client = FtpClient()

client ! FtpClient.Connect("localhost", ftpPort, "user", "password")
client ! FtpClient.Connect("localhost", ftpPort, "test1", "test")

expectMsg(FtpClient.Connected)
expectMsg(FtpClient.AuthenticationSuccess)

client ! FtpClient.Disconnect

expectMsg(FtpClient.Disconnected)

ok
}

"connect, list and disconnect" in new FTPContext("user", "password", FTPFile("File1.txt"), FTPFile("File2.txt", Some("something")), FTPDir("somedir")) {
"connect, list and disconnect" in {
val client = FtpClient()

client ! FtpClient.Connect("localhost", ftpPort, "user", "password")
client ! FtpClient.Connect("localhost", ftpPort, "test1", "test")

expectMsg(FtpClient.Connected)
expectMsg(FtpClient.AuthenticationSuccess)

client ! FtpClient.Dir("/")

expectMsg(FtpClient.DirListing(List(FtpClient.FileInfo("File2.txt", 9, "none", "none", "rwxrwxrwx"), FtpClient.DirInfo("somedir", 0, "none", "none", "rwxrwxrwx"), FtpClient.FileInfo("File1.txt", 0, "none", "none", "rwxrwxrwx"))))
expectMsg(DirListing(List(FileInfo("file1.txt", 0, "1000", "ftpgroup", "rw-r--r--"), FileInfo("file2.txt", 10, "1000", "ftpgroup", "rw-r--r--"), DirInfo("somedir", 4096, "1000", "ftpgroup", "rwxr-xr-x"))))

client ! FtpClient.Disconnect

expectMsg(FtpClient.Disconnected)

ok
}

"connect, download and disconnect" in new FTPContext("user", "password", FTPFile("File1.txt"), FTPFile("File2.txt", Some("something"))) {
"connect, download and disconnect" in {
val client = FtpClient()

client ! FtpClient.Connect("localhost", ftpPort, "user", "password")
client ! FtpClient.Connect("localhost", ftpPort, "test1", "test")

expectMsg(FtpClient.Connected)
expectMsg(FtpClient.AuthenticationSuccess)

client ! FtpClient.Download("/File1.txt")
client ! FtpClient.Download("/file1.txt")

expectMsgPF(hint = """Expecting FtpClient.DownloadInProgress("")""") {
case FtpClient.DownloadInProgress(stream) =>
Expand All @@ -84,21 +94,22 @@ class FtpProtocolManagerSpec extends TestKit(ActorSystem("FtpProtocolManagerSpec

expectMsg(FtpClient.DownloadSuccess)

client ! FtpClient.Download("/File2.txt")
client ! FtpClient.Download("/file2.txt")

expectMsgPF(hint = """Expecting FtpClient.DownloadInProgress("something")""") {
case FtpClient.DownloadInProgress(stream) =>
val content = Await.result(stream.runWith(sinkUnderTest), 100 millis).utf8String
if (content == "something") ok else failure
if (content == "something\n") ok else failure
}

expectMsg(FtpClient.DownloadSuccess)

client ! FtpClient.Disconnect

expectMsg(FtpClient.Disconnected)

ok
}

}

}
Expand Down

0 comments on commit d42508f

Please sign in to comment.