
Merge branch 'master' into rdb-kv-counter

Conflicts:
	src/riak_object.erl
commit 9c6d99d599be54febc0f8122e8b7cb38b392b1eb (2 parents: c07ed65 + 19f19a9)
Authored by russelldb
Showing with 2,600 additions and 1,296 deletions.
  1. +3 −1 .gitignore
  2. +1 −4 rebar.config
  3. +34 −1 src/hashtree.erl
  4. +4 −2 src/riak_client.erl
  5. +3 −3 src/riak_kv.app.src
  6. +8 −1 src/riak_kv_app.erl
  7. +8 −4 src/riak_kv_backup.erl
  8. +8 −1 src/riak_kv_buckets_fsm_sup.erl
  9. +68 −0 src/riak_kv_console.erl
  10. +222 −22 src/riak_kv_eleveldb_backend.erl
  11. +226 −0 src/riak_kv_env.erl
  12. +258 −63 src/riak_kv_get_core.erl
  13. +11 −20 src/riak_kv_get_fsm.erl
  14. +8 −1 src/riak_kv_index_fsm_sup.erl
  15. +10 −11 src/riak_kv_index_hashtree.erl
  16. +8 −1 src/riak_kv_keys_fsm_sup.erl
  17. +4 −1 src/riak_kv_mrc_pipe.erl
  18. +136 −28 src/riak_kv_multi_backend.erl
  19. +1 −23 src/riak_kv_pb_bucket.erl
  20. +83 −9 src/riak_kv_pb_object.erl
  21. +187 −38 src/riak_kv_put_core.erl
  22. +19 −10 src/riak_kv_put_fsm.erl
  23. +86 −0 src/riak_kv_reformat.erl
  24. +258 −11 src/riak_kv_stat.erl
  25. +61 −34 src/riak_kv_stat_bc.erl
  26. +103 −0 src/riak_kv_util.erl
  27. +186 −16 src/riak_kv_vnode.erl
  28. +5 −14 src/riak_kv_wm_buckets.erl
  29. +7 −20 src/riak_kv_wm_keylist.erl
  30. +76 −54 src/riak_kv_wm_object.erl
  31. +11 −9 src/riak_kv_wm_utils.erl
  32. +2 −2 src/riak_kv_yessir_backend.erl
  33. +237 −47 src/riak_object.erl
  34. +0 −1 test/fsm_eqc_vnode.erl
  35. +90 −28 test/{get_fsm_qc.erl → get_fsm_eqc.erl}
  36. +8 −6 test/hashtree_eqc.erl
  37. +0 −761 test/mapred_test.erl
  38. +89 −49 test/put_fsm_eqc.erl
  39. +71 −0 test/riak_object_eqc.erl
4 .gitignore
@@ -5,4 +5,6 @@ priv/*
*.beam
doc
test.*-temp-data
-ebin
+ebin
+/.eqc-info
+/current_counterexample.eqc
5 rebar.config
@@ -4,16 +4,13 @@
{eunit_opts, [verbose]}.
{erl_first_files, [
- "src/riak_kv_backend.erl",
- "src/riak_index_backend.erl"
+ "src/riak_kv_backend.erl"
]}.
{deps, [
{riak_core, ".*", {git, "git://github.com/basho/riak_core", "master"}},
{erlang_js, ".*", {git, "git://github.com/basho/erlang_js", "master"}},
{bitcask, ".*", {git, "git://github.com/basho/bitcask", "master"}},
- {merge_index, ".*", {git, "git://github.com/basho/merge_index",
- "master"}},
{ebloom, ".*", {git, "git://github.com/basho/ebloom", "master"}},
{eper, ".*", {git, "git://github.com/basho/eper.git", "master"}},
{eleveldb, ".*", {git, "git://github.com/basho/eleveldb.git",
35 src/hashtree.erl
@@ -506,7 +506,27 @@ share_segment_store(State, #state{ref=Ref, path=Path}) ->
-spec hash(term()) -> binary().
hash(X) ->
%% erlang:phash2(X).
- crypto:sha(term_to_binary(X)).
+ sha(term_to_binary(X)).
+
+sha(Bin) ->
+ Chunk = app_helper:get_env(riak_kv, anti_entropy_sha_chunk, 4096),
+ sha(Chunk, Bin).
+
+sha(Chunk, Bin) ->
+ Ctx1 = crypto:sha_init(),
+ Ctx2 = sha(Chunk, Bin, Ctx1),
+ SHA = crypto:sha_final(Ctx2),
+ SHA.
+
+sha(Chunk, Bin, Ctx) ->
+ case Bin of
+ <<Data:Chunk/binary, Rest/binary>> ->
+ Ctx2 = crypto:sha_update(Ctx, Data),
+ sha(Chunk, Rest, Ctx2);
+ Data ->
+ Ctx2 = crypto:sha_update(Ctx, Data),
+ Ctx2
+ end.
-spec update_levels(integer(),
[{integer(), [{integer(), binary()}]}],
@@ -1026,6 +1046,19 @@ delta_test() ->
%%%===================================================================
-ifdef(EQC).
+sha_test_() ->
+ {timeout, 60,
+ fun() ->
+ ?assert(eqc:quickcheck(eqc:testing_time(4, prop_sha())))
+ end
+ }.
+
+prop_sha() ->
+ ?FORALL(Size, choose(256, 1024*1024),
+ ?FORALL(Chunk, choose(1, Size),
+ ?FORALL(Bin, binary(Size),
+ sha(Chunk, Bin) =:= crypto:sha(Bin)))).
+
eqc_test_() ->
{timeout, 5,
fun() ->
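
Note: the new sha/3 above replaces the single crypto:sha/1 call with an
init/update/final loop over anti_entropy_sha_chunk-sized pieces, so hashing a
very large binary no longer happens in one long-running crypto NIF call. A
minimal standalone sketch (module name hypothetical; same legacy crypto API as
the code above) that mirrors the loop and checks it against the one-shot hash,
just as prop_sha/0 does:

    -module(sha_chunk_demo).
    -export([check/2]).

    %% Hash Bin in Chunk-byte pieces via the incremental crypto API,
    %% then compare against hashing the whole binary at once.
    check(Chunk, Bin) when is_integer(Chunk), Chunk > 0, is_binary(Bin) ->
        chunked(Chunk, Bin, crypto:sha_init()) =:= crypto:sha(Bin).

    chunked(Chunk, Bin, Ctx) ->
        case Bin of
            <<Data:Chunk/binary, Rest/binary>> ->
                chunked(Chunk, Rest, crypto:sha_update(Ctx, Data));
            Data ->
                crypto:sha_final(crypto:sha_update(Ctx, Data))
        end.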
6 src/riak_client.erl
@@ -250,7 +250,8 @@ delete(Bucket,Key,Options,Timeout) when is_list(Options) ->
ReqId = mk_reqid(),
riak_kv_delete_sup:start_delete(Node, [ReqId, Bucket, Key, Options, Timeout,
Me, ClientId]),
- wait_for_reqid(ReqId, Timeout);
+ RTimeout = recv_timeout(Options),
+ wait_for_reqid(ReqId, erlang:min(Timeout, RTimeout));
delete(Bucket,Key,RW,Timeout) ->
delete(Bucket,Key,[{rw, RW}], Timeout).
@@ -295,7 +296,8 @@ delete_vclock(Bucket,Key,VClock,Options,Timeout) when is_list(Options) ->
ReqId = mk_reqid(),
riak_kv_delete_sup:start_delete(Node, [ReqId, Bucket, Key, Options, Timeout,
Me, ClientId, VClock]),
- wait_for_reqid(ReqId, Timeout);
+ RTimeout = recv_timeout(Options),
+ wait_for_reqid(ReqId, erlang:min(Timeout, RTimeout));
delete_vclock(Bucket,Key,VClock,RW,Timeout) ->
delete_vclock(Bucket,Key,VClock,[{rw, RW}],Timeout).
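
Note: both delete paths now cap the FSM timeout with a receive timeout taken
from Options. The recv_timeout/1 helper is not part of this hunk; a plausible
shape (an assumption, shown only to make the erlang:min/2 call above concrete)
relies on integers sorting before the atom infinity in Erlang term order:

    %% Hypothetical helper: an absent recv_timeout option imposes no
    %% extra cap, since erlang:min(Timeout, infinity) =:= Timeout.
    recv_timeout(Options) ->
        case proplists:get_value(recv_timeout, Options) of
            undefined -> infinity;
            T when is_integer(T), T > 0 -> T
        end.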
6 src/riak_kv.app.src
@@ -3,7 +3,7 @@
{application, riak_kv,
[
{description, "Riak Key/Value Store"},
- {vsn, "1.3.0"},
+ {vsn, "1.3.1"},
{applications, [
kernel,
stdlib,
@@ -36,8 +36,8 @@
%% use the legacy routines for tracking kv stats
{legacy_stats, true},
- %% Enable active anti-entropy
- {anti_entropy, {on, []}},
+ %% Disable active anti-entropy by default
+ {anti_entropy, {off, []}},
%% Allow Erlang MapReduce functions to be specified as
%% strings.
9 src/riak_kv_app.erl
@@ -28,7 +28,7 @@
-define(SERVICES, [{riak_kv_pb_object, 3, 6}, %% ClientID stuff
{riak_kv_pb_object, 9, 14}, %% Object requests
- {riak_kv_pb_bucket, 15, 22}, %% Bucket requests
+ {riak_kv_pb_bucket, 15, 18}, %% Bucket requests
{riak_kv_pb_mapred, 23, 24}, %% MapReduce requests
{riak_kv_pb_index, 25, 26}, %% Secondary index requests
{riak_kv_pb_counter, 29, 32} %% counter requests
@@ -89,6 +89,9 @@ start(_Type, _StartArgs) ->
%% the app is missing or packaging is broken.
catch cluster_info:register_app(riak_kv_cinfo),
+ %% print out critical env limits for support/debugging purposes
+ catch riak_kv_env:doc_env(),
+
%% Spin up supervisor
case riak_kv_sup:start_link() of
{ok, Pid} ->
@@ -138,6 +141,10 @@ start(_Type, _StartArgs) ->
[enabled_v1, disabled],
disabled),
+ riak_core_capability:register({riak_kv, object_format},
+ [v1, v0],
+ v0),
+
%% Go ahead and mark the riak_kv service as up in the node watcher.
%% The riak_core_ring_handler blocks until all vnodes have been started
%% synchronously.
12 src/riak_kv_backup.erl
@@ -93,16 +93,20 @@ backup_vnodes([Partition|T], Node) ->
receive stop -> stop end,
backup_vnodes(T, Node).
-backup_folder(_, V, Pid) ->
- Pid ! {backup, V},
+backup_folder(K, V, Pid) ->
+ Pid ! {backup, {K, V}},
Pid.
result_collector(PPid) ->
receive
stop ->
PPid ! stop;
- {backup, M} when is_binary(M) ->
- disk_log:log(?TABLE, M),
+ {backup, {{B, K}, M}} when is_binary(M) ->
+ %% make sure binary is encoded using term_to_binary (v0)
+ %% not v1 format. restore does not have access to bucket/key
+ %% so must include them in encoded format, which v1 does not
+ ObjBin = riak_object:to_binary_version(v0, B, K, M),
+ disk_log:log(?TABLE, ObjBin),
result_collector(PPid)
end.
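
Note: the v1 object binary does not embed its own bucket and key, and restore
only sees the logged binary, so backups are normalized to the self-describing
v0 (term_to_binary) format. A sketch of why v0 suffices for restore (assumes
the riak_object accessors used elsewhere in this codebase; v0 is a plain
term_to_binary of the object record):

    %% Bucket and key travel inside the v0-encoded term itself:
    demo_roundtrip(Bucket, Key, ObjBin) ->
        V0Bin = riak_object:to_binary_version(v0, Bucket, Key, ObjBin),
        Obj = binary_to_term(V0Bin),
        {riak_object:bucket(Obj), riak_object:key(Obj)} =:= {Bucket, Key}.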
9 src/riak_kv_buckets_fsm_sup.erl
@@ -31,7 +31,14 @@
-export([init/1]).
start_buckets_fsm(Node, Args) ->
- supervisor:start_child({?MODULE, Node}, Args).
+ case supervisor:start_child({?MODULE, Node}, Args) of
+ {ok, Pid} ->
+ riak_kv_stat:update({list_create, Pid}),
+ {ok, Pid};
+ Error ->
+ riak_kv_stat:update(list_create_error),
+ Error
+ end.
%% @spec start_link() -> ServerRet
%% @doc API for starting the supervisor.
68 src/riak_kv_console.erl
@@ -36,6 +36,7 @@
cluster_info/1,
down/1,
aae_status/1,
+ reformat_indexes/1,
reload_code/1]).
%% Arrow is 24 chars wide
@@ -421,6 +422,73 @@ format_timestamp(_Now, undefined) ->
format_timestamp(Now, TS) ->
riak_core_format:human_time_fmt("~.1f", timer:now_diff(Now, TS)).
+parse_int(IntStr) ->
+ try
+ list_to_integer(IntStr)
+ catch
+ error:badarg ->
+ undefined
+ end.
+
+index_reformat_options([], Opts) ->
+ Defaults = [{concurrency, 2}, {batch_size, 100}],
+ AddIfAbsent =
+ fun({Name,Val}, Acc) ->
+ case lists:keymember(Name, 1, Acc) of
+ true ->
+ Acc;
+ false ->
+ [{Name, Val} | Acc]
+ end
+ end,
+ lists:foldl(AddIfAbsent, Opts, Defaults);
+index_reformat_options(["--downgrade"], Opts) ->
+ [{downgrade, true} | Opts];
+index_reformat_options(["--downgrade" | More], _Opts) ->
+ io:format("Invalid arguments after downgrade switch : ~p~n", [More]),
+ undefined;
+index_reformat_options([IntStr | Rest], Opts) ->
+ HasConcurrency = lists:keymember(concurrency, 1, Opts),
+ HasBatchSize = lists:keymember(batch_size, 1, Opts),
+ case {parse_int(IntStr), HasConcurrency, HasBatchSize} of
+ {_, true, true} ->
+ io:format("Expected --downgrade instead of ~p~n", [IntStr]),
+ undefined;
+ {undefined, _, _ } ->
+ io:format("Expected integer parameter instead of ~p~n", [IntStr]),
+ undefined;
+ {IntVal, false, false} ->
+ index_reformat_options(Rest, [{concurrency, IntVal} | Opts]);
+ {IntVal, true, false} ->
+ index_reformat_options(Rest, [{batch_size, IntVal} | Opts])
+ end;
+index_reformat_options(_, _) ->
+ undefined.
+
+reformat_indexes(Args) ->
+ Opts = index_reformat_options(Args, []),
+ case Opts of
+ undefined ->
+ io:format("Expected options: <concurrency> <batch size> [--downgrade]~n"),
+ ok;
+ _ ->
+ start_index_reformat(Opts),
+ io:format("index reformat started with options ~p ~n", [Opts]),
+ io:format("check console.log for status information~n"),
+ ok
+ end.
+
+start_index_reformat(Opts) ->
+ spawn(fun() -> run_index_reformat(Opts) end).
+
+run_index_reformat(Opts) ->
+ try riak_kv_util:fix_incorrect_index_entries(Opts)
+ catch
+ Err:Reason ->
+ lager:error("index reformat crashed with error type ~p and reason: ~p",
+ [Err, Reason])
+ end.
+
%%%===================================================================
%%% Private
%%%===================================================================
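
Note: reformat_indexes/1 receives its arguments as strings from the command
line. Per the option parser above, the first integer is taken as concurrency,
the second as batch size, and --downgrade must come last. Example invocations:

    riak_kv_console:reformat_indexes([]),                         %% defaults: concurrency 2, batch size 100
    riak_kv_console:reformat_indexes(["4"]),                      %% concurrency 4
    riak_kv_console:reformat_indexes(["4", "250"]),               %% concurrency 4, batch size 250
    riak_kv_console:reformat_indexes(["4", "250", "--downgrade"]).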
244 src/riak_kv_eleveldb_backend.erl
@@ -33,6 +33,10 @@
put/5,
delete/4,
drop/1,
+ fix_index/3,
+ mark_indexes_fixed/2,
+ set_legacy_indexes/2,
+ fixed_index_status/1,
fold_buckets/4,
fold_keys/4,
fold_objects/4,
@@ -50,7 +54,8 @@
-endif.
-define(API_VERSION, 1).
--define(CAPABILITIES, [async_fold, indexes]).
+-define(CAPABILITIES, [async_fold, indexes, index_reformat]).
+-define(FIXED_INDEXES_KEY, fixed_indexes).
-record(state, {ref :: reference(),
data_root :: string(),
@@ -58,7 +63,9 @@
config :: config(),
read_opts = [],
write_opts = [],
- fold_opts = [{fill_cache, false}]
+ fold_opts = [{fill_cache, false}],
+ fixed_indexes = false, %% true if legacy indexes have been rewritten
+ legacy_indexes = false %% true if new writes use legacy indexes (downgrade)
}).
@@ -99,11 +106,30 @@ start(Partition, Config) ->
S0 = init_state(DataDir, Config),
case open_db(S0) of
{ok, State} ->
- {ok, State};
+ determine_fixed_index_status(State);
{error, Reason} ->
{error, Reason}
end.
+determine_fixed_index_status(State) ->
+ case indexes_fixed(State) of
+ {error, Reason} ->
+ {error, Reason};
+ true ->
+ {ok, State#state{fixed_indexes=true}};
+ false ->
+ case is_empty(State) of
+ true -> mark_indexes_fixed_on_start(State);
+ false -> {ok, State#state{fixed_indexes=false}}
+ end
+ end.
+
+mark_indexes_fixed_on_start(State) ->
+ case mark_indexes_fixed(State, true) of
+ {error, Reason, _} -> {error, Reason};
+ Res -> Res
+ end.
+
%% @doc Stop the eleveldb backend
-spec stop(state()) -> ok.
stop(State) ->
@@ -138,18 +164,25 @@ get(Bucket, Key, #state{read_opts=ReadOpts,
{ok, state()} |
{error, term(), state()}.
put(Bucket, PrimaryKey, IndexSpecs, Val, #state{ref=Ref,
- write_opts=WriteOpts}=State) ->
+ write_opts=WriteOpts,
+ legacy_indexes=WriteLegacy,
+ fixed_indexes=FixedIndexes}=State) ->
%% Create the KV update...
StorageKey = to_object_key(Bucket, PrimaryKey),
Updates1 = [{put, StorageKey, Val}],
%% Convert IndexSpecs to index updates...
F = fun({add, Field, Value}) ->
- {put, to_index_key(Bucket, PrimaryKey, Field, Value), <<>>};
+ case WriteLegacy of
+ true ->
+ [{put, to_legacy_index_key(Bucket, PrimaryKey, Field, Value), <<>>}];
+ false ->
+ [{put, to_index_key(Bucket, PrimaryKey, Field, Value), <<>>}]
+ end;
({remove, Field, Value}) ->
- {delete, to_index_key(Bucket, PrimaryKey, Field, Value)}
+ index_deletes(FixedIndexes, Bucket, PrimaryKey, Field, Value)
end,
- Updates2 = [F(X) || X <- IndexSpecs],
+ Updates2 = lists:flatmap(F, IndexSpecs),
%% Perform the write...
case eleveldb:write(Ref, Updates1 ++ Updates2, WriteOpts) of
@@ -159,13 +192,105 @@ put(Bucket, PrimaryKey, IndexSpecs, Val, #state{ref=Ref,
{error, Reason, State}
end.
+indexes_fixed(#state{ref=Ref,read_opts=ReadOpts}) ->
+ case eleveldb:get(Ref, to_md_key(?FIXED_INDEXES_KEY), ReadOpts) of
+ {ok, <<1>>} ->
+ true;
+ {ok, <<0>>} ->
+ false;
+ not_found ->
+ false;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+index_deletes(FixedIndexes, Bucket, PrimaryKey, Field, Value) ->
+ IndexKey = to_index_key(Bucket, PrimaryKey, Field, Value),
+ LegacyKey = to_legacy_index_key(Bucket, PrimaryKey, Field, Value),
+ KeyDelete = [{delete, IndexKey}],
+ LegacyDelete = [{delete, LegacyKey}
+ || FixedIndexes =:= false andalso IndexKey =/= LegacyKey],
+ KeyDelete ++ LegacyDelete.
+
+fix_index(IndexKeys, ForUpgrade, #state{ref=Ref,
+ read_opts=ReadOpts,
+ write_opts=WriteOpts} = State)
+ when is_list(IndexKeys) ->
+ FoldFun =
+ fun(ok, {Success, Ignore, Error}) ->
+ {Success+1, Ignore, Error};
+ (ignore, {Success, Ignore, Error}) ->
+ {Success, Ignore+1, Error};
+ ({error, _}, {Success, Ignore, Error}) ->
+ {Success, Ignore, Error+1}
+ end,
+ Totals =
+ lists:foldl(FoldFun, {0,0,0},
+ [fix_index(IndexKey, ForUpgrade, Ref, ReadOpts, WriteOpts)
+ || {_Bucket, IndexKey} <- IndexKeys]),
+ {reply, Totals, State};
+fix_index(IndexKey, ForUpgrade, #state{ref=Ref,
+ read_opts=ReadOpts,
+ write_opts=WriteOpts} = State) ->
+ case fix_index(IndexKey, ForUpgrade, Ref, ReadOpts, WriteOpts) of
+ Atom when is_atom(Atom) ->
+ {Atom, State};
+ {error, Reason} ->
+ {error, Reason, State}
+ end.
+
+fix_index(IndexKey, ForUpgrade, Ref, ReadOpts, WriteOpts) ->
+ case eleveldb:get(Ref, IndexKey, ReadOpts) of
+ {ok, _} ->
+ {Bucket, Key, Field, Value} = from_index_key(IndexKey),
+ NewKey = case ForUpgrade of
+ true -> to_index_key(Bucket, Key, Field, Value);
+ false -> to_legacy_index_key(Bucket, Key, Field, Value)
+ end,
+ Updates = [{delete, IndexKey}, {put, NewKey, <<>>}],
+ case eleveldb:write(Ref, Updates, WriteOpts) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ {error, Reason}
+ end;
+ not_found ->
+ ignore;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+mark_indexes_fixed(State=#state{fixed_indexes=true}, true) ->
+ {ok, State};
+mark_indexes_fixed(State=#state{fixed_indexes=false}, false) ->
+ {ok, State};
+mark_indexes_fixed(State=#state{ref=Ref, write_opts=WriteOpts}, ForUpgrade) ->
+ Value = case ForUpgrade of
+ true -> <<1>>;
+ false -> <<0>>
+ end,
+ Updates = [{put, to_md_key(?FIXED_INDEXES_KEY), Value}],
+ case eleveldb:write(Ref, Updates, WriteOpts) of
+ ok ->
+ {ok, State#state{fixed_indexes=ForUpgrade}};
+ {error, Reason} ->
+ {error, Reason, State}
+ end.
+
+set_legacy_indexes(State, WriteLegacy) ->
+ State#state{legacy_indexes=WriteLegacy}.
+
+-spec fixed_index_status(state()) -> boolean().
+fixed_index_status(#state{fixed_indexes=Fixed}) ->
+ Fixed.
%% @doc Delete an object from the eleveldb backend
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()} |
{error, term(), state()}.
delete(Bucket, PrimaryKey, IndexSpecs, #state{ref=Ref,
- write_opts=WriteOpts}=State) ->
+ write_opts=WriteOpts,
+ fixed_indexes=FixedIndexes}=State) ->
%% Create the KV delete...
StorageKey = to_object_key(Bucket, PrimaryKey),
@@ -173,9 +298,9 @@ delete(Bucket, PrimaryKey, IndexSpecs, #state{ref=Ref,
%% Convert IndexSpecs to index deletes...
F = fun({remove, Field, Value}) ->
- {delete, to_index_key(Bucket, PrimaryKey, Field, Value)}
+ index_deletes(FixedIndexes, Bucket, PrimaryKey, Field, Value)
end,
- Updates2 = [F(X) || X <- IndexSpecs],
+ Updates2 = lists:flatmap(F, IndexSpecs),
case eleveldb:write(Ref, Updates1 ++ Updates2, WriteOpts) of
ok ->
@@ -218,6 +343,8 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{fold_opts=FoldOpts,
[{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}.
fold_keys(FoldKeysFun, Acc, Opts, #state{fold_opts=FoldOpts,
+ fixed_indexes=FixedIdx,
+ legacy_indexes=WriteLegacyIdx,
ref=Ref}) ->
%% Figure out how we should limit the fold: by bucket, by
%% secondary index, or neither (fold across everything.)
@@ -235,15 +362,23 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{fold_opts=FoldOpts,
FirstKey = to_first_key(Limiter),
FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
FoldOpts1 = [{first_key, FirstKey} | FoldOpts],
+ ExtraFold = not FixedIdx orelse WriteLegacyIdx,
KeyFolder =
fun() ->
- %% Do the fold. ELevelDB uses throw/1 to break out of a fold...
- try
- eleveldb:fold_keys(Ref, FoldFun, Acc, FoldOpts1)
- catch
- {break, AccFinal} ->
- AccFinal
- end
+ %% Do the fold. ELevelDB uses throw/1 to break out of a fold...
+ AccFinal =
+ try
+ eleveldb:fold_keys(Ref, FoldFun, Acc, FoldOpts1)
+ catch
+ {break, BrkResult} ->
+ BrkResult
+ end,
+ case ExtraFold of
+ true ->
+ legacy_key_fold(Ref, FoldFun, AccFinal, FoldOpts1, Limiter);
+ false ->
+ AccFinal
+ end
end,
case lists:member(async_fold, Opts) of
true ->
@@ -252,6 +387,24 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{fold_opts=FoldOpts,
{ok, KeyFolder()}
end.
+legacy_key_fold(Ref, FoldFun, Acc, FoldOpts0, Query={index, _, _}) ->
+ {_, FirstKey} = lists:keyfind(first_key, 1, FoldOpts0),
+ LegacyKey = to_legacy_first_key(Query),
+ case LegacyKey =/= FirstKey of
+ true ->
+ try
+ FoldOpts = lists:keyreplace(first_key, 1, FoldOpts0, {first_key, LegacyKey}),
+ eleveldb:fold_keys(Ref, FoldFun, Acc, FoldOpts)
+ catch
+ {break, AccFinal} ->
+ AccFinal
+ end;
+ false ->
+ Acc
+ end;
+legacy_key_fold(_Ref, _FoldFun, Acc, _FoldOpts, _Query) ->
+ Acc.
+
%% @doc Fold over all the objects for one or all buckets.
-spec fold_objects(riak_kv_backend:fold_objects_fun(),
any(),
@@ -298,10 +451,10 @@ is_empty(#state{ref=Ref}) ->
%% @doc Get the status information for this eleveldb backend
-spec status(state()) -> [{atom(), term()}].
-status(State) ->
+status(State=#state{fixed_indexes=FixedIndexes}) ->
{ok, Stats} = eleveldb:status(State#state.ref, <<"leveldb.stats">>),
{ok, ReadBlockError} = eleveldb:status(State#state.ref, <<"leveldb.ReadBlockError">>),
- [{stats, Stats}, {read_block_error, ReadBlockError}].
+ [{stats, Stats}, {read_block_error, ReadBlockError}, {fixed_indexes, FixedIndexes}].
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.
@@ -476,6 +629,36 @@ fold_keys_fun(FoldKeysFun, {index, FilterBucket, {range, FilterField, StartTerm,
throw({break, Acc})
end
end;
+fold_keys_fun(FoldKeysFun, {index, incorrect_format, ForUpgrade}) when is_boolean(ForUpgrade) ->
+ %% Over incorrectly formatted 2i index values
+ fun(StorageKey, Acc) ->
+ Action =
+ case from_index_key(StorageKey) of
+ {Bucket, Key, Field, Term} ->
+ NewKey = case ForUpgrade of
+ true ->
+ to_index_key(Bucket, Key, Field, Term);
+ false ->
+ to_legacy_index_key(Bucket, Key, Field, Term)
+ end,
+ case NewKey =:= StorageKey of
+ true ->
+ ignore;
+ false ->
+ {fold, Bucket, StorageKey}
+ end;
+ _ ->
+ stop
+ end,
+ case Action of
+ {fold, B, K} ->
+ FoldKeysFun(B, K, Acc);
+ ignore ->
+ Acc;
+ stop ->
+ throw({break, Acc})
+ end
+ end;
fold_keys_fun(_FoldKeysFun, Other) ->
throw({unknown_limiter, Other}).
@@ -498,10 +681,9 @@ fold_objects_fun(FoldObjectsFun, FilterBucket) ->
%% Augment the fold options list if a
%% bucket is defined.
fold_opts(undefined, FoldOpts) ->
- FoldOpts;
+ [{first_key, to_first_key(undefined)} | FoldOpts];
fold_opts(Bucket, FoldOpts) ->
- BKey = sext:encode({Bucket, <<>>}),
- [{first_key, BKey} | FoldOpts].
+ [{first_key, to_first_key({bucket, Bucket})} | FoldOpts].
%% @private Given a scope limiter, use sext to encode an expression
@@ -514,6 +696,9 @@ to_first_key(undefined) ->
to_first_key({bucket, Bucket}) ->
%% Start at the first object for a given bucket...
to_object_key(Bucket, <<>>);
+to_first_key({index, incorrect_format, ForUpgrade}) when is_boolean(ForUpgrade) ->
+ %% Start at first index entry
+ to_index_key(<<>>, <<>>, <<>>, <<>>);
to_first_key({index, Bucket, {eq, <<"$bucket">>, _Term}}) ->
%% 2I exact match query on special $bucket field...
to_first_key({bucket, Bucket});
@@ -529,6 +714,13 @@ to_first_key({index, Bucket, {range, Field, StartTerm, _EndTerm}}) ->
to_first_key(Other) ->
erlang:throw({unknown_limiter, Other}).
+%% @doc For index queries, encode the key using the legacy sext format.
+to_legacy_first_key({index, Bucket, {eq, Field, Term}}) ->
+ to_legacy_first_key({index, Bucket, {range, Field, Term, Term}});
+to_legacy_first_key({index, Bucket, {range, Field, StartTerm, _EndTerm}}) ->
+ to_legacy_index_key(Bucket, <<>>, Field, StartTerm);
+to_legacy_first_key(Other) ->
+ to_first_key(Other).
to_object_key(Bucket, Key) ->
sext:encode({o, Bucket, Key}).
@@ -544,6 +736,9 @@ from_object_key(LKey) ->
to_index_key(Bucket, Key, Field, Term) ->
sext:encode({i, Bucket, Field, Term, Key}).
+to_legacy_index_key(Bucket, Key, Field, Term) -> %% encode with legacy bignum encoding
+ sext:encode({i, Bucket, Field, Term, Key}, true).
+
from_index_key(LKey) ->
case sext:decode(LKey) of
{i, Bucket, Field, Term, Key} ->
@@ -552,6 +747,11 @@ from_index_key(LKey) ->
undefined
end.
+%% @doc Encode a key to store partition meta-data attributes.
+to_md_key(Key) ->
+ sext:encode({md, Key}).
+
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
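
Note: the upgrade/downgrade machinery above hinges on the same index term
having two possible sext encodings on disk. A minimal sketch (assumption:
sext:encode/2 with a true second argument selects the legacy big-integer
encoding, as in to_legacy_index_key/4):

    IndexKey  = sext:encode({i, Bucket, Field, Term, Key}),
    LegacyKey = sext:encode({i, Bucket, Field, Term, Key}, true),
    %% For terms affected by the legacy encoding, IndexKey =/= LegacyKey,
    %% which is why index_deletes/5 emits a delete for each form until
    %% the partition is marked fixed.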
226 src/riak_kv_env.erl
@@ -0,0 +1,226 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_env: environmental utilities.
+%%
+%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+%% @doc utility functions for interacting with the environment.
+
+-module(riak_kv_env).
+
+-export([doc_env/0]).
+
+-define(LINUX_PARAMS, [
+ {"vm.swappiness", 0, gte},
+ {"net.core.wmem_default", 8388608, lte},
+ {"net.core.rmem_default", 8388608, lte},
+ {"net.core.wmem_max", 8388608, lte},
+ {"net.core.rmem_max", 8388608, lte},
+ {"net.core.netdev_max_backlog", 10000, lte},
+ {"net.core.somaxconn", 4000, lte},
+ {"net.ipv4.tcp_max_syn_backlog", 40000, lte},
+ {"net.ipv4.tcp_fin_timeout", 15, gte},
+ {"net.ipv4.tcp_tw_reuse", 1, eq}
+ ]).
+
+
+doc_env() ->
+ lager:info("Environment and OS variables:"),
+ Ulimits = check_ulimits(),
+ ErlLimits = check_erlang_limits(),
+ OSLimits = case os:type() of
+ {unix, linux} ->
+ check_sysctls(?LINUX_PARAMS);
+ {unix, freebsd} ->
+ [];
+ {unix, sunos} ->
+ [];
+ _ ->
+ [{warn, "Unknown OS type, no platform specific info", []}]
+ end,
+ lists:map(fun({F, Fmt, Args}) ->
+ lager:debug("Term: ~p", [{F, Fmt, Args}]),
+ %% fake out lager a bit here
+ F1 = case F of
+ info -> info_msg;
+ warn -> warning_msg;
+ error -> error_msg
+ end,
+ error_logger:F1("riak_kv_env: "++Fmt, Args)
+ end, Ulimits ++ ErlLimits ++ OSLimits).
+
+%% we don't really care about anything other than cores and open files
+%% @private
+check_ulimits() ->
+ %% file ulimit
+ FileLimit0 = string:strip(os:cmd("ulimit -n"), right, $\n),
+ FLMsg = case FileLimit0 of
+ "unlimited" ->
+ %% check the OS limit;
+ OSLimit = case os:type() of
+ {unix, linux} ->
+ string:strip(os:cmd("sysctl -n fs.file-max"),
+ right, $\n);
+ _ -> unknown
+ end,
+ case OSLimit of
+ unknown ->
+ {warn, "Open file limit unlimited but actual limit "
+ ++ "could not be ascertained", []};
+ _ ->
+ test_file_limit(OSLimit)
+ end;
+ _ ->
+ test_file_limit(FileLimit0)
+ end,
+ CoreLimit0 = string:strip(os:cmd("ulimit -c"), right, $\n),
+ CLMsg = case CoreLimit0 of
+ "unlimited" ->
+ {info, "No core size limit", []};
+ _ ->
+ CoreLimit = list_to_integer(CoreLimit0),
+ case CoreLimit == 0 of
+ true ->
+ {warn, "Cores are disabled, this may "
+ ++ "hinder debugging", []};
+ false ->
+ {info, "Core size limit: ~p", [CoreLimit]}
+ end
+ end,
+ [FLMsg, CLMsg].
+
+%% @private
+test_file_limit(FileLimit0) ->
+ FileLimit = (catch list_to_integer(FileLimit0)),
+ case FileLimit of
+ {'EXIT', {badarg,_}} ->
+ {warn, "Open file limit was read as non-integer string: ~s",
+ [FileLimit0]};
+
+ _ ->
+ case FileLimit < 4096 of
+ true ->
+ {warn, "Open file limit of ~p is low, at least "
+ ++ "4096 is recommended", [FileLimit]};
+ false ->
+ {info, "Open file limit: ~p", [FileLimit]}
+ end
+ end.
+
+%% @private
+check_erlang_limits() ->
+ %% processes
+ PLMsg = case erlang:system_info(process_limit) of
+ PL1 when PL1 < 4096 ->
+ {warn, "Erlang process limit of ~p is low, at least "
+ "4096 is recommended", [PL1]};
+ PL2 ->
+ {info,"Erlang process limit: ~p", [PL2]}
+ end,
+ %% ports
+ PortLimit = case os:getenv("ERL_MAX_PORTS") of
+ false -> 1024;
+ PL -> list_to_integer(PL)
+ end,
+ PortMsg = case PortLimit < 4096 of
+ true ->
+ %% needs to be revisited for R16+
+ {warn, "Erlang ports limit of ~p is low, at least "
+ "4096 is recommended", [PortLimit]};
+ false ->
+ {info, "Erlang ports limit: ~p", [PortLimit]}
+ end,
+
+ %% ets tables
+ ETSLimit = case os:getenv("ERL_MAX_ETS_TABLES") of
+ false -> 1400;
+ Limit -> list_to_integer(Limit)
+ end,
+ ETSMsg = case ETSLimit < 8192 of
+ true ->
+ {warn,"ETS table count limit of ~p is low, at least "
+ "8192 is recommended.", [ETSLimit]};
+ false ->
+ {info, "ETS table count limit: ~p",
+ [ETSLimit]}
+ end,
+
+ %% fullsweep_after
+ {fullsweep_after, GCGens} = erlang:system_info(fullsweep_after),
+ GCMsg = {info, "Generations before full sweep: ~p", [GCGens]},
+
+ %% async_threads
+ TPSMsg = case erlang:system_info(thread_pool_size) of
+ TPS1 when TPS1 < 64 ->
+ {warn,"Thread pool size of ~p is low, at least 64 "
+ "suggested", [TPS1]};
+ TPS2 ->
+ {info, "Thread pool size: ~p", [TPS2]}
+ end,
+ %% schedulers
+ Schedulers = erlang:system_info(schedulers),
+ Cores = erlang:system_info(logical_processors_available),
+ SMsg = case Schedulers /= Cores of
+ true ->
+ {warn, "Running ~p schedulers for ~p cores, "
+ "these should match", [Schedulers, Cores]};
+ false ->
+ {info, "Schedulers: ~p for ~p cores",
+ [Schedulers, Cores]}
+ end,
+ [PLMsg, PortMsg, ETSMsg, TPSMsg, GCMsg, SMsg].
+
+%% @private
+check_sysctls(Checklist) ->
+ Fn = fun({Param, Val, Direction}) ->
+ Output = string:strip(os:cmd("sysctl -n "++Param), right, $\n),
+ Actual = list_to_integer(Output -- "\n"),
+ Good = case Direction of
+ gte -> Actual =< Val;
+ lte -> Actual >= Val;
+ eq -> Actual == Val
+ end,
+ case Good of
+ true ->
+ {info , "sysctl ~s is ~p ~s ~p)",
+ [Param, Actual,
+ direction_to_word(Direction),
+ Val]};
+ false ->
+ {warn, "sysctl ~s is ~p, should be ~s~p)",
+ [Param, Actual,
+ direction_to_word2(Direction),
+ Val]}
+ end
+ end,
+ lists:map(Fn, Checklist).
+
+%% @private
+direction_to_word(Direction) ->
+ case Direction of
+ gte -> "greater than or equal to";
+ lte -> "lesser than or equal to";
+ eq -> "equal to"
+ end.
+
+%% @private
+direction_to_word2(Direction) ->
+ case Direction of
+ gte -> "no more than ";
+ lte -> "at least ";
+ eq -> ""
+ end.
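
Note: the "fake out lager" branch above maps each {info | warn | error, Fmt,
Args} tuple onto the matching error_logger function, so messages still reach
lager's error_logger handler at the right severity. A warning tuple, for
example, is emitted as:

    error_logger:warning_msg("riak_kv_env: " ++ Fmt, Args).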
321 src/riak_kv_get_core.erl
@@ -20,10 +20,14 @@
%%
%% -------------------------------------------------------------------
-module(riak_kv_get_core).
--export([init/6, add_result/3, result_shortcode/1, enough/1, response/1,
+-export([init/8, add_result/3, result_shortcode/1, enough/1, response/1,
has_all_results/1, final_action/1, info/1]).
-export_type([getcore/0, result/0, reply/0, final_action/0]).
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
-type result() :: {ok, riak_object:riak_object()} |
{error, notfound} | % for dialyzer
{error, any()}.
@@ -35,9 +39,11 @@
{read_repair, [{non_neg_integer() | repair_reason()}], riak_object:riak_object()} |
delete.
-type idxresult() :: {non_neg_integer(), result()}.
+-type idx_type() :: [{non_neg_integer(), 'primary' | 'fallback'}].
-record(getcore, {n :: pos_integer(),
r :: pos_integer(),
+ pr :: pos_integer(),
fail_threshold :: pos_integer(),
notfound_ok :: boolean(),
allow_mult :: boolean(),
@@ -46,8 +52,11 @@
merged :: {notfound | tombstone | ok,
riak_object:riak_object() | undefined},
num_ok = 0 :: non_neg_integer(),
+ num_pok = 0 :: non_neg_integer(),
num_notfound = 0 :: non_neg_integer(),
- num_fail = 0 :: non_neg_integer()}).
+ num_deleted = 0 :: non_neg_integer(),
+ num_fail = 0 :: non_neg_integer(),
+ idx_type :: idx_type()}).
-opaque getcore() :: #getcore{}.
%% ====================================================================
@@ -55,37 +64,50 @@
%% ====================================================================
%% Initialize a get and return an opaque get core context
--spec init(pos_integer(), pos_integer(), pos_integer(), boolean(), boolean(),
- boolean()) -> getcore().
-init(N, R, FailThreshold, NotFoundOk, AllowMult, DeletedVClock) ->
+-spec init(N::pos_integer(), R::pos_integer(), PR::pos_integer(),
+ FailThreshold::pos_integer(), NotFoundOK::boolean(),
+ AllowMult::boolean(), DeletedVClock::boolean(),
+ IdxType::idx_type()) -> getcore().
+init(N, R, PR, FailThreshold, NotFoundOk, AllowMult, DeletedVClock, IdxType) ->
#getcore{n = N,
r = R,
+ pr = PR,
fail_threshold = FailThreshold,
notfound_ok = NotFoundOk,
allow_mult = AllowMult,
- deletedvclock = DeletedVClock}.
+ deletedvclock = DeletedVClock,
+ idx_type = IdxType}.
%% Add a result for a vnode index
-spec add_result(non_neg_integer(), result(), getcore()) -> getcore().
-add_result(Idx, Result, GetCore = #getcore{results = Results}) ->
- UpdResults = [{Idx, Result} | Results],
- case Result of
- {ok, _RObj} ->
- GetCore#getcore{results = UpdResults, merged = undefined,
- num_ok = GetCore#getcore.num_ok + 1};
- {error, notfound} ->
- case GetCore#getcore.notfound_ok of
- true ->
- GetCore#getcore{results = UpdResults, merged = undefined,
- num_ok = GetCore#getcore.num_ok + 1};
- _ ->
- GetCore#getcore{results = UpdResults, merged = undefined,
- num_notfound = GetCore#getcore.num_notfound + 1}
- end;
- {error, _Reason} ->
- GetCore#getcore{results = UpdResults, merged = undefined,
- num_fail = GetCore#getcore.num_fail + 1}
- end.
+add_result(Idx, {ok, RObj} = Result, GetCore) ->
+ Dels = case riak_kv_util:is_x_deleted(RObj) of
+ true -> 1;
+ false -> 0
+ end,
+ num_pr(GetCore#getcore{
+ results = [{Idx, Result}|GetCore#getcore.results],
+ merged = undefined,
+ num_ok = GetCore#getcore.num_ok + 1,
+ num_deleted = GetCore#getcore.num_deleted + Dels}, Idx);
+add_result(Idx, {error, notfound} = Result, GetCore) ->
+ case GetCore#getcore.notfound_ok of
+ true ->
+ num_pr(GetCore#getcore{
+ results = [{Idx, Result}|GetCore#getcore.results],
+ merged = undefined,
+ num_ok = GetCore#getcore.num_ok + 1}, Idx);
+ _ ->
+ GetCore#getcore{
+ results = [{Idx, Result}|GetCore#getcore.results],
+ merged = undefined,
+ num_notfound = GetCore#getcore.num_notfound + 1}
+ end;
+add_result(Idx, {error, _Reason} = Result, GetCore) ->
+ GetCore#getcore{
+ results = [{Idx, Result}|GetCore#getcore.results],
+ merged = undefined,
+ num_fail = GetCore#getcore.num_fail + 1}.
result_shortcode({ok, _RObj}) -> 1;
result_shortcode({error, notfound}) -> 0;
@@ -93,44 +115,54 @@ result_shortcode(_) -> -1.
%% Check if enough results have been added to respond
-spec enough(getcore()) -> boolean().
-enough(#getcore{r = R, num_ok = NumOk,
- num_notfound = NumNotFound,
- num_fail = NumFail,
- fail_threshold = FailThreshold}) ->
- if
- NumOk >= R ->
- true;
- NumNotFound + NumFail >= FailThreshold ->
- true;
- true ->
- false
- end.
+%% Met quorum
+enough(#getcore{r = R, num_ok = NumOK, pr= PR, num_pok = NumPOK}) when
+ NumOK >= R andalso NumPOK >= PR ->
+ true;
+%% too many failures
+enough(#getcore{fail_threshold = FailThreshold, num_notfound = NumNotFound,
+ num_fail = NumFail}) when NumNotFound + NumFail >= FailThreshold ->
+ true;
+%% Got all N responses, but unable to satisfy PR
+enough(#getcore{n = N, num_ok = NumOK, num_notfound = NumNotFound,
+ num_fail = NumFail}) when NumOK + NumNotFound + NumFail >= N ->
+ true;
+enough(_) ->
+ false.
%% Get success/fail response once enough results received
-spec response(getcore()) -> {reply(), getcore()}.
-response(GetCore = #getcore{r = R, num_ok = NumOk, num_notfound = NumNotFound,
- results = Results, allow_mult = AllowMult,
- deletedvclock = DeletedVClock}) ->
+%% Met quorum
+response(#getcore{r = R, num_ok = NumOK, pr= PR, num_pok = NumPOK} = GetCore)
+ when NumOK >= R andalso NumPOK >= PR ->
+ #getcore{results = Results, allow_mult=AllowMult,
+ deletedvclock = DeletedVClock} = GetCore,
{ObjState, MObj} = Merged = merge(Results, AllowMult),
- Reply = case NumOk >= R of
- true ->
- case ObjState of
- ok ->
- Merged; % {ok, MObj}
- tombstone when DeletedVClock ->
- {error, {deleted, riak_object:vclock(MObj)}};
- _ -> % tombstone or notfound
- {error, notfound}
- end;
- false ->
- DelObjs = length([xx || {_Idx, {ok, RObj}} <- Results,
- riak_kv_util:is_x_deleted(RObj)]),
- Fails = [F || F = {_Idx, {error, Reason}} <- Results,
- Reason /= notfound],
- fail_reply(R, NumOk, NumOk - DelObjs,
- NumNotFound + DelObjs, Fails)
- end,
- {Reply, GetCore#getcore{merged = Merged}}.
+ Reply = case ObjState of
+ ok ->
+ Merged; % {ok, MObj}
+ tombstone when DeletedVClock ->
+ {error, {deleted, riak_object:vclock(MObj)}};
+ _ -> % tombstone or notfound
+ {error, notfound}
+ end,
+ {Reply, GetCore#getcore{merged = Merged}};
+%% everything was either a tombstone or a notfound
+response(#getcore{num_notfound = NumNotFound, num_ok = NumOK,
+ num_deleted = NumDel, num_fail = NumFail} = GetCore)
+ when NumNotFound + NumDel > 0, NumOK - NumDel == 0, NumFail == 0 ->
+ {{error, notfound}, GetCore};
+%% We've satisfied R, but not PR
+response(#getcore{r = R, pr = PR, num_ok = NumR, num_pok = NumPR} = GetCore)
+ when PR > 0, NumPR < PR, NumR >= R ->
+ {{error, {pr_val_unsatisfied, PR, NumPR}}, GetCore};
+%% PR and/or R are unsatisfied, but PR is more restrictive
+response(#getcore{r = R, num_pok = NumPR, pr = PR} = GetCore) when PR >= R ->
+ {{error, {pr_val_unsatisfied, PR, NumPR}}, GetCore};
+%% PR and/or R are unsatisfied, but R is more restrictive
+response(#getcore{r = R, num_ok = NumR} = GetCore) ->
+ {{error, {r_val_unsatisfied, R, NumR}}, GetCore}.
+
%% Check if all expected results have been added
-spec has_all_results(getcore()) -> boolean().
@@ -222,7 +254,170 @@ merge(Replies, AllowMult) ->
end
end.
-fail_reply(_R, _NumR, 0, NumNotFound, []) when NumNotFound > 0 ->
- {error, notfound};
-fail_reply(R, NumR, _NumNotDeleted, _NumNotFound, _Fails) ->
- {error, {r_val_unsatisfied, R, NumR}}.
+%% @private If the Idx is not in the IdxType
+%% the world should end
+is_primary_response(Idx, IdxType) ->
+ {Idx, Status} = lists:keyfind(Idx, 1, IdxType),
+ Status == primary.
+
+%% @private Increment PR, if appropriate
+num_pr(GetCore = #getcore{num_pok=NumPOK, idx_type=IdxType}, Idx) ->
+ case is_primary_response(Idx, IdxType) of
+ true ->
+ GetCore#getcore{num_pok=NumPOK+1};
+ false ->
+ GetCore
+ end.
+
+
+-ifdef(TEST).
+%% simple sanity tests
+enough_test_() ->
+ [
+ {"Checking R",
+ fun() ->
+ %% cannot meet R
+ ?assertEqual(false, enough(#getcore{n= 3, r = 3, pr=0,
+ fail_threshold = 1, num_ok = 0, num_pok = 0,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 0})),
+ ?assertEqual(false, enough(#getcore{n= 3, r = 3, pr=0,
+ fail_threshold = 1, num_ok = 1, num_pok = 0,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 0})),
+ ?assertEqual(false, enough(#getcore{n= 3, r = 3, pr=0,
+ fail_threshold = 1, num_ok = 2, num_pok = 0,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 0})),
+ %% met R
+ ?assertEqual(true, enough(#getcore{n= 3, r = 3, pr=0,
+ fail_threshold = 1, num_ok = 3, num_pok = 0,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 0})),
+ %% too many failures
+ ?assertEqual(true, enough(#getcore{n= 3, r = 3, pr=0,
+ fail_threshold = 1, num_ok = 2, num_pok = 0,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 1})),
+ ok
+ end},
+ {"Checking PR",
+ fun() ->
+ %% cannot meet PR
+ ?assertEqual(false, enough(#getcore{n= 3, r = 0, pr=3,
+ fail_threshold = 1, num_ok = 1, num_pok = 1,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 0})),
+ ?assertEqual(false, enough(#getcore{n= 3, r = 0, pr=3,
+ fail_threshold = 1, num_ok = 2, num_pok = 2,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 0})),
+ %% met PR
+ ?assertEqual(true, enough(#getcore{n= 3, r = 0, pr=3,
+ fail_threshold = 1, num_ok = 3, num_pok = 3,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 0})),
+ %% met R but not PR
+ ?assertEqual(true, enough(#getcore{n= 3, r = 0, pr=3,
+ fail_threshold = 3, num_ok = 3, num_pok = 2,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 0})),
+ ok
+ end}
+ ].
+
+response_test_() ->
+ [
+ {"Requirements met",
+ fun() ->
+ RObj = riak_object:new(<<"foo">>, <<"bar">>, <<"baz">>),
+ ?assertMatch({{ok, RObj}, _},
+ response(#getcore{n= 3, r = 3, pr=0,
+ fail_threshold = 1, num_ok = 3, num_pok = 0,
+ allow_mult = false,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 0,
+ results= [
+ {1, {ok, RObj}},
+ {2, {ok, RObj}},
+ {3, {ok, RObj}}]})),
+ ok
+ end},
+ {"R unsatisfied",
+ fun() ->
+ RObj = riak_object:new(<<"foo">>, <<"bar">>, <<"baz">>),
+ ?assertMatch({{error, {r_val_unsatisfied, 3, 2}}, _},
+ response(#getcore{n= 3, r = 3, pr=0,
+ fail_threshold = 1, num_ok = 2, num_pok = 2,
+ allow_mult = false,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 1,
+ results= [
+ {1, {ok, RObj}},
+ {3, {ok, RObj}}]})),
+ ok
+ end},
+ {"PR unsatisfied",
+ fun() ->
+ RObj = riak_object:new(<<"foo">>, <<"bar">>, <<"baz">>),
+ ?assertMatch({{error, {pr_val_unsatisfied, 3, 2}}, _},
+ response(#getcore{n= 3, r = 0, pr=3,
+ fail_threshold = 1, num_ok = 3, num_pok = 2,
+ allow_mult = false,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 0,
+ results= [
+ {1, {ok, RObj}},
+ {2, {ok, RObj}},
+ {4, {ok, RObj}}]})), %% from a fallback
+ ok
+ end},
+ {"R & PR unsatisfied, PR >= R",
+ fun() ->
+ RObj = riak_object:new(<<"foo">>, <<"bar">>, <<"baz">>),
+ ?assertMatch({{error, {pr_val_unsatisfied, 3, 1}}, _},
+ response(#getcore{n= 3, r = 2, pr=3,
+ fail_threshold = 1, num_ok = 1, num_pok = 1,
+ allow_mult = false,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 2,
+ results= [
+ {1, {ok, RObj}},
+ {2, {error, foo}},
+ {3, {error, foo}}]})),
+ ok
+ end},
+ {"R & PR unsatisfied, R > PR",
+ fun() ->
+ RObj = riak_object:new(<<"foo">>, <<"bar">>, <<"baz">>),
+ ?assertMatch({{error, {r_val_unsatisfied, 3, 1}}, _},
+ response(#getcore{n= 3, r = 3, pr=2,
+ fail_threshold = 1, num_ok = 1, num_pok = 1,
+ allow_mult = false,
+ num_notfound = 0, num_deleted = 0,
+ num_fail = 2,
+ results= [
+ {1, {ok, RObj}},
+ {2, {error, foo}},
+ {3, {error, foo}}]})),
+ ok
+ end},
+
+ {"All results notfound/tombstone",
+ fun() ->
+ RObj = riak_object:new(<<"foo">>, <<"bar">>, <<"baz">>,
+ dict:from_list([{<<"X-Riak-Deleted">>, true}])),
+ ?assertMatch({{error, notfound}, _},
+ response(#getcore{n= 3, r = 3, pr=0,
+ fail_threshold = 1, num_ok = 1, num_pok = 0,
+ allow_mult = false,
+ num_notfound = 2, num_deleted = 1,
+ num_fail = 0,
+ results= [
+ {1, {ok, RObj}},
+ {2, {error, notfound}},
+ {3, {error, notfound}}]})),
+ ok
+ end}
+ ].
+-endif.
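
Note: num_pr/2 only counts a reply toward num_pok when the replying partition
is listed as primary in idx_type, so PR can remain unsatisfied even after R is
met. A sketch of the accounting (values illustrative):

    Obj = riak_object:new(<<"b">>, <<"k">>, <<"v">>),
    IdxType = [{1, primary}, {2, primary}, {3, fallback}],
    GC0 = riak_kv_get_core:init(3, 2, 2, 2, true, false, false, IdxType),
    %% An ok reply from the fallback bumps num_ok but not num_pok:
    GC1 = riak_kv_get_core:add_result(3, {ok, Obj}, GC0),
    false = riak_kv_get_core:enough(GC1).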
31 src/riak_kv_get_fsm.erl
@@ -163,6 +163,7 @@ prepare(timeout, StateData=#state{bkey=BKey={Bucket,_Key}}) ->
StatTracked = proplists:get_value(stat_tracked, BucketProps, false),
UpNodes = riak_core_node_watcher:nodes(riak_kv),
Preflist2 = riak_core_apl:get_apl_ann(DocIdx, N, Ring, UpNodes),
+
new_state_timeout(validate, StateData#state{starttime=riak_core_util:moment(),
n = N,
bucket_props=BucketProps,
@@ -184,24 +185,27 @@ validate(timeout, StateData=#state{from = {raw, ReqId, _Pid}, options = Options,
PR = riak_kv_util:expand_rw_value(pr, PR0, BucketProps, N),
NumVnodes = length(PL2),
NumPrimaries = length([x || {_,primary} <- PL2]),
+ IdxType = [{Part, Type} || {{Part, _Node}, Type} <- PL2],
+
case validate_quorum(R, R0, N, PR, PR0, NumPrimaries, NumVnodes) of
ok ->
BQ0 = get_option(basic_quorum, Options, default),
+ FailR = erlang:max(R, PR), %% fail fast
FailThreshold =
case riak_kv_util:expand_value(basic_quorum, BQ0, BucketProps) of
true ->
erlang:min((N div 2)+1, % basic quorum, or
- (N-R+1)); % cannot ever get R 'ok' replies
+ (N-FailR+1)); % cannot ever get R 'ok' replies
_ElseFalse ->
- N - R + 1 % cannot ever get R 'ok' replies
+ N - FailR + 1 % cannot ever get R 'ok' replies
end,
AllowMult = proplists:get_value(allow_mult,BucketProps),
NFOk0 = get_option(notfound_ok, Options, default),
NotFoundOk = riak_kv_util:expand_value(notfound_ok, NFOk0, BucketProps),
DeletedVClock = get_option(deletedvclock, Options, false),
- GetCore = riak_kv_get_core:init(N, R, FailThreshold,
+ GetCore = riak_kv_get_core:init(N, R, PR, FailThreshold,
NotFoundOk, AllowMult,
- DeletedVClock),
+ DeletedVClock, IdxType),
new_state_timeout(execute, StateData#state{get_core = GetCore,
timeout = Timeout,
req_id = ReqId});
@@ -402,7 +406,7 @@ determine_do_read_repair(SoftCap, HardCap, Actual, Roll) ->
-ifdef(TEST).
roll_d100() ->
- fsm_eqc_util:get_fake_rng(get_fsm_qc).
+ fsm_eqc_util:get_fake_rng(get_fsm_eqc).
-else.
% technically not a d100 as it has a 0
roll_d100() ->
@@ -455,26 +459,13 @@ client_reply(Reply, StateData0 = #state{from = {raw, ReqId, Pid},
update_stats({ok, Obj}, #state{tracked_bucket = StatTracked, calculated_timings={ResponseUSecs, Stages}}) ->
%% Stat the number of siblings and the object size, and timings
NumSiblings = riak_object:value_count(Obj),
+ ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
+ ObjSize = riak_object:approximate_size(ObjFmt, Obj),
Bucket = riak_object:bucket(Obj),
- ObjSize = calculate_objsize(Bucket, Obj),
riak_kv_stat:update({get_fsm, Bucket, ResponseUSecs, Stages, NumSiblings, ObjSize, StatTracked});
update_stats(_, #state{ bkey = {Bucket, _}, tracked_bucket = StatTracked, calculated_timings={ResponseUSecs, Stages}}) ->
riak_kv_stat:update({get_fsm, Bucket, ResponseUSecs, Stages, undefined, undefined, StatTracked}).
-%% Get an approximation of object size by adding together the bucket, key,
-%% vectorclock, and all of the siblings. This is more complex than
-%% calling term_to_binary/1, but it should be easier on memory,
-%% especially for objects with large values.
-calculate_objsize(Bucket, Obj) ->
- Contents = riak_object:get_contents(Obj),
- size(Bucket) +
- size(riak_object:key(Obj)) +
- size(term_to_binary(riak_object:vclock(Obj))) +
- lists:sum([size(term_to_binary(MD)) + value_size(Value) || {MD, Value} <- Contents]).
-
-value_size(Value) when is_binary(Value) -> size(Value);
-value_size(Value) -> size(term_to_binary(Value)).
-
client_info(true, StateData, Acc) ->
client_info(details(), StateData, Acc);
client_info([], _StateData, Acc) ->
9 src/riak_kv_index_fsm_sup.erl
@@ -32,7 +32,14 @@
-export([init/1]).
start_index_fsm(Node, Args) ->
- supervisor:start_child({?MODULE, Node}, Args).
+ case supervisor:start_child({?MODULE, Node}, Args) of
+ {ok, Pid} ->
+ riak_kv_stat:update({index_create, Pid}),
+ {ok, Pid};
+ Error ->
+ riak_kv_stat:update(index_create_error),
+ Error
+ end.
%% @spec start_link() -> ServerRet
%% @doc API for starting the supervisor.
21 src/riak_kv_index_hashtree.erl
@@ -107,10 +107,11 @@ insert(_Id, _Key, _Hash, undefined, _Options) ->
insert(Id, Key, Hash, Tree, Options) ->
catch gen_server:call(Tree, {insert, Id, Key, Hash, Options}, infinity).
-%% @doc Add a term_to_binary encoded riak_object associated with a given
+%% @doc Add an encoded (binary) riak_object associated with a given
%% bucket/key to the appropriate hashtree managed by the provided
%% index_hashtree pid. The hash value is generated using
-%% {@link hash_object/1}.
+%% {@link hash_object/2}. Any encoding version is supported. The encoding
+%% will be changed to the appropriate version before hashing the object.
-spec insert_object({binary(), binary()}, riak_object_t2b(), pid()) -> ok.
insert_object(_BKey, _RObj, undefined) ->
ok;
@@ -240,7 +241,7 @@ handle_call({insert, Id, Key, Hash, Options}, _From, State) ->
handle_call({insert_object, BKey, RObj}, _From, State) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
IndexN = riak_kv_util:get_index_n(BKey, Ring),
- State2 = do_insert(IndexN, term_to_binary(BKey), hash_object(RObj), [], State),
+ State2 = do_insert(IndexN, term_to_binary(BKey), hash_object(BKey, RObj), [], State),
{reply, ok, State2};
handle_call({delete, BKey}, _From, State) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
@@ -300,7 +301,7 @@ handle_cast(stop, State) ->
handle_cast({insert_object, BKey, RObj}, State) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
IndexN = riak_kv_util:get_index_n(BKey, Ring),
- State2 = do_insert(IndexN, term_to_binary(BKey), hash_object(RObj), [], State),
+ State2 = do_insert(IndexN, term_to_binary(BKey), hash_object(BKey, RObj), [], State),
{noreply, State2};
handle_cast(build_failed, State) ->
@@ -381,13 +382,11 @@ load_built(#state{trees=Trees}) ->
end.
%% Generate a hash value for a binary-encoded `riak_object'
--spec hash_object(riak_object_t2b()) -> binary().
-hash_object(RObjBin) ->
+-spec hash_object({riak_object:bucket(), riak_object:key()}, riak_object_t2b()) -> binary().
+hash_object({Bucket, Key}, RObjBin) ->
%% Normalize the `riak_object' vector clock before hashing
- RObj = binary_to_term(RObjBin),
- Vclock = riak_object:vclock(RObj),
- UpdObj = riak_object:set_vclock(RObj, lists:sort(Vclock)),
- Hash = erlang:phash2(term_to_binary(UpdObj)),
+ RObj = riak_object:from_binary(Bucket, Key, RObjBin),
+ Hash = riak_object:hash(RObj),
term_to_binary(Hash).
%% Fold over a given vnode's data, inserting each object into the appropriate
@@ -402,7 +401,7 @@ fold_keys(Partition, Tree) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
Req = ?FOLD_REQ{foldfun=fun(BKey={Bucket,Key}, RObj, _) ->
IndexN = riak_kv_util:get_index_n({Bucket, Key}, Ring),
- insert(IndexN, term_to_binary(BKey), hash_object(RObj),
+ insert(IndexN, term_to_binary(BKey), hash_object(BKey, RObj),
Tree, [if_missing]),
ok
end,
9 src/riak_kv_keys_fsm_sup.erl
@@ -31,7 +31,14 @@
-export([init/1]).
start_keys_fsm(Node, Args) ->
- supervisor:start_child({?MODULE, Node}, Args).
+ case supervisor:start_child({?MODULE, Node}, Args) of
+ {ok, Pid} ->
+ riak_kv_stat:update({list_create, Pid}),
+ {ok, Pid};
+ Error ->
+ riak_kv_stat:update(list_create_error),
+ Error
+ end.
%% @spec start_link() -> ServerRet
%% @doc API for starting the supervisor.
5 src/riak_kv_mrc_pipe.erl
@@ -283,7 +283,7 @@ mapred_stream_sink(Inputs, Query, Timeout) ->
sender={Sender,SenderMon},
timer={Timer,PipeRef},
keeps=NumKeeps}}
- catch throw:{badard, Fitting, Reason} ->
+ catch throw:{badarg, Fitting, Reason} ->
riak_kv_mrc_sink:stop(Sink),
{error, {Fitting, Reason}}
end.
@@ -959,6 +959,9 @@ example_setup() ->
%% @doc Store some example data for the other example functions.
%%
+%% WARNING: This function is used by riak_test mapred_*
+%% tests. Changing what it creates will break those tests.
+%%
%% Objects stored:
%% <dl>
%% <dt>`foo/bar'</dt>
164 src/riak_kv_multi_backend.erl
@@ -38,14 +38,18 @@
fold_objects/4,
is_empty/1,
status/1,
- callback/3]).
+ callback/3,
+ fix_index/3,
+ set_legacy_indexes/2,
+ mark_indexes_fixed/2,
+ fixed_index_status/1]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-define(API_VERSION, 1).
--define(CAPABILITIES, [async_fold]).
+-define(CAPABILITIES, [async_fold, index_reformat]).
-record (state, {backends :: [{atom(), atom(), term()}],
default_backend :: atom()}).
@@ -98,12 +102,12 @@ capabilities(State) ->
%% Expose ?CAPABILITIES plus the intersection of all child
%% backends. (This backend creates a shim for any backends that
%% don't support async_fold.)
- F = fun({_, Mod, ModState}, Acc) ->
+ F = fun(Mod, ModState) ->
{ok, S1} = Mod:capabilities(ModState),
- S2 = ordsets:from_list(S1),
- ordsets:intersection(Acc, S2)
+ ordsets:from_list(S1)
end,
- Caps1 = lists:foldl(F, ordsets:new(), State#state.backends),
+ AllCaps = [F(Mod, ModState) || {_, Mod, ModState} <- State#state.backends],
+ Caps1 = ordsets:intersection(AllCaps),
Caps2 = ordsets:to_list(Caps1),
Capabilities = lists:usort(?CAPABILITIES ++ Caps2),
@@ -329,6 +333,90 @@ callback(Ref, Msg, #state{backends=Backends}=State) ->
[Mod:callback(Ref, Msg, ModState) || {_N, Mod, ModState} <- Backends],
{ok, State}.
+set_legacy_indexes(State=#state{backends=Backends}, WriteLegacy) ->
+ NewBackends = [{I, Mod, maybe_set_legacy_indexes(Mod, ModState, WriteLegacy)} ||
+ {I, Mod, ModState} <- Backends],
+ State#state{backends=NewBackends}.
+
+maybe_set_legacy_indexes(Mod, ModState, WriteLegacy) ->
+ case backend_can_index_reformat(Mod, ModState) of
+ true -> Mod:set_legacy_indexes(ModState, WriteLegacy);
+ false -> ModState
+ end.
+
+mark_indexes_fixed(State=#state{backends=Backends}, ForUpgrade) ->
+ NewBackends = mark_indexes_fixed(Backends, [], ForUpgrade),
+ {ok, State#state{backends=NewBackends}}.
+
+mark_indexes_fixed([], NewBackends, _) ->
+ lists:reverse(NewBackends);
+mark_indexes_fixed([{I, Mod, ModState} | Backends], NewBackends, ForUpgrade) ->
+ Res = maybe_mark_indexes_fixed(Mod, ModState, ForUpgrade),
+ case Res of
+ {error, Reason} ->
+ {error, Reason};
+ {ok, NewModState} ->
+ mark_indexes_fixed(Backends, [{I, Mod, NewModState} | NewBackends], ForUpgrade)
+ end.
+
+maybe_mark_indexes_fixed(Mod, ModState, ForUpgrade) ->
+ case backend_can_index_reformat(Mod, ModState) of
+ true -> Mod:mark_indexes_fixed(ModState, ForUpgrade);
+ false -> {ok, ModState}
+ end.
+
+fix_index(BKeys, ForUpgrade, State) ->
+ % Group keys per bucket
+ PerBucket = lists:foldl(fun(BK={B,_},D) -> dict:append(B,BK,D) end, dict:new(), BKeys),
+ Result =
+ dict:fold(
+ fun(Bucket, StorageKey, Acc = {Success, Ignore, Errors}) ->
+ {_, Mod, ModState} = Backend = get_backend(Bucket, State),
+ case backend_can_index_reformat(Mod, ModState) of
+ true ->
+ {S, I, E} = backend_fix_index(Backend, Bucket,
+ StorageKey, ForUpgrade),
+ {Success + S, Ignore + I, Errors + E};
+ false ->
+ Acc
+ end
+ end, {0, 0, 0}, PerBucket),
+ {reply, Result, State}.
+
+backend_fix_index({_, Mod, ModState}, Bucket, StorageKey, ForUpgrade) ->
+ case Mod:fix_index(StorageKey, ForUpgrade, ModState) of
+ {reply, Reply, _UpModState} ->
+ Reply;
+ {error, Reason} ->
+ lager:error("Failed to fix index for bucket ~p, key ~p, backend ~p: ~p",
+ [Bucket, StorageKey, Mod, Reason]),
+ {0, 0, length(StorageKey)}
+ end.
+
+-spec fixed_index_status(state()) -> boolean().
+fixed_index_status(#state{backends=Backends}) ->
+ lists:foldl(fun({_N, Mod, ModState}, Acc) ->
+ Status = Mod:status(ModState),
+ case fixed_index_status(Mod, ModState, Status) of
+ undefined -> Acc;
+ Res ->
+ case Acc of
+ undefined -> Res;
+ _ -> Res andalso Acc
+ end
+ end
+ end,
+ undefined,
+ Backends).
+
+fixed_index_status(Mod, ModState, Status) ->
+ case backend_can_index_reformat(Mod, ModState) of
+ true -> proplists:get_value(fixed_indexes, Status);
+ false -> undefined
+ end.
+
+
+
%% ===================================================================
%% Internal functions
%% ===================================================================
@@ -406,31 +494,47 @@ backend_fold_fun(ModFun, FoldFun, Opts, AsyncFold) ->
%% Get the backend capabilities to determine
%% if it supports asynchronous folding.
{ok, ModCaps} = Module:capabilities(SubState),
- case AsyncFold andalso
- lists:member(async_fold, ModCaps) of
- true ->
- AsyncWork =
- fun(Acc1) ->
- Module:ModFun(FoldFun,
- Acc1,
- Opts,
- SubState)
- end,
- {Acc, [AsyncWork | WorkList]};
- false ->
- Result = Module:ModFun(FoldFun,
- Acc,
- Opts,
- SubState),
- case Result of
- {ok, Acc1} ->
- {Acc1, WorkList};
- {error, Reason} ->
- throw({error, {Module, Reason}})
- end
+ DoAsync = AsyncFold andalso lists:member(async_fold, ModCaps),
+ Indexes = lists:keyfind(index, 1, Opts),
+ case Indexes of
+ {index, incorrect_format, _ForUpgrade} ->
+ case lists:member(index_reformat, ModCaps) of
+ true -> backend_fold_fun(Module, ModFun, SubState, FoldFun,
+ Opts, {Acc, WorkList}, DoAsync);
+ false -> {Acc, WorkList}
+ end;
+ _ ->
+ backend_fold_fun(Module,
+ ModFun,
+ SubState,
+ FoldFun,
+ Opts,
+ {Acc, WorkList},
+ DoAsync)
end
end.
+backend_fold_fun(Module, ModFun, SubState, FoldFun, Opts, {Acc, WorkList}, true) ->
+ AsyncWork =
+ fun(Acc1) ->
+ Module:ModFun(FoldFun,
+ Acc1,
+ Opts,
+ SubState)
+ end,
+ {Acc, [AsyncWork | WorkList]};
+backend_fold_fun(Module, ModFun, SubState, FoldFun, Opts, {Acc, WorkList}, false) ->
+ Result = Module:ModFun(FoldFun,
+ Acc,
+ Opts,
+ SubState),
+ case Result of
+ {ok, Acc1} ->
+ {Acc1, WorkList};
+ {error, Reason} ->
+ throw({error, {Module, Reason}})
+ end.
+
async_fold_fun() ->
fun(AsyncWork, Acc) ->
case AsyncWork(Acc) of
@@ -452,6 +556,10 @@ error_filter({error, _, _}) ->
error_filter(_) ->
false.
+backend_can_index_reformat(Mod, ModState) ->
+ {ok, Caps} = Mod:capabilities(ModState),
+ lists:member(index_reformat, Caps).
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
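
Note: capabilities/1 previously folded ordsets:intersection/2 starting from
ordsets:new(), so the running intersection with an empty set was always empty;
it now builds one ordset per child backend and takes a single
ordsets:intersection/1 over the whole list. A small illustration:

    Sets = [ordsets:from_list([async_fold, indexes]),
            ordsets:from_list([async_fold])],
    [async_fold] = ordsets:to_list(ordsets:intersection(Sets)).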
24 src/riak_kv_pb_bucket.erl
@@ -26,8 +26,6 @@
%% <pre>
%% 15 - RpbListBucketsReq
%% 17 - RpbListKeysReq
-%% 19 - RpbGetBucketReq
-%% 21 - RpbSetBucketReq
%% </pre>
%%
%% <p>This service produces the following responses:</p>
@@ -35,8 +33,6 @@
%% <pre>
%% 16 - RpbListBucketsResp
%% 18 - RpbListKeysResp{1,}
-%% 20 - RpbGetBucketResp
-%% 22 - RpbSetBucketResp
%% </pre>
%%
%% <p>The semantics are unchanged from their original
@@ -88,25 +84,7 @@ process(rpblistbucketsreq,
process(#rpblistkeysreq{bucket=B}=Req, #state{client=C} = State) ->
%% stream_list_keys results will be processed by process_stream/3
{ok, ReqId} = C:stream_list_keys(B),
- {reply, {stream, ReqId}, State#state{req = Req, req_ctx = ReqId}};
-
-%% Get bucket properties
-process(#rpbgetbucketreq{bucket=B},
- #state{client=C} = State) ->
- Props = C:get_bucket(B),
- PbProps = riak_pb_kv_codec:encode_bucket_props(Props),
- {reply, #rpbgetbucketresp{props = PbProps}, State};
-
-%% Set bucket properties
-process(#rpbsetbucketreq{bucket=B, props = PbProps},
- #state{client=C} = State) ->
- Props = riak_pb_kv_codec:decode_bucket_props(PbProps),
- case C:set_bucket(B, Props) of
- ok ->
- {reply, rpbsetbucketresp, State};
- {error, Details} ->
- {error, {format, "Invalid bucket properties: ~p", [Details]}, State}
- end.
+ {reply, {stream, ReqId}, State#state{req = Req, req_ctx = ReqId}}.
%% @doc process_stream/3 callback. Handles streaming keys messages.
process_stream({ReqId, done}, ReqId,
92 src/riak_kv_pb_object.erl
@@ -50,6 +50,11 @@
-include_lib("riak_pb/include/riak_kv_pb.hrl").
-include_lib("riak_pb/include/riak_pb_kv_codec.hrl").
+-ifdef(TEST).
+-compile([export_all]).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
-behaviour(riak_api_pb_service).
-export([init/0,
@@ -65,8 +70,6 @@
req_ctx, % context to go along with request (partial results, request ids etc)
client_id = <<0,0,0,0>> }). % emulate legacy API when vnode_vclocks is true
--define(DEFAULT_TIMEOUT, 60000).
-
%% @doc init/0 callback. Returns the service internal start
%% state.
-spec init() -> any().
@@ -100,14 +103,20 @@ process(#rpbsetclientidreq{client_id = ClientId}, State) ->
end,
{reply, rpbsetclientidresp, NewState};
+process(#rpbgetreq{bucket = <<>>}, State) ->
+ {error, "Bucket cannot be zero-length", State};
+process(#rpbgetreq{key = <<>>}, State) ->
+ {error, "Key cannot be zero-length", State};
process(#rpbgetreq{bucket=B, key=K, r=R0, pr=PR0, notfound_ok=NFOk,
basic_quorum=BQ, if_modified=VClock,
- head=Head, deletedvclock=DeletedVClock}, #state{client=C} = State) ->
+ head=Head, deletedvclock=DeletedVClock,
+ timeout=Timeout}, #state{client=C} = State) ->
R = decode_quorum(R0),
PR = decode_quorum(PR0),
case C:get(B, K, make_option(deletedvclock, DeletedVClock) ++
make_option(r, R) ++
make_option(pr, PR) ++
+ make_option(timeout, Timeout) ++
make_option(notfound_ok, NFOk) ++
make_option(basic_quorum, BQ)) of
{ok, O} ->
@@ -139,6 +148,10 @@ process(#rpbgetreq{bucket=B, key=K, r=R0, pr=PR0, notfound_ok=NFOk,
{error, {format,Reason}, State}
end;
+process(#rpbputreq{bucket = <<>>}, State) ->
+ {error, "Bucket cannot be zero-length", State};
+process(#rpbputreq{key = <<>>}, State) ->
+ {error, "Key cannot be zero-length", State};
process(#rpbputreq{bucket=B, key=K, vclock=PbVC,
if_not_modified=NotMod, if_none_match=NoneMatch} = Req,
#state{client=C} = State) when NotMod; NoneMatch ->
@@ -166,7 +179,7 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC,
process(#rpbputreq{bucket=B, key=K, vclock=PbVC, content=RpbContent,
w=W0, dw=DW0, pw=PW0, return_body=ReturnBody,
- return_head=ReturnHead},
+ return_head=ReturnHead, timeout=Timeout},
#state{client=C} = State) ->
case K of
@@ -196,7 +209,8 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC, content=RpbContent,
end
end,
case C:put(O, make_option(w, W) ++ make_option(dw, DW) ++
- make_option(pw, PW) ++ [{timeout, default_timeout()} | Options]) of
+ make_option(pw, PW) ++ make_option(timeout, Timeout) ++
+ Options) of
ok when is_binary(ReturnKey) ->
PutResp = #rpbputresp{key = ReturnKey},
{reply, PutResp, State};
@@ -226,7 +240,8 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC, content=RpbContent,
end;
process(#rpbdelreq{bucket=B, key=K, vclock=PbVc,
- r=R0, w=W0, pr=PR0, pw=PW0, dw=DW0, rw=RW0},
+ r=R0, w=W0, pr=PR0, pw=PW0, dw=DW0, rw=RW0,
+ timeout=Timeout},
#state{client=C} = State) ->
W = decode_quorum(W0),
PW = decode_quorum(PW0),
@@ -240,7 +255,8 @@ process(#rpbdelreq{bucket=B, key=K, vclock=PbVc,
make_option(rw, RW) ++
make_option(pr, PR) ++
make_option(pw, PW) ++
- make_option(dw, DW),
+ make_option(dw, DW) ++
+ make_option(timeout, Timeout),
Result = case PbVc of
undefined ->
C:delete(B, K, Options);
@@ -299,5 +315,63 @@ erlify_rpbvc(PbVc) ->
pbify_rpbvc(Vc) ->
zlib:zip(term_to_binary(Vc)).
-default_timeout() ->
- ?DEFAULT_TIMEOUT.
+%% ===================================================================
+%% Tests
+%% ===================================================================
+-ifdef(TEST).
+
+-define(CODE(Msg), riak_pb_codec:msg_code(Msg)).
+-define(PAYLOAD(Msg), riak_kv_pb:encode(Msg)).
+
+empty_bucket_key_test_() ->
+ Name = "empty_bucket_key_test",
+ SetupFun = fun (load) ->
+ application:set_env(riak_kv, storage_backend, riak_kv_memory_backend),
+ application:set_env(riak_api, pb_ip, "127.0.0.1"),
+ application:set_env(riak_api, pb_port, 32767);
+ (_) -> ok end,
+ {setup,
+ riak_kv_test_util:common_setup(Name, SetupFun),
+ riak_kv_test_util:common_cleanup(Name, SetupFun),
+ [{"RpbPutReq with empty key is disallowed",
+ ?_assertMatch([0|_], request(#rpbputreq{bucket = <<"foo">>,
+ key = <<>>,
+ content=#rpbcontent{value = <<"dummy">>}}))},
+ {"RpbPutReq with empty bucket is disallowed",
+ ?_assertMatch([0|_], request(#rpbputreq{bucket = <<>>,
+ key = <<"foo">>,
+ content=#rpbcontent{value = <<"dummy">>}}))},
+ {"RpbGetReq with empty key is disallowed",
+ ?_assertMatch([0|_], request(#rpbgetreq{bucket = <<"foo">>,
+ key = <<>>}))},
+ {"RpbGetReq with empty bucket is disallowed",
+ ?_assertMatch([0|_], request(#rpbgetreq{bucket = <<>>,
+ key = <<"foo">>}))}]}.
+
+%% Utility funcs copied from riak_api/test/pb_service_test.erl
+
+request(Msg) when is_tuple(Msg) andalso is_atom(element(1, Msg)) ->
+ request(?CODE(element(1,Msg)), iolist_to_binary(?PAYLOAD(Msg))).
+
+request(Code, Payload) when is_binary(Payload), is_integer(Code) ->
+ Connection = new_connection(),
+ ?assertMatch({ok, _}, Connection),
+ {ok, Socket} = Connection,
+ request(Code, Payload, Socket).
+
+request(Code, Payload, Socket) when is_binary(Payload), is_integer(Code) ->
+ ?assertEqual(ok, gen_tcp:send(Socket, <<Code:8, Payload/binary>>)),
+ Result = gen_tcp:recv(Socket, 0),
+ ?assertMatch({ok, _}, Result),
+ {ok, Response} = Result,
+ Response.
+
+new_connection() ->
+ new_connection([{packet,4}, {header, 1}]).
+
+new_connection(Options) ->
+ Host = app_helper:get_env(riak_api, pb_ip),
+ Port = app_helper:get_env(riak_api, pb_port),
+ gen_tcp:connect(Host, Port, [binary, {active, false},{nodelay, true}|Options]).
+
+-endif.
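
Note: the new timeout field from each protobuf request is threaded through
make_option/2, which is defined elsewhere in this module. Its assumed shape
(not shown in this diff, so treat it as an assumption) drops undefined values,
meaning an absent field contributes no option at all:

    %% Plausible helper shape (assumption): absent fields add nothing.
    make_option(_, undefined) -> [];
    make_option(Key, Value) -> [{Key, Value}].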
225 src/riak_kv_put_core.erl
@@ -20,10 +20,14 @@
%%
%% -------------------------------------------------------------------
-module(riak_kv_put_core).
--export([init/7, add_result/2, enough/1, response/1,
+-export([init/9, add_result/2, enough/1, response/1,
final/1, result_shortcode/1, result_idx/1]).
-export_type([putcore/0, result/0, reply/0]).
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
-type vput_result() :: any().
-type result() :: w |
@@ -36,10 +40,12 @@
{error, notfound} |
{error, any()}.
-type idxresult() :: {non_neg_integer(), result()}.
+-type idx_type() :: [{non_neg_integer(), 'primary' | 'fallback'}].
-record(putcore, {n :: pos_integer(),
w :: non_neg_integer(),
dw :: non_neg_integer(),
- w_fail_threshold :: pos_integer(),
+ pw :: non_neg_integer(),
+