Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Respond with error if allow_mult is false on counter post

Counters require sibling values to converge
  • Loading branch information...
commit 31e60848af24c69d335b685ebb591dc2ab69d271 1 parent dfab047
@russelldb russelldb authored
Showing with 43 additions and 25 deletions.
  1. +23 −16 src/riak_kv_pb_counter.erl
  2. +20 −9 src/riak_kv_wm_counter.erl
View
39 src/riak_kv_pb_counter.erl
@@ -93,24 +93,31 @@ process(#rpbcountergetreq{bucket=B, key=K, r=R0, pr=PR0, notfound_ok=NFOk,
end;
process(#rpbcounterupdatereq{bucket=B, key=K, w=W0, dw=DW0, pw=PW0, amount=CounterOp},
#state{client=C} = State) ->
-
- O0 = riak_object:new(B, K, ?NEW_COUNTER),
- O = riak_object:set_vclock(O0, vclock:fresh()),
- %% erlang_protobuffs encodes as 1/0/undefined
- W = decode_quorum(W0),
- DW = decode_quorum(DW0),
- PW = decode_quorum(PW0),
- Options = [{counter_op, CounterOp}],
- case C:put(O, make_option(w, W) ++ make_option(dw, DW) ++
- make_option(pw, PW) ++ [{timeout, default_timeout()} | Options]) of
- ok ->
- {reply, #rpbcounterupdateresp{}, State};
- {error, notfound} ->
- {reply, #rpbcounterupdateresp{}, State};
- {error, Reason} ->
- {error, {format, Reason}, State}
+ case allow_mult(B) of
+ true ->
+ O0 = riak_object:new(B, K, ?NEW_COUNTER),
+ O = riak_object:set_vclock(O0, vclock:fresh()),
+ %% erlang_protobuffs encodes as 1/0/undefined
+ W = decode_quorum(W0),
+ DW = decode_quorum(DW0),
+ PW = decode_quorum(PW0),
+ Options = [{counter_op, CounterOp}],
+ case C:put(O, make_option(w, W) ++ make_option(dw, DW) ++
+ make_option(pw, PW) ++ [{timeout, default_timeout()} | Options]) of
+ ok ->
+ {reply, #rpbcounterupdateresp{}, State};
+ {error, notfound} ->
+ {reply, #rpbcounterupdateresp{}, State};
+ {error, Reason} ->
+ {error, {format, Reason}, State}
+ end;
+ false ->
+ {error, {format, "Counters require bucket property 'allow_mult=true'"}, State}
end.
+allow_mult(Bucket) ->
+ proplists:get_value(allow_mult, riak_core_bucket:get_bucket(Bucket)).
+
%% @doc process_stream/3 callback. This service does not create any
%% streaming responses and so ignores all incoming messages.
process_stream(_,_,State) ->
View
29 src/riak_kv_wm_counter.erl
@@ -208,17 +208,25 @@ process_post(RD, Ctx) -> accept_doc_body(RD, Ctx).
accept_doc_body(RD, Ctx=#ctx{bucket=B, key=K, client=C,
counter_op=CounterOp}) ->
- Doc0 = riak_object:new(B, K, ?NEW_COUNTER),
- VclockDoc = riak_object:set_vclock(Doc0, vclock:fresh()),
- Options = [{counter_op, CounterOp}],
- case C:put(VclockDoc, [{w, Ctx#ctx.w}, {dw, Ctx#ctx.dw}, {pw, Ctx#ctx.pw}, {timeout, 60000} |
- Options]) of
- {error, Reason} ->
- handle_common_error(Reason, RD, Ctx);
- ok ->
- {true, RD, Ctx#ctx{doc={ok, VclockDoc}}}
+ case allow_mult(B) of
+ true ->
+ Doc0 = riak_object:new(B, K, ?NEW_COUNTER),
+ VclockDoc = riak_object:set_vclock(Doc0, vclock:fresh()),
+ Options = [{counter_op, CounterOp}],
+ case C:put(VclockDoc, [{w, Ctx#ctx.w}, {dw, Ctx#ctx.dw}, {pw, Ctx#ctx.pw}, {timeout, 60000} |
+ Options]) of
+ {error, Reason} ->
+ handle_common_error(Reason, RD, Ctx);
+ ok ->
+ {true, RD, Ctx#ctx{doc={ok, VclockDoc}}}
+ end;
+ false ->
+ handle_common_error(allow_mult_false, RD, Ctx)
end.
+allow_mult(Bucket) ->
+ proplists:get_value(allow_mult, riak_core_bucket:get_bucket(Bucket)).
+
to_text(RD, Ctx=#ctx{doc={ok, Doc}}) ->
Value = riak_kv_counter:value(Doc),
{integer_to_list(Value), RD, Ctx}.
@@ -261,6 +269,9 @@ handle_common_error(Reason, RD, Ctx) ->
Msg = io_lib:format("Specified w/dw/pw values invalid for bucket"
" n value of ~p~n", [N]),
{{halt, 400}, wrq:append_to_response_body(Msg, RD), Ctx};
+ {error, allow_mult_false} ->
+ Msg = "Counters require bucket property 'allow_mult=true'",
+ {{halt, 409}, wrq:append_to_response_body(Msg, RD), Ctx};
{error, {r_val_unsatisfied, Requested, Returned}} ->
{{halt, 503},
wrq:set_resp_header("Content-Type", "text/plain",
Please sign in to comment.
Something went wrong with that request. Please try again.