Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: basho/riak_core
base: feuerlabs-stat-combo
...
head fork: basho/riak_core
compare: gh335-reshed-stats-1.3
Checking mergeability… Don't worry, you can still create the pull request.
  • 2 commits
  • 2 files changed
  • 0 commit comments
  • 1 contributor
Commits on Jun 14, 2013
@russelldb russelldb Fix crashing stat mod never getting rescheduled
1.3.1 updated the cache to fetch stats in the background rather than
on demand. A new bug was added. If the stat mod crashes during
production of stats, it is never rescheduled.

Fix by rescheduling when a crash is detected. Exponentially back off
the schedule after an error so as not to spam the log.
f10b21d
@russelldb russelldb Fix crash when vnode Pid vanishes during stat calc 1bd8044
Showing with 40 additions and 4 deletions.
  1. +8 −1 src/riak_core_stat.erl
  2. +32 −3 src/riak_core_stat_cache.erl
View
9 src/riak_core_stat.erl
@@ -169,13 +169,20 @@ safe_trunc(X) ->
%% Provide aggregate stats for vnode queues. Compute instantaneously for now,
%% may need to cache if stats are called heavily (multiple times per second)
vnodeq_stats() ->
- VnodesInfo = [{Service, element(2, erlang:process_info(Pid, message_queue_len))} ||
+ VnodesInfo = [{Service, vnodeq_len(Pid)} ||
{Service, _Index, Pid} <- riak_core_vnode_manager:all_vnodes()],
ServiceInfo = lists:foldl(fun({S,MQL}, A) ->
orddict:append_list(S, [MQL], A)
end, orddict:new(), VnodesInfo),
lists:flatten([vnodeq_aggregate(S, MQLs) || {S, MQLs} <- ServiceInfo]).
+%% @doc Return the message queue length of the vnode process `Pid',
+%% or 0 when the process has already exited.
+%%
+%% `erlang:process_info/2' returns `undefined' for a process that is no
+%% longer alive. The result is matched explicitly instead of wrapping
+%% `element/2' in `try ... catch _ ->': that form only catches `throw',
+%% so the `error:badarg' raised by `element(2, undefined)' escaped and
+%% still crashed the stat calculation.
+vnodeq_len(Pid) ->
+    case erlang:process_info(Pid, message_queue_len) of
+        {message_queue_len, Len} ->
+            Len;
+        undefined ->
+            %% vnode vanished between listing and inspection
+            0
+    end.
+
vnodeq_aggregate(_Service, []) ->
[]; % no vnodes, no stats
vnodeq_aggregate(Service, MQLs0) ->
View
35 src/riak_core_stat_cache.erl
@@ -47,6 +47,7 @@
%% @doc Cache item refresh rate in seconds
-define(REFRESH_RATE, 1).
-define(REFRSH_MILLIS(N), timer:seconds(N)).
+-define(MAX_REFRESH, timer:seconds(60)).
-define(ENOTREG(App), {error, {not_registered, App}}).
-define(DEFAULT_REG(Mod, RefreshRateMillis), {{Mod, produce_stats, []}, RefreshRateMillis}).
@@ -140,20 +141,25 @@ handle_cast({stats, App, Stats0, TS}, State0=#state{tab=Tab, active=Active, apps
end,
{ok, {MFA, RefreshRateMillis}} = orddict:find(App, Apps),
schedule_get_stats(RefreshRateMillis, App, MFA),
- {noreply, State};
+ Apps2 = clear_fail_count(App, Apps),
+ {noreply, State#state{apps=Apps2}};
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(_Msg, State) ->
{noreply, State}.
%% don't let a crashing stat mod crash the cache
-handle_info({'EXIT', FromPid, Reason}, State0=#state{active=Active}) when Reason /= normal ->
+handle_info({'EXIT', FromPid, Reason}, State0=#state{active=Active, apps=Apps}) when Reason /= normal ->
Reply = case awaiting_for_pid(FromPid, Active) of
not_found ->
{stop, Reason, State0};
{ok, {App, Awaiting}} ->
[gen_server:reply(From, {error, Reason}) || From <- Awaiting, From /= ?SERVER],
- {noreply, State0#state{active=orddict:erase(App, Active)}}
+ {ok, {MFA, RefreshRateMillis}} = orddict:find(App, Apps),
+ Apps2 = update_fail_count(App, Apps),
+ FailCnt = get_fail_count(App, Apps2),
+ schedule_get_stats(RefreshRateMillis, App, MFA, FailCnt),
+ {noreply, State0#state{active=orddict:erase(App, Active), apps=Apps2}}
end,
Reply;
%% @doc callback on timer timeout to keep cache fresh
@@ -170,10 +176,33 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% internal
+%% @doc Look up the failure counter stored for `App' in the `Apps'
+%% orddict under the key `[App, fail]'. Returns 0 when no counter
+%% has been recorded.
+get_fail_count(App, Apps) ->
+    FailKey = [App, fail],
+    case orddict:is_key(FailKey, Apps) of
+        true ->
+            orddict:fetch(FailKey, Apps);
+        false ->
+            0
+    end.
+
+%% @doc Drop the failure counter stored for `App' (key `[App, fail]')
+%% from the `Apps' orddict and return the updated orddict.
+clear_fail_count(App, Apps) ->
+    FailKey = [App, fail],
+    orddict:erase(FailKey, Apps).
+
+%% @doc Increment by one the failure counter stored for `App' under
+%% the key `[App, fail]' in the `Apps' orddict, and return the updated
+%% orddict.
+update_fail_count(App, Apps) ->
+    FailKey = [App, fail],
+    orddict:update_counter(FailKey, 1, Apps).
+
%% Arrange for this process to receive `{get_stats, {App, MFA}}' after
%% `After' milliseconds. Returns the timer reference from
%% erlang:send_after/3.
schedule_get_stats(After, App, MFA) ->
    erlang:send_after(After, self(), {get_stats, {App, MFA}}).
+%% Schedule the next stats fetch for `App', taking the number of
+%% consecutive failures into account. With a fail count of 0 the
+%% delay `After' is used unchanged; otherwise the delay is computed
+%% by back_off/2.
+schedule_get_stats(After, App, MFA, FailCnt) ->
+    Delay =
+        case FailCnt of
+            0 -> After;
+            _ -> back_off(After, FailCnt)
+        end,
+    schedule_get_stats(Delay, App, MFA).
+
+%% Compute the backed-off delay: `After' multiplied by 2^FailCnt,
+%% capped at ?MAX_REFRESH.
+back_off(After, FailCnt) ->
+    Multiplier = 1 bsl FailCnt,
+    erlang:min(After * Multiplier, ?MAX_REFRESH).
+
%% Build the `{Name, TS}' freshness entry for `App', where the name
%% comes from make_freshness_stat_name/1.
make_freshness_stat(App, TS) ->
    StatName = make_freshness_stat_name(App),
    {StatName, TS}.

No commit comments for this range

Something went wrong with that request. Please try again.