Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed IMAP when fetching multiple headers and getting body description. #3

Merged
merged 1 commit into from
Dec 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 22 additions & 24 deletions src/main/scala/spinoco/fs2/mail/imap/IMAPClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,14 @@ object IMAPClient {
Stream.eval(async.boundedQueue[F, IMAPData](bufferLines)) flatMap { incomingQ =>

val received =
Stream.eval_(F.delay{ println("STARTING THE RECEIVED RUNNER ")}) ++
((
socket.reads(maxReadBytes, None) through
lines map { s => println(s">>> $s") ; s } through
incomingQ.enqueue
) interruptWhen terminated)
.onError(err => Stream.eval_(F.delay(println(s"FAILED: $err"))) ++ Stream.fail(err))
.onFinalize { F.delay { println("DONE WITH RUNNER") }}
(
socket.reads(maxReadBytes, None) through
lines through
incomingQ.enqueue
) interruptWhen terminated

val handshakeInitial =
incomingQ.dequeue through concatLines takeWhile { ! _.startsWith("* OK") } onFinalize (requestSemaphore.increment map { _ => println("RELEASED RQ LOCK") })
incomingQ.dequeue through concatLines takeWhile { ! _.startsWith("* OK") } onFinalize (requestSemaphore.increment)

def send(line: String): F[Unit] =
socket.write(Chunk.bytes(line.getBytes), None)
Expand Down Expand Up @@ -284,7 +281,6 @@ object IMAPClient {
Stream.eval_(requestSemaphore.decrement) ++
Stream.eval(idxRef.modify { _ + 1 } map { c => java.lang.Long.toHexString(c.now) }) flatMap { tag =>
val commandLine = s"$tag ${cmd.asIMAPv4}\r\n"
println(s"<<<: $commandLine")

def unlock = Stream.eval(requestSemaphore.increment)

Expand All @@ -298,10 +294,10 @@ object IMAPClient {
} else {
Stream(Right(
Stream.emit[F, IMAPData](IMAPText(resp)) ++
tail map { x => println(s"QQQQ $x"); x } takeThrough {
tail.takeThrough {
case IMAPText(l) => ! l.startsWith(tag)
case _ => true
} dropLastIf {
}.dropLastIf {
case IMAPText(l) => l.startsWith(tag)
case _ => false
} onFinalize { requestSemaphore.increment }
Expand Down Expand Up @@ -336,7 +332,6 @@ object IMAPClient {
(s through concatLines).runLog map { acc =>
acc.map { s =>
val line = s.dropWhile { c => c != '*'}
println(s"XXXX $line")
if (line.headOption.contains('*')) line.tail
else line
}
Expand Down Expand Up @@ -377,16 +372,22 @@ object IMAPClient {
}

/** parses reult of FETCH xyz (BODYSTRUCTURE) request **/
def parseBodyStructure[F[_]](lines: Seq[String])(implicit F: Effect[F]): F[Seq[EmailBodyPart]] = {
IMAPBodyPartCodec.bodyStructure.decode(BitVector.view(lines.mkString.getBytes)) match {
case Attempt.Successful(result) => F.pure(EmailBodyPart.flatten(result.value))
case Attempt.Failure(err) => F.fail(new Throwable(s"failed to decode BODYSTRUCTURE: $err ($lines)"))
def parseBodyStructure[F[_]](lines: Seq[String])(implicit F: Effect[F]): F[Seq[EmailBodyPart]] = {
val line = lines.mkString
val indexStart = line.indexOf("(")
if (indexStart < 0) F.fail(new Throwable("Could not find start of body structure."))
else {
IMAPBodyPartCodec.bodyStructure.decode(BitVector.view(line.drop(indexStart).getBytes)) match {
case Attempt.Successful(result) => F.pure(EmailBodyPart.flatten(result.value))
case Attempt.Failure(err) => F.fail(new Throwable(s"failed to decode BODYSTRUCTURE: $err ($lines)"))
}
}
}





/**
* Causes to transform data from the fetch command to Binary content (Bytes)
* given the specific contained in pa
Expand Down Expand Up @@ -565,10 +566,11 @@ object IMAPClient {
def collectBytes( recordIdx: Int, key: String, in: Stream[F, IMAPData]): Stream[F, (Int, String, IMAPData)] = {
def go(curr: Stream[F, IMAPData]): Stream[F, (Int, String, IMAPData)] = {
curr.uncons1 flatMap {
case Some((d@IMAPBytes(bs), tail)) =>
case Some((d: IMAPBytes, tail)) =>
Stream.emit((recordIdx, key, d)) ++ go(tail)

case Some((IMAPText(l), _)) => findEntry(recordIdx, curr)
case Some((d: IMAPText, tail)) =>
findEntry(recordIdx, Stream.emit(d) ++ tail)

case None =>
Stream.fail(new Throwable(s"Expected end of bytes for record: $recordIdx, key: $key, but EOF reached"))
Expand All @@ -579,7 +581,7 @@ object IMAPClient {
case Some((d@IMAPBytes(bs), tail)) =>
Stream.emit((recordIdx, key, d)) ++ go(tail)

case Some((IMAPText(l), tail)) =>
case Some((IMAPText(l), _)) =>
Stream.fail(new Throwable(s"Expected bytes for record: $recordIdx, key: $key, but got $l"))

case None =>
Expand Down Expand Up @@ -743,9 +745,5 @@ object IMAPClient {

s => collectLines(ByteVector.empty, s).scope
}



}

}
10 changes: 5 additions & 5 deletions src/main/scala/spinoco/fs2/mail/imap/IMAPCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,30 @@ object IMAPCommand {
}

final case object Logout extends IMAPCommand {
def asIMAPv4: String = s"LOGOUT"
def asIMAPv4: String = "LOGOUT"
}

final case object Capability extends IMAPCommand {
def asIMAPv4: String = s"CAPABILITY"
def asIMAPv4: String = "CAPABILITY"
}

final case class ListMailbox(reference:String, wildcard: String) extends IMAPCommand {
def asIMAPv4: String = s"""LIST "$reference" "$wildcard""""
}

final case class Examine(name: String @@ MailboxName) extends IMAPCommand {
def asIMAPv4: String = s"EXAMINE $name"
def asIMAPv4: String = s"""EXAMINE "$name""""
}


final case class Select(name: String @@ MailboxName) extends IMAPCommand {
def asIMAPv4: String = s"SELECT $name"
def asIMAPv4: String = s"""SELECT "$name""""
}

final case class Search(charset: Option[String], term: IMAPSearchTerm) extends IMAPCommand {
def asIMAPv4: String = charset match {
case None => s"SEARCH ${term.term}"
case Some(chset) => s"SEARCH CHARSET $chset $term"
case Some(chset) => s"SEARCH CHARSET $chset ${term.term}"
}
}

Expand Down
61 changes: 61 additions & 0 deletions src/test/scala/spinoco/fs2/mail/imap/IMAPCommandSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package spinoco.fs2.mail.imap

import org.scalacheck.Properties
import org.scalacheck.Prop._
import shapeless.tag
import spinoco.fs2.mail.imap.IMAPCommand._


object IMAPCommandSpec extends Properties("IMAPCommand"){

property("login") = protect{

LoginPlainText("user1", "pass1").asIMAPv4 ?= "LOGIN user1 pass1"
}

property("logout") = protect{

Logout.asIMAPv4 ?= "LOGOUT"
}

property("capability") = protect{

Capability.asIMAPv4 ?= "CAPABILITY"
}

property("listMailbox") = protect{

ListMailbox("base", "match").asIMAPv4 ?= """LIST "base" "match""""
}

property("examine") = protect{

Examine(tag[MailboxName]("box1")).asIMAPv4 ?= """EXAMINE "box1""""
}

property("select") = protect{

Select(tag[MailboxName]("box2")).asIMAPv4 ?= """SELECT "box2""""
}

property("search.no-charset") = protect{

Search(None, IMAPSearchTerm.ALL).asIMAPv4 ?= "SEARCH ALL"
}

property("search.charset") = protect{

Search(Some("UTF-8"), IMAPSearchTerm.ALL).asIMAPv4 ?= "SEARCH CHARSET UTF-8 ALL"
}

property("fetch.exact") = protect{

Fetch(1l to 1l, Nil).asIMAPv4 ?= "FETCH 1 ()"
}

property("fetch.range") = protect{

Fetch(1l to 10l, Nil).asIMAPv4 ?= "FETCH 1:10 ()"
}

}