Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve client timeout and wait queue handling #1570

Closed
wants to merge 0 commits into from

Conversation

@eklavya
Copy link
Contributor

@eklavya eklavya commented Nov 28, 2017

No description provided.

@eklavya eklavya force-pushed the eklavya:master branch to c86ff5e Nov 28, 2017
Copy link
Member

@rossabaker rossabaker left a comment

Thanks. This is a good start on a tough problem.

Am I correct that the timing out of requests is only going to happen when some request in the pool gets released and we look for the next one to unblock? If the pool is consumed by several slow connections, I don't think the waiting connections would time out in a timely fashion. Am I misreading this?

* @param config blaze client configuration options
*/
def apply[F[_]: Effect](
maxTotalConnections: Int = DefaultMaxTotalConnections,
maxWaitQueueLimit: Int = DefaultMaxWaitQueueLimit,
maxConnectionsPerRequestKey: RequestKey => Int = _ => DefaultMaxTotalConnections,
waitExpiryTime: RequestKey => Int = _ => DefaultWaitExpiryTime,

This comment has been minimized.

@rossabaker

rossabaker Nov 30, 2017
Member

How about a Duration for the type here instead of Int?

This comment has been minimized.

@eklavya

eklavya Nov 30, 2017
Author Contributor

Ok.


private[this] val logger = getLogger

private var isClosed = false
private var curTotal = 0
private val allocated = mutable.Map.empty[RequestKey, Int]
private val idleQueues = mutable.Map.empty[RequestKey, mutable.Queue[A]]
private val waitQueue = mutable.Queue.empty[Waiting]
private var waitQueue = mutable.Queue.empty[Waiting]

This comment has been minimized.

@rossabaker

rossabaker Nov 30, 2017
Member

Why did this need to become a var?

This comment has been minimized.

@eklavya

eklavya Nov 30, 2017
Author Contributor

It was a val before and we were dequeuing for all expired one by one. Now we want to use span while taking out expired requests and just assign the new queue. Let me know if you feel this needs to change.

@@ -12,14 +13,21 @@ import scala.util.Random
import org.http4s.client.testroutes.GetRoutes
import org.http4s.client.JettyScaffold

class MaxConnectionsInPoolSpec extends Http4sSpec {
import scala.concurrent.Await

This comment has been minimized.

@rossabaker

rossabaker Nov 30, 2017
Member

Can we sort all these imports alphabetically? That's how it's done in most places in the code. I think this one got IntelliJ'ed.

This comment has been minimized.

@eklavya

eklavya Nov 30, 2017
Author Contributor

I was relying on scalafmt to do that.

private def isExpired(k: RequestKey, t: Instant): Boolean =
waitExpiryTime(k) != -1 && t
.plus(waitExpiryTime(k).toLong, ChronoUnit.SECONDS)
.isBefore(Instant.now())

This comment has been minimized.

@rossabaker

rossabaker Nov 30, 2017
Member

This gets a lot cleaner if the duration suggestion is followed. The -1 is !_.isFinite, and there's a clear time unit.

This comment has been minimized.

@eklavya

eklavya Nov 30, 2017
Author Contributor

Ok.

@eklavya
Copy link
Contributor Author

@eklavya eklavya commented Nov 30, 2017

You are right, we do need to wait in that case, although we take care of the next expired ones immediately in "findFirstAllowedWaiter". But I can't think of an easy solution to this, ideas?

@eklavya eklavya force-pushed the eklavya:master branch to 3335170 Nov 30, 2017
@rossabaker
Copy link
Member

@rossabaker rossabaker commented Dec 1, 2017

We could have a scheduler proactively clean out things caught in the wait queue. I don't know what semantics we should have for a request that's in flight. Imagine a non-idempotent POST that has been submitted but not yet responded.

@eklavya
Copy link
Contributor Author

@eklavya eklavya commented Dec 1, 2017

I thought about a scheduler but I thought it maybe overkill. Do we really want this?

I don’t understand the second point. If the request is in flight that means it’s not in wait queue, right?

@rossabaker
Copy link
Member

@rossabaker rossabaker commented Dec 3, 2017

From the client perspective, do we care how long we wait for a connection from the pool, or do we care how long it takes from submission to getting a timeout response? Which wait are we trying to fix here?

If we're trying to get timeout responses in a timely fashion, then I think we want a scheduler, and need to account for responses whose timeout is partially but incompletely consumed while in the wait queue. (My second point). I think this is a positive, but it gets ugly when a non-idempotent request is in flight.

.plus(waitExpiryTime(k).toSeconds, ChronoUnit.SECONDS)
.isBefore(Instant.now())
private def isExpired(t: Instant): Boolean = {
val elapsed = Instant.now().getEpochSecond - t.getEpochSecond

This comment has been minimized.

@rossabaker

rossabaker Dec 15, 2017
Member

Why is this in seconds instead of milliseconds? Is there a performance boost for the loss of granularity?

This comment has been minimized.

@eklavya

eklavya Dec 15, 2017
Author Contributor

Because the timeouts are specified in seconds (aren't they?).

This comment has been minimized.

@rossabaker

rossabaker Dec 20, 2017
Member

Do they have to be? I've had subsecond SLAs before.

.isBefore(Instant.now())
private def isExpired(t: Instant): Boolean = {
val elapsed = Instant.now().getEpochSecond - t.getEpochSecond
(requestTimeout.isFinite() && elapsed >= requestTimeout.toSeconds) || (responseHeaderTimeout

This comment has been minimized.

@rossabaker

rossabaker Dec 15, 2017
Member

I am feeling stupid. What's the difference between requestTimeout and responseHeaderTimeout? They both seem to be enforced in the same place.

This comment has been minimized.

@eklavya

eklavya Dec 15, 2017
Author Contributor

I am not sure what you mean. Any of them could have timed out, we need to check for both, no?

This comment has been minimized.

@rossabaker

rossabaker Dec 20, 2017
Member

I don't see any difference in how these two properties are enforced in this function, or anywhere else. What is the semantic difference between them?

This comment has been minimized.

@eklavya

eklavya Dec 20, 2017
Author Contributor

I may have the wrong interpretation of the two. What happens when requestTimeout is 2 seconds and responseHeaderTimeout is 1 second? responseHeaderTimeout can timeout independent of requestTimeout and vice versa. Is this correct?

This comment has been minimized.

@rossabaker

rossabaker Dec 20, 2017
Member

I don't understand how they're being enforced such that they're two different properties. They're both being compared to the same elapsed time.

This comment has been minimized.

@eklavya

eklavya Dec 20, 2017
Author Contributor

responseHeaderTimeout not set
requestTimeout set

if we check only for responseHeaderTimeout we miss requestTimeout

responseHeaderTimeout set to 1 sec
requestTimeout set to 2 sec

if we only check for requestTimeout and we start after 1 sec and finish before 2 sec we are ok but we missed responseHeaderTimeout

etc. What am I missing?

This comment has been minimized.

@rossabaker

rossabaker Dec 20, 2017
Member

There are two properties on the config. Both of them are enforced right here, based on the same elapsed time. i.e., the clock on one does not start ticking before the other. Effectively, we're enforcing a single timeout that is min(requestTimeout, responseHeaderTimeout), no?

This comment has been minimized.

@eklavya

eklavya Dec 20, 2017
Author Contributor

Yes.

This comment has been minimized.

@rossabaker

rossabaker Dec 20, 2017
Member

So if the property we're enforcing is just a min of two properties, why two properties? I would think that the start or end time we're measuring against would be different.

Copy link
Member

@rossabaker rossabaker left a comment

It would also be good to document how the timeout gets reset when popped off the waiting queue. And then I think we can merge this as "better than before."

client/src/test/scala/org/http4s/client/testroutes/GetRoutes.scala Outdated

val getPaths: Map[String, Response[IO]] = {
Map(
SimplePath -> Response[IO](Ok).withBody("simple path"),
ChunkedPath -> Response[IO](Ok).withBody(
Stream.emits("chunk".toSeq.map(_.toString)).covary[IO])
Stream.emits("chunk".toSeq.map(_.toString)).covary[IO]),
DelayedPath -> IO(Thread.sleep(1000L)) *> Response[IO](Ok).withBody("delayed path")

This comment has been minimized.

@rossabaker

rossabaker Dec 21, 2017
Member

We can sleep with an fs2 scheduler and not block a thread. This should make tests more robust.

@eklavya eklavya force-pushed the eklavya:master branch to b29d557 Dec 21, 2017
Copy link
Member

@rossabaker rossabaker left a comment

This LGTM as an interim step forward. As discussed on Gitter, going to try to integrate this fully without the resetting of the clock for 0.18.0.

@rossabaker
Copy link
Member

@rossabaker rossabaker commented Dec 22, 2017

I think this is correct. We should take the nasty disclaimers out of the scaladoc.

I'd love to get an automated test around this, but am not yet sure the best way to do so.

@rossabaker
Copy link
Member

@rossabaker rossabaker commented Dec 22, 2017

I removed the limitations from the scaladoc.

Note that our response header timeout is from when the request is submitted via the client, and not when the request goes over the wire. (This is different from the cloudflare document we've all been using as a discussion starter.)

@@ -259,10 +279,17 @@ private final class PoolManager[F[_], A <: Connection[F]](
}
}

private def findFirstAllowedWaiter =
private def findFirstAllowedWaiter = {
val (expired, rest) = waitQueue.span(w => isExpired(w.at))

This comment has been minimized.

@jmcardon

jmcardon Dec 22, 2017
Member

Should this be partition? Do we want the longest prefix of the expired requests, or all of them?

This comment has been minimized.

@rossabaker

rossabaker Dec 22, 2017
Member

Great question. The oldest should be at the front of the queue, so I think span is a fair optimization that prevents us from iterating the entire queue.

If this were a takeWhile, and we dropped its size, we could get rid of the var.

import scala.util.Random

private final class PoolManager[F[_], A <: Connection[F]](
builder: ConnectionBuilder[F, A],
maxTotal: Int,
maxWaitQueueLimit: Int,
maxConnectionsPerRequestKey: RequestKey => Int,
responseHeaderTimeout: Duration,

This comment has been minimized.

@jmcardon

jmcardon Dec 22, 2017
Member

If you are doing an isFinite check with this every time, is there a reason this can't be a FiniteDuration?

This comment has been minimized.

@eklavya

eklavya Dec 22, 2017
Author Contributor

We need to support no timeouts (infinite timeouts).

This comment has been minimized.

@jmcardon

jmcardon Dec 22, 2017
Member

oh herp derp. Duration.Inf is a thing. Forgot. My bad 👍

@rossabaker rossabaker force-pushed the eklavya:master branch 2 times, most recently to 2a0e50d Dec 22, 2017
F.delay(synchronized {
decrConnection(connection.requestKey)
if (!connection.isClosed) connection.shutdown()
findFirstAllowedWaiter match {

This comment has been minimized.

@rossabaker

rossabaker Dec 22, 2017
Member

This could be consolidated with some code in releaseNonRecyclable. Just trying to get this working right now, to refactor more later.

@@ -147,6 +162,9 @@ private final class PoolManager[F[_], A <: Connection[F]](
logger.debug(s"Active connection not found. Creating new one. $stats")
createConnection(key, callback)

case None if maxConnectionsPerRequestKey(key) <= 0 =>

This comment has been minimized.

@rossabaker

rossabaker Dec 22, 2017
Member

This is a new enhancement, but it prevents a hang in the spec, so bolted on to this.

@rossabaker
Copy link
Member

@rossabaker rossabaker commented Dec 22, 2017

I think the formatting error in Travis is something I fixed in master in 8ed6be9

@rossabaker rossabaker changed the title For #1532 Client timeout from time of request Improve client timeout and wait queue handling Dec 23, 2017
@eklavya
Copy link
Contributor Author

@eklavya eklavya commented Dec 23, 2017

Shouldn’t we also drain wait queue (fail all) while shutting down pool in PoolManager.scala? On mobile don’t really know if you have done it already.

@rossabaker
Copy link
Member

@rossabaker rossabaker commented Dec 23, 2017

I was aiming for a graceful shutdown: accept no new connections, but fulfill everything that was already queued. I think that's achieved in 54cfe4a.

@rossabaker
Copy link
Member

@rossabaker rossabaker commented Dec 23, 2017

Sorry, I accidentally pushed my master to your master on merge, and then I couldn't fix it. This is all on #1613, minus the discussion. :(

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Linked issues

Successfully merging this pull request may close these issues.

None yet

3 participants
You can’t perform that action at this time.