Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize timeout tests. Fix twitter timeout. #131

Merged
merged 1 commit into from
Aug 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ lazy val fetchJS = fetch.js

lazy val monix = crossProject
.in(file("monix"))
.dependsOn(fetch)
.dependsOn(fetch % "compile->compile;test->test")
.settings(name := "fetch-monix")
.jsSettings(sharedJsSettings: _*)
.crossDepSettings(commonCrossDependencies ++ monixCrossDependencies: _*)
Expand All @@ -40,7 +40,7 @@ lazy val debugJS = debug.js

lazy val twitter = crossProject
.in(file("twitter"))
.dependsOn(fetch)
.dependsOn(fetch % "compile->compile;test->test")
.crossDepSettings(commonCrossDependencies ++ twitterUtilDependencies: _*)

lazy val twitterJVM = twitter.jvm
Expand Down
56 changes: 56 additions & 0 deletions jvm/src/test/scala/FetchMonadErrorTimoutSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2016-2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fetch

import java.util.concurrent.TimeoutException
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import org.scalatest.{AsyncFlatSpecLike, Matchers}

// Note that this test cannot run on Scala.js

trait FetchMonadErrorTimeoutSpec[F[_]] { self: AsyncFlatSpecLike with Matchers =>

def runAsFuture[A](fa: F[A]): Future[A]

def fetchMonadError: FetchMonadError[F]

def delayQuery(timeout: Duration, delay: FiniteDuration): Query[Option[Int]] =
Query.async((ok, fail) => {
Thread.sleep(delay.toMillis)
ok(Some(1))
}, timeout)

"FetchMonadError" should "fail with timeout when a Query does not complete in time" in {
recoverToSucceededIf[TimeoutException] {
runAsFuture { fetchMonadError.runQuery(delayQuery(100.millis, 300.millis)) }
}
}

it should "not fail with timeout when a Query does complete in time" in {
runAsFuture {
fetchMonadError.runQuery(delayQuery(300.millis, 100.millis))
}.map(_ shouldEqual Some(1))
}

it should "not fail with timeout when infinite timeout specified" in {
runAsFuture {
fetchMonadError.runQuery(delayQuery(Duration.Inf, 100.millis))
}.map(_ shouldEqual Some(1))
}

}
66 changes: 6 additions & 60 deletions jvm/src/test/scala/FutureTimeoutTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,74 +16,20 @@

package fetch

import scala.concurrent._
import scala.concurrent.duration._
import org.scalatest._
import cats.data.NonEmptyList
import scala.concurrent.{ExecutionContext, Future}
import org.scalatest.{AsyncFlatSpec, Matchers}
import fetch.implicits._

// Note that this test cannot run on Scala.js

class FutureTimeoutTests
extends AsyncFlatSpec
with Matchers
with OptionValues
with Inside
with Inspectors {
with FetchMonadErrorTimeoutSpec[Future] {

implicit override def executionContext: ExecutionContext = ExecutionContext.Implicits.global
implicit override val executionContext: ExecutionContext = ExecutionContext.Implicits.global

case class ArticleId(id: Int)
case class Article(id: Int, content: String)

def article(id: Int)(implicit DS: DataSource[ArticleId, Article]): Fetch[Article] =
Fetch(ArticleId(id))

// A sample datasource with configurable delay and timeout

case class ConfigurableTimeoutDatasource(timeout: Duration, delay: Duration)
extends DataSource[ArticleId, Article] {
override def name = "ArticleFuture"
override def fetchOne(id: ArticleId): Query[Option[Article]] =
Query.async((ok, fail) => {
Thread.sleep(delay.toMillis)
ok(Option(Article(id.id, "An article with id " + id.id)))
}, timeout)
override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] =
batchingNotSupported(ids)
}

"FetchMonadError[Future]" should "fail with timeout when a datasource does not complete in time" in {

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(250 milliseconds, 750 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Future[Article] = Fetch.run[Future](fetch)

recoverToSucceededIf[TimeoutException] {
fut

}
}

it should "not fail with timeout when a datasource does complete in time" in {

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(750 milliseconds, 250 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Future[Article] = Fetch.run[Future](fetch)

fut.map { _ shouldEqual Article(1, "An article with id 1") }
}

it should "not fail with timeout when infinite timeout specified" in {

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(Duration.Inf, 250 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Future[Article] = Fetch.run[Future](fetch)

fut.map { _ shouldEqual Article(1, "An article with id 1") }
}
def runAsFuture[A](fa: Future[A]): Future[A] = fa

def fetchMonadError: FetchMonadError[Future] = FetchMonadError[Future]
}
38 changes: 38 additions & 0 deletions monix/jvm/src/test/scala/FetchTaskTimoutTests.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2016-2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fetch.monixTask

import monix.eval.Task
import monix.execution.Scheduler
import scala.concurrent.{ExecutionContext, Future}
import org.scalatest.{AsyncFlatSpec, Matchers}
import fetch.{FetchMonadError, FetchMonadErrorTimeoutSpec}
import fetch.monixTask.implicits._

// Note that this test cannot run on Scala.js

class FetchTaskTimeoutTests
extends AsyncFlatSpec
with Matchers
with FetchMonadErrorTimeoutSpec[Task] {

implicit override val executionContext: Scheduler = Scheduler.Implicits.global

def runAsFuture[A](task: Task[A]): Future[A] = task.runAsync

def fetchMonadError: FetchMonadError[Task] = FetchMonadError[Task]
}
11 changes: 5 additions & 6 deletions monix/shared/src/test/scala/FetchTaskTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,20 @@
* limitations under the License.
*/

package fetch.monixTask

import monix.eval.Task
import monix.execution.Scheduler

import org.scalatest._

import org.scalatest.{AsyncFreeSpec, Matchers}
import cats.data.NonEmptyList
import cats.instances.list._
import scala.concurrent.Future

import fetch._
import fetch.monixTask.implicits._

import scala.concurrent.Future

class FetchTaskTests extends AsyncFreeSpec with Matchers {
implicit override def executionContext = Scheduler.Implicits.global
implicit override val executionContext = Scheduler.Implicits.global

case class ArticleId(id: Int)
case class Article(id: Int, content: String) {
Expand Down
26 changes: 10 additions & 16 deletions twitter/jvm/src/main/scala/TwitterFuture.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package fetch.twitterFuture

import cats.{Always, Eval, Later, Now}
import com.twitter.util.{Duration, Future, FuturePool, JavaTimer, Promise, Timer}
import io.catbird.util._
import fetch._
import scala.concurrent.duration.FiniteDuration

object implicits {

import cats._
import com.twitter.util.{Duration, Future, FuturePool, Promise, Timer}
import io.catbird.util._
private[fetch] val timeoutTimer =
new JavaTimer(true, Some("fetch-twitter-future-timeout-daemon"))

def evalToRerunnable[A](e: Eval[A]): Rerunnable[A] = e match {
case Now(x) => Rerunnable.const(x)
Expand All @@ -36,18 +38,10 @@ object implicits {
implicit pool: FuturePool = FuturePool.interruptibleUnboundedPool
): FetchMonadError[Rerunnable] =
new FetchMonadError.FromMonadError[Rerunnable] {
override def runQuery[A](j: Query[A]): Rerunnable[A] = j match {
override def runQuery[A](q: Query[A]): Rerunnable[A] = q match {
case Sync(e) => evalToRerunnable(e)
case Async(ac, timeout) =>
Rerunnable.fromFuture {
val p: Promise[A] = Promise()
pool(ac(p setValue _, p setException _))
timeout match {
case _: FiniteDuration =>
p.raiseWithin(Duration(timeout.length, timeout.unit))(Timer.Nil)
case _ => p
}
}
case Async(_, _) =>
Rerunnable.fromFuture { fetchTwFutureMonadError(pool).runQuery(q) }
case Ap(qf, qx) =>
runQuery(qf).product(runQuery(qx)) map { case (f, x) => f(x) }
}
Expand All @@ -57,14 +51,14 @@ object implicits {
implicit pool: FuturePool = FuturePool.interruptibleUnboundedPool
): FetchMonadError[Future] =
new FetchMonadError.FromMonadError[Future] {
override def runQuery[A](j: Query[A]): Future[A] = j match {
override def runQuery[A](q: Query[A]): Future[A] = q match {
case Sync(e) => Future(e.value)
case Async(ac, timeout) =>
val p: Promise[A] = Promise()
pool(ac(p setValue _, p setException _))
timeout match {
case _: FiniteDuration =>
p.raiseWithin(Duration(timeout.length, timeout.unit))(Timer.Nil)
p.raiseWithin(Duration(timeout.length, timeout.unit))(timeoutTimer)
case _ => p
}
case Ap(qf, qx) =>
Expand Down
80 changes: 17 additions & 63 deletions twitter/jvm/src/test/scala/RerunnableTimeoutSpecs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,76 +16,30 @@

package fetch.twitterFuture

import cats.data.NonEmptyList
import com.twitter.util.{Await, Duration, Future, TimeoutException}
import com.twitter.conversions.time._
import fetch._
import fetch.implicits._
import io.catbird.util.Rerunnable
import com.twitter.util.{ExecutorServiceFuturePool, FuturePool}
import scala.concurrent.{Future => ScalaFuture, ExecutionContext}
import org.scalatest.{AsyncFlatSpec, Matchers}

import fetch.{FetchMonadError, FetchMonadErrorTimeoutSpec}
import fetch.twitterFuture.implicits._
import io.catbird.util._
import org.scalatest._

class RerunnableTimeoutSpac
extends FlatSpec
class RerunnableTimeoutSpec
extends AsyncFlatSpec
with Matchers
with OptionValues
with Inside
with Inspectors {

case class ArticleId(id: Int)
case class Article(id: Int, content: String)

def article(id: Int)(implicit DS: DataSource[ArticleId, Article]): Fetch[Article] =
Fetch(ArticleId(id))

// A sample datasource with configurable delay and timeout

case class ConfigurableTimeoutDatasource(timeout: Duration, delay: Duration)
extends DataSource[ArticleId, Article] {
override def name = "ArticleRerunnable"
override def fetchOne(id: ArticleId): Query[Option[Article]] =
Query.async(
(ok, fail) => {
Thread.sleep(delay.inMillis)
ok(Option(Article(id.id, "An article with id " + id.id)))
},
scala.concurrent.duration.Duration.fromNanos(timeout.inNanoseconds)
)
override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] =
batchingNotSupported(ids)
}

"FetchMonadError[Rerunnable]" should "fail with timeout when a datasource does not complete in time" in {

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(250 milliseconds, 750 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Rerunnable[Article] = Fetch.run[Rerunnable](fetch)
with FetchMonadErrorTimeoutSpec[Rerunnable] {

assertThrows[TimeoutException] {
Await.result(fut.run, 1 seconds)
}
implicit val pool: FuturePool = FuturePool.interruptibleUnboundedPool

implicit override val executionContext: ExecutionContext = {
val executor = pool.asInstanceOf[ExecutorServiceFuturePool].executor
ExecutionContext.fromExecutorService(executor)
}

it should "not fail with timeout when a datasource does complete in time" in {
def runAsFuture[A](rerun: Rerunnable[A]): ScalaFuture[A] =
Convert.twitterToScalaFuture(rerun.run)

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(750 milliseconds, 250 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Rerunnable[Article] = Fetch.run[Rerunnable](fetch)

fut.map { _ shouldEqual Article(1, "An article with id 1") }
}

it should "not fail with timeout when infinite timeout specified" in {

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(Duration.Top, 250 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Rerunnable[Article] = Fetch.run[Rerunnable](fetch)

fut.map { _ shouldEqual Article(1, "An article with id 1") }
}
def fetchMonadError: FetchMonadError[Rerunnable] =
FetchMonadError[Rerunnable]

}
Loading