Skip to content

Commit

Permalink
update fs2 to 1.0.x
Browse files Browse the repository at this point in the history
  • Loading branch information
gafiatulin committed Jul 12, 2018
1 parent e0ebde8 commit 103fa6c
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 77 deletions.
14 changes: 6 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Here is a simple example of the use of a circuit breaker to protect against casc

```scala
import cats.effect.IO
import fs2.{ Scheduler, Stream }
import fs2.Stream

import java.io.IOException
import java.util.UUID
Expand Down Expand Up @@ -112,8 +112,7 @@ class DatabaseService(cbRegistry: CircuitBreakerRegistry[IO], circuitBreakerSett
* above.
*/
val protectedSystem = (for {
scheduler <- Scheduler[IO](1)
cbRegistry <- Stream.eval(CircuitBreakerRegistry.create[IO](circuitBreakerRegistrySettings, scheduler))
cbRegistry <- Stream.eval(CircuitBreakerRegistry.create[IO](circuitBreakerRegistrySettings))
dbService = new DatabaseService(cbRegistry, databaseCircuitBreakerSettings)
result <- Stream.eval(dbService.getAllQuarterlyProductSales)
} yield ()).runLast.map(_.getOrElse(Vector.empty))
Expand All @@ -131,8 +130,8 @@ protectedSystem.unsafeRunSync()
Interested processes can subscribe to events related to a system's circuit breaker activity. There is an example of its use.

```scala
import cats.effect.IO
import fs2.{ async, Stream }
import cats.effect.{Concurrent, IO}
import fs2.Stream
import com.ccadllc.cedi.circuitbreaker.{ CircuitBreakerRegistry, CircuitBreaker }
import com.ccadllc.cedi.circuitbreaker.statistics.{ FailureStatistics, FlowControlStatistics }

Expand All @@ -151,16 +150,15 @@ def monitorCircuitBreakerEvents(cbRegistry: CircuitBreakerRegistry[IO]): IO[Unit
val eventStream = cbRegistry.events(maxQueuedEvents) flatMap { event =>
Stream.eval_(processCircuitBreakerEvent(event))
}
async.start(eventStream.run) map { _ => () }
Concurrent[IO].start(eventStream.run) map { _ => () }
}

/*
* Near the beginning of the universe, create a circuit breaker registry for the
* system to use. The circuit breaker is passed configuration settings, as described above.
*/
val protectedSystem = (for {
scheduler <- Scheduler[IO](1)
cbRegistry <- Stream.eval(CircuitBreakerRegistry.create[IO](circuitBreakerRegistrySettings, scheduler))
cbRegistry <- Stream.eval(CircuitBreakerRegistry.create[IO](circuitBreakerRegistrySettings))
dbService = new DatabaseService(cbRegistry, databaseCircuitBreakerSettings)
result <- Stream.eval(dbService.getAllQuarterlyProductSales)
} yield ()).runLast.map(_.getOrElse(Vector.empty))
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ lazy val core = project.in(file("core")).enablePlugins(SbtOsgi).
name := "circuitbreaker",
libraryDependencies ++= Seq(
"com.ccadllc.cedi" %% "config" % "1.1.0",
"co.fs2" %% "fs2-core" % "0.10.0",
"org.scalatest" %% "scalatest" % "3.0.4" % "test"
"co.fs2" %% "fs2-core" % "1.0.0-M1",
"org.scalatest" %% "scalatest" % "3.0.5" % "test"
),
buildOsgiBundle("com.ccadllc.cedi.circuitbreaker"),
scalacOptions ++= (if (scalaBinaryVersion.value startsWith "2.11") List("-Xexperimental") else Nil) // 2.11 needs -Xexperimental to enable SAM conversion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,17 @@
*/
package com.ccadllc.cedi.circuitbreaker

import cats.effect.{ Effect, Sync }
import cats.effect.{ Concurrent, Sync, Timer }
import cats.implicits._

import fs2._
import fs2.async.mutable.{ Signal, Topic }

import java.time.Instant

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

import scala.language.higherKinds

import CircuitBreaker._
import CircuitBreakerRegistry._

import statistics.Statistics

/**
Expand All @@ -45,9 +40,8 @@ import statistics.Statistics
final class CircuitBreakerRegistry[F[_]] private (
state: StateRef[F, State[F]],
eventTopic: Topic[F, Option[CircuitBreakerEvent]],
shutdownTrigger: ShutdownTrigger[F],
scheduler: Scheduler
)(implicit ec: ExecutionContext, F: Effect[F]) {
shutdownTrigger: ShutdownTrigger[F]
)(implicit ec: ExecutionContext, F: Concurrent[F], T: Timer[F]) {

/**
* Creates an `fs2.Stream` of [[CircuitBreaker#CircuitBreakerEvent]]s by subscribing to the event `fs2.async.mutable.Topic` maintained
Expand All @@ -69,7 +63,7 @@ final class CircuitBreakerRegistry[F[_]] private (
cbs <- circuitBreakers
stats <- cbs.values.toVector.traverse { _.currentStatistics }
} yield stats
scheduler.awakeEvery[F](retrievalInterval).evalMap { _ => retrieveStatistics }.flatMap(Stream.emits(_)).interruptWhen(shutdownTrigger.signal)
Stream.awakeEvery[F](retrievalInterval).evalMap { _ => retrieveStatistics }.flatMap(Stream.emits(_)).interruptWhen(shutdownTrigger.signal)
}

/**
Expand Down Expand Up @@ -162,14 +156,14 @@ object CircuitBreakerRegistry {
* Creates a `CircuitBreakerRegistry` instance given a [[RegistrySettings]] configuration, providing for clean up of resources
* when the registry is shutdown.
* @param settings the configuration for the registry.
* @param scheduler the scheduler used for the execution of periodic tasks, such as the statistics stream and the
* @param ec the execution context used for the execution of an effectful program `F` with a `Effect` in implicit scope.
* @param ec the execution context used for the execution of an effectful program `F` with a `Concurrent` in implicit scope.
* registry [[CircuitBreaker]] garbage collection.
* @tparam F - the type of effectful programs the circuit breakers in this registry will protect.
* @tparam T — the timer used for the execution of periodic tasks, such as the statistics stream and the
* @return circuitBreakerRegistry - an effectful program describing the creation of the circuit breaker registry which
* will be executed when the program is run.
*/
def create[F[_]](settings: RegistrySettings, scheduler: Scheduler)(implicit F: Effect[F], ec: ExecutionContext): F[CircuitBreakerRegistry[F]] = {
def create[F[_]](settings: RegistrySettings)(implicit F: Concurrent[F], T: Timer[F], ec: ExecutionContext): F[CircuitBreakerRegistry[F]] = {
def collectGarbageInBackground(state: StateRef[F, State[F]], shutdownSignal: Signal[F, Boolean]) = {
val collectGarbage = for {
now <- F.delay(Instant.now)
Expand All @@ -180,16 +174,16 @@ object CircuitBreakerRegistry {
}.map { _.flatten }
_ <- state.modify(s => s.copy(circuitBreakers = s.circuitBreakers filterNot { case (id, _) => expiredIds.contains(id) }))
} yield ()
if (settings.garbageCollection.checkInterval > 0.nanoseconds) async.start(
scheduler.awakeEvery[F](settings.garbageCollection.checkInterval).evalMap { _ => collectGarbage }.interruptWhen(shutdownSignal).compile.drain.map { _ => () }
)
if (settings.garbageCollection.checkInterval > 0.nanoseconds) F.start(
Stream.awakeEvery[F](settings.garbageCollection.checkInterval).evalMap { _ => collectGarbage }.interruptWhen(shutdownSignal).compile.drain.map { _ => () }
).map(_.join)
else F.pure(())
}
for {
eventTopic <- async.topic[F, Option[CircuitBreakerEvent]](None)
state <- StateRef.create[F, State[F]](State.empty[F])
shutdownSignal <- async.signalOf[F, Boolean](false)
_ <- collectGarbageInBackground(state, shutdownSignal)
} yield new CircuitBreakerRegistry(state, eventTopic, new ShutdownTrigger(shutdownSignal), scheduler)
} yield new CircuitBreakerRegistry(state, eventTopic, new ShutdownTrigger(shutdownSignal))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,60 +27,60 @@ class CircuitBreakerRegistryTest extends WordSpec with TestSupport {
"The circuit breaker registry" should {
"register a new failure circuit breaker when an existing one with the given identifier does not exist" in {
val id = CircuitBreaker.Identifier("test")
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig, scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig).unsafeRunSync
registry.circuitBreakers.unsafeRunSync shouldBe 'empty
val cb = registry.forFailure(id, testFailureConfig).unsafeRunSync
registry.circuitBreakers.unsafeRunSync.get(id) shouldBe Some(cb)
}
"return the existing failure circuit breaker when requested with the given identifier when it is already registered" in {
val id = CircuitBreaker.Identifier("test")
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig, scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig).unsafeRunSync
val cb = registry.forFailure(id, testFailureConfig).unsafeRunSync
registry.circuitBreakers.unsafeRunSync.get(id) shouldBe Some(cb)
registry.forFailure(id, testFailureConfig).unsafeRunSync
registry.circuitBreakers.unsafeRunSync.get(id) shouldBe Some(cb)
}
"remove an existing failure circuit breaker" in {
val id = CircuitBreaker.Identifier("test")
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig, scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig).unsafeRunSync
val cb = registry.forFailure(id, testFailureConfig).unsafeRunSync
registry.circuitBreakers.unsafeRunSync.get(id) shouldBe Some(cb)
registry.removeCircuitBreaker(id).unsafeRunSync
registry.circuitBreakers.unsafeRunSync.get(id) shouldBe 'empty
}
"register a new flow control circuit breaker when an existing one with the given identifier does not exist" in {
val id = CircuitBreaker.Identifier("test")
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig, scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig).unsafeRunSync
registry.circuitBreakers.unsafeRunSync shouldBe 'empty
val cb = registry.forFlowControl(id, testFlowControlConfig).unsafeRunSync
registry.circuitBreakers.unsafeRunSync.get(id) shouldBe Some(cb)
}
"return the existing flow control circuit breaker when requested with the given identifier when it is already registered" in {
val id = CircuitBreaker.Identifier("test")
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig, scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig).unsafeRunSync
val cb = registry.forFlowControl(id, testFlowControlConfig).unsafeRunSync
registry.circuitBreakers.unsafeRunSync.get(id) shouldBe Some(cb)
registry.forFlowControl(id, testFlowControlConfig).unsafeRunSync
registry.circuitBreakers.unsafeRunSync.get(id) shouldBe Some(cb)
}
"remove an existing flow control circuit breaker" in {
val id = CircuitBreaker.Identifier("test")
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig, scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig).unsafeRunSync
val cb = registry.forFlowControl(id, testFlowControlConfig).unsafeRunSync
registry.circuitBreakers.unsafeRunSync.get(id) shouldBe Some(cb)
registry.removeCircuitBreaker(id).unsafeRunSync
registry.circuitBreakers.unsafeRunSync.get(id) shouldBe 'empty
}
"ignore removal of a non-existent circuit breaker" in {
val id = CircuitBreaker.Identifier("test")
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig, scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig).unsafeRunSync
registry.circuitBreakers.unsafeRunSync.get(id) shouldBe 'empty
registry.removeCircuitBreaker(id).unsafeRunSync
registry.circuitBreakers.unsafeRunSync.get(id) shouldBe 'empty
}
"request a stream of events (when events are available)" in {
val id = CircuitBreaker.Identifier("test")
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig, scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig).unsafeRunSync
val tseo = TestStreamedEventObserver.create(registry)
val failureThreshold = Percentage(20.0)
val cb = registry.forFailure(id, testFailureConfig.copy(degradationThreshold = failureThreshold)).unsafeRunSync
Expand All @@ -89,7 +89,7 @@ class CircuitBreakerRegistryTest extends WordSpec with TestSupport {
}
"request a stream of statistics (when statistics are available)" in {
val id = CircuitBreaker.Identifier("test")
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig, scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig).unsafeRunSync
val tsso = TestStreamedStatisticsObserver.create(registry, 20.milliseconds)
val failureThreshold = Percentage(20.0)
val cb = registry.forFailure(id, testFailureConfig.copy(degradationThreshold = failureThreshold)).unsafeRunSync
Expand All @@ -102,13 +102,13 @@ class CircuitBreakerRegistryTest extends WordSpec with TestSupport {
val gcCheckInterval = 50.milliseconds
val inactivityCutoff = 500.milliseconds
val gcSettings = RegistrySettings.GarbageCollection(gcCheckInterval, inactivityCutoff)
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig.copy(garbageCollection = gcSettings), scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig.copy(garbageCollection = gcSettings)).unsafeRunSync
registry.forFailure(id, testFailureConfig).unsafeRunSync
registry.circuitBreakers.unsafeRunSync should not be ('empty)
registry.circuitBreakers.unsafeRunSync should not be 'empty
Thread.sleep(inactivityCutoff.toMillis + (gcCheckInterval.toMillis * 2L))
registry.circuitBreakers.unsafeRunSync shouldBe 'empty
registry.forFailure(id, testFailureConfig).unsafeRunSync
registry.circuitBreakers.unsafeRunSync should not be ('empty)
registry.circuitBreakers.unsafeRunSync should not be 'empty
Thread.sleep(inactivityCutoff.toMillis + (gcCheckInterval.toMillis * 2L))
registry.circuitBreakers.unsafeRunSync shouldBe 'empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ class FailureCircuitBreakerTest extends WordSpec with TestSupport {
"The failure circuit breaker" should {
"switch to the open position when the failure rate of the services it is protecting exceeds the configured threshold" in {
val id = CircuitBreaker.Identifier("test")
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig, scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig).unsafeRunSync
val failureThreshold = Percentage(20.0)
val cb = registry.forFailure(id, testFailureConfig.copy(degradationThreshold = failureThreshold)).unsafeRunSync
val tseo = TestStreamedEventObserver.create(registry)
tseo.openedCount(id) shouldBe 0
val results = protectFailure(cb, failureThreshold.plus(Percentage(10.0)))
tseo.openedCount(id) should be > 0
results.collectFirst { case Some(CircuitBreaker.OpenException(id, _)) => id } shouldBe Some(id)
results.collectFirst { case Some(CircuitBreaker.OpenException(eId, _)) => eId } shouldBe Some(id)
}
"switch to the closed position after opening when the configured minimum number of service tests succeed" in {
val id = CircuitBreaker.Identifier("test")
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig, scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig).unsafeRunSync
val failureThreshold = Percentage(20.0)
val cb = registry.forFailure(
id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class FlowControlCircuitBreakerTest extends WordSpec with TestSupport {
"The flow control circuitbreaker" should {
"throttle down the request rate when the inbound rate continue to exceed the max acceptable rate" in {
val id = CircuitBreaker.Identifier("test")
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig, scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig).unsafeRunSync
val cb = registry.forFlowControl(id, testFlowControlConfig).unsafeRunSync
val tseo = TestStreamedEventObserver.create(registry)
protectFlowControl(cb, 20.milliseconds, 5.milliseconds, 750.milliseconds)
Expand All @@ -42,7 +42,7 @@ class FlowControlCircuitBreakerTest extends WordSpec with TestSupport {
}
"throttle up the request rate when the inbound rate no longer exceeds the max acceptable rate" in {
val id = CircuitBreaker.Identifier("test")
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig, scheduler).unsafeRunSync
val registry = CircuitBreakerRegistry.create[IO](testRegistryConfig).unsafeRunSync
val cb = registry.forFlowControl(id, testFlowControlConfig).unsafeRunSync
val tseo = TestStreamedEventObserver.create(registry)
protectFlowControl(cb, 20.milliseconds, 10.milliseconds, 750.milliseconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,22 @@
*/
package com.ccadllc.cedi.circuitbreaker

import cats.effect.IO
import cats.effect.{ Concurrent, IO }
import cats.implicits._

import fs2.{ async, Scheduler }

import scala.collection.immutable.Vector
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

import org.scalatest.{ BeforeAndAfterAll, Matchers, Suite, WordSpecLike }

import CircuitBreaker._

import statistics._

trait TestSupport extends WordSpecLike with Matchers with BeforeAndAfterAll {
self: Suite =>

private val GroupTasksBy = 10

var scheduler: Scheduler = _
private var shutdownScheduler: IO[Unit] = _

override def beforeAll() = {
val (s, shutdown) = Scheduler.allocate[IO](1).unsafeRunSync
scheduler = s
shutdownScheduler = shutdown
}

override def afterAll() = {
shutdownScheduler.unsafeRunSync
}

val testRegistryConfig: RegistrySettings = RegistrySettings(RegistrySettings.GarbageCollection(5.minutes, 60.minutes))

val testRequestSamples: Int = 200
Expand Down Expand Up @@ -179,10 +161,10 @@ trait TestSupport extends WordSpecLike with Matchers with BeforeAndAfterAll {
}
}

Random.shuffle(successfulTasks ++ failedTasks).grouped(groupedTasks).toVector.traverse {
Random.shuffle(successfulTasks ++ failedTasks).grouped(groupedTasks).toVector.traverse[IO, Vector[Option[Throwable]]] {
def protectInParallel(taskGroup: Vector[IO[Unit]]) = {
def attemptProtect(t: IO[Unit]) = cb.protect(t).attempt map { _.left.toOption }
async.parallelTraverse(taskGroup map attemptProtect)(identity)
def attemptProtect(t: IO[Unit]): IO[Option[Throwable]] = cb.protect(t).attempt map { _.left.toOption }
taskGroup.map(attemptProtect).parTraverse[IO, IO.Par, Option[Throwable]](identity)
}
protectInParallel(_) map { r =>
Thread.sleep(50L)
Expand Down Expand Up @@ -218,10 +200,10 @@ trait TestSupport extends WordSpecLike with Matchers with BeforeAndAfterAll {
}
ExecutionContext(ctx.tasks :+ task, updatedRequestTime)
}
ec.tasks.grouped(groupedTasks).toVector.traverse {
ec.tasks.grouped(groupedTasks).toVector.traverse[IO, Vector[IO[Either[Throwable, Unit]]]] {
def protectInParallel(taskGroup: Vector[IO[Unit]]) = {
def attemptProtect(t: IO[Unit]) = async.start(cb.protect(t).attempt)
async.parallelTraverse(taskGroup map attemptProtect)(identity)
def attemptProtect(t: IO[Unit]): IO[IO[Either[Throwable, Unit]]] = Concurrent[IO].start(cb.protect(t).attempt).map(_.join)
taskGroup.map(attemptProtect).parTraverse[IO, IO.Par, IO[Either[Throwable, Unit]]](identity)
}
protectInParallel(_) map { r =>
Thread.sleep(pauseBetweenTaskGroups.toMillis)
Expand Down

0 comments on commit 103fa6c

Please sign in to comment.