Skip to content

Commit

Permalink
[feature] stdlib: Added slaveok mode to MongoDB driver.
Browse files Browse the repository at this point in the history
  • Loading branch information
nrs135 authored and OpaOnWindowsNow committed Feb 13, 2012
1 parent b256e98 commit d292986
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 61 deletions.
6 changes: 6 additions & 0 deletions libbase/mongo.ml
Expand Up @@ -271,6 +271,12 @@ let start_query m rid flags ns numberToSkip numberToReturn =
Bson.add_le_int32 m.Bson.buf numberToSkip;
Bson.add_le_int32 m.Bson.buf numberToReturn

let set_query_flags m flags =
if (geti32 m.Bson.buf 12) = _OP_QUERY
then Bson.FillbufString.lei32 m.Bson.buf.Buf.str 16 ((geti32 m.Bson.buf 16) lor flags)

let get_opCode m = geti32 m.Bson.buf 12

(*struct OP_GETMORE {
MsgHeader header; // standard message header
int32 ZERO; // 0 - reserved for future use
Expand Down
2 changes: 2 additions & 0 deletions libbase/mongo.mli
Expand Up @@ -58,6 +58,8 @@ val free : mongo_buf -> unit
val start_insert : mongo_buf -> int32 -> int -> string -> unit
val start_update : mongo_buf -> int32 -> int -> string -> unit
val start_query : mongo_buf -> int32 -> int -> string -> int -> int -> unit
val set_query_flags : mongo_buf -> int -> unit
val get_opCode : mongo_buf -> int
val start_getmore : mongo_buf -> int32 -> string -> int -> int64 -> unit
val start_delete : mongo_buf -> int32 -> int -> string -> unit
val start_kill_cursors : mongo_buf -> int32 -> int64 list -> unit
Expand Down
6 changes: 6 additions & 0 deletions opabsl/mlbsl/bslMongo.ml
Expand Up @@ -332,6 +332,12 @@ let query m flags ns numberToSkip numberToReturn query returnFieldSelector_opt =
| None -> ());
Mongo.finish m

##register set_query_flags: Mongo.mongo_buf, int -> void
let set_query_flags m flags = Mongo.set_query_flags m flags

##register get_opCode: Mongo.mongo_buf -> int
let get_opCode m = Mongo.get_opCode m

##register get_more: Mongo.mongo_buf, string, int, Mongo.cursorID -> void
let get_more m ns numberToReturn cursorID =
Mongo.start_getmore m (nextrid()) ns numberToReturn cursorID;
Expand Down
5 changes: 5 additions & 0 deletions stdlib/apis/mongo/collection.opa
Expand Up @@ -277,6 +277,11 @@ MongoCollection = {{
partial(c:Mongo.collection('value)): Mongo.collection('value)
= {c with db={ c.db with query_flags=Bitwise.lor(c.db.query_flags,MongoCommon.PartialBit) }}

/**
* Get the current SlaveOK status.
**/
get_slaveok(c:Mongo.collection('value)): bool = MongoDriver.get_slaveok(c.db.mongo)

/**
* Insert an OPA value into a collection.
**/
Expand Down
5 changes: 5 additions & 0 deletions stdlib/apis/mongo/common.opa
Expand Up @@ -56,6 +56,7 @@ type Mongo.failure =
/ {DocError : Bson.document}
/ {Incomplete}
/ {NotFound}
/ {SlaveOK}

/**
* Mongo success status, just a document.
Expand Down Expand Up @@ -140,6 +141,9 @@ MongoCommon = {{
@private ML = MongoLog
@private H = Bson.Abbrevs

/** Code for query operations */
_OP_QUERY = 2004

/** Generate a driver error message outcome **/
failErr(msg:string): outcome('a,Mongo.failure) = {failure={Error=msg}}

Expand Down Expand Up @@ -198,6 +202,7 @@ MongoCommon = {{
| {OK} -> "Ok"
| {Incomplete} -> "Incomplete"
| {NotFound} -> "NotFound"
| {SlaveOK} -> "SlaveOK"

/** Make an error report string out of a [Mongo.result] value, will print "<ok>" if no error. **/
string_of_result(result:Mongo.result): string = outcome_map(result, Bson.string_of_doc_error, string_of_failure)
Expand Down
40 changes: 30 additions & 10 deletions stdlib/apis/mongo/connection.opa
Expand Up @@ -81,6 +81,7 @@ type Mongo.param = {
replname:option(string);
bufsize:int;
pool_max:int;
allow_slaveok:bool;
log:bool;
seeds:list(Mongo.mongo_host);
}
Expand All @@ -98,6 +99,7 @@ MongoConnection = {{
replname={none};
bufsize=50*1024;
pool_max=2;
allow_slaveok=false;
log=false;
seeds=default_seeds;
}:Mongo.param)
Expand Down Expand Up @@ -173,6 +175,12 @@ MongoConnection = {{
param_doc = "<int>"
on_param(p) = parser n={Rule.natural} -> {no_params = add_param((p -> { p with pool_max=Int.max(n,1) }),p)}
},
{CommandLine.default_parser with
names = ["--mongo-allow-slaveok", "--mongoallowslaveok", "--mk", "-mk"]
description = "Allow SlaveOK mode (ReplSet only)"
param_doc = "<bool>"
on_param(p) = parser b={Rule.bool} -> {no_params = add_param((p -> { p with allow_slaveok=b }),p)}
},
{CommandLine.default_parser with
names = ["--mongo-log", "--mongolog", "--ml", "-ml"]
description = "Enable MongoLog logging"
Expand Down Expand Up @@ -217,9 +225,10 @@ MongoConnection = {{
params_done.set(true)
@private
open_(dbo:outcome(Mongo.db,Mongo.failure),name:string): outcome(Mongo.mongodb,Mongo.failure) =
open_(dbo:outcome((bool,Mongo.db),Mongo.failure),name:string): outcome(Mongo.mongodb,Mongo.failure) =
match dbo with
| {success=mongo} ->
| {success=(slaveok,mongo)} ->
do SocketPool.setslaveok(mongo.pool,slaveok)
(match SocketPool.gethost(mongo.pool) with
| (addr,port) ->
db = {~mongo; ~name; bufsize=mongo.bufsize; ~addr; ~port; link_count=Mutable.make(1);
Expand Down Expand Up @@ -248,22 +257,29 @@ MongoConnection = {{
*
* Example: [openraw(name, bufsize, pool_max, log, host, port)]
**/
openraw(name:string, bufsize:int, pool_max:int, log:bool, addr:string, port:int)
openraw(name:string, bufsize:int, pool_max:int, allow_slaveok:bool, log:bool, addr:string, port:int)
: outcome(Mongo.mongodb,Mongo.failure) =
open_(MongoDriver.open(bufsize,pool_max,false,addr,port,log),name)
open_((match MongoDriver.open(bufsize,pool_max,allow_slaveok,false,addr,port,log) with
| {success=m} -> {success=(false,m)}
| {~failure} -> {~failure}),name)
/**
* Open a connection to a replica set starting from the given list of seeds.
*
* Example: [replraw(name, bufsize, pool_max, log, seeds)]
*
* This routine causes a serach for the current host list among the seeds
* and then searches for the primary among the hosts. Rconnection logic
* and then searches for the primary among the hosts. Reconnection logic
* is enabled.
**/
replraw(name:string, bufsize:int, pool_max:int, log:bool, seeds:list(Mongo.mongo_host))
replraw(name:string, bufsize:int, pool_max:int, allow_slaveok:bool, log:bool, seeds:list(Mongo.mongo_host))
: outcome(Mongo.mongodb,Mongo.failure) =
open_(MongoReplicaSet.connect(MongoReplicaSet.init(name,bufsize,pool_max,log,seeds)),name)
open_(MongoReplicaSet.connect(MongoReplicaSet.init(name,bufsize,pool_max,allow_slaveok,log,seeds)),name)
/**
* Get the current SlaveOK status.
**/
get_slaveok(m:Mongo.mongodb): bool = MongoDriver.get_slaveok(m.mongo)
/**
* Force a reconnection.
Expand All @@ -288,11 +304,11 @@ MongoConnection = {{
match List.find((p -> p.name == name),params.get()) with
| {some=p} ->
(match p.replname with
| {some=rn} -> replraw(rn,p.bufsize,p.pool_max,p.log,p.seeds)
| {some=rn} -> replraw(rn,p.bufsize,p.pool_max,p.allow_slaveok,p.log,p.seeds)
| {none} ->
(match p.seeds with
| [] -> {failure={Error="MongoConnection.open: No host for plain connection"}}
| [(host,port)] -> openraw(name,p.bufsize,p.pool_max,p.log,host,port)
| [(host,port)] -> openraw(name,p.bufsize,p.pool_max,p.allow_slaveok,p.log,host,port)
| _ -> {failure={Error="MongoConnection.open: Multiple hosts for plain connection"}}))
| {none} -> {failure={Error="MongoConnection.open: No such replica name {name}"}}
Expand Down Expand Up @@ -342,7 +358,11 @@ MongoConnection = {{
pool_max(db:Mongo.mongodb, pool_max:int): Mongo.mongodb =
{ db with mongo={ db.mongo with pool_max=Int.max(pool_max,1) } }
/** Chenge the bufsize hint (only applies to newly created buffers) **/
/** Change the SlaveOk allowed status **/
allow_slaveok(db:Mongo.mongodb, allow_slaveok:bool): Mongo.mongodb =
{ db with mongo={ db.mongo with ~allow_slaveok } }
/** Change the bufsize hint (only applies to newly created buffers) **/
bufsize(db:Mongo.mongodb, bufsize:int): Mongo.mongodb =
{ db with mongo={ db.mongo with ~bufsize } }
Expand Down

0 comments on commit d292986

Please sign in to comment.