diff --git a/pylabs/test/server/system_tests_common.py b/pylabs/test/server/system_tests_common.py
index ba34075e..93227451 100644
--- a/pylabs/test/server/system_tests_common.py
+++ b/pylabs/test/server/system_tests_common.py
@@ -21,7 +21,7 @@
"""
import sys
-
+import logging
from nose.tools import *
@@ -623,7 +623,7 @@ def basic_teardown( removeDirs ):
X.logging.info( "Teardown complete" )
def nursery_teardown( removeDirs ):
- common_teardown(removeDirs, nursery_cluster_ids)
+ common_teardown(removeDirs, CONFIG.nursery_cluster_ids)
def get_client ( c_id = None):
if c_id is None:
@@ -634,11 +634,13 @@ def get_client ( c_id = None):
return c
def get_nursery_client():
- client = ArakoonClient.getClient(nursery_keeper_id)
+
+ client = get_client (CONFIG.nursery_keeper_id)
return client
def get_nursery():
- return q.manage.nursery.getNursery(nursery_keeper_id)
+ ext = NurseryManagement.NurseryManagement()
+ return ext.getNursery(CONFIG.nursery_keeper_id)
def iterate_n_times (n, f, startSuffix = 0, failure_max=0, valid_exceptions=None ):
client = get_client ()
diff --git a/src/client/common.ml b/src/client/common.ml
index 401ef6cd..efb83054 100644
--- a/src/client/common.ml
+++ b/src/client/common.ml
@@ -391,14 +391,8 @@ let synced_sequence conn changes = _sequence conn changes SYNCED_SEQUENCE
let get_nursery_cfg (ic,oc) =
- let decode ic =
- Llio.input_string ic >>= fun s ->
- let input = Pack.make_input s 0 in
- let cfg = NCFG.ncfg_from input in
- Lwt.return cfg
- in
request oc get_nursery_cfg_to >>= fun () ->
- response_old ic decode
+ response_limited ic NCFG.ncfg_from
let set_nursery_cfg (ic,oc) clusterid cfg =
let outgoing buf =
diff --git a/src/client/python/ArakoonProtocol.py b/src/client/python/ArakoonProtocol.py
index ee7de358..1b0147dc 100644
--- a/src/client/python/ArakoonProtocol.py
+++ b/src/client/python/ArakoonProtocol.py
@@ -774,22 +774,22 @@ def decodeBoolResult( con ):
@staticmethod
def decodeNurseryCfgResult( con ):
- ArakoonProtocol._evaluateErrorCode(con)
+ input = ArakoonProtocol.readAnswer(con)
+
- offset = 0
- encoded = _recvString( con )
- routing, offset = RoutingInfo.unpack(encoded, offset, _unpackBool, _unpackString)
+ encoded = input.input_string()
+ routing, offset = RoutingInfo.unpack(encoded, 0, _unpackBool, _unpackString)
- cfgCount, offset = _unpackInt(encoded, offset)
+ cfgCount = input.input_vint()
resultCfgs = dict()
for i in range(cfgCount) :
- clusterId, offset = _unpackString(encoded, offset)
- clusterSize, offset = _unpackInt(encoded, offset)
+ clusterId = input.input_string()
+ clusterSize = input.input_vint()
cfg = dict()
for j in range(clusterSize):
- nodeId, offset = _unpackString(encoded, offset)
- ip, offset = _unpackString(encoded, offset)
- port, offset = _unpackInt(encoded, offset)
+ nodeId = input.input_string ()
+ ip = input.input_string ()
+ port = input.input_vint()
cfg[nodeId] = (ip,port)
cliCfg = ArakoonClientConfig(clusterId, cfg)
resultCfgs[clusterId] = cliCfg
diff --git a/src/client/remote_nodestream.ml b/src/client/remote_nodestream.ml
index bb6a4f04..9aafff85 100644
--- a/src/client/remote_nodestream.ml
+++ b/src/client/remote_nodestream.ml
@@ -20,7 +20,6 @@ GNU Affero General Public License along with this program (file "COPYING").
If not, see .
*)
-open Update
open Interval
open Routing
open Common
@@ -29,10 +28,6 @@ open Client_cfg
open Ncfg
class type nodestream = object
- method iterate:
- Sn.t -> (Sn.t * Update.t -> unit Lwt.t) ->
- Tlogcollection.tlog_collection ->
- head_saved_cb:(string -> unit Lwt.t) -> unit Lwt.t
method collapse: int -> unit Lwt.t
@@ -52,67 +47,9 @@ class type nodestream = object
end
class remote_nodestream ((ic,oc) as conn) = object(self :# nodestream)
- method iterate (i:Sn.t) (f: Sn.t * Update.t -> unit Lwt.t)
- (tlog_coll: Tlogcollection.tlog_collection)
- ~head_saved_cb
- =
- let outgoing buf =
- command_to buf LAST_ENTRIES;
- Sn.sn_to buf i
- in
- let incoming ic =
- let save_head () = tlog_coll # save_head ic in
- let last_seen = ref None in
- let rec loop_entries () =
- Sn.input_sn ic >>= fun i2 ->
- begin
- if i2 = (-1L)
- then
- begin
- Lwt_log.info_f "remote_nodestream :: last_seen = %s"
- (Log_extra.option_to_string Sn.string_of !last_seen)
- end
- else
- begin
- last_seen := Some i2;
- Llio.input_int32 ic >>= fun chksum ->
- Llio.input_string ic >>= fun entry ->
- let update,_ = Update.from_buffer entry 0 in
- f (i2, update) >>= fun () ->
- loop_entries ()
- end
- end
- in
- let rec loop_parts () =
- Llio.input_int ic >>= function
- | 1 -> Lwt_log.debug "loop_entries" >>= fun () -> loop_entries ()
- | 2 ->
- begin
- Lwt_log.debug "save_head" >>= fun ()->
- save_head () >>= fun () ->
- let hf_name = tlog_coll # get_head_name () in
- head_saved_cb hf_name >>= fun () ->
- loop_parts ()
- end
- | 3 ->
- begin
- Lwt_log.debug "save_file" >>= fun () ->
- Llio.input_string ic >>= fun name ->
- Llio.input_int64 ic >>= fun length ->
- Lwt_log.debug_f "got %s (%Li bytes)" name length >>= fun () ->
- tlog_coll # save_tlog_file name length ic >>= fun () ->
- loop_parts ()
- end
- | x -> Llio.lwt_failfmt "don't know what %i means" x
- in
- loop_parts()
- in
- request oc outgoing >>= fun () ->
- response ic incoming
-
-
- method collapse n =
- let outgoing buf =
+ method collapse n = failwith "todo"
+ (*
+ let outgoing buf =
command_to buf COLLAPSE_TLOGS;
Llio.int_to buf n
in
@@ -137,7 +74,8 @@ class remote_nodestream ((ic,oc) as conn) = object(self :# nodestream)
in
request oc outgoing >>= fun () ->
response ic incoming
-
+ *)
+
method set_interval iv = Common.set_interval conn iv
method get_interval () = Common.get_interval conn
@@ -158,7 +96,7 @@ class remote_nodestream ((ic,oc) as conn) = object(self :# nodestream)
Lwt_io.with_file ~mode:Lwt_io.output db_location (fun oc -> Llio.copy_stream ~length ~ic ~oc)
in
request oc outgoing >>= fun () ->
- response ic incoming
+ response_old ic incoming
method get_fringe (boundary:string option) direction= Common.get_fringe conn boundary direction
diff --git a/src/client/remote_nodestream.mli b/src/client/remote_nodestream.mli
index 4b850d69..3f6be65e 100644
--- a/src/client/remote_nodestream.mli
+++ b/src/client/remote_nodestream.mli
@@ -1,14 +1,10 @@
-open Update
+
open Routing
open Interval
open Client_cfg
open Ncfg
class type nodestream = object
- method iterate:
- Sn.t -> (Sn.t * Update.t -> unit Lwt.t) ->
- Tlogcollection.tlog_collection ->
- head_saved_cb:(string -> unit Lwt.t) -> unit Lwt.t
method collapse: int -> unit Lwt.t
diff --git a/src/hope/barakoon.ml b/src/hope/barakoon.ml
index d6b76b53..74d31c72 100644
--- a/src/hope/barakoon.ml
+++ b/src/hope/barakoon.ml
@@ -223,6 +223,8 @@ type action_type =
| ListTest
| Test
| OnlyTest
+ | InitNursery
+ | MigrateNursery
let split_cfgs cfg myname =
@@ -431,6 +433,9 @@ let main () =
and value_size = ref 1024
and is = ref ""
and ip = ref ""
+ and left = ref ""
+ and sep = ref ""
+ and right = ref ""
and port = ref 0
and cluster_id = ref ""
and test_refs = ref ([]:string list)
@@ -474,7 +479,11 @@ let main () =
("--list-test", set_action ListTest, "lists tests");
("--only-test", Arg.Tuple[set_action OnlyTest; Arg.String (fun str -> test_refs := str :: ! test_refs)],
"runs some tests");
-
+ ("--nursery-init", Arg.Tuple[set_action InitNursery; Arg.Set_string cluster_id],
+ ": Initialize the keeper so the nursery contains the single provided cluster");
+ ("--nursery-migrate",
+ Arg.Tuple[set_action MigrateNursery; Arg.Set_string left; Arg.Set_string sep; Arg.Set_string right],
+ " : Change the nursery cluster distribution.");
] in
Arg.parse actions
@@ -495,8 +504,8 @@ let main () =
| OnlyTest -> only_test !test_refs
| ShowUsage -> Arg.usage actions ""
| ShowVersion -> show_version()
-
-
+ | InitNursery -> Lwt_main.run (Nursery_main.init_nursery !config_file !cluster_id)
+ | MigrateNursery -> Lwt_main.run (Nursery_main.migrate_nursery_range !config_file !left !sep !right)
end
diff --git a/src/hope/bstore.ml b/src/hope/bstore.ml
index b4026175..f888da8f 100644
--- a/src/hope/bstore.ml
+++ b/src/hope/bstore.ml
@@ -173,11 +173,17 @@ module BStore = (struct
Lwt.return (List.map unpref_key ks)
- let prefix_keys t prefix max =
- let prefix' = pref_key prefix in
+ let __prefix_keys t prefix max p =
+ let prefix' = pref_key ~_pf:p prefix in
BS.prefix_keys_latest t.store prefix' max >>= fun keys ->
- Lwt.return (List.map unpref_key keys)
+ Lwt.return (List.map (unpref_key ~_pf:p) keys)
+
+ let prefix_keys t prefix max =
+ __prefix_keys t prefix max __prefix
+ let admin_prefix_keys t prefix =
+ __prefix_keys t prefix None __admin_prefix
+
let last_entries t (t0:Core.tick) (oc:Llio.lwtoc) =
let TICK i0 = t0 in
let f acc i actions =
diff --git a/src/hope/c.ml b/src/hope/c.ml
index 541913ce..7e647b75 100644
--- a/src/hope/c.ml
+++ b/src/hope/c.ml
@@ -3,17 +3,17 @@ open Modules
open Statistics
open Routing
open Interval
+open Client_cfg
let _MAGIC = 0xb1ff0000l
let _MASK = 0x0000ffffl
let _VERSION = 2
-
let __routing_key = "routing"
let __interval_key = "interval"
-
+let __nursery_cluster_prefix = "nursery_cluster."
+
let my_read_command (ic,oc) =
-
let s = 8 in
let h = String.create s in
Lwt_io.read_into_exactly ic h 0 s >>= fun () ->
@@ -479,6 +479,7 @@ module ProtocolHandler (S:Core.STORE) = struct
let o = Pack.make_output 16 in
Routing.routing_to o r;
let v = Pack.close_output o in
+ Lwt_log.debug_f "Setting routing key to %S" v >>= fun () ->
do_admin_set __routing_key v
| Common.SET_INTERVAL ->
let i = Interval.interval_from rest in
@@ -486,8 +487,12 @@ module ProtocolHandler (S:Core.STORE) = struct
Interval.interval_to o i;
let v = Pack.close_output o in
do_admin_set __interval_key v
- | Common.GET_INTERVAL -> do_admin_get __interval_key
- | Common.GET_ROUTING -> do_admin_get __routing_key
+ | Common.GET_INTERVAL ->
+ Lwt_log.debug "GET_INTERVAL" >>= fun () ->
+ do_admin_get __interval_key
+ | Common.GET_ROUTING ->
+ Lwt_log.debug "GET_ROUTING" >>= fun () ->
+ do_admin_get __routing_key
| Common.STATISTICS ->
Lwtc.log "STATISTICS" >>= fun () ->
output_ok_statistics stats
@@ -515,6 +520,60 @@ module ProtocolHandler (S:Core.STORE) = struct
let r = mo <> None in
output_ok_bool r
end
+ | Common.SET_NURSERY_CFG ->
+ begin
+ Lwtc.log "SET_NURSERY_CFG" >>= fun () ->
+ let cluster_id = Pack.input_string rest in
+ let cfg = ClientCfg.cfg_from rest in
+ let key = __nursery_cluster_prefix ^ cluster_id in
+ let out = Pack.make_output 16 in
+ ClientCfg.cfg_to out cfg;
+ let value = Pack.close_output out in
+ do_admin_set key value
+ end
+ | Common.GET_NURSERY_CFG ->
+ begin
+ Lwtc.log "GET_NURSERY_CFG" >>= fun () ->
+ _admin_get store __routing_key >>= fun v ->
+ let input = Pack.make_input v 0 in
+ let rsize = Pack.input_size input in
+ Lwt_log.debug "Decoding routing info" >>= fun () ->
+ let r = Routing.routing_from input in
+ let out = Pack.make_output 32 in
+ Pack.vint_to out 0;
+ Lwt_log.debug "Repacking routing info" >>= fun () ->
+ Routing.routing_to out r;
+ Lwt_log.debug "Fetching nursery clusters" >>= fun () ->
+ S.admin_prefix_keys store __nursery_cluster_prefix >>= fun clu_keys ->
+ let clusters = Hashtbl.create 2 in
+ let key_start = String.length __nursery_cluster_prefix in
+ Lwt_list.iter_s
+ (fun k ->
+ Lwt_log.debug_f "Fetch nursery cluster: %s" k >>= fun () ->
+ S.admin_get store k >>= function
+ | None -> failwith "nursery cluster disappeared??"
+ | Some v ->
+ begin
+ let tail_size = (String.length k) - key_start in
+ Lwt_log.debug_f "Sub %s %d %d" k key_start tail_size >>= fun () ->
+ let clu_id = String.sub k key_start tail_size in
+ let input = Pack.make_input v 0 in
+ let input_size = Pack.input_size input in
+ let cfg = ClientCfg.cfg_from input in
+ Hashtbl.replace clusters clu_id cfg;
+ Lwt.return ()
+ end
+ )
+ clu_keys
+ >>= fun () ->
+ let pack_one out k e =
+ Pack.string_to out k;
+ ClientCfg.cfg_to out e
+ in
+ Pack.hashtbl_to out pack_one clusters;
+ let s = Pack.close_output out in
+ Lwt_io.write oc s >>= fun () -> Lwt.return false
+ end
| Common.USER_FUNCTION ->
begin
let name = Pack.input_string rest in
diff --git a/src/hope/core.ml b/src/hope/core.ml
index 3c817766..cb233bb4 100644
--- a/src/hope/core.ml
+++ b/src/hope/core.ml
@@ -129,6 +129,7 @@ module type STORE = sig
val log : t -> bool -> update -> tx_result Lwt.t
val get : t -> k -> v option Lwt.t
val admin_get: t -> k -> v option Lwt.t
+
val range: t -> string option -> bool -> string option -> bool -> int option
-> string list Lwt.t
val range_entries: t -> string option -> bool -> string option -> bool -> int option
@@ -136,6 +137,7 @@ module type STORE = sig
val rev_range_entries: t -> string option -> bool -> string option -> bool -> int option
-> (string*string) list Lwt.t
+ val admin_prefix_keys: t -> string -> k list Lwt.t
val prefix_keys: t -> string -> int option -> string list Lwt.t
val is_read_only: t -> bool
diff --git a/src/hope/mem_store.ml b/src/hope/mem_store.ml
index bd7ba26c..523852ea 100644
--- a/src/hope/mem_store.ml
+++ b/src/hope/mem_store.ml
@@ -27,7 +27,7 @@ module MemStore = (struct
let get t k = let v = Hashtbl.find t.store k in Lwt.return (Some v)
let admin_get = get
-
+
let set_meta t s =
t.meta <- Some s;
Lwt.return ()
@@ -41,6 +41,7 @@ module MemStore = (struct
let range_entries t first finc last linc max = Lwtc.failfmt "todo"
let rev_range_entries t first finc last linc max = Lwtc.failfmt "todo"
let prefix_keys t prefix max = Lwtc.failfmt "todo"
+ let admin_prefix_keys t prefix = prefix_keys t prefix None
let last_update t = Lwtc.failfmt "todo"
diff --git a/src/nursery/nursery.ml b/src/nursery/nursery.ml
index 501d20d8..9a8f9d20 100644
--- a/src/nursery/nursery.ml
+++ b/src/nursery/nursery.ml
@@ -394,9 +394,9 @@ module NC = struct
end
end
-
+(*
let nursery_test_main () =
- All_test.configure_logging ();
+ (* All_test.configure_logging (); *)
let repr = [("left", "ZZ")], "right" in (* all in left *)
let routing = Routing.build repr in
let left_cfg = ClientCfg.make () in
@@ -440,3 +440,4 @@ let nursery_test_main () =
in
Lwt_main.run (t ())
+*)
\ No newline at end of file
diff --git a/src/nursery/nursery_main.ml b/src/nursery/nursery_main.ml
index 58f919c5..d5621b1e 100644
--- a/src/nursery/nursery_main.ml
+++ b/src/nursery/nursery_main.ml
@@ -20,6 +20,7 @@ GNU Affero General Public License along with this program (file "COPYING").
If not, see .
*)
+
open Lwt
open Node_cfg
open Nursery
@@ -132,18 +133,17 @@ let __delete_from_nursery config cluster_id sep =
NC.delete nc cluster_id m_sep
let __main_run log_file f =
- Lwt_main.run(
- Lwt.catch
- ( fun () ->
- setup_logger log_file >>= fun () ->
- f () >>= fun () ->
- File_system.unlink log_file
- )
- ( fun e ->
- let msg = Printexc.to_string e in
- Lwt_log.fatal msg >>= fun () ->
- Lwt.fail e)
- ) ; 0
+ Lwt.catch
+ ( fun () ->
+ setup_logger log_file >>= fun () ->
+ f () >>= fun () ->
+ File_system.unlink log_file
+ )
+ ( fun e ->
+ let msg = Printexc.to_string e in
+ Lwt_log.fatal msg >>= fun () ->
+ Lwt.fail e)
+
let migrate_nursery_range config left sep right =
__main_run "/tmp/nursery_migrate.log" ( fun() -> __migrate_nursery_range config left sep right )