Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

A simple way to store a PN-Counter in a riak_object #536

Merged
merged 25 commits into from

4 participants

@russelldb
Owner

Adds an API endpoint for PB and HTTP for incrementing (and getting the value of) a counter.

The counter is implemented as a PN-Counter. In fact the PN-Counter and G-Counter modules from riak_dt are copied into riak_kv (along with their quickcheck tests).

There is a wrapper module riak_kv_counter that mediates between riak_object and riak_kv_pn_counter.

The regular get/put fsms are used. Some small change is made to the vnode to allow counter siblings to merge to a single value before being written to disk.

The wm and pb endpoints also use the riak_kv_counter merge function to ensure a single value is returned to the user.

If (by accident, I hope) a bucket/key is written too as a regular riak_object write, the conflicting riak_object sibling is maintained.

To use store counters the bucket must be {allow_mult, true}. I recommend that you don't mix counter and non-counter objects in the same bucket.


Examples:

curl -v -X POST localhost:8098/buckets/my_counters/counters/c1 -d"1"
< Server: MochiWeb/1.1 WebMachine/1.9.2 (someone had painted it blue)
< Date: Thu, 25 Apr 2013 16:07:28 GMT
< Content-Type: text/plain
< Content-Length: 50
Counters require bucket property 'allow_mult=true'*
> curl -X PUT localhost:8098/buckets/my_counters/props -H"Content-Type: application/json" -d "{\"props\" : {\"allow_mult\": true}}"

curl -X POST localhost:8098/buckets/my_counters/counters/c1 -d"1"
curl localhost:8098/buckets/my_counters/counters/c2
1
> curl -X POST localhost:8098/buckets/my_counters/counters/c1 -d"100"
101
> curl -X POST localhost:8098/buckets/my_counters/counters/c1 -d"-1"
100

Etc.


Depends on:
https://github.com/basho/riak_pb/tree/rdb-kv-counter

Riak test at
https://github.com/basho/riak_test/tree/rdb-kv-counters

PB support at
https://github.com/basho/riak-erlang-client/tree/rdb-kv-counter


Obviously I'll create a squashed branch after the discussion / review.

@seancribbs

FWIW I used this branch for the demo at NoSQL Matters Cologne. wfm

@russelldb russelldb referenced this pull request in basho/riak_pb
Merged

Add counter messages #45

src/riak_kv_gcounter.erl
@@ -0,0 +1,187 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_gcounter: A state based, grow only, convergent counter
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
@jrwest
jrwest added a note

copyright update

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/riak_kv_gcounter.erl
((8 lines not shown))
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%%% @doc
@jrwest
jrwest added a note

would be nice to add a reference similar to vclock.erl [1]

[1] https://github.com/basho/riak_core/blob/master/src/vclock.erl#L25

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/riak_kv_gcounter.erl
((26 lines not shown))
+
+-module(riak_kv_gcounter).
+
+-export([new/0, new/2, value/1, update/3, merge/2, equal/2]).
+
+-ifdef(EQC).
+-include_lib("eqc/include/eqc.hrl").
+-export([gen_op/0, update_expected/3, eqc_state_value/1]).
+-endif.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+%% EQC generator
+-ifdef(EQC).
@jrwest
jrwest added a note

know this is nitpicky but any reason this is at the top of the file instead of w/ all the other eqc stuff. found it a bit difficult reading the code. unless I'm missing some reason it needs to be defined here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/riak_kv_counter.erl
@@ -0,0 +1,140 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_counter: Counter logic to bridge riak_object and riak_kv_pncounter
+%%
+%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
@jrwest
jrwest added a note

copyright update

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/riak_kv_counter.erl
((13 lines not shown))
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(riak_kv_counter).
+
+-export([update/3, merge/1, value/1]).
+
+-include("riak_kv_wm_raw.hrl").
+
+%% @doc A counter is a two tuple of a `riak_kv_pncounter'
@jrwest
jrwest added a note

think this should be a module edoc but it would be considered the edoc for update/3?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/riak_kv_counter.erl
((21 lines not shown))
+%% -------------------------------------------------------------------
+-module(riak_kv_counter).
+
+-export([update/3, merge/1, value/1]).
+
+-include("riak_kv_wm_raw.hrl").
+
+%% @doc A counter is a two tuple of a `riak_kv_pncounter'
+%% stored in a `riak_object'
+%% with the tag `riak_kv_pncounter' as the first element.
+%% Since counters can be stored with any name, in any bucket, there is a
+%% chance that some sibling value for a counter is
+%% not a `riak_kv_pncounter' in that case, we keep the sibling
+%% for later resolution by the user.
+%%
+%% @TODO How do we let callers now about the sibling values?
@jrwest
jrwest added a note

@russelldb did you have any further thoughts on this? I can't think of a conceivable use case where this would happen on purpose. I think there is only so much we can do here. Forgetting about counters for a moment, if I write an application that accidentally uses the same key for two purposes Riak will never let me know. Errors would most likely (or hopefully) show up at the application level. With counters, the same holds but its even more likely I get an errror since my deserializer won't know what know what to do w/ the counter bytes. Since my assumption is this is an application-level bug I don't think its an issue.

tl;dr I don't think Riak needs to do anything special here (the TODO can be removed). Assuming the application wants to read what it wrote, when reading the sibling it will come across the counter bytes and highlight an application-level bug.

@russelldb Owner

I agree. And since I added the metadata content-type entry of application/riak-pncounter to all counter objects, client applications can detect a counter sibling and take some appropriate action.

@jrwest
jrwest added a note

cool, my vote would be to remove the TODO then. it has a spelling error anyways ;).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/riak_kv_pncounter.erl
@@ -0,0 +1,182 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_pncounter: A convergent, replicated, state based PN counter
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
@jrwest
jrwest added a note

copyright update

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/riak_kv_pncounter.erl
((26 lines not shown))
+-include_lib("eqc/include/eqc.hrl").
+-endif.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+%% API
+-export([new/0, new/2, value/1, update/3, merge/2, equal/2]).
+
+%% EQC API
+-ifdef(EQC).
+-export([gen_op/0, update_expected/3, eqc_state_value/1]).
+-endif.
+
+%% EQC generator
@jrwest
jrwest added a note

same comment here as the one on gcounter re: location of these definitions in file

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/riak_kv_pncounter.erl
((8 lines not shown))
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_kv_pncounter).
@jrwest
jrwest added a note

this module could use some typespecs. one for pncounter and all the exported functions.

@jrwest
jrwest added a note

this module could also use a reference to crdt paper

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/riak_kv_gcounter.erl
((12 lines not shown))
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%%% @doc
+%%% a G-Counter CRDT, borrows liberally from argv0 and Justin Sheehy's vclock module
+%%% @end
+
+-module(riak_kv_gcounter).
@jrwest
jrwest added a note

same here, re: typespecs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@jrwest jrwest commented on the diff
src/riak_kv_util.erl
@@ -332,6 +333,11 @@ mark_indexes_reformatted(Idx, 0, ForUpgrade) ->
mark_indexes_reformatted(_Idx, _ErrorCount, _ForUpgrade) ->
undefined.
+%% @Doc vtag creation function
+-spec make_vtag(erlang:timestamp()) -> list().
+make_vtag(Now) ->
@jrwest
jrwest added a note

The yessir backend had borrowed this function as well [1]. maybe replace its usage too?

[1] https://github.com/basho/riak_kv/blob/master/src/riak_kv_yessir_backend.erl#L319

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/riak_kv_vnode.erl
@@ -945,8 +949,18 @@ prepare_put(#state{idx=Idx,
IndexSpecs = []
end,
ObjToStore = case Coord of
- true ->
- riak_object:increment_vclock(RObj, VId, StartTime);
+ true ->
@jrwest
jrwest added a note

this was somewhat hard to grok w/ all the nested case statements. maybe pull out part into something like prepare_coord_put?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/riak_kv_wm_counter.erl
((8 lines not shown))
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc @TODO doc this
@jrwest
jrwest added a note

this should be finished w/ similar API docs to wm_object pre-merge if possible

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@jrwest

ran all the tests (including riak_test). code looks good. Was initially worried about upgrade path and future work but @russelldb has done a great job of making the changes to riak_kv minimal. It should be easy to integrate or even rip out his kv specific changes to make it work w/ DVVSets, etc. Most of the comments are code/style related not functional but a few things we discussed offline:

  • @russelldb is working on a branch that cleans up some of the duplication in the webmachine and pb code. going to look at this before merging (although if we want to get this in, imo it can wait if it has to, we have this type of duplication elsewhere, but its not ideal).
  • Adding a returnvalue option similar to returnbody on write that returns the counters value (also something that can be added later if we have to, but I think this would be good to include initially)
  • A more compact binary format for counters. Logically to users counters are numbers (regardless of implementation), having overhead similar to what we currently have for vector clocks is a lot. Also, this gets us binary versions for counters which is always a win.

I have not done any perf. testing w/ these changes. I believe @evanmcc was planning to do some if he has time. I can do some quick runs on EC2 spot instances or similar. I suspect the performance will be very similar to riak_kv since, as mentioned above, little is actually changed in the write path, but numbers are always better than my conjecture.

@russelldb russelldb Address review comments
Add `returnvalue` to both endpoints
Add specs + docs to counter modules
Add API docs to HTTP resource
Refactor prepare put code in vnode
Remove copied fun from yessir backend
1d7cbd9
@russelldb
Owner

Addressed them all except "compact binary format" which I will do after this scheduled break.

@russelldb
Owner

Heh, I tried copying the riak_object style binary encoding and it turns out a tad larger than t2b.

@jrwest

heh, I'm sure there are some gymnastics we could do but probably not worth it now. In that case: I have reviewed all the other changes and have taken returnvalue for a spin. Dunno if maybe someone from clients (e.g. @seancribbs) should take a loot at the interface side of things but :+1: from me! nice work @rdb.

@russelldb
Owner

Actually…I'm surprised. So a riak_object style scheme for the gcounter lead to a slightly larger size than t2b, as it did also for the pncounter (which just used the gcounter) but by the time you get up to riak_kv_counter (which use pncounter) the binary scheme is slightly better than t2b.

I wonder if there isn;'t some tweaking to be done here, to use t2b when it suits, and our own scheme elsewhere. I bet t2b is smarter about the size of the integers (for example) by using binary:encode_unsigned.

I pooshed the branch, in case you want to take a peek rdb-kv-counter...rdb-kv-counter_bin

@russelldb russelldb merged commit 2718992 into master
@seancribbs seancribbs was assigned
@thefosk

Can this feature be used to create atomic counters across the cluster?

@russelldb
Owner

No, it cannot. These are eventually consistent counters. Any replica / node can take an increment/decrement (even when partitioned) and that write will persist, and be correct when handoff / healing occurs. But no, not atomic across the cluster. Also be aware of the partial failure / partial success case: if you do an increment with W=2 and Riak reports a failure, maybe 1 replica did succeed. Re-attempting that operation may lead to 2 counts. That is to say these counters are not idempotent, either.

If you're familiar with Cassandra they're conceptually similar to those counters. Feel free to ask on the riak-users list[1] if you have more questions, please.

[1] http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com

@seancribbs seancribbs deleted the rdb-kv-counter branch
@seancribbs seancribbs removed their assignment
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Feb 4, 2013
  1. @russelldb
Commits on Feb 7, 2013
  1. @russelldb
Commits on Feb 11, 2013
  1. @russelldb

    WIP commit

    russelldb authored
    EQC test
    Changes to counter because of it
    Export riak_object:index_specs
  2. @russelldb
  3. @russelldb
Commits on Feb 12, 2013
  1. @russelldb
  2. @russelldb
  3. @russelldb

    WIP commit

    russelldb authored
    EQC test
    Changes to counter because of it
    Export riak_object:index_specs
  4. @russelldb
  5. @russelldb
  6. @russelldb
  7. @russelldb

    Merge branch 'rdb-kv-counter' of beast:riak_kv into rdb-kv-counter

    russelldb authored
    Conflicts:
    	test/kv_counter_eqc.erl
Commits on Feb 18, 2013
  1. @russelldb

    WIP add PB handler

    russelldb authored
Commits on Apr 9, 2013
  1. @russelldb
  2. @russelldb

    Merge branch 'master' into rdb-kv-counter

    russelldb authored
    Conflicts:
    	src/riak_object.erl
Commits on Apr 10, 2013
  1. @russelldb

    Drop indexes from counters

    russelldb authored
    Add minimal required meta data to work with HTTP API
  2. @russelldb

    Respond with error if allow_mult is false on counter post

    russelldb authored
    Counters require sibling values to converge
Commits on May 9, 2013
  1. @russelldb
  2. @russelldb

    Add include for types

    russelldb authored
Commits on May 22, 2013
  1. @russelldb

    Address review comments

    russelldb authored
    Add `returnvalue` to both endpoints
    Add specs + docs to counter modules
    Add API docs to HTTP resource
    Refactor prepare put code in vnode
    Remove copied fun from yessir backend
Commits on May 23, 2013
  1. @russelldb

    Tentative addition of custom binary format for counters

    russelldb authored
    Counters all the way down.
Commits on May 24, 2013
  1. @russelldb

    Update binary format to use t2b for gcounter

    russelldb authored
    Fix eqc test ot work with binary encoded counters.
  2. @russelldb
  3. @russelldb

    Remove dbg statement

    russelldb authored
  4. @russelldb

    Merge pull request #563 from basho/rdb-kv-counter_bin

    russelldb authored
    Add binary format for counters
This page is out of date. Refresh to see the latest.
View
1  include/riak_kv_types.hrl
@@ -0,0 +1 @@
+-define(COUNTER_TYPE, "application/riak_pncounter").
View
3  src/riak_kv_app.erl
@@ -30,7 +30,8 @@
{riak_kv_pb_object, 9, 14}, %% Object requests
{riak_kv_pb_bucket, 15, 18}, %% Bucket requests
{riak_kv_pb_mapred, 23, 24}, %% MapReduce requests
- {riak_kv_pb_index, 25, 26} %% Secondary index requests
+ {riak_kv_pb_index, 25, 26}, %% Secondary index requests
+ {riak_kv_pb_counter, 50, 53} %% counter requests
]).
-define(MAX_FLUSH_PUT_FSM_RETRIES, 10).
View
187 src/riak_kv_counter.erl
@@ -0,0 +1,187 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_counter: Counter logic to bridge riak_object and riak_kv_pncounter
+%%
+%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc A counter is a two tuple of a `riak_kv_pncounter'
+%% stored in a `riak_object'
+%% with the tag `riak_kv_pncounter' as the first element.
+%% Since counters can be stored with any name, in any bucket, there is a
+%% chance that some sibling value for a counter is
+%% not a `riak_kv_pncounter' in that case, we keep the sibling
+%% for later resolution by the user.
+%%
+%% This module is the bridge between the `riak_kv_pncounter' data structure
+%% and riak_kv's `riak_object' and API endpoints.
+%%
+%% @see riak_kv_pncounter.erl
+%% @end
+
+-module(riak_kv_counter).
+
+-export([update/3, merge/1, value/1, new/2, to_binary/1, from_binary/1]).
+
+-include("riak_kv_wm_raw.hrl").
+-include_lib("riak_kv_types.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+-define(TAG, 69).
+-define(V1_VERS, 1).
+
+%% @doc Update `Actor's entry by `Amt' and store it in `RObj'.
+-spec update(riak_object:riak_object(), binary(), integer()) ->
+ riak_object:riak_object().
+update(RObj, Actor, Amt) ->
+ {Meta, Counter0, NonCounterSiblings} = merge_object(RObj),
+ Counter = case Amt of
+ 0 -> Counter0;
+ _ -> update_counter(Counter0, Actor, Amt)
+ end,
+ update_object(RObj, Meta, Counter, NonCounterSiblings).
+
+%% @doc Unlike regular, opaque `riak_object' values, conflicting
+%% counter writes can be merged by Riak, thanks to their internal
+%% CRDT PN-Counter structure.
+-spec merge(riak_object:riak_object()) ->
+ riak_object:riak_object().
+merge(RObj) ->
+ {Meta, Counter, NonCounterSiblings} = merge_object(RObj),
+ update_object(RObj, Meta, Counter, NonCounterSiblings).
+
+%% @doc Currently _IGNORES_ all non-counter sibling values
+-spec value(riak_object:riak_object()) ->
+ integer().
+value(RObj) ->
+ Contents = riak_object:get_contents(RObj),
+ {_Meta, Counter, _NonCounterSiblings} = merge_contents(Contents),
+ case Counter of
+ undefined -> 0;
+ _ ->
+ riak_kv_pncounter:value(Counter)
+ end.
+
+%% Merge contents _AND_ meta
+merge_object(RObj) ->
+ Contents = riak_object:get_contents(RObj),
+ merge_contents(Contents).
+
+%% Only merge the values of actual PN-Counters
+%% If a non-CRDT datum is present, keep it as a sibling value
+merge_contents(Contents) ->
+ lists:foldl(fun merge_value/2,
+ {undefined, undefined, []},
+ Contents).
+
+%% worker for `merge_contents/1'
+merge_value({MD, <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>},
+ {undefined, undefined, NonCounterSiblings}) ->
+ Counter = riak_kv_pncounter:from_binary(CounterBin),
+ {MD, Counter, NonCounterSiblings};
+merge_value({MD, <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>},
+ {MergedMeta, Mergedest, NonCounterSiblings}) ->
+ Counter = riak_kv_pncounter:from_binary(CounterBin),
+ {merge_meta(MD, MergedMeta), riak_kv_pncounter:merge(Counter, Mergedest), NonCounterSiblings};
+merge_value(NonCounter, {MD, Mergedest, NonCounterSiblings}) ->
+ {MD, Mergedest, [NonCounter | NonCounterSiblings]}.
+
+update_counter(undefined, Actor, Amt) ->
+ update_counter(riak_kv_pncounter:new(), Actor, Amt);
+update_counter(Counter, Actor, Amt) ->
+ Op = counter_op(Amt),
+ riak_kv_pncounter:update(Op, Actor, Counter).
+
+counter_op(Amt) when Amt < 0 ->
+ {decrement, Amt * -1};
+counter_op(Amt) ->
+ {increment, Amt}.
+
+%% This uses an exported but marked INTERNAL
+%% function of `riak_object:set_contents' to preserve
+%% non-counter sibling values and Metadata
+%% NOTE: if `Meta' is `undefined' then this
+%% is a new counter.
+update_object(RObj, _, undefined, _Siblings) ->
+ RObj;
+update_object(RObj, Meta, Counter, []) ->
+ RObj2 = riak_object:update_value(RObj, to_binary(Counter)),
+ RObj3 = riak_object:update_metadata(RObj2, counter_meta(Meta)),
+ riak_object:apply_updates(RObj3);
+update_object(RObj, Meta, Counter, SiblingValues) ->
+ %% keep non-counter siblings, too
+ riak_object:set_contents(RObj, [{counter_meta(Meta), to_binary(Counter)} | SiblingValues]).
+
+counter_meta(undefined) ->
+ Now = os:timestamp(),
+ M = dict:new(),
+ M2 = dict:store(?MD_LASTMOD, Now, M),
+ dict:store(?MD_VTAG, riak_kv_util:make_vtag(Now), M2);
+counter_meta(Meta) ->
+ Meta.
+
+%% Just a simple take the largest for meta values based on last mod
+merge_meta(Meta1, Meta2) ->
+ case later(lastmod(Meta1), lastmod(Meta2)) of
+ true ->
+ Meta1;
+ false ->
+ Meta2
+ end.
+
+lastmod(Meta) ->
+ dict:fetch(?MD_LASTMOD, Meta).
+
+later(TS1, TS2) ->
+ case timer:now_diff(TS1, TS2) of
+ Before when Before < 0 ->
+ false;
+ _ ->
+ true
+ end.
+
+new(B, K) ->
+ Bin = to_binary(riak_kv_pncounter:new()),
+ Doc0 = riak_object:new(B, K, Bin, ?COUNTER_TYPE),
+ riak_object:set_vclock(Doc0, vclock:fresh()).
+
+to_binary(Counter) ->
+ CounterBin = riak_kv_pncounter:to_binary(Counter),
+ <<?TAG:8/integer, ?V1_VERS:8/integer, CounterBin/binary>>.
+
+from_binary(<<?TAG:8/integer,?V1_VERS:8/integer,CounterBin/binary>>) ->
+ riak_kv_pncounter:from_binary(CounterBin).
+
+%% ===================================================================
+%% EUnit tests
+%% ===================================================================
+-ifdef(TEST).
+
+roundtrip_bin_test() ->
+ PN = riak_kv_pncounter:new(),
+ PN1 = riak_kv_pncounter:update({increment, 2}, <<"a1">>, PN),
+ PN2 = riak_kv_pncounter:update({decrement, 1000000000000000000000000}, douglas_Actor, PN1),
+ PN3 = riak_kv_pncounter:update(increment, [{very, ["Complex"], <<"actor">>}, honest], PN2),
+ PN4 = riak_kv_pncounter:update(decrement, "another_acotr", PN3),
+ Bin = to_binary(PN4),
+ ?assert(byte_size(Bin) < term_to_binary({riak_kv_pncounter, PN4})).
+
+-endif.
View
256 src/riak_kv_gcounter.erl
@@ -0,0 +1,256 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_gcounter: A state based, grow only, convergent counter
+%%
+%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc
+%% A G-Counter CRDT. A G-Counter is a Grow-only counter. Modeled as a list of
+%% two-tuples. Each entry in the list is an {actor, count} pair. The value of the counter
+%% is the sum of all entries in the list. An actor may only update its own entry. An entry
+%% can only be incremented. Borrows liberally from argv0 and Justin Sheehy's vclock module
+%% in implementation.
+%%
+%% @see riak_kv_pncounter.erl for a counter that can be decremented
+%%
+%% @reference Marc Shapiro, Nuno Preguiça, Carlos Baquero, Marek Zawirski (2011) A comprehensive study of
+%% Convergent and Commutative Replicated Data Types. http://hal.upmc.fr/inria-00555588/
+%%
+%% @end
+
+-module(riak_kv_gcounter).
+
+-export([new/0, new/2, value/1, update/3, merge/2, equal/2, to_binary/1, from_binary/1]).
+
+%% EQC API
+-ifdef(EQC).
+-include_lib("eqc/include/eqc.hrl").
+-export([gen_op/0, update_expected/3, eqc_state_value/1]).
+-endif.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+-export_type([gcounter/0, gcounter_op/0]).
+
+-opaque gcounter() :: [entry()].
+
+-type entry() :: {Actor::term(), Count::pos_integer()}.
+-type gcounter_op() :: increment | {increment, pos_integer()}.
+
+%% @doc Create a new, empty `gcounter()'
+-spec new() -> gcounter().
+new() ->
+ [].
+
+%% @doc Create a `gcounter()' with an initial update
+-spec new(term(), pos_integer()) -> gcounter().
+new(Id, Count) when is_integer(Count), Count > 0 ->
+ update({increment, Count}, Id, new()).
+
+%% @doc The single total value of a `gcounter()'.
+-spec value(gcounter()) -> non_neg_integer().
+value(GCnt) ->
+ lists:sum([ Cnt || {_Act, Cnt} <- GCnt]).
+
+%% @doc `increment' the entry in `GCnt' for `Actor' by 1 or `{increment, Amt}'.
+%% returns an updated `gcounter()' or error if `Amt' is not a `pos_integer()'
+-spec update(gcounter_op(), term(), gcounter()) ->
+ gcounter().
+update(increment, Actor, GCnt) ->
+ increment_by(1, Actor, GCnt);
+update({increment, Amount}, Actor, GCnt) when is_integer(Amount), Amount > 0 ->
+ increment_by(Amount, Actor, GCnt).
+
+%% @doc Merge two `gcounter()'s to a single `gcounter()'. This is the Least Upper Bound
+%% function described in the literature.
+-spec merge(gcounter(), gcounter()) -> gcounter().
+merge(GCnt1, GCnt2) ->
+ merge(GCnt1, GCnt2, []).
+
+%% @private merge two counters.
+-spec merge(gcounter(), gcounter(), gcounter()) -> gcounter().
+merge([], [], Acc) ->
+ lists:reverse(Acc);
+merge(LeftOver, [], Acc) ->
+ lists:reverse(Acc, LeftOver);
+merge([], LeftOver, Acc) ->
+ lists:reverse(Acc, LeftOver);
+merge([{Actor1, Cnt1}=AC1|Rest], Clock2, Acc) ->
+ case lists:keytake(Actor1, 1, Clock2) of
+ {value, {Actor1, Cnt2}, RestOfClock2} ->
+ merge(Rest, RestOfClock2, [{Actor1, max(Cnt1, Cnt2)}|Acc]);
+ false ->
+ merge(Rest, Clock2, [AC1|Acc])
+ end.
+
+%% @doc Are two `gcounter()'s structurally equal? This is not `value/1' equality.
+%% Two counters might represent the total `42', and not be `equal/2'. Equality here is
+%% that both counters contain the same actors and those actors have the same count.
+-spec equal(gcounter(), gcounter()) -> boolean().
+equal(VA,VB) ->
+ lists:sort(VA) =:= lists:sort(VB).
+
+%% @private peform the increment.
+-spec increment_by(pos_integer(), term(), gcounter()) -> gcounter().
+increment_by(Amount, Actor, GCnt) when is_integer(Amount), Amount > 0 ->
+ {Ctr, NewGCnt} = case lists:keytake(Actor, 1, GCnt) of
+ false ->
+ {Amount, GCnt};
+ {value, {_N, C}, ModGCnt} ->
+ {C + Amount, ModGCnt}
+ end,
+ [{Actor, Ctr}|NewGCnt].
+
+-define(TAG, 70).
+-define(V1_VERS, 1).
+
+%% @doc Encode an effecient binary representation of a `gcounter()'
+-spec to_binary(gcounter()) -> binary().
+to_binary(GCnt) ->
+ EntriesBin = term_to_binary(GCnt),
+ <<?TAG:8/integer, ?V1_VERS:8/integer, EntriesBin/binary>>.
+
+%% @doc Decode binary G-Counter
+-spec from_binary(binary()) -> gcounter().
+from_binary(<<?TAG:8/integer, ?V1_VERS:8/integer, EntriesBin/binary>>) ->
+ binary_to_term(EntriesBin).
+
+%% ===================================================================
+%% EUnit tests
+%% ===================================================================
+-ifdef(TEST).
+
+-ifdef(EQC).
+%% EQC generator
+gen_op() ->
+ oneof([increment, {increment, gen_pos()}]).
+
+gen_pos()->
+ ?LET(X, int(), 1+abs(X)).
+
+update_expected(_ID, increment, Prev) ->
+ Prev+1;
+update_expected(_ID, {increment, By}, Prev) ->
+ Prev+By;
+update_expected(_ID, _Op, Prev) ->
+ Prev.
+
+eqc_state_value(S) ->
+ S.
+
+eqc_value_test_() ->
+ {timeout, 120, [?_assert(crdt_statem_eqc:prop_converge(0, 1000, ?MODULE))]}.
+-endif.
+
+new_test() ->
+ ?assertEqual([], new()).
+
+value_test() ->
+ GC1 = [{1, 1}, {2, 13}, {3, 1}],
+ GC2 = [],
+ ?assertEqual(15, value(GC1)),
+ ?assertEqual(0, value(GC2)).
+
+update_increment_test() ->
+ GC0 = new(),
+ GC1 = update(increment, 1, GC0),
+ GC2 = update(increment, 2, GC1),
+ GC3 = update(increment, 1, GC2),
+ ?assertEqual([{1, 2}, {2, 1}], GC3).
+
+update_increment_by_test() ->
+ GC0 = new(),
+ GC = update({increment, 7}, 1, GC0),
+ ?assertEqual([{1, 7}], GC).
+
+merge_test() ->
+ GC1 = [{<<"1">>, 1},
+ {<<"2">>, 2},
+ {<<"4">>, 4}],
+ GC2 = [{<<"3">>, 3},
+ {<<"4">>, 3}],
+ ?assertEqual([], merge(new(), new())),
+ ?assertEqual([{<<"1">>,1},{<<"2">>,2},{<<"3">>,3},{<<"4">>,4}],
+ lists:sort( merge(GC1, GC2))).
+
+merge_less_left_test() ->
+ GC1 = [{<<"5">>, 5}],
+ GC2 = [{<<"6">>, 6}, {<<"7">>, 7}],
+ ?assertEqual([{<<"5">>, 5},{<<"6">>,6}, {<<"7">>, 7}],
+ merge(GC1, GC2)).
+
+merge_less_right_test() ->
+ GC1 = [{<<"6">>, 6}, {<<"7">>,7}],
+ GC2 = [{<<"5">>, 5}],
+ ?assertEqual([{<<"5">>,5},{<<"6">>,6}, {<<"7">>, 7}],
+ lists:sort( merge(GC1, GC2)) ).
+
+merge_same_id_test() ->
+ GC1 = [{<<"1">>, 2},{<<"2">>,4}],
+ GC2 = [{<<"1">>, 3},{<<"3">>,5}],
+ ?assertEqual([{<<"1">>, 3},{<<"2">>,4},{<<"3">>,5}],
+ lists:sort( merge(GC1, GC2)) ).
+
+equal_test() ->
+ GC1 = [{1, 2}, {2, 1}, {4, 1}],
+ GC2 = [{1, 1}, {2, 4}, {3, 1}],
+ GC3 = [{1, 2}, {2, 1}, {4, 1}],
+ GC4 = [{4, 1}, {1, 2}, {2, 1}],
+ ?assertNot(equal(GC1, GC2)),
+ ?assert(equal(GC1, GC3)),
+ ?assert(equal(GC1, GC4)).
+
+usage_test() ->
+ GC1 = new(),
+ GC2 = new(),
+ ?assert(equal(GC1, GC2)),
+ GC1_1 = update({increment, 2}, a1, GC1),
+ GC2_1 = update(increment, a2, GC2),
+ GC3 = merge(GC1_1, GC2_1),
+ GC2_2 = update({increment, 3}, a3, GC2_1),
+ GC3_1 = update(increment, a4, GC3),
+ GC3_2 = update(increment, a1, GC3_1),
+ ?assertEqual([{a1, 3}, {a2, 1}, {a3, 3}, {a4, 1}],
+ lists:sort(merge(GC3_2, GC2_2))).
+
+roundtrip_bin_test() ->
+ GC = new(),
+ GC1 = update({increment, 2}, <<"a1">>, GC),
+ GC2 = update({increment, 4}, a2, GC1),
+ GC3 = update(increment, "a4", GC2),
+ GC4 = update({increment, 10000000000000000000000000000000000000000}, {complex, "actor", [<<"term">>, 2]}, GC3),
+ Bin = to_binary(GC4),
+ Decoded = from_binary(Bin),
+ ?assert(equal(GC4, Decoded)).
+
+lots_of_actors_test() ->
+ GC = lists:foldl(fun(_, GCnt) ->
+ ActorLen = crypto:rand_uniform(1, 1000),
+ Actor = crypto:rand_bytes(ActorLen),
+ Cnt = crypto:rand_uniform(1, 10000),
+ riak_kv_gcounter:update({increment, Cnt}, Actor, GCnt) end,
+ new(),
+ lists:seq(1, 1000)),
+ Bin = to_binary(GC),
+ Decoded = from_binary(Bin),
+ ?assert(equal(GC, Decoded)).
+
+-endif.
View
150 src/riak_kv_pb_counter.erl
@@ -0,0 +1,150 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_pb_counter: Expose counters over Protocol Buffers
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc <p>The Counter PB service for Riak KV. This covers the
+%% following request messages:</p>
+%%
+%% <pre>
+%% 29 - RpbCounterUpdateReq
+%% 31 - RpbCounterGetReq
+%% </pre>
+%%
+%% <p>This service produces the following responses:</p>
+%%
+%% <pre>
+%% 30 - RpbCounterUpdateResp - 0 length
+%% 32 - RpbCounterGetResp
+%% </pre>
+%%
+%% @end
+
+-module(riak_kv_pb_counter).
+
+-include_lib("riak_pb/include/riak_kv_pb.hrl").
+-include_lib("riak_pb/include/riak_pb_kv_codec.hrl").
+
+-behaviour(riak_api_pb_service).
+
+-export([init/0,
+ decode/2,
+ encode/1,
+ process/2,
+ process_stream/3]).
+
+-import(riak_pb_kv_codec, [decode_quorum/1]).
+
+-record(state, {client}).
+
+-define(DEFAULT_TIMEOUT, 60000).
+
+%% The empty counter that is the body of all new counter objects
+-define(NEW_COUNTER, {riak_kv_pncounter, riak_kv_pncounter:new()}).
+
+%% @doc init/0 callback. Returns the service internal start
+%% state.
+-spec init() -> any().
+init() ->
+ {ok, C} = riak:local_client(),
+ #state{client=C}.
+
+%% @doc decode/2 callback. Decodes an incoming message.
+decode(Code, Bin) ->
+ {ok, riak_pb_codec:decode(Code, Bin)}.
+
+%% @doc encode/1 callback. Encodes an outgoing response message.
+encode(Message) ->
+ {ok, riak_pb_codec:encode(Message)}.
+
+%% @doc process/2 callback. Handles an incoming request message.
+process(#rpbcountergetreq{bucket=B, key=K, r=R0, pr=PR0, notfound_ok=NFOk,
+ basic_quorum=BQ}, #state{client=C} = State) ->
+ R = decode_quorum(R0),
+ PR = decode_quorum(PR0),
+ case C:get(B, K, make_option(r, R) ++
+ make_option(pr, PR) ++
+ make_option(notfound_ok, NFOk) ++
+ make_option(basic_quorum, BQ)) of
+ {ok, O} ->
+ Value = riak_kv_counter:value(O),
+ {reply, #rpbcountergetresp{value = Value}, State};
+ {error, notfound} ->
+ {reply, #rpbcountergetresp{}, State};
+ {error, Reason} ->
+ {error, {format,Reason}, State}
+ end;
+process(#rpbcounterupdatereq{bucket=B, key=K, w=W0, dw=DW0, pw=PW0, amount=CounterOp,
+ returnvalue=RetVal},
+ #state{client=C} = State) ->
+ case allow_mult(B) of
+ true ->
+ O = riak_kv_counter:new(B, K),
+
+ %% erlang_protobuffs encodes as 1/0/undefined
+ W = decode_quorum(W0),
+ DW = decode_quorum(DW0),
+ PW = decode_quorum(PW0),
+ Options = [{counter_op, CounterOp}] ++ return_value(RetVal),
+ case C:put(O, make_option(w, W) ++ make_option(dw, DW) ++
+ make_option(pw, PW) ++ [{timeout, default_timeout()} | Options]) of
+ ok ->
+ {reply, #rpbcounterupdateresp{}, State};
+ {ok, RObj} ->
+ Value = riak_kv_counter:value(RObj),
+ {reply, #rpbcounterupdateresp{value=Value}, State};
+ {error, notfound} ->
+ {reply, #rpbcounterupdateresp{}, State};
+ {error, Reason} ->
+ {error, {format, Reason}, State}
+ end;
+ false ->
+ {error, {format, "Counters require bucket property 'allow_mult=true'"}, State}
+ end.
+
+return_value(true) ->
+ [returnbody];
+return_value(_) ->
+ [].
+
+allow_mult(Bucket) ->
+ proplists:get_value(allow_mult, riak_core_bucket:get_bucket(Bucket)).
+
+%% @doc process_stream/3 callback. This service does not create any
+%% streaming responses and so ignores all incoming messages.
+process_stream(_,_,State) ->
+ {ignore, State}.
+
+%% ===================================================================
+%% Internal functions
+%% ===================================================================
+
+%% return a key/value tuple that we can ++ to other options so long as the
+%% value is not default or undefined -- those values are pulled from the
+%% bucket by the get/put FSMs.
+make_option(_, undefined) ->
+ [];
+make_option(_, default) ->
+ [];
+make_option(K, V) ->
+ [{K, V}].
+
+default_timeout() ->
+ ?DEFAULT_TIMEOUT.
View
245 src/riak_kv_pncounter.erl
@@ -0,0 +1,245 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_pncounter: A convergent, replicated, state based PN counter
+%%
+%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc
+%% A PN-Counter CRDT. A PN-Counter is essentially two G-Counters: one for increments and
+%% one for decrements. The value of the counter is the difference between the value of the
+%% Positive G-Counter and the value of the Negative G-Counter.
+%%
+%% @see riak_kv_gcounter.erl
+%%
+%% @reference Marc Shapiro, Nuno Preguiça, Carlos Baquero, Marek Zawirski (2011) A comprehensive study of
+%% Convergent and Commutative Replicated Data Types. http://hal.upmc.fr/inria-00555588/
+%%
+%% @end
+
+-module(riak_kv_pncounter).
+
+-export([new/0, new/2, value/1, update/3, merge/2, equal/2, to_binary/1, from_binary/1]).
+
+%% EQC API
+-ifdef(EQC).
+-include_lib("eqc/include/eqc.hrl").
+-export([gen_op/0, update_expected/3, eqc_state_value/1]).
+-endif.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+-export_type([pncounter/0, pncounter_op/0]).
+
+-opaque pncounter() :: {riak_kv_gcounter:gcounter(), riak_kv_gcounter:gcounter()}.
+-type pncounter_op() :: riak_kv_gcounter:gcounter_op() | decrement_op().
+-type decrement_op() :: decrement | {decrement, pos_integer()}.
+
+%% @doc Create a new, empty `pncounter()'
+-spec new() -> pncounter().
+new() ->
+ {riak_kv_gcounter:new(), riak_kv_gcounter:new()}.
+
+%% @doc Create a `pncounter()' with an initial `Value' for `Actor'.
+-spec new(term(), integer()) -> pncounter().
+new(Actor, Value) when Value > 0 ->
+ update({increment, Value}, Actor, new());
+new(Actor, Value) when Value < 0 ->
+ update({decrement, Value * -1}, Actor, new());
+new(_Actor, _Zero) ->
+ new().
+
+%% @doc The single, total value of a `pncounter()'
+-spec value(pncounter()) -> integer().
+value({Incr, Decr}) ->
+ riak_kv_gcounter:value(Incr) - riak_kv_gcounter:value(Decr).
+
+%% @doc Update a `pncounter()'. The first argument is either the atom
+%% `increment' or `decrement' or the two tuples `{increment, pos_integer()}' or
+%% `{decrement, pos_integer()}'. In the case of the former, the operation's amount
+%% is `1'. Otherwise it is the value provided in the tuple's second element.
+%% `Actor' is any term, and the 3rd argument is the `pncounter()' to update.
+%%
+%% returns the updated `pncounter()'
+-spec update(pncounter_op(), term(), pncounter()) -> pncounter().
+update(increment, Actor, {Incr, Decr}) ->
+ {riak_kv_gcounter:update(increment, Actor, Incr), Decr};
+update({increment, By}, Actor, {Incr, Decr}) when is_integer(By), By > 0 ->
+ {riak_kv_gcounter:update({increment, By}, Actor, Incr), Decr};
+update(decrement, Actor, {Incr, Decr}) ->
+ {Incr, riak_kv_gcounter:update(increment, Actor, Decr)};
+update({decrement, By}, Actor, {Incr, Decr}) when is_integer(By), By > 0 ->
+ {Incr, riak_kv_gcounter:update({increment, By}, Actor, Decr)}.
+
+%% @doc Merge two `pncounter()'s to a single `pncounter()'. This is the Least Upper Bound
+%% function described in the literature.
+-spec merge(pncounter(), pncounter()) -> pncounter().
+merge({Incr1, Decr1}, {Incr2, Decr2}) ->
+ MergedIncr = riak_kv_gcounter:merge(Incr1, Incr2),
+ MergedDecr = riak_kv_gcounter:merge(Decr1, Decr2),
+ {MergedIncr, MergedDecr}.
+
+%% @doc Are two `pncounter()'s structurally equal? This is not `value/1' equality.
+%% Two counters might represent the total `-42', and not be `equal/2'. Equality here is
+%% that both counters represent exactly the same information.
+-spec equal(pncounter(), pncounter()) -> boolean().
+equal({Incr1, Decr1}, {Incr2, Decr2}) ->
+ riak_kv_gcounter:equal(Incr1, Incr2) andalso riak_kv_gcounter:equal(Decr1, Decr2).
+
+-define(TAG, 71).
+-define(V1_VERS, 1).
+
+%% @doc Encode an effecient binary representation of `pncounter()'
+-spec to_binary(pncounter()) -> binary().
+to_binary({P, N}) ->
+ PBin = riak_kv_gcounter:to_binary(P),
+ NBin = riak_kv_gcounter:to_binary(N),
+ PBinLen = byte_size(PBin),
+ NBinLen = byte_size(NBin),
+ <<?TAG:8/integer, ?V1_VERS:8/integer,
+ PBinLen:32/integer, PBin:PBinLen/binary,
+ NBinLen:32/integer, NBin:NBinLen/binary>>.
+
+%% @doc Decode a binary encoded PN-Counter
+-spec from_binary(binary()) -> pncounter().
+from_binary(<<?TAG:8/integer, ?V1_VERS:8/integer,
+ PBinLen:32/integer, PBin:PBinLen/binary,
+ NBinLen:32/integer, NBin:NBinLen/binary>>) ->
+ {riak_kv_gcounter:from_binary(PBin), riak_kv_gcounter:from_binary(NBin)}.
+
+%% ===================================================================
+%% EUnit tests
+%% ===================================================================
+-ifdef(TEST).
+
+-ifdef(EQC).
+%% EQC generator
+gen_op() ->
+ oneof([increment, {increment, gen_pos()}, decrement, {decrement, gen_pos()} ]).
+
+gen_pos()->
+ ?LET(X, int(), 1+abs(X)).
+
+update_expected(_ID, increment, Prev) ->
+ Prev+1;
+update_expected(_ID, decrement, Prev) ->
+ Prev-1;
+update_expected(_ID, {increment, By}, Prev) ->
+ Prev+By;
+update_expected(_ID, {decrement, By}, Prev) ->
+ Prev-By;
+update_expected(_ID, _Op, Prev) ->
+ Prev.
+
+eqc_state_value(S) ->
+ S.
+
+eqc_value_test_() ->
+ {timeout, 120, [?_assert(crdt_statem_eqc:prop_converge(0, 1000, ?MODULE))]}.
+-endif.
+
+new_test() ->
+ ?assertEqual({[], []}, new()).
+
+value_test() ->
+ PNCnt1 = {[{1, 1}, {2, 13}, {3, 1}], [{2, 10}, {4, 1}]},
+ PNCnt2 = {[], []},
+ PNCnt3 = {[{1, 3}, {2, 1}, {3, 1}], [{1, 3}, {2, 1}, {3, 1}]},
+ ?assertEqual(4, value(PNCnt1)),
+ ?assertEqual(0, value(PNCnt2)),
+ ?assertEqual(0, value(PNCnt3)).
+
+update_increment_test() ->
+ PNCnt0 = new(),
+ PNCnt1 = update(increment, 1, PNCnt0),
+ PNCnt2 = update(increment, 2, PNCnt1),
+ PNCnt3 = update(increment, 1, PNCnt2),
+ ?assertEqual({[{1, 2}, {2, 1}], []}, PNCnt3).
+
+update_increment_by_test() ->
+ PNCnt0 = new(),
+ PNCnt1 = update({increment, 7}, 1, PNCnt0),
+ ?assertEqual({[{1, 7}], []}, PNCnt1).
+
+update_decrement_test() ->
+ PNCnt0 = new(),
+ PNCnt1 = update(increment, 1, PNCnt0),
+ PNCnt2 = update(increment, 2, PNCnt1),
+ PNCnt3 = update(increment, 1, PNCnt2),
+ PNCnt4 = update(decrement, 1, PNCnt3),
+ ?assertEqual({[{1, 2}, {2, 1}], [{1, 1}]}, PNCnt4).
+
+update_decrement_by_test() ->
+ PNCnt0 = new(),
+ PNCnt1 = update({increment, 7}, 1, PNCnt0),
+ PNCnt2 = update({decrement, 5}, 1, PNCnt1),
+ ?assertEqual({[{1, 7}], [{1, 5}]}, PNCnt2).
+
+merge_test() ->
+ PNCnt1 = {[{<<"1">>, 1},
+ {<<"2">>, 2},
+ {<<"4">>, 4}], []},
+ PNCnt2 = {[{<<"3">>, 3},
+ {<<"4">>, 3}], []},
+ ?assertEqual({[], []}, merge(new(), new())),
+ ?assertEqual({[{<<"1">>,1},{<<"2">>,2},{<<"4">>,4},{<<"3">>,3}], []},
+ merge(PNCnt1, PNCnt2)).
+
+merge_too_test() ->
+ PNCnt1 = {[{<<"5">>, 5}], [{<<"7">>, 4}]},
+ PNCnt2 = {[{<<"6">>, 6}, {<<"7">>, 7}], [{<<"5">>, 2}]},
+ ?assertEqual({[{<<"5">>, 5},{<<"6">>,6}, {<<"7">>, 7}], [{<<"7">>, 4}, {<<"5">>, 2}]},
+ merge(PNCnt1, PNCnt2)).
+
+equal_test() ->
+ PNCnt1 = {[{1, 2}, {2, 1}, {4, 1}], [{1, 1}, {3, 1}]},
+ PNCnt2 = {[{1, 1}, {2, 4}, {3, 1}], []},
+ PNCnt3 = {[{1, 2}, {2, 1}, {4, 1}], [{3, 1}, {1, 1}]},
+ PNCnt4 = {[{4, 1}, {1, 2}, {2, 1}], [{1, 1}, {3, 1}]},
+ ?assertNot(equal(PNCnt1, PNCnt2)),
+ ?assert(equal(PNCnt3, PNCnt4)),
+ ?assert(equal(PNCnt1, PNCnt3)).
+
+usage_test() ->
+ PNCnt1 = new(),
+ PNCnt2 = new(),
+ ?assert(equal(PNCnt1, PNCnt2)),
+ PNCnt1_1 = update({increment, 2}, a1, PNCnt1),
+ PNCnt2_1 = update(increment, a2, PNCnt2),
+ PNCnt3 = merge(PNCnt1_1, PNCnt2_1),
+ PNCnt2_2 = update({increment, 3}, a3, PNCnt2_1),
+ PNCnt3_1 = update(increment, a4, PNCnt3),
+ PNCnt3_2 = update(increment, a1, PNCnt3_1),
+ PNCnt3_3 = update({decrement, 2}, a5, PNCnt3_2),
+ PNCnt2_3 = update(decrement, a2, PNCnt2_2),
+ ?assertEqual({[{a1, 3}, {a4, 1}, {a2, 1}, {a3, 3}], [{a5, 2}, {a2, 1}]},
+ merge(PNCnt3_3, PNCnt2_3)).
+
+roundtrip_bin_test() ->
+ PN = new(),
+ PN1 = update({increment, 2}, <<"a1">>, PN),
+ PN2 = update({decrement, 1000000000000000000000000}, douglas_Actor, PN1),
+ PN3 = update(increment, [{very, ["Complex"], <<"actor">>}, honest], PN2),
+ PN4 = update(decrement, "another_acotr", PN3),
+ Bin = to_binary(PN4),
+ Decoded = from_binary(Bin),
+ ?assert(equal(PN4, Decoded)).
+
+-endif.
View
21 src/riak_kv_put_fsm.erl
@@ -611,6 +611,9 @@ handle_options([{returnbody, false}|T], State = #state{postcommit = Postcommit})
dw=erlang:max(1,State#state.dw),
returnbody=false})
end;
+handle_options([{counter_op, _Amt}=COP|T], State) ->
+ VNodeOpts = [COP | State#state.vnode_options],
+ handle_options(T, State#state{vnode_options=VNodeOpts});
handle_options([{_,_}|T], State) -> handle_options(T, State).
init_putcore(State = #state{n = N, w = W, pw = PW, dw = DW, allowmult = AllowMult,
@@ -660,14 +663,10 @@ update_last_modified(RObj) ->
%% objects with the same vclock on 0.14.2 if the same clientid was used in
%% the same second. It can be revisited post-1.0.0.
Now = os:timestamp(),
- NewMD = dict:store(?MD_VTAG, make_vtag(Now),
+ NewMD = dict:store(?MD_VTAG, riak_kv_util:make_vtag(Now),
dict:store(?MD_LASTMOD, Now, MD0)),
riak_object:update_metadata(RObj, NewMD).
-make_vtag(Now) ->
- <<HashAsNum:128/integer>> = crypto:md5(term_to_binary({node(), Now})),
- riak_core_util:integer_to_list(HashAsNum,62).
-
%% Invokes the hook and returns a tuple of
%% {Lang, Called, Result}
%% Where Called = {Mod, Fun} if Lang = erlang
@@ -863,15 +862,3 @@ atom2list(P) when is_pid(P)->
dtrace_errstr(Term) ->
io_lib:format("~P", [Term, 12]).
-
-%% ===================================================================
-%% EUnit tests
-%% ===================================================================
--ifdef(TEST).
-
-make_vtag_test() ->
- crypto:start(),
- ?assertNot(make_vtag(now()) =:=
- make_vtag(now())).
-
--endif.
View
13 src/riak_kv_util.erl
@@ -38,7 +38,8 @@
fix_incorrect_index_entries/1,
fix_incorrect_index_entries/0,
responsible_preflists/1,
- responsible_preflists/2]).
+ responsible_preflists/2,
+ make_vtag/1]).
-include_lib("riak_kv_vnode.hrl").
@@ -332,6 +333,11 @@ mark_indexes_reformatted(Idx, 0, ForUpgrade) ->
mark_indexes_reformatted(_Idx, _ErrorCount, _ForUpgrade) ->
undefined.
+%% @Doc vtag creation function
+-spec make_vtag(erlang:timestamp()) -> list().
+make_vtag(Now) ->
@jrwest
jrwest added a note

The yessir backend had borrowed this function as well [1]. maybe replace its usage too?

[1] https://github.com/basho/riak_kv/blob/master/src/riak_kv_yessir_backend.erl#L319

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ <<HashAsNum:128/integer>> = crypto:md5(term_to_binary({node(), Now})),
+ riak_core_util:integer_to_list(HashAsNum,62).
%% ===================================================================
%% EUnit tests
@@ -359,4 +365,9 @@ deleted_test() ->
O, dict:store(<<"X-Riak-Deleted">>, true, MD))),
true = is_x_deleted(O1).
+make_vtag_test() ->
+ crypto:start(),
+ ?assertNot(make_vtag(now()) =:=
+ make_vtag(now())).
+
-endif.
View
72 src/riak_kv_vnode.erl
@@ -110,7 +110,8 @@
bprops :: maybe_improper_list(),
starttime :: non_neg_integer(),
prunetime :: undefined| non_neg_integer(),
- is_index=false :: boolean() %% set if the b/end supports indexes
+ is_index=false :: boolean(), %% set if the b/end supports indexes
+ counter_op = undefined :: undefined | integer() %% if set this is a counter operation
}).
-spec maybe_create_hashtrees(state()) -> state().
@@ -845,6 +846,7 @@ do_put(Sender, {Bucket,_Key}=BKey, RObj, ReqID, StartTime, Options, State) ->
PruneTime = StartTime
end,
Coord = proplists:get_value(coord, Options, false),
+ CounterOp = proplists:get_value(counter_op, Options, undefined),
PutArgs = #putargs{returnbody=proplists:get_value(returnbody,Options,false) orelse Coord,
coord=Coord,
lww=proplists:get_value(last_write_wins, BProps, false),
@@ -853,7 +855,8 @@ do_put(Sender, {Bucket,_Key}=BKey, RObj, ReqID, StartTime, Options, State) ->
reqid=ReqID,
bprops=BProps,
starttime=StartTime,
- prunetime=PruneTime},
+ prunetime=PruneTime,
+ counter_op = CounterOp},
{PrepPutRes, UpdPutArgs} = prepare_put(State, PutArgs),
{Reply, UpdState} = perform_put(PrepPutRes, State, UpdPutArgs),
riak_core_vnode:reply(Sender, Reply),
@@ -920,7 +923,8 @@ prepare_put(#state{idx=Idx,
coord=Coord,
lww=LWW,
starttime=StartTime,
- prunetime=PruneTime},
+ prunetime=PruneTime,
+ counter_op = CounterOp},
IndexBackend) ->
GetReply =
case Mod:get(Bucket, Key, ModState) of
@@ -944,12 +948,7 @@ prepare_put(#state{idx=Idx,
false ->
IndexSpecs = []
end,
- ObjToStore = case Coord of
- true ->
- riak_object:increment_vclock(RObj, VId, StartTime);
- false ->
- RObj
- end,
+ ObjToStore = prepare_new_put(Coord, RObj, VId, StartTime, CounterOp),
{{true, ObjToStore}, PutArgs#putargs{index_specs=IndexSpecs, is_index=IndexBackend}};
{ok, Val} ->
OldObj = object_from_binary(Bucket, Key, Val),
@@ -959,29 +958,50 @@ prepare_put(#state{idx=Idx,
{newobj, NewObj} ->
VC = riak_object:vclock(NewObj),
AMObj = enforce_allow_mult(NewObj, BProps),
- case IndexBackend of
- true ->
- IndexSpecs =
- riak_object:diff_index_specs(AMObj,
+ IndexSpecs = case IndexBackend of
+ true ->
+ riak_object:diff_index_specs(AMObj,
OldObj);
- false ->
- IndexSpecs = []
+ false ->
+ []
end,
- case PruneTime of
- undefined ->
- ObjToStore = AMObj;
- _ ->
- ObjToStore =
- riak_object:set_vclock(AMObj,
- vclock:prune(VC,
- PruneTime,
- BProps))
+ ObjToStore = case PruneTime of
+ undefined ->
+ AMObj;
+ _ ->
+ riak_object:set_vclock(AMObj,
+ vclock:prune(VC,
+ PruneTime,
+ BProps))
end,
- {{true, ObjToStore},
+ ObjToStore2 = handle_counter(Coord, CounterOp, VId, ObjToStore),
+ {{true, ObjToStore2},
PutArgs#putargs{index_specs=IndexSpecs, is_index=IndexBackend}}
end
end.
+%% @Doc in the case that this a co-ordinating put, prepare the object.
+prepare_new_put(true, RObj, VId, StartTime, CounterOp) when is_integer(CounterOp) ->
+ VClockUp = riak_object:increment_vclock(RObj, VId, StartTime),
+ %% coordinating a _NEW_ counter operation means
+ %% creating + incrementing the counter.
+ %% Make a new counter, stuff it in the riak_object
+ riak_kv_counter:update(VClockUp, VId, CounterOp);
+prepare_new_put(true, RObj, VId, StartTime, _CounterOp) ->
+ riak_object:increment_vclock(RObj, VId, StartTime);
+prepare_new_put(false, RObj, _VId, _StartTime, _CounterOp) ->
+ RObj.
+
+handle_counter(true, CounterOp, VId, RObj) when is_integer(CounterOp) ->
+ riak_kv_counter:update(RObj, VId, CounterOp);
+handle_counter(false, CounterOp, _Vid, RObj) when is_integer(CounterOp) ->
+ %% non co-ord put, merge the values if there are siblings
+ %% 'cos that is the point of CRDTs / counters: no siblings
+ riak_kv_counter:merge(RObj);
+handle_counter(_Coord, _CounterOp, _VId, RObj) ->
+ %% i.e. not a counter op
+ RObj.
+
perform_put({false, Obj},
#state{idx=Idx}=State,
#putargs{returnbody=true,
@@ -1076,7 +1096,7 @@ put_merge(false, false, CurObj, UpdObj, _VId, _StartTime) -> % coord=false, LWW=
false ->
{newobj, ResObj}
end;
-put_merge(true, true, _CurObj, UpdObj, VId, StartTime) -> % coord=false, LWW=true
+put_merge(true, true, _CurObj, UpdObj, VId, StartTime) -> % coord=true, LWW=true
{newobj, riak_object:increment_vclock(UpdObj, VId, StartTime)};
put_merge(true, false, CurObj, UpdObj, VId, StartTime) ->
UpdObj1 = riak_object:increment_vclock(UpdObj, VId, StartTime),
View
6 src/riak_kv_web.erl
@@ -94,7 +94,11 @@ raw_dispatch(Name) ->
riak_kv_wm_link_walker, Props2},
{["buckets", bucket, "index", field, '*'],
- riak_kv_wm_index, Props2}
+ riak_kv_wm_index, Props2},
+
+ %% counters
+ {["buckets", bucket, "counters", key],
+ riak_kv_wm_counter, Props2}
].
is_post(Req) ->
View
366 src/riak_kv_wm_counter.erl
@@ -0,0 +1,366 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_wm_counter: Webmachine resource for counters
+%%
+%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc Resource for serving Counters over HTTP.
+%%
+%% Available operations:
+%%
+%% POST /buckets/Bucket/counters/Key
+%% Increment the counter at `Bucket', `Key' by
+%% the integer amount of the request body. If the request body cannot be
+%% parsed (by erlang `list_to_integer/1') then a `400 bad request'
+%% is the result.
+%% The following query params are accepted (@see `riak_kv_wm_object' docs, too):
+%%
+%% <dl>
+%% <dt>w</dt><dd>The write quorum. See below for defaults and values.</dd>
+%% <dt>pw</dt><dd>The primary write quorum. See below for defaults and values.</dd>
+%% <dt>dw</dt><dd>The durable write quorum. See below for default and values.</dd>
+%% <dt>returnvalue</dt><dd>Boolean. Default is `false' if not provided. When `true'
+%% the response body will be the value of the counter.</dd>
+%% </dl>
+%%
+%% GET /buckets/Bucket/counters/Key
+%% Get the current value of the counter at `Bucket', `Key'. Result is a text/plain
+%% body with an integer value, or `not_found' if no counter exists at that resource location.
+%% The following query params are accepted:
+%%
+%% <dl>
+%% <dt>r</dt><dd>Read quorum. See below for defaults and values.</dd>
+%% <dt>pr</dt><dd>Primary read quorum. See below for defaults and values.</dd>
+%% <dt>basic_quorum</dt><dd>Boolean. Return as soon as a quorum of responses are received
+%% if true. Default is the bucket default, if absent.</dd>
+%% <dt>notfound_ok</dt><dd>Boolean. A `not_found` response from a vnode counts toward
+%% `r' quorum if true. Default is the bucket default, if absent.</dd>
+%% </dl>
+%%
+%% Quorum values (r/pr/w/pw/dw):
+%% <dl>
+%% <dt>default</dt<dd>Whatever the bucket default is. This is the value used
+%% for any absent value.</dd>
+%% <dt>quorum</dt><dd>(Bucket N val / 2) + 1</dd>
+%% <dt>all</dt><dd>All replicas must respond</dd>
+%% <dt>one</dt><dd>Any one response is enough</dd>
+%% <dt>Integer</dt><dd>That specific number of vnodes must respond. Must be =< N</dd>
+%% </dl>
+%% Please see http://docs.basho.com for details of all the quorum values and there effect.
+
+
+
+-module(riak_kv_wm_counter).
+
+%% webmachine resource exports
+-export([
+ init/1,
+ service_available/2,
+ forbidden/2,
+ allowed_methods/2,
+ malformed_request/2,
+ resource_exists/2,
+ content_types_provided/2,
+ post_is_create/2,
+ process_post/2,
+ accept_doc_body/2,
+ to_text/2
+ ]).
+%% The empty counter that is the body of all new counter objects
+-define(NEW_COUNTER, {riak_kv_pncounter, riak_kv_pncounter:new()}).
+
+%% @type context() = term()
+-record(ctx, {api_version, %% integer() - Determine which version of the API to use.
+ bucket, %% binary() - Bucket name (from uri)
+ key, %% binary() - Key (from uri)
+ client, %% riak_client() - the store client
+ r, %% integer() - r-value for reads
+ w, %% integer() - w-value for writes
+ dw, %% integer() - dw-value for writes
+ rw, %% integer() - rw-value for deletes
+ pr, %% integer() - number of primary nodes required in preflist on read
+ pw, %% integer() - number of primary nodes required in preflist on write
+ basic_quorum, %% boolean() - whether to use basic_quorum
+ notfound_ok, %% boolean() - whether to treat notfounds as successes
+ prefix, %% string() - prefix for resource uris
+ riak, %% local | {node(), atom()} - params for riak client
+ doc, %% {ok, riak_object()}|{error, term()} - the object found
+ bucketprops, %% proplist() - properties of the bucket
+ method, %% atom() - HTTP method for the request
+ counter_op :: integer() | undefined %% The amount to add to the counter
+ }).
+%% @type link() = {{Bucket::binary(), Key::binary()}, Tag::binary()}
+%% @type index_field() = {Key::string(), Value::string()}
+
+-include_lib("webmachine/include/webmachine.hrl").
+-include("riak_kv_wm_raw.hrl").
+
+%% @spec init(proplist()) -> {ok, context()}
+%% @doc Initialize this resource. This function extracts the
+%% 'prefix' and 'riak' properties from the dispatch args.
+init(Props) ->
+ {ok, #ctx{api_version=proplists:get_value(api_version, Props),
+ prefix=proplists:get_value(prefix, Props),
+ riak=proplists:get_value(riak, Props)}}.
+
+service_available(RD, Ctx=#ctx{riak=RiakProps}) ->
+ case riak_kv_wm_utils:get_riak_client(RiakProps, riak_kv_wm_utils:get_client_id(RD)) of
+ {ok, C} ->
+ {true,
+ RD,
+ Ctx#ctx{
+ method=wrq:method(RD),
+ client=C,
+ bucket=case wrq:path_info(bucket, RD) of
+ undefined -> undefined;
+ B -> list_to_binary(riak_kv_wm_utils:maybe_decode_uri(RD, B))
+ end,
+ key=case wrq:path_info(key, RD) of
+ undefined -> undefined;
+ K -> list_to_binary(riak_kv_wm_utils:maybe_decode_uri(RD, K))
+ end
+ }};
+ Error ->
+ {false,
+ wrq:set_resp_body(
+ io_lib:format("Unable to connect to Riak: ~p~n", [Error]),
+ wrq:set_resp_header(?HEAD_CTYPE, "text/plain", RD)),
+ Ctx}
+ end.
+
+forbidden(RD, Ctx) ->
+ {riak_kv_wm_utils:is_forbidden(RD), RD, Ctx}.
+
+allowed_methods(RD, Ctx) ->
+ {['GET', 'POST'], RD, Ctx}.
+
+malformed_request(RD, Ctx0) when Ctx0#ctx.method =:= 'POST' ->
+ case catch list_to_integer(binary_to_list(wrq:req_body(RD))) of
+ {'EXIT', _} ->
+ {true, RD, Ctx0};
+ Change ->
+ Ctx = Ctx0#ctx{counter_op = Change},
+ case malformed_rw_params(RD, Ctx) of
+ Result={true, _, _} ->
+ Result;
+ {false, RWRD, RWCtx} ->
+ {false, RWRD, RWCtx}
+ end
+ end;
+malformed_request(RD, Ctx) ->
+ case malformed_rw_params(RD, Ctx) of
+ Result = {true, _, _} ->
+ Result;
+ {false, ResRD, ResCtx} ->
+ DocCtx = ensure_doc(ResCtx),
+ case DocCtx#ctx.doc of
+ {error, Reason} ->
+ handle_common_error(Reason, ResRD, DocCtx);
+ _ ->
+ {false, ResRD, DocCtx}
+ end
+ end.
+
+malformed_rw_params(RD, Ctx) ->
+ Res =
+ lists:foldl(fun malformed_rw_param/2,
+ {false, RD, Ctx},
+ [{#ctx.r, "r", "default"},
+ {#ctx.w, "w", "default"},
+ {#ctx.dw, "dw", "default"},
+ {#ctx.pw, "pw", "default"},
+ {#ctx.pr, "pr", "default"}]),
+ lists:foldl(fun malformed_boolean_param/2,
+ Res,
+ [{#ctx.basic_quorum, "basic_quorum", "default"},
+ {#ctx.notfound_ok, "notfound_ok", "default"}]).
+
+malformed_rw_param({Idx, Name, Default}, {Result, RD, Ctx}) ->
+ case catch normalize_rw_param(wrq:get_qs_value(Name, Default, RD)) of
+ P when (is_atom(P) orelse is_integer(P)) ->
+ {Result, RD, setelement(Idx, Ctx, P)};
+ _ ->
+ {true,
+ wrq:append_to_resp_body(
+ io_lib:format("~s query parameter must be an integer or "
+ "one of the following words: 'one', 'quorum' or 'all'~n",
+ [Name]),
+ wrq:set_resp_header(?HEAD_CTYPE, "text/plain", RD)),
+ Ctx}
+ end.
+
+malformed_boolean_param({Idx, Name, Default}, {Result, RD, Ctx}) ->
+ case string:to_lower(wrq:get_qs_value(Name, Default, RD)) of
+ "true" ->
+ {Result, RD, setelement(Idx, Ctx, true)};
+ "false" ->
+ {Result, RD, setelement(Idx, Ctx, false)};
+ "default" ->
+ {Result, RD, setelement(Idx, Ctx, default)};
+ _ ->
+ {true,
+ wrq:append_to_resp_body(
+ io_lib:format("~s query parameter must be true or false~n",
+ [Name]),
+ wrq:set_resp_header(?HEAD_CTYPE, "text/plain", RD)),
+ Ctx}
+ end.
+
+normalize_rw_param("default") -> default;
+normalize_rw_param("one") -> one;
+normalize_rw_param("quorum") -> quorum;
+normalize_rw_param("all") -> all;
+normalize_rw_param(V) -> list_to_integer(V).
+
+content_types_provided(RD, Ctx) ->
+ {[{"text/plain", to_text}], RD, Ctx}.
+
+resource_exists(RD, Ctx0) when Ctx0#ctx.method =:= 'GET' ->
+ DocCtx = ensure_doc(Ctx0),
+ case DocCtx#ctx.doc of
+ {ok, _Doc} ->
+ {true, RD, DocCtx};
+ {error, _} ->
+ %% This should never actually be reached because all the error
+ %% conditions from ensure_doc are handled up in malformed_request.
+ {false, RD, DocCtx}
+ end;
+resource_exists(RD, Ctx) ->
+ {true, RD, Ctx}.
+
+post_is_create(RD, Ctx) ->
+ {false, RD, Ctx}.
+
+process_post(RD, Ctx) -> accept_doc_body(RD, Ctx).
+
+accept_doc_body(RD, Ctx=#ctx{bucket=B, key=K, client=C,
+ counter_op=CounterOp}) ->
+ case allow_mult(B) of
+ true ->
+ Doc = riak_kv_counter:new(B, K),
+ Options = [{counter_op, CounterOp}] ++ return_value(RD),
+ case C:put(Doc, [{w, Ctx#ctx.w}, {dw, Ctx#ctx.dw}, {pw, Ctx#ctx.pw}, {timeout, 60000} |
+ Options]) of
+ {error, Reason} ->
+ handle_common_error(Reason, RD, Ctx);
+ ok ->
+ {true, RD, Ctx#ctx{doc={ok, Doc}}};
+ {ok, RObj} ->
+ Body = produce_doc_body(RObj),
+ {true, wrq:append_to_resp_body(Body, RD), Ctx#ctx{doc={ok, RObj}}}
+ end;
+ false ->
+ handle_common_error(allow_mult_false, RD, Ctx)
+ end.
+
+return_value(RD) ->
+ case wrq:get_qs_value(?Q_RETURNVALUE, RD) of
+ ?Q_TRUE ->
+ [returnbody];
+ _ ->
+ []
+end.
+
+allow_mult(Bucket) ->
+ proplists:get_value(allow_mult, riak_core_bucket:get_bucket(Bucket)).
+
+to_text(RD, Ctx=#ctx{doc={ok, Doc}}) ->
+ {produce_doc_body(Doc), RD, Ctx}.
+
+produce_doc_body(Doc) ->
+ Value = riak_kv_counter:value(Doc),
+ integer_to_list(Value).
+
+ensure_doc(Ctx=#ctx{doc=undefined, key=undefined}) ->
+ Ctx#ctx{doc={error, notfound}};
+ensure_doc(Ctx=#ctx{doc=undefined, bucket=B, key=K, client=C, r=R,
+ pr=PR, basic_quorum=Quorum, notfound_ok=NotFoundOK}) ->
+ Ctx#ctx{doc=C:get(B, K, [{r, R}, {pr, PR},
+ {basic_quorum, Quorum}, {notfound_ok, NotFoundOK}])};
+ensure_doc(Ctx) -> Ctx.
+
+handle_common_error(Reason, RD, Ctx) ->
+ case {error, Reason} of
+ {error, too_many_fails} ->
+ {{halt, 503}, wrq:append_to_response_body("Too Many write failures"
+ " to satisfy W/DW\n", RD), Ctx};
+ {error, timeout} ->
+ {{halt, 503},
+ wrq:set_resp_header("Content-Type", "text/plain",
+ wrq:append_to_response_body(
+ io_lib:format("request timed out~n",[]),
+ RD)),
+ Ctx};
+ {error, notfound} ->
+ {{halt, 404},
+ wrq:set_resp_header("Content-Type", "text/plain",
+ wrq:append_to_response_body(
+ io_lib:format("not found~n",[]),
+ RD)),
+ Ctx};
+ {error, {deleted, _VClock}} ->
+ {{halt, 404},
+ wrq:set_resp_header("Content-Type", "text/plain",
+ wrq:append_to_response_body(
+ io_lib:format("not found~n",[]),
+ RD)),
+ Ctx};
+ {error, {n_val_violation, N}} ->
+ Msg = io_lib:format("Specified w/dw/pw values invalid for bucket"
+ " n value of ~p~n", [N]),
+ {{halt, 400}, wrq:append_to_response_body(Msg, RD), Ctx};
+ {error, allow_mult_false} ->
+ Msg = "Counters require bucket property 'allow_mult=true'",
+ {{halt, 409}, wrq:append_to_response_body(Msg, RD), Ctx};
+ {error, {r_val_unsatisfied, Requested, Returned}} ->
+ {{halt, 503},
+ wrq:set_resp_header("Content-Type", "text/plain",
+ wrq:append_to_response_body(
+ io_lib:format("R-value unsatisfied: ~p/~p~n",
+ [Returned, Requested]),
+ RD)),
+ Ctx};
+ {error, {w_val_unsatisfied, NumW, NumDW, W, DW}} ->
+ {{halt, 503},
+ wrq:set_resp_header("Content-Type", "text/plain",
+ wrq:append_to_response_body(
+ io_lib:format("W/DW-value unsatisfied: w=~p/~p dw=~p/~p~n",
+ [NumW, W, NumDW, DW]),
+ RD)),
+ Ctx};
+ {error, {pr_val_unsatisfied, Requested, Returned}} ->
+ {{halt, 503},
+ wrq:set_resp_header("Content-Type", "text/plain",
+ wrq:append_to_response_body(
+ io_lib:format("PR-value unsatisfied: ~p/~p~n",
+ [Returned, Requested]),
+ RD)),
+ Ctx};
+ {error, {pw_val_unsatisfied, Requested, Returned}} ->
+ Msg = io_lib:format("PW-value unsatisfied: ~p/~p~n", [Returned,
+ Requested]),
+ {{halt, 503}, wrq:append_to_response_body(Msg, RD), Ctx};
+ {error, Err} ->
+ {{halt, 500},
+ wrq:set_resp_header("Content-Type", "text/plain",
+ wrq:append_to_response_body(
+ io_lib:format("Error:~n~p~n",[Err]),
+ RD)),
+ Ctx}
+ end.
View
3  src/riak_kv_wm_raw.hrl
@@ -53,8 +53,6 @@
-define(JSON_EXTRACT, <<"search_extractor">>).
-define(JSON_EXTRACT_LEGACY, <<"rs_extractfun">>).
-
-
%% Names of HTTP query parameters
-define(Q_PROPS, "props").
-define(Q_BUCKETS, "buckets").
@@ -64,3 +62,4 @@
-define(Q_STREAM, "stream").
-define(Q_VTAG, "vtag").
-define(Q_RETURNBODY, "returnbody").
+-define(Q_RETURNVALUE, "returnvalue").
View
9 src/riak_kv_yessir_backend.erl
@@ -166,7 +166,7 @@ get(Bucket, Key, #state{op_get = Gets} = S) ->
end,
Meta = dict:new(),
Meta1 = dict:store(<<"X-Riak-Last-Modified">>, erlang:now(), Meta),
- Meta2 = dict:store(<<"X-Riak-VTag">>, make_vtag(erlang:now()), Meta1),
+ Meta2 = dict:store(<<"X-Riak-VTag">>, riak_kv_util:make_vtag(erlang:now()), Meta1),
O = riak_object:increment_vclock(riak_object:new(Bucket, Key, Bin, Meta2),
<<"yessir!">>, 1),
{ok, riak_object:to_binary(v0, O), S#state{op_get = Gets + 1}}.
@@ -308,7 +308,7 @@ fold_objects_fun(FoldObjectsFun, Bucket, Size) ->
Bin = value_for_random(VR, Size),
Meta = dict:new(),
Meta1 = dict:store(<<"X-Riak-Last-Modified">>, erlang:now(), Meta),
- Meta2 = dict:store(<<"X-Riak-VTag">>, make_vtag(erlang:now()), Meta1),
+ Meta2 = dict:store(<<"X-Riak-VTag">>, riak_kv_util:make_vtag(erlang:now()), Meta1),
O = riak_object:increment_vclock(riak_object:new(Bucket, Key, Bin, Meta2),
<<"yessir!">>, 1),
FoldObjectsFun(Bucket, Key, riak_object:to_binary(v0, O), Acc);
@@ -316,11 +316,6 @@ fold_objects_fun(FoldObjectsFun, Bucket, Size) ->
Acc
end.
-%% borrowed from kv get_fsm...
-make_vtag(Now) ->
- <<HashAsNum:128/integer>> = crypto:md5(term_to_binary({node(), Now})),
- riak_core_util:integer_to_list(HashAsNum,62).
-
get_binsize(<<"yessir.", Rest/binary>>) ->
get_binsize(Rest, 0);
get_binsize(_) ->
View
123 test/crdt_statem_eqc.erl
@@ -0,0 +1,123 @@
+%% -------------------------------------------------------------------
+%%
+%% crdt_statem_eqc: Quickcheck statem test for riak_dt modules
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(crdt_statem_eqc).
+
+-ifdef(EQC).
+-include_lib("eqc/include/eqc.hrl").
+-include_lib("eqc/include/eqc_statem.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-record(state,{vnodes=[], mod_state, vnode_id=0, mod}).
+
+-define(NUMTESTS, 1000).
+-define(QC_OUT(P),
+ eqc:on_output(fun(Str, Args) ->
+ io:format(user, Str, Args) end, P)).
+
+%% Initialize the state
+initial_state() ->
+ #state{}.
+
+%% Command generator, S is the state
+command(#state{vnodes=VNodes, mod=Mod}) ->
+ oneof([{call, ?MODULE, create, [Mod]}] ++
+ [{call, ?MODULE, update, [Mod, Mod:gen_op(), elements(VNodes)]} || length(VNodes) > 0] ++ %% If a vnode exists
+ [{call, ?MODULE, merge, [Mod, elements(VNodes), elements(VNodes)]} || length(VNodes) > 0] ++
+ [{call, ?MODULE, crdt_equals, [Mod, elements(VNodes), elements(VNodes)]} || length(VNodes) > 0]
+).
+
+%% Next state transformation, S is the current state
+next_state(#state{vnodes=VNodes, mod=Mod, vnode_id=ID, mod_state=Expected0}=S,V,{call,?MODULE,create,_}) ->
+ Expected = Mod:update_expected(ID, create, Expected0),
+ S#state{vnodes=VNodes++[{ID, V}], vnode_id=ID+1, mod_state=Expected};
+next_state(#state{vnodes=VNodes0, mod_state=Expected, mod=Mod}=S,V,{call,?MODULE, update, [Mod, Op, {ID, _C}]}) ->
+ VNodes = lists:keyreplace(ID, 1, VNodes0, {ID, V}),
+ S#state{vnodes=VNodes, mod_state=Mod:update_expected(ID, Op, Expected)};
+next_state(#state{vnodes=VNodes0, mod_state=Expected0, mod=Mod}=S,V,{call,?MODULE, merge, [_Mod, {IDS, _C}=_Source, {ID, _C}=_Dest]}) ->
+ VNodes = lists:keyreplace(ID, 1, VNodes0, {ID, V}),
+ Expected = Mod:update_expected(ID, {merge, IDS}, Expected0),
+ S#state{vnodes=VNodes, mod_state=Expected};
+next_state(S, _V, _C) ->
+ S.
+
+%% Precondition, checked before command is added to the command sequence
+precondition(_S,{call,_,_,_}) ->
+ true.
+
+%% Postcondition, checked after command has been evaluated
+%% OBS: S is the state before next_state(S,_,<command>)
+postcondition(_S,{call,?MODULE, crdt_equals, _},Res) ->
+ Res == true;
+postcondition(_S,{call,_,_,_},_Res) ->
+ true.
+
+prop_converge(InitialValue, NumTests, Mod) ->
+ eqc:quickcheck(eqc:numtests(NumTests, ?QC_OUT(prop_converge(InitialValue, Mod)))).
+
+prop_converge(InitialValue, Mod) ->
+ ?FORALL(Cmds,commands(?MODULE, #state{mod=Mod, mod_state=InitialValue}),
+ begin
+ {H,S,Res} = run_commands(?MODULE,Cmds),
+ Merged = merge_crdts(Mod, S#state.vnodes),
+ MergedVal = Mod:value(Merged),
+ ExpectedValue = Mod:eqc_state_value(S#state.mod_state),
+ ?WHENFAIL(
+ %% History: ~p\nState: ~p\ H,S,
+ io:format("History: ~p\nState: ~p", [H,S]),
+ conjunction([{res, equals(Res, ok)},
+ {total, equals(sort(MergedVal), sort(ExpectedValue))}]))
+ end).