Permalink
Browse files

[reactivemongo] Fallback functions for write errors

  • Loading branch information...
cchantep
cchantep committed Sep 13, 2014
1 parent 8f9d473 commit 95e66462f6c225a84be1b69732a15385f61261c4
@@ -48,9 +48,9 @@ private[reactivemongo] class Actor(
r @ CheckedWriteRequest(op, doc, GetLastError(_, _, _, _)))
val req = Request(op.fullCollectionName, doc.merged)
val chan = r()._1.channelIdHint getOrElse 1
val chanId = r()._1.channelIdHint getOrElse 1
println(s"oper = ${MongoDB.WriteOp(op)}, chan = $chan, ${req.body.elements.toList}")
println(s"oper = ${MongoDB.WriteOp(op)}, chan = $chanId, ${req.body.elements.toList}")
// op = Insert(0,test-db.a-col)
val exp = new ExpectingResponse(msg)
@@ -76,10 +76,11 @@ private[reactivemongo] class Actor(
*/
// Success:
exp.promise.success(MongoDB.WriteSuccess(chan).get)
//exp.promise.success(MongoDB.WriteSuccess(chan).get)
// Error:
//exp.promise.success(MongoDB.WriteError(chan, "Err_1").get)
exp.promise.success(NoWriteResponse(chanId, msg.toString))
case msg @ RequestMakerExpectingResponse(RequestMaker(
op @ RQuery(_ /*flags*/ , coln, off, len), doc, _ /*pref*/ , chanId))
@@ -112,4 +113,11 @@ private[reactivemongo] class Actor(
case Success(resp) resp
case _ MongoDB.MkQueryError(chanId)
}
/** Fallback response when no handler provides a write response. */
@inline private def NoWriteResponse(chanId: Int, req: String): Response =
MongoDB.WriteError(chanId, s"No response: $req") match {
case Success(resp) resp
case _ MongoDB.MkQueryError(chanId)
}
}
@@ -76,10 +76,9 @@ object QueryHandler {
*
* }}}
*/
implicit def SimpleQueryHandler(f: Request QueryResponse): QueryHandler =
new QueryHandler {
def apply(chanId: Int, q: Request): Option[Try[Response]] = f(q)(chanId)
}
implicit def SimpleQueryHandler(f: Request PreparedResponse): QueryHandler = new QueryHandler {
def apply(chanId: Int, q: Request): Option[Try[Response]] = f(q)(chanId)
}
/**
* Empty query handler, not handling any request.
@@ -98,10 +98,22 @@ object MongoDB {
case _ None
}
private[reactivemongo] def MkQueryError(channelId: Int = 0): Response = {
val buf = ChannelBuffers.copiedBuffer(ByteOrder.LITTLE_ENDIAN, Array[Byte](76, 0, 0, 0, 16, -55, -63, 115, -49, 116, 119, 55, 4, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 40, 0, 0, 0, 2, 36, 101, 114, 114, 0, 25, 0, 0, 0, 70, 97, 105, 108, 115, 32, 116, 111, 32, 99, 114, 101, 97, 116, 101, 32, 114, 101, 115, 112, 111, 110, 115, 101, 0, 0)) // "Fails to create response"
val in = ChannelBuffers.unmodifiableBuffer(buf)
private[reactivemongo] def MkQueryError(channelId: Int = 0): Response =
mkError(channelId, Array[Byte](76, 0, 0, 0, 16, -55, -63, 115, -49, 116, 119, 55, 4, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 40, 0, 0, 0, 2, 36, 101, 114, 114, 0, 25, 0, 0, 0, 70, 97, 105, 108, 115, 32, 116, 111, 32, 99, 114, 101, 97, 116, 101, 32, 114, 101, 115, 112, 111, 110, 115, 101, 0, 0)) // "Fails to create response"
Response(MessageHeader(in), Reply(in), in, ResponseInfo(channelId))
private[reactivemongo] def MkWriteError(channelId: Int = 0): Response =
mkError(channelId, Array[Byte](-126, 0, 0, 0, -29, 50, 14, 73, -115, -6, 46, 67, 4, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 94, 0, 0, 0, 16, 111, 107, 0, 0, 0, 0, 0, 2, 101, 114, 114, 0, 25, 0, 0, 0, 70, 97, 105, 108, 115, 32, 116, 111, 32, 99, 114, 101, 97, 116, 101, 32, 114, 101, 115, 112, 111, 110, 115, 101, 0, 2, 101, 114, 114, 109, 115, 103, 0, 25, 0, 0, 0, 70, 97, 105, 108, 115, 32, 116, 111, 32, 99, 114, 101, 97, 116, 101, 32, 114, 101, 115, 112, 111, 110, 115, 101, 0, 16, 99, 111, 100, 101, 0, -1, -1, -1, -1, 0))
@inline private def mkError(channelId: Int, docs: Array[Byte]): Response = {
val buf = ChannelBuffers.unmodifiableBuffer(
ChannelBuffers.copiedBuffer(ByteOrder.LITTLE_ENDIAN, docs))
Response(MessageHeader(buf), Reply(buf), buf, ResponseInfo(channelId))
}
}
/** Response prepared for Mongo request executed with Acolyte driver. */
trait PreparedResponse {
/** Applies this response to specified Mongo channel. */
def apply(chanId: Int): Option[Try[Response]]
}
@@ -5,19 +5,10 @@ import scala.util.Try
import _root_.reactivemongo.bson.BSONDocument
import _root_.reactivemongo.core.protocol.Response
/** Response to Mongo query executed with Acolyte driver. */
sealed trait QueryResponse {
/** Applies this response to specified Mongo channel. */
def apply(chanId: Int): Option[Try[Response]]
}
/** Query response companion. */
object QueryResponse {
/** Mongo Error, in response to some request. */
/** Successful result */
/** Creates a response for given `body`. */
def apply[T](body: T)(implicit mkResponse: QueryResponseMaker[T]): QueryResponse = new QueryResponse {
def apply[T](body: T)(implicit mkResponse: QueryResponseMaker[T]): PreparedResponse = new PreparedResponse {
def apply(chanId: Int) = mkResponse(chanId, body)
}

0 comments on commit 95e6646

Please sign in to comment.