Permalink
Browse files

Merge branch 'az921-gossip-limit' into 1.0

  • Loading branch information...
2 parents 1cc384f + b296a3c commit 4420b9cd3f912032d2ee4b05b2da086952f9bf82 @rzezeski rzezeski committed Nov 21, 2011
Showing with 25 additions and 3 deletions.
  1. +25 −3 src/riak_core_gossip.erl
View
@@ -47,7 +47,11 @@
-include_lib("eunit/include/eunit.hrl").
-endif.
--record(state, {gossip_versions}).
+%% Default gossip rate: allow at most 45 gossip messages every 10 seconds
+-define(DEFAULT_LIMIT, {45, 10000}).
+
+-record(state, {gossip_versions,
+ gossip_tokens}).
%% ===================================================================
%% Public API
@@ -134,9 +138,12 @@ random_recursive_gossip(Ring) ->
%% @private
init(_State) ->
schedule_next_gossip(),
+ schedule_next_reset(),
{ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ {Tokens, _} = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
State = update_known_versions(Ring,
- #state{gossip_versions=orddict:new()}),
+ #state{gossip_versions=orddict:new(),
+ gossip_tokens=Tokens}),
{ok, State}.
@@ -262,7 +269,12 @@ rpc_gossip_version(Ring, Node) ->
end.
%% @private
+handle_cast({send_ring_to, _Node}, State=#state{gossip_tokens=0}) ->
+ %% Out of gossip tokens, ignore the send request
+ {noreply, State};
+
handle_cast({send_ring_to, Node}, State) ->
+ lager:debug("Sending ring to ~p~n", [Node]),
{ok, MyRing0} = riak_core_ring_manager:get_raw_ring(),
MyRing = update_gossip_version(MyRing0),
GossipVsn = case gossip_version() of
@@ -276,7 +288,8 @@ handle_cast({send_ring_to, Node}, State) ->
"Error: riak_core_gossip/send_ring_to :: "
"Sending tainted ring over gossip"),
gen_server:cast({?MODULE, Node}, {reconcile_ring, RingOut}),
- {noreply, State};
+ Tokens = State#state.gossip_tokens - 1,
+ {noreply, State#state{gossip_tokens=Tokens}};
handle_cast({distribute_ring, Ring}, State) ->
RingOut = case check_legacy_gossip(Ring, State) of
@@ -309,6 +322,11 @@ handle_cast({reconcile_ring, RingIn}, State) ->
{noreply, State2}
end;
+handle_cast(reset_tokens, State) ->
+ schedule_next_reset(),
+ {Tokens, _} = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
+ {noreply, State#state{gossip_tokens=Tokens}};
+
handle_cast(gossip_ring, State) ->
% First, schedule the next round of gossip...
schedule_next_gossip(),
@@ -366,6 +384,10 @@ schedule_next_gossip() ->
Interval = random:uniform(MaxInterval),
timer:apply_after(Interval, gen_server, cast, [?MODULE, gossip_ring]).
+schedule_next_reset() ->
+ {_, Reset} = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
+ timer:apply_after(Reset, gen_server, cast, [?MODULE, reset_tokens]).
+
reconcile(Ring0, [OtherRing0]) ->
%% Due to rolling upgrades and legacy gossip, a ring's cluster name
%% may be temporarily undefined. This is eventually fixed by the claimant.

0 comments on commit 4420b9c

Please sign in to comment.