Skip to content

Commit

Permalink
Merge 5fecdd6 into 1bae1cf
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimir-popov committed Nov 24, 2020
2 parents 1bae1cf + 5fecdd6 commit e841a24
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 15 deletions.
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -8,8 +8,8 @@ Implementation of the reactive streams for fs2.
This version is not cross-platform, but faster than official module:
```
Benchmark Mode Cnt Score Error Units
ReadOneMillionNumbers.dokworkStreamSubscriber avgt 25 39.377 ± 2.749 ms/op
ReadOneMillionNumbers.fs2StreamSubscriber avgt 25 11189.737 ± 2213.225 ms/op
ReadOneMillionNumbers.dokworkStreamSubscriber avgt 25 56.017 ± 1.877 ms/op
ReadOneMillionNumbers.fs2StreamSubscriber avgt 25 3222.676 ± 1143.118 ms/op
```

## Installation
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
@@ -1,5 +1,5 @@
lazy val scala213 = "2.13.1"
lazy val scala212 = "2.12.10"
lazy val scala213 = "2.13.4"
lazy val scala212 = "2.12.12"
lazy val scala211 = "2.11.12"

lazy val `fast-reactive-fs2` = (project in file("."))
Expand Down
19 changes: 8 additions & 11 deletions src/main/scala/ru.dokwork.fs2/StreamSubscriber.scala
Expand Up @@ -3,12 +3,12 @@ package ru.dokwork.fs2
import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator

import cats.effect.{ CancelToken, Sync, Timer }
import cats.effect.{CancelToken, Sync, Timer}
import cats.implicits._
import fs2.{ Chunk, Stream }
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import fs2.{Chunk, Stream}
import org.reactivestreams.{Publisher, Subscriber, Subscription}

import scala.collection.mutable
import scala.collection.immutable.Queue
import scala.concurrent.duration._

trait StreamSubscriber[F[_], A] extends Subscriber[A] {
Expand Down Expand Up @@ -91,7 +91,7 @@ object StreamSubscriber {
}
}

private def neNull[R](x: Any)(f: => R): R = if (x equals null) throw new NullPointerException else f
private def neNull[R](x: Any)(f: => R): R = if (x == null) throw new NullPointerException else f
}

/** Side-effect-free fsm of the subscriber. */
Expand Down Expand Up @@ -140,17 +140,14 @@ object StreamSubscriber {
case _ => Completed
}

def nonCompleted: Boolean = ref.get() != Completed

private def updateAndGet(f: State => State): State =
ref.updateAndGet(unaryOperator(f))
}

private final class ChunkQueue[A] extends AtomicReference[mutable.Buffer[A]](mutable.Buffer.empty[A]) {
// it's not dangerous to update buffer here because it happens sequentially only in the 'OnNext' method
def put(a: A): Unit = updateAndGet(unaryOperator(_ += a))
private final class ChunkQueue[A] extends AtomicReference[Queue[A]](Queue.empty[A]) {
def put(a: A): Unit = updateAndGet(unaryOperator(_ appended a))

def popAll: Chunk[A] = Chunk.buffer(getAndUpdate(unaryOperator(_ => mutable.Buffer.empty[A])))
def popAll: Chunk[A] = Chunk.seq(getAndUpdate(unaryOperator(_ => Queue.empty[A])))
}

private def unaryOperator[A](f: A => A) = new UnaryOperator[A] {
Expand Down

0 comments on commit e841a24

Please sign in to comment.