Skip to content

Commit

Permalink
Merge branch '2.0' of github.com:Incubaid/arakoon into 2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Romain Slootmaekers committed May 30, 2012
2 parents b9b75f7 + 963cbc6 commit 61c93b3
Show file tree
Hide file tree
Showing 12 changed files with 128 additions and 120 deletions.
10 changes: 6 additions & 4 deletions pylabs/test/server/system_tests_common.py
Expand Up @@ -21,7 +21,7 @@
"""

import sys

import logging


from nose.tools import *
Expand Down Expand Up @@ -623,7 +623,7 @@ def basic_teardown( removeDirs ):
X.logging.info( "Teardown complete" )

def nursery_teardown( removeDirs ):
common_teardown(removeDirs, nursery_cluster_ids)
common_teardown(removeDirs, CONFIG.nursery_cluster_ids)

def get_client ( c_id = None):
if c_id is None:
Expand All @@ -634,11 +634,13 @@ def get_client ( c_id = None):
return c

def get_nursery_client():
client = ArakoonClient.getClient(nursery_keeper_id)

client = get_client (CONFIG.nursery_keeper_id)
return client

def get_nursery():
return q.manage.nursery.getNursery(nursery_keeper_id)
ext = NurseryManagement.NurseryManagement()
return ext.getNursery(CONFIG.nursery_keeper_id)

def iterate_n_times (n, f, startSuffix = 0, failure_max=0, valid_exceptions=None ):
client = get_client ()
Expand Down
8 changes: 1 addition & 7 deletions src/client/common.ml
Expand Up @@ -391,14 +391,8 @@ let synced_sequence conn changes = _sequence conn changes SYNCED_SEQUENCE


let get_nursery_cfg (ic,oc) =
let decode ic =
Llio.input_string ic >>= fun s ->
let input = Pack.make_input s 0 in
let cfg = NCFG.ncfg_from input in
Lwt.return cfg
in
request oc get_nursery_cfg_to >>= fun () ->
response_old ic decode
response_limited ic NCFG.ncfg_from

let set_nursery_cfg (ic,oc) clusterid cfg =
let outgoing buf =
Expand Down
20 changes: 10 additions & 10 deletions src/client/python/ArakoonProtocol.py
Expand Up @@ -774,22 +774,22 @@ def decodeBoolResult( con ):

@staticmethod
def decodeNurseryCfgResult( con ):
ArakoonProtocol._evaluateErrorCode(con)
input = ArakoonProtocol.readAnswer(con)


offset = 0
encoded = _recvString( con )
routing, offset = RoutingInfo.unpack(encoded, offset, _unpackBool, _unpackString)
encoded = input.input_string()
routing, offset = RoutingInfo.unpack(encoded, 0, _unpackBool, _unpackString)

cfgCount, offset = _unpackInt(encoded, offset)
cfgCount = input.input_vint()
resultCfgs = dict()
for i in range(cfgCount) :
clusterId, offset = _unpackString(encoded, offset)
clusterSize, offset = _unpackInt(encoded, offset)
clusterId = input.input_string()
clusterSize = input.input_vint()
cfg = dict()
for j in range(clusterSize):
nodeId, offset = _unpackString(encoded, offset)
ip, offset = _unpackString(encoded, offset)
port, offset = _unpackInt(encoded, offset)
nodeId = input.input_string ()
ip = input.input_string ()
port = input.input_vint()
cfg[nodeId] = (ip,port)
cliCfg = ArakoonClientConfig(clusterId, cfg)
resultCfgs[clusterId] = cliCfg
Expand Down
74 changes: 6 additions & 68 deletions src/client/remote_nodestream.ml
Expand Up @@ -20,7 +20,6 @@ GNU Affero General Public License along with this program (file "COPYING").
If not, see <http://www.gnu.org/licenses/>.
*)

open Update
open Interval
open Routing
open Common
Expand All @@ -29,10 +28,6 @@ open Client_cfg
open Ncfg

class type nodestream = object
method iterate:
Sn.t -> (Sn.t * Update.t -> unit Lwt.t) ->
Tlogcollection.tlog_collection ->
head_saved_cb:(string -> unit Lwt.t) -> unit Lwt.t

method collapse: int -> unit Lwt.t

Expand All @@ -52,67 +47,9 @@ class type nodestream = object
end

class remote_nodestream ((ic,oc) as conn) = object(self :# nodestream)
method iterate (i:Sn.t) (f: Sn.t * Update.t -> unit Lwt.t)
(tlog_coll: Tlogcollection.tlog_collection)
~head_saved_cb
=
let outgoing buf =
command_to buf LAST_ENTRIES;
Sn.sn_to buf i
in
let incoming ic =
let save_head () = tlog_coll # save_head ic in
let last_seen = ref None in
let rec loop_entries () =
Sn.input_sn ic >>= fun i2 ->
begin
if i2 = (-1L)
then
begin
Lwt_log.info_f "remote_nodestream :: last_seen = %s"
(Log_extra.option_to_string Sn.string_of !last_seen)
end
else
begin
last_seen := Some i2;
Llio.input_int32 ic >>= fun chksum ->
Llio.input_string ic >>= fun entry ->
let update,_ = Update.from_buffer entry 0 in
f (i2, update) >>= fun () ->
loop_entries ()
end
end
in
let rec loop_parts () =
Llio.input_int ic >>= function
| 1 -> Lwt_log.debug "loop_entries" >>= fun () -> loop_entries ()
| 2 ->
begin
Lwt_log.debug "save_head" >>= fun ()->
save_head () >>= fun () ->
let hf_name = tlog_coll # get_head_name () in
head_saved_cb hf_name >>= fun () ->
loop_parts ()
end
| 3 ->
begin
Lwt_log.debug "save_file" >>= fun () ->
Llio.input_string ic >>= fun name ->
Llio.input_int64 ic >>= fun length ->
Lwt_log.debug_f "got %s (%Li bytes)" name length >>= fun () ->
tlog_coll # save_tlog_file name length ic >>= fun () ->
loop_parts ()
end
| x -> Llio.lwt_failfmt "don't know what %i means" x
in
loop_parts()
in
request oc outgoing >>= fun () ->
response ic incoming


method collapse n =
let outgoing buf =
method collapse n = failwith "todo"
(*
let outgoing buf =
command_to buf COLLAPSE_TLOGS;
Llio.int_to buf n
in
Expand All @@ -137,7 +74,8 @@ class remote_nodestream ((ic,oc) as conn) = object(self :# nodestream)
in
request oc outgoing >>= fun () ->
response ic incoming

*)


method set_interval iv = Common.set_interval conn iv
method get_interval () = Common.get_interval conn
Expand All @@ -158,7 +96,7 @@ class remote_nodestream ((ic,oc) as conn) = object(self :# nodestream)
Lwt_io.with_file ~mode:Lwt_io.output db_location (fun oc -> Llio.copy_stream ~length ~ic ~oc)
in
request oc outgoing >>= fun () ->
response ic incoming
response_old ic incoming

method get_fringe (boundary:string option) direction= Common.get_fringe conn boundary direction

Expand Down
6 changes: 1 addition & 5 deletions src/client/remote_nodestream.mli
@@ -1,14 +1,10 @@
open Update

open Routing
open Interval
open Client_cfg
open Ncfg

class type nodestream = object
method iterate:
Sn.t -> (Sn.t * Update.t -> unit Lwt.t) ->
Tlogcollection.tlog_collection ->
head_saved_cb:(string -> unit Lwt.t) -> unit Lwt.t

method collapse: int -> unit Lwt.t

Expand Down
15 changes: 12 additions & 3 deletions src/hope/barakoon.ml
Expand Up @@ -223,6 +223,8 @@ type action_type =
| ListTest
| Test
| OnlyTest
| InitNursery
| MigrateNursery


let split_cfgs cfg myname =
Expand Down Expand Up @@ -431,6 +433,9 @@ let main () =
and value_size = ref 1024
and is = ref ""
and ip = ref ""
and left = ref ""
and sep = ref ""
and right = ref ""
and port = ref 0
and cluster_id = ref ""
and test_refs = ref ([]:string list)
Expand Down Expand Up @@ -474,7 +479,11 @@ let main () =
("--list-test", set_action ListTest, "lists tests");
("--only-test", Arg.Tuple[set_action OnlyTest; Arg.String (fun str -> test_refs := str :: ! test_refs)],
"runs some tests");

("--nursery-init", Arg.Tuple[set_action InitNursery; Arg.Set_string cluster_id],
"<cluster_id>: Initialize the keeper so the nursery contains the single provided cluster");
("--nursery-migrate",
Arg.Tuple[set_action MigrateNursery; Arg.Set_string left; Arg.Set_string sep; Arg.Set_string right],
"<left> <sep> <right>: Change the nursery cluster distribution.");
] in

Arg.parse actions
Expand All @@ -495,8 +504,8 @@ let main () =
| OnlyTest -> only_test !test_refs
| ShowUsage -> Arg.usage actions ""
| ShowVersion -> show_version()


| InitNursery -> Lwt_main.run (Nursery_main.init_nursery !config_file !cluster_id)
| MigrateNursery -> Lwt_main.run (Nursery_main.migrate_nursery_range !config_file !left !sep !right)

end

Expand Down
12 changes: 9 additions & 3 deletions src/hope/bstore.ml
Expand Up @@ -173,11 +173,17 @@ module BStore = (struct
Lwt.return (List.map unpref_key ks)


let prefix_keys t prefix max =
let prefix' = pref_key prefix in
let __prefix_keys t prefix max p =
let prefix' = pref_key ~_pf:p prefix in
BS.prefix_keys_latest t.store prefix' max >>= fun keys ->
Lwt.return (List.map unpref_key keys)
Lwt.return (List.map (unpref_key ~_pf:p) keys)

let prefix_keys t prefix max =
__prefix_keys t prefix max __prefix

let admin_prefix_keys t prefix =
__prefix_keys t prefix None __admin_prefix

let last_entries t (t0:Core.tick) (oc:Llio.lwtoc) =
let TICK i0 = t0 in
let f acc i actions =
Expand Down
69 changes: 64 additions & 5 deletions src/hope/c.ml
Expand Up @@ -3,17 +3,17 @@ open Modules
open Statistics
open Routing
open Interval
open Client_cfg

let _MAGIC = 0xb1ff0000l
let _MASK = 0x0000ffffl
let _VERSION = 2


let __routing_key = "routing"
let __interval_key = "interval"

let __nursery_cluster_prefix = "nursery_cluster."

let my_read_command (ic,oc) =

let s = 8 in
let h = String.create s in
Lwt_io.read_into_exactly ic h 0 s >>= fun () ->
Expand Down Expand Up @@ -479,15 +479,20 @@ module ProtocolHandler (S:Core.STORE) = struct
let o = Pack.make_output 16 in
Routing.routing_to o r;
let v = Pack.close_output o in
Lwt_log.debug_f "Setting routing key to %S" v >>= fun () ->
do_admin_set __routing_key v
| Common.SET_INTERVAL ->
let i = Interval.interval_from rest in
let o = Pack.make_output 16 in
Interval.interval_to o i;
let v = Pack.close_output o in
do_admin_set __interval_key v
| Common.GET_INTERVAL -> do_admin_get __interval_key
| Common.GET_ROUTING -> do_admin_get __routing_key
| Common.GET_INTERVAL ->
Lwt_log.debug "GET_INTERVAL" >>= fun () ->
do_admin_get __interval_key
| Common.GET_ROUTING ->
Lwt_log.debug "GET_ROUTING" >>= fun () ->
do_admin_get __routing_key
| Common.STATISTICS ->
Lwtc.log "STATISTICS" >>= fun () ->
output_ok_statistics stats
Expand Down Expand Up @@ -515,6 +520,60 @@ module ProtocolHandler (S:Core.STORE) = struct
let r = mo <> None in
output_ok_bool r
end
| Common.SET_NURSERY_CFG ->
begin
Lwtc.log "SET_NURSERY_CFG" >>= fun () ->
let cluster_id = Pack.input_string rest in
let cfg = ClientCfg.cfg_from rest in
let key = __nursery_cluster_prefix ^ cluster_id in
let out = Pack.make_output 16 in
ClientCfg.cfg_to out cfg;
let value = Pack.close_output out in
do_admin_set key value
end
| Common.GET_NURSERY_CFG ->
begin
Lwtc.log "GET_NURSERY_CFG" >>= fun () ->
_admin_get store __routing_key >>= fun v ->
let input = Pack.make_input v 0 in
let rsize = Pack.input_size input in
Lwt_log.debug "Decoding routing info" >>= fun () ->
let r = Routing.routing_from input in
let out = Pack.make_output 32 in
Pack.vint_to out 0;
Lwt_log.debug "Repacking routing info" >>= fun () ->
Routing.routing_to out r;
Lwt_log.debug "Fetching nursery clusters" >>= fun () ->
S.admin_prefix_keys store __nursery_cluster_prefix >>= fun clu_keys ->
let clusters = Hashtbl.create 2 in
let key_start = String.length __nursery_cluster_prefix in
Lwt_list.iter_s
(fun k ->
Lwt_log.debug_f "Fetch nursery cluster: %s" k >>= fun () ->
S.admin_get store k >>= function
| None -> failwith "nursery cluster disappeared??"
| Some v ->
begin
let tail_size = (String.length k) - key_start in
Lwt_log.debug_f "Sub %s %d %d" k key_start tail_size >>= fun () ->
let clu_id = String.sub k key_start tail_size in
let input = Pack.make_input v 0 in
let input_size = Pack.input_size input in
let cfg = ClientCfg.cfg_from input in
Hashtbl.replace clusters clu_id cfg;
Lwt.return ()
end
)
clu_keys
>>= fun () ->
let pack_one out k e =
Pack.string_to out k;
ClientCfg.cfg_to out e
in
Pack.hashtbl_to out pack_one clusters;
let s = Pack.close_output out in
Lwt_io.write oc s >>= fun () -> Lwt.return false
end
| Common.USER_FUNCTION ->
begin
let name = Pack.input_string rest in
Expand Down
2 changes: 2 additions & 0 deletions src/hope/core.ml
Expand Up @@ -129,13 +129,15 @@ module type STORE = sig
val log : t -> bool -> update -> tx_result Lwt.t
val get : t -> k -> v option Lwt.t
val admin_get: t -> k -> v option Lwt.t

val range: t -> string option -> bool -> string option -> bool -> int option
-> string list Lwt.t
val range_entries: t -> string option -> bool -> string option -> bool -> int option
-> (string*string) list Lwt.t
val rev_range_entries: t -> string option -> bool -> string option -> bool -> int option
-> (string*string) list Lwt.t

val admin_prefix_keys: t -> string -> k list Lwt.t
val prefix_keys: t -> string -> int option -> string list Lwt.t
val is_read_only: t -> bool

Expand Down

0 comments on commit 61c93b3

Please sign in to comment.