Skip to content

Commit

Permalink
Unbound most queues (#1159)
Browse files Browse the repository at this point in the history
It doesn't really make sense for us to bound queues here… they all get
filled after actions users manually took (like doing "Run all cells" in a
notebook with x hundreds cells…), so we can't really get flooded and crash
because of that - we're not a web service treating 1000s of req / s…
  • Loading branch information
alexarchambault committed Jun 20, 2023
1 parent 9317efb commit 159a12a
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ object MessageHandler {
val poisonPill: (Channel, RawMessage) = null // a bit meh

val task = for {
queue <- Queue.bounded[IO, (Channel, RawMessage)](40) // FIXME sizing?
queue <- Queue.unbounded[IO, (Channel, RawMessage)]
main = run(queue)
_ <- {
val t = for {
Expand Down Expand Up @@ -253,7 +253,7 @@ object MessageHandler {

val task =
for {
queue <- Queue.bounded[IO, Either[Throwable, (Channel, RawMessage)]](40) // FIXME sizing?
queue <- Queue.unbounded[IO, Either[Throwable, (Channel, RawMessage)]]
main = run(queue)
_ <- {
val t = for {
Expand Down
14 changes: 6 additions & 8 deletions modules/shared/kernel/src/main/scala/almond/kernel/Kernel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -385,14 +385,12 @@ object Kernel {
noExecuteInputFor: Set[String]
): IO[Kernel] =
for {
backgroundMessagesQueue <- Queue.bounded[IO, (Channel, RawMessage)](20) // FIXME Sizing
executeQueue <-
// FIXME Sizing
Queue.bounded[IO, Option[(
Option[(Channel, RawMessage)],
Stream[IO, (Channel, RawMessage)]
)]](50)
otherQueue <- Queue.bounded[IO, Option[Stream[IO, (Channel, RawMessage)]]](50) // FIXME Sizing
backgroundMessagesQueue <- Queue.unbounded[IO, (Channel, RawMessage)]
executeQueue <- Queue.unbounded[IO, Option[(
Option[(Channel, RawMessage)],
Stream[IO, (Channel, RawMessage)]
)]]
otherQueue <- Queue.unbounded[IO, Option[Stream[IO, (Channel, RawMessage)]]]
backgroundCommHandlerOpt <- IO {
if (interpreter.supportComm)
Some {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ object ClientStreams {

val poisonPill: (Channel, RawMessage) = null

val q = Queue.bounded[IO, (Channel, RawMessage)](10).unsafeRunSync()(IORuntime.global)
val q = Queue.unbounded[IO, (Channel, RawMessage)].unsafeRunSync()(IORuntime.global)

val sink: Pipe[IO, (Channel, RawMessage), Unit] = { s =>

Expand Down

0 comments on commit 159a12a

Please sign in to comment.