Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JettyClient: properly propagate streamed errors #2909

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -12,26 +12,38 @@ import java.nio.ByteBuffer
import org.eclipse.jetty.client.api.{Result, Response => JettyResponse}
import org.eclipse.jetty.http.{HttpFields, HttpVersion => JHttpVersion}
import org.eclipse.jetty.util.{Callback => JettyCallback}
import org.http4s.client.jetty.ResponseListener.Item
import org.http4s.internal.{invokeCallback, loggingAsyncCallback}
import org.http4s.internal.CollectionCompat.CollectionConverters._
import org.log4s.getLogger

private[jetty] final case class ResponseListener[F[_]](
queue: Queue[F, Option[ByteBuffer]],
queue: Queue[F, Item],
cb: Callback[Resource[F, Response[F]]])(implicit F: ConcurrentEffect[F])
extends JettyResponse.Listener.Adapter {

import ResponseListener.logger

/* Needed to properly propagate client errors */
private[this] var responseSent = false

override def onHeaders(response: JettyResponse): Unit = {
val r = Status
.fromInt(response.getStatus)
.map { s =>
responseSent = true
Resource.pure[F, Response[F]](Response(
status = s,
httpVersion = getHttpVersion(response.getVersion),
headers = getHeaders(response.getHeaders),
body = queue.dequeue.unNoneTerminate.flatMap(bBuf => chunk(Chunk.byteBuffer(bBuf)))
body = queue.dequeue.repeatPull {
_.uncons1.flatMap {
case None => Pull.pure(None)
case Some((Item.Done, _)) => Pull.pure(None)
case Some((Item.Buf(b), tl)) => Pull.output(Chunk.byteBuffer(b)).as(Some(tl))
case Some((Item.Raise(t), _)) => Pull.raiseError[F](t)
}
}
))
}
.leftMap(t => { abort(t, response); t })
Expand All @@ -56,15 +68,16 @@ private[jetty] final case class ResponseListener[F[_]](
callback: JettyCallback): Unit = {
val copy = ByteBuffer.allocate(content.remaining())
copy.put(content).flip()
enqueue(copy.some) {
enqueue(Item.Buf(copy)) {
case Right(_) => IO(callback.succeeded())
case Left(e) =>
IO(logger.error(e)("Error in asynchronous callback")) >> IO(callback.failed(e))
}
}

override def onFailure(response: JettyResponse, failure: Throwable): Unit =
invokeCallback(logger)(cb(Left(failure)))
if (responseSent) enqueue(Item.Raise(failure))(_ => IO.unit)
else invokeCallback(logger)(cb(Left(failure)))

// the entire response has been received
override def onSuccess(response: JettyResponse): Unit =
Expand All @@ -83,19 +96,26 @@ private[jetty] final case class ResponseListener[F[_]](
closeStream()

private def closeStream(): Unit =
enqueue(None)(loggingAsyncCallback(logger))
enqueue(Item.Done)(loggingAsyncCallback(logger))

private def enqueue(b: Option[ByteBuffer])(cb: Either[Throwable, Unit] => IO[Unit]): Unit =
queue.enqueue1(b).runAsync(cb).unsafeRunSync()
private def enqueue(item: Item)(cb: Either[Throwable, Unit] => IO[Unit]): Unit =
queue.enqueue1(item).runAsync(cb).unsafeRunSync()

}

private[jetty] object ResponseListener {
sealed trait Item
object Item {
case object Done extends Item
case class Raise(t: Throwable) extends Item
case class Buf(b: ByteBuffer) extends Item
}

private val logger = getLogger

def apply[F[_]](cb: Callback[Resource[F, Response[F]]])(
implicit F: ConcurrentEffect[F]): F[ResponseListener[F]] =
Queue
.synchronous[F, Option[ByteBuffer]]
.synchronous[F, Item]
.map(q => ResponseListener(q, cb))
}