Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connection RequestKey to logs #3607

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 24 additions & 18 deletions client/src/main/scala/org/http4s/client/PoolManager.scala
Expand Up @@ -163,16 +163,18 @@ private final class PoolManager[F[_], A <: Connection[F]](
def go(): F[Unit] =
getConnectionFromQueue(key).flatMap {
case Some(conn) if !conn.isClosed =>
F.delay(logger.debug(s"Recycling connection: $stats")) *>
F.delay(logger.debug(s"Recycling connection for $key: $stats")) *>
F.delay(callback(Right(NextConnection(conn, fresh = false))))

case Some(closedConn @ _) =>
F.delay(logger.debug(s"Evicting closed connection: $stats")) *>
F.delay(logger.debug(s"Evicting closed connection for $key: $stats")) *>
decrConnection(key) *>
go()

case None if numConnectionsCheckHolds(key) =>
F.delay(logger.debug(s"Active connection not found. Creating new one. $stats")) *>
F.delay(
logger.debug(
s"Active connection not found for $key. Creating new one. $stats")) *>
createConnection(key, callback)

case None if maxConnectionsPerRequestKey(key) <= 0 =>
Expand All @@ -199,11 +201,12 @@ private final class PoolManager[F[_], A <: Connection[F]](

case None => // we're full up. Add to waiting queue.
F.delay(
logger.debug(s"No connections available. Waiting on new connection: $stats")) *>
logger.debug(
s"No connections available for $key. Waiting on new connection: $stats")) *>
addToWaitQueue(key, callback)
}

F.delay(logger.debug(s"Requesting connection: $stats")) *>
F.delay(logger.debug(s"Requesting connection for $key: $stats")) *>
go()
} else
F.delay(callback(Left(new IllegalStateException("Connection pool is closed"))))
Expand All @@ -214,15 +217,15 @@ private final class PoolManager[F[_], A <: Connection[F]](
F.delay(waitQueue.dequeueFirst(_.key == key)).flatMap {
case Some(Waiting(_, callback, at)) =>
if (isExpired(at))
F.delay(logger.debug(s"Request expired")) *>
F.delay(logger.debug(s"Request expired for $key")) *>
F.delay(callback(Left(WaitQueueTimeoutException))) *>
releaseRecyclable(key, connection)
else
F.delay(logger.debug(s"Fulfilling waiting connection request: $stats")) *>
F.delay(logger.debug(s"Fulfilling waiting connection request for $key: $stats")) *>
F.delay(callback(Right(NextConnection(connection, fresh = false))))

case None if waitQueue.isEmpty =>
F.delay(logger.debug(s"Returning idle connection to pool: $stats")) *>
F.delay(logger.debug(s"Returning idle connection to pool for $key: $stats")) *>
addToIdleQueue(connection, key)

case None =>
Expand All @@ -246,20 +249,20 @@ private final class PoolManager[F[_], A <: Connection[F]](
decrConnection(key) *>
F.delay {
if (!connection.isClosed) {
logger.debug(s"Connection returned was busy. Shutting down: $stats")
logger.debug(s"Connection returned was busy for $key. Shutting down: $stats")
connection.shutdown()
}
} *>
findFirstAllowedWaiter.flatMap {
case Some(Waiting(k, callback, _)) =>
F.delay(logger
.debug(s"Connection returned could not be recycled, new connection needed: $stats")) *>
.debug(
s"Connection returned could not be recycled, new connection needed for $key: $stats")) *>
createConnection(k, callback)

case None =>
F.delay(
logger.debug(
s"Connection could not be recycled, no pending requests. Shrinking pool: $stats"))
F.delay(logger.debug(
s"Connection could not be recycled for $key, no pending requests. Shrinking pool: $stats"))
}

/**
Expand All @@ -284,8 +287,8 @@ private final class PoolManager[F[_], A <: Connection[F]](
*/
def release(connection: A): F[Unit] =
semaphore.withPermit {
logger.debug(s"Recycling connection: $stats")
val key = connection.requestKey
logger.debug(s"Recycling connection for $key: $stats")
if (connection.isRecyclable)
releaseRecyclable(key, connection)
else
Expand Down Expand Up @@ -315,16 +318,19 @@ private final class PoolManager[F[_], A <: Connection[F]](
*/
override def invalidate(connection: A): F[Unit] =
semaphore.withPermit {
decrConnection(connection.requestKey) *>
val key = connection.requestKey
decrConnection(key) *>
F.delay(if (!connection.isClosed) connection.shutdown()) *>
findFirstAllowedWaiter.flatMap {
case Some(Waiting(k, callback, _)) =>
F.delay(logger.debug(s"Invalidated connection, new connection needed: $stats")) *>
F.delay(
logger.debug(s"Invalidated connection for $key, new connection needed: $stats")) *>
createConnection(k, callback)

case None =>
F.delay(
logger.debug(s"Invalidated connection, no pending requests. Shrinking pool: $stats"))
logger.debug(
s"Invalidated connection for $key, no pending requests. Shrinking pool: $stats"))
}
}

Expand All @@ -338,7 +344,7 @@ private final class PoolManager[F[_], A <: Connection[F]](
*/
private def disposeConnection(key: RequestKey, connection: Option[A]): F[Unit] =
semaphore.withPermit {
F.delay(logger.debug(s"Disposing of connection: $stats")) *>
F.delay(logger.debug(s"Disposing of connection for $key: $stats")) *>
decrConnection(key) *>
F.delay {
connection.foreach { s =>
Expand Down