Skip to content

Commit

Permalink
Merge pull request #1002 from basho/jdb-ensemble-integrity
Browse files Browse the repository at this point in the history
Update to latest riak_ensemble integrity approach

Reviewed-by: andrewjstone
  • Loading branch information
borshop committed Jul 12, 2014
2 parents 30f3ba5 + 1519686 commit f3c570a
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 220 deletions.
16 changes: 0 additions & 16 deletions priv/riak_kv.schema
Expand Up @@ -25,22 +25,6 @@
end
}.

%% @doc Level of trust that the strong consistency system places on durable
%% storage.
%%
%% * high: assumes that vnodes are durable -- that once data has been committed
%% to a vnode, that data will never be silently lost. In this mode, K/V
%% ensembles are not required to perform an AAE exchange before being trusted.
%% * medium: assumes that vnodes are not durable, and that they may silently
%% lose data without detection. This mode requires all K/V peers that
%% restart/crash to first perform an AAE exchange with a majority of other
%% peers before being trusted.
{mapping, "strong_consistency.trust", "riak_kv.consistency_trust", [
{datatype, {enum, [medium, high]}},
{default, medium},
{commented, medium}
]}.

%% @doc Specifies the storage engine used for Riak's key-value data
%% and secondary indexes (if supported).
{mapping, "storage_backend", "riak_kv.storage_backend", [
Expand Down
1 change: 1 addition & 0 deletions src/riak_client.erl
Expand Up @@ -44,6 +44,7 @@
-export([get_stats/2]).
-export([get_client_id/1]).
-export([for_dialyzer_only_ignore/3]).
-export([ensemble/1]).

-compile({no_auto_import,[put/2]}).
%% @type default_timeout() = 60000
Expand Down
83 changes: 8 additions & 75 deletions src/riak_kv_ensemble_backend.erl
Expand Up @@ -30,11 +30,10 @@
-export([obj_epoch/1, obj_seq/1, obj_key/1, obj_value/1]).
-export([set_obj_epoch/2, set_obj_seq/2, set_obj_value/2]).
-export([get/3, put/4, tick/5, ping/2, ready_to_start/0]).
-export([trusted/1, sync_request/2, sync/2]).
-export([synctree_path/2]).
-export([reply/2]).
-export([obj_newer/2]).
-export([handle_down/4]).
-export([trust/0]).

-include_lib("riak_ensemble/include/riak_ensemble_types.hrl").

Expand Down Expand Up @@ -162,79 +161,6 @@ reply(From, Reply) ->

%%===================================================================

trust() ->
app_helper:get_env(riak_kv, consistency_trust, medium).

trusted(#state{id=Id}) ->
{{kv, _PL, _N, Idx}, _} = Id,
{ok, Pid} = riak_core_vnode_manager:get_vnode_pid(Idx, riak_kv_vnode),
case trust() of
high ->
{true, Pid};
medium ->
{false, Pid}
end.

-spec sync_request(riak_ensemble_backend:from(), state()) -> state().
sync_request(From, State=#state{proxy=Proxy}) ->
%% TODO: Do we care about this being dropped when overloaded?
catch Proxy ! {ensemble_sync, From},
State.

-spec sync([{peer_id(), orddict:orddict()}], state()) -> {ok, state()} |
{async, state()} |
{{error,_}, state()}.
sync(Replies, State=#state{ensemble=_Ensemble, id=Id}) ->
Peers0 = [{Idx, PeerId} || {PeerId={{kv,_PL,_N,Idx},_Node},_Reply} <- Replies],
Peers = orddict:from_list(Peers0),
{{kv, PL, N, Idx}, _} = Id,
IndexN = {PL,N},
%% Sort to remove duplicates when changing ownership / forwarded response
Siblings0 = lists:usort([I || {{{kv,_PL,_N,I},_Node},_Reply} <- Replies]),
%% Just in case, remove self from list
Siblings = Siblings0 -- [Idx],

case local_partition(Idx) of
true ->
T0 = erlang:now(),
Pid = self(),
spawn_link(fun() ->
wait_for_sync(Idx, IndexN, Pid, T0, Siblings, Peers)
end),
{async, State};
false ->
{ok, State}
end.

wait_for_sync(Idx, IndexN, Pid, T0, Siblings, Peers) ->
Exchanges = riak_kv_entropy_info:exchanges(Idx, IndexN),
Recent = [OtherIdx || {OtherIdx, T1, _} <- Exchanges,
T1 > T0],
%% lager:info("~p/~p: Exchanges: ~p~nT0: ~p~nRecent: ~p~nSibs: ~p",
%% [Idx, IndexN, Exchanges, T0, Recent, Siblings]),
%% Need = length(Siblings),
%% Finished = length(Recent),
Local = local_partition(Idx),
Complete = ((Siblings -- Recent) =:= []),
if not Local ->
%% lager:info("Partition ownership changed. No need to sync."),
riak_ensemble_backend:sync_complete(Pid, []);
Complete ->
%% lager:info("Complete ~b/~b :: ~p -> ~p~n", [Finished, Need, Idx, Pid]),
SyncPeers = [orddict:fetch(PeerIdx, Peers) || PeerIdx <- Siblings],
riak_ensemble_backend:sync_complete(Pid, SyncPeers);
true ->
%% lager:info("Not yet ~b/~b :: ~p", [Finished, Need, Idx]),
timer:sleep(1000),
wait_for_sync(Idx, IndexN, Pid, T0, Siblings, Peers)
end.

local_partition(Index) ->
{ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
chashbin:index_owner(Index, CHBin) =:= node().

%%===================================================================

-spec handle_down(reference(), pid(), term(), state()) -> false |
{reset, state()}.
handle_down(Ref, _Pid, Reason, #state{id=Id,
Expand Down Expand Up @@ -308,3 +234,10 @@ ping(From, State=#state{proxy=Proxy}) ->
ready_to_start() ->
lists:member(riak_kv, riak_core_node_watcher:services(node())).

synctree_path(_Ensemble, Id) ->
{{kv, PL, N, Idx}, _} = Id,
Bin = term_to_binary({PL, N}),
%% Use a prefix byte to leave open the possibility of different
%% tree id encodings (eg. not term_to_binary) in the future.
TreeId = <<0, Bin/binary>>,
{TreeId, "kv_" ++ integer_to_list(Idx)}.
94 changes: 49 additions & 45 deletions src/riak_kv_ensemble_console.erl
Expand Up @@ -26,15 +26,16 @@
-compile(export_all).

-type ensembles() :: [{ensemble_id(), ensemble_info()}].
-type quorums() :: orddict(ensemble_id(), {leader_id(), [peer_id()]}).
-type quorums() :: orddict(ensemble_id(), {leader_id(), boolean(), [peer_id()]}).
-type counts() :: orddict(node(), pos_integer()).
-type labels() :: [{pos_integer(), peer_id()}].
-type names() :: [{peer_id(), string()}].

-record(details, {enabled :: boolean(),
active :: boolean(),
aae_enabled :: boolean(),
trust :: high | medium,
validation :: strong | weak,
metadata :: async | sync,
ensembles :: ensembles(),
quorums :: quorums(),
peer_info :: orddict(peer_id(), peer_info()),
Expand Down Expand Up @@ -77,31 +78,34 @@ ensemble_detail(N) ->

print_overview(#details{enabled=Enabled,
active=Active,
trust=Trust,
aae_enabled=AAE,
validation=Validation,
metadata=Metadata,
nodes=Nodes,
ring_ready=RingReady}) ->
NumNodes = length(Nodes),
TrustMsg = case Trust of
high ->
"high (syncing not required)";
medium ->
"medium (AAE syncing required)"
ValidationMsg = case Validation of
strong ->
"strong (trusted majority required)";
weak ->
"weak (simple majority required)"
end,
MetadataMsg = case Metadata of
sync ->
"guaranteed replication (synchronous)";
async ->
"best-effort replication (asynchronous)"
end,

io:format("~s~n", [string:centre(" Consensus System ", 79, $=)]),
io:format("Enabled: ~s~n"
"Active: ~s~n"
"Ring Ready: ~s~n"
"Trust: ~s~n"
"AAE enabled: ~s~n~n",
[Enabled, Active, RingReady, TrustMsg, AAE]),
"Validation: ~s~n"
"Metadata: ~s~n~n",
[Enabled, Active, RingReady, ValidationMsg, MetadataMsg]),
if Enabled == false ->
io:format("Note: The consensus subsystem is not enabled.~n~n");
(Active == false) and (NumNodes < 3) ->
io:format(cluster_warning());
(AAE == false) and (Trust == medium) ->
io:format(aae_warning(), [Trust]);
true ->
ok
end,
Expand All @@ -111,10 +115,6 @@ cluster_warning() ->
("Note: The consensus subsystem will not be activated until there are more~n"
" than three nodes in this cluster.~n~n").

aae_warning() ->
("Warning: Trust level is ~s, but AAE is not enabled. Ensembles will be~n"
" unable to become trusted and will never reach quorum.~n~n").

%%%===================================================================

print_ensembles(#details{ensembles=[]}) ->
Expand All @@ -133,7 +133,7 @@ print_ensembles([], _, _) ->
print_ensembles([{Ens, Info}|T], N, AllOnline) ->
#ensemble_info{views=Views} = Info,
Names = peer_names(Views),
{Leader, Online} = orddict:fetch(Ens, AllOnline),
{Leader, _, Online} = orddict:fetch(Ens, AllOnline),
Label = case Ens of
root -> "root";
_ -> N
Expand Down Expand Up @@ -180,16 +180,17 @@ print_detail(N, #details{ensembles=L3, quorums=Quorums, peer_info=PeerInfo}) ->
{Counts, Labels} = label_peers(Views),
Names = peer_names(Counts, Labels),

{LeaderId, _} = orddict:fetch(Id, Quorums),
{LeaderId, Ready, _} = orddict:fetch(Id, Quorums),
Leader = orddict:fetch(LeaderId, Names),

Peers = [format_info(Label, Peer, PeerInfo) || {Label, Peer} <- Labels],

Header = string:centre(" Ensemble #" ++ integer_to_list(N) ++ " ", 79, $=),
io:format("~s~n", [Header]),
io:format("Id: ~p~n"
"Leader: ~s~n~n",
[Id, Leader]),
io:format("Id: ~p~n"
"Leader: ~s~n"
"Leader ready: ~p~n~n",
[Id, Leader, Ready]),
io:format("~s~n", [string:centre(" Peers ", 79, $=)]),
io:format(" Peer Status Trusted Epoch Node~n"),
print_detail_view(Views, Peers),
Expand Down Expand Up @@ -242,10 +243,22 @@ get_details() ->
true -> ordered_ensembles();
_ -> []
end,
Validation = case riak_ensemble_config:tree_validation() of
false ->
weak;
_ ->
strong
end,
Metadata = case riak_ensemble_config:synchronous_tree_updates() of
true ->
sync;
_ ->
async
end,
#details{enabled = Enabled,
active = riak_ensemble_manager:enabled(),
trust = riak_kv_ensemble_backend:trust(),
aae_enabled = riak_kv_entropy_manager:enabled(),
validation = Validation,
metadata = Metadata,
ensembles = Ensembles,
quorums = [],
nodes = riak_core_ring:ready_members(Ring),
Expand Down Expand Up @@ -280,13 +293,13 @@ get_quorums(Ensemble, Details) ->
Quorums = [ping_quorum(Ensemble)],
Details#details{quorums=Quorums}.

-spec ping_quorum(ensemble_id()) -> {ensemble_id(), {leader_id(), [peer_id()]}}.
-spec ping_quorum(ensemble_id()) -> {ensemble_id(), {leader_id(), boolean(), [peer_id()]}}.
ping_quorum(Ens) ->
case riak_ensemble_peer:ping_quorum(Ens, 10000) of
timeout ->
{Ens, {undefined, []}};
{Leader, Peers} ->
{Ens, {Leader, Peers}}
{Ens, {undefined, false, []}};
{Leader, Ready, Peers} ->
{Ens, {Leader, Ready, Peers}}
end.

get_peer_info(N, Details=#details{ensembles=Ensembles}) ->
Expand Down Expand Up @@ -369,8 +382,8 @@ peer_names(Counts, Labels) ->
print_variations() ->
Details = #details{enabled = false,
active = false,
aae_enabled = false,
trust = medium,
validation = strong,
metadata = async,
ensembles = [],
quorums = [],
ring_ready = true,
Expand All @@ -386,17 +399,11 @@ print_variations() ->
ensemble_overview(Details2),
io:format("~n~n"),

%% Enabled, enough nodes, but AAE not enabled
io:format("(*** AAE disabled when required ***)~n~n"),
Details3 = Details2#details{nodes=['dev1@127.0.0.1',
%% Enabled, enough nodes, but not yet active
Details4 = Details2#details{nodes=['dev1@127.0.0.1',
'dev2@127.0.0.1',
'dev3@127.0.0.1']},
ensemble_overview(Details3),
io:format("~n~n"),

%% Enabled, enough nodes, AAE enabled, but not yet active
io:format("(*** All good, waiting on activation ***)~n~n"),
Details4 = Details3#details{aae_enabled=true},
ensemble_overview(Details4),
io:format("~n~n"),

Expand All @@ -423,9 +430,10 @@ print_variations() ->
{test2, Dev3}]]}
}],
Quorums = [{test, {{test, Dev1},
true,
[{test, Dev1},
{test, Dev2}]}},
{test2, {undefined, []}}],
{test2, {undefined, false, []}}],
Details6 = Details5#details{ensembles=Ensembles, quorums=Quorums},
ensemble_overview(Details6),
io:format("~n~n"),
Expand All @@ -445,10 +453,6 @@ print_variations() ->
{test2, Dev2},
{test2, Dev3}]]}
}],
Quorums = [{test, {{test, Dev1},
[{test, Dev1},
{test, Dev2}]}},
{test2, {undefined, []}}],
Details7 = Details5#details{ensembles=Ensembles2, quorums=Quorums},
ensemble_overview(Details7),
io:format("~n~n"),
Expand Down

0 comments on commit f3c570a

Please sign in to comment.