Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

We’re showing branches in this repository, but you can also compare across forks.

base fork: basho/riak_core
base: cet-bg-mgr-proto
...
head fork: basho/riak_core
compare: jdm-vnode-overload
  • 1 commit
  • 1 file changed
  • 0 commit comments
  • 1 contributor
Showing with 79 additions and 44 deletions.
  1. +79 −44 src/riak_core_vnode.erl
123 src/riak_core_vnode.erl
View
@@ -27,6 +27,10 @@
-export([init/1,
active/2,
active/3,
+ overload/2,
+ overload/3,
+ shed/2,
+ shed/3,
handle_event/3,
handle_sync_event/4,
handle_info/3,
@@ -175,7 +179,24 @@ core_status(VNode) ->
gen_fsm:sync_send_all_state_event(VNode, core_status).
continue(State) ->
- {next_state, active, State, State#state.inactivity_timeout}.
+    %% Pick the next FSM state (active | overload | shed) from the current
+    %% length of this process's message queue and return the gen_fsm
+    %% next_state tuple, carrying the configured inactivity timeout.
+    %% TODO: cache the limits instead of reading the app env on every event.
+    {message_queue_len, QLen} = process_info(self(), message_queue_len),
+    ShedLimit = app_helper:get_env(riak_core, shed_limit, undefined),
+    OverloadLimit = app_helper:get_env(riak_core, overload_limit, undefined),
+
+    %% A limit that is unset (undefined) or not an integer disables the
+    %% corresponding state. The type test runs first so the comparison is
+    %% only ever evaluated against an integer limit.
+    NextStateName =
+        if
+            is_integer(ShedLimit) andalso QLen >= ShedLimit ->
+                shed;
+            is_integer(OverloadLimit) andalso QLen >= OverloadLimit ->
+                overload;
+            true -> % neither limit reached (or none configured)
+                active
+        end,
+    %% Logged at debug level: this runs after every handled event, so an
+    %% info-level line here would add load precisely when the queue is long.
+    lager:debug("QLen: ~p Next: ~p Limits: ~p ~p\n",
+                [QLen, NextStateName, ShedLimit, OverloadLimit]),
+    {next_state, NextStateName, State, State#state.inactivity_timeout}.
continue(State, NewModState) ->
    %% Record the callback module's new state, then let continue/1 choose
    %% the next FSM state.
    Updated = State#state{modstate=NewModState},
    continue(Updated).
@@ -201,19 +222,22 @@ continue(State, NewModState) ->
%% In the forwarding state, all vnode commands and coverage commands are
%% forwarded to the new owner for processing.
-vnode_command(Sender, Request, State=#state{index=Index,
- mod=Mod,
- modstate=ModState,
- forward=Forward,
- pool_pid=Pool}) ->
+
+vnode_coverage(Sender, Request, KeySpaces, ModHandler,
+ State=#state{index=Index,
+ mod=Mod,
+ modstate=ModState,
+ pool_pid=Pool,
+ forward=Forward}) ->
%% Check if we should forward
case Forward of
undefined ->
- Action = Mod:handle_command(Request, Sender, ModState);
+ Action = Mod:ModHandler(Request, KeySpaces, Sender, ModState);
NextOwner ->
- lager:debug("Forwarding ~p -> ~p: ~p~n", [node(), NextOwner, Index]),
- riak_core_vnode_master:command({Index, NextOwner}, Request, Sender,
- riak_core_vnode_master:reg_name(Mod)),
+ lager:debug("Forwarding coverage ~p -> ~p: ~p~n", [node(), NextOwner, Index]),
+ riak_core_vnode_master:coverage(Request, {Index, NextOwner},
+ KeySpaces, Sender,
+ riak_core_vnode_master:reg_name(Mod)),
Action = continue
end,
case Action of
@@ -233,20 +257,20 @@ vnode_command(Sender, Request, State=#state{index=Index,
{stop, Reason, State#state{modstate=NewModState}}
end.
-vnode_coverage(Sender, Request, KeySpaces, State=#state{index=Index,
+vnode_command(Sender, Request, ModHandler, State=#state{index=Index,
mod=Mod,
modstate=ModState,
- pool_pid=Pool,
- forward=Forward}) ->
+ handoff_node=HN,
+ forward=Forward,
+ pool_pid=Pool}) ->
%% Check if we should forward
case Forward of
undefined ->
- Action = Mod:handle_coverage(Request, KeySpaces, Sender, ModState);
+ Action = Mod:ModHandler(Request, Sender, ModState);
NextOwner ->
- lager:debug("Forwarding coverage ~p -> ~p: ~p~n", [node(), NextOwner, Index]),
- riak_core_vnode_master:coverage(Request, {Index, NextOwner},
- KeySpaces, Sender,
- riak_core_vnode_master:reg_name(Mod)),
+ lager:debug("Forwarding ~p -> ~p: ~p~n", [node(), NextOwner, Index]),
+ riak_core_vnode_master:command({Index, NextOwner}, Request, Sender,
+ riak_core_vnode_master:reg_name(Mod)),
Action = continue
end,
case Action of
@@ -262,31 +286,11 @@ vnode_coverage(Sender, Request, KeySpaces, State=#state{index=Index,
%% the result is sent back to 'From'
riak_core_vnode_worker_pool:handle_work(Pool, Work, From),
continue(State, NewModState);
- {stop, Reason, NewModState} ->
- {stop, Reason, State#state{modstate=NewModState}}
- end.
-
-vnode_handoff_command(Sender, Request, State=#state{index=Index,
- mod=Mod,
- modstate=ModState,
- handoff_node=HN,
- pool_pid=Pool}) ->
- case Mod:handle_handoff_command(Request, Sender, ModState) of
- {reply, Reply, NewModState} ->
- reply(Sender, Reply),
- continue(State, NewModState);
- {noreply, NewModState} ->
- continue(State, NewModState);
- {async, Work, From, NewModState} ->
- %% dispatch some work to the vnode worker pool
- %% the result is sent back to 'From'
- riak_core_vnode_worker_pool:handle_work(Pool, Work, From),
- continue(State, NewModState);
- {forward, NewModState} ->
+ {forward, NewModState} when ModHandler == handle_handoff_command ->
riak_core_vnode_master:command({Index, HN}, Request, Sender,
riak_core_vnode_master:reg_name(Mod)),
continue(State, NewModState);
- {drop, NewModState} ->
+ {drop, NewModState} when ModHandler == handle_handoff_command ->
continue(State, NewModState);
{stop, Reason, NewModState} ->
{stop, Reason, State#state{modstate=NewModState}}
@@ -299,12 +303,12 @@ active(?COVERAGE_REQ{keyspaces=KeySpaces,
request=Request,
sender=Sender}, State) ->
%% Coverage request handled in handoff and non-handoff. Will be forwarded if set.
- vnode_coverage(Sender, Request, KeySpaces, State);
+ vnode_coverage(Sender, Request, KeySpaces, handle_coverage, State);
active(?VNODE_REQ{sender=Sender, request=Request},
State=#state{handoff_node=HN}) when HN =:= none ->
- vnode_command(Sender, Request, State);
+ vnode_command(Sender, Request, handle_command, State);
active(?VNODE_REQ{sender=Sender, request=Request},State) ->
- vnode_handoff_command(Sender, Request, State);
+ vnode_command(Sender, Request, handle_handoff_command, State);
active(handoff_complete, State) ->
State2 = start_manager_event_timer(handoff_complete, State),
continue(State2);
@@ -326,9 +330,40 @@ active(unregistered, State=#state{mod=Mod, index=Index}) ->
pool_pid=undefined}}.
%% Synchronous events in the active state: any event is answered with ok
%% and the FSM stays in active with the configured inactivity timeout.
active(_Event, _From, State) ->
- Reply = ok,
+ Reply = ok, % not expected to happen (per the original note); reply ok and stay in active
{reply, Reply, active, State, State#state.inactivity_timeout}.
+%% Overload state: requests are routed to the callback module's
+%% overload-specific handlers; every other event is handled exactly as in
+%% the active state.
+%% NOTE(review): unlike active/2, the ?VNODE_REQ clause does not check
+%% handoff_node, so handle_handoff_command is never used while in this
+%% state - confirm this is intended.
+overload(?COVERAGE_REQ{keyspaces=KeySpaces,
+                       request=Request,
+                       sender=Sender}, State) ->
+    %% vnode_coverage/5 forwards the request instead when forwarding is set.
+    vnode_coverage(Sender, Request, KeySpaces, handle_overload_coverage, State);
+overload(?VNODE_REQ{sender=Sender, request=Request}, State) ->
+    vnode_command(Sender, Request, handle_overload_command, State);
+overload(OtherEvent, State) ->
+    %% Non-request events get the same treatment as in the active state.
+    active(OtherEvent, State).
+
+overload(_Event, _From, State) ->
+    %% Every synchronous event gets a bare ok; the FSM stays in overload.
+    Timeout = State#state.inactivity_timeout,
+    {reply, ok, overload, State, Timeout}.
+
+
+%% Shed state: drop incoming requests as fast as possible. The callback
+%% module is not invoked and no reply is sent to the sender; the request
+%% is simply discarded and the next state is re-evaluated by continue/1.
+%% (Asking the vnode proxy to shed on our behalf is not implemented here.)
+shed(?COVERAGE_REQ{}, State) ->
+    %% Coverage request: dropped, not handled or forwarded.
+    continue(State);
+shed(?VNODE_REQ{}, State) ->
+    %% Vnode command: dropped.
+    continue(State);
+shed(OtherEvent, State) ->
+    %% Non-request events get the same treatment as in the active state.
+    active(OtherEvent, State).
+
+shed(_Event, _From, State) ->
+    %% Not expected to be reached; answer ok and remain in shed.
+    Timeout = State#state.inactivity_timeout,
+    {reply, ok, shed, State, Timeout}.
+
+
%% This code lives in riak_core_vnode rather than riak_core_vnode_manager
%% because the ring_trans call is a synchronous call to the ring manager,
%% and it is better to block an individual vnode rather than the vnode

No commit comments for this range

Something went wrong with that request. Please try again.