Permalink
Browse files

ported map etc.

  • Loading branch information...
1 parent 66c813c commit 2904d30bc46a08f2a7666eb4a79478d2650403c2 @eed3si9n committed Aug 26, 2011
View
@@ -15,6 +15,7 @@ object Builds extends Build {
libraryDependencies ++= Seq(
"org.specs2" %% "specs2" % "1.5" % "test"
),
+ scalacOptions += "-unchecked",
publishArtifact in (Compile, packageBin) := true,
publishArtifact in (Test, packageBin) := false,
publishArtifact in (Compile, packageDoc) := false,
@@ -3,30 +3,34 @@ package sff4s.impl
import sff4s._
object ActorsFuture extends Futures {
- def future[A](result: => A): Future[A] = new WrappedActorsFuture(
- scala.actors.Futures.future(result)
- )
+ def futureEither[A](result: => Either[Throwable, A]): Future[A] =
+ new WrappedActorsFuture(scala.actors.Futures future { result })
}
-class WrappedActorsFuture[A](val underlying: scala.actors.Future[A]) extends Future[A] {
- def isDefined = underlying.isSet
+class WrappedActorsFuture[A](val underlying: scala.actors.Future[Either[Throwable, A]]) extends Future[A] {
+ val factory = ActorsFuture
- def get(timeoutInMsec: Long): Either[Throwable, A] =
- if (timeoutInMsec < 0)
- try {
- Right(underlying.apply())
- }
- catch {
- case e: Throwable => Left(e)
- }
- else
- try {
- scala.actors.Futures.awaitAll(timeoutInMsec, underlying).head match {
- case Some(value) => Right(value.asInstanceOf[A])
- case _ => Left(new TimeoutException(timeoutInMsec))
- }
- }
- catch {
- case e: Throwable => Left(e)
- }
+ def get: Either[Throwable, A] =
+ try {
+ underlying.apply()
+ } catch {
+ case e: Throwable => Left(e)
+ }
+
+ def get(timeoutInMsec: Long): Either[Throwable, A] = {
+ val x = scala.actors.Futures.awaitAll(timeoutInMsec, underlying).head
+ x match {
+ case Some(value) => value.asInstanceOf[Either[Throwable, A]]
+ case None => Left(new TimeoutException(timeoutInMsec))
+ }
+ }
+
+ def isDefined = underlying.isSet
+}
+
+class WrappedActorsThrowable[A](val underlying: Throwable) extends Future[A] {
+ val factory = ActorsFuture
+ def get: Either[Throwable, A] = Left(underlying)
+ def get(timeoutInMsec: Long): Either[Throwable, A] = get
+ def isDefined = true
}
@@ -1,7 +1,8 @@
package sff4s
trait Futures {
- def future[A](result: => A): Future[A]
+ def future[A](result: => A): Future[A] = futureEither(Right(result))
+ def futureEither[A](result: => Either[Throwable, A]): Future[A]
}
object Future {
@@ -10,33 +11,96 @@ object Future {
/**
* A computation evaluated asynchronously.
+ * Much of the code taken from https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala.
*/
-abstract class Future[+A] {
+abstract class Future[+A] {
+ protected def factory: Futures
+
/**
* Block indefinitely, wait for the result of the Future to be available.
*/
- def apply(): A = apply(Future.DEFAULT_TIMEOUT)
+ def apply(): A = get.fold(throw _, x => x)
/**
* Block as long as the given timeoutInMsec if non-negative, otherwise indefinitely.
*/
- def apply(timeoutInMsec: Long): A = get(timeoutInMsec) match {
- case Right(value) => value
- case Left(e) => throw e
- }
+ def apply(timeoutInMsec: Long): A = get(timeoutInMsec).fold(throw _, x => x)
+ /**
+ * Demands that the result of the future be available, and blocks indefinitly.
+ */
+ def get: Either[Throwable, A]
+
/**
* Demands that the result of the future be available within `timeoutInMsec`.
* The result is a Right[A] or Left[Throwable] depending upon whether the computation
* finished in time.
*/
- def get(timeoutInMsec: Long): Either[Throwable, A]
+ def get(timeoutInMsec: Long): Either[Throwable, A]
+
+ /**
+ * When the computation completes, invoke the given callback function.
+ */
+ def respond(k: Either[Throwable, A] => Unit): Future[A] =
+ factory.futureEither {
+ val result = get
+ k(result)
+ result
+ }
+
+ /**
+ * Invoke the callback only if the Future returns successfully. Useful for Scala for comprehensions.
+ * Use onSuccess instead of this method for more readable code.
+ */
+ def foreach(f: A => Unit) { onSuccess(f) }
+
+ /**
+ * Returns the given function applied to the value from this Right or returns this if this is a Left.
+ */
+ def flatMap[B](f: A => Future[B]): Future[B] = factory.futureEither { get fold (Left(_), f(_).get) }
+
+ /**
+ * Maps the given function to the value from this Right or returns this if this is a Left.
+ */
+ def map[B](f: A => B): Future[B] = factory.futureEither { get.right map {f} }
+
+ /**
+ * Converts this to a Left if the predicate does not obtain.
+ */
+ def filter(p: A => Boolean): Future[A] = factory.futureEither { get fold (Left(_),
+ value =>
+ if (p(value)) Right(value)
+ else Left(PredicateException())) }
+
+ /**
+ * Invoke the function on the result, if the computation was
+ * successful. Returns a chained Future as in `respond`.
+ *
+ * @return chained Future
+ */
+ def onSuccess(f: A => Unit): Future[A] =
+ respond {
+ case Right(value) => f(value)
+ case _ =>
+ }
+
+ /**
+ * Invoke the function on the error, if the computation was
+ * unsuccessful. Returns a chained Future as in `respond`.
+ *
+ * @return chained Future
+ */
+ def onFailure(rescueException: Throwable => Unit): Future[A] =
+ respond {
+ case Left(e) => rescueException(e)
+ case _ =>
+ }
/**
* Is the result of the Future available yet?
*/
def isDefined: Boolean
}
-class TimeoutException(val timeoutInMsec: Long) extends Exception(timeoutInMsec.toString) {
-}
+case class TimeoutException(val timeoutInMsec: Long) extends Exception(timeoutInMsec.toString)
+case class PredicateException() extends Exception()
@@ -6,24 +6,38 @@ trait FutureSpec extends Specification { def is =
"This is a specification to check a future" ^
p^
"The sample future should" ^
- "be not be defined right away" ! e1^
- "evaluate to 1 eventually " ! e2^
- "be defined eventually" ! e3^
- "throw a TimeoutException for 50 msec wait" ! e4^
+ "behave like a future" ^ isFuture(future, 1)^
+ endp^
+ "The chained future should" ^
+ "behave like a future" ^ isFuture(mapped, 2)^
end
- def e1 = future.isDefined must beFalse
- def e2 = future() must be_==(1)
- def e3 = {
- val f = future
- f()
- f.isDefined must beTrue
+ def isFuture(v: => Future[Int], n: Int) =
+ "not be defined right away" ! F(v, n).e1^
+ "evaluate to %d eventually".format(n) ! F(v, n).e2^
+ "be defined eventually" ! F(v, n).e3^
+ "throw a TimeoutException for 50 msec wait" ! F(v, n).e4
+
+ case class F(v: Future[Int], n: Int) {
+ def e1 = v.isDefined must beFalse
+ def e2 = v() must be_==(n)
+ def e3 = {
+ val f = v
+ f()
+ f.isDefined must beTrue
+ }
+ def e4 = v(50) must throwA[TimeoutException]
}
- def e4 = future(50) must throwA[TimeoutException]
def factory: Futures
- def future: Future[Int] = factory future {
- Thread.sleep(100)
- 1
- }
+ def future: Future[Int] =
+ factory future {
+ Thread.sleep(100)
+ 1
+ }
+ def mapped =
+ future map { result =>
+ Thread.sleep(100)
+ result + 1
+ }
}

0 comments on commit 2904d30

Please sign in to comment.