Permalink
Browse files

Added Benchmark, fairQueue

  • Loading branch information...
pchlupacek committed Sep 14, 2018
1 parent 828ce38 commit b49098008a9654d485fa78d792d1e0ebfaef138b
@@ -0,0 +1,38 @@
package fs2.benchmark
import cats.syntax.all._
import cats.effect.{Concurrent, IO}
import fs2._
import fs2.concurrent.Queue
import org.openjdk.jmh.annotations.{Benchmark, Scope, State}
import scala.concurrent.ExecutionContext
@State(Scope.Thread)
class QueueBenchmark {
implicit val cs = IO.contextShift(ExecutionContext.global)
implicit val concurrent = IO.ioConcurrentEffect
val size = 100000
@GenerateN(1, 2, 5, 10, 50, 100)
@Benchmark
def chunkedQueue10k(N: Int): Unit =
Queue
.unbounded[IO, Int]
.flatMap { q =>
Concurrent[IO].start(Stream.constant(1, N).take(size).to(q.enqueue).compile.drain) >>
q.dequeue.take(size).compile.drain
}
.unsafeRunSync()
@GenerateN(1, 2, 5, 10, 50, 100)
@Benchmark
def chunkedQueueManually10k(N: Int): Unit =
Queue
.unbounded[IO, Chunk[Int]]
.flatMap { q =>
Concurrent[IO].start(Stream.constant(1, N).take(size).chunks.to(q.enqueue).compile.drain) >>
q.dequeue.flatMap(Stream.chunk).take(size).compile.drain
}
.unsafeRunSync()
}
@@ -50,7 +50,7 @@ trait Dequeue[F[_], A] {
/** Dequeue elements from the queue */
def dequeue: Stream[F, A] =
dequeueChunk(1)
dequeueChunk(Int.MaxValue)
/** Dequeue elements from the queue, size of the chunks dequeue is restricted by `maxSize` */
def dequeueChunk(maxSize: Int): Stream[F, A]
@@ -113,10 +113,18 @@ object Queue {
def unbounded[F[_], A](implicit F: Concurrent[F]): F[Queue[F, A]] =
forStrategy(UnboundedQueue.strategy[A])
/** unbounded queue that distributed always at max `fairSize` elements to any subscriber **/
def fairUnbounded[F[_], A](fairSize: Int)(implicit F: Concurrent[F]): F[Queue[F, A]] =
forStrategy(UnboundedQueue.strategy[A].transformSelector[Int]((sz, _) => sz.min(fairSize)))
/** Creates a queue with the specified size bound. */
def bounded[F[_], A](maxSize: Int)(implicit F: Concurrent[F]): F[Queue[F, A]] =
forStrategy(BoundedQueue.strategy(maxSize))
/** bounded queue that distributed always at max `fairSize` elements to any subscriber **/
def fairBounded[F[_], A](maxSize: Int, fairSize: Int)(implicit F: Concurrent[F]): F[Queue[F, A]] =
forStrategy(BoundedQueue.strategy(maxSize).transformSelector[Int]((sz, _) => sz.min(fairSize)))
/** Creates a queue which stores the last `maxSize` enqueued elements and which never blocks on enqueue. */
def circularBuffer[F[_], A](maxSize: Int)(implicit F: Concurrent[F]): F[Queue[F, A]] =
forStrategy(UnboundedQueue.circularBuffer(maxSize))
@@ -202,7 +202,6 @@ object SignallingRef {
}
}
}
implicit def invariantInstance[F[_]: Functor]: Invariant[SignallingRef[F, ?]] =
new Invariant[SignallingRef[F, ?]] {
@@ -1,6 +1,6 @@
package fs2.concurrent.pubsub
trait PubSubStrategy[I, O, S, Selector] {
trait PubSubStrategy[I, O, S, Selector] { self =>
/** provides initial state **/
def initial: S
@@ -55,4 +55,24 @@ trait PubSubStrategy[I, O, S, Selector] {
* @param selector Selector, whose selection shall be cancelled. Shall contain subscriber's identity.
*/
def unsubscribe(selector: Selector, queueState: S): S
/** transforms selector to selector of this state by applying the `f` to Sel2 and state of this strategy **/
def transformSelector[Sel2](f: (Sel2, S) => Selector): PubSubStrategy[I, O, S, Sel2] =
new PubSubStrategy[I, O, S, Sel2] {
def initial: S =
self.initial
def accepts(i: I, queueState: S): Boolean =
self.accepts(i, queueState)
def publish(i: I, queueState: S): S =
self.publish(i, queueState)
def get(selector: Sel2, queueState: S): (S, Option[O]) =
self.get(f(selector, queueState), queueState)
def empty(queueState: S): Boolean =
self.empty(queueState)
def subscribe(selector: Sel2, queueState: S): (S, Boolean) =
self.subscribe(f(selector, queueState), queueState)
def unsubscribe(selector: Sel2, queueState: S): S =
self.unsubscribe(f(selector, queueState), queueState)
}
}

0 comments on commit b490980

Please sign in to comment.