/
Akka.scala
68 lines (58 loc) · 2.09 KB
/
Akka.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package acolyte.reactivemongo
import scala.concurrent.{ ExecutionContext, Future }
import com.typesafe.config.Config
import akka.actor.{ ActorRef, ActorSystem ⇒ AkkaSystem, Props }
import reactivemongo.core.actors.{
Close,
CheckedWriteRequestExpectingResponse,
RequestMakerExpectingResponse
}
import reactivemongo.core.protocol.{ Query ⇒ RQuery, RequestMaker }
/** Akka companion for Acolyte mongo system. */
private[reactivemongo] object Akka {
/**
* Creates an Acolyte actor system for ReactiveMongo use.
*
* {{{
* import acolyte.reactivemongo.MongoSystem
* import akka.actor.ActorSystem
*
* val mongo: ActorSystem = Akka.actorSystem()
* }}}
*
* @param handler Connection handler
* @param name Actor system name (default: "ReactiveMongoAcolyte")
*/
def actorSystem(handler: ConnectionHandler, name: String = "ReactiveMongoAcolyte"): AkkaSystem = new ActorSystem(AkkaSystem(name), new ActorRefFactory() {
def before(system: AkkaSystem, next: ActorRef): ActorRef = {
system actorOf Props(classOf[Actor], handler, next)
}
})
}
private[reactivemongo] class Actor(
handler: ConnectionHandler, next: ActorRef /* TODO: Remove */ ) extends akka.actor.Actor {
def receive = {
case msg @ CheckedWriteRequestExpectingResponse(req) ⇒
println(s"wreq = $req")
/*
CheckedWriteRequest(Insert(0,test-db.a-col),BufferSequence(DynamicChannelBuffer(ridx=0, widx=37, cap=64),WrappedArray()),GetLastError(false,None,0,false))
*/
next forward msg
case msg @ RequestMakerExpectingResponse(RequestMaker(
op @ RQuery(_ /*flags*/ , coln, off, len), doc, _ /*pref*/ , chanId)) ⇒
val exp = new ExpectingResponse(msg)
handler queryHandler Query(coln, doc.merged) match {
case Some(body) ⇒
println(s"query = ${body}")
exp.promise.success(MongoDB.Success(chanId getOrElse 1, body: _*).get)
case _ ⇒ ???
}
case close @ Close ⇒ /* Do nothing: next forward close */
case msg ⇒
/*
println(s"message = $msg")
next forward msg
*/
()
}
}