Websocket: Update To Unsafe Signal for Close Rather than Pure Signal #1631
Conversation
|
I'm not actually up to date on my streaming combinators so I can't provide feedback on the changes to |
@@ -80,18 +74,20 @@ class Http4sWSStage[F[_]](ws: ws4s.Websocket[F])(implicit F: Effect[F], val ec: | |||
val onStreamFinalize: F[Unit] = | |||
for { | |||
dec <- F.delay(count.decrementAndGet()) | |||
_ <- deadSignal.map(signal => if (dec == 0) signal.set(true)) | |||
_ <- if (dec == 0) deadSignal.set(true) else ().pure[F] |
SystemFw
Jan 17, 2018
Member
().pure[F]
-->
F.unit
?
nicer, but utterly unimportant ofc
().pure[F]
-->
F.unit
?
nicer, but utterly unimportant ofc
val wsStream = inputstream | ||
.to(ws.receive) | ||
.onFinalize(onStreamFinalize) | ||
.mergeHaltR(ws.send.onFinalize(onStreamFinalize).to(snk).drain) |
SystemFw
Jan 17, 2018
•
Member
foo.mergeHaltR(bar.drain)
can probably be bar.concurrently(foo)
now, but with the new merge
on the way this should work as well
foo.mergeHaltR(bar.drain)
can probably be bar.concurrently(foo)
now, but with the new merge
on the way this should work as well
SystemFw
Jan 17, 2018
Member
Actually scratch that sorry, you nee the output of foo
, but want to complete when bar
is done
Actually scratch that sorry, you nee the output of foo
, but want to complete when bar
is done
Left a few style comments, but nothing too important. I'm also assuming this has been tested |
|
@ChristopherDavenport you were too fast! |
so I was imprecise, |
:( |
No description provided.