Dequeue1 performance #1296

Closed · wants to merge 2 commits into base: series/1.0

Conversation

4 participants
@maphi

maphi commented Oct 21, 2018

Improves dequeue1 performance on Queue. Also fixed some typos

@pchlupacek

Contributor

pchlupacek commented Oct 22, 2018

@maphi thanks for that. Could you please paste some short benchmark and sample code for this?

Thanks


@maphi

maphi commented Oct 22, 2018

Sample Code to test performance:

import java.util.concurrent.TimeUnit

import cats.effect._
import cats.implicits._
import fs2.concurrent.Queue

object Dequeue1PerfApp extends IOApp {

  val fillQueueCount = 100000

  override def run(args: List[String]): IO[ExitCode] =
    for {
      queue       <- Queue.unbounded[IO, Unit]
      _           <- List.range(0, fillQueueCount).traverse(_ => queue.offer1(()))
      dequeue1K   <- List.range(0, fillQueueCount / 1000).traverse{_ =>
                      for {
                        start <- timer.clock.monotonic(TimeUnit.NANOSECONDS)
                        _     <- List.range(0, 1000).traverse(_ => queue.dequeue1)
                        end   <- timer.clock.monotonic(TimeUnit.NANOSECONDS)
                      } yield end - start
                    }
      _           <- IO(dequeue1K.foreach(println))
    } yield ExitCode.Success

}

For me, this takes ~90 seconds without the fix and only ~5 seconds with it.


@pchlupacek

Contributor

pchlupacek commented Oct 22, 2018

@maphi thanks for providing the sample. After discussing this and giving it more thought, this seems to be a larger problem, and we have to address it. Your fix seems to work only in this corner case; if you need more than one element, you will again be subject to poor performance.

Moreover, note that head/tail operations on SQueue also have very doubtful performance: for example, if your test interleaves dequeuing one element with enqueuing and dequeuing more elements, your fix won't help and performance will be similar to splitAt, perhaps even worse.

This seems to demonstrate that SQueue is not the right container for Queue.

@mpilquist @SystemFw Any idea on a better container? You mentioned that Vector has issues too, but what are they, and wouldn't they be irrelevant here? Also, when experimenting with Strategies, I tried Chain and it was about 10% slower on average than Queue.

Also, we have to think about Chunk.Queue in this context, because it may be subject to a similar problem.

@maphi

maphi commented Oct 22, 2018

@pchlupacek Totally agree with you. I was surprised that operations on Queue in the Scala standard library perform that badly. From a performance perspective, I think a circular buffer backed by a plain Array would offer the best performance; for an unbounded queue one would need to add new arrays when the existing ones overflow. I'm not deep enough into fs2 to know whether such a mutable buffer would be an option.
I think the Akka folks solved similar problems with akka.util.ByteString.
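The circular-buffer idea above can be sketched as follows. This is a minimal, hypothetical ring buffer for illustration only, not fs2 code; the class and method names are invented:

```scala
// Minimal fixed-capacity ring buffer sketch (hypothetical, not part of fs2).
// enqueue/dequeue are O(1): only two indices move, backed by a plain Array.
final class RingBuffer[A](capacity: Int) {
  private val buf = new Array[Any](capacity)
  private var head = 0 // index of the next element to dequeue
  private var size = 0 // number of elements currently stored

  def enqueue(a: A): Boolean =
    if (size == capacity) false // full; an unbounded variant would grow here
    else {
      buf((head + size) % capacity) = a
      size += 1
      true
    }

  def dequeue(): Option[A] =
    if (size == 0) None
    else {
      val a = buf(head).asInstanceOf[A]
      head = (head + 1) % capacity
      size -= 1
      Some(a)
    }
}
```

The wrap-around via `% capacity` is what lets both ends advance without ever shifting or copying elements.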

@maphi

maphi commented Oct 22, 2018

After looking deeper into the akka.util.ByteString implementation, it looks like a good candidate for a queue-like, immutable data structure. For example, slicing is done just by adjusting start and end indices over a backing array.
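The index-based slicing idea could look roughly like this. This is a hypothetical sketch, not the actual ByteString code, and the names are invented:

```scala
// Sketch of ByteString-style slicing (hypothetical): an immutable view over a
// shared array. "Slicing" only moves indices; the underlying data is never copied.
final class ArrayView[A](underlying: Array[A], start: Int, end: Int) {
  def length: Int = end - start
  def head: A = underlying(start)
  def apply(i: Int): A = underlying(start + i)
  // O(1) drop: advance the start index instead of rebuilding the collection.
  def drop(n: Int): ArrayView[A] =
    new ArrayView(underlying, math.min(start + n, end), end)
}
```

Because the array is shared and never mutated through the view, many slices can coexist cheaply; the trade-off is that a small slice can pin a large backing array in memory.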

@mpilquist

Member

mpilquist commented Oct 23, 2018

@pchlupacek @maphi scala.collection.immutable.Queue claims to have constant time prepend/append and amortized constant time head/tail. Does this not match your observations? Can we microbenchmark scala.collection.immutable.Queue here?

@@ -272,10 +272,10 @@ object Queue {
       else q.tail :+ a
     }
-  /** Unbounded lifo strategy. */
+  /** Unbounded fifo strategy. */

@ghostdogpr

Contributor

ghostdogpr commented Oct 30, 2018

Actually the comments are correct, but the function names are wrong.
I submitted #1301 to fix it.

@mpilquist

Member

mpilquist commented Oct 30, 2018

Initial benchmarks are showing Queue beating Vector:

package fs2
package benchmarks

import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State}
import java.util.concurrent.TimeUnit

import scala.collection.immutable.Queue

@State(Scope.Thread)
class QueueBenchmark {

  private val vLarge = Vector.fill(100000)(0)
  private val qLarge = Queue.fill(100000)(0)

  private val vSmall = Vector.fill(5)(0)
  private val qSmall = Queue.fill(5)(0)

  @Benchmark @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.MILLISECONDS)
  def vLarge10M: Vector[Int] = runV(vLarge, 10000000)

  @Benchmark @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.MILLISECONDS)
  def vLarge10K: Vector[Int] = runV(vLarge, 10000)

  @Benchmark @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.MILLISECONDS)
  def vSmall10M: Vector[Int] = runV(vSmall, 10000000)

  @Benchmark @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.MILLISECONDS)
  def vSmall10K: Vector[Int] = runV(vSmall, 10000)
  
  private def runV(v: Vector[Int], elems: Int): Vector[Int] = {
    var i = 0
    var acc = 0
    var current = v
    while (i < elems) {
      val hd = current.head
      acc = acc + hd
      val tl = current.tail
      current = tl :+ acc
      i += 1
    }
    current
  }

  @Benchmark @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.MILLISECONDS)
  def qLarge10M: Queue[Int] = runQ(qLarge, 10000000)

  @Benchmark @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.MILLISECONDS)
  def qLarge10K: Queue[Int] = runQ(qLarge, 10000)

  @Benchmark @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.MILLISECONDS)
  def qSmall10M: Queue[Int] = runQ(qSmall, 10000000)

  @Benchmark @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.MILLISECONDS)
  def qSmall10K: Queue[Int] = runQ(qSmall, 10000)


  private def runQ(q: Queue[Int], elems: Int): Queue[Int] = {
    var i = 0
    var acc = 0
    var current = q
    while (i < elems) {
      val (hd, tl) = current.dequeue
      acc = acc + hd
      current = tl :+ acc
      i += 1
    }
    current
  }
}

Results:

[info] Benchmark                 Mode  Cnt     Score      Error  Units
[info] QueueBenchmark.qLarge10K  avgt    4     0.276 ±    0.106  ms/op
[info] QueueBenchmark.qLarge10M  avgt    4   310.492 ±  161.722  ms/op
[info] QueueBenchmark.qSmall10K  avgt    4     0.227 ±    0.122  ms/op
[info] QueueBenchmark.qSmall10M  avgt    4   208.318 ±   41.961  ms/op
[info] QueueBenchmark.vLarge10K  avgt    4     3.404 ±    1.289  ms/op
[info] QueueBenchmark.vLarge10M  avgt    4  3624.123 ± 1702.716  ms/op
[info] QueueBenchmark.vSmall10K  avgt    4     3.741 ±    1.694  ms/op
[info] QueueBenchmark.vSmall10M  avgt    4  3634.355 ± 1303.183  ms/op
@mpilquist

Member

mpilquist commented Oct 30, 2018

Update -- the problem is the use of splitAt, which is not constant time for Queue. More to come.

@mpilquist

Member

mpilquist commented Oct 30, 2018

scala.collection.immutable.Queue#splitAt inherits its implementation from scala.collection.TraversableLike: https://github.com/scala/scala/blob/bb1ce486dfb38f035c80f7d8a8ca06a17a0a59b1/src/library/scala/collection/TraversableLike.scala#L542-L552

Hence, splitAt is O(n). Same with take/drop. To fix, we should call dequeue in a loop everywhere we call splitAt now -- see dequeueN definition below in the new benchmark.

package fs2
package benchmarks

import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State}
import java.util.concurrent.TimeUnit

import scala.collection.immutable.Queue

@State(Scope.Thread)
class QueueBenchmark {

  private val vLarge = Vector.fill(100000)(0)
  private val qLarge = Queue.fill(100000)(0)

  @Benchmark @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.MILLISECONDS)
  def vLarge10M: Vector[Int] = runV(vLarge, 10000000)
  
  @Benchmark @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.MILLISECONDS)
  def vLarge10MBatch100: Vector[Int] = runVBatch(vLarge, 10000000, 100)

  private def runV(v: Vector[Int], elems: Int): Vector[Int] = {
    var i = 0
    var acc = 0
    var current = v
    while (i < elems) {
      val (hd, tl) = current.splitAt(1)
      acc = acc + hd.head
      current = tl :+ acc
      i += 1
    }
    current
  }

  private def runVBatch(v: Vector[Int], elems: Int, batch: Int): Vector[Int] = {
    var i = 0
    var current = v
    while (i < elems) {
      val (hds, tl) = current.splitAt(batch)
      current = tl ++ hds
      i += batch
    }
    current
  }

  @Benchmark @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.MILLISECONDS)
  def qLarge10M: Queue[Int] = runQ(qLarge, 10000000)

  @Benchmark @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.MILLISECONDS)
  def qLarge10MBatch: Queue[Int] = runQBatch(qLarge, 10000000, 100)

  private def runQ(q: Queue[Int], elems: Int): Queue[Int] = {
    var i = 0
    var acc = 0
    var current = q
    while (i < elems) {
      val (hd, tl) = current.dequeue
      acc = acc + hd
      current = tl :+ acc
      i += 1
    }
    current
  }

  private def runQBatch(q: Queue[Int], elems: Int, batch: Int): Queue[Int] = {
    var i = 0
    var current = q
    while (i < elems) {
      val (hds, tl) = dequeueN(current, batch)
      current = tl ++ hds
      i += batch
    }
    current
  }

  private def dequeueN[A](q: Queue[A], n: Int): (List[A], Queue[A]) = {
    val bldr = List.newBuilder[A]
    bldr.sizeHint(n)
    var cur = q
    var rem = n
    while (rem > 0 && cur.nonEmpty) {
      val (hd, tl) = cur.dequeue
      bldr += hd
      cur = tl
      rem -= 1
    }
    (bldr.result, cur)
  }
}

Results:

[info] Benchmark                         Mode  Cnt     Score     Error  Units
[info] QueueBenchmark.qLarge10M          avgt    4   300.635 ± 205.532  ms/op
[info] QueueBenchmark.qLarge10MBatch     avgt    4   394.301 ±  71.312  ms/op
[info] QueueBenchmark.vLarge10M          avgt    4  4871.769 ± 783.961  ms/op
[info] QueueBenchmark.vLarge10MBatch100  avgt    4   589.389 ± 520.994  ms/op
@maphi

maphi commented Oct 30, 2018

@mpilquist Looks good. Using dequeue instead of head + tail also avoids the queue running its internal reverse twice.
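The double-reverse point can be sketched with a simplified two-list ("banker's") queue, which is roughly the representation scala.collection.immutable.Queue uses. This is an illustrative sketch, not the actual standard-library code:

```scala
// Simplified two-list queue sketch (illustrative, not the real scala.collection code).
// Elements are enqueued onto `in` and dequeued from `out`; when `out` runs dry,
// `in` must be traversed/reversed to refill it.
final case class BQueue[A](in: List[A], out: List[A]) {
  def enqueue(a: A): BQueue[A] = BQueue(a :: in, out)

  // In this sketch, head and tail each reverse `in` when `out` is empty,
  // so calling head followed by tail pays for the reversal twice.
  def head: A = out match {
    case h :: _ => h
    case Nil    => in.reverse.head
  }
  def tail: BQueue[A] = out match {
    case _ :: t => BQueue(in, t)
    case Nil    => BQueue(Nil, in.reverse.tail)
  }

  // dequeue reverses at most once and returns both the head and the remainder.
  def dequeue: (A, BQueue[A]) = out match {
    case h :: t => (h, BQueue(in, t))
    case Nil =>
      in.reverse match {
        case h :: t => (h, BQueue(Nil, t))
        case Nil    => throw new NoSuchElementException("dequeue on empty queue")
      }
  }
}
```

Calling `dequeue` in a loop therefore amortizes the reversal across a whole run of dequeues, while `head` plus `tail` can redo the same work.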

@mpilquist

Member

mpilquist commented Oct 30, 2018

@maphi Thanks for reviewing. Do you want to update this PR to use dequeueN instead? If not, I can open a PR today or tomorrow hopefully.

@pchlupacek

Contributor

pchlupacek commented Oct 30, 2018

@mpilquist excellent. Could we also add this to Chunk.Queue?

@mpilquist

Member

mpilquist commented Oct 30, 2018

@pchlupacek I think Chunk.Queue is safe, as it only calls head/tail. It would be good to update it to use dequeue to pick up some constant-factor improvements, but I don't see any linear-time issues there now. I can PR that separately.

@pchlupacek

Contributor

pchlupacek commented Oct 30, 2018

@mpilquist cool and thanks

@maphi

maphi commented Oct 30, 2018

@mpilquist I'd propose you open a new PR, as it's your code :)
Btw: thanks for your awesome work for the community. I'm currently trying to convince my colleagues to use more of the Typelevel stack.

mpilquist added a commit to mpilquist/fs2 that referenced this pull request Oct 31, 2018

@mpilquist

Member

mpilquist commented Oct 31, 2018

Closing this in favor of #1302

@mpilquist mpilquist closed this Oct 31, 2018

mpilquist added a commit that referenced this pull request Nov 6, 2018

Merge pull request #1302 from mpilquist/topic/queue-perf
Fixed #1296 - linear time Queue#dequeue1 when should be constant