Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

WIP add PB handler

  • Loading branch information...
commit eeddf9b298f0ddd3f8cc0c145614087f68935cf0 1 parent 807df14
@russelldb russelldb authored
View
3  src/riak_kv_app.erl
@@ -30,7 +30,8 @@
{riak_kv_pb_object, 9, 14}, %% Object requests
{riak_kv_pb_bucket, 15, 22}, %% Bucket requests
{riak_kv_pb_mapred, 23, 24}, %% MapReduce requests
- {riak_kv_pb_index, 25, 26} %% Secondary index requests
+ {riak_kv_pb_index, 25, 26}, %% Secondary index requests
+ {riak_kv_pb_counter, 29, 32} %% counter requests
]).
-define(MAX_FLUSH_PUT_FSM_RETRIES, 10).
View
6 src/riak_kv_counter.erl
@@ -80,9 +80,9 @@ merge_contents(Contents) ->
{undefined, []},
Contents).
-%% worker for `do_merge/1'
-merge_value({_MD, {riak_kv_pncounter, _Counter}}=PNCount, {undefined, NonCounterSiblings}) ->
- merge_value(PNCount, {riak_kv_pncounter:new(), NonCounterSiblings});
+%% worker for `merge_contents/1'
+merge_value({_MD, {riak_kv_pncounter, Counter}}, {undefined, NonCounterSiblings}) ->
+ {Counter, NonCounterSiblings};
merge_value({_MD, {riak_kv_pncounter, Counter}}, {Mergedest, NonCounterSiblings}) ->
{riak_kv_pncounter:merge(Counter, Mergedest), NonCounterSiblings};
merge_value(NonCounter, {Mergedest, NonCounterSiblings}) ->
View
137 src/riak_kv_pb_counter.erl
@@ -0,0 +1,137 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_pb_counter: Expose counters over Protocol Buffers
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc <p>The Counter PB service for Riak KV. This covers the
+%% following request messages:</p>
+%%
+%% <pre>
+%% 29 - RpbCounterUpdateReq
+%% 31 - RpbCounterGetReq
+%% </pre>
+%%
+%% <p>This service produces the following responses:</p>
+%%
+%% <pre>
+%% 30 - RpbCounterUpdateResp - 0 length
+%% 32 - RpbCounterGetResp
+%% </pre>
+%%
+%% @end
+
+-module(riak_kv_pb_counter).
+
+-include_lib("riak_pb/include/riak_kv_pb.hrl").
+-include_lib("riak_pb/include/riak_pb_kv_codec.hrl").
+
+-behaviour(riak_api_pb_service).
+
+-export([init/0,
+ decode/2,
+ encode/1,
+ process/2,
+ process_stream/3]).
+
+-import(riak_pb_kv_codec, [decode_quorum/1]).
+
+-record(state, {client}).
+
+-define(DEFAULT_TIMEOUT, 60000).
+
+%% The empty counter that is the body of all new counter objects
+-define(NEW_COUNTER, {riak_kv_pncounter, riak_kv_pncounter:new()}).
+
+%% @doc init/0 callback. Returns the service internal start
+%% state.
+-spec init() -> any().
+init() ->
+ {ok, C} = riak:local_client(),
+ #state{client=C}.
+
+%% @doc decode/2 callback. Decodes an incoming message.
+decode(Code, Bin) ->
+ {ok, riak_pb_codec:decode(Code, Bin)}.
+
+%% @doc encode/1 callback. Encodes an outgoing response message.
+encode(Message) ->
+ {ok, riak_pb_codec:encode(Message)}.
+
+%% @doc process/2 callback. Handles an incoming request message.
+process(#rpbcountergetreq{bucket=B, key=K, r=R0, pr=PR0, notfound_ok=NFOk,
+ basic_quorum=BQ}, #state{client=C} = State) ->
+ R = decode_quorum(R0),
+ PR = decode_quorum(PR0),
+ case C:get(B, K, make_option(r, R) ++
+ make_option(pr, PR) ++
+ make_option(notfound_ok, NFOk) ++
+ make_option(basic_quorum, BQ)) of
+ {ok, O} ->
+ Value = riak_kv_counter:value(O),
+ {reply, #rpbcountergetresp{value = Value}, State};
+ {error, notfound} ->
+ {reply, #rpbcountergetresp{}, State};
+ {error, Reason} ->
+ {error, {format,Reason}, State}
+ end;
+process(#rpbcounterupdatereq{bucket=B, key=K, w=W0, dw=DW0, pw=PW0, amount=CounterOp,
+ indexes=Indexes},
+ #state{client=C} = State) ->
+
+ O0 = riak_object:new(B, K, ?NEW_COUNTER),
+ O = riak_object:set_vclock(O0, vclock:fresh()),
+ %% erlang_protobuffs encodes as 1/0/undefined
+ W = decode_quorum(W0),
+ DW = decode_quorum(DW0),
+ PW = decode_quorum(PW0),
+ IndexMeta = riak_pb_kv_codec:decode_content_meta(indexes, Indexes, undefined),
+ O2 = riak_object:update_metadata(O, dict:from_list(IndexMeta)),
+ Options = [{counter_op, CounterOp}],
+ case C:put(O2, make_option(w, W) ++ make_option(dw, DW) ++
+ make_option(pw, PW) ++ [{timeout, default_timeout()} | Options]) of
+ ok ->
+ {reply, #rpbcounterupdateresp{}, State};
+ {error, notfound} ->
+ {reply, #rpbcounterupdateresp{}, State};
+ {error, Reason} ->
+ {error, {format, Reason}, State}
+ end.
+
+%% @doc process_stream/3 callback. This service does not create any
+%% streaming responses and so ignores all incoming messages.
+process_stream(_,_,State) ->
+ {ignore, State}.
+
+%% ===================================================================
+%% Internal functions
+%% ===================================================================
+
+%% return a key/value tuple that we can ++ to other options so long as the
+%% value is not default or undefined -- those values are pulled from the
+%% bucket by the get/put FSMs.
+make_option(_, undefined) ->
+ [];
+make_option(_, default) ->
+ [];
+make_option(K, V) ->
+ [{K, V}].
+
+default_timeout() ->
+ ?DEFAULT_TIMEOUT.
View
3  src/riak_kv_wm_counter.erl
@@ -229,7 +229,6 @@ extract_index_fields(RD) ->
content_types_provided(RD, Ctx) ->
{[{"text/plain", to_text}], RD, Ctx}.
-
resource_exists(RD, Ctx0) when Ctx0#ctx.method =:= 'GET' ->
DocCtx = ensure_doc(Ctx0),
case DocCtx#ctx.doc of
@@ -271,7 +270,7 @@ ensure_doc(Ctx=#ctx{doc=undefined, key=undefined}) ->
Ctx#ctx{doc={error, notfound}};
ensure_doc(Ctx=#ctx{doc=undefined, bucket=B, key=K, client=C, r=R,
pr=PR, basic_quorum=Quorum, notfound_ok=NotFoundOK}) ->
- Ctx#ctx{doc=C:get(B, K, [deletedvclock, {r, R}, {pr, PR},
+ Ctx#ctx{doc=C:get(B, K, [{r, R}, {pr, PR},
{basic_quorum, Quorum}, {notfound_ok, NotFoundOK}])};
ensure_doc(Ctx) -> Ctx.
Please sign in to comment.
Something went wrong with that request. Please try again.