Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Feature/idle #9

Open
wants to merge 3 commits into
base: series/0.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 98 additions & 21 deletions src/main/scala/spinoco/fs2/mail/imap/IMAPClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.nio.charset.{Charset, StandardCharsets}
import fs2._
import fs2.async.mutable.Semaphore
import fs2.io.tcp.Socket
import fs2.util.{Async, Catchable, Effect, Monad}
import fs2.util._
import fs2.util.syntax._
import scodec.{Attempt, Codec}
import scodec.bits.{BitVector, ByteVector}
Expand Down Expand Up @@ -109,7 +109,10 @@ trait IMAPClient[F[_]] {
def bodyStructureOf(uid: Long @@ MailUID): F[IMAPResult[Seq[EmailBodyPart]]]

/**
* Allows to fetch bytes of given mime binary part
* Allows to fetch bytes of given mime binary part.
*
* All other commands will fail while this command is being processed. As to prevent deadlock.
*
* @param uid Id of message
* @param part Binary part specification. The data in binary part specification will be used
* to parse conent to stream of bytes.
Expand All @@ -118,14 +121,29 @@ trait IMAPClient[F[_]] {
def bytesOf(uid: Long @@ MailUID, part: EmailBodyPart.BinaryPart): Stream[F, Byte]

/**
* Allows to fetch textual representation of given mime part
* Allows to fetch textual representation of given mime part.
*
* All other commands will fail while this command is being processed. As to prevent deadlock.
*
* @param uid Id of message
* @param part Textual part specification. The data in specification will be used to decode
* text to resulting stream of strings.
* @return
*/
def textOf(uid: Long @@ MailUID, part: EmailBodyPart.TextPart): Stream[F, Char]

/**
* Causes to perform Idle command against the server as per RFC 2177.
*
* All other commands will fail while this command is being processed. As to prevent deadlock.
*
* The idle command should be restarted atleast every 29 minutes to prevent the remote server
* logging out the user due to inactivity. In reality the idle should be restarted more often
* as real servers have much shorter inactivity timeouts.
*
*/
def idle: Stream[F, IMAPIdleContext[F]]

}

/**
Expand Down Expand Up @@ -153,6 +171,7 @@ object IMAPClient {
Stream.eval(Async.refOf[F, Long](0l)) flatMap { idxRef =>
Stream.eval(async.semaphore(0)) flatMap { requestSemaphore =>
Stream.eval(async.boundedQueue[F, IMAPData](bufferLines)) flatMap { incomingQ =>
Stream.eval(F.refOf(false)) flatMap { inProcessing =>

val received =
(
Expand All @@ -169,50 +188,84 @@ object IMAPClient {

val request = requestCmd(idxRef, requestSemaphore, incomingQ.dequeue, send) _

/**
* Guards a request to the server by checking whether there is currently connection blocking command running.
* If such command is running it is impossible to interrupt it.
* As such this request would wait on the command to finish. If this command was called as result of some partial
* data from the "blocking" command then we would get into dead lock.
* Thus we rather fail.
*
* Do note that the connection blocking commands are only that return stream.
*/
def guardedRequest(cmd: IMAPCommand): RequestResult[F] = {
Stream.eval(inProcessing.get).flatMap{
case false => request(cmd)
case true => Stream.fail(new Throwable("Cannot perform action on IMAP client since we are in midst of another IMAP streaming"))
}
}

val client =
new IMAPClient[F] {
def login(userName: String, password: String) =
shortContent(request(LoginPlainText(userName, password)))(parseLogin[F])
shortContent(guardedRequest(LoginPlainText(userName, password)))(parseLogin[F])

def logout =
shortContent(request(Logout)) { _ => F.pure(()) } as (())
shortContent(guardedRequest(Logout)) { _ => F.pure(()) } as (())

def capability =
shortContent(request(Capability))(parseCapability[F])
shortContent(guardedRequest(Capability))(parseCapability[F])

def select(mailbox: @@[String, MailboxName]) =
shortContent(request(Select(mailbox)))(parseSelect[F])
shortContent(guardedRequest(Select(mailbox)))(parseSelect[F])

def examine(mailbox: @@[String, MailboxName]) =
shortContent(request(Examine(mailbox)))(parseSelect[F])
shortContent(guardedRequest(Examine(mailbox)))(parseSelect[F])

def list(reference: String, wildcardName: String): F[IMAPResult[Seq[IMAPMailbox]]] =
shortContent(request(ListMailbox(reference, wildcardName)))(parseMailboxList[F])
shortContent(guardedRequest(ListMailbox(reference, wildcardName)))(parseMailboxList[F])

def search(term: IMAPSearchTerm, charset: Option[String] = None): F[IMAPResult[Seq[Long @@ MailUID]]] =
shortContent(request(Search(charset, term)))(parseSearchResult[F])
shortContent(guardedRequest(Search(charset, term)))(parseSearchResult[F])

def emailHeaders(range: NumericRange[Long]): F[Vector[IMAPEmailHeader]] =
rawContent(request(Fetch(range, Seq(IMAPFetchContent.UID, IMAPFetchContent.Body(BodySection.HEADER)))))
rawContent(guardedRequest(Fetch(range, Seq(IMAPFetchContent.UID, IMAPFetchContent.Body(BodySection.HEADER)))))
.through(fetchLog)
.through(mkEmailHeader(emailHeaderCodec))
.runFold(Vector.empty[IMAPEmailHeader])(_ :+ _)

def bodyStructureOf(uid: @@[Long, MailUID]): F[IMAPResult[Seq[EmailBodyPart]]] =
shortContent(request(Fetch(NumericRange(uid:Long, uid:Long, 1), Seq(IMAPFetchContent.BODYSTRUCTURE))))(parseBodyStructure[F])
shortContent(guardedRequest(Fetch(NumericRange(uid:Long, uid:Long, 1), Seq(IMAPFetchContent.BODYSTRUCTURE))))(parseBodyStructure[F])

def bytesOf(uid: @@[Long, MailUID], part: EmailBodyPart.BinaryPart): Stream[F, Byte] = {
val content = IMAPFetchContent.Body(BodySection(part.partId))
rawContent(request(Fetch(NumericRange(uid: Long, uid: Long, 1), Seq(content)))) through
fetchBytesOf(0, content.content, part.tpe.fields.encoding)
impl.guardStream(
inProcessing
, rawContent(request(Fetch(NumericRange(uid: Long, uid: Long, 1), Seq(content)))) through
fetchBytesOf(0, content.content, part.tpe.fields.encoding)
)
}

def textOf(uid: @@[Long, MailUID], part: EmailBodyPart.TextPart): Stream[F, Char] = {
val content = IMAPFetchContent.Body(BodySection(part.partId))
rawContent(request(Fetch(NumericRange(uid: Long, uid: Long, 1), Seq(content)))) through
fetchTextOf(0, content.content, part.tpe.fields.encoding, part.charsetName)
impl.guardStream(
inProcessing
, rawContent(request(Fetch(NumericRange(uid: Long, uid: Long, 1), Seq(content)))) through
fetchTextOf(0, content.content, part.tpe.fields.encoding, part.charsetName)
)
}
}

def idle: Stream[F, IMAPIdleContext[F]] = {
impl.guardStream(
inProcessing
, request(Idle).flatMap{
case Left(err) => Stream.fail(new Throwable(s"Failed to perform the command: $err"))
case Right(data) => Stream.eval(IMAPIdleContext.mk(data, send))
}
)
}


}

concurrent.join(Int.MaxValue)(Stream(
Stream.emit(client)
Expand All @@ -221,13 +274,37 @@ object IMAPClient {
))
.interruptWhen(terminated)
.onFinalize(terminated.set(true))
}}}}
}}}}}
}



object impl {

/**
* Guards a stream stream against a given ref. Which signals whether there is currently blocking connection
* thus our stream cannot be run.
*
* If the stream is allowed to be run, it itself sets the blocking to prevent other stream to be run while this stream
* is executing.
*
* @param runningGuard The guard that tells us whether there is an existing blocking connection.
* @param stream The stream to be run if there is currently no blocking connection
*/
def guardStream[F[_], A](
runningGuard: Async.Ref[F, Boolean]
, stream: Stream[F, A]
)(implicit F: Applicative[F]): Stream[F, A] = {
Stream.bracket(runningGuard.modify(_ => true))(
change => {
if (change.previous) Stream.fail(new Throwable("Cannot perform action on IMAP client since we are in midst of another IMAP streaming"))
else stream
}
, change => if (change.previous) F.pure(()) else runningGuard.setPure(false)
)
}


type RequestResult[F[_]] = Stream[F, Either[String, Stream[F, IMAPData]]]

/**
Expand Down Expand Up @@ -299,9 +376,9 @@ object IMAPClient {
tail.takeThrough {
case IMAPText(l) => ! l.startsWith(tag)
case _ => true
}.dropLastIf {
case IMAPText(l) => l.startsWith(tag)
case _ => false
}.filter {
case IMAPText(l) => !l.startsWith(tag)
case _ => true
} onFinalize { requestSemaphore.increment }
))
}
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/spinoco/fs2/mail/imap/IMAPCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ object IMAPCommand {
}
}


final case class Fetch(range: NumericRange[Long], content: Seq[IMAPFetchContent]) extends IMAPCommand {
def asIMAPv4: String = {
val contentString = s"(${content.map(_.content).mkString(" ")})"
Expand All @@ -48,6 +47,10 @@ object IMAPCommand {
}
}

final case object Idle extends IMAPCommand {
def asIMAPv4: String = "IDLE"
}

}


Expand Down
91 changes: 91 additions & 0 deletions src/main/scala/spinoco/fs2/mail/imap/IMAPIdleContext.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package spinoco.fs2.mail.imap

import fs2.Stream
import fs2.util.Async
import shapeless.tag
import spinoco.fs2.mail.imap.IMAPClient.impl.{IMAPBytes, IMAPData, IMAPText}

/**
* The context of an idling IMAP server connection that is ready to receive notifications from server.
*/
trait IMAPIdleContext[F[_]] {

/**
* The IMAP IDLE events that are being produced as result of calling the IDLE command.
*
* This call can be made only once. As this is not backed by a [[fs2.async.mutable.Topic]], thus
* the data of each call of "events" would be different.
*
* Thus this will fail any subsequent calls to events.
*/
def events: Stream[F, IMAPIdleMessage]

/**
* Closes the IDLE connection with server.
*
* This returns as soon as it writes to the socket.
* To get the end of the idle output await on the events stream, which will
* end as soon as the server stops feeding the data to the client.
*/
def done: F[Unit]

}

object IMAPIdleContext {

/**
* Creates a IDLE context that has events method guarded against multiple calls to it.
*
* @param incoming The data from server as result of IDLE command.
* @param writeToServer A method to write to server.
*/
def mk[F[_]](
incoming: Stream[F, IMAPData]
, writeToServer: String => F[Unit]
)(
implicit F: Async[F]
): F[IMAPIdleContext[F]] = {
F.map(F.refOf(false)) { startedRead =>
new IMAPIdleContext[F] {
def done: F[Unit] = writeToServer("DONE\r\n")

def events: Stream[F, IMAPIdleMessage] = {
Stream.eval(startedRead.modify(_ => true)).flatMap { change =>
if (change.previous) Stream.fail(new Throwable("Cannot try to subscribe to idle events for the second time"))
else impl.makeIdleEvents(incoming)
}
}
}

}
}


object impl {

/**
* Parses out the IDLE IMAP events from preprocessed server data.
*
* @param in The data from the server already split by lines.
*/
def makeIdleEvents[F[_]](in: Stream[F, IMAPData]): Stream[F, IMAPIdleMessage] = {
in.flatMap{
case _: IMAPBytes => Stream.empty //IDLE cannot give bytes, maybe fail here?
case IMAPText(line) =>
val trimmed = line.trim
if (trimmed.startsWith("+ idling")) Stream.empty //Notified about now idling, server accepted we can carry on
else if (trimmed.startsWith("*")) {
val dropped = trimmed.drop(1).trim
val splitIdx = dropped.indexOf(" ")
if (splitIdx < 0) Stream.fail(new Throwable(s"Could not find split between email id and command while parsing IMAP IDLE line: $line"))
else {
val (email, messageType) = dropped.splitAt(splitIdx)
val emailUid = tag[MailUID](email.toLong)
Stream.emit(IMAPIdleMessage.parse(messageType.trim, emailUid))

}
} else Stream.fail(new Throwable(s"Line starts wih an unknown character while parsing IMAP IDLE line: $line"))
}
}
}
}
42 changes: 42 additions & 0 deletions src/main/scala/spinoco/fs2/mail/imap/IMAPIdleMessage.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package spinoco.fs2.mail.imap

import shapeless.tag.@@


object IMAPIdleMessage {

/**
* Notification about new email with a given id.
* @param uid The id of the new email.
*/
case class Exists(uid: Long @@ MailUID) extends IMAPIdleMessage

/**
* Notifies a given email being deleted on the server.
* @param uid The id of the email that was deleted.
*/
case class Expunge(uid: Long @@ MailUID) extends IMAPIdleMessage

/**
* A message with a type that we are not expecting appeared.
* @param uid The id of the email this message concerns.
*/
case class CustomMessage(name: String, uid: Long @@ MailUID) extends IMAPIdleMessage

/**
* Build the proper message from the provided message type.
* @param messageType The type of the message to be constructed.
* @param emailUid The id of the message which this message concerns.
* @return
*/
def parse(messageType: String, emailUid: Long @@ MailUID): IMAPIdleMessage = {
messageType match {
case "EXISTS" => Exists(emailUid)
case "EXPUNGE" => Expunge(emailUid)
case other => CustomMessage(other, emailUid)
}
}
}

/** Messages that the server pushes to the client while the client requested IDLE **/
sealed trait IMAPIdleMessage
Loading