forked from mdedetrich/akka-monix-test
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAkkaHTTPExample.scala
81 lines (70 loc) · 2.34 KB
/
AkkaHTTPExample.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import akka.actor.ActorSystem
import akka.http.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import com.typesafe.scalalogging.StrictLogging
import monix.eval.Task
import monix.execution.Scheduler
import monix.mdc.MonixMDCAdapter
import org.slf4j.MDC
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Random, Try}
object AkkaHTTPExample extends App with StrictLogging {
MonixMDCAdapter.initialize()
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val scheduler: Scheduler = Scheduler.traced
val addCorrelationIdHeader: Directive0 =
mapInnerRoute(_.andThen(_.flatMap {
case result @ RouteResult.Complete(response) =>
Future {
val l = MDC.get("correlationId")
if (l != null)
RouteResult.Complete(response.addHeader(RawHeader("correlationId", l)))
else
result
}
case result @ RouteResult.Rejected(_) =>
Future {
result
}
}))
def onCompleteTask[T](task: => Task[T]): Directive1[Try[T]] =
Directive { inner => ctx =>
task.runToFuture.transformWith(t => inner(Tuple1(t))(ctx))
}
def routes: Route =
path("hello" / IntNumber) { id =>
get {
addCorrelationIdHeader {
onCompleteTask(Task {
MDC.put("correlationId", id.toString)
}) { _ =>
complete(StatusCodes.OK)
}
}
}
}
val serverBinding = Await.result(Http(actorSystem).bindAndHandle(routes, "localhost", 8080), 1.minutes)
val http = Http()
val futures = List.fill(512) {
val int = Random.nextInt(1000)
for {
result <- http.singleRequest(
HttpRequest(
HttpMethods.GET,
Uri(s"http://localhost:8080/hello/$int")
))
header = result.getHeader("correlationId").asScala
output = s"input $int: output: ${header.map(_.value()).getOrElse("no header")}"
_ = result.discardEntityBytes()
_ = println(output)
} yield ()
}
Await.result(Future.sequence(futures), 1.minutes)
Await.result(serverBinding.terminate(1.minutes), 1.minutes)
System.exit(0)
}