add jetty backend for http4s client #2036
Conversation
I've got an alternative implementation for this but it requires an additional dependency (jetty-reactive-httpclient)
      request: Request[F])(implicit F: Effect[F], ec: ExecutionContext) = {
    val dcp = new DeferredContentProvider()

    ec.execute(new Runnable {
marcodippy
Aug 26, 2018
Author
I'm not sure if running this on the ec used for running the responses is the right thing to do... ideas?
rossabaker
Aug 27, 2018
Member
Any idea what the execution model is here? Does `offer` block until there is space in some internal queue? Or is that why it can return a boolean? If it is full, does it give us any idea when we can reschedule? There's some undocumented offer callback.
If we can do this without blocking, I don't think we need to relocate it at all. If we do need to block, we should be looking at `ContextShift`, but then we're stuck on finishing #1933. We're in a bit of flux here. Let's figure out how it works, and then we can figure out how to proceed here.
marcodippy
Aug 27, 2018
Author
so, I don't think that `offer` actually blocks the thread; the chunks are written to a `java.util.ArrayDeque` that automatically resizes when needed (code here). Actually the r/w access to the `ArrayDeque` is synchronised with an object lock.
the reason why I relocated it is that otherwise the request won't be sent until the entire body has been loaded into the content provider. I could move the execution of this block of code after the `request.send(...)` so that it wouldn't need to be relocated.
also, quoting the jetty client doc:
I'm not entirely sure how to enable this kind of "fine-grained control of the req/resp conversation" given the current implementation
I think there are some subtleties that are nicely covered by wrapping the jetty-reactive-httpclient, but each dependency is a liability, and would add the overhead of the fs2-reactive-streams bridge when we should be able to implement this directly. I could argue either side. Which do you feel better about, having written both? I think you might have the "fine-grained" control right: you're providing the request body asynchronously, and I don't see how you could receive the response headers before the request headers are sent. I suppose we might get more sophisticated about not continuing to send to the deferred content provider?
    try {
      client.stop()
    } catch {
      case NonFatal(t) => logger.warn(t)("Unable to shut down Jetty client")
rossabaker
Aug 27, 2018
Member
I'd log this at error level.
jetty-client/src/main/scala/org/http4s/client/jetty/JettyClient.scala
      .drain
      .runAsync {
        case Left(t) =>
          IO { logger.warn(t)("Unable to write to Jetty sink") }
rossabaker
Aug 27, 2018
Member
`error`: it didn't work
    override def run(): Unit =
      request.body.chunks
        .map(_.toByteBuffer)
        .to(Sink((b: ByteBuffer) => F.delay { dcp.offer(b); () }))
rossabaker
Aug 27, 2018
Member
This returns a boolean. If it's `false`, something terrible has happened.
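A minimal sketch of surfacing that boolean as a failure, assuming the `F`, `dcp`, and fs2 `Sink` from the diff above are in scope (`OfferRejected` is a hypothetical exception, not part of Jetty or http4s):

```scala
// Sketch only: fail the effect when offer returns false, instead of ignoring it.
import cats.implicits._

final case class OfferRejected()
    extends RuntimeException("DeferredContentProvider rejected a chunk")

val sink = Sink { (b: ByteBuffer) =>
  F.delay(dcp.offer(b)).flatMap {
    case true  => F.unit
    case false => F.raiseError[Unit](OfferRejected())
  }
}
```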
rossabaker
Aug 27, 2018
Member
Oh, the Jetty `Chunk`'s `Callback` is invoked when the content is read. I think we probably want to use `Chunk` instead of `ByteBuffer` here, because with `Chunk`, you could implement backpressure. As it stands, we're just going to fill up the `dcp` without any regard to how fast the server consumes it.
marcodippy
Aug 27, 2018
Author
true, right now the dcp is gonna be loaded regardless of any circumstances. I'm not entirely sure how to implement the back pressure mechanism but I'll try to figure out something; I guess that each `dcp.offer` should somehow wait on the previous callback to be called with `.succeeded()` or `.failed(Throwable t)` and decide either to continue or to stop processing, right?
Btw, I'm wondering if by using the InputStreamContentProvider we would get this semantic for free (but it would obviously block on `InputStream.read`)
rossabaker
Aug 30, 2018
Member
The PublisherContentProvider isn't exactly the same thing, but might be an inspiration.
What about a semaphore with 1 permit? We take a permit to emit a chunk, and return a permit on each callback?
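A minimal sketch of that one-permit idea (`writeBody` is an illustrative name, not code from this PR; assumes cats-effect 1.0's `Semaphore` and the `Effect[F]` constraint already used here):

```scala
import java.nio.ByteBuffer
import cats.effect.{Effect, IO}
import cats.effect.concurrent.Semaphore
import cats.implicits._
import fs2.Stream
import org.eclipse.jetty.client.util.DeferredContentProvider
import org.eclipse.jetty.util.Callback

// Sketch only: one-permit semaphore to backpressure DeferredContentProvider.offer.
def writeBody[F[_]](dcp: DeferredContentProvider, body: Stream[F, ByteBuffer])(
    implicit F: Effect[F]): F[Unit] =
  Semaphore.uncancelable[F](1).flatMap { permit =>
    body
      .evalMap { buf =>
        // One permit total: chunk n+1 is not offered until Jetty has read chunk n.
        permit.acquire *> F.delay {
          dcp.offer(buf, new Callback {
            // Jetty invokes these once the buffer has been consumed (or failed);
            // release the permit from inside the Java callback.
            override def succeeded(): Unit =
              F.runAsync(permit.release)(_ => IO.unit).unsafeRunSync()
            override def failed(x: Throwable): Unit =
              F.runAsync(permit.release)(_ => IO.unit).unsafeRunSync()
          })
        }.void
      }
      .compile
      .drain
  }
```

The single permit means at most one chunk is in flight, which is exactly the demand signal the `offer` callback is designed to provide.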
    override def onHeaders(response: JettyResponse): Unit = {
      val dr = DisposableResponse[F](
        response = Response(
          status = Status.fromInt(response.getStatus).valueOr(throw _),
rossabaker
Aug 27, 2018
Member
I think this was copied from somewhere else, but it would be better if we could change this exception into a failed response. This is going to crash out of the Jetty listener and never respond to the client in the unlikely event that it happens.
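A sketch of one way to do that, reusing the `cb` passed into the surrounding `F.async`; `mkDisposableResponse` is a hypothetical stand-in for the `DisposableResponse` construction shown in the diff:

```scala
// Sketch: fold the ParseResult instead of throwing in Jetty's listener thread.
override def onHeaders(response: JettyResponse): Unit =
  Status.fromInt(response.getStatus) match {
    case Right(status) => cb(Right(mkDisposableResponse(status))) // as in the diff
    case Left(failure) => cb(Left(failure)) // fail the effect, don't crash the listener
  }
```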
    override def onContent(response: JettyResponse, content: ByteBuffer): Unit = {
      val copy = ByteBuffer.allocate(content.remaining())
      copy.put(content).flip()
      queue.enqueue1(copy.some).runAsync(_ => IO.unit).unsafeRunSync()
rossabaker
Aug 27, 2018
Member
There's a `loggingAsyncCallback` in our internal package that makes sure we see any errors in the `runAsync` callback.
    override def onContent(response: JettyResponse, content: ByteBuffer): Unit = {
      val copy = ByteBuffer.allocate(content.remaining())
      copy.put(content).flip()
      queue.enqueue1(copy.some).runAsync(loggingAsyncCallback(logger)).unsafeRunSync()
marcodippy
Aug 27, 2018
Author
I guess we lack some kind of back pressure mechanism here too @rossabaker; we could use the `onContent(Response response, ByteBuffer content, Callback callback)` method here and invoke the `callback` to signal that we need more content (the `callback.succeeded()` doesn't control only the "demand content" semantic but also the releasing of the ByteBuffer, see here; I'm not sure how big of a problem it would be here)
rossabaker
Aug 30, 2018
Member
Ooh, yeah, good point. The queue can be a bounded queue, but we don't want to block or spin. Yeah, I think something with the callback would be the right approach here, too.
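A sketch of that callback-driven variant, assuming the `queue`, `F: Effect[F]`, `cats.implicits._`, and Jetty's `Callback` from the surrounding code:

```scala
// Sketch: only ask Jetty for more content once the chunk is actually enqueued.
override def onContent(
    response: JettyResponse,
    content: ByteBuffer,
    callback: Callback): Unit = {
  val copy = ByteBuffer.allocate(content.remaining())
  copy.put(content).flip()
  F.runAsync(queue.enqueue1(copy.some)) {
    case Right(_) => IO(callback.succeeded()) // signal demand for the next chunk
    case Left(t)  => IO(callback.failed(t))   // propagate enqueue failure to Jetty
  }.unsafeRunSync()
}
```

With a bounded queue, `enqueue1` only completes when there is room, so Jetty's demand is throttled to the consumer's pace without blocking or spinning.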
force-pushed from d9d6185 to 1be704a
Thanks, and sorry for the delay. The cats-effect-1.0 push consumed us for a bit. This is looking really good. Just a couple more nitpicks, and I think we're ready to go.
object JettyClient {

-  private val logger = getLogger
+  implicit private val logger: Logger = getLogger
rossabaker
Sep 9, 2018
Member
Hmm. Where is this passed implicitly? I don't think I've ever seen this done with log4s.
marcodippy
Sep 9, 2018
Author
I have no idea how that `implicit` ended up in there :)
-      F.async[DisposableResponse[F]] { cb =>
-        toRequest(client, req).send(responseHandler(cb))
+      F.asyncF[DisposableResponse[F]] { cb =>
+        Resource
rossabaker
Sep 9, 2018
Member
You could just use `Bracket` here instead of making a `Resource`. `Bracket` excels at nesting more than anything.
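For reference, the two spellings are equivalent for a single, non-nested resource; a sketch with illustrative names, assuming cats-effect 1.0:

```scala
import cats.effect.{Bracket, Resource}

// Sketch: same acquire/use/release semantics, with and without allocating a Resource.
def viaResource[F[_], A, B](acquire: F[A])(release: A => F[Unit])(use: A => F[B])(
    implicit F: Bracket[F, Throwable]): F[B] =
  Resource.make(acquire)(release).use(use)

def viaBracket[F[_], A, B](acquire: F[A])(release: A => F[Unit])(use: A => F[B])(
    implicit F: Bracket[F, Throwable]): F[B] =
  F.bracket(acquire)(use)(release)
```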
          })
        )
      })
      F.delay(client.stop()).onError {
rossabaker
Sep 9, 2018
Member
`handleErrorWith`? I don't think we want this to fail the task altogether.
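For instance, a sketch assuming `cats.implicits._` in scope:

```scala
// Sketch: log the failure and recover, so shutdown never fails the effect.
F.delay(client.stop()).handleErrorWith { t =>
  F.delay(logger.error(t)("Unable to shut down Jetty client"))
}
```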
        .flatTap(_ => IO { dcp.close() })
        .unsafeRunSync()
    })
    request.httpVersion match {
rossabaker
Sep 9, 2018
Member
Could wrap `jReq.version` around the match and only say it once.
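A sketch of that shape; the exact version mapping is a guess at what the diff does (`JHttpVersion` aliases Jetty's enum):

```scala
import org.eclipse.jetty.http.{HttpVersion => JHttpVersion}
import org.http4s.HttpVersion

// Sketch: state jReq.version once, around the match.
jReq.version(request.httpVersion match {
  case HttpVersion.`HTTP/2.0` => JHttpVersion.HTTP_2
  case _                      => JHttpVersion.HTTP_1_1
})
```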
force-pushed from 2f21d18 to be90cc5
hey @rossabaker, thank you for all the feedback :)
This looks great. We've been adjusting the client builders on master since this PR began, and will want to tweak this one, but I think we should do that in a subsequent PR.
great, thank you @rossabaker! I'll be happy to help again on http4s :)
closes #2013