forked from akka/akka-http
/
package.scala
165 lines (138 loc) · 6.68 KB
/
package.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/*
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.http.impl
import language.implicitConversions
import java.nio.charset.Charset
import com.typesafe.config.Config
import akka.stream.scaladsl.Source
import akka.stream.stage._
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }
import scala.reflect.ClassTag
import scala.util.{ Failure, Success }
import scala.util.matching.Regex
import akka.util.ByteString
import akka.actor._
import akka.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse }
package object util {
private[http] val UTF8 = Charset.forName("UTF8")
private[http] val ASCII = Charset.forName("ASCII")
private[http] val ISO88591 = Charset.forName("ISO-8859-1")
private[http] val EmptyByteArray = Array.empty[Byte]
private[http] def actorSystem(implicit refFactory: ActorRefFactory): ExtendedActorSystem =
refFactory match {
case x: ActorContext ⇒ actorSystem(x.system)
case x: ExtendedActorSystem ⇒ x
case x ⇒ throw new IllegalStateException(s"Unknown factory $x")
}
private[http] implicit def enhanceByteArray(array: Array[Byte]): EnhancedByteArray = new EnhancedByteArray(array)
private[http] implicit def enhanceConfig(config: Config): EnhancedConfig = new EnhancedConfig(config)
private[http] implicit def enhanceString_(s: String): EnhancedString = new EnhancedString(s)
private[http] implicit def enhanceRegex(regex: Regex): EnhancedRegex = new EnhancedRegex(regex)
private[http] implicit def enhanceByteStrings(byteStrings: TraversableOnce[ByteString]): EnhancedByteStringTraversableOnce =
new EnhancedByteStringTraversableOnce(byteStrings)
private[http] implicit def enhanceByteStringsMat[Mat](byteStrings: Source[ByteString, Mat]): EnhancedByteStringSource[Mat] =
new EnhancedByteStringSource(byteStrings)
private[this] var eventStreamLogger: ActorRef = _
private[http] def installEventStreamLoggerFor(channel: Class[_])(implicit system: ActorSystem): Unit = {
synchronized {
if (eventStreamLogger == null)
eventStreamLogger = system.actorOf(Props[util.EventStreamLogger]().withDeploy(Deploy.local), name = "event-stream-logger")
}
system.eventStream.subscribe(eventStreamLogger, channel)
}
private[http] def installEventStreamLoggerFor[T](implicit ct: ClassTag[T], system: ActorSystem): Unit =
installEventStreamLoggerFor(ct.runtimeClass)
private[http] implicit class AddFutureAwaitResult[T](future: Future[T]) {
/** "Safe" Await.result that doesn't throw away half of the stacktrace */
def awaitResult(atMost: Duration): T = {
Await.ready(future, atMost)
future.value.get match {
case Success(t) ⇒ t
case Failure(ex) ⇒ throw new RuntimeException("Trying to await result of failed Future, see the cause for the original problem.", ex)
}
}
}
private[http] def humanReadableByteCount(bytes: Long, si: Boolean): String = {
val unit = if (si) 1000 else 1024
if (bytes >= unit) {
val exp = (math.log(bytes) / math.log(unit)).toInt
val pre = if (si) "kMGTPE".charAt(exp - 1).toString else "KMGTPE".charAt(exp - 1).toString + 'i'
"%.1f %sB" format (bytes / math.pow(unit, exp), pre)
} else bytes.toString + " B"
}
private[http] implicit class RichHttpRequest(val request: HttpRequest) extends AnyVal {
def debugString: String = s"${request.method.value} ${request.uri.path} ${entityDebugInfo(request.entity)}"
}
private[http] implicit class RichHttpResponse(val response: HttpResponse) extends AnyVal {
def debugString: String = s"${response.status.value} ${entityDebugInfo(response.entity)}"
}
private def entityDebugInfo(e: HttpEntity): String = e match {
case HttpEntity.Empty ⇒ "Empty"
case HttpEntity.Strict(_, data) ⇒ s"Strict(${data.size} bytes)"
case HttpEntity.Default(_, length, _) ⇒ s"Default($length bytes)"
case _: HttpEntity.CloseDelimited ⇒ "CloseDelimited"
case _: HttpEntity.IndefiniteLength ⇒ "IndefiniteLength"
case _: HttpEntity.Chunked ⇒ "Chunked"
}
}
package util {
import akka.http.scaladsl.model.{ ContentType, EntityStreamException, ErrorInfo, HttpEntity }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import scala.concurrent.duration.FiniteDuration
private[http] class ToStrict(timeout: FiniteDuration, maxBytes: Option[Long], contentType: ContentType)
extends GraphStage[FlowShape[ByteString, HttpEntity.Strict]] {
val byteStringIn = Inlet[ByteString]("ToStrict.byteStringIn")
val httpEntityOut = Outlet[HttpEntity.Strict]("ToStrict.httpEntityOut")
override def initialAttributes = Attributes.name("ToStrict")
override val shape = FlowShape(byteStringIn, httpEntityOut)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
val bytes = ByteString.newBuilder
private var emptyStream = false
override def preStart(): Unit = scheduleOnce("ToStrictTimeoutTimer", timeout)
setHandler(httpEntityOut, new OutHandler {
override def onPull(): Unit = {
if (emptyStream) {
push(httpEntityOut, HttpEntity.Strict(contentType, ByteString.empty))
completeStage()
} else pull(byteStringIn)
}
})
setHandler(byteStringIn, new InHandler {
override def onPush(): Unit = {
bytes ++= grab(byteStringIn)
maxBytes match {
case Some(max) if bytes.length > max ⇒
failStage(new EntityStreamException(new ErrorInfo("Request too large", s"Request was longer than the maximum of $max")))
case _ ⇒
pull(byteStringIn)
}
}
override def onUpstreamFinish(): Unit = {
if (isAvailable(httpEntityOut)) {
push(httpEntityOut, HttpEntity.Strict(contentType, bytes.result()))
completeStage()
} else emptyStream = true
}
})
override def onTimer(key: Any): Unit =
failStage(new java.util.concurrent.TimeoutException(
s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data"))
}
override def toString = "ToStrict"
}
private[http] class EventStreamLogger extends Actor with ActorLogging {
def receive = { case x ⇒ log.warning(x.toString) }
}
private[http] trait LogMessages extends ActorLogging { this: Actor ⇒
def logMessages(mark: String = "")(r: Receive): Receive =
new Receive {
def isDefinedAt(x: Any): Boolean = r.isDefinedAt(x)
def apply(x: Any): Unit = {
log.debug(s"[$mark] received: $x")
r(x)
}
}
}
}