Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

nursery: --get-interval and v2 protocol for get_fringe (wip)

  • Loading branch information...
commit 250ca7b4948c45ccf0898bb38d3cd22a67e40533 1 parent dd1ba6a
Romain Slootmaekers authored
11 cfg/keeper.ini
View
@@ -0,0 +1,11 @@
+[global]
+cluster = keeper_0
+cluster_id = keeper
+
+[keeper_0]
+ip = 127.0.0.1
+client_port = 8000
+messaging_port = 8010
+home = /tmp/keeper_0
+log_dir = /tmp/keeper_0
+log_level = debug
10 cfg/left.ini
View
@@ -2,6 +2,7 @@
cluster = left_0
cluster_id = left
+
[left_0]
ip = 127.0.0.1
client_port = 4000
@@ -9,3 +10,12 @@ messaging_port = 4010
home = /tmp/left_0
log_dir = /tmp/left_0
log_level = debug
+
+
+[nursery]
+cluster = keeper_0
+cluster_id = keeper
+
+[keeper_0]
+ip = 127.0.0.1
+client_port = 8000
8 cfg/right.ini
View
@@ -9,3 +9,11 @@ messaging_port = 5010
home = /tmp/right_0
log_dir = /tmp/right_0
log_level = debug
+
+[nursery]
+cluster = keeper_0
+cluster_id = keeper
+
+[keeper_0]
+ip = 127.0.0.1
+client_port = 8000
10 src/hope/barakoon.ml
View
@@ -230,6 +230,7 @@ type action_type =
| MigrateNursery
| WhoMaster
| GetRouting
+ | GetInterval
let split_cfgs cfg myname =
@@ -387,6 +388,11 @@ let get cfg_name k =
Lwt_io.printlf "%S" v
)
+let get_interval cfg_name =
+ Nursery_main.get_interval cfg_name
+
+
+
let who_master cfg_name =
Client_main.who_master cfg_name () >>= fun m ->
Lwt_io.printlf "%s" m
@@ -516,6 +522,9 @@ let main () =
Arg.Tuple[set_action MigrateNursery; Arg.Set_string left; Arg.Set_string sep; Arg.Set_string right],
"<left> <sep> <right>: Change the nursery cluster distribution.");
("--nursery-routing", set_action GetRouting, "returns the routing information of the nursery");
+ ("--get-interval",
+ set_action GetInterval,
+ " : returns the interval this cluster is responsible for (Nursery context)")
] in
Arg.parse actions
@@ -543,6 +552,7 @@ let main () =
| MigrateNursery -> Lwt_main.run (Nursery_main.migrate_nursery_range !config_file !left !sep !right)
| WhoMaster -> Lwt_main.run (who_master !config_file)
| GetRouting -> Lwt_main.run (Nursery_main.get_routing !config_file)
+ | GetInterval -> Lwt_main.run (get_interval !config_file)
end
7 src/hope/bstore.ml
View
@@ -3,7 +3,7 @@ open Lwt
open Baardskeerder
open Unix
open Interval
-
+open Routing
let action2update = function
| Set (k,v) -> Core.SET(unpref_key k,v)
@@ -321,6 +321,11 @@ module BStore = (struct
_do_range_entries BS.rev_range_entries_latest t first finc last linc max
+ let get_fringe t boundary direction =
+ let (r: (string * string) list) = [] in
+ match direction with
+ | Routing.LOWER_BOUND -> Lwt.return r
+ | Routing.UPPER_BOUND -> Lwt.return r
let raw_dump t (oc:Lwtc.oc) =
3  src/hope/core.ml
View
@@ -176,6 +176,8 @@ module ITickUtils = TickUtils(ITick)
type tx_result = (v option, Arakoon_exc.rc * k) Baardskeerder.result
+open Routing (* for range_direction TODO: maybe promote? *)
+
module type STORE = sig
type t
@@ -206,6 +208,7 @@ module type STORE = sig
val dump : t -> unit Lwt.t
val raw_dump : t -> Lwtc.oc -> unit Lwt.t
+ val get_fringe : t -> string -> Routing.range_direction -> ((string * string) list) Lwt.t
end
(* output_action & input action are only in last_entries *)
1  src/hope/mem_store.ml
View
@@ -69,5 +69,6 @@ module MemStore = (struct
let raw_dump t oc = Lwtc.failfmt "todo: MemStore.raw_dump"
let get_key_count t = Lwtc.failfmt "todo: MemStore.get_key_count"
+ let get_fringe t boundary dir = Lwtc.failfmt "todo: MemStore.get_fringe"
end: STORE)
18 src/hope/v2.ml
View
@@ -634,7 +634,23 @@ let _set driver k v =
| Common.GET_FRINGE ->
begin
- Lwt.fail (Common.XException(Arakoon_exc.E_UNKNOWN_FAILURE,"not implemented"))
+ let boundary = Pack.input_string rest in
+ let direction_i = Pack.input_vint rest in
+ let direction = match direction_i with
+ | 0 -> Routing.UPPER_BOUND
+ | 1 -> Routing.LOWER_BOUND
+
+ in
+ Lwtc.log "GET_FRINGE: %s %i" boundary direction_i >>= fun () ->
+ S.get_fringe store boundary direction >>= fun kvs ->
+ let out = Pack.make_output 4096 in
+ Pack.vint_to out 0;
+ Pack.list_to out
+ (fun out ((k:string),(v:string)) ->
+ Pack.string_to out k;
+ Pack.string_to out v;
+ ) kvs;
+ _close_write oc out
end
(*| _ -> Client_protocol.handle_exception oc (Failure "Command not implemented (yet)") *)
end
2  src/node/node_cfg.ml
View
@@ -133,6 +133,8 @@ module Node_cfg = struct
let log_dir t = t.log_dir
let log_level t = t.log_level
+ let cluster_id t = t.cluster_id
+
let tlog_file_name t =
t.home ^ "/" ^ t.node_name ^ ".tlog"
8 src/nursery/client_cfg.ml
View
@@ -85,10 +85,10 @@ module ClientCfg = struct
let nodes = _get section "cluster" Ini.p_string_list in
let () = List.iter
(fun n ->
- let ip = _get n "ip" Ini.p_string in
- let port = _get n "client_port" Ini.p_int in
- let sa = ip,port in
- add cfg n sa
+ let ip = _get n "ip" Ini.p_string in
+ let port = _get n "client_port" Ini.p_int in
+ let sa = ip,port in
+ add cfg n sa
) nodes
in
cfg
5 src/nursery/nursery.ml
View
@@ -173,8 +173,10 @@ module NC = struct
let i2s i = Interval.to_string i in
Lwt_log.debug_f "Getting initial interval from %s" from_cn >>= fun () ->
get_interval from_cn >>= fun from_i ->
+ Lwt_log.debug_f "from_i: %s" (Interval.to_string from_i) >>= fun () ->
Lwt_log.debug_f "Getting initial interval from %s" to_cn >>= fun () ->
get_interval to_cn >>= fun to_i ->
+ Lwt_log.debug_f "to_i: %s" (Interval.to_string to_i) >>= fun () ->
let rec loop from_i to_i =
pull () >>= fun fringe ->
match fringe with
@@ -240,6 +242,7 @@ module NC = struct
let migrate t clu_left (sep: string) clu_right =
+ Lwt_log.debug_f "migrate: %s %S %s" clu_left sep clu_right >>= fun () ->
let r = NCFG.get_routing t.rc in
let from_cn, to_cn, direction = Routing.get_diff r clu_left sep clu_right in
let publish sep left right =
@@ -440,4 +443,4 @@ let nursery_test_main () =
in
Lwt_main.run (t ())
-*)
+*)
20 src/nursery/nursery_main.ml
View
@@ -25,6 +25,7 @@ open Lwt
open Node_cfg
open Nursery
open Routing
+open Interval
let with_admin (cluster:string) cfg f =
let host,port = cfg in
@@ -153,7 +154,24 @@ let __main_run log_file f =
let msg = Printexc.to_string e in
Lwt_log.fatal msg >>= fun () ->
Lwt.fail e)
-
+
+let get_interval cfg_name =
+ let client_cfg = Client_cfg.ClientCfg.from_file "global" cfg_name in
+ with_master_admin "xxx" client_cfg
+ (fun admin ->
+ admin # get_interval () >>= fun interval ->
+ Lwt_io.printl (Interval.to_string interval) >>= fun () ->
+ Lwt.return ())
+ >>= fun () ->
+ (*
+ let cluster_cfg = Node_cfg.read_config cfg_name in
+ let cluster_id = Node_cfg.cluster_id cluster_cfg in
+
+ *)
+ Lwt.return ()
+ (*
+
+ *)
let migrate_nursery_range config left sep right =
__main_run "/tmp/nursery_migrate.log" ( fun() -> __migrate_nursery_range config left sep right )
Please sign in to comment.
Something went wrong with that request. Please try again.