-
Notifications
You must be signed in to change notification settings - Fork 21
/
AkkaHttpBackend.scala
162 lines (136 loc) · 5.61 KB
/
AkkaHttpBackend.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/*
* Copyright 2018 Facundo Viale
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jarlakxen.drunk.backend
import java.io.{ File, IOException, UnsupportedEncodingException }
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
import akka.actor.ActorSystem
import akka.event.LoggingAdapter
import akka.http.scaladsl.{ ClientTransport, Http, HttpsConnectionContext }
import akka.http.scaladsl.coding.{ Deflate, Gzip, NoCoding }
import akka.http.scaladsl.model.{ ContentTypes, HttpEntity, HttpHeader, HttpMethods, HttpRequest, HttpResponse, Uri }
import akka.http.scaladsl.model.headers.HttpEncodings
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.github.jarlakxen.drunk._
/**
 * HTTP backend that POSTs JSON bodies through Akka HTTP and returns the response as text.
 *
 * The constructor is private; instances are created through the companion object.
 *
 * @param actorSystem actor system used for the HTTP client and for materializing response streams
 * @param terminateActorSystemOnClose whether `close()` terminates `actorSystem`
 * @param opts connection options; the connecting timeout and the optional proxy are read from it
 * @param customHttpsContext HTTPS context used instead of the Akka HTTP default client context
 * @param customConnectionPoolSettings pool settings used instead of those derived from `actorSystem`
 * @param customLog logging adapter used instead of `actorSystem.log`
 * @param headers headers attached to every request sent by this backend
 */
class AkkaHttpBackend private[AkkaHttpBackend] (
    actorSystem: ActorSystem,
    terminateActorSystemOnClose: Boolean,
    opts: ConnectionOptions,
    customHttpsContext: Option[HttpsConnectionContext],
    customConnectionPoolSettings: Option[ConnectionPoolSettings],
    customLog: Option[LoggingAdapter],
    headers: immutable.Seq[HttpHeader]) {

  private implicit val as: ActorSystem = actorSystem
  private implicit val materializer: ActorMaterializer = ActorMaterializer()

  private val http = Http()

  /**
   * Pool settings handed to every request: the caller-supplied (or default) settings with the
   * connecting timeout from `opts` applied and, when a proxy is configured, an HTTPS proxy
   * transport.
   */
  private val connectionPoolSettings = {
    val base = customConnectionPoolSettings.getOrElse(ConnectionPoolSettings(actorSystem))

    // Only the connecting timeout is overridden, so any other connection-level settings
    // carried by custom pool settings are preserved.
    val withTimeout = base.withConnectionSettings(
      base.connectionSettings.withConnectingTimeout(opts.connectionTimeout))

    opts.proxy match {
      case None => withTimeout
      case Some(p) =>
        withTimeout.withTransport(ClientTransport.httpsProxy(p.inetSocketAddress))
    }
  }

  /**
   * POSTs `body` as `application/json` to `uri`.
   *
   * @param uri target of the request
   * @param body request payload, sent verbatim
   * @return the status code and the decoded response body for 2xx responses; for any other
   *         status the future fails with a `RuntimeException` carrying the status code and body.
   *         The future also fails when the response uses an unsupported content encoding.
   */
  def send(uri: Uri, body: String): Future[(Int, String)] = {
    implicit val ec: ExecutionContext = as.dispatcher

    val req = HttpRequest(HttpMethods.POST, uri, headers, HttpEntity(ContentTypes.`application/json`, body))

    val res = http.singleRequest(
      req,
      settings = connectionPoolSettings,
      connectionContext = customHttpsContext.getOrElse(http.defaultClientHttpsContext),
      log = customLog.getOrElse(actorSystem.log))

    res.flatMap { hr =>
      val code = hr.status.intValue()
      val charsetFromHeaders = encodingFromContentType(hr.entity.contentType.toString).getOrElse("utf-8")
      // An exception thrown by decodeResponse is captured by flatMap and fails the future.
      val decodedResponse = decodeResponse(hr)
      val stringBody = bodyToString(decodedResponse, charsetFromHeaders)

      if (code >= 200 && code < 300) {
        stringBody.map((code, _))
      } else {
        // The body is read even on failure so it can be reported in the error message.
        stringBody.flatMap { body => Future.failed(new RuntimeException(s"${uri.toString} return $code with body: $body")) }
      }
    }
  }

  /** Extracts the lower-cased value of the `charset=` parameter from a rendered content type. */
  private def encodingFromContentType(ct: String): Option[String] =
    ct.split(";").map(_.trim.toLowerCase).collectFirst {
      case s if s.startsWith("charset=") => s.substring(8)
    }

  /**
   * Wraps the response entity in a decoder matching its content encoding.
   *
   * Throws `UnsupportedEncodingException` for anything other than gzip, deflate or identity.
   */
  private def decodeResponse(response: HttpResponse): HttpResponse = {
    val decoder = response.encoding match {
      case HttpEncodings.gzip => Gzip
      case HttpEncodings.deflate => Deflate
      case HttpEncodings.identity => NoCoding
      case ce =>
        // The entity will never be read on this path; drain it so the pooled connection
        // is released instead of staying blocked on an unconsumed response.
        response.discardEntityBytes()
        throw new UnsupportedEncodingException(s"Unsupported encoding: $ce")
    }

    decoder.decodeMessage(response)
  }

  /** Collects the whole response entity and decodes it using the given charset name. */
  private def bodyToString(hr: HttpResponse, charsetFromHeaders: String): Future[String] = {
    implicit val ec: ExecutionContext = as.dispatcher

    hr.entity.dataBytes
      .runFold(ByteString.empty)(_ ++ _)
      .map(_.decodeString(charsetFromHeaders))
  }

  /** Terminates the actor system when this backend was created with `terminateActorSystemOnClose`. */
  def close(): Unit =
    if (terminateActorSystemOnClose) actorSystem.terminate()
}
/** Factory methods for [[AkkaHttpBackend]]. */
object AkkaHttpBackend {

  /** Name of the HTTP content-type header. */
  val ContentTypeHeader = "Content-Type"

  /**
   * Creates a backend that runs on a newly created actor system. That actor system is
   * terminated when the backend is closed.
   *
   * @param options connection options for the backend
   * @param customHttpsContext optional HTTPS context replacing the default client context
   * @param customConnectionPoolSettings optional pool settings replacing the defaults
   * @param customLog optional logging adapter replacing the actor system's log
   * @param headers headers added to every request
   */
  def apply(
      options: ConnectionOptions = ConnectionOptions.Default,
      customHttpsContext: Option[HttpsConnectionContext] = None,
      customConnectionPoolSettings: Option[ConnectionPoolSettings] = None,
      customLog: Option[LoggingAdapter] = None,
      headers: immutable.Seq[HttpHeader] = Nil): AkkaHttpBackend =
    new AkkaHttpBackend(
      actorSystem = ActorSystem("sttp"),
      terminateActorSystemOnClose = true,
      opts = options,
      customHttpsContext = customHttpsContext,
      customConnectionPoolSettings = customConnectionPoolSettings,
      customLog = customLog,
      headers = headers)

  /**
   * Creates a backend that runs on a caller-provided actor system. Closing the backend
   * leaves that actor system running.
   *
   * @param actorSystem the actor system used for the http-client actors
   * @param options connection options for the backend
   * @param customHttpsContext optional HTTPS context replacing the default client context
   * @param customConnectionPoolSettings optional pool settings replacing the defaults
   * @param customLog optional logging adapter replacing the actor system's log
   * @param headers headers added to every request
   */
  def usingActorSystem(
      actorSystem: ActorSystem,
      options: ConnectionOptions = ConnectionOptions.Default,
      customHttpsContext: Option[HttpsConnectionContext] = None,
      customConnectionPoolSettings: Option[ConnectionPoolSettings] = None,
      customLog: Option[LoggingAdapter] = None,
      headers: immutable.Seq[HttpHeader] = Nil): AkkaHttpBackend =
    new AkkaHttpBackend(
      actorSystem = actorSystem,
      terminateActorSystemOnClose = false,
      opts = options,
      customHttpsContext = customHttpsContext,
      customConnectionPoolSettings = customConnectionPoolSettings,
      customLog = customLog,
      headers = headers)
}