Skip to content
Browse files

user functions (wip)

  • Loading branch information...
1 parent c7f59a3 commit 8bfc7821763f4eab3775bec06313c6cfc83eeccf Romain Slootmaekers committed
View
2 _tags
@@ -6,6 +6,7 @@ true: package(lwt.unix)
true: package(oUnit)
true: package(bz2)
true: package(str)
+true: package(dynlink)
"doc": include
"src":include
"src/otc":include
@@ -20,5 +21,6 @@ true: package(str)
"src/system": include
"src/inifiles": include
"src/nursery": include
+"src/plugins": include
"examples/ocaml": include
<**/arakoon.*{byte,native}>: is_main
View
3 cfg/arakoon.ini
@@ -9,6 +9,9 @@ cluster = arakoon_0,arakoon_1,arakoon_2
#name of the cluster:
cluster_id = ricky
+#plugins: these need to be available in the home of EACH node
+plugins = plugin_update_max
+
# DANGER: only set this if you know what you're doing,
# and understand the risk.
# (in the 2 node case,
View
77 examples/ocaml/plugin_demo.ml
@@ -0,0 +1,77 @@
+open Lwt
+open Arakoon_remote_client
+open Arakoon_client
+
+let make_address ip port =
+ let ha = Unix.inet_addr_of_string ip in
+ Unix.ADDR_INET (ha,port)
+
+let with_client cluster_id (ip,port) f =
+ let sa = make_address ip port in
+ let do_it connection =
+ make_remote_client cluster_id connection >>= fun client ->
+ f client
+ in
+ Lwt_io.with_connection sa do_it
+
+let find_master cluster_id cfgs =
+ let rec loop = function
+ | [] -> Lwt.fail (Failure "too many nodes down")
+ | cfg :: rest ->
+ begin
+ let _,(ip, port) = cfg in
+ let sa = make_address ip port in
+ Lwt.catch
+ (fun () ->
+ Lwt_io.with_connection sa
+ (fun connection ->
+ make_remote_client cluster_id connection
+ >>= fun client ->
+ client # who_master ()) >>= function
+ | None -> Lwt.fail (Failure "No Master")
+ | Some m -> Lwt.return m)
+ (function
+ | Unix.Unix_error(Unix.ECONNREFUSED,_,_ ) -> loop rest
+ | exn -> Lwt.fail exn
+ )
+ end
+ in loop cfgs
+
+
+let with_master_client cluster_id cfgs f =
+ find_master cluster_id cfgs >>= fun master_name ->
+ let master_cfg = List.assoc master_name cfgs in
+ with_client cluster_id master_cfg f
+
+
+let vo2s = function
+ | None -> "None"
+ | Some v -> Printf.sprintf "got %S" v
+
+let plugin_demo (client:Arakoon_client.client) =
+ let n = "update_max" in
+ client # user_function n (Some "0") >>= fun _ ->
+ client # user_function n (Some "23") >>= fun _ ->
+ client # user_function n (Some "5") >>= fun vo ->
+ Lwt_io.printl (vo2s vo) >>= fun () ->
+ Lwt.catch
+ (fun () -> client # user_function n (Some "x")
+ >>= fun vo -> Lwt_io.printl (vo2s vo)
+ )
+ (function e ->
+ let s = Printexc.to_string e in
+ Lwt_io.printlf "oops %s" s
+ )
+
+
+let _ =
+ let cluster_id = "ricky" in
+ let cfgs = [
+ ("arakoon_0",("127.0.0.1",4000));
+ (*
+ ("arakoon_1",("127.0.0.1",4001));
+ ("arakoon_2",("127.0.0.1",4002))
+ *)
+ ]
+ in
+ Lwt_main.run (with_master_client cluster_id cfgs plugin_demo)
View
4 src/client/arakoon_remote_client.ml
@@ -69,6 +69,10 @@ object(self: #Arakoon_client.client)
request (fun buf -> test_and_set_to buf key expected wanted) >>= fun () ->
response ic Llio.input_string_option
+ method user_function name po =
+ request (fun buf -> user_function_to buf name po) >>= fun () ->
+ response id Llio.input_string_option
+
method multi_get ?(allow_dirty=false) keys =
request (fun buf -> multiget_to buf ~allow_dirty keys) >>= fun () ->
response ic value_list
View
16 src/client/client_protocol.ml
@@ -198,6 +198,22 @@ let one_command (ic,oc) (backend:Backend.backend) =
Llio.output_string_option oc vo >>= fun () ->
Lwt_io.flush oc
end
+ | USER_FUNCTION ->
+ begin
+ Llio.input_string ic >>= fun name ->
+ Llio.input_string_option ic >>= fun po ->
+ Lwt.catch
+ (fun () ->
+ begin
+ backend # user_function name po >>= fun ro ->
+ Llio.output_int oc 0 >>= fun () ->
+ Llio.output_string_option oc ro
+ end
+ )
+ (handle_exception oc)
+ >>= fun () ->
+ Lwt_io.flush oc
+ end
| PREFIX_KEYS ->
begin
Llio.input_bool ic >>= fun allow_dirty ->
View
14 src/client/common.ml
@@ -50,6 +50,7 @@ type client_command =
| EXPECT_PROGRESS_POSSIBLE
| STATISTICS
| COLLAPSE_TLOGS
+ | USER_FUNCTION
| SET_RANGE
| GET_ROUTING
| SET_ROUTING
@@ -72,9 +73,10 @@ let code2int = [
EXPECT_PROGRESS_POSSIBLE, 0x12l;
STATISTICS , 0x13l;
COLLAPSE_TLOGS , 0x14l;
- SET_RANGE , 0x15l;
- GET_ROUTING , 0x16l;
- SET_ROUTING , 0x17l;
+ USER_FUNCTION , 0x15l;
+ SET_RANGE , 0x16l;
+ GET_ROUTING , 0x17l;
+ SET_ROUTING , 0x18l;
]
let int2code =
@@ -202,6 +204,12 @@ let test_and_set_to b key expected wanted =
Llio.string_option_to b expected;
Llio.string_option_to b wanted
+let user_function_to b name po =
+ command_to b USER_FUNCTION;
+ Llio.string_to b name;
+ Llio.int_to b (List.length keys);
+ List.iter (Llio.string_to b) keys
+
let multiget_to b ~allow_dirty keys =
command_to b MULTI_GET;
Llio.bool_to b allow_dirty;
View
2 src/node/backend.ml
@@ -64,6 +64,8 @@ class type backend = object
method collapse : int -> (int -> unit Lwt.t) -> (unit -> unit Lwt.t) -> unit Lwt.t
+ method user_function: string -> string option -> (string option) Lwt.t
+
method set_range : Range.t -> unit Lwt.t
method get_routing: unit -> Routing.t Lwt.t
View
33 src/node/local_store.ml
@@ -125,6 +125,14 @@ let _test_and_set bdb key expected wanted =
end
| Some v' -> None
+open Registry
+
+let _user_function bdb (name:string) (po:string option) =
+ let f = Registry.lookup name in
+ let ro = f bdb po in
+ ro
+
+
let _set_master bdb master (lease_start:int64) =
Bdb.put bdb __master_key master;
let buffer = Buffer.create 8 in
@@ -138,6 +146,8 @@ let rec _sequence bdb updates =
| Update.Delete key -> _delete bdb key
| Update.TestAndSet(key,expected, wanted) ->
let _ = _test_and_set bdb key expected wanted in () (* TODO: do we want this? *)
+ | Update.UserFunction(name,po) ->
+ let _ = _user_function bdb name po in ()
| Update.MasterSet (m,ls) -> _set_master bdb m ls
| Update.Sequence us -> _sequence bdb us
| Update.SetRange range -> _set_range bdb range
@@ -311,17 +321,32 @@ object(self: #store)
| ex -> Lwt.fail ex
)
-
+ method user_function name (po:string option) =
+ Lwt_log.debug_f "user_function :%s" name >>= fun () ->
+ Lwt.catch (
+ fun () ->
+ Hotc.transaction db
+ (fun db ->
+ _incr_i db >>= fun () ->
+ let (ro:string option) = _user_function db name po in
+ Lwt.return ro)
+ )
+ (function
+ | ex ->
+ Hotc.transaction db (fun db -> _incr_i db)
+ >>= fun () -> Lwt.fail ex)
+
+
method consensus_i () =
Hotc.transaction db (fun db -> _consensus_i db)
-
+
method close () =
Hotc.close db >>= fun () ->
Lwt_log.debug "local_store :: close () " >>= fun () ->
Lwt.return ()
-
+
method get_filename () = Hotc.filename db
-
+
method reopen f =
Lwt_log.debug "local_store :: reopen() " >>= fun () ->
Hotc.reopen db f >>= fun () ->
View
4 src/node/mem_store.ml
@@ -174,6 +174,10 @@ object (self: #store)
method get_filename () = failwith "not supported"
+ method user_function name po =
+ Lwt_log.debug_f "mem_store :: user_function %s" name >>= fun () ->
+ Lwt.return None
+
method set_range range =
Lwt_log.debug_f "set_range %s" (Range.to_string range) >>= fun () ->
_range <- range;
View
16 src/node/node_cfg.ml
@@ -49,6 +49,7 @@ module Node_cfg = struct
quorum_function: int -> int;
_lease_period: int;
cluster_id : string;
+ plugins: string list;
}
let make_test_config n_nodes master lease_period =
@@ -85,7 +86,9 @@ module Node_cfg = struct
_master = master;
quorum_function = quorum_function;
_lease_period = lease_period;
- cluster_id = cluster_id}
+ cluster_id = cluster_id;
+ plugins = ["plugin_update_max"];
+ }
in
cluster_cfg
@@ -116,6 +119,12 @@ module Node_cfg = struct
let _node_names inifile = _get_string_list inifile "global" "cluster"
+ let _plugins inifile =
+ try
+ _get_string_list inifile "global" "plugins"
+ with Inifiles.Invalid_element _ -> []
+
+
let _get_lease_period inifile =
try
let les = (inifile # getval "global" "lease_period") in
@@ -212,6 +221,7 @@ module Node_cfg = struct
let inifile = new Inifiles.inifile config_file in
let fm = _forced_master inifile in
let nodes = _node_names inifile in
+ let plugin_names = _plugins inifile in
let cfgs, remaining = List.fold_left
(fun (a,remaining) section ->
if List.mem section nodes || _get_bool inifile section "learner"
@@ -233,7 +243,9 @@ module Node_cfg = struct
_master = fm;
quorum_function = quorum_function;
_lease_period = lease_period;
- cluster_id = cluster_id}
+ cluster_id = cluster_id;
+ plugins = plugin_names;
+ }
in
cluster_cfg
View
1 src/node/node_main.ml
@@ -258,6 +258,7 @@ let _main_2 make_store make_tlog_coll make_config ~name
(fun i -> Lwt.ignore_result (_log_rotate me.node_name i make_config ))
in
log_prelude cluster_cfg >>= fun () ->
+ Plugin_loader.load me.home cluster_cfg.plugins >>= fun () ->
let my_name = me.node_name in
let cookie = cluster_id in
let messaging = _config_messaging me cfgs cookie me.is_laggy in
View
15 src/node/store.ml
@@ -60,6 +60,7 @@ class type store = object
method reopen: (unit -> unit Lwt.t) -> unit Lwt.t
method get_filename: unit -> string
+ method user_function : string -> string option -> (string option) Lwt.t
method set_range: Range.t -> unit Lwt.t
method get_routing : unit -> Routing.t Lwt.t
method set_routing : Routing.t -> unit Lwt.t
@@ -112,6 +113,20 @@ let _insert_update (store:store) update =
in
Lwt.return (Update_fail (rc,msg))
)
+ | Update.UserFunction(name,po) ->
+ Lwt.catch
+ (fun () ->
+ store # user_function name po >>= fun ro ->
+ Lwt.return (Ok ro)
+ )
+ (function
+ | Common.XException(rc,msg) -> Lwt.return (Update_fail(rc,msg))
+ | e ->
+ let rc = Arakoon_exc.E_UNKNOWN_FAILURE
+ and msg = Printexc.to_string e
+ in
+ Lwt.return (Update_fail(rc,msg))
+ )
| Update.Sequence updates ->
Lwt.catch
(fun () ->
View
12 src/node/sync_backend.ml
@@ -205,6 +205,18 @@ object(self: #backend)
let update = Update.SetRange range in
self # _update_rendezvous update (fun () -> ())
+ method user_function name po =
+ log_o self "user_function %s" name >>= fun () ->
+ let update = Update.UserFunction(name,po) in
+ let p_value = Update.make_update_value update in
+ let sleep, awake = Lwt.wait() in
+ let went_well = make_went_well (fun () -> ()) awake sleep in
+ push_update (Some p_value, went_well) >>= fun () ->
+ sleep >>= function
+ | Store.Stop -> Lwt.fail Forced_stop
+ | Store.Update_fail(rc,str) -> Lwt.fail(Common.XException (rc,str))
+ | Store.Ok x -> Lwt.return x
+
method test_and_set key expected (wanted:string option) =
log_o self "test_and_set %s %s %s" key
(string_option_to_string expected)
View
2 src/node/test_backend.ml
@@ -93,6 +93,8 @@ class test_backend my_name = object(self:#backend)
method test_and_set (key:string) (expected: string option) (wanted:string option) =
Lwt.return wanted
+ method user_function name po = Lwt.return None
+
method multi_get ~allow_dirty (keys: string list) =
let values = List.fold_left
(fun acc k ->
View
15 src/plugins/plugin_loader.ml
@@ -0,0 +1,15 @@
+open Lwt
+
+let load home pnames =
+ let rec _inner = function
+ | [] -> Lwt.return ()
+ | p :: ps ->
+ Lwt_log.info_f "loading plugin %s" p >>= fun () ->
+ let pwe = p ^ ".cmo" in
+ let full = Filename.concat home pwe in
+ let qual = Dynlink.adapt_filename full in
+ Lwt_log.info_f "qualified as: %s" qual >>= fun () ->
+ Dynlink.loadfile_private qual;
+ _inner ps
+ in
+ _inner pnames
View
90 src/plugins/plugin_terremark.ml
@@ -0,0 +1,90 @@
+open Otc
+
+(* Fairly literal translation of the Haskell implementation *)
+let rec range (i:int) (j:int) = if i > j then [] else i :: (range (i + 1) j)
+let charFromTo (f:char) (l:char) = List.map Char.chr
+ (range (Char.code f) (Char.code l))
+
+let find_idx (e:char) (l:char list) =
+ let rec find_it elt acc = function
+ | hd :: tl when elt = hd -> acc
+ | hd :: tl -> find_it elt (acc + 1) tl
+ | _ -> raise Not_found
+ in find_it e 0 l
+
+let c2s = Char.escaped
+
+let (suffixes:char list) = charFromTo '0' '9' @ charFromTo 'A' 'Z' @
+ charFromTo 'a' 'z'
+
+let (firstSuffixChar:char) = List.hd suffixes
+let (lastSuffixChar:char) = List.hd (List.rev suffixes)
+
+let tailKeyName (k:string) = k ^ "_tail"
+let keyPrefix (k:string) = k ^ "__"
+let firstKeyName (k:string) = keyPrefix k ^ c2s firstSuffixChar
+
+let rightNeighbour (c:char) = List.nth suffixes (find_idx c suffixes + 1)
+
+let incrementKey (k:string) =
+ let init (s:string) = String.sub s 0 (String.length s - 1) in
+ let last (s:string) = String.get s (String.length s - 1) in
+ let incr (c:char) = if c = lastSuffixChar
+ then c2s c ^ c2s firstSuffixChar
+ else Char.escaped (rightNeighbour c) in
+ init k ^ incr (last k)
+
+
+(* Database interface *)
+let get db k =
+ try let s = Bdb.get db (Local_store._p k) in Some s
+ with Not_found -> None
+let set = Local_store._set
+let testAndSet = Local_store._test_and_set
+let delete = Local_store._delete
+let rangeEntries db f fi l li n =
+ let ka = Bdb.range db (Local_store._f f) fi (Local_store._l l) li n in
+ let kl = Array.to_list ka in
+ List.fold_left
+ (fun ret_list k ->
+ let l = String.length k in
+ ((String.sub k 1 (l - 1)), Bdb.get db k) :: ret_list) [] kl
+
+let pop db q = match q with
+ None -> raise (Failure "Invalid argument None")
+ | Some q' ->
+ let kp = Some (keyPrefix q') in
+ let entries = rangeEntries db kp true None true 1 in
+ match entries with
+ [] -> None
+ | [(key, value)] ->
+ let () = delete db key in
+ let tailKey = get db (tailKeyName q') in
+ let () = match tailKey with
+ None -> ()
+ | Some _ -> set db (tailKeyName q') (firstKeyName q')
+ in Some value
+ | _ -> raise (Failure "Invalid number of values returned")
+
+let push' db q v =
+ let oldTail = get db (tailKeyName q) in
+ let newTail = match oldTail with
+ None -> firstKeyName q
+ | Some oldTail' -> incrementKey oldTail'
+ in
+ let r = testAndSet db newTail None (Some v) in
+ let () = match r with
+ None -> set db (tailKeyName q) newTail
+ | Some _ -> raise (Failure ("Key " ^ newTail ^ " already exists"))
+ in None
+
+let push db a = match a with
+ None -> raise (Failure "Invalid argument None")
+ | Some s ->
+ let q, pos = Llio.string_from s 0 in
+ let v, _ = Llio.string_from s pos in
+ push' db q v
+
+open Registry
+let () = Registry.register "terremark.queue.pop_1" pop
+let () = Registry.register "terremark.queue.push_1" push
View
27 src/plugins/plugin_update_max.ml
@@ -0,0 +1,27 @@
+open Lwt
+open Otc
+
+let s2i = int_of_string
+let i2s = string_of_int
+
+let update_max bdb po =
+ let _k = "max" in
+ let v =
+ try let s = Bdb.get bdb _k in s2i s
+ with Not_found -> 0
+ in
+ let v' = match po with
+ | None -> 0
+ | Some s ->
+ try s2i s
+ with _ -> raise (Common.XException(Arakoon_exc.E_UNKNOWN_FAILURE, "invalid arg"))
+ in
+ let m = max v v' in
+ let ms = i2s m in
+ Bdb.put bdb _k ms;
+ Some (i2s m)
+
+
+open Registry
+
+let () = Registry.register "update_max" update_max
View
18 src/plugins/registry.ml
@@ -0,0 +1,18 @@
+open Otc
+
+
+module type Dict = sig
+ val set : key:string -> value:string -> unit
+ val get : key:string -> string
+end
+
+module Registry : sig
+ type f = Bdb.bdb -> string option -> string option
+ val register : string -> f -> unit
+ val lookup : string -> f
+end = struct
+ type f = Bdb.bdb -> string option -> string option
+ let _r = Hashtbl.create 42
+ let register name (f:f) = Hashtbl.replace _r name f
+ let lookup name = Hashtbl.find _r name
+end
View
4 src/system/startup.ml
@@ -97,6 +97,7 @@ let post_failure () =
quorum_function = Quorum.quorum_function;
_lease_period = 2;
cluster_id = "ricky";
+ plugins = [];
}
in
let get_cfgs () = cluster_cfg in
@@ -146,10 +147,11 @@ let restart_slaves () =
quorum_function = Quorum.quorum_function;
_lease_period = 2;
cluster_id = "ricky";
+ plugins = [];
}
in
let get_cfgs () = cluster_cfg in
- let u0 = Update.MasterSet(node0,0L) in
+ let u0 = Update.MasterSet(node0, 0L) in
let u1 = Update.Set("xxx","xxx") in
let tlcs = Hashtbl.create 5 in
let stores = Hashtbl.create 5 in
View
27 src/tlog/update.ml
@@ -34,22 +34,23 @@ module Update = struct
| SetRange of Range.t
| SetRouting of Routing.t
| Nop
+ | UserFunction of string * string option
let make_master_set me maybe_lease =
match maybe_lease with
| None -> MasterSet (me,0L)
| Some lease -> MasterSet (me,lease)
+ let _size_of = function
+ | None -> 0
+ | Some w -> String.length w
+
let rec string_of = function
| Set (k,v) -> Printf.sprintf "Set ;%S;%i" k (String.length v)
| Delete k -> Printf.sprintf "Delete ;%S" k
| MasterSet (m,i) -> Printf.sprintf "MasterSet ;%S;%Ld" m i
| TestAndSet (k, _, wo) ->
- let ws =
- match wo with
- | None -> 0
- | Some w -> String.length w
- in
+ let ws = _size_of wo in
Printf.sprintf "TestAndSet;%S;%i" k ws
| Sequence updates ->
let buf = Buffer.create (64 * List.length updates) in
@@ -67,6 +68,9 @@ module Update = struct
| SetRange range -> Printf.sprintf "SetRange ;%s" (Range.to_string range)
| SetRouting routing -> Printf.sprintf "SetRouting;%s" (Routing.to_s routing)
| Nop -> "NOP"
+ | UserFunction (name,param) ->
+ let ps = _size_of param in
+ Printf.sprintf "UserFunction;%s;%i" name ps
let rec to_buffer b t =
match t with
@@ -93,8 +97,12 @@ module Update = struct
List.iter f us
| Nop ->
Llio.int_to b 6
- | SetRange range ->
+ | UserFunction (name, param) ->
Llio.int_to b 7;
+ Llio.string_to b name;
+ Llio.string_option_to b param
+ | SetRange range ->
+ Llio.int_to b 9;
Range.range_to b range
@@ -129,7 +137,12 @@ module Update = struct
loop (i+1) (u::acc) next_pos in
loop 0 [] pos2
| 6 -> Nop, pos1
- | 7 ->
+ | 7 ->
+ let n, pos2 = Llio.string_from b pos1 in
+ let po, pos3 = Llio.string_option_from b pos2 in
+ let r = UserFunction(n,po) in
+ r, pos3
+ | 9 ->
let range,pos2 = Range.range_from b pos1 in
SetRange range, pos2
| _ -> failwith (Printf.sprintf "%i:not an update" kind)

0 comments on commit 8bfc782

Please sign in to comment.
Something went wrong with that request. Please try again.