Retry when qmn receives MOVED from any node
wandermyz committed Mar 17, 2018
1 parent 00a357e commit 6a95fb0
Showing 3 changed files with 69 additions and 16 deletions.
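For context: when a Redis Cluster node is asked for a key whose hash slot it does not own, it answers with a redirection error of the form MOVED <slot> <host:port> instead of a value. A minimal sketch of detecting such a reply in Erlang, as this commit does (the reply value below is illustrative):

    %% A MOVED reply arrives as an error binary; a prefix match is enough
    %% to detect the redirection without parsing the slot or address.
    Reply = {error, <<"MOVED 866 127.0.0.1:30001">>},
    case Reply of
        {error, <<"MOVED ", _/binary>>} -> redirection;
        {ok, _Value} -> success
    end.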
66 changes: 51 additions & 15 deletions src/eredis_cluster.erl
@@ -87,16 +87,30 @@ transaction(Transaction, Slot, ExpectedValue, Counter) ->
 %% @end
 %% =============================================================================
 -spec qmn(redis_pipeline_command()) -> redis_pipeline_result().
-qmn(Commands) ->
-    {CommandsByPools, MappingInfo} = split_by_pools(Commands),
-    qmn2(CommandsByPools, MappingInfo, []).
+qmn(Commands) -> qmn(Commands, 0).
 
-qmn2([{Pool, PoolCommands} | T1], [{Pool, Mapping} | T2], Acc) ->
+qmn(_, ?REDIS_CLUSTER_REQUEST_TTL) ->
+    {error, no_connection};
+qmn(Commands, Counter) ->
+    %% Throttle retries
+    throttle_retries(Counter),
+
+    {CommandsByPools, MappingInfo, Version} = split_by_pools(Commands),
+    case qmn2(CommandsByPools, MappingInfo, [], Version) of
+        retry -> qmn(Commands, Counter + 1);
+        Res -> Res
+    end.
+
+qmn2([{Pool, PoolCommands} | T1], [{Pool, Mapping} | T2], Acc, Version) ->
     Transaction = fun(Worker) -> qw(Worker, PoolCommands) end,
-    Res = eredis_cluster_pool:transaction(Pool, Transaction),
-    MappedRes = lists:zip(Mapping,Res),
-    qmn2(T1, T2, MappedRes ++ Acc);
-qmn2([], [], Acc) ->
+    Result = eredis_cluster_pool:transaction(Pool, Transaction),
+    case handle_transaction_result(Result, Version, check_pipeline_result) of
+        retry -> retry;
+        Res ->
+            MappedRes = lists:zip(Mapping,Res),
+            qmn2(T1, T2, MappedRes ++ Acc, Version)
+    end;
+qmn2([], [], Acc, _) ->
     SortedAcc =
         lists:sort(
             fun({Index1, _},{Index2, _}) ->
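A usage sketch of the new behaviour (keys and values assumed, not taken from this commit): qmn/1 splits the pipeline across per-node pools, and if any pool reports MOVED it refreshes the slot mapping and retries the whole command list, giving up with {error, no_connection} after ?REDIS_CLUSTER_REQUEST_TTL attempts:

    %% "k1" and "k2" may hash to different nodes; results still come back
    %% in the original command order thanks to the index sort in qmn2.
    Results = eredis_cluster:qmn([["SET", "k1", "v1"], ["GET", "k2"]]),
    [{ok, <<"OK">>}, {ok, _MaybeValue}] = Results.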
@@ -125,10 +139,10 @@ split_by_pools([Command | T], Index, CmdAcc, MapAcc, State) ->
             {[{Pool, CmdList2} | CmdAcc2], [{Pool, MapList2} | MapAcc2]}
     end,
     split_by_pools(T, Index+1, NewAcc1, NewAcc2, State);
-split_by_pools([], _Index, CmdAcc, MapAcc, _State) ->
+split_by_pools([], _Index, CmdAcc, MapAcc, State) ->
     CmdAcc2 = [{Pool, lists:reverse(Commands)} || {Pool, Commands} <- CmdAcc],
     MapAcc2 = [{Pool, lists:reverse(Mapping)} || {Pool, Mapping} <- MapAcc],
-    {CmdAcc2, MapAcc2}.
+    {CmdAcc2, MapAcc2, eredis_cluster_monitor:get_state_version(State)}.
 
 %% =============================================================================
 %% @doc Wrapper function for command using pipelined commands
@@ -169,29 +183,51 @@ query(Transaction, Slot, Counter) ->
 
     {Pool, Version} = eredis_cluster_monitor:get_pool_by_slot(Slot),
 
-    case eredis_cluster_pool:transaction(Pool, Transaction) of
-        % If we detect a node went down, we should probably refresh the slot
+    Result = eredis_cluster_pool:transaction(Pool, Transaction),
+    case handle_transaction_result(Result, Version) of
+        retry -> query(Transaction, Slot, Counter + 1);
+        Result -> Result
+    end.
+
+handle_transaction_result(Result, Version) ->
+    case Result of
+        % If we detect a node went down, we should probably refresh the slot
         % mapping.
         {error, no_connection} ->
            eredis_cluster_monitor:refresh_mapping(Version),
-            query(Transaction, Slot, Counter+1);
+            retry;
 
        % If the tcp connection is closed (connection timeout), the redis worker
        % will try to reconnect, thus the connection should be recovered for
        % the next request. We don't need to refresh the slot mapping in this
        % case
        {error, tcp_closed} ->
-            query(Transaction, Slot, Counter+1);
+            retry;
 
        % Redis explicitly say our slot mapping is incorrect, we need to refresh
        % it
        {error, <<"MOVED ", _/binary>>} ->
            eredis_cluster_monitor:refresh_mapping(Version),
-            query(Transaction, Slot, Counter+1);
+            retry;
 
        Payload ->
            Payload
    end.
+handle_transaction_result(Result, Version, check_pipeline_result) ->
+    case handle_transaction_result(Result, Version) of
+        retry -> retry;
+        Payload when is_list(Payload) ->
+            Pred = fun({error, <<"MOVED ", _/binary>>}) -> true;
+                      (_) -> false
+                   end,
+            case lists:any(Pred, Payload) of
+                false -> Payload;
+                true ->
+                    eredis_cluster_monitor:refresh_mapping(Version),
+                    retry
+            end;
+        Payload -> Payload
+    end.
 
 -spec throttle_retries(integer()) -> ok.
 throttle_retries(0) -> ok;
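To illustrate the new check_pipeline_result clause (result values illustrative): a pipeline result is a list, and a single MOVED error anywhere in it is enough to refresh the mapping and retry the whole batch:

    %% lists:any/2 stops at the first match, so one redirected key
    %% invalidates the entire pipeline result.
    PipelineResult = [{ok, <<"1">>},
                      {error, <<"MOVED 866 127.0.0.1:30001">>},
                      {ok, <<"3">>}],
    true = lists:any(fun({error, <<"MOVED ", _/binary>>}) -> true;
                        (_) -> false
                     end, PipelineResult).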
5 changes: 4 additions & 1 deletion src/eredis_cluster_monitor.erl
@@ -5,7 +5,7 @@
 -export([start_link/0]).
 -export([connect/1]).
 -export([refresh_mapping/1]).
--export([get_state/0]).
+-export([get_state/0, get_state_version/1]).
 -export([get_pool_by_slot/1, get_pool_by_slot/2]).
 -export([get_all_pools/0]).
 
@@ -48,6 +48,9 @@ get_state() ->
     [{cluster_state, State}] = ets:lookup(?MODULE, cluster_state),
     State.
 
+get_state_version(State) ->
+    State#state.version.
+
 -spec get_all_pools() -> [pid()].
 get_all_pools() ->
     State = get_state(),
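Why the version is threaded through every retry path (a sketch of the assumed semantics; maybe_refresh and reload_slots_map are hypothetical names, not this module's API): the cached slot mapping carries a version, so refresh_mapping/1 can ignore requests made against a stale view, and many workers failing at once trigger only one reload:

    %% Hypothetical guard inside a refresh handler: reload the slot map
    %% only if the caller still holds the current version.
    maybe_refresh(Version, #state{version = Version} = State) ->
        reload_slots_map(State);
    maybe_refresh(_StaleVersion, State) ->
        State.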
14 changes: 14 additions & 0 deletions test/eredis_cluster_tests.erl
@@ -41,6 +41,20 @@ basic_test_() ->
             end
         },
 
+        { "multi node get",
+        fun () ->
+            N=1000,
+            Keys = [integer_to_list(I) || I <- lists:seq(1,N)],
+            [eredis_cluster:q(["SETEX", Key, "50", Key]) || Key <- Keys],
+            Guard1 = [{ok, integer_to_binary(list_to_integer(Key))} || Key <- Keys],
+            ?assertMatch(Guard1, eredis_cluster:qmn([["GET", Key] || Key <- Keys])),
+            eredis_cluster:q(["SETEX", "a", "50", "0"]),
+            Guard2 = [{ok, integer_to_binary(0)} || _Key <- lists:seq(1,5)],
+            ?assertMatch(Guard2, eredis_cluster:qmn([["GET", "a"] || _I <- lists:seq(1,5)]))
+        end
+        },
+
+        % WARNING: This test will fail during rebalancing, as qmn does not guarantee transaction across nodes
         { "multi node",
         fun () ->
             N=1000,
