/
spray.scala
123 lines (107 loc) · 3.13 KB
/
spray.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package spark
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.util.Timeout
import akka.util.duration._
import akka.pattern.ask
import cc.spray._
import cc.spray.json._
import cc.spray.typeconversion.SprayJsonSupport
import spark.util.AkkaUtils
import spark.streaming.DStream
import be.bigdata.p2._
object SparkSpray extends Directives {
/** One scored entry for a stock: the stock itself, its score, and the time value delivered with it. */
final case class Result(stock: Stock, score: Int, time: Long)
/**
 * spray-json formats for the types this object serves over HTTP.
 *
 * Combines `DefaultJsonProtocol` with `SprayJsonSupport`; its members are brought
 * into scope for the routes through `import MyJsonProtocol._`.
 */
object MyJsonProtocol extends DefaultJsonProtocol with SprayJsonSupport {

  /** Hand-written format for `Stock`, written as a JSON object with the fields "id" and "keywords". */
  implicit object StockJsonFormat extends RootJsonFormat[Stock] {

    /** Emits `id` as a JSON string and `keywords` through its own `toJson` conversion. */
    def write(s: Stock) =
      JsObject(
        "id" -> JsString(s.id),
        "keywords" -> s.keywords.toJson
      )

    /**
     * Rebuilds a `Stock` from an object carrying a string "id" and an array "keywords".
     *
     * Array elements that are not JSON strings are skipped, not rejected.
     * Any other combination of fields raises `DeserializationException("Stock expected")`.
     */
    def read(value: JsValue): Stock =
      value.asJsObject.getFields("id", "keywords") match {
        case Seq(JsString(id), JsArray(keywords)) =>
          Stock(id, keywords.collect { case JsString(s) => s })
        case _ => throw new DeserializationException("Stock expected")
      }
  }

  // Explicitly typed so implicit resolution does not depend on inferring the format's type.
  implicit val ResultFormat: RootJsonFormat[Result] = jsonFormat3(Result)
}
/**
 * Actor that accumulates results in memory, grouped by stock id.
 *
 * Messages handled:
 *  - a `(Stock, Int, Long)` tuple: wrapped in a `Result` and prepended to the list kept
 *    for that stock's id, so every list holds the most recently received result first;
 *  - the string `"results"`: the whole cache is sent back to the sender.
 *
 * Nothing in this actor ever removes entries, so the cache only grows.
 */
class ResultActor() extends Actor {
  var cache: Map[String, List[Result]] = Map.empty

  def receive = {
    case (stock: Stock, score: Int, time: Long) =>
      // Prepend, so the head of each list is always the latest arrival.
      val earlier = cache.getOrElse(stock.id, Nil)
      cache = cache.updated(stock.id, Result(stock, score, time) :: earlier)
    case "results" => sender ! cache
  }
}
/** Alias for `SparkAkka.actorSystem`, resolved on first use. */
lazy val actorSystem = SparkAkka.actorSystem

/**
 * Creates the `ResultActor` (registered under the name "results") and starts a spray
 * server named "Results" on `SparkAkka.host`, one port above `SparkAkka.port`, serving
 * the routes built by `handler`.
 *
 * @return a pair of whatever `AkkaUtils.startSprayServer` returns and the result actor's reference
 */
def start() = {
  val actor = actorSystem.actorOf(Props(new ResultActor()), name = "results")
  // Use the same alias for the server as for the actor, not a second direct lookup.
  val server = AkkaUtils.startSprayServer(actorSystem, SparkAkka.host, SparkAkka.port + 1, handler(actor), "Results")
  (server, actor)
}
// Implicit timeout (10 seconds) picked up by the `?` (ask) calls made from the routes below.
// The type is spelled out explicitly, as is recommended for implicit definitions.
private[this] implicit val timeout: Timeout = Timeout(10.seconds)
import MyJsonProtocol._
/**
 * Largest element of `s`, or 0 when `s` is empty.
 *
 * Exists because calling `max` on an empty sequence throws.
 *
 * @param s the values to inspect; may be empty
 * @return the maximum of `s`, or `0L` for an empty sequence
 */
def safeMax(s: Seq[Long]): Long = if (s.isEmpty) 0L else s.max
/**
 * Builds the route served by the spray server. All endpoints sit under `get`:
 *
 *  - `/`        : a fixed hint pointing at `/results`
 *  - `/results` : the actor's complete cache
 *  - `/after`   : the cache trimmed by the optional `time` parameter (0 when absent),
 *                 paired with the largest `time` among the entries that were kept
 *  - `/web/...` : files from the `web` resource directory, wrapped in `cache`
 *  - `/start`, `/stop` : invoke `P2.start` / `P2.stop` and answer with a fixed string
 *
 * @param actor the actor that is asked for `"results"`; its reply is cast to
 *              `Map[String, List[Result]]`
 */
private[this] def handler(actor:ActorRef) = {

  // A fresh ask on every call: the future of the actor's current cache.
  def snapshot = (actor ? "results").mapTo[Map[String, List[Result]]]

  // Per stock, keeps the leading run of results whose time is strictly greater than
  // `threshold`; takeWhile stops at the first entry that is not, so this relies on each
  // list being ordered newest first. The trimmed map is paired with the highest time
  // left in it, or 0 when nothing is left.
  def newerThan(threshold: Long) =
    snapshot.map { all =>
      val recent = all.map { case (id, hits) =>
        (id, hits.takeWhile(_.time > threshold))
      }
      val latest = safeMax(recent.values.toSeq.flatMap(_.map(_.time)))
      (recent, latest)
    }

  def hintRoute =
    path("") {
      completeWith {
        "use /results"
      }
    }

  def allResultsRoute =
    path("results") {
      completeWith {
        snapshot
      }
    }

  def afterRoute =
    path("after") {
      parameters('time ?).as((x:Option[Long]) => x) { t =>
        completeWith {
          newerThan(t.getOrElse(0L))
        }
      }
    }

  def webRoute =
    pathPrefix("web") {
      cache {
        getFromResourceDirectory("web")
      }
    }

  def startRoute =
    path("start") {
      completeWith {
        P2.start
        "started"
      }
    }

  def stopRoute =
    path("stop") {
      completeWith {
        P2.stop
        "stopped"
      }
    }

  get {
    hintRoute ~
    allResultsRoute ~
    afterRoute ~
    webRoute ~
    startRoute ~
    stopRoute
  }
}
}