Skip to content

Commit

Permalink
add EventStream
Browse files Browse the repository at this point in the history
  • Loading branch information
t3hnar committed Aug 25, 2020
1 parent 606b6cf commit 25a62c5
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.evolutiongaming.akkaeffect

import cats.effect.concurrent.Deferred
import cats.effect.{Concurrent, IO}
import cats.implicits._
import com.evolutiongaming.catshelper.ToFuture
import com.evolutiongaming.akkaeffect.IOSuite._
import org.scalatest.funsuite.AsyncFunSuite
import org.scalatest.matchers.should.Matchers

class EventStreamTest extends AsyncFunSuite with ActorSuite with Matchers {

test("publish & subscribe") {
publishAndSubscribe[IO].run()
}

private def publishAndSubscribe[F[_]: Concurrent: ToFuture] = {

case class Event(n: Int)

val eventStream = EventStream[F](actorSystem)
for {
deferred <- Deferred[F, Event]
onEvent = (event: Event) => deferred.complete(event)
actual <- eventStream.subscribe(onEvent).use { _ =>
eventStream
.publish(Event(0))
.productR(deferred.get)
}
} yield {
actual shouldEqual Event(0)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.evolutiongaming.akkaeffect

import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
import cats.effect.{Resource, Sync}
import cats.implicits._
import com.evolutiongaming.catshelper.{FromFuture, ToFuture}

import scala.reflect.ClassTag

trait EventStream[F[_]] {

def publish[A](a: A): F[Unit]

def subscribe[A](onEvent: A => F[Unit])(implicit tag: ClassTag[A]): Resource[F, Unit]
}

object EventStream {

def apply[F[_]: Sync: ToFuture: FromFuture](actorSystem: ActorSystem): EventStream[F] = {
apply(actorSystem.eventStream, actorSystem)
}

def apply[F[_]: Sync: ToFuture: FromFuture](
eventStream: akka.event.EventStream,
refFactory: ActorRefFactory,
): EventStream[F] = {

val actorRefOf = ActorRefOf.fromActorRefFactory[F](refFactory)

new EventStream[F] {

def publish[A](a: A) = {
Sync[F].delay { eventStream.publish(a) }
}

def subscribe[A](onEvent: A => F[Unit])(implicit tag: ClassTag[A]) = {

val channel = tag.runtimeClass

def unsubscribe(actorRef: ActorRef): F[Unit] = {
Sync[F].delay { eventStream.unsubscribe(actorRef, channel) }.void
}

def receiveOf = ReceiveOf[F] { actorCtx =>
Resource.make {
Receive[Envelope[Any]] { envelope =>
tag
.unapply(envelope.msg)
.foldMapM { msg =>
onEvent(msg).handleError { _ =>
()
}
}
.as(false)
} {
false.pure[F]
}.pure[F]
} { _ =>
unsubscribe(actorCtx.self)
}
}

def subscribe(actorRef: ActorRef) = {
Resource.make {
Sync[F].delay { eventStream.subscribe(actorRef, channel) }.void
} { _ =>
unsubscribe(actorRef)
}
}

val props = Props(ActorOf(receiveOf))
for {
actorRef <- actorRefOf(props)
result <- subscribe(actorRef)
} yield result
}
}
}
}

0 comments on commit 25a62c5

Please sign in to comment.