Skip to content

Commit

Permalink
Return detailed replication stats for running and pending jobs
Browse files Browse the repository at this point in the history
Previously `_scheduled/docs` returned detailed replication statistics for
completed jobs only. To get the same level of details from a running or pending
jobs users had to use `_active_tasks`, which is not optimal and required jumping
between monitoring endpoints.

`info` field was originally meant to hold these statistics but they were not
implemented and it just returned `null` as a placeholder. With work for 3.0
finalizing, this might be a good time to add this improvement to avoid
disturbing the API afterwards.

Just updating the `_scheduler/docs` was not quite enough since, replications
started from the `_replicate` endpoint would not be visible there and users
would still have to access `_active_tasks` to get inspect them, so let's add
the `info` field to the `_scheduler/jobs` as well.

After this update, all states and status details from `_active_tasks` and
`_replicator` docs should be available under `_scheduler/jobs` and
`_scheduler/docs` endpoints.
  • Loading branch information
nickva committed Nov 6, 2019
1 parent d60551d commit f31b504
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 54 deletions.
11 changes: 1 addition & 10 deletions src/couch_replicator/src/couch_replicator_doc_processor.erl
Expand Up @@ -533,15 +533,6 @@ doc_lookup(Db, DocId, HealthThreshold) ->
end.


-spec ejson_state_info(binary() | nil) -> binary() | null.
ejson_state_info(nil) ->
null;
ejson_state_info(Info) when is_binary(Info) ->
Info;
ejson_state_info(Info) ->
couch_replicator_utils:rep_error_to_binary(Info).


-spec ejson_rep_id(rep_id() | nil) -> binary() | null.
ejson_rep_id(nil) ->
null;
Expand Down Expand Up @@ -579,7 +570,7 @@ ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
{database, DbName},
{id, ejson_rep_id(RepId)},
{state, RepState},
{info, ejson_state_info(StateInfo)},
{info, couch_replicator_utils:ejson_state_info(StateInfo)},
{error_count, ErrorCount},
{node, node()},
{last_updated, couch_replicator_utils:iso8601(StateTime)},
Expand Down
21 changes: 16 additions & 5 deletions src/couch_replicator/src/couch_replicator_scheduler.erl
Expand Up @@ -148,19 +148,19 @@ job_summary(JobId, HealthThreshold) ->
[{{crashed, Error}, _When} | _] ->
{crashing, crash_reason_json(Error)};
[_ | _] ->
{pending, null}
{pending, Rep#rep.stats}
end;
{undefined, ErrorCount} when ErrorCount > 0 ->
[{{crashed, Error}, _When} | _] = History,
{crashing, crash_reason_json(Error)};
{Pid, ErrorCount} when is_pid(Pid) ->
{running, null}
{running, Rep#rep.stats}
end,
[
{source, iolist_to_binary(ejson_url(Rep#rep.source))},
{target, iolist_to_binary(ejson_url(Rep#rep.target))},
{state, State},
{info, Info},
{info, couch_replicator_utils:ejson_state_info(Info)},
{error_count, ErrorCount},
{last_updated, last_updated(History)},
{start_time,
Expand Down Expand Up @@ -829,6 +829,7 @@ job_ejson(Job) ->
{database, Rep#rep.db_name},
{user, (Rep#rep.user_ctx)#user_ctx.name},
{doc_id, Rep#rep.doc_id},
{info, couch_replicator_utils:ejson_state_info(Rep#rep.stats)},
{history, History},
{node, node()},
{start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
Expand Down Expand Up @@ -1431,7 +1432,12 @@ t_job_summary_running() ->
Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
?assertEqual(running, proplists:get_value(state, Summary)),
?assertEqual(null, proplists:get_value(info, Summary)),
?assertEqual(0, proplists:get_value(error_count, Summary))
?assertEqual(0, proplists:get_value(error_count, Summary)),

Stats = [{source_seq, <<"1-abc">>}],
handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
?assertEqual({Stats}, proplists:get_value(info, Summary1))
end).


Expand All @@ -1447,7 +1453,12 @@ t_job_summary_pending() ->
Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
?assertEqual(pending, proplists:get_value(state, Summary)),
?assertEqual(null, proplists:get_value(info, Summary)),
?assertEqual(0, proplists:get_value(error_count, Summary))
?assertEqual(0, proplists:get_value(error_count, Summary)),

Stats = [{doc_write_failures, 1}],
handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
?assertEqual({Stats}, proplists:get_value(info, Summary1))
end).


Expand Down
16 changes: 6 additions & 10 deletions src/couch_replicator/src/couch_replicator_scheduler_job.erl
Expand Up @@ -600,7 +600,7 @@ init_state(Rep) ->
?DEFAULT_CHECKPOINT_INTERVAL),
type = Type,
view = View,
stats = Stats
stats = couch_replicator_stats:new(Stats)
},
State#rep_state{timer = start_timer(State)}.

Expand Down Expand Up @@ -949,20 +949,16 @@ get_pending_count_int(#rep_state{source = Db}=St) ->

update_task(State) ->
#rep_state{
rep_details = #rep{id = JobId},
current_through_seq = {_, ThroughSeq},
highest_seq_done = {_, HighestSeq}
} = State,
update_scheduler_job_stats(State),
couch_task_status:update(
rep_stats(State) ++ [
Status = rep_stats(State) ++ [
{source_seq, HighestSeq},
{through_seq, ThroughSeq}
]).


update_scheduler_job_stats(#rep_state{rep_details = Rep, stats = Stats}) ->
JobId = Rep#rep.id,
couch_replicator_scheduler:update_job_stats(JobId, Stats).
],
couch_replicator_scheduler:update_job_stats(JobId, Status),
couch_task_status:update(Status).


rep_stats(State) ->
Expand Down
50 changes: 22 additions & 28 deletions src/couch_replicator/src/couch_replicator_stats.erl
Expand Up @@ -12,14 +12,6 @@

-module(couch_replicator_stats).

-record(rep_stats, {
missing_checked = 0,
missing_found = 0,
docs_read = 0,
docs_written = 0,
doc_write_failures = 0
}).

-export([
new/0,
new/1,
Expand All @@ -39,45 +31,47 @@
new() ->
orddict:new().

new(Initializers) when is_list(Initializers) ->
orddict:from_list(Initializers).
new(Initializers0) when is_list(Initializers0) ->
Initializers1 = lists:filtermap(fun fmap/1, Initializers0),
orddict:from_list(Initializers1).

missing_checked(Stats) ->
get(missing_checked, upgrade(Stats)).
get(missing_checked, Stats).

missing_found(Stats) ->
get(missing_found, upgrade(Stats)).
get(missing_found, Stats).

docs_read(Stats) ->
get(docs_read, upgrade(Stats)).
get(docs_read, Stats).

docs_written(Stats) ->
get(docs_written, upgrade(Stats)).
get(docs_written, Stats).

doc_write_failures(Stats) ->
get(doc_write_failures, upgrade(Stats)).
get(doc_write_failures, Stats).

get(Field, Stats) ->
case orddict:find(Field, upgrade(Stats)) of
case orddict:find(Field, Stats) of
{ok, Value} ->
Value;
error ->
0
end.

increment(Field, Stats) ->
orddict:update_counter(Field, 1, upgrade(Stats)).
orddict:update_counter(Field, 1, Stats).

sum_stats(S1, S2) ->
orddict:merge(fun(_, V1, V2) -> V1+V2 end, upgrade(S1), upgrade(S2)).
orddict:merge(fun(_, V1, V2) -> V1+V2 end, S1, S2).


upgrade(#rep_stats{} = Stats) ->
orddict:from_list([
{missing_checked, Stats#rep_stats.missing_checked},
{missing_found, Stats#rep_stats.missing_found},
{docs_read, Stats#rep_stats.docs_read},
{docs_written, Stats#rep_stats.docs_written},
{doc_write_failures, Stats#rep_stats.doc_write_failures}
]);
upgrade(Stats) ->
Stats.
% Handle initializing from a status object which uses same values but different
% field names.
fmap({revisions_checked, V}) -> {true, {missing_checked, V}};
fmap({missing_revisions_found, V}) -> {true, {missing_found, V}};
fmap({missing_checked, _}) -> true;
fmap({missing_found, _}) -> true;
fmap({docs_read, _}) -> true;
fmap({docs_written, _}) -> true;
fmap({doc_write_failures, _}) -> true;
fmap({_, _}) -> false.
16 changes: 15 additions & 1 deletion src/couch_replicator/src/couch_replicator_utils.erl
Expand Up @@ -24,7 +24,8 @@
iso8601/1,
filter_state/3,
remove_basic_auth_from_headers/1,
normalize_rep/1
normalize_rep/1,
ejson_state_info/1
]).


Expand Down Expand Up @@ -176,6 +177,19 @@ normalize_rep(#rep{} = Rep)->
}.


-spec ejson_state_info(binary() | nil) -> binary() | null.
ejson_state_info(nil) ->
null;
ejson_state_info(Info) when is_binary(Info) ->
Info;
ejson_state_info([]) ->
null; % Status not set yet => null for compatibility reasons
ejson_state_info([{_, _} | _] = Info) ->
{Info};
ejson_state_info(Info) ->
couch_replicator_utils:rep_error_to_binary(Info).


-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").
Expand Down

0 comments on commit f31b504

Please sign in to comment.