Skip to content

Commit

Permalink
multiple ip's for nursery clients too
Browse files Browse the repository at this point in the history
  • Loading branch information
toolslive committed Apr 26, 2012
1 parent 20e5aa2 commit 348d536
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 43 deletions.
4 changes: 2 additions & 2 deletions cfg/arakoon.ini
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ cluster_id = ricky

[arakoon_0]
#comma separated list of all ip addresses used to identify that node
ip = 192.168.248.133, 127.0.0.1
ip = 192.168.0.162, 127.0.0.1
client_port = 4000
messaging_port = 4010
home = /tmp/arakoon/arakoon_0
#optional different directory for .tlog, .tlc and friends
#tlog_dir = /tmp/arakoon/arakoon_0
#log_dir = /tmp/arakoon/arakoon_0
# available levels are: debug info notice warning error fatal
log_level = info
log_level = debug
# for debugging
#laggy = true
# report every x seconds (default = 300)
Expand Down
10 changes: 6 additions & 4 deletions src/client/python/ArakoonExceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ def __init__ (self, node):
ArakoonException.__init__( self, self._msg )

class ArakoonNotConnected( ArakoonException ):
_msgF = "No connection available to node at '%s:%s'"
_msgF = "No connection available to node at %s on port %s"

def __init__ (self, location):
self._msg = ArakoonNotConnected._msgF % ( location[0], location[1] )
def __init__ (self, t):
ips = t[0]
port = t[1]
self._msg = ArakoonNotConnected._msgF % ( ips, port )
ArakoonException.__init__( self, self._msg )

class ArakoonNoMaster( ArakoonException ):
Expand Down Expand Up @@ -111,4 +113,4 @@ class NurseryRangeError( NurseryException ):
pass

class NurseryInvalidConfig(NurseryException):
pass
pass
9 changes: 6 additions & 3 deletions src/node/node_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ let _main_2
begin
let ccfg = ClientCfg.make () in
let add_one cfg =
let node_address = List.hd cfg.ips, cfg.client_port in
let node_address = cfg.ips, cfg.client_port in
ClientCfg.add ccfg cfg.node_name node_address
in
List.iter add_one in_cluster_cfgs;
Expand Down Expand Up @@ -348,10 +348,13 @@ let _main_2
| true -> Lwt.return true
| false ->
begin
let (ip,port) = ClientCfg.get cfg node_id in
let (ips, port) = ClientCfg.get cfg node_id in
let ipss = Log_extra.string_of_list (fun s -> s) ips in
Lwt_log.debug_f "upload_cfg_to_keeper (%s,%i)" ipss port >>= fun () ->
Lwt.catch(
fun () ->
let address = Network.make_address ip port in
let ip0 = List.hd ips in
let address = Network.make_address ip0 port in
let upload connection =
Remote_nodestream.make_remote_nodestream n_cluster_id connection >>= fun (client) ->
client # store_cluster_cfg cluster_id my_clicfg
Expand Down
30 changes: 16 additions & 14 deletions src/nursery/client_cfg.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,42 @@ If not, see <http://www.gnu.org/licenses/>.
open Lwt

module ClientCfg = struct
type sa = string * int
type t = (string, sa) Hashtbl.t
type node_address = string list * int
type t = (string, node_address) Hashtbl.t

let cfg_to buf (t:t) =
let entry2 buf k (ip,p) =
let entry2 buf k (ips,p) =
Llio.string_to buf k;
Llio.string_to buf ip;
Llio.string_list_to buf ips;
Llio.int_to buf p
in
Llio.hashtbl_to buf entry2 t

let cfg_from buf pos =
let entry_from buf pos =
let k,p1 = Llio.string_from buf pos in
let ip,p2 = Llio.string_from buf p1 in
let ips,p2 = Llio.string_list_from buf p1 in
let p,p3 = Llio.int_from buf p2 in
let (sa:sa) = ip,p in
(k,sa),p3
let (na:node_address) = ips,p in
(k,na),p3
in
Llio.hashtbl_from buf entry_from pos

let to_string t =
let buffer = Buffer.create 127 in
Hashtbl.iter (fun s (ip,p) ->
Buffer.add_string buffer (Printf.sprintf "(%s,(%s,%i))" s ip p)) t;
Hashtbl.iter (fun s (ips,p) ->
let ipss = Printf.sprintf "[%s]" (String.concat ";" ips) in
Buffer.add_string buffer (Printf.sprintf "(%s,(%s,%i))" s ipss p)) t;
Buffer.contents buffer

let input_cfg ic =
let key_from ic =
Llio.input_string ic
in
let value_from ic =
Llio.input_string ic >>= fun ip ->
Llio.input_string_list ic >>= fun ips ->
Llio.input_int ic >>= fun port ->
Lwt.return (ip,port)
Lwt.return (ips,port)

in
Llio.input_hashtbl key_from value_from ic
Expand All @@ -79,14 +81,14 @@ module ClientCfg = struct
let from_file section fn = (* This is the format as defined in the extension *)
let inifile = new Inifiles.inifile fn in
let cfg = make () in
let _ips node_name = Ini.get inifile node_name "ip" Ini.p_string_list Ini.required in
let _get s n p = Ini.get inifile s n p Ini.required in
let nodes = _get section "cluster" Ini.p_string_list in
let () = List.iter
(fun n ->
let ip = _get n "ip" Ini.p_string in
let ips = _ips n in
let port = _get n "client_port" Ini.p_int in
let sa = ip,port in
add cfg n sa
add cfg n (ips,port)
) nodes
in
cfg
Expand Down
2 changes: 1 addition & 1 deletion src/nursery/client_cfg_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ let test_parsing () =
let () = close_out oc in
let cfg = ClientCfg.from_file "global" fn in
let sa0 = ClientCfg.get cfg "sturdy_0" in
OUnit.assert_equal sa0 ("127.0.0.1",7080)
OUnit.assert_equal sa0 (["127.0.0.1"],7080)

let suite = "client_cfg" >:::[
"parsing" >:: test_parsing;
Expand Down
9 changes: 5 additions & 4 deletions src/nursery/nursery.ml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ let try_connect (ips, port) =
module NC = struct
type connection = Lwt_io.input_channel * Lwt_io.output_channel
type lc =
| Address of (string * int)
| Address of (string list * int)
| Connection of connection

type nn = string * string (* cluster_name,node_name *)
Expand Down Expand Up @@ -70,8 +70,9 @@ module NC = struct
let _get_connection t nn =
let (cn,node) = nn in
match Hashtbl.find t.connections nn with
| Address (ip,port) ->
| Address (ips,port) ->
begin
let ip = List.hd ips in
try_connect (ip,port) >>= function
| Some conn ->
Common.prologue cn conn >>= fun () ->
Expand Down Expand Up @@ -401,8 +402,8 @@ let nursery_test_main () =
let routing = Routing.build repr in
let left_cfg = ClientCfg.make () in
let right_cfg = ClientCfg.make () in
let () = ClientCfg.add left_cfg "left_0" ("127.0.0.1", 4000) in
let () = ClientCfg.add right_cfg "right_0" ("127.0.0.1", 5000) in
let () = ClientCfg.add left_cfg "left_0" (["127.0.0.1"], 4000) in
let () = ClientCfg.add right_cfg "right_0" (["127.0.0.1"], 5000) in
let nursery_cfg = NCFG.make routing in
let () = NCFG.add_cluster nursery_cfg "left" left_cfg in
let () = NCFG.add_cluster nursery_cfg "right" right_cfg in
Expand Down
23 changes: 14 additions & 9 deletions src/nursery/nursery_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,28 @@ open Lwt
open Node_cfg
open Nursery
open Routing
open Client_cfg

type lookup = (string, ClientCfg.node_address) Hashtbl.t

let with_remote_stream (cluster:string) cfg f =
let host,port = cfg in
let sa = Network.make_address host port in
let ips,port = cfg in
let ip0 = List.hd ips in
let sa = Network.make_address ip0 port in
let do_it connection =
Remote_nodestream.make_remote_nodestream cluster connection
>>= fun (client) ->
f client
in
Lwt_io.with_connection sa do_it

let find_master cluster_id cli_cfg =
let check_node node_name node_cfg acc =
let find_master cluster_id (cli_cfg:lookup) =
let check_node node_name (node_cfg:ClientCfg.node_address) acc =
begin
Lwt_log.info_f "node=%s" node_name >>= fun () ->
let (ip,port) = node_cfg in
let sa = Network.make_address ip port in
let (ips,port) = node_cfg in
let ip0 = List.hd ips in
let sa = Network.make_address ip0 port in
Lwt.catch
(fun () ->
Lwt_io.with_connection sa
Expand All @@ -60,11 +65,11 @@ let find_master cluster_id cli_cfg =
)
end
in
Hashtbl.fold check_node cli_cfg (Lwt.return None) >>= function
Hashtbl.fold check_node (cli_cfg:lookup) (Lwt.return None) >>= function
| None -> failwith "No master found"
| Some m -> Lwt.return m

let with_master_remote_stream cluster_id cfg f =
let with_master_remote_stream cluster_id (cfg:lookup) f =
find_master cluster_id cfg >>= fun master_name ->
let master_cfg = Hashtbl.find cfg master_name in
with_remote_stream cluster_id master_cfg f
Expand Down Expand Up @@ -153,4 +158,4 @@ let init_nursery config cluster_id =

let delete_nursery_cluster config cluster_id sep =
__main_run "/tmp/nursery_delete.log" ( fun () -> __delete_from_nursery config cluster_id sep )


5 changes: 4 additions & 1 deletion src/tools/llio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,17 @@ let list_to buf e_to list =
int_to buf (List.length list);
List.iter (e_to buf) (List.rev list)

let string_list_to buf sl = list_to buf string_to sl

let list_from s e_from pos =
let size,p0 = int_from s pos in
let rec loop acc p = function
| 0 -> acc
| 0 -> acc,p
| i -> let e,p' = e_from s p in
loop (e::acc) p' (i-1)
in loop [] p0 size

let string_list_from s pos = list_from s string_from pos

let output_string_option oc = function
| None -> output_bool oc false
Expand Down
5 changes: 4 additions & 1 deletion src/tools/llio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ val string_to: Buffer.t -> string -> unit
val option_to: (Buffer.t -> 'a -> unit) -> Buffer.t -> 'a option -> unit
val string_option_to: Buffer.t -> string option -> unit
val named_field_to: Buffer.t -> namedValue -> unit

val list_to : Buffer.t -> (Buffer.t -> 'a -> unit) -> 'a list -> unit
val string_list_to : Buffer.t -> string list -> unit
val hashtbl_to: Buffer.t -> (Buffer.t -> 'a -> 'b -> unit) ->
('a, 'b) Hashtbl.t -> unit

Expand All @@ -54,6 +55,8 @@ val float_from: string -> int -> float * int
val string_from: string -> int -> string * int
val option_from: (string -> int -> 'a * int) -> string -> int -> 'a option * int
val string_option_from: string -> int -> string option * int
val list_from: string -> (string -> int -> 'a * int) -> int -> ('a list) * int
val string_list_from: string -> int -> string list * int
val named_field_from: string -> int -> namedValue * int

val hashtbl_from: string -> (string -> int -> ('a * 'b) * int) -> int ->
Expand Down
11 changes: 7 additions & 4 deletions src/tools/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ let deny (ic,oc) =
Llio.output_string oc "too many clients"


let session_thread protocol fd =
let session_thread (sid:string) protocol fd =
Lwt.catch
(fun () ->
let ic = Lwt_io.of_fd ~mode:Lwt_io.input fd
Expand All @@ -47,7 +47,7 @@ let session_thread protocol fd =
)
(function
| FOOBAR as foobar-> Lwt.fail foobar
| exn -> info ~exn "exiting session")
| exn -> info_f ~exn "exiting session (%s)" sid)
>>= fun () ->
Lwt.catch
( fun () -> Lwt_unix.close fd )
Expand Down Expand Up @@ -85,12 +85,15 @@ let make_server_thread
Lwt_unix.accept listening_socket >>= fun (fd, _) ->
begin
match maybe_take () with
| None -> Lwt.ignore_result (session_thread deny fd)
| None -> Lwt.ignore_result (session_thread "--" deny fd)
| Some id ->
Lwt.ignore_result
(
Lwt_log.info_f "%s:session (%i)" name id >>= fun () ->
session_thread protocol fd
let sid = string_of_int id in
session_thread sid protocol fd >>= fun () ->
release();
Lwt.return()
)
end;
Lwt.return ()
Expand Down

0 comments on commit 348d536

Please sign in to comment.