Permalink
Browse files

Add ability to throttle gossip rate

  • Loading branch information...
1 parent 9af7c8c commit 92bc2e6f0a7d3ac1e853eb3c36f40608732f5ab9 @jtuple jtuple committed Sep 28, 2011
Showing with 25 additions and 3 deletions.
  1. +25 −3 src/riak_core_gossip.erl
View
@@ -47,7 +47,11 @@
-include_lib("eunit/include/eunit.hrl").
-endif.
--record(state, {gossip_versions}).
+%% Default gossip rate: allow at most 45 gossip messages every 10 seconds
+-define(DEFAULT_LIMIT, {45, 10000}).
+
+-record(state, {gossip_versions,
+ gossip_tokens}).
%% ===================================================================
%% Public API
@@ -134,9 +138,12 @@ random_recursive_gossip(Ring) ->
%% @private
init(_State) ->
schedule_next_gossip(),
+ schedule_next_reset(),
{ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ {Tokens, _} = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
State = update_known_versions(Ring,
- #state{gossip_versions=orddict:new()}),
+ #state{gossip_versions=orddict:new(),
+ gossip_tokens=Tokens}),
{ok, State}.
@@ -262,7 +269,12 @@ rpc_gossip_version(Ring, Node) ->
end.
%% @private
+handle_cast({send_ring_to, _Node}, State=#state{gossip_tokens=0}) ->
+ %% Out of gossip tokens, ignore the send request
+ {noreply, State};
+
handle_cast({send_ring_to, Node}, State) ->
+ lager:debug("Sending ring to ~p~n", [Node]),
{ok, MyRing0} = riak_core_ring_manager:get_raw_ring(),
MyRing = update_gossip_version(MyRing0),
GossipVsn = case gossip_version() of
@@ -273,7 +285,8 @@ handle_cast({send_ring_to, Node}, State) ->
end,
RingOut = riak_core_ring:downgrade(GossipVsn, MyRing),
gen_server:cast({?MODULE, Node}, {reconcile_ring, RingOut}),
- {noreply, State};
+ Tokens = State#state.gossip_tokens - 1,
+ {noreply, State#state{gossip_tokens=Tokens}};
handle_cast({distribute_ring, Ring}, State) ->
RingOut = case check_legacy_gossip(Ring, State) of
@@ -303,6 +316,11 @@ handle_cast({reconcile_ring, RingIn}, State) ->
{noreply, State2}
end;
+handle_cast(reset_tokens, State) ->
+ schedule_next_reset(),
+ {Tokens, _} = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
+ {noreply, State#state{gossip_tokens=Tokens}};
+
handle_cast(gossip_ring, State) ->
% First, schedule the next round of gossip...
schedule_next_gossip(),
@@ -360,6 +378,10 @@ schedule_next_gossip() ->
Interval = random:uniform(MaxInterval),
timer:apply_after(Interval, gen_server, cast, [?MODULE, gossip_ring]).
+schedule_next_reset() ->
+ {_, Reset} = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
+ timer:apply_after(Reset, gen_server, cast, [?MODULE, reset_tokens]).
+
reconcile(Ring0, [OtherRing0]) ->
%% Due to rolling upgrades and legacy gossip, a ring's cluster name
%% may be temporarily undefined. This is eventually fixed by the claimant.

0 comments on commit 92bc2e6

Please sign in to comment.