Skip to content

Commit

Permalink
mango: rolling execution statistics
Browse files Browse the repository at this point in the history
In case of map-reduce views, the arrival of the `complete` message
is not guaranteed for the view callback (at the shard) when a
`stop` is issued during the aggregation (at the coordinator).  Due
to that, internally collected shard-level statistics may not be
fed back to the coordinator which can cause data loss hence
inaccuracy in the overall execution statistics.

Address this issue by switching to a "rolling" model where
row-level statistics are immediately streamed back to the
coordinator.  Support mixed-version cluster upgrades by activating
this model only if requested through the map-reduce arguments and
the given shard supports that.

Fixes #4560
  • Loading branch information
pgj committed Mar 27, 2024
1 parent f88ba94 commit caa5e23
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 38 deletions.
17 changes: 16 additions & 1 deletion src/fabric/src/fabric_view_row.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
get_value/1,
get_doc/1,
get_worker/1,
get_stats/1,
set_key/2,
set_doc/2,
set_worker/2,
set_stats/2,
transform/1
]).

Expand Down Expand Up @@ -91,6 +93,14 @@ set_worker(#view_row{} = Row, Worker) ->
set_worker({view_row, #{} = Row}, Worker) ->
{view_row, Row#{worker => Worker}}.

get_stats({view_row, #{stats := Stats}}) ->
Stats;
get_stats({view_row, #{}}) ->
undefined.

set_stats({view_row, #{} = Row}, Stats) ->
{view_row, Row#{stats => Stats}}.

transform(#view_row{value = {[{reduce_overflow_error, Msg}]}}) ->
{row, [{key, null}, {id, error}, {value, reduce_overflow_error}, {reason, Msg}]};
transform(#view_row{key = Key, id = reduced, value = Value}) ->
Expand All @@ -109,8 +119,13 @@ transform({view_row, #{} = Row0}) ->
Value = maps:get(value, Row0, undefined),
Doc = maps:get(doc, Row0, undefined),
Worker = maps:get(worker, Row0, undefined),
Stats = maps:get(stats, Row0, undefined),
Row = #view_row{id = Id, key = Key, value = Value, doc = Doc, worker = Worker},
transform(Row).
{row, Props} = RowProps = transform(Row),
case Stats of
undefined -> RowProps;
#{} -> {row, [{stats, Stats} | Props]}
end.

-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
Expand Down
146 changes: 109 additions & 37 deletions src/mango/src/mango_cursor_view.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@
covering_index => 'maybe'(#idx{})
}.

-type mrargs_extra_item() ::
{callback, {atom(), atom()}}
| {selector, any()}
| {callback_args, viewcbargs()}
| {ignore_partition_query_limit, boolean()}
| {execution_stats_map, boolean()}
| {execution_stats_rolling, boolean()}.
-type mrargs_extra() :: [mrargs_extra_item()].

-spec viewcbargs_new(Selector, Fields, CoveringIndex) -> ViewCBArgs when
Selector :: selector(),
Fields :: fields(),
Expand Down Expand Up @@ -207,7 +216,9 @@ base_args(#cursor{index = Idx, selector = Selector, fields = Fields} = Cursor) -
% - Return execution statistics in a map
{execution_stats_map, true},
% - Return view rows in a map
{view_row_map, true}
{view_row_map, true},
% - Stream execution statistics
{execution_stats_rolling, true}
]
}.

Expand Down Expand Up @@ -341,6 +352,43 @@ choose_best_index(IndexRanges) ->
{SelectedIndex, SelectedIndexRanges, _} = hd(SortedIndexRanges),
{{SelectedIndex, SelectedIndexRanges}, SortedIndexRanges}.

-spec format_stats(RawStats, Options) -> FormattedStats when
RawStats :: shard_stats_v2(),
Options :: mrargs_extra(),
FormattedStats :: shard_stats_v1() | shard_stats_v2().
format_stats(Stats, Options) when is_list(Options) ->
case couch_util:get_value(execution_stats_map, Options, false) of
true ->
Stats;
false ->
#{docs_examined := DocsExamined} = Stats,
{docs_examined, DocsExamined}
end.

-spec submit_stats(Options) -> ok when
Options :: mrargs_extra().
submit_stats(Options) when is_list(Options) ->
ShardStats = mango_execution_stats:shard_get_stats(),
Stats = format_stats(ShardStats, Options),
% Send execution stats in batch (shard-level)
ok = rexi:stream2({execution_stats, Stats}).

-spec roll_stats(ViewRow, Options) -> ViewRow when
ViewRow :: view_row(),
Options :: mrargs_extra().
roll_stats(ViewRow, Options) when is_list(Options) ->
ViewRowMap = couch_util:get_value(view_row_map, Options, false),
RollingStats = couch_util:get_value(execution_stats_rolling, Options, false),
case ViewRowMap andalso RollingStats of
true ->
ShardStats = mango_execution_stats:shard_get_stats(),
mango_execution_stats:shard_init(),
Stats = format_stats(ShardStats, Options),
fabric_view_row:set_stats(ViewRow, Stats);
false ->
ViewRow
end.

-spec view_cb
(Message, #mrargs{}) -> Response when
Message :: {meta, any()} | {row, row_properties()} | complete,
Expand Down Expand Up @@ -382,7 +430,8 @@ view_cb({row, Props}, #mrargs{extra = Options} = Acc) ->
case match_and_extract_doc(Doc, Selector, Fields) of
{match, FinalDoc} ->
ViewRow1 = fabric_view_row:set_doc(ViewRow, FinalDoc),
ok = rexi:stream2(ViewRow1),
ViewRow2 = roll_stats(ViewRow1, Options),
ok = rexi:stream2(ViewRow2),
set_mango_msg_timestamp();
{no_match, undefined} ->
maybe_send_mango_ping()
Expand All @@ -397,7 +446,8 @@ view_cb({row, Props}, #mrargs{extra = Options} = Acc) ->
Process(Doc);
{undefined, _} ->
% include_docs=false. Use quorum fetch at coordinator
ok = rexi:stream2(ViewRow),
ViewRow1 = roll_stats(ViewRow, Options),
ok = rexi:stream2(ViewRow1),
set_mango_msg_timestamp();
{Doc, _} ->
mango_execution_stats:shard_incr_docs_examined(),
Expand All @@ -406,17 +456,7 @@ view_cb({row, Props}, #mrargs{extra = Options} = Acc) ->
end,
{ok, Acc};
view_cb(complete, #mrargs{extra = Options} = Acc) ->
ShardStats = mango_execution_stats:shard_get_stats(),
Stats =
case couch_util:get_value(execution_stats_map, Options, false) of
true ->
ShardStats;
false ->
DocsExamined = maps:get(docs_examined, ShardStats),
{docs_examined, DocsExamined}
end,
% Send shard-level execution stats
ok = rexi:stream2({execution_stats, Stats}),
submit_stats(Options),
% Finish view output
ok = rexi:stream_last(complete),
{ok, Acc};
Expand Down Expand Up @@ -472,6 +512,21 @@ maybe_send_mango_ping() ->
set_mango_msg_timestamp() ->
put(mango_last_msg_timestamp, os:timestamp()).

-spec add_shard_stats(#execution_stats{}, shard_stats()) -> #execution_stats{}.
add_shard_stats(Stats0, {docs_examined, DocsExamined}) ->
mango_execution_stats:incr_docs_examined(Stats0, DocsExamined);
add_shard_stats(Stats0, #{} = ShardStats) ->
DocsExamined = shard_stats_get(docs_examined, ShardStats),
KeysExamined = shard_stats_get(keys_examined, ShardStats),
Stats = mango_execution_stats:incr_docs_examined(Stats0, DocsExamined),
mango_execution_stats:incr_keys_examined(Stats, KeysExamined).

-spec handle_execution_stats(#cursor{}, shard_stats()) -> {ok, #cursor{}}.
handle_execution_stats(Cursor0, ShardStats) ->
#cursor{execution_stats = Stats} = Cursor0,
Cursor = Cursor0#cursor{execution_stats = add_shard_stats(Stats, ShardStats)},
{ok, Cursor}.

-spec handle_message(message(), #cursor{}) -> Response when
Response ::
{ok, #cursor{}}
Expand All @@ -495,20 +550,10 @@ handle_message({row, Props}, Cursor) ->
couch_log:error("~s :: Error loading doc: ~p", [?MODULE, Error]),
{ok, Cursor}
end;
handle_message({execution_stats, {docs_examined, DocsExamined}}, Cursor0) ->
#cursor{execution_stats = Stats} = Cursor0,
Cursor = Cursor0#cursor{
execution_stats = mango_execution_stats:incr_docs_examined(Stats, DocsExamined)
},
{ok, Cursor};
handle_message({execution_stats, {docs_examined, _} = ShardStats}, Cursor0) ->
handle_execution_stats(Cursor0, ShardStats);
handle_message({execution_stats, #{} = ShardStats}, Cursor0) ->
DocsExamined = shard_stats_get(docs_examined, ShardStats),
KeysExamined = shard_stats_get(keys_examined, ShardStats),
#cursor{execution_stats = Stats0} = Cursor0,
Stats1 = mango_execution_stats:incr_docs_examined(Stats0, DocsExamined),
Stats = mango_execution_stats:incr_keys_examined(Stats1, KeysExamined),
Cursor = Cursor0#cursor{execution_stats = Stats},
{ok, Cursor};
handle_execution_stats(Cursor0, ShardStats);
handle_message(complete, Cursor) ->
{ok, Cursor};
handle_message({error, Reason}, _Cursor) ->
Expand Down Expand Up @@ -648,9 +693,14 @@ consider_index_coverage(Index, Fields, #mrargs{include_docs = IncludeDocs0} = Ar
| {no_match, null, {execution_stats, shard_stats()}}
| any().
doc_member_and_extract(Cursor, RowProps) ->
Db = Cursor#cursor.db,
Opts = Cursor#cursor.opts,
ExecutionStats = Cursor#cursor.execution_stats,
#cursor{db = Db, opts = Opts, execution_stats = ExecutionStats0} = Cursor,
ExecutionStats =
case couch_util:get_value(stats, RowProps) of
undefined ->
ExecutionStats0;
ShardStats ->
add_shard_stats(ExecutionStats0, ShardStats)
end,
Selector = Cursor#cursor.selector,
case couch_util:get_value(doc, RowProps) of
{DocProps} ->
Expand Down Expand Up @@ -748,7 +798,8 @@ base_opts_test() ->
}},
{ignore_partition_query_limit, true},
{execution_stats_map, true},
{view_row_map, true}
{view_row_map, true},
{execution_stats_rolling, true}
],
MRArgs =
#mrargs{
Expand Down Expand Up @@ -1093,7 +1144,8 @@ t_execute_ok_all_docs(_) ->
}},
{ignore_partition_query_limit, true},
{execution_stats_map, true},
{view_row_map, true}
{view_row_map, true},
{execution_stats_rolling, true}
],
Args =
#mrargs{
Expand Down Expand Up @@ -1180,7 +1232,8 @@ t_execute_ok_query_view(_) ->
}},
{ignore_partition_query_limit, true},
{execution_stats_map, true},
{view_row_map, true}
{view_row_map, true},
{execution_stats_rolling, true}
],
Args =
#mrargs{
Expand Down Expand Up @@ -1279,7 +1332,8 @@ t_execute_ok_all_docs_with_execution_stats(_) ->
}},
{ignore_partition_query_limit, true},
{execution_stats_map, true},
{view_row_map, true}
{view_row_map, true},
{execution_stats_rolling, true}
],
Args =
#mrargs{
Expand Down Expand Up @@ -1394,6 +1448,8 @@ t_view_cb_row_matching_regular_doc(_) ->
fields => all_fields,
covering_index => undefined
}},
{execution_stats_map, true},
{execution_stats_rolling, true},
{view_row_map, true}
]
},
Expand All @@ -1413,6 +1469,8 @@ t_view_cb_row_non_matching_regular_doc(_) ->
fields => all_fields,
covering_index => undefined
}},
{execution_stats_map, true},
{execution_stats_rolling, true},
{view_row_map, true}
]
},
Expand All @@ -1432,6 +1490,8 @@ t_view_cb_row_null_doc(_) ->
fields => all_fields,
covering_index => undefined
}},
{execution_stats_map, true},
{execution_stats_rolling, true},
{view_row_map, true}
]
},
Expand All @@ -1452,6 +1512,8 @@ t_view_cb_row_missing_doc_triggers_quorum_fetch(_) ->
fields => all_fields,
covering_index => undefined
}},
{execution_stats_map, true},
{execution_stats_rolling, true},
{view_row_map, true}
]
},
Expand Down Expand Up @@ -1479,6 +1541,8 @@ t_view_cb_row_matching_covered_doc(_) ->
fields => Fields,
covering_index => Index
}},
{execution_stats_map, true},
{execution_stats_rolling, true},
{view_row_map, true}
]
},
Expand All @@ -1503,6 +1567,8 @@ t_view_cb_row_non_matching_covered_doc(_) ->
fields => Fields,
covering_index => Index
}},
{execution_stats_map, true},
{execution_stats_rolling, true},
{view_row_map, true}
]
},
Expand Down Expand Up @@ -1638,10 +1704,13 @@ t_handle_message_row_ok_above_limit(_) ->
user_acc = accumulator,
user_fun = fun foo:bar/2
},
Row = [{id, id}, {key, key}, {doc, Doc}],
ShardStats = #{keys_examined => 2, docs_examined => 3},
Row = [{id, id}, {key, key}, {doc, Doc}, {stats, ShardStats}],
Cursor1 =
Cursor#cursor{
execution_stats = #execution_stats{resultsReturned = 1},
execution_stats = #execution_stats{
resultsReturned = 1, totalKeysExamined = 2, totalDocsExamined = 3
},
limit = 8,
user_acc = updated_accumulator,
bookmark_docid = id,
Expand Down Expand Up @@ -1689,12 +1758,15 @@ t_handle_message_row_ok_triggers_quorum_fetch_match(_) ->
user_acc = accumulator,
limit = 1
},
Row = [{id, id}, {doc, undefined}],
ShardStats = #{keys_examined => 2, docs_examined => 3},
Row = [{id, id}, {doc, undefined}, {stats, ShardStats}],
Cursor1 =
Cursor#cursor{
execution_stats =
#execution_stats{
totalQuorumDocsExamined = 1,
totalKeysExamined = 2,
totalDocsExamined = 3,
resultsReturned = 1
},
user_acc = updated_accumulator,
Expand Down

0 comments on commit caa5e23

Please sign in to comment.