1.3 to master #519

Merged
merged 22 commits into master from 1.3_to_master

6 participants

@russelldb
Owner

Just to verify that the merge dealt with the conflicts correctly.

engelsanchez and others added some commits
@engelsanchez engelsanchez Add bulk vnode start function to allow parallelism 4092945
@engelsanchez engelsanchez Merge pull request #511 from basho/eas-add-kv-vnode-parallel-init
riak_kv 1.3.1 using parallel vnode init
f9b8bca
@engelsanchez engelsanchez support for handling legacy sext encoding of 2i keys
This commit is a squashed version of the branch:
jrw-eas-2i-fix-keys-backport.

The version of sext used by riak 1.3 and prior did not
preserve the sort order of bigints when serializing Erlang
terms, resulting in 2i queries that returned missing or
invalid results (riak_kv#499). The updated version of sext fixes the issue
with an incompatible change to the encoding of bigints. Special
handling of this incompatibility is necessary to provide a path
for indexes to be fixed (as they are written to disk).

A utility function for reformatting the on-disk indexes (per node)
is provided. This allows indexes to be upgraded on a running node.
The utility folds over all indexes, per partition, and rewrites any
invalid index entry (those containing bigints encoded using the old
version of sext). In order to safely migrate indexes, until the fold
that rewrites the indexes in the new format completes, all riak_object
puts/deletes must delete the legacy index format in addition to the
new format.

Upon successful completion, or if the partition is empty, a flag is
written to the leveldb backend to ensure that future writes/deletes
do not incur the additional cost of deleting the legacy index format.

add support for downgrading back to legacy index format

Fix eleveldb fold_objects to start at 1st object

Also fixed the special bucket option so it starts at the beginning of the
given bucket, though that doesn't appear to be used anywhere...

Add key namespace for special leveldb markers

My argument is that this may make it less messy down the road if others
start adding other special markers in leveldb partitions. They now have
a separate namespace, like objects and index keys do.

add console support for reformatting indices

Fix 2i bigint eq query during transition

This causes a double scan during the transition from the legacy
(broken for bigint) sext encoding for 2i keys: one scan starting at the
old key value, a second at the new one. Once the upgrade or
downgrade is finished, it returns to a normal single scan.

add support for multi-backend index reformatting

The multi-backend supports indexing on buckets assigned to sub-backends
that support indexing; those LevelDB sub-backends must be reformatted.
58170cd
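
The transition rule above boils down to the index_deletes/5 helper added in the
src/riak_kv_eleveldb_backend.erl diff further down, condensed here with comments
for readability:

    %% Every index removal deletes the new-format key and, until the
    %% partition is marked fixed, the legacy-format key as well (skipped
    %% when the two encodings agree, e.g. no bigints in the term).
    index_deletes(FixedIndexes, Bucket, PrimaryKey, Field, Value) ->
        IndexKey  = to_index_key(Bucket, PrimaryKey, Field, Value),
        LegacyKey = to_legacy_index_key(Bucket, PrimaryKey, Field, Value),
        [{delete, IndexKey}] ++
            [{delete, LegacyKey} || FixedIndexes =:= false
                                    andalso IndexKey =/= LegacyKey].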
@engelsanchez engelsanchez Merge pull request #516 from basho/jrw-eas-2i-fix-keys-backport-squashed
support for handling legacy sext encoding of 2i keys
bbf87af
@jtuple jtuple Change AAE to use incremental crypto:sha calculations
The crypto library is implemented as a NIF, and long-running NIFs can
negatively impact the work-balancing algorithm of the Erlang
scheduler. Using normal crypto:sha(object) is unbounded in duration, as
the size of an object is unbounded. This commit therefore changes AAE
to use an incremental SHA calculation (sha_init/sha_update/sha_final)
with a set chunk size (defaults to 4096; configurable app var
riak_kv/anti_entropy_sha_chunk).
2fd89b5
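
The change boils down to the following, condensed from the src/hashtree.erl diff
below; the chunk size comes from the riak_kv/anti_entropy_sha_chunk app var and
defaults to 4096:

    %% Hash a binary in fixed-size chunks so that no single crypto NIF
    %% call runs for an unbounded amount of time.
    sha(Chunk, Bin) ->
        sha(Chunk, Bin, crypto:sha_init()).

    sha(Chunk, Bin, Ctx) ->
        case Bin of
            <<Data:Chunk/binary, Rest/binary>> ->
                sha(Chunk, Rest, crypto:sha_update(Ctx, Data));
            Data ->
                crypto:sha_final(crypto:sha_update(Ctx, Data))
        end.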
@jtuple jtuple Merge pull request #514 from basho/jdb-aae-incremental-sha-1.3 98aba59
@russelldb russelldb Re-use the stat calc funs in riak_core
Use the cached riak_pipe stats for legacy stats

When a broken stat is detected, register it.

Fix bug where stats endpoints were calculating _all_ riak_kv stats

Since the addition of many more stats, and most of the infrastructure for
ad hoc querying of stats, the stat calculation code for the
(not yet legacy) endpoints was calculating all stats for riak_kv.

As there are now about (ring_size * vnode stats) + (fsm stages * fsm stats)
more stats, this calculation understandably took a long time.

This patch instead only calculates the minimum subset of stats
needed to support the (not yet legacy) stats endpoints.
fd2e527
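
The caching half of the fix is simplified below from the get_stat helper in the
src/riak_kv_stat_bc.erl diff further down (the real version also threads a
value-extraction fun and handles unavailable stats):

    %% Calculate each named stat at most once per produce_stats pass,
    %% re-using the cached value for legacy stats that share a source.
    get_stat(Name, Type, Cache) ->
        case proplists:get_value(Name, Cache) of
            undefined ->
                Stat = riak_core_stat_q:calc_stat({Name, Type}),
                {Stat, [{Name, Stat} | Cache]};
            Cached ->
                {Cached, Cache}
        end.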
@russelldb russelldb Since stats now get repaired when an update fails, log as `warning` 5d3a080
@russelldb russelldb Merge pull request #517 from basho/kv508-stats-warn
Since stats now get repaired when an update fails, log as `warning`
893d752
@jaredmorrow jaredmorrow Roll riak_kv version 1.3.1 eb24ff9
@russelldb russelldb Merge branch '1.3'
Conflicts:
	rebar.config
	src/riak_kv_stat.erl
	src/riak_kv_stat_bc.erl
	src/riak_kv_vnode.erl
4fa5967
@beerriot beerriot spell badarg correctly
Spelled incorrectly, the catch clause fails to prevent the process from
blowing up on error. Spelled correctly, the user receives an error
message describing what the problem is.
1045e81
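
The bug class is worth spelling out: a misspelled atom in a catch pattern is
still legal Erlang, so the compiler says nothing; the pattern simply never
matches and the exception escapes. A minimal illustration (handle/1 is a
hypothetical name, not from this PR):

    %% The typo'd pattern compiles cleanly but can never match the throw.
    handle(F) ->
        try
            F()
        catch
            throw:{badard, Fitting, Reason} ->  %% typo: should be badarg
                {error, {Fitting, Reason}}
        end.

    %% handle(fun() -> throw({badarg, f, r}) end) crashes with the uncaught
    %% throw; spelling the atom badarg makes it return {error, {f, r}}.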
@beerriot beerriot Merge pull request #522 from branch 'bwf-badard' into 1.3 8e7318d
@engelsanchez engelsanchez Fix perf problems and bug in 2i reformat
Add a batch size parameter to control how many keys
are fixed at a time.
Add backpressure to the query of bad index entries to avoid
flooding the reformat process with messages.
Fixed a problem with list keys and code triggering a second
scan during the 2i reformat transition.
a03f2e9
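
The backpressure here is the classic ack-per-batch exchange. The sketch below is
illustrative only (send_batch/2, consume/0 and fix_keys/1 are hypothetical
names); the real exchange lives in the get_index_entries handler of the
src/riak_kv_vnode.erl diff and in riak_kv_util further down:

    %% Producer: emit one batch, then block until the consumer acks it,
    %% so at most one unprocessed batch sits in the consumer's mailbox.
    send_batch(Consumer, Keys) ->
        BatchRef = make_ref(),
        Consumer ! {keys, self(), BatchRef, Keys},
        receive
            {ack_keys, BatchRef} -> ok
        end.

    %% Consumer: handle the batch, then ack to release the next one.
    consume() ->
        receive
            {keys, From, BatchRef, Keys} ->
                ok = fix_keys(Keys),
                From ! {ack_keys, BatchRef},
                consume()
        end.

    %% Stand-in for the real per-batch repair work.
    fix_keys(_Keys) -> ok.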
@engelsanchez engelsanchez Make multi backend work with batched 2i reformats
This is WIP. Testing is not conclusive on whether it works, but it runs
to completion now.
371f480
@engelsanchez engelsanchez Fix 2i reformat batch for multibackend 3e1ec43
@engelsanchez engelsanchez Make reformat-indexes print default parameters 1bfaeae
@jrwest jrwest move querying of fixed index status to separate backend function
makes the top-level fixed_indexes status on the multi-backend,
which messes with stats, unnecessary, allowing it to be removed
b50d1df
@engelsanchez
Collaborator

The code looks good to me. I tested index reformatting with a small data set in a mixed bitcask/memory/2 leveldb setup (from 1.3.0 -> 1.3.1). Ship it...
:+1: :dancer:

Collaborator

And to be clear: the logs are no longer spammed by bad stat messages with this applied.

@engelsanchez
Collaborator

I just merged the latest 1.3 into this and I'm reviewing the diff to merge into master.

@engelsanchez engelsanchez merged commit 19f19a9 into master
@engelsanchez engelsanchez deleted the 1.3_to_master branch
Commits on Mar 18, 2013
  1. @engelsanchez
     Add bulk vnode start function to allow parallelism
Commits on Mar 21, 2013
  1. @engelsanchez
     Merge pull request #511 from basho/eas-add-kv-vnode-parallel-init
  2. @engelsanchez
     support for handling legacy sext encoding of 2i keys
  3. @engelsanchez
     Merge pull request #516 from basho/jrw-eas-2i-fix-keys-backport-squashed
  4. @jtuple
     Change AAE to use incremental crypto:sha calculations
  5. @jtuple
     Merge pull request #514 from basho/jdb-aae-incremental-sha-1.3
  6. @russelldb
     Re-use the stat calc funs in riak_core
Commits on Mar 22, 2013
  1. @russelldb
     Since stats now get repaired when an update fails, log as `warning`
  2. @russelldb
     Merge pull request #517 from basho/kv508-stats-warn
Commits on Mar 23, 2013
  1. @jaredmorrow
     Roll riak_kv version 1.3.1
Commits on Mar 26, 2013
  1. @russelldb
     Merge branch '1.3'
Commits on Mar 27, 2013
  1. @beerriot
     spell badarg correctly
  2. @beerriot
     Merge pull request #522 from branch 'bwf-badard' into 1.3
Commits on Mar 29, 2013
  1. @engelsanchez
     Fix perf problems and bug in 2i reformat
Commits on Mar 30, 2013
  1. @engelsanchez
     Make multi backend work with batched 2i reformats
Commits on Apr 1, 2013
  1. @engelsanchez
     Fix 2i reformat batch for multibackend
  2. @engelsanchez
     Make reformat-indexes print default parameters
  3. @jrwest
     move querying of fixed index status to separate backend function
  4. @engelsanchez
     Merge pull request #525 from basho/jrw-2i-reformat-status-change
  5. @engelsanchez
     Merge pull request #523 from basho/eas-perf-fixes-to-2i-reformat
Commits on Apr 2, 2013
  1. @jaredmorrow
Commits on Apr 5, 2013
  1. @engelsanchez
     Merge branch '1.3' into 1.3_to_master (Conflicts: rebar.config)
35 src/hashtree.erl
@@ -506,7 +506,27 @@ share_segment_store(State, #state{ref=Ref, path=Path}) ->
-spec hash(term()) -> binary().
hash(X) ->
%% erlang:phash2(X).
- crypto:sha(term_to_binary(X)).
+ sha(term_to_binary(X)).
+
+sha(Bin) ->
+ Chunk = app_helper:get_env(riak_kv, anti_entropy_sha_chunk, 4096),
+ sha(Chunk, Bin).
+
+sha(Chunk, Bin) ->
+ Ctx1 = crypto:sha_init(),
+ Ctx2 = sha(Chunk, Bin, Ctx1),
+ SHA = crypto:sha_final(Ctx2),
+ SHA.
+
+sha(Chunk, Bin, Ctx) ->
+ case Bin of
+ <<Data:Chunk/binary, Rest/binary>> ->
+ Ctx2 = crypto:sha_update(Ctx, Data),
+ sha(Chunk, Rest, Ctx2);
+ Data ->
+ Ctx2 = crypto:sha_update(Ctx, Data),
+ Ctx2
+ end.
-spec update_levels(integer(),
[{integer(), [{integer(), binary()}]}],
@@ -1026,6 +1046,19 @@ delta_test() ->
%%%===================================================================
-ifdef(EQC).
+sha_test_() ->
+ {timeout, 60,
+ fun() ->
+ ?assert(eqc:quickcheck(eqc:testing_time(4, prop_sha())))
+ end
+ }.
+
+prop_sha() ->
+ ?FORALL(Size, choose(256, 1024*1024),
+ ?FORALL(Chunk, choose(1, Size),
+ ?FORALL(Bin, binary(Size),
+ sha(Chunk, Bin) =:= crypto:sha(Bin)))).
+
eqc_test_() ->
{timeout, 5,
fun() ->
2  src/riak_kv.app.src
@@ -3,7 +3,7 @@
{application, riak_kv,
[
{description, "Riak Key/Value Store"},
- {vsn, "1.3.0"},
+ {vsn, "1.3.1"},
{applications, [
kernel,
stdlib,
68 src/riak_kv_console.erl
@@ -36,6 +36,7 @@
cluster_info/1,
down/1,
aae_status/1,
+ reformat_indexes/1,
reload_code/1]).
%% Arrow is 24 chars wide
@@ -421,6 +422,73 @@ format_timestamp(_Now, undefined) ->
format_timestamp(Now, TS) ->
riak_core_format:human_time_fmt("~.1f", timer:now_diff(Now, TS)).
+parse_int(IntStr) ->
+ try
+ list_to_integer(IntStr)
+ catch
+ error:badarg ->
+ undefined
+ end.
+
+index_reformat_options([], Opts) ->
+ Defaults = [{concurrency, 2}, {batch_size, 100}],
+ AddIfAbsent =
+ fun({Name,Val}, Acc) ->
+ case lists:keymember(Name, 1, Acc) of
+ true ->
+ Acc;
+ false ->
+ [{Name, Val} | Acc]
+ end
+ end,
+ lists:foldl(AddIfAbsent, Opts, Defaults);
+index_reformat_options(["--downgrade"], Opts) ->
+ [{downgrade, true} | Opts];
+index_reformat_options(["--downgrade" | More], _Opts) ->
+ io:format("Invalid arguments after downgrade switch : ~p~n", [More]),
+ undefined;
+index_reformat_options([IntStr | Rest], Opts) ->
+ HasConcurrency = lists:keymember(concurrency, 1, Opts),
+ HasBatchSize = lists:keymember(batch_size, 1, Opts),
+ case {parse_int(IntStr), HasConcurrency, HasBatchSize} of
+ {_, true, true} ->
+ io:format("Expected --downgrade instead of ~p~n", [IntStr]),
+ undefined;
+ {undefined, _, _ } ->
+ io:format("Expected integer parameter instead of ~p~n", [IntStr]),
+ undefined;
+ {IntVal, false, false} ->
+ index_reformat_options(Rest, [{concurrency, IntVal} | Opts]);
+ {IntVal, true, false} ->
+ index_reformat_options(Rest, [{batch_size, IntVal} | Opts])
+ end;
+index_reformat_options(_, _) ->
+ undefined.
+
+reformat_indexes(Args) ->
+ Opts = index_reformat_options(Args, []),
+ case Opts of
+ undefined ->
+ io:format("Expected options: <concurrency> <batch size> [--downgrade]~n"),
+ ok;
+ _ ->
+ start_index_reformat(Opts),
+ io:format("index reformat started with options ~p ~n", [Opts]),
+ io:format("check console.log for status information~n"),
+ ok
+ end.
+
+start_index_reformat(Opts) ->
+ spawn(fun() -> run_index_reformat(Opts) end).
+
+run_index_reformat(Opts) ->
+ try riak_kv_util:fix_incorrect_index_entries(Opts)
+ catch
+ Err:Reason ->
+ lager:error("index reformat crashed with error type ~p and reason: ~p",
+ [Err, Reason])
+ end.
+
%%%===================================================================
%%% Private
%%%===================================================================
244 src/riak_kv_eleveldb_backend.erl
@@ -33,6 +33,10 @@
put/5,
delete/4,
drop/1,
+ fix_index/3,
+ mark_indexes_fixed/2,
+ set_legacy_indexes/2,
+ fixed_index_status/1,
fold_buckets/4,
fold_keys/4,
fold_objects/4,
@@ -50,7 +54,8 @@
-endif.
-define(API_VERSION, 1).
--define(CAPABILITIES, [async_fold, indexes]).
+-define(CAPABILITIES, [async_fold, indexes, index_reformat]).
+-define(FIXED_INDEXES_KEY, fixed_indexes).
-record(state, {ref :: reference(),
data_root :: string(),
@@ -58,7 +63,9 @@
config :: config(),
read_opts = [],
write_opts = [],
- fold_opts = [{fill_cache, false}]
+ fold_opts = [{fill_cache, false}],
+ fixed_indexes = false, %% true if legacy indexes have been rewritten
+ legacy_indexes = false %% true if new writes use legacy indexes (downgrade)
}).
@@ -99,11 +106,30 @@ start(Partition, Config) ->
S0 = init_state(DataDir, Config),
case open_db(S0) of
{ok, State} ->
- {ok, State};
+ determine_fixed_index_status(State);
{error, Reason} ->
{error, Reason}
end.
+determine_fixed_index_status(State) ->
+ case indexes_fixed(State) of
+ {error, Reason} ->
+ {error, Reason};
+ true ->
+ {ok, State#state{fixed_indexes=true}};
+ false ->
+ case is_empty(State) of
+ true -> mark_indexes_fixed_on_start(State);
+ false -> {ok, State#state{fixed_indexes=false}}
+ end
+ end.
+
+mark_indexes_fixed_on_start(State) ->
+ case mark_indexes_fixed(State, true) of
+ {error, Reason, _} -> {error, Reason};
+ Res -> Res
+ end.
+
%% @doc Stop the eleveldb backend
-spec stop(state()) -> ok.
stop(State) ->
@@ -138,18 +164,25 @@ get(Bucket, Key, #state{read_opts=ReadOpts,
{ok, state()} |
{error, term(), state()}.
put(Bucket, PrimaryKey, IndexSpecs, Val, #state{ref=Ref,
- write_opts=WriteOpts}=State) ->
+ write_opts=WriteOpts,
+ legacy_indexes=WriteLegacy,
+ fixed_indexes=FixedIndexes}=State) ->
%% Create the KV update...
StorageKey = to_object_key(Bucket, PrimaryKey),
Updates1 = [{put, StorageKey, Val}],
%% Convert IndexSpecs to index updates...
F = fun({add, Field, Value}) ->
- {put, to_index_key(Bucket, PrimaryKey, Field, Value), <<>>};
+ case WriteLegacy of
+ true ->
+ [{put, to_legacy_index_key(Bucket, PrimaryKey, Field, Value), <<>>}];
+ false ->
+ [{put, to_index_key(Bucket, PrimaryKey, Field, Value), <<>>}]
+ end;
({remove, Field, Value}) ->
- {delete, to_index_key(Bucket, PrimaryKey, Field, Value)}
+ index_deletes(FixedIndexes, Bucket, PrimaryKey, Field, Value)
end,
- Updates2 = [F(X) || X <- IndexSpecs],
+ Updates2 = lists:flatmap(F, IndexSpecs),
%% Perform the write...
case eleveldb:write(Ref, Updates1 ++ Updates2, WriteOpts) of
@@ -159,13 +192,105 @@ put(Bucket, PrimaryKey, IndexSpecs, Val, #state{ref=Ref,
{error, Reason, State}
end.
+indexes_fixed(#state{ref=Ref,read_opts=ReadOpts}) ->
+ case eleveldb:get(Ref, to_md_key(?FIXED_INDEXES_KEY), ReadOpts) of
+ {ok, <<1>>} ->
+ true;
+ {ok, <<0>>} ->
+ false;
+ not_found ->
+ false;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+index_deletes(FixedIndexes, Bucket, PrimaryKey, Field, Value) ->
+ IndexKey = to_index_key(Bucket, PrimaryKey, Field, Value),
+ LegacyKey = to_legacy_index_key(Bucket, PrimaryKey, Field, Value),
+ KeyDelete = [{delete, IndexKey}],
+ LegacyDelete = [{delete, LegacyKey}
+ || FixedIndexes =:= false andalso IndexKey =/= LegacyKey],
+ KeyDelete ++ LegacyDelete.
+
+fix_index(IndexKeys, ForUpgrade, #state{ref=Ref,
+ read_opts=ReadOpts,
+ write_opts=WriteOpts} = State)
+ when is_list(IndexKeys) ->
+ FoldFun =
+ fun(ok, {Success, Ignore, Error}) ->
+ {Success+1, Ignore, Error};
+ (ignore, {Success, Ignore, Error}) ->
+ {Success, Ignore+1, Error};
+ ({error, _}, {Success, Ignore, Error}) ->
+ {Success, Ignore, Error+1}
+ end,
+ Totals =
+ lists:foldl(FoldFun, {0,0,0},
+ [fix_index(IndexKey, ForUpgrade, Ref, ReadOpts, WriteOpts)
+ || {_Bucket, IndexKey} <- IndexKeys]),
+ {reply, Totals, State};
+fix_index(IndexKey, ForUpgrade, #state{ref=Ref,
+ read_opts=ReadOpts,
+ write_opts=WriteOpts} = State) ->
+ case fix_index(IndexKey, ForUpgrade, Ref, ReadOpts, WriteOpts) of
+ Atom when is_atom(Atom) ->
+ {Atom, State};
+ {error, Reason} ->
+ {error, Reason, State}
+ end.
+
+fix_index(IndexKey, ForUpgrade, Ref, ReadOpts, WriteOpts) ->
+ case eleveldb:get(Ref, IndexKey, ReadOpts) of
+ {ok, _} ->
+ {Bucket, Key, Field, Value} = from_index_key(IndexKey),
+ NewKey = case ForUpgrade of
+ true -> to_index_key(Bucket, Key, Field, Value);
+ false -> to_legacy_index_key(Bucket, Key, Field, Value)
+ end,
+ Updates = [{delete, IndexKey}, {put, NewKey, <<>>}],
+ case eleveldb:write(Ref, Updates, WriteOpts) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ {error, Reason}
+ end;
+ not_found ->
+ ignore;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+mark_indexes_fixed(State=#state{fixed_indexes=true}, true) ->
+ {ok, State};
+mark_indexes_fixed(State=#state{fixed_indexes=false}, false) ->
+ {ok, State};
+mark_indexes_fixed(State=#state{ref=Ref, write_opts=WriteOpts}, ForUpgrade) ->
+ Value = case ForUpgrade of
+ true -> <<1>>;
+ false -> <<0>>
+ end,
+ Updates = [{put, to_md_key(?FIXED_INDEXES_KEY), Value}],
+ case eleveldb:write(Ref, Updates, WriteOpts) of
+ ok ->
+ {ok, State#state{fixed_indexes=ForUpgrade}};
+ {error, Reason} ->
+ {error, Reason, State}
+ end.
+
+set_legacy_indexes(State, WriteLegacy) ->
+ State#state{legacy_indexes=WriteLegacy}.
+
+-spec fixed_index_status(state()) -> boolean().
+fixed_index_status(#state{fixed_indexes=Fixed}) ->
+ Fixed.
%% @doc Delete an object from the eleveldb backend
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()} |
{error, term(), state()}.
delete(Bucket, PrimaryKey, IndexSpecs, #state{ref=Ref,
- write_opts=WriteOpts}=State) ->
+ write_opts=WriteOpts,
+ fixed_indexes=FixedIndexes}=State) ->
%% Create the KV delete...
StorageKey = to_object_key(Bucket, PrimaryKey),
@@ -173,9 +298,9 @@ delete(Bucket, PrimaryKey, IndexSpecs, #state{ref=Ref,
%% Convert IndexSpecs to index deletes...
F = fun({remove, Field, Value}) ->
- {delete, to_index_key(Bucket, PrimaryKey, Field, Value)}
+ index_deletes(FixedIndexes, Bucket, PrimaryKey, Field, Value)
end,
- Updates2 = [F(X) || X <- IndexSpecs],
+ Updates2 = lists:flatmap(F, IndexSpecs),
case eleveldb:write(Ref, Updates1 ++ Updates2, WriteOpts) of
ok ->
@@ -218,6 +343,8 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{fold_opts=FoldOpts,
[{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}.
fold_keys(FoldKeysFun, Acc, Opts, #state{fold_opts=FoldOpts,
+ fixed_indexes=FixedIdx,
+ legacy_indexes=WriteLegacyIdx,
ref=Ref}) ->
%% Figure out how we should limit the fold: by bucket, by
%% secondary index, or neither (fold across everything.)
@@ -235,15 +362,23 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{fold_opts=FoldOpts,
FirstKey = to_first_key(Limiter),
FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
FoldOpts1 = [{first_key, FirstKey} | FoldOpts],
+ ExtraFold = not FixedIdx orelse WriteLegacyIdx,
KeyFolder =
fun() ->
- %% Do the fold. ELevelDB uses throw/1 to break out of a fold...
- try
- eleveldb:fold_keys(Ref, FoldFun, Acc, FoldOpts1)
- catch
- {break, AccFinal} ->
- AccFinal
- end
+ %% Do the fold. ELevelDB uses throw/1 to break out of a fold...
+ AccFinal =
+ try
+ eleveldb:fold_keys(Ref, FoldFun, Acc, FoldOpts1)
+ catch
+ {break, BrkResult} ->
+ BrkResult
+ end,
+ case ExtraFold of
+ true ->
+ legacy_key_fold(Ref, FoldFun, AccFinal, FoldOpts1, Limiter);
+ false ->
+ AccFinal
+ end
end,
case lists:member(async_fold, Opts) of
true ->
@@ -252,6 +387,24 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{fold_opts=FoldOpts,
{ok, KeyFolder()}
end.
+legacy_key_fold(Ref, FoldFun, Acc, FoldOpts0, Query={index, _, _}) ->
+ {_, FirstKey} = lists:keyfind(first_key, 1, FoldOpts0),
+ LegacyKey = to_legacy_first_key(Query),
+ case LegacyKey =/= FirstKey of
+ true ->
+ try
+ FoldOpts = lists:keyreplace(first_key, 1, FoldOpts0, {first_key, LegacyKey}),
+ eleveldb:fold_keys(Ref, FoldFun, Acc, FoldOpts)
+ catch
+ {break, AccFinal} ->
+ AccFinal
+ end;
+ false ->
+ Acc
+ end;
+legacy_key_fold(_Ref, _FoldFun, Acc, _FoldOpts, _Query) ->
+ Acc.
+
%% @doc Fold over all the objects for one or all buckets.
-spec fold_objects(riak_kv_backend:fold_objects_fun(),
any(),
@@ -298,10 +451,10 @@ is_empty(#state{ref=Ref}) ->
%% @doc Get the status information for this eleveldb backend
-spec status(state()) -> [{atom(), term()}].
-status(State) ->
+status(State=#state{fixed_indexes=FixedIndexes}) ->
{ok, Stats} = eleveldb:status(State#state.ref, <<"leveldb.stats">>),
{ok, ReadBlockError} = eleveldb:status(State#state.ref, <<"leveldb.ReadBlockError">>),
- [{stats, Stats}, {read_block_error, ReadBlockError}].
+ [{stats, Stats}, {read_block_error, ReadBlockError}, {fixed_indexes, FixedIndexes}].
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.
@@ -476,6 +629,36 @@ fold_keys_fun(FoldKeysFun, {index, FilterBucket, {range, FilterField, StartTerm,
throw({break, Acc})
end
end;
+fold_keys_fun(FoldKeysFun, {index, incorrect_format, ForUpgrade}) when is_boolean(ForUpgrade) ->
+ %% Over incorrectly formatted 2i index values
+ fun(StorageKey, Acc) ->
+ Action =
+ case from_index_key(StorageKey) of
+ {Bucket, Key, Field, Term} ->
+ NewKey = case ForUpgrade of
+ true ->
+ to_index_key(Bucket, Key, Field, Term);
+ false ->
+ to_legacy_index_key(Bucket, Key, Field, Term)
+ end,
+ case NewKey =:= StorageKey of
+ true ->
+ ignore;
+ false ->
+ {fold, Bucket, StorageKey}
+ end;
+ _ ->
+ stop
+ end,
+ case Action of
+ {fold, B, K} ->
+ FoldKeysFun(B, K, Acc);
+ ignore ->
+ Acc;
+ stop ->
+ throw({break, Acc})
+ end
+ end;
fold_keys_fun(_FoldKeysFun, Other) ->
throw({unknown_limiter, Other}).
@@ -498,10 +681,9 @@ fold_objects_fun(FoldObjectsFun, FilterBucket) ->
%% Augment the fold options list if a
%% bucket is defined.
fold_opts(undefined, FoldOpts) ->
- FoldOpts;
+ [{first_key, to_first_key(undefined)} | FoldOpts];
fold_opts(Bucket, FoldOpts) ->
- BKey = sext:encode({Bucket, <<>>}),
- [{first_key, BKey} | FoldOpts].
+ [{first_key, to_first_key({bucket, Bucket})} | FoldOpts].
%% @private Given a scope limiter, use sext to encode an expression
@@ -514,6 +696,9 @@ to_first_key(undefined) ->
to_first_key({bucket, Bucket}) ->
%% Start at the first object for a given bucket...
to_object_key(Bucket, <<>>);
+to_first_key({index, incorrect_format, ForUpgrade}) when is_boolean(ForUpgrade) ->
+ %% Start at first index entry
+ to_index_key(<<>>, <<>>, <<>>, <<>>);
to_first_key({index, Bucket, {eq, <<"$bucket">>, _Term}}) ->
%% 2I exact match query on special $bucket field...
to_first_key({bucket, Bucket});
@@ -529,6 +714,13 @@ to_first_key({index, Bucket, {range, Field, StartTerm, _EndTerm}}) ->
to_first_key(Other) ->
erlang:throw({unknown_limiter, Other}).
+% @doc If index query, encode key using legacy sext format.
+to_legacy_first_key({index, Bucket, {eq, Field, Term}}) ->
+ to_legacy_first_key({index, Bucket, {range, Field, Term, Term}});
+to_legacy_first_key({index, Bucket, {range, Field, StartTerm, _EndTerm}}) ->
+ to_legacy_index_key(Bucket, <<>>, Field, StartTerm);
+to_legacy_first_key(Other) ->
+ to_first_key(Other).
to_object_key(Bucket, Key) ->
sext:encode({o, Bucket, Key}).
@@ -544,6 +736,9 @@ from_object_key(LKey) ->
to_index_key(Bucket, Key, Field, Term) ->
sext:encode({i, Bucket, Field, Term, Key}).
+to_legacy_index_key(Bucket, Key, Field, Term) -> %% encode with legacy bignum encoding
+ sext:encode({i, Bucket, Field, Term, Key}, true).
+
from_index_key(LKey) ->
case sext:decode(LKey) of
{i, Bucket, Field, Term, Key} ->
@@ -552,6 +747,11 @@ from_index_key(LKey) ->
undefined
end.
+%% @doc Encode a key to store partition meta-data attributes.
+to_md_key(Key) ->
+ sext:encode({md, Key}).
+
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
2  src/riak_kv_mrc_pipe.erl
@@ -283,7 +283,7 @@ mapred_stream_sink(Inputs, Query, Timeout) ->
sender={Sender,SenderMon},
timer={Timer,PipeRef},
keeps=NumKeeps}}
- catch throw:{badard, Fitting, Reason} ->
+ catch throw:{badarg, Fitting, Reason} ->
riak_kv_mrc_sink:stop(Sink),
{error, {Fitting, Reason}}
end.
164 src/riak_kv_multi_backend.erl
@@ -38,14 +38,18 @@
fold_objects/4,
is_empty/1,
status/1,
- callback/3]).
+ callback/3,
+ fix_index/3,
+ set_legacy_indexes/2,
+ mark_indexes_fixed/2,
+ fixed_index_status/1]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-define(API_VERSION, 1).
--define(CAPABILITIES, [async_fold]).
+-define(CAPABILITIES, [async_fold, index_reformat]).
-record (state, {backends :: [{atom(), atom(), term()}],
default_backend :: atom()}).
@@ -98,12 +102,12 @@ capabilities(State) ->
%% Expose ?CAPABILITIES plus the intersection of all child
%% backends. (This backend creates a shim for any backends that
%% don't support async_fold.)
- F = fun({_, Mod, ModState}, Acc) ->
+ F = fun(Mod, ModState) ->
{ok, S1} = Mod:capabilities(ModState),
- S2 = ordsets:from_list(S1),
- ordsets:intersection(Acc, S2)
+ ordsets:from_list(S1)
end,
- Caps1 = lists:foldl(F, ordsets:new(), State#state.backends),
+ AllCaps = [F(Mod, ModState) || {_, Mod, ModState} <- State#state.backends],
+ Caps1 = ordsets:intersection(AllCaps),
Caps2 = ordsets:to_list(Caps1),
Capabilities = lists:usort(?CAPABILITIES ++ Caps2),
@@ -329,6 +333,90 @@ callback(Ref, Msg, #state{backends=Backends}=State) ->
[Mod:callback(Ref, Msg, ModState) || {_N, Mod, ModState} <- Backends],
{ok, State}.
+set_legacy_indexes(State=#state{backends=Backends}, WriteLegacy) ->
+ NewBackends = [{I, Mod, maybe_set_legacy_indexes(Mod, ModState, WriteLegacy)} ||
+ {I, Mod, ModState} <- Backends],
+ State#state{backends=NewBackends}.
+
+maybe_set_legacy_indexes(Mod, ModState, WriteLegacy) ->
+ case backend_can_index_reformat(Mod, ModState) of
+ true -> Mod:set_legacy_indexes(ModState, WriteLegacy);
+ false -> ModState
+ end.
+
+mark_indexes_fixed(State=#state{backends=Backends}, ForUpgrade) ->
+ NewBackends = mark_indexes_fixed(Backends, [], ForUpgrade),
+ {ok, State#state{backends=NewBackends}}.
+
+mark_indexes_fixed([], NewBackends, _) ->
+ lists:reverse(NewBackends);
+mark_indexes_fixed([{I, Mod, ModState} | Backends], NewBackends, ForUpgrade) ->
+ Res = maybe_mark_indexes_fixed(Mod, ModState, ForUpgrade),
+ case Res of
+ {error, Reason} ->
+ {error, Reason};
+ {ok, NewModState} ->
+ mark_indexes_fixed(Backends, [{I, Mod, NewModState} | NewBackends], ForUpgrade)
+ end.
+
+maybe_mark_indexes_fixed(Mod, ModState, ForUpgrade) ->
+ case backend_can_index_reformat(Mod, ModState) of
+ true -> Mod:mark_indexes_fixed(ModState, ForUpgrade);
+ false -> {ok, ModState}
+ end.
+
+fix_index(BKeys, ForUpgrade, State) ->
+ % Group keys per bucket
+ PerBucket = lists:foldl(fun(BK={B,_},D) -> dict:append(B,BK,D) end, dict:new(), BKeys),
+ Result =
+ dict:fold(
+ fun(Bucket, StorageKey, Acc = {Success, Ignore, Errors}) ->
+ {_, Mod, ModState} = Backend = get_backend(Bucket, State),
+ case backend_can_index_reformat(Mod, ModState) of
+ true ->
+ {S, I, E} = backend_fix_index(Backend, Bucket,
+ StorageKey, ForUpgrade),
+ {Success + S, Ignore + I, Errors + E};
+ false ->
+ Acc
+ end
+ end, {0, 0, 0}, PerBucket),
+ {reply, Result, State}.
+
+backend_fix_index({_, Mod, ModState}, Bucket, StorageKey, ForUpgrade) ->
+ case Mod:fix_index(StorageKey, ForUpgrade, ModState) of
+ {reply, Reply, _UpModState} ->
+ Reply;
+ {error, Reason} ->
+ lager:error("Failed to fix index for bucket ~p, key ~p, backend ~p: ~p",
+ [Bucket, StorageKey, Mod, Reason]),
+ {0, 0, length(StorageKey)}
+ end.
+
+-spec fixed_index_status(state()) -> boolean().
+fixed_index_status(#state{backends=Backends}) ->
+ lists:foldl(fun({_N, Mod, ModState}, Acc) ->
+ Status = Mod:status(ModState),
+ case fixed_index_status(Mod, ModState, Status) of
+ undefined -> Acc;
+ Res ->
+ case Acc of
+ undefined -> Res;
+ _ -> Res andalso Acc
+ end
+ end
+ end,
+ undefined,
+ Backends).
+
+fixed_index_status(Mod, ModState, Status) ->
+ case backend_can_index_reformat(Mod, ModState) of
+ true -> proplists:get_value(fixed_indexes, Status);
+ false -> undefined
+ end.
+
+
+
%% ===================================================================
%% Internal functions
%% ===================================================================
@@ -406,31 +494,47 @@ backend_fold_fun(ModFun, FoldFun, Opts, AsyncFold) ->
%% Get the backend capabilities to determine
%% if it supports asynchronous folding.
{ok, ModCaps} = Module:capabilities(SubState),
- case AsyncFold andalso
- lists:member(async_fold, ModCaps) of
- true ->
- AsyncWork =
- fun(Acc1) ->
- Module:ModFun(FoldFun,
- Acc1,
- Opts,
- SubState)
- end,
- {Acc, [AsyncWork | WorkList]};
- false ->
- Result = Module:ModFun(FoldFun,
- Acc,
- Opts,
- SubState),
- case Result of
- {ok, Acc1} ->
- {Acc1, WorkList};
- {error, Reason} ->
- throw({error, {Module, Reason}})
- end
+ DoAsync = AsyncFold andalso lists:member(async_fold, ModCaps),
+ Indexes = lists:keyfind(index, 1, Opts),
+ case Indexes of
+ {index, incorrect_format, _ForUpgrade} ->
+ case lists:member(index_reformat, ModCaps) of
+ true -> backend_fold_fun(Module, ModFun, SubState, FoldFun,
+ Opts, {Acc, WorkList}, DoAsync);
+ false -> {Acc, WorkList}
+ end;
+ _ ->
+ backend_fold_fun(Module,
+ ModFun,
+ SubState,
+ FoldFun,
+ Opts,
+ {Acc, WorkList},
+ DoAsync)
end
end.
+backend_fold_fun(Module, ModFun, SubState, FoldFun, Opts, {Acc, WorkList}, true) ->
+ AsyncWork =
+ fun(Acc1) ->
+ Module:ModFun(FoldFun,
+ Acc1,
+ Opts,
+ SubState)
+ end,
+ {Acc, [AsyncWork | WorkList]};
+backend_fold_fun(Module, ModFun, SubState, FoldFun, Opts, {Acc, WorkList}, false) ->
+ Result = Module:ModFun(FoldFun,
+ Acc,
+ Opts,
+ SubState),
+ case Result of
+ {ok, Acc1} ->
+ {Acc1, WorkList};
+ {error, Reason} ->
+ throw({error, {Module, Reason}})
+ end.
+
async_fold_fun() ->
fun(AsyncWork, Acc) ->
case AsyncWork(Acc) of
@@ -452,6 +556,10 @@ error_filter({error, _, _}) ->
error_filter(_) ->
false.
+backend_can_index_reformat(Mod, ModState) ->
+ {ok, Caps} = Mod:capabilities(ModState),
+ lists:member(index_reformat, Caps).
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
133 src/riak_kv_stat.erl
@@ -51,11 +51,11 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, monitor_loop/1]).
+-record(state, {repair_mon, monitors}).
+
-define(SERVER, ?MODULE).
-define(APP, riak_kv).
--record(state, {monitors}).
-
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
@@ -88,7 +88,8 @@ update(Arg) ->
ok
catch
ErrClass:Err ->
- lager:error("~p:~p updating stat ~p.", [ErrClass, Err, Arg])
+ lager:warning("~p:~p updating stat ~p.", [ErrClass, Err, Arg]),
+ gen_server:cast(?SERVER, {re_register_stat, Arg})
end.
track_bucket(Bucket) when is_binary(Bucket) ->
@@ -112,8 +113,9 @@ stop() ->
init([]) ->
register_stats(),
- State = {state, [spawn_link(?MODULE, monitor_loop, [index]),
- spawn_link(?MODULE, monitor_loop, [list])]},
+ State = #state{monitors = [spawn_link(?MODULE, monitor_loop, [index]),
+ spawn_link(?MODULE, monitor_loop, [list])],
+ repair_mon = spawn_monitor(fun() -> stat_repair_loop() end)},
{ok, State}.
handle_call({register, Name, Type}, _From, State) ->
@@ -123,23 +125,37 @@ handle_call({get_monitor, Type}, _From, State) ->
Monitors = State#state.monitors,
[Monitor] = lists:filter(fun(Mon) ->
Mon ! {get_type, self()},
- receive
- T when is_atom(T),
+ receive
+ T when is_atom(T),
T == Type ->
true;
_ -> false
- after
+ after
1000 -> false
end
end,
Monitors),
{reply, Monitor, State}.
+handle_cast({re_register_stat, Arg}, State) ->
+ %% To avoid massive message queues
+ %% riak_kv stats are updated in the calling process
+ %% @see `update/1'.
+ %% The downside is that errors updating a stat don't crash
+ %% the server, so broken stats stay broken.
+ %% This re-creates the same behaviour as when a broken stat
+ %% crashes the gen_server by re-registering that stat.
+ #state{repair_mon={Pid, _Mon}} = State,
+ Pid ! {re_register_stat, Arg},
+ {noreply, State};
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(_Req, State) ->
{noreply, State}.
+handle_info({'DOWN', MonRef, process, Pid, _Cause}, State=#state{repair_mon={Pid, MonRef}}) ->
+ RepairMonitor = spawn_monitor(fun() -> stat_repair_loop() end),
+ {noreply, State#state{repair_mon=RepairMonitor}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -223,15 +239,15 @@ do_update({fsm_destroy, Type}) ->
add_monitor(Type, Pid) ->
M = gen_server:call(?SERVER, {get_monitor, Type}),
- case M of
+ case M of
Mon when is_pid(Mon) ->
- Mon ! {add_pid, Pid};
- error ->
+ Mon ! {add_pid, Pid};
+ error ->
lager:error("No couldn't find procress to add monitor")
- end.
+ end.
monitor_loop(Type) ->
- receive
+ receive
{add_pid, Pid} ->
erlang:monitor(process, Pid);
{get_type, Sender} ->
@@ -239,7 +255,7 @@ monitor_loop(Type) ->
{'DOWN', _Ref, process, _Pid, _Reason} ->
do_update({fsm_destroy, Type})
end,
- monitor_loop(Type).
+ monitor_loop(Type).
%% Per index stats (by op)
do_per_index(Op, Idx, USecs) ->
@@ -440,10 +456,91 @@ multibackend_read_block_errors([{_Name, Status}|Rest], undefined) ->
multibackend_read_block_errors(_, Val) ->
rbe_val(Val).
-rbe_val(Bin) when is_binary(Bin) ->
- list_to_integer(binary_to_list(Bin));
-rbe_val(_) ->
- undefined.
+rbe_val(undefined) ->
+ undefined;
+rbe_val(Bin) ->
+ list_to_integer(binary_to_list(Bin)).
+
+%% All stat creation is serialized through riak_kv_stat.
+%% Some stats are created on demand as part of the call to `update/1'.
+%% When a stat error is caught, the stat must be deleted and recreated.
+%% Since stat updates can happen from many processes concurrently
+%% a stat that throws an error may already have been deleted and
+%% recreated. To protect against needlessly deleting and recreating
+%% an already 'fixed stat' first retry the stat update. There is a chance
+%% that the retry succeeds as the stat has been recreated, but some on
+%% demand stat it uses has not yet. Since stat creates are serialized
+%% in riak_kv_stat re-registering a stat could cause a deadlock.
+%% This loop is spawned as a process to avoid that.
+stat_repair_loop() ->
+ receive
+ {re_register_stat, Arg} ->
+ re_register_stat(Arg),
+ stat_repair_loop();
+ _ ->
+ stat_repair_loop()
+ end.
+
+re_register_stat(Arg) ->
+ case (catch do_update(Arg)) of
+ {'EXIT', _} ->
+ Stats = stats_from_update_arg(Arg),
+ [begin
+ (catch folsom_metrics:delete_metric(Name)),
+ do_register_stat(Name, Type)
+ end || {Name, {metric, _, Type, _}} <- Stats];
+ ok ->
+ ok
+ end.
+
+%% Map from application argument used in call to `update/1' to
+%% folsom stat names and types.
+%% Updates that create dynamic stats must select all
+%% related stats.
+stats_from_update_arg({vnode_get, _, _}) ->
+ riak_core_stat_q:names_and_types([?APP, vnode, gets]);
+stats_from_update_arg({vnode_put, _, _}) ->
+ riak_core_stat_q:names_and_types([?APP, vnode, puts]);
+stats_from_update_arg(vnode_index_read) ->
+ riak_core_stat_q:names_and_types([?APP, vnode, index, reads]);
+stats_from_update_arg({vnode_index_write, _, _}) ->
+ riak_core_stat_q:names_and_types([?APP, vnode, index, writes]) ++
+ riak_core_stat_q:names_and_types([?APP, vnode, index, deletes]);
+stats_from_update_arg({vnode_index_delete, _}) ->
+ riak_core_stat_q:names_and_types([?APP, vnode, index, deletes]);
+stats_from_update_arg({get_fsm, _, _, _, _, _, _}) ->
+ riak_core_stat_q:names_and_types([?APP, node, gets]);
+stats_from_update_arg({put_fsm_time, _, _, _, _}) ->
+ riak_core_stat_q:names_and_types([?APP, node, puts]);
+stats_from_update_arg({read_repairs, _, _}) ->
+ riak_core_stat_q:names_and_types([?APP, nodes, gets, read_repairs]);
+stats_from_update_arg(coord_redirs) ->
+ [{{?APP, node, puts, coord_redirs}, {metric,[],counter,undefined}}];
+stats_from_update_arg(mapper_start) ->
+ [{{?APP, mapper_count}, {metric,[],counter,undefined}}];
+stats_from_update_arg(mapper_end) ->
+ stats_from_update_arg(mapper_start);
+stats_from_update_arg(precommit_fail) ->
+ [{{?APP, precommit_fail}, {metric,[],counter,undefined}}];
+stats_from_update_arg(postcommit_fail) ->
+ [{{?APP, postcommit_fail}, {metric,[],counter,undefined}}];
+stats_from_update_arg({fsm_spawned, Type}) ->
+ [{{?APP, node, Type, fsm, active}, {metric,[],counter,undefined}}];
+stats_from_update_arg({fsm_exit, Type}) ->
+ stats_from_update_arg({fsm_spawned, Type});
+stats_from_update_arg({fsm_error, Type}) ->
+ stats_from_update_arg({fsm_spawned, Type}) ++
+ [{{?APP, node, Type, fsm, errors}, {metric,[], spiral, undefined}}];
+stats_from_update_arg({index_create, _Pid}) ->
+ [{{?APP, index, fsm, create}, {metric, [], spiral, undefined}}];
+stats_from_update_arg(index_create_error) ->
+ [{{?APP, index, fsm, create, error}, {metric, [], spiral, undefined}}];
+stats_from_update_arg({list_create, _Pid}) ->
+ [{{?APP, list, fsm, create}, {metric, [], spiral, undefined}}];
+stats_from_update_arg(list_create_error) ->
+ [{{?APP, list, fsm, create, error}, {metric, [], spiral, undefined}}];
+stats_from_update_arg(_) ->
+ [].
-ifdef(TEST).
-define(LEVEL_STATUS(Idx, Val), [{Idx, [{backend_status, riak_kv_eleveldb_backend,
89 src/riak_kv_stat_bc.erl
@@ -143,8 +143,7 @@
%% of stats.
produce_stats() ->
lists:append(
- [lists:flatten(backwards_compat(riak_core_stat_q:get_stats([riak_kv]))),
- backwards_compat_pb(riak_core_stat_q:get_stats([riak_api])),
+ [lists:flatten(legacy_stats()),
read_repair_stats(),
level_stats(),
pipe_stats(),
@@ -163,30 +162,56 @@ produce_stats() ->
%% naming constraints the new names are not simply the old names
%% with commas for underscores. Uses legacy_stat_map to generate
%% legacys stats from the new list of stats.
-backwards_compat(Stats) ->
- [bc_stat(Old, New, Type, Stats) || {Old, New, Type} <- legacy_stat_map()].
+legacy_stats() ->
+ {Legacy, _Calculated} = lists:foldl(fun({Old, New, Type}, {Acc, Cache}) ->
+ bc_stat({Old, New, Type}, Acc, Cache) end,
+ {[], []},
+ legacy_stat_map()),
+ lists:reverse(Legacy).
-bc_stat(Old, {New, Field}, histogram_percentile, Stats) ->
- Stat = proplists:get_value(New, Stats),
- Percentile = proplists:get_value(percentile, Stat),
- Val = proplists:get_value(Field, Percentile),
- {Old, trunc(Val)};
-bc_stat(Old, {New, Field}, histogram, Stats) ->
- Stat = proplists:get_value(New, Stats),
- Val = proplists:get_value(Field, Stat),
- {Old, trunc(Val)};
-bc_stat(Old, {New, Field}, spiral, Stats) ->
- Stat = proplists:get_value(New, Stats),
- Val = proplists:get_value(Field, Stat),
- {Old, Val};
-bc_stat(Old, New, counter, Stats) ->
- Stat = proplists:get_value(New, Stats),
- {Old, Stat}.
+%% @doc legacy stats uses multifield stats for multiple stats
+%% don't calculate the same stat many times
+get_stat(Name, Type, Cache) ->
+ get_stat(Name, Type, Cache, fun(S) -> S end).
+get_stat(Name, Type, Cache, ValFun) ->
+ case proplists:get_value(Name, Cache) of
+ undefined ->
+ case riak_core_stat_q:calc_stat({Name, Type}) of
+ unavailable -> {unavailable, Cache};
+ Stat ->
+ {ValFun(Stat), [{Name, Stat} | Cache]}
+ end;
+ Cached -> {ValFun(Cached), Cache}
+ end.
+
+bc_stat({Old, {NewName, Field}, histogram}, Acc, Cache) ->
+ ValFun = fun(Stat) -> trunc(proplists:get_value(Field, Stat)) end,
+ {Val, Cache1} = get_stat(NewName, histogram, Cache, ValFun),
+ {[{Old, Val} | Acc], Cache1};
+bc_stat({Old, {NewName, Field}, histogram_percentile}, Acc, Cache) ->
+ ValFun = fun(Stat) ->
+ Percentile = proplists:get_value(percentile, Stat),
+ Val = proplists:get_value(Field, Percentile),
+ trunc(Val) end,
+ {Val, Cache1} = get_stat(NewName, histogram, Cache, ValFun),
+ {[{Old, Val} | Acc], Cache1};
+bc_stat({Old, {NewName, Field}, spiral}, Acc, Cache) ->
+ ValFun = fun(Stat) ->
+ proplists:get_value(Field, Stat)
+ end,
+ {Val, Cache1} = get_stat(NewName, spiral, Cache, ValFun),
+ {[{Old, Val} | Acc], Cache1};
+bc_stat({Old, NewName, counter}, Acc, Cache) ->
+ {Val, Cache1} = get_stat(NewName, counter, Cache),
+ {[{Old, Val} | Acc], Cache1};
+bc_stat({Old, NewName, function}, Acc, Cache) ->
+ {Val, Cache1} = get_stat(NewName, gauge, Cache),
+ {[{Old, Val} | Acc], Cache1}.
%% hard coded mapping of stats to legacy format
%% There was a enough variation in the old names that a simple
-%% concatenation of the elements in the new stat key would not suffice
+%% concatenation of the elements in the new stat key would not suffice
%% applications depend on these exact legacy names.
legacy_stat_map() ->
[{vnode_gets, {{riak_kv, vnode, gets}, one}, spiral},
@@ -238,17 +263,12 @@ legacy_stat_map() ->
{index_fsm_active, {riak_kv, index, fsm, active}, counter},
{list_fsm_create, {{riak_kv, list, fsm, create}, one}, spiral},
{list_fsm_create_error, {{riak_kv, list, fsm, create, error}, one}, spiral},
- {list_fsm_active, {riak_kv, list, fsm, active}, counter}
+ {list_fsm_active, {riak_kv, list, fsm, active}, counter},
+ {pbc_active, {riak_api, pbc_connects, active}, function},
+ {pbc_connects, {{riak_api, pbc_connects}, one}, spiral},
+ {pbc_connects_total, {{riak_api, pbc_connects}, count}, spiral}
].
-%% PB stats are now under riak_api. In the past they were part of riak_kv.
-%% This function maps those new values to the old names.
-backwards_compat_pb(Stats) ->
- [bc_stat(Old, New, Type, Stats) || {Old, New, Type} <-
- [{pbc_active, {riak_api, pbc_connects, active}, counter},
- {pbc_connects, {{riak_api, pbc_connects}, one}, spiral},
- {pbc_connects_total, {{riak_api, pbc_connects}, count}, spiral}]].
-
%% @spec cpu_stats() -> proplist()
%% @doc Get stats on the cpu, as given by the cpu_sup module
%% of the os_mon application.
@@ -325,14 +345,15 @@ config_stats() ->
%% @doc add the pipe stats to the blob in a style consistent
%% with those stats already in the blob
pipe_stats() ->
- Stats = riak_core_stat_q:get_stats([riak_pipe]),
- lists:flatten([bc_stat(Name, Val) || {Name, Val} <- Stats]).
+ lists:flatten([bc_stat(Name, Val) || {Name, Val} <- riak_pipe_stat:get_stats()]).
%% old style blob stats don't have the app name
%% and they have underscores, not commas
-bc_stat(Name, Val) ->
+bc_stat(Name, Val) when is_tuple(Name) ->
StatName = join(tl(tuple_to_list(Name))),
- bc_stat_val(StatName, Val).
+ bc_stat_val(StatName, Val);
+bc_stat(Name, Val) ->
+ bc_stat_val(Name, Val).
%% Old style stats don't have tuple lists as values
%% they have an entry per element in the complex stats tuple list
103 src/riak_kv_util.erl
@@ -35,6 +35,8 @@
make_request/2,
get_index_n/2,
preflist_siblings/1,
+ fix_incorrect_index_entries/1,
+ fix_incorrect_index_entries/0,
responsible_preflists/1,
responsible_preflists/2]).
@@ -230,6 +232,107 @@ determine_all_n(Ring) ->
end, [DefaultN], BucketProps),
AllN.
+fix_incorrect_index_entries() ->
+ fix_incorrect_index_entries([]).
+
+fix_incorrect_index_entries(Opts) when is_list(Opts) ->
+ MaxN = proplists:get_value(concurrency, Opts, 2),
+ ForUpgrade = not proplists:get_value(downgrade, Opts, false),
+ BatchSize = proplists:get_value(batch_size, Opts, 100),
+ lager:info("index reformat: starting with concurrency: ~p, batch size: ~p, for upgrade: ~p",
+ [MaxN, BatchSize, ForUpgrade]),
+ IdxList = [Idx || {riak_kv_vnode, Idx, _} <- riak_core_vnode_manager:all_vnodes()],
+ FixOpts = [{batch_size, BatchSize}, {downgrade, not ForUpgrade}],
+ F = fun(X) -> fix_incorrect_index_entries(X, FixOpts) end,
+ Counts = riak_core_util:pmap(F, IdxList, MaxN),
+ {SuccessCounts, IgnoredCounts, ErrorCounts} = lists:unzip3(Counts),
+ SuccessTotal = lists:sum(SuccessCounts),
+ IgnoredTotal = lists:sum(IgnoredCounts),
+ ErrorTotal = lists:sum(ErrorCounts),
+ case ErrorTotal of
+ 0 ->
+ lager:info("index reformat: complete on all partitions. Fixed: ~p, Ignored: ~p",
+ [SuccessTotal, IgnoredTotal]);
+ _ ->
+ lager:info("index reformat: encountered ~p errors reformatting keys. Please re-run",
+ [ErrorTotal])
+ end,
+ {SuccessTotal, IgnoredTotal, ErrorTotal}.
+
+fix_incorrect_index_entries(Idx, FixOpts) ->
+ fix_incorrect_index_entries(Idx, fun fix_incorrect_index_entry/4, {0, 0, 0}, FixOpts).
+
+fix_incorrect_index_entries(Idx, FixFun, Acc0, FixOpts) ->
+ Ref = make_ref(),
+ ForUpgrade = not proplists:get_value(downgrade, FixOpts, false),
+ lager:info("index reformat: querying partition ~p for index entries to reformat", [Idx]),
+ riak_core_vnode_master:command({Idx, node()},
+ {get_index_entries, FixOpts},
+ {raw, Ref, self()},
+ riak_kv_vnode_master),
+ case process_incorrect_index_entries(Ref, Idx, ForUpgrade, FixFun, Acc0) of
+ ignore -> Acc0;
+ {_,_,ErrorCount}=Res ->
+ MarkRes = mark_indexes_reformatted(Idx, ErrorCount, ForUpgrade),
+ case MarkRes of
+ error ->
+ %% there was an error marking the partition as reformatted. treat this like
+ %% any other error (indicating the need to re-run reformatting)
+ {element(1, Res), element(2, Res), 1};
+ _ -> Res
+ end
+ end.
+
+fix_incorrect_index_entry(Idx, ForUpgrade, BadKeys, {Success, Ignore, Error}) ->
+ Res = riak_core_vnode_master:sync_command({Idx, node()},
+ {fix_incorrect_index_entry, BadKeys, ForUpgrade},
+ riak_kv_vnode_master),
+ case Res of
+ ok ->
+ {Success+1, Ignore, Error};
+ ignore ->
+ {Success, Ignore+1, Error};
+ {error, _} ->
+ {Success, Ignore, Error+1};
+ {S, I, E} ->
+ {Success+S, Ignore+I, Error+E}
+ end.
+
+%% needs to take an acc to count success/error/ignore
+process_incorrect_index_entries(Ref, Idx, ForUpgrade, FixFun, {S, I, E} = Acc) ->
+ receive
+ {Ref, ignore} ->
+ lager:info("index reformat: ignoring partition ~p", [Idx]),
+ ignore;
+ {Ref, done} ->
+ lager:info("index reformat: finished with partition ~p, Fixed=~p, Ignored=~p, Errors=~p", [Idx, S, I, E]),
+ Acc;
+ {Ref, {Pid, BatchRef, Keys}} ->
+ {NS, NI, NE} = NextAcc = FixFun(Idx, ForUpgrade, Keys, Acc),
+ ReportN = 10000,
+ case ((NS+NI+NE) div ReportN) /= ((S+I+E) div ReportN) of
+ true ->
+ lager:info("index reformat: reformatting partition ~p, Fixed=~p, Ignore=~p, Error=~p", [Idx, NS, NI, NE]);
+ false ->
+ ok
+ end,
+ ack_incorrect_keys(Pid, BatchRef),
+ process_incorrect_index_entries(Ref, Idx, ForUpgrade, FixFun, NextAcc)
+ end.
+
+ack_incorrect_keys(Pid, Ref) ->
+ Pid ! {ack_keys, Ref}.
+
+mark_indexes_reformatted(Idx, 0, ForUpgrade) ->
+ riak_core_vnode_master:sync_command({Idx, node()},
+ {fix_incorrect_index_entry, {done, ForUpgrade}},
+ riak_kv_vnode_master),
+ lager:info("index reformat: marked partition ~p as fixed", [Idx]),
+ ok;
+mark_indexes_reformatted(_Idx, _ErrorCount, _ForUpgrade) ->
+ undefined.
+
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
78 src/riak_kv_vnode.erl
@@ -28,6 +28,7 @@
%% API
-export([test_vnode/1, put/7]).
-export([start_vnode/1,
+ start_vnodes/1,
get/3,
del/3,
put/6,
@@ -143,6 +144,9 @@ maybe_create_hashtrees(true, State=#state{idx=Index}) ->
start_vnode(I) ->
riak_core_vnode_master:get_vnode_pid(I, riak_kv_vnode).
+start_vnodes(IdxList) ->
+ riak_core_vnode_master:get_vnode_pid(IdxList, riak_kv_vnode).
+
test_vnode(I) ->
riak_core_vnode:start_link(riak_kv_vnode, I, infinity).
@@ -522,7 +526,79 @@ handle_command(?KV_VNODE_STATUS_REQ{},
{reply, {vnode_status, Index, VNodeStatus}, State};
handle_command({reformat_object, BKey}, _Sender, State) ->
{Reply, UpdState} = do_reformat(BKey, State),
- {reply, Reply, UpdState}.
+ {reply, Reply, UpdState};
+handle_command({fix_incorrect_index_entry, {done, ForUpgrade}}, _Sender,
+ State=#state{mod=Mod, modstate=ModState}) ->
+ case Mod:mark_indexes_fixed(ModState, ForUpgrade) of %% only defined for eleveldb backend
+ {ok, NewModState} ->
+ {reply, ok, State#state{modstate=NewModState}};
+ {error, _Reason} ->
+ {reply, error, State}
+ end;
+handle_command({fix_incorrect_index_entry, Keys, ForUpgrade},
+ _Sender,
+ State=#state{mod=Mod,
+ modstate=ModState}) ->
+ Reply =
+ case Mod:fix_index(Keys, ForUpgrade, ModState) of
+ {ok, _UpModState} ->
+ ok;
+ {ignore, _UpModState} ->
+ ignore;
+ {error, Reason, _UpModState} ->
+ {error, Reason};
+ {reply, Totals, _UpModState} ->
+ Totals
+ end,
+ {reply, Reply, State};
+handle_command({get_index_entries, Opts},
+ Sender,
+ State=#state{mod=Mod,
+ modstate=ModState0}) ->
+ ForUpgrade = not proplists:get_value(downgrade, Opts, false),
+ BufferSize = proplists:get_value(batch_size, Opts, 1),
+ {ok, Caps} = Mod:capabilities(ModState0),
+ case lists:member(index_reformat, Caps) of
+ true ->
+ ModState = Mod:set_legacy_indexes(ModState0, not ForUpgrade),
+ Status = Mod:fixed_index_status(ModState),
+ case {ForUpgrade, Status} of
+ {true, true} -> {reply, done, State};
+ {_, _} ->
+ BufferMod = riak_kv_fold_buffer,
+ ResultFun =
+ fun(Results) ->
+ % Send result batch and wait for acknowledgement
+ % before moving on (backpressure to avoid flooding caller).
+ BatchRef = make_ref(),
+ riak_core_vnode:reply(Sender, {self(), BatchRef, Results}),
+ Monitor = riak_core_vnode:monitor(Sender),
+ receive
+ {ack_keys, BatchRef} ->
+ erlang:demonitor(Monitor, [flush]);
+ {'DOWN', Monitor, process, _Pid, _Reason} ->
+ throw(index_reformat_client_died)
+ end
+ end,
+ Buffer = BufferMod:new(BufferSize, ResultFun),
+ FoldFun = fun(B, K, Buf) -> BufferMod:add({B, K}, Buf) end,
+ FinishFun =
+ fun(FinalBuffer) ->
+ BufferMod:flush(FinalBuffer),
+ riak_core_vnode:reply(Sender, done)
+ end,
+ FoldOpts = [{index, incorrect_format, ForUpgrade}, async_fold],
+ case list(FoldFun, FinishFun, Mod, fold_keys, ModState, FoldOpts, Buffer) of
+ {async, AsyncWork} ->
+ {async, {fold, AsyncWork, FinishFun}, Sender, State};
+ _ ->
+ {noreply, State}
+ end
+ end;
+ false ->
+ lager:error("Backend ~p does not support incorrect index query", [Mod]),
+ {reply, ignore, State}
+ end.
%% @doc Handle a coverage request.
%% More information about the specification for the ItemFilter