Permalink
Browse files

first pass at update_request and vnode state machine

  • Loading branch information...
1 parent d37d535 commit 47f1047c86dca9b277efd89f25d613f9ffb8d3d0 @argv0 argv0 committed Apr 26, 2011
Showing with 923 additions and 0 deletions.
  1. +483 −0 src/riak_kv_update_fsm.erl
  2. +72 −0 src/riak_kv_update_funs.erl
  3. +184 −0 src/riak_kv_vnode_op.erl
  4. +120 −0 src/riak_kv_vnode_op_ctx.erl
  5. +64 −0 src/riak_kv_vnode_op_update.erl
@@ -0,0 +1,72 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_update_funs
+%%
+%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc Useful built-in update functions.
+-module(riak_kv_update_funs).
+-include_lib("riak_kv_vnode.hrl").
+-export([update_counter/3,
+ update_counter/4,
+ add_measurement/3,
+ add_measurement/4]).
+-export([update_counter_v/4,
+ add_measurement_v/4]).
+
+
+%% client helpers
+update_counter(Client, BKey, IncBy) ->
+ update_counter(Client, BKey, IncBy, [{returnbody, true}]).
+update_counter(Client, {Bucket, Key}, IncBy, Options) ->
+ Client:update({Bucket, Key},{?MODULE,update_counter_v,[IncBy]},Options).
+
+%% vnode funs
+update_counter_v({Bucket, Key}, {error, notfound}, IncBy, _Ctx) ->
+ {ok, riak_object:new(Bucket, Key, IncBy)};
+update_counter_v({_, _}, Obj, IncBy, _Ctx) ->
+ Val = riak_object:get_value(Obj),
+ {ok,riak_object:update_value(Obj, Val+IncBy)}.
+
+%% client helpers
+add_measurement(Client, BKey, {Time, Value}) ->
+ add_measurement(Client, BKey, {Time, Value}, [{returnbody, true}]).
+add_measurement(Client, {Bucket, Key}, {Time, Value}, Options) ->
+ Client:update({Bucket, Key},{?MODULE,add_measurement_v,[{Time,Value}]},
+ Options).
+
+%% vnode funs
+add_measurement_v({Bucket, Key}, {error, notfound}, {Time,Value}, _Ctx) ->
+ Series = [{Time, Value}],
+ Stats = calc_series_stats(Series),
+ {ok, riak_object:new(Bucket, Key, {Series, Stats})};
+add_measurement_v({_, _}, Obj, {Time, Value}, _Ctx) ->
+ {OldSeries, _OldStats} = riak_object:get_value(Obj),
+ NewSeries = [{Time, Value}|OldSeries],
+ NewStats = calc_series_stats(NewSeries),
+ {ok, riak_object:update_value(Obj, {NewSeries, NewStats})}.
+
+calc_series_stats(Series) ->
+ Vals = [V || {_T, V} <- Series],
+ [{min, lists:min(Vals)},
+ {max, lists:max(Vals)},
+ {sum, lists:sum(Vals)},
+ {avg, lists:sum(Vals) / length(Vals)}].
+
+
@@ -0,0 +1,184 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_vnode_op: State machine driver for vnode ops
+%%
+%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc State machine driver for vnode ops.
+-module(riak_kv_vnode_op).
+-include_lib("riak_kv_vnode.hrl").
+-export([new/7,
+ execute/1]).
+-record(rkv_vop, {mod, modstate, ctx}).
+
+new(Mod, StoreCall, Index, Sender, BKey, Object, Options) ->
+ Ctx = ?CTX:new(Mod, StoreCall, Index, Sender, BKey, Object, Options),
+ {ok, NewCtx, ModState} = Mod:initialize(Ctx),
+ {ok, #rkv_vop{mod=Mod, modstate=ModState, ctx=NewCtx}}.
+
+execute(O) -> decision(validate, O).
+
+decision(validate, O) ->
+ decision_test(op_call(validate, O), true, maybe_fetch_existing, abort);
+decision(maybe_fetch_existing, O=#rkv_vop{ctx=Ctx}) ->
+ ?CTX:reply(Ctx, w),
+ decision_test(op_call(needs_existing_object,O),true,fetch_existing,abort);
+decision(fetch_existing, O) ->
+ decision_test_fn(O, fun fetch_existing/1, propose, abort);
+decision(propose, O) ->
+ decision_test(op_call(propose, O), true, maybe_merge, abort);
+decision(maybe_merge, O) ->
+ decision_test(op_call(needs_syntactic_merge,O),true,prepare_merge,finalize);
+decision(prepare_merge, O) ->
+ case ctx_get(existing_object, O) of
+ undefined ->
+ decision_test_fn(O, fun fetch_existing/1, prepare_merge, abort);
+ {error, notfound} ->
+ decision(finalize, ctx_set(set_merged_object,
+ ctx_get(proposed_object, O), O));
+ ExistObj ->
+ NewO = do_syntactic_merge(ExistObj, ctx_get(proposed_object, O),
+ ctx_get(req_id, O),
+ ctx_get(prunetime, O), O),
+ decision_test({ctx_get(decision, NewO), NewO},
+ commit, do_merge, maybe_abort)
+ end;
+
+decision(maybe_abort, O) ->
+ decision_test({ctx_get(decision, O), O}, ignore, ignore, abort);
+decision(ignore, O) ->
+ decision_test(op_call(ignored, O), true, reply, reply);
+decision(do_merge, O) ->
+ decision_test(op_call(merge, O), true, finalize, abort);
+decision(finalize, O) ->
+ {FinalObj, NewO0} = do_finalize(O),
+ PutArgs = [ctx_get(bkey, O), term_to_binary(FinalObj)],
+ case store_call(put, PutArgs, NewO0) of
+ {ok, NewO1} ->
+ {_, NewO2} = op_call(committed, NewO1),
+ decision(reply, ctx_set(set_decision, commit,NewO2));
+ _ ->
+ decision(abort, O)
+ end;
+decision(abort, O) ->
+ decision_test(op_call(aborted, O), true, reply, reply);
+decision(reply, O) ->
+ case ctx_get(decision, O) of
+ D when D =:= ignore orelse D =:= commit ->
+ case ctx_get(returnbody, O) of
+ true ->
+ do_reply(dw, ctx_get(final_object, O), O);
+ false ->
+ do_reply(dw, O)
+ end;
+ abort ->
+ do_reply(fail, O)
+ end.
+
+%%% operation helpers
+
+set_existing({{ok, ExistingBin}, O}) ->
+ {true, ctx_set(set_existing_object, binary_to_term(ExistingBin), O)};
+set_existing({{error, notfound}, O}) ->
+ {true, ctx_set(set_existing_object, {error, notfound}, O)};
+set_existing({Other, O}) ->
+ {false, ctx_set(set_existing_object, Other, O)}.
+fetch_existing(O=#rkv_vop{ctx=Ctx}) ->
+ set_existing(store_call(get, [?CTX:bkey(Ctx)], O)).
+
+
+do_finalize(O) ->
+ Obj = ctx_get(merged_object, O),
+ PruneTime = ctx_get(prunetime, O),
+ VC = riak_object:vclock(Obj),
+ BProps = ctx_get(get_bucket_props, O),
+ AMObj = riak_kv_vnode:enforce_allow_mult(Obj, BProps),
+ case PruneTime of
+ undefined ->
+ {AMObj, ctx_set(set_final_object, AMObj, O)};
+ _ ->
+ StoreObj = riak_object:set_vclock(
+ AMObj,vclock:prune(VC, PruneTime, BProps)),
+ {StoreObj, ctx_set(set_final_object, StoreObj, O)}
+ end.
+
+do_reply(Type, #rkv_vop{ctx=Ctx}) ->
+ ?CTX:reply(Ctx, Type).
+
+do_reply(Type, Arg, #rkv_vop{ctx=Ctx}) ->
+ ?CTX:reply(Ctx, Type, Arg).
+
+do_syntactic_merge(Existing, Proposed, ReqId, PruneTime, O) ->
+ MergedObj = riak_object:syntactic_merge(Existing, Proposed,
+ term_to_binary(ReqId),
+ PruneTime),
+ case riak_object:vclock(MergedObj) =:= riak_object:vclock(Existing) of
+ true ->
+ ctx_set(set_merged_object,MergedObj,ctx_set(set_decision,ignore,O));
+ false ->
+ ctx_set(set_merged_object,MergedObj,ctx_set(set_decision,commit,O))
+ end.
+
+%%% call helpers
+
+store_call(Fun, Args, O=#rkv_vop{ctx=Ctx}) ->
+ {?CTX:store_call(Ctx, Fun, Args), O}.
+
+op_call(Fun, O=#rkv_vop{mod=Mod, modstate=ModState, ctx=Ctx}) ->
+ {Result, NewCtx, NewModState} = Mod:Fun(Ctx, ModState),
+ {Result, O#rkv_vop{modstate=NewModState, ctx=NewCtx}}.
+
+ctx_set(Fun, Arg, O=#rkv_vop{ctx=Ctx}) ->
+ O#rkv_vop{ctx=apply(?CTX, Fun, [Ctx,Arg])}.
+
+ctx_get(Fun, #rkv_vop{ctx=Ctx}) ->
+ apply(?CTX, Fun, [Ctx]).
+
+
+%%% state-flow drivers
+
+decision_test(Test,TestVal,TrueFlow,FalseFlow) ->
+ case Test of
+ {{propose, O}, NewO} ->
+ decision_flow(TrueFlow, Test, ctx_set(set_proposed_object, O, NewO));
+ {{merge, O}, NewO} ->
+ decision_flow(TrueFlow, Test, ctx_set(set_merged_object, O, NewO));
+ {{abort, NewO}} ->
+ decision(abort, NewO);
+ {TestVal,NewO} -> decision_flow(TrueFlow, Test, NewO);
+ {_Other,NewO} -> decision_flow(FalseFlow, Test, NewO)
+ end.
+
+decision_test_fn({{error, Reason}, NewO}, _TestFn, _TrueFlow, _FalseFlow) ->
+ decision(abort, ctx_set(set_decision, abort, ctx_set(set_abort_reason, Reason, NewO)));
+decision_test_fn({{abort, Reason}, NewO}, _TestFn, _TrueFlow, _FalseFlow) ->
+ decision(abort, ctx_set(set_decision, abort, ctx_set(set_abort_reason, Reason, NewO)));
+decision_test_fn({{propose, O}, NewO}, TestFn, TrueFlow, _FalseFlow) ->
+ decision_flow(TrueFlow, TestFn, ctx_set(set_proposed_object, O, NewO));
+decision_test_fn({{merge, O}, NewO}, TestFn, TrueFlow, _FalseFlow) ->
+ decision_flow(TrueFlow, TestFn, ctx_set(set_merged_object, O, NewO));
+decision_test_fn(Test,TestFn,TrueFlow,FalseFlow) ->
+ case TestFn(Test) of
+ {true, NewO} -> decision_flow(TrueFlow, Test, NewO);
+ {false, NewO} -> decision_flow(FalseFlow, Test, NewO)
+ end.
+
+decision_flow(X, _TestResult, O) -> decision(X, O).
+
+
@@ -0,0 +1,120 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_vnode_op_ctx: Request/transaction context for vnode ops
+%%
+%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc Request/transaction context for vnode ops
+-module(riak_kv_vnode_op_ctx).
+-export([new/7,
+ type/1,
+ index/1,
+ sender/1,
+ bkey/1,
+ request_object/1,
+ existing_object/1,
+ set_existing_object/2,
+ proposed_object/1,
+ set_proposed_object/2,
+ merged_object/1,
+ set_merged_object/2,
+ final_object/1,
+ set_final_object/2,
+ decision/1,
+ set_decision/2,
+ abort_reason/1,
+ set_abort_reason/2,
+ options/1,
+ get_option/2,
+ get_option/3,
+ returnbody/1,
+ last_write_wins/1,
+ req_id/1,
+ prunetime/1,
+ get_bucket_props/1,
+ get_bucket_prop/2,
+ get_bucket_prop/3,
+ store_call/3,
+ reply/2,
+ reply/3]).
+
+-record(riak_kv_vnode_op_ctx, {type,
+ idx,
+ storecall,
+ sender,
+ bkey,
+ request_object,
+ options,
+ existing_object,
+ merged_object,
+ proposed_object,
+ final_object,
+ decision = abort :: commit | ignore | abort ,
+ abort_reason}).
+
+new(Type, StoreCall, Idx, Sender, BKey, Object, Options) ->
+ #riak_kv_vnode_op_ctx{type=Type,
+ idx=Idx,
+ storecall=StoreCall,
+ sender=Sender,
+ bkey=BKey,
+ request_object=Object,
+ options=Options}.
+
+type(#riak_kv_vnode_op_ctx{type=Type}) -> Type.
+index(#riak_kv_vnode_op_ctx{idx=Idx}) -> Idx.
+sender(#riak_kv_vnode_op_ctx{sender=Sender}) -> Sender.
+bkey(#riak_kv_vnode_op_ctx{bkey=BKey}) -> BKey.
+request_object(#riak_kv_vnode_op_ctx{request_object=Object}) -> Object.
+existing_object(#riak_kv_vnode_op_ctx{existing_object=Object}) -> Object.
+set_existing_object(O,Object) -> O#riak_kv_vnode_op_ctx{existing_object=Object}.
+proposed_object(#riak_kv_vnode_op_ctx{proposed_object=Object}) -> Object.
+set_proposed_object(O,Object) -> O#riak_kv_vnode_op_ctx{proposed_object=Object}.
+merged_object(#riak_kv_vnode_op_ctx{merged_object=Object}) -> Object.
+set_merged_object(O, Object) -> O#riak_kv_vnode_op_ctx{merged_object=Object}.
+final_object(#riak_kv_vnode_op_ctx{final_object=FinalObject}) -> FinalObject.
+set_final_object(O, Object) -> O#riak_kv_vnode_op_ctx{final_object=Object}.
+decision(#riak_kv_vnode_op_ctx{decision=Decision}) -> Decision.
+set_decision(O, Decision) -> O#riak_kv_vnode_op_ctx{decision=Decision}.
+abort_reason(#riak_kv_vnode_op_ctx{abort_reason=Reason}) -> Reason.
+set_abort_reason(O, Reason) -> O#riak_kv_vnode_op_ctx{abort_reason=Reason}.
+options(#riak_kv_vnode_op_ctx{options=Options}) -> Options.
+get_option(K, O=#riak_kv_vnode_op_ctx{}) -> get_option(K, O, undefined).
+get_option(K,O=#riak_kv_vnode_op_ctx{},D) -> proplists:get_value(K,options(O),D).
+returnbody(O) -> get_option(returnbody, O, false).
+last_write_wins(O) -> get_option(lww, O, false).
+req_id(O) -> get_option(reqid, O, 0).
+prunetime(O) -> get_option(prunetime, O, undefined).
+get_bucket_props(O) -> get_option(bprops, O, []).
+get_bucket_prop(K, O) -> get_bucket_prop(K, O, undefined).
+get_bucket_prop(K, O, D) -> proplists:get_value(K, get_bucket_props(O), D).
+
+store_call(#riak_kv_vnode_op_ctx{storecall=StoreCall}, Function, Args) ->
+ StoreCall(Function, Args).
+
+reply(O, Type) ->
+ riak_core_vnode:reply(sender(O), {Type, index(O), req_id(O)}).
+
+reply(O, Type, Arg) ->
+ riak_core_vnode:reply(sender(O), {Type, index(O), Arg, req_id(O)}).
+
+
+
+
+
Oops, something went wrong. Retry.

0 comments on commit 47f1047

Please sign in to comment.