Browse files

admin sequence & reinstatement of interval checks

  • Loading branch information...
1 parent 63ac0ac commit 7b36940b0e81a2b12cded1b6033a3d3aa0bc4fa2 Romain Slootmaekers committed Mar 4, 2013
Showing with 108 additions and 67 deletions.
  1. +1 −1 src/client/arakoon_remote_client.ml
  2. +19 −7 src/client/common.ml
  3. +27 −25 src/hope/bstore.ml
  4. +39 −28 src/hope/core.ml
  5. +18 −2 src/hope/v2.ml
  6. +4 −4 src/nursery/nursery.ml
View
2 src/client/arakoon_remote_client.ml
@@ -86,7 +86,7 @@ object(self: #Arakoon_client.client)
method sequence changes = Common.sequence conn changes
- method synced_sequence changes = Common.synced_sequence conn changes
+ method synced_sequence changes = Common.synced_sequence conn changes
method who_master () = Common.who_master conn
View
26 src/client/common.ml
@@ -73,6 +73,7 @@ type client_command =
| COMPACT
| DELETE_PREFIX
| VERSION
+ | ADMIN_SEQUENCE
let code2int = [
@@ -111,6 +112,7 @@ let code2int = [
DELETE_PREFIX , 0x27l;
VERSION , 0x28l;
ASSERT_EXISTS , 0x29l;
+ ADMIN_SEQUENCE , 0x2al;
]
let int2code =
@@ -376,7 +378,7 @@ let set_routing_delta (ic,oc) left sep right =
response_limited ic input_nothing
-let _build_sequence_request output changes =
+let _build_sequence_request output changes is_admin =
let update_buf = Buffer.create (32 * List.length changes) in
let rec c2u = function
| Arakoon_client.Set (k,v) -> Core.SET(k,v)
@@ -386,7 +388,11 @@ let _build_sequence_request output changes =
| Arakoon_client.Assert_exists(k) -> Core.ASSERT_EXISTS(k)
in
let updates = List.map c2u changes in
- let seq = Core.SEQUENCE updates in
+ let seq =
+ if is_admin
+ then Core.ADMIN_SEQUENCE updates
+ else Core.SEQUENCE updates
+ in
let () = Core.update_to update_buf seq in
let () = Pack.string_to output (Buffer.contents update_buf)
in ()
@@ -396,7 +402,7 @@ let migrate_range (ic,oc) interval changes =
let outgoing out =
command_to out MIGRATE_RANGE;
Interval.interval_to out interval;
- _build_sequence_request out changes
+ _build_sequence_request out changes true
in
request oc (fun buf -> outgoing buf)
>>= fun () ->
@@ -405,17 +411,23 @@ let migrate_range (ic,oc) interval changes =
Lwtc.log "end of migrate_range request"
-let _sequence (ic,oc) changes cmd =
+
+let _sequence (ic,oc) changes cmd is_admin =
let outgoing buf =
command_to buf cmd;
- _build_sequence_request buf changes
+ _build_sequence_request buf changes is_admin
in
request oc (fun buf -> outgoing buf) >>= fun () ->
response_limited ic input_nothing
-let sequence conn changes = _sequence conn changes SEQUENCE
+let sequence conn changes=
+ let cmd = SEQUENCE in
+ _sequence conn changes cmd false
+
+let admin_delete conn changes =
+ _sequence conn changes ADMIN_SEQUENCE true
-let synced_sequence conn changes = _sequence conn changes SYNCED_SEQUENCE
+let synced_sequence conn changes = _sequence conn changes SYNCED_SEQUENCE false
let get_nursery_cfg (ic,oc) =
View
52 src/hope/bstore.ml
@@ -64,11 +64,11 @@ module BStore = (struct
let _outside k = Lwt.return (NOK (Arakoon_exc.E_OUTSIDE_INTERVAL, k))
let _not_found k = Lwt.return (NOK (Arakoon_exc.E_NOT_FOUND , k))
- let _check_interval ?(nok=_outside)
+ let _check_interval is_admin ?(nok=_outside)
t k (ok: k -> 'a) =
let iv,_ = t.interval in
let f =
- if Interval.is_ok iv k
+ if is_admin || Interval.is_ok iv k
then ok
else nok
in
@@ -82,29 +82,28 @@ module BStore = (struct
let log t d u =
let _exec tx :tx_result Lwt.t=
begin
- let rec _inner (tx: BS.tx) = function
+ let rec _inner (is_admin:bool) (tx: BS.tx) = function
| Core.SET (k,v) ->
begin
- (* Lwtc.log "set: %s" k >>= fun () ->
- _check_interval t k
- (fun k ->
- *)
- BS.set tx (pref_key k) v >>= fun () -> Lwt.return (OK None)
- (* ) *)
+ Lwtc.log "set: %s" k >>= fun () ->
+ _check_interval is_admin t k
+ (fun k ->
+ BS.set tx (pref_key k) v >>= fun () -> Lwt.return (OK None)
+ )
end
| Core.DELETE k ->
begin
Lwtc.log "del: %s" k >>= fun () ->
- (*_check_interval t k (* TODO: re-enable this check, but do admin_sequence *)
- (fun k ->*)
+ _check_interval is_admin t k
+ (fun k ->
BS.delete tx (pref_key k) >>= function
| NOK _ -> _not_found k
| a -> Lwt.return (OK None)
- (* ) *)
+ )
end
| Core.TEST_AND_SET (k,eo,wo) ->
begin
- _check_interval t k
+ _check_interval is_admin t k
(fun k ->
let pk = pref_key k in
let delete () =
@@ -220,17 +219,10 @@ module BStore = (struct
in
Lwt.return r'
end
- | Core.SEQUENCE s ->
- begin
- Lwt_list.fold_left_s
- (fun a u ->
- match a with
- | OK _ -> _inner tx u (* throws away the intermediate result *)
- | a -> Lwt.return a
- )
- (OK None)
- s
- end
+ | Core.SEQUENCE s -> _sequence is_admin tx s
+ | Core.ADMIN_SEQUENCE s ->
+ Lwtc.log "ADMIN_SEQUENCE" >>= fun () ->
+ _sequence true tx s
| Core.USER_FUNCTION (name, po) ->
begin
let f = Userdb.Registry.lookup name in
@@ -242,7 +234,17 @@ module BStore = (struct
in
Lwt.return a
end
- in _inner tx u
+ and _sequence is_admin tx s =
+ Lwt_list.fold_left_s
+ (fun a u ->
+ match a with
+ | OK _ -> _inner is_admin tx u (* throws away the intermediate result *)
+ | a -> Lwt.return a
+ )
+ (OK None)
+ s
+ in _inner false tx u
+
end
in
Lwt_mutex.with_lock t.m (fun () -> BS.log_update t.store ~diff:d _exec)
View
67 src/hope/core.ml
@@ -25,6 +25,7 @@ type update =
| SEQUENCE of update list
| DELETE_PREFIX of k
| SET_ROUTING_DELTA of (string * string * string)
+ | ADMIN_SEQUENCE of update list
let update2s = function
@@ -38,7 +39,7 @@ let update2s = function
| SEQUENCE s -> Printf.sprintf "U_SEQ (...)"
| DELETE_PREFIX k -> Printf.sprintf "U_DELETE_PREFIX %S" k
| SET_ROUTING_DELTA (l,s,r) -> Printf.sprintf "U_SET_ROUTING_DELTA (%S,%S,%S)" l s r
-
+ | ADMIN_SEQUENCE s -> Printf.sprintf "U_ADMIN_SEQUENCE (...)"
let rec update_to buf = function
| SET (k,v) ->
@@ -73,20 +74,36 @@ let rec update_to buf = function
| DELETE_PREFIX k ->
Llio.int_to buf 14;
Llio.string_to buf k
- | ASSERT_EXISTS (k) ->
+ | ASSERT_EXISTS k ->
Llio.int_to buf 15;
Llio.string_to buf k
+ | ADMIN_SEQUENCE s ->
+ Llio.int_to buf 16;
+ Llio.int_to buf (List.length s);
+ List.iter (fun u -> update_to buf u) s
+
let rec update_from buf off =
let kind, off = Llio.int_from buf off in
+ let _sequence buf off =
+ let slen, off = Llio.int_from buf off in
+ let rec loop acc off = function
+ | 0 -> (List.rev acc), off
+ | i ->
+ let u, off = update_from buf off in
+ loop (u :: acc) off (i-1)
+ in
+ loop [] off slen
+ in
+
match kind with
| 1 ->
- let k, off = Llio.string_from buf off in
- let v, off = Llio.string_from buf off in
- SET(k,v), off
+ let k, off = Llio.string_from buf off in
+ let v, off = Llio.string_from buf off in
+ SET(k,v), off
| 2 ->
- let k, off = Llio.string_from buf off in
- DELETE k, off
+ let k, off = Llio.string_from buf off in
+ DELETE k, off
| 3 ->
begin
let k,o2 = Llio.string_from buf off in
@@ -96,36 +113,30 @@ let rec update_from buf off =
u, o4
end
| 5 ->
- let slen, off = Llio.int_from buf off in
- begin
- let rec loop acc off = function
- | 0 -> acc, off
- | i ->
- let u, off = update_from buf off in
- loop (u :: acc) off (i-1)
- in
- let ups, off = loop [] off slen in
- SEQUENCE (List.rev ups), off
- end
+ let ups, off = _sequence buf off in
+ SEQUENCE ups, off
| 8 ->
- let k, off = Llio.string_from buf off in
- let m_v, off = Llio.string_option_from buf off in
- ASSERT (k, m_v), off
+ let k, off = Llio.string_from buf off in
+ let m_v, off = Llio.string_option_from buf off in
+ ASSERT (k, m_v), off
| 9 ->
- let k, off = Llio.string_from buf off in
- let m_v, off = Llio.string_option_from buf off in
- ADMIN_SET (k, m_v), off
+ let k, off = Llio.string_from buf off in
+ let m_v, off = Llio.string_option_from buf off in
+ ADMIN_SET (k, m_v), off
| 12 ->
let l, off = Llio.string_from buf off in
let s, off = Llio.string_from buf off in
let r, off = Llio.string_from buf off in
SET_ROUTING_DELTA(l,s,r), off
| 14 ->
- let k, off = Llio.string_from buf off in
- DELETE_PREFIX k, off
+ let k, off = Llio.string_from buf off in
+ DELETE_PREFIX k, off
| 15 ->
- let k, off = Llio.string_from buf off in
- ASSERT_EXISTS (k), off
+ let k, off = Llio.string_from buf off in
+ ASSERT_EXISTS k, off
+ | 16 ->
+ let ups, off = _sequence buf off in
+ ADMIN_SEQUENCE ups, off
| i -> let msg = Printf.sprintf "Unknown update type %d" i in failwith msg
View
20 src/hope/v2.ml
@@ -437,6 +437,22 @@ module V2(S:Core.STORE) (A:MP_ACTION_DISPATCHER) = struct
_unit_or_f oc a
in _do_write_op rest oc me store inner
end
+ | Common.ADMIN_SEQUENCE ->
+ begin
+ Lwtc.log "ADMIN_SEQUENCE" >>= fun () ->
+ let inner () =
+ let t0 = Unix.gettimeofday() in
+ let data = Pack.input_string rest in
+ let probably_sequence,_ = Core.update_from data 0 in
+ let sequence = match probably_sequence with
+ | Core.ADMIN_SEQUENCE _ -> probably_sequence
+ | _ -> raise (Failure "should be admin sequence")
+ in
+ D.push_cli_req driver sequence >>= fun a ->
+ Statistics.new_sequence stats t0;
+ _unit_or_f oc a
+ in _do_write_op rest oc me store inner
+ end
| Common.MULTI_GET ->
begin
let allow_dirty = Pack.input_bool rest in
@@ -680,7 +696,7 @@ module V2(S:Core.STORE) (A:MP_ACTION_DISPATCHER) = struct
let data = Pack.input_string rest in
let probably_sequence,_ = Core.update_from data 0 in
let sequence = match probably_sequence with
- | Core.SEQUENCE us ->
+ | Core.ADMIN_SEQUENCE us ->
let extra =
let v =
let out = Pack.make_output 32 in
@@ -690,7 +706,7 @@ module V2(S:Core.STORE) (A:MP_ACTION_DISPATCHER) = struct
Core.ADMIN_SET (Core.__interval_key, Some v)
in
let xus = extra :: us in
- Core.SEQUENCE xus
+ Core.ADMIN_SEQUENCE xus
| _ -> raise (Failure "should be sequence")
in
let inner () =
View
8 src/nursery/nursery.ml
@@ -160,19 +160,19 @@ module NC = struct
(fun conn -> Common.get_fringe conn sep direction )
in
let push fringe i =
- let seq = List.map (fun (k,v) -> Arakoon_client.Set(k,v)) fringe in
+ let ups = List.map (fun (k,v) -> Arakoon_client.Set(k,v)) fringe in
Lwt_log.debug "push" >>= fun () ->
_with_master_connection t to_cn
- (fun conn -> Common.migrate_range conn i seq)
+ (fun conn -> Common.migrate_range conn i ups)
>>= fun () ->
Lwt_log.debug "done pushing"
in
let delete fringe =
- let seq = List.map (fun (k,_) -> Arakoon_client.Delete k) fringe in
+ let ups = List.map (fun (k,_) -> Arakoon_client.Delete k) fringe in
Lwt_log.debug "delete" >>= fun () ->
_with_master_connection t from_cn
- (fun conn -> Common.sequence conn seq)
+ (fun conn -> Common.admin_delete conn ups)
in
let get_next_key k =
k ^ (String.make 1 (Char.chr 1))

0 comments on commit 7b36940

Please sign in to comment.