-
Notifications
You must be signed in to change notification settings - Fork 787
/
BlazeClient.scala
166 lines (155 loc) · 6.08 KB
/
BlazeClient.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package org.http4s
package client
package blaze
import cats.effect._
import cats.effect.concurrent._
import cats.effect.implicits._
import cats.implicits._
import java.nio.ByteBuffer
import java.util.concurrent.TimeoutException
import org.http4s.blaze.pipeline.Command
import org.http4s.blaze.util.TickWheelExecutor
import org.http4s.blazecore.{IdleTimeoutStage, ResponseHeaderTimeoutStage}
import org.log4s.getLogger
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
/** Blaze client implementation */
object BlazeClient {
private[this] val logger = getLogger
/** Construct a new [[Client]] using blaze components
*
* @param manager source for acquiring and releasing connections. Not owned by the returned client.
* @param config blaze client configuration.
* @param onShutdown arbitrary tasks that will be executed when this client is shutdown
*/
@deprecated("Use BlazeClientBuilder", "0.19.0-M2")
def apply[F[_], A <: BlazeConnection[F]](
manager: ConnectionManager[F, A],
config: BlazeClientConfig,
onShutdown: F[Unit],
ec: ExecutionContext)(implicit F: ConcurrentEffect[F]): Client[F] =
makeClient(
manager,
responseHeaderTimeout = config.responseHeaderTimeout,
idleTimeout = config.idleTimeout,
requestTimeout = config.requestTimeout,
scheduler = bits.ClientTickWheel,
ec = ec
)
private[blaze] def makeClient[F[_], A <: BlazeConnection[F]](
manager: ConnectionManager[F, A],
responseHeaderTimeout: Duration,
idleTimeout: Duration,
requestTimeout: Duration,
scheduler: TickWheelExecutor,
ec: ExecutionContext
)(implicit F: ConcurrentEffect[F]) =
Client[F] { req =>
Resource.suspend {
val key = RequestKey.fromRequest(req)
// If we can't invalidate a connection, it shouldn't tank the subsequent operation,
// but it should be noisy.
def invalidate(connection: A): F[Unit] =
manager
.invalidate(connection)
.handleError(e => logger.error(e)("Error invalidating connection"))
def borrow: Resource[F, manager.NextConnection] =
Resource.makeCase(manager.borrow(key)) {
case (_, ExitCase.Completed) =>
F.unit
case (next, ExitCase.Error(_) | ExitCase.Canceled) =>
invalidate(next.connection)
}
def idleTimeoutStage(conn: A) =
Resource.makeCase({
idleTimeout match {
case d: FiniteDuration =>
val stage = new IdleTimeoutStage[ByteBuffer](d, scheduler, ec)
F.delay(conn.spliceBefore(stage)).as(Some(stage))
case _ =>
F.pure(None)
}
}) {
case (_, ExitCase.Completed) => F.unit
case (stageOpt, _) => F.delay(stageOpt.foreach(_.removeStage()))
}
def loop: F[Resource[F, Response[F]]] =
borrow.use { next =>
idleTimeoutStage(next.connection).use { stageOpt =>
val idleTimeoutF = stageOpt match {
case Some(stage) => F.async[TimeoutException](stage.init)
case None => F.never[TimeoutException]
}
val res = next.connection
.runRequest(req, idleTimeoutF)
.map { r =>
Resource.makeCase(F.pure(r)) {
case (_, ExitCase.Completed) =>
F.delay(stageOpt.foreach(_.removeStage()))
.guarantee(manager.release(next.connection))
case _ =>
F.delay(stageOpt.foreach(_.removeStage()))
.guarantee(manager.invalidate(next.connection))
}
}
.recoverWith {
case Command.EOF =>
invalidate(next.connection).flatMap { _ =>
if (next.fresh)
F.raiseError(
new java.net.ConnectException(s"Failed to connect to endpoint: $key"))
else {
loop
}
}
}
responseHeaderTimeout match {
case responseHeaderTimeout: FiniteDuration =>
Deferred[F, Unit].flatMap { gate =>
val responseHeaderTimeoutF: F[TimeoutException] =
F.delay {
val stage =
new ResponseHeaderTimeoutStage[ByteBuffer](
responseHeaderTimeout,
scheduler,
ec)
next.connection.spliceBefore(stage)
stage
}
.bracket(stage =>
F.asyncF[TimeoutException] { cb =>
F.delay(stage.init(cb)) >> gate.complete(())
})(stage => F.delay(stage.removeStage()))
F.racePair(gate.get *> res, responseHeaderTimeoutF)
.flatMap[Resource[F, Response[F]]] {
case Left((r, fiber)) => fiber.cancel.as(r)
case Right((fiber, t)) => fiber.cancel >> F.raiseError(t)
}
}
case _ => res
}
}
}
val res = loop
requestTimeout match {
case d: FiniteDuration =>
F.racePair(
res,
F.cancelable[TimeoutException] { cb =>
val c = scheduler.schedule(new Runnable {
def run() =
cb(Right(new TimeoutException(s"Request timeout after ${d.toMillis} ms")))
}, ec, d)
F.delay(c.cancel)
}
)
.flatMap {
case Left((r, fiber)) => fiber.cancel.as(r)
case Right((fiber, t)) => fiber.cancel >> F.raiseError(t)
}
case _ =>
res
}
}
}
}