Skip to content

Commit

Permalink
Merge pull request #415 from http4s/deadlock
Browse files Browse the repository at this point in the history
try to avoid deadlock for websockets
  • Loading branch information
hamnis authored Feb 5, 2023
2 parents 9705649 + 789a265 commit 08c326d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import scodec.bits.ByteVector

import scala.concurrent.ExecutionContext

private[client] class Http4sWebsocketHandler[F[_]](
handshaker: WebSocketClientHandshaker,
queue: Queue[F, Either[Throwable, WSFrame]],
Expand Down Expand Up @@ -135,14 +133,27 @@ private[client] class Http4sWebsocketHandler[F[_]](
sub: String,
ctx: ChannelHandlerContext
) extends WSConnection[F] {
private val runInNetty = F.evalOnK(ExecutionContext.fromExecutor(ctx.executor()))

override def send(wsf: WSFrame): F[Unit] =
sendMany(List(wsf))

override def sendMany[G[_], A <: WSFrame](wsfs: G[A])(implicit G: Foldable[G]): F[Unit] =
if (ctx.channel().isActive) {
wsfs.traverse_(wsf => runInNetty(F.delay(ctx.writeAndFlush(fromWSFrame(wsf))).liftToF))
def writeAll(): Unit = void {
wsfs.toList.foreach { elem =>
ctx.write(fromWSFrame(elem), ctx.channel.voidPromise())
}
ctx.flush()
}

F.delay {
if (ctx.executor.inEventLoop()) {
writeAll()
} else {
ctx.executor().execute(() => writeAll())
}
}

} else {
closed.complete(()).void
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class NettyWSClientBuilder[F[_]](
)
}
})
F.delay(bs.connect(socketAddress).sync()).as(None)
F.delay(bs.connect(socketAddress)).as(None)
}
} yield connection

Expand Down

0 comments on commit 08c326d

Please sign in to comment.