diff --git a/rebar.config.script b/rebar.config.script index 1bc63675519..60ae4484258 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -142,6 +142,7 @@ SubDirs = [ "src/smoosh", "src/weatherreport", "src/couch_prometheus", + "src/couch_scanner", "rel" ]. diff --git a/rel/reltool.config b/rel/reltool.config index 5cd070ccfe9..9751659df53 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -63,6 +63,7 @@ snappy, weatherreport, couch_prometheus, + couch_scanner, %% extra nouveau, @@ -127,6 +128,7 @@ {app, snappy, [{incl_cond, include}]}, {app, weatherreport, [{incl_cond, include}]}, {app, couch_prometheus, [{incl_cond, include}]}, + {app, couch_scanner, [{incl_cond, include}]}, %% extra {app, nouveau, [{incl_cond, include}]}, diff --git a/src/couch_scanner/README.md b/src/couch_scanner/README.md new file mode 100644 index 00000000000..9b8bf66b724 --- /dev/null +++ b/src/couch_scanner/README.md @@ -0,0 +1,4 @@ +Couch Scanner +================ + +Traverse all dbs periodically and emit various reports diff --git a/src/couch_scanner/src/couch_scanner.app.src b/src/couch_scanner/src/couch_scanner.app.src new file mode 100644 index 00000000000..961e9e80e31 --- /dev/null +++ b/src/couch_scanner/src/couch_scanner.app.src @@ -0,0 +1,29 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{application, couch_scanner, [ + {description, "CouchDB Scanner"}, + {vsn, git}, + {registered, [ + couch_scanner_server + ]}, + {applications, [ + kernel, + stdlib, + crypto, + config, + couch_log, + couch_stats, + fabric + ]}, + {mod, {couch_scanner_app, []}} +]}. diff --git a/src/couch_scanner/src/couch_scanner.erl b/src/couch_scanner/src/couch_scanner.erl new file mode 100644 index 00000000000..ca901dd609a --- /dev/null +++ b/src/couch_scanner/src/couch_scanner.erl @@ -0,0 +1,20 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_scanner). + +-export([ + status/0 +]). + +status() -> + couch_scanner_server:status(). diff --git a/src/couch_scanner/src/couch_scanner_app.erl b/src/couch_scanner/src/couch_scanner_app.erl new file mode 100644 index 00000000000..23c4093dc41 --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_app.erl @@ -0,0 +1,23 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_scanner_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + couch_scanner_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/couch_scanner/src/couch_scanner_checkpoint.erl b/src/couch_scanner/src/couch_scanner_checkpoint.erl new file mode 100644 index 00000000000..acf94248e5e --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_checkpoint.erl @@ -0,0 +1,105 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_scanner_checkpoint). + +-export([ + write/2, + read/1, + reset/1 +]). + +-include_lib("couch/include/couch_db.hrl"). + +write(<>, #{} = State) -> + with_db(fun(Db) -> update_doc(Db, doc_id(Plugin), State) end). + +read(<>) -> + with_db(fun(Db) -> load_doc(Db, doc_id(Plugin)) end). + +reset(<>) -> + with_db(fun(Db) -> delete_doc(Db, doc_id(Plugin)) end). + +% Private functions + +doc_id(Plugin) -> + % Dashes are more conventional for doc ids + Plugin1 = binary:replace(Plugin, <<"_">>, <<"-">>, [global]), + <>. + +delete_doc(Db, DocId) -> + case couch_db:open_doc(Db, DocId, []) of + {ok, #doc{revs = {_, RevList}}} -> + {ok, _} = couch_db:delete_doc(Db, DocId, RevList), + ok; + {not_found, _} -> + ok + end. + +update_doc(Db, DocId, #{} = Body) -> + EJsonBody = ?JSON_DECODE(?JSON_ENCODE(Body#{<<"_id">> => DocId})), + Doc = couch_doc:from_json_obj(EJsonBody), + case couch_db:open_doc(Db, DocId, []) of + {ok, #doc{revs = Revs}} -> + {ok, _} = couch_db:update_doc(Db, Doc#doc{revs = Revs}, []); + {not_found, _} -> + {ok, _} = couch_db:update_doc(Db, Doc, []) + end, + ok. + +load_doc(Db, DocId) -> + case couch_db:open_doc(Db, DocId, [ejson_body]) of + {ok, #doc{body = EJsonBody}} -> + ?JSON_DECODE(?JSON_ENCODE(EJsonBody), [return_maps]); + {not_found, _} -> + not_found + end. + +with_db(Fun) when is_function(Fun, 1) -> + DbName = config:get("mem3", "shards_db", "_dbs"), + case mem3_util:ensure_exists(DbName) of + {ok, Db} -> + try + Fun(Db) + after + catch couch_db:close(Db) + end; + Else -> + throw(Else) + end. + +-ifdef(TEST). + +-include_lib("couch/include/couch_eunit.hrl"). + +couch_scanner_checkpoint_test_() -> + { + foreach, + fun test_util:start_couch/0, + fun test_util:stop_couch/1, + [ + ?TDEF_FE(t_read_write_reset) + ] + }. + +t_read_write_reset(_) -> + Plugin = <<"scanner_plugin_abc">>, + ok = reset(Plugin), + ?assertEqual(ok, write(Plugin, #{<<"foo">> => 1})), + ?assertEqual(#{<<"foo">> => 1}, read(Plugin)), + ?assertEqual(ok, write(Plugin, #{<<"bar">> => 2})), + ?assertEqual(#{<<"bar">> => 2}, read(Plugin)), + ?assertEqual(not_found, read(<<"scanner_plugin_other">>)), + ?assertEqual(ok, reset(Plugin)), + ?assertEqual(not_found, read(Plugin)). + +-endif. diff --git a/src/couch_scanner/src/couch_scanner_plugin.erl b/src/couch_scanner/src/couch_scanner_plugin.erl new file mode 100644 index 00000000000..578c08eae69 --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_plugin.erl @@ -0,0 +1,488 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under1 +% the License. + +% Scanner plugin runner process +% +% This is the process which is spawned and run for each enabled plugin. +% +% A number of these processes are managed by the couch_scanner_server via +% start_link/1 and stop/1 functions. After a plugin runner is spawned, the only +% thing couch_scanner_server does is wait for it to exit. +% +% The plugin runner process may exit normally, crash, or exit with {shutdown, +% {reschedule, TSec}} if they want to reschedule to run again at some point the +% future (next day, a week later, etc). +% +% After the process starts, it will load and validate the plugin module. Then, +% it will start scanning all the dbs and docs on the local node. Shard ranges +% will be scanned only on one of the cluster nodes to avoid duplicating work. +% For instance, if there are 2 shard ranges, 0-7, 8-f, with copies on nodes n1, +% n2, n3. Then, 0-7 might be scanned on n1 only, and 8-f on n3. +% +% The plugin API defined in the behavior definition section. +% +% The start/1 function is called when the plugin starts running. It returns +% some context (St), which can be any Erlang term. All subsequent function +% calls will be called with the same St object, and may return an updated +% version of it. +% +% If the plugin hasn't finished runing and has resumed running after the node +% was restarted or an error happened, the resume/1 function will be called. +% That's the difference between start and resume: start/1 is called when the +% scan starts from the beginning (first db, first shard, ...), and resume/1 is +% called when the scanning hasn't finished and has to continue. +% +% The checkpoint/1 callback is periodically called to checkpoint the scanning +% progress. start/1 and resume/1 function will be called with the last saved +% checkpoint map value. +% +% The stop/1 callback is called when the scan has finished. The stop callback +% should return ok or {reschedule, TSec} and final checkpoint map. The last +% checkoint will be written and then, if {reschedule, TSec} is specified, it +% will be rescheduled to re-run again at the specified time. +% +% As the cluster dbs, shards, ddocs and individual docs are discovered during +% scanning, the appropriate callbacks will be called. Most callbacks, besides +% the updated St object, can reply with ok, skip or stop tags. The meaning of +% those are: +% +% * ok - continue to the next object +% +% * skip - skip the current object and don't scan its internal (ex: skip a db and +% don't scan its ddocs, but continue with the next db) +% +% * stop - stop scanning any remaining objects of that type (ex: don't scan any more dbs) +% + +-module(couch_scanner_plugin). + +-export([ + % Main plugin process API + spawn_link/1, + stop/1, + % Utility functions + new_scan_id/0, + log/5, + tsec/0, + config_match_patterns/2, + % Internal export + run/1 +]). + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). + +% Behaviour callback definitions + +-callback start(EJson :: #{}) -> + {ok, St :: term()} | {reschedule, TSec :: integer()}. +-callback resume(Json :: #{}) -> + {ok, St :: term()} | {reschedule, TSec :: integer()}. +-callback stop(St :: term()) -> + {ok | {reschedule, TSec :: integer()}, EJson :: #{}}. +-callback checkpoint(St :: term()) -> + {ok, EJson :: #{}}. +-callback db(St :: term(), DbName :: binary()) -> + {ok | skip | stop, St1 :: term()}. +-callback ddoc(St :: term(), DbName :: binary(), #doc{}) -> + {ok | stop, St1 :: term()}. +-callback shards(St :: term(), [#shard{}]) -> + {[#shard{}], St1 :: term()}. +-callback db_opened(St :: term(), Db :: term()) -> + {ok, St :: term()}. +-callback doc_id(St :: term(), DocId :: binary(), Db :: term()) -> + {ok | skip | stop, St1 :: term()}. +-callback doc(St :: term(), Db :: term(), #doc{}) -> + {ok | stop, St1 :: term()}. +-callback db_closing(St :: term(), Db :: term()) -> + {ok, St1 :: term()}. + +-define(CHECKPOINT_INTERVAL_SEC, 10). +-define(STOP_TIMEOUT_MSEC, 5000). + +-record(st, { + id, + mod, + pst, + cursor, + shards_db, + db, + checkpoint_tsec = 0, + skip_dbs, + skip_ddocs, + skip_docs +}). + +spawn_link(<>) -> + proc_lib:spawn_link(?MODULE, run, [Id]). + +stop(Pid) when is_pid(Pid) -> + unlink(Pid), + Ref = erlang:monitor(process, Pid), + Pid ! stop, + receive + {'DOWN', Ref, _, _, _} -> ok + after ?STOP_TIMEOUT_MSEC -> + exit(Pid, kill), + receive + {'DOWN', Ref, _, _, _} -> ok + end + end, + ok. + +% Utilities used by plugins + +new_scan_id() -> + TSec = integer_to_binary(erlang:system_time(second)), + Rand = string:lowercase(binary:encode_hex(crypto:strong_rand_bytes(6))), + <>. + +log(Level, Mod, Fmt, Args, #{} = Meta) when + is_atom(Level), is_atom(Mod), is_list(Fmt), is_list(Args) +-> + {MetaFmt, MetaArgs} = log_format_meta(Mod, Meta), + couch_log:Level(lists:flatten([MetaFmt, Fmt]), MetaArgs ++ Args). + +tsec() -> + erlang:system_time(second). + +config_match_patterns(Module, Type) -> + Section = atom_to_list(Module) ++ "." ++ Type, + Fun = fun + ({K, "true"}, Acc) -> [list_to_binary(K) | Acc]; + ({_, _}, Acc) -> Acc + end, + Items = lists:foldl(Fun, [], config:get(Section)), + case Items of + [] -> undefined; + [<<_/binary>> | _] -> binary:compile_pattern(Items) + end. + +% Main run function + +run(<>) -> + Mod = plugin_mod(Id), + St = #st{id = Id, mod = Mod}, + St1 = init_config(St), + St2 = init_from_checkpoint(St1), + St3 = scan_dbs(St2), + finalize(St3). + +% Private functions + +scan_dbs(#st{cursor = Cursor} = St) -> + DbsDbName = mem3_sync:shards_db(), + ioq:set_io_priority({system, DbsDbName}), + {ok, Db} = mem3_util:ensure_exists(DbsDbName), + St1 = St#st{shards_db = Db}, + Opts = [{start_key, Cursor}], + try + {ok, St2} = couch_db:fold_docs(Db, fun scan_dbs_fold/2, St1, Opts), + St2#st{shards_db = undefined} + after + couch_db:close(Db) + end. + +scan_dbs_fold(#full_doc_info{} = FDI, #st{shards_db = Db} = Acc) -> + Acc1 = Acc#st{cursor = FDI#full_doc_info.id}, + Acc2 = maybe_checkpoint(Acc1), + case couch_db:open_doc(Db, FDI, [ejson_body]) of + {ok, #doc{id = <<"_design/", _/binary>>}} -> + {ok, Acc2}; + {ok, #doc{id = DbName, body = Body}} -> + scan_db(shards(DbName, Body), Acc2) + end. + +scan_db([], #st{} = St) -> + {ok, St}; +scan_db([_ | _] = Shards, #st{} = St) -> + #st{cursor = DbName, mod = Mod, pst = PSt, skip_dbs = Skip} = St, + case match_skip_pat(DbName, Skip) of + false -> + {Go, PSt1} = Mod:db(PSt, DbName), + St1 = St#st{pst = PSt1}, + case Go of + ok -> + St2 = fold_ddocs(fun scan_ddocs_fold/2, St1), + {Shards1, St3} = shards_callback(St2, Shards), + St4 = scan_shards(Shards1, St3), + {ok, St4}; + skip -> + {ok, St1}; + stop -> + {stop, St1} + end; + true -> + {ok, St} + end. + +scan_ddocs_fold({meta, _}, #st{} = Acc) -> + {ok, Acc}; +scan_ddocs_fold({row, RowProps}, #st{} = Acc) -> + DDoc = couch_util:get_value(doc, RowProps), + scan_ddoc(couch_doc:from_json_obj(DDoc), Acc); +scan_ddocs_fold(complete, #st{} = Acc) -> + {ok, Acc}; +scan_ddocs_fold({error, Error}, _Acc) -> + exit({shutdown, {scan_ddocs_fold, Error}}). + +scan_shards([], #st{} = St) -> + St; +scan_shards([#shard{} = Shard | Rest], #st{} = St) -> + St1 = maybe_checkpoint(St), + St2 = scan_docs(St1, Shard), + scan_shards(Rest, St2). + +scan_ddoc(#doc{id = DDocId} = DDoc, #st{} = St) -> + #st{cursor = DbName, mod = Mod, pst = PSt, skip_ddocs = Skip} = St, + case match_skip_pat(DDocId, Skip) of + false -> + {Go, PSt1} = Mod:ddoc(PSt, DbName, DDoc), + St1 = St#st{pst = PSt1}, + case Go of + ok -> {ok, St1}; + skip -> {ok, St1}; + stop -> {stop, St1} + end; + true -> + {ok, St} + end. + +scan_docs(#st{} = St, #shard{name = ShardDbName}) -> + case couch_db:open_int(ShardDbName, [?ADMIN_CTX]) of + {ok, Db} -> + St1 = St#st{db = Db}, + St2 = db_opened_callback(St1), + {ok, St3} = couch_db:fold_docs(Db, fun scan_docs_fold/2, St2, []), + St4 = db_closing_callback(St3), + couch_db:close(Db), + St4#st{db = undefined}; + {not_found, _} -> + St + end. + +scan_docs_fold(#full_doc_info{id = Id} = FDI, #st{} = St) -> + #st{db = Db, mod = Mod, pst = PSt, skip_dbs = Skip} = St, + case match_skip_pat(Id, Skip) of + false -> + {Go, PSt1} = Mod:doc_id(PSt, Id, Db), + St1 = St#st{pst = PSt1}, + case Go of + ok -> scan_doc(FDI, St1); + skip -> {ok, St1}; + stop -> {stop, St1} + end; + true -> + {ok, St} + end. + +scan_doc(#full_doc_info{} = FDI, #st{} = St) -> + St1 = maybe_checkpoint(St), + #st{db = Db, mod = Mod, pst = PSt} = St1, + {ok, #doc{} = Doc} = couch_db:open_doc(Db, FDI, [ejson_body]), + {Go, PSt1} = Mod:doc(PSt, Db, Doc), + case Go of + ok -> {ok, St1#st{pst = PSt1}}; + stop -> {stop, St1#st{pst = PSt1}} + end. + +maybe_checkpoint(#st{checkpoint_tsec = LastCheckpointTSec} = St) -> + receive + stop -> + checkpoint(St), + exit({shutdown, stop}) + after 0 -> + ok + end, + erlang:garbage_collect(), + case tsec() - LastCheckpointTSec > ?CHECKPOINT_INTERVAL_SEC of + true -> checkpoint(St); + false -> St + end. + +checkpoint(#st{} = St) -> + #st{id = Id, mod = Mod, pst = PSt, cursor = Cursor} = St, + JsonPSt = checkpoint_callback(Mod, PSt), + EJson = #{ + <<"cursor">> => Cursor, + <<"pst">> => JsonPSt, + <<"state">> => <<"running">> + }, + ok = couch_scanner_checkpoint:write(Id, EJson), + St#st{checkpoint_tsec = tsec()}. + +init_from_checkpoint(#st{id = Id, mod = Mod} = St) -> + case couch_scanner_checkpoint:read(Id) of + #{<<"state">> := <<"running">>, <<"cursor">> := Cur, <<"pst">> := EJson} -> + PSt1 = resume_callback(Mod, EJson), + St#st{pst = PSt1, cursor = Cur, checkpoint_tsec = tsec()}; + not_found -> + PSt1 = start_callback(Mod, #{}), + Cur = <<>>, + ok = start_checkpoint(Id, Mod, Cur, PSt1), + St#st{pst = PSt1, cursor = Cur, checkpoint_tsec = tsec()}; + #{<<"state">> := <<"finished">>, <<"pst">> := EJson} -> + PSt1 = start_callback(Mod, EJson), + Cur = <<>>, + ok = start_checkpoint(Id, Mod, Cur, PSt1), + St#st{pst = PSt1, cursor = Cur, checkpoint_tsec = tsec()} + end. + +start_callback(Mod, EJson) when is_atom(Mod) -> + case Mod:start(EJson) of + {ok, PSt} -> PSt; + {reschedule, TSec} -> exit({shutdown, {reschedule, TSec}}) + end. + +resume_callback(Mod, EJson) when is_atom(Mod) -> + case Mod:resume(EJson) of + {ok, PSt} -> PSt; + {reschedule, TSec} -> exit({shutdown, {reschedule, TSec}}) + end. + +db_opened_callback(#st{pst = PSt, mod = Mod, db = Db} = St) -> + {ok, PSt1} = Mod:db_opened(PSt, Db), + St#st{pst = PSt1}. + +db_closing_callback(#st{pst = PSt, mod = Mod, db = Db} = St) -> + {ok, PSt1} = Mod:db_closing(PSt, Db), + St#st{pst = PSt1}. + +shards_callback(#st{pst = PSt, mod = Mod} = St, Shards) -> + {Shards1, PSt1} = Mod:shards(PSt, Shards), + {Shards1, St#st{pst = PSt1}}. + +finalize(#st{id = Id, mod = Mod, pst = PSt}) -> + {Go, #{} = PStEJson} = Mod:stop(PSt), + EJson = #{ + <<"cursor">> => <<>>, + <<"pst">> => ejson_map(PStEJson), + <<"state">> => <<"finished">> + }, + ok = couch_scanner_checkpoint:write(Id, EJson), + case Go of + ok -> ok; + {reschedule, TSec} -> exit({shutdown, {reschedule, TSec}}) + end. + +start_checkpoint(Id, Mod, Cur, PSt1) -> + PStEJson = checkpoint_callback(Mod, PSt1), + EJson = #{ + <<"cursor">> => Cur, + <<"pst">> => PStEJson, + <<"state">> => <<"running">> + }, + ok = couch_scanner_checkpoint:write(Id, EJson). + +checkpoint_callback(Mod, PSt) -> + {ok, #{} = PStEJson} = Mod:checkpoint(PSt), + ejson_map(PStEJson). + +plugin_mod(<>) -> + Mod = binary_to_atom(Plugin), + case code:ensure_loaded(Mod) of + {module, _} -> + check_callbacks(Mod), + Mod; + {error, Error} -> + error({?MODULE, {missing_plugin_module, Mod, Error}}) + end. + +check_callbacks(Mod) when is_atom(Mod) -> + Cbks = [ + {start, 1}, + {resume, 1}, + {stop, 1}, + {checkpoint, 1}, + {db, 2}, + {ddoc, 3}, + {shards, 2}, + {db_opened, 2}, + {doc_id, 3}, + {doc, 3}, + {db_closing, 2} + ], + Fun = fun({F, A}) -> + case erlang:function_exported(Mod, F, A) of + true -> ok; + false -> error({?MODULE, {undefined_plugin_fun, Mod, F, A}}) + end + end, + lists:foreach(Fun, Cbks). + +shards(DbName, {Props = [_ | _]}) -> + Shards = lists:sort(mem3_util:build_shards(DbName, Props)), + Fun = fun({R, SList}) -> + case mem3_util:rotate_list({DbName, R}, SList) of + [#shard{node = N} = S | _] when N =:= node() -> + {true, S}; + [_ | _] -> + false + end + end, + lists:filtermap(Fun, shards_by_range(lists:sort(Shards))). + +shards_by_range(Shards) -> + Fun = fun(#shard{range = R} = S, Acc) -> orddict:append(R, S, Acc) end, + Dict = lists:foldl(Fun, orddict:new(), Shards), + orddict:to_list(Dict). + +fold_ddocs(Fun, #st{cursor = DbName} = Acc) -> + QArgs = #mrargs{ + include_docs = true, + extra = [{namespace, <<"_design">>}] + }, + {ok, Acc1} = fabric:all_docs(DbName, [?ADMIN_CTX], Fun, Acc, QArgs), + Acc1. + +ejson_map(Obj) -> + jiffy:decode(jiffy:encode(Obj), [return_maps]). + +init_config(#st{mod = Mod} = St) -> + St#st{ + skip_dbs = config_match_patterns(Mod, "skip_dbs"), + skip_ddocs = config_match_patterns(Mod, "skip_ddocs"), + skip_docs = config_match_patterns(Mod, "skip_docs") + }. + +match_skip_pat(<<_/binary>>, undefined) -> + false; +match_skip_pat(<<_/binary>> = Bin, Pat) -> + binary:match(Bin, Pat) /= nomatch. + +log_format_meta(Mod, #{} = Meta) -> + SId = {"s:~s ", maps:get(sid, Meta, undefined)}, + Fun = {"f:~s ", maps:get(fn, Meta, undefined)}, + Db = {"db:~s ", format_db(maps:get(db, Meta, undefined))}, + DDocId = {"ddoc:~s ", maps:get(ddoc, Meta, undefined)}, + DocId = {"doc:~s ", maps:get(doc, Meta, undefined)}, + FmtArgs = [{"~s ", Mod}, SId, Fun, Db, DDocId, DocId], + lists:unzip([{Fmt, Arg} || {Fmt, Arg} <- FmtArgs, Arg /= undefined]). + +format_db(undefined) -> + undefined; +format_db(Db) when is_list(Db) -> + format_db(list_to_binary(Db)); +format_db(#shard{dbname = Db, range = [B, E]}) -> + {BStr, EStr} = {hex(B), hex(E)}, + <>; +format_db(Db) when is_tuple(Db) -> + format_db(couch_db:name(Db)); +format_db(<<"shards/", B:8/binary, "-", E:8/binary, "/", Rest/binary>>) -> + [Db, _] = binary:split(Rest, <<".">>), + <>; +format_db(<>) -> + Db. + +hex(Val) when is_integer(Val) -> + string:lowercase(erlang:integer_to_binary(Val, 16)). diff --git a/src/couch_scanner/src/couch_scanner_plugin.hrl b/src/couch_scanner/src/couch_scanner_plugin.hrl new file mode 100644 index 00000000000..1a0db3da7af --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_plugin.hrl @@ -0,0 +1,21 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +-define(LOG(LVL, FMT, ARGS, META), couch_scanner_plugin:log(LVL, ?MODULE, FMT, ARGS, maps:merge(META, #{fn => ?FUNCTION_NAME}))). +-define(INFO(FMT, ARGS, META), ?LOG(info, FMT, ARGS, META)). +-define(INFO(FMT, ARGS), ?LOG(info, FMT, ARGS, #{})). +-define(INFO(META), ?LOG(info, "", [], META)). +-define(INFO(), ?LOG(info, "", [], #{})). +-define(WARN(FMT, ARGS, META), ?LOG(warn, FMT, ARGS, META)). diff --git a/src/couch_scanner/src/couch_scanner_plugin_quickjs.erl b/src/couch_scanner/src/couch_scanner_plugin_quickjs.erl new file mode 100644 index 00000000000..e4e622e43b6 --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_plugin_quickjs.erl @@ -0,0 +1,398 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_scanner_plugin_quickjs). +-behaviour(couch_scanner_plugin). + +-export([ + start/1, + resume/1, + stop/1, + checkpoint/1, + db/2, + ddoc/3, + shards/2, + db_opened/2, + doc_id/3, + doc/3, + db_closing/2 +]). + +-include("couch_scanner_plugin.hrl"). + +-define(QJS, <<"javascript_quickjs">>). +-define(SM, <<"javascript_spidermonkey">>). + +-record(st, { + sid, + ddocs = #{}, + docs = [], + qjs_proc, + sm_proc, + ddoc_cnt = 0, + doc_cnt = 0, + doc_step = 0, + max_ddocs = 100, + max_shards = 4, + max_docs = 1000, + max_step = 1000, + batch_size = 100 +}). + +% Behavior callbacks + +start(#{} = JsonSt) -> + St = #st{sid = couch_scanner_plugin:new_scan_id()}, + case JsonSt of + #{<<"sid">> := OldScanId, <<"finished_at">> := OldTSec} -> + Msg = "old id:~p finished at:~p", + Args = [OldScanId, OldTSec], + ?INFO(Msg, Args, #{sid => St#st.sid}); + #{} -> + ?INFO("No old scan results found", [], #{sid => St#st.sid}) + end, + % {reschedule, TSec} + {ok, init_config(St)}. + +resume(#{} = JsonSt) -> + #{<<"sid">> := SId} = JsonSt, + ?INFO(#{sid => SId}), + St = #st{sid = SId}, + % {reschedule, TSec} + {ok, init_config(St)}. + +stop(#st{sid = SId} = St) -> + TSec = couch_scanner_plugin:tsec(), + FinalJsonSt = #{ + <<"sid">> => St#st.sid, + <<"finished_at">> => TSec + }, + ?INFO("Finished at ~p", [TSec], #{sid => SId}), + % {{reschedule, TSec}, #{}} + {ok, FinalJsonSt}. + +checkpoint(#st{sid = SId}) -> + ?INFO(#{sid => SId}), + {ok, #{<<"sid">> => SId}}. + +db(#st{} = St, DbName) -> + St1 = reset_per_db_state(St), + #st{sid = SId} = St1, + ?INFO(#{sid => SId, db => DbName}), + {ok, St1}. + +ddoc(#st{ddoc_cnt = C, max_ddocs = M} = St, _, _) when C > M -> + {stop, St}; +ddoc(#st{} = St, DbName, #doc{id = DDocId} = DDoc) -> + ?INFO(#{sid => St#st.sid, db => DbName, ddoc => DDocId}), + #st{ddoc_cnt = Cnt} = St, + St1 = St#st{ddoc_cnt = Cnt + 1}, + #doc{body = {Props = [_ | _]}} = DDoc, + case couch_util:get_value(<<"language">>, Props, <<"javascript">>) of + <<"javascript">> -> {ok, process_ddoc(St1, DbName, DDoc)}; + _ -> {ok, St1} + end. + +shards(#st{max_shards = Max, ddocs = DDocs} = St, Shards) -> + case {map_size(DDocs), lists:sublist(Shards, Max)} of + {0, _} -> {[], St}; + {_, []} -> {[], St}; + {_, Shards1} -> {Shards1, St} + end. + +db_opened(#st{} = St, Db) -> + #st{max_docs = MaxDocs, max_step = MaxStep} = St, + {ok, DocTotal} = couch_db:get_doc_count(Db), + Step = min(MaxStep, max(1, DocTotal div MaxDocs)), + {ok, St#st{doc_cnt = 0, doc_step = Step, docs = []}}. + +doc_id(#st{doc_cnt = C, max_docs = M} = St, _, _) when C > M -> + {stop, St}; +doc_id(#st{doc_cnt = C, doc_step = S} = St, _, _) when C rem S == 1 -> + {skip, St#st{doc_cnt = C + 1}}; +doc_id(#st{doc_cnt = C} = St, _, _) -> + {ok, St#st{doc_cnt = C + 1}}. + +doc(#st{docs = Docs, batch_size = B} = St, _Db, Doc) when length(Docs) < B -> + {ok, St#st{docs = [Doc | Docs]}}; +doc(#st{docs = Docs} = St, Db, #doc{} = Doc) -> + St1 = process_docs(St#st{docs = []}, Db, lists:reverse([Doc | Docs])), + % {stop, St}? + {ok, St1}. + +db_closing(#st{docs = Docs} = St, Db) -> + St1 = process_docs(St, Db, Docs), + {ok, St1#st{doc_cnt = 0, doc_step = 0, docs = []}}. + +% Private + +process_docs(#st{} = St, Db, Docs) -> + #st{sid = SId, ddocs = DDocs} = St, + Meta = #{sid => SId, db => Db}, + ?INFO("docs:~p ddocs:~p", [length(Docs), map_size(DDocs)], Meta), + St. + +process_ddoc(#st{} = St, DbName, #doc{} = DDoc0) -> + #st{sid = SId, ddocs = DDocs} = St, + #doc{id = DDocId} = DDoc0, + DDoc = parse_ddoc(DDoc0), + case map_size(DDoc) > 0 of + true -> + St1 = start_or_reset_procs(St), + try + lib_validate(St1, maps:get(lib, DDoc, undefined)), + views_validate(St1, maps:get(views, DDoc, undefined)), + filters_validate(St1, maps:get(filter, DDoc, undefined)), + vdu_validate(St1, maps:get(vdu, DDoc, undefined)), + ok = reset_procs(St1), + teach_ddoc_validate(St1, DDocId, DDoc), + St1#st{ddocs = DDocs#{DDocId => DDoc}} + catch + throw:{validate, Error} -> + Meta = #{sid => SId, db => DbName, ddoc => DDocId}, + ?WARN("Validation failed ~p", [Error], Meta), + St1 + end; + false -> + St + end. + +parse_ddoc(#doc{body = Body0}) -> + Body = jiffy:decode(jiffy:encode(Body0), [return_maps]), + DDoc1 = parse_lib(#{}, Body), + DDoc2 = parse_views(DDoc1, Body), + DDoc3 = parse_filters(DDoc2, Body), + parse_vdu(DDoc3, Body). + +parse_lib(#{} = DDoc, #{<<"lib">> := Lib}) -> + DDoc#{lib => Lib}; +parse_lib(#{} = DDoc, #{}) -> + DDoc. + +parse_views(#{} = DDoc, #{<<"views">> := #{} = Views}) -> + Fun = fun(Name, #{} = MapRed, Acc) -> + case MapRed of + #{<<"map">> := Map, <<"reduce">> := <<"_", _/binary>>} -> + Acc#{Name => #{map => Map}}; + #{<<"map">> := Map, <<"reduce">> := <<_/binary>> = Red} -> + Acc#{Name => #{map => Map, reduce => Red}}; + #{<<"map">> := Map} -> + Acc#{Name => #{map => Map}}; + #{} -> + Acc + end + end, + maps:fold(Fun, DDoc, Views); +parse_views(#{} = DDoc, #{}) -> + DDoc. + +parse_filters(#{} = DDoc, #{<<"filters">> := Filters}) -> + case map_size(Filters) > 0 of + true -> DDoc#{filters => Filters}; + false -> DDoc + end; +parse_filters(#{} = DDoc, #{}) -> + DDoc. + +parse_vdu(#{} = DDoc, #{<<"validate_doc_update">> := <<_/binary>> = VDU}) -> + DDoc#{validate_doc_update => VDU}; +parse_vdu(#{} = DDoc, #{}) -> + DDoc. + +reset_per_db_state(#st{qjs_proc = QjsProc, sm_proc = SmProc} = St) -> + proc_stop(QjsProc), + proc_stop(SmProc), + St#st{ + ddocs = #{}, + docs = [], + qjs_proc = undefined, + sm_proc = undefined, + ddoc_cnt = 0 + }. + +start_or_reset_procs(#st{} = St) -> + St1 = start_or_reset_qjs_proc(St), + start_or_reset_sm_proc(St1). + +start_or_reset_qjs_proc(#st{qjs_proc = undefined} = St) -> + Cmd = couch_quickjs:mainjs_cmd(), + {ok, Pid} = couch_os_process:start_link(Cmd), + Proc = proc(Pid), + true = proc_reset(Proc), + St#st{qjs_proc = Proc}; +start_or_reset_qjs_proc(#st{qjs_proc = #proc{} = Proc} = St) -> + try + true = proc_reset(Proc), + St + catch + _:_ -> + proc_stop(Proc), + start_or_reset_qjs_proc(St#st{qjs_proc = undefined}) + end. + +start_or_reset_sm_proc(#st{sm_proc = undefined} = St) -> + Cmd = os:getenv("COUCHDB_QUERY_SERVER_JAVASCRIPT"), + {ok, Pid} = couch_os_process:start_link(Cmd), + Proc = proc(Pid), + true = proc_reset(Proc), + St#st{sm_proc = Proc}; +start_or_reset_sm_proc(#st{sm_proc = #proc{} = Proc} = St) -> + try + true = proc_reset(Proc), + St + catch + _:_ -> + proc_stop(Proc), + start_or_reset_sm_proc(St#st{sm_proc = undefined}) + end. + +lib_validate(#st{}, undefined) -> + ok; +lib_validate(#st{qjs_proc = Qjs, sm_proc = Sm}, #{} = Lib) -> + QjsRes = add_lib(Qjs, Lib), + SmRes = add_lib(Sm, Lib), + case QjsRes == SmRes of + true -> ok; + false -> throw({validate, {add_lib, QjsRes, SmRes}}) + end. + +add_lib(#proc{} = Proc, #{} = Lib) -> + try + true = prompt(Proc, [<<"add_lib">>, Lib]), + ok + catch + Tag:Err -> + {error, {Tag, Err}} + end. + +views_validate(#st{}, undefined) -> + ok; +views_validate(#st{} = St, #{} = Views) -> + Fun = fun(Name, #{} = View) -> view_validate(St, Name, View) end, + maps:foreach(Fun, Views). + +view_validate(#st{qjs_proc = Qjs, sm_proc = Sm}, Name, View) -> + #{map := MapSrc} = View, + QjsMapRes = add_fun(Qjs, MapSrc), + SmMapRes = add_fun(Sm, MapSrc), + case QjsMapRes == SmMapRes of + true -> ok; + false -> throw({validate, {add_fun, map, Name, QjsMapRes, SmMapRes}}) + end, + RedSrc = maps:get(red, View, undefined), + QjsRedRes = add_fun(Qjs, RedSrc), + SmRedRes = add_fun(Sm, RedSrc), + case QjsRedRes == SmRedRes of + true -> ok; + false -> throw({validate, {add_fun, red, Name, QjsRedRes, SmRedRes}}) + end. + +add_fun(#proc{}, undefined) -> + ok; +add_fun(#proc{} = Proc, <<_/binary>> = FunSrc) -> + try + true = prompt(Proc, [<<"add_fun">>, FunSrc]), + ok + catch + Tag:Err -> + {error, {Tag, Err}} + end. + +filters_validate(#st{}, undefined) -> + ok; +filters_validate(#st{} = St, #{} = Filters) -> + Fun = fun(Name, Filter) -> filter_validate(St, Name, Filter) end, + maps:foreach(Fun, Filters). + +filter_validate(#st{qjs_proc = Qjs, sm_proc = Sm}, Name, Filter) -> + QjsRes = add_fun(Qjs, Filter), + SmRes = add_fun(Sm, Filter), + case QjsRes == SmRes of + true -> ok; + false -> throw({validate, {filter, Name, QjsRes, SmRes}}) + end. + +vdu_validate(#st{}, undefined) -> + ok; +vdu_validate(#st{qjs_proc = Qjs, sm_proc = Sm}, VDU) -> + QjsRes = add_fun(Qjs, VDU), + SmRes = add_fun(Sm, VDU), + case QjsRes == SmRes of + true -> ok; + false -> throw({validate, {vdu, QjsRes, SmRes}}) + end. + +reset_procs(#st{qjs_proc = #proc{} = QjsProc, sm_proc = #proc{} = SmProc}) -> + true = proc_reset(QjsProc), + true = proc_reset(SmProc), + ok. + +proc(Pid) -> + #proc{ + pid = Pid, + prompt_fun = {couch_os_process, prompt}, + set_timeout_fun = {couch_os_process, set_timeout}, + stop_fun = {couch_os_process, stop} + }. + +prompt(#proc{} = Proc, Prompt) -> + couch_query_servers:proc_prompt(Proc, Prompt). + +proc_set_timeout(Proc, Timeout) -> + {Mod, Func} = Proc#proc.set_timeout_fun, + apply(Mod, Func, [Proc#proc.pid, Timeout]). + +teach_ddoc_validate(#st{qjs_proc = Qjs, sm_proc = Sm}, DDocId, DDoc) -> + QjsRes = teach_ddoc(Qjs, DDocId, DDoc), + SmRes = teach_ddoc(Sm, DDocId, DDoc), + case QjsRes == SmRes of + true -> ok; + false -> throw({validate, {teach_ddoc, DDocId, QjsRes, SmRes}}) + end. + +teach_ddoc(#proc{} = Proc, DDocId, DDoc) -> + try + true = prompt(Proc, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) + catch + Tag:Err -> + {error, {Tag, Err}} + end. + +proc_reset(#proc{} = Proc) -> + Timeout = config:get_integer("couchdb", "os_process_timeout", 5000), + Cfg = [{<<"reduce_limit">>, true}, {<<"timeout">>, Timeout}], + Result = prompt(Proc, [<<"reset">>, {Cfg}]), + proc_set_timeout(Proc, Timeout), + Result. + +proc_stop(undefined) -> + ok; +proc_stop(#proc{pid = Pid} = Proc) -> + unlink(Pid), + Ref = monitor(process, Pid), + {Mod, Func} = Proc#proc.stop_fun, + apply(Mod, Func, [Pid]), + receive + {'DOWN', Ref, _, _, _} -> ok + end. + +init_config(#st{} = St) -> + St#st{ + max_ddocs = cfg_int("max_ddocs", St#st.max_ddocs), + max_shards = cfg_int("max_shards", St#st.max_shards), + max_docs = cfg_int("max_docs", St#st.max_docs), + max_step = cfg_int("max_step", St#st.max_step), + batch_size = cfg_int("batch_size", St#st.batch_size) + }. + +cfg_int(Key, Default) when is_list(Key), is_integer(Default) -> + config:get_integer(atom_to_list(?MODULE), Key, Default). diff --git a/src/couch_scanner/src/couch_scanner_server.erl b/src/couch_scanner/src/couch_scanner_server.erl new file mode 100644 index 00000000000..c818f9ed66c --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_server.erl @@ -0,0 +1,217 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +% Scanner plugin server. +% +% This is gen_server starts, stops and reschedules plugin processes. +% + +-module(couch_scanner_server). + +-export([ + start_link/0, + status/0 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2 +]). + +-define(SCHED_INTERVAL_SEC, 5). +-define(PENALTY_BASE_SEC, 60). +-define(MAX_PENALTY_SEC, 8 * 3600). +-define(HEAL_THRESHOLD_SEC, 5 * 60). + +-record(sched, { + start_time = 0, + error_count = 0, + reschedule = 0 +}). + +-record(st, { + pids = #{}, + scheduling = #{}, + tref +}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +status() -> + gen_server:call(?MODULE, status, infinity). + +% Private + +init(_Args) -> + process_flag(trap_exit, true), + {ok, #st{}, interval_msec()}. + +terminate(_Reason, #st{pids = Pids} = St) -> + ToStop = maps:keys(Pids), + lists:foldl(fun stop_plugin/2, St, ToStop), + ok. + +handle_call(status, _From, #st{} = St) -> + Fun = fun(_, Sched) -> + #{ + start_time => Sched#sched.start_time, + error_count => Sched#sched.error_count, + reschedule => Sched#sched.reschedule + } + end, + SchedMaps = maps:map(Fun, St#st.scheduling), + {reply, #{pids => St#st.pids, scheduling => SchedMaps}, St}; +handle_call(Msg, _From, #st{} = St) -> + couch_log:error("~p : unknown call ~p", [?MODULE, Msg]), + {reply, {error, {invalid_call, Msg}}, St}. + +handle_cast(Msg, #st{} = St) -> + couch_log:error("~p : unknown cast ~p", [?MODULE, Msg]), + {noreply, St}. + +handle_info(timeout, #st{} = St) -> + St1 = + case in_maintenance() of + true -> stop_in_maintenance(St); + false -> start_stop_cfg(St) + end, + {noreply, schedule_timeout(St1)}; +handle_info({'EXIT', Pid, Reason}, #st{pids = Pids} = St) -> + case maps:filter(fun(_, P) -> P =:= Pid end, Pids) of + Map when map_size(Map) == 1 -> + [{Id, Pid}] = maps:to_list(Map), + St1 = St#st{pids = maps:remove(Id, Pids)}, + {noreply, handle_exit(Id, Reason, St1)}; + Map when map_size(Map) == 0 -> + {noreply, St} + end; +handle_info(Msg, St) -> + couch_log:error("~p : unknown info message ~p", [?MODULE, Msg]), + {noreply, St}. + +stop_in_maintenance(#st{pids = Pids} = St) -> + case map_size(Pids) > 0 of + true -> + couch_log:info("~p stopping in maintenance mode", [?MODULE]), + lists:foldl(fun stop_plugin/2, St, maps:keys(Pids)); + false -> + St + end. + +start_stop_cfg(#st{pids = Pids, scheduling = Scheduling} = St) -> + PluginIds = plugins(), + RunningIds = maps:keys(Pids), + ToStart = PluginIds -- RunningIds, + ToStop = RunningIds -- PluginIds, + St1 = lists:foldl(fun stop_plugin/2, St, ToStop), + lists:foreach(fun couch_scanner_checkpoint:reset/1, ToStop), + ToRemove = maps:keys(Scheduling) -- PluginIds, + St2 = St1#st{scheduling = maps:without(ToRemove, Scheduling)}, + lists:foldl(fun start_plugin/2, St2, ToStart). + +stop_plugin(Id, #st{} = St) -> + #st{pids = Pids, scheduling = Scheduling} = St, + {Pid, Pids1} = maps:take(Id, Pids), + #{Id := #sched{} = Sched} = Scheduling, + couch_log:info("~p : stopping ~s", [?MODULE, Id]), + ok = couch_scanner_plugin:stop(Pid), + Sched1 = Sched#sched{start_time = 0, reschedule = 0}, + Scheduling1 = Scheduling#{Id := Sched1}, + St#st{pids = Pids1, scheduling = Scheduling1}. + +start_plugin(Id, #st{pids = Pids, scheduling = Scheduling} = St) -> + Sched = maps:get(Id, Scheduling, #sched{}), + NowSec = tsec(), + case NowSec >= Sched#sched.reschedule of + true -> + couch_log:info("~p : starting ~s", [?MODULE, Id]), + Pids1 = Pids#{Id => couch_scanner_plugin:spawn_link(Id)}, + Sched1 = Sched#sched{ + start_time = NowSec, + reschedule = 0 + }, + Scheduling1 = Scheduling#{Id => Sched1}, + St#st{pids = Pids1, scheduling = Scheduling1}; + false -> + St + end. + +plugins() -> + Fun = fun + ({K, "true"}, Acc) -> + FullName = "couch_scanner_plugin_" ++ K, + [list_to_binary(FullName) | Acc]; + ({_, _}, Acc) -> + Acc + end, + lists:foldl(Fun, [], config:get("couch_scanner_plugins")). + +handle_exit(Id, Reason, #st{} = St) -> + #st{scheduling = Scheduling} = St, + #{Id := Sched} = Scheduling, + Sched1 = sched_exit_update(Id, Sched, Reason), + St#st{scheduling = Scheduling#{Id := Sched1}}. + +sched_exit_update(Id, #sched{} = Sched, {shutdown, {reschedule, TSec}}) -> + couch_log:notice("~p : ~s rescheduled after ~p", [?MODULE, Id, TSec]), + Sched#sched{start_time = 0, error_count = 0, reschedule = TSec}; +sched_exit_update(Id, #sched{} = Sched, Norm) when + Norm == shutdown; Norm == normal +-> + couch_log:notice("~p : ~s finished", [?MODULE, Id]), + Sched#sched{start_time = 0, error_count = 0, reschedule = infinity}; +sched_exit_update(Id, #sched{} = Sched, Error) -> + couch_log:error("~p : ~s exited with error ~p", [?MODULE, Id, Error]), + #sched{start_time = StartSec, error_count = ErrorCount} = Sched, + Sched1 = Sched#sched{start_time = 0}, + NowSec = tsec(), + % If process has been running successfully for a while without crashing + % reset (forgive) its previous errors. + case NowSec - StartSec =< heal_threshold_sec() of + true -> penalize(NowSec, Sched1#sched{error_count = 1}); + false -> penalize(NowSec, Sched1#sched{error_count = ErrorCount + 1}) + end. + +penalize(NowSec, #sched{error_count = ErrorCount} = Sched) -> + PenaltySec = ?PENALTY_BASE_SEC * (1 bsl (ErrorCount - 1)), + PenaltySec1 = min(PenaltySec, max_penalty_sec()), + Sched#sched{reschedule = NowSec + PenaltySec1}. + +in_maintenance() -> + "false" /= config:get("couchdb", "maintenance_mode", "false"). + +tsec() -> + erlang:system_time(second). + +schedule_timeout(#st{tref = TRef} = St) -> + case TRef of + undefined -> ok; + _ when is_reference(TRef) -> erlang:cancel_timer(TRef) + end, + TRef1 = erlang:send_after(interval_msec(), self(), timeout), + St#st{tref = TRef1}. + +interval_msec() -> + 1000 * cfg_int("sched_interval_sec", ?SCHED_INTERVAL_SEC). + +heal_threshold_sec() -> + cfg_int("heal_threshold_sec", ?HEAL_THRESHOLD_SEC). + +max_penalty_sec() -> + cfg_int("max_penalty_sec", ?MAX_PENALTY_SEC). + +cfg_int(Key, Default) when is_list(Key), is_integer(Default) -> + config:get_integer("couch_scanner", Key, Default). diff --git a/src/couch_scanner/src/couch_scanner_sup.erl b/src/couch_scanner/src/couch_scanner_sup.erl new file mode 100644 index 00000000000..7baf0037a9d --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_sup.erl @@ -0,0 +1,34 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_scanner_sup). + +-behaviour(supervisor). + +-export([ + start_link/0, + init/1 +]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + Children = [ + #{ + id => couch_scanner_server, + start => {couch_scanner_server, start_link, []}, + shutdown => 5000 + } + ], + SupFlags = #{strategy => rest_for_one, intensity => 25, period => 1}, + {ok, {SupFlags, Children}}. diff --git a/src/couch_scanner/test/eunit/couch_scanner_test.erl b/src/couch_scanner/test/eunit/couch_scanner_test.erl new file mode 100644 index 00000000000..5598271090c --- /dev/null +++ b/src/couch_scanner/test/eunit/couch_scanner_test.erl @@ -0,0 +1,34 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_scanner_test). + +-include_lib("couch/include/couch_eunit.hrl"). + +couch_scanner_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(t_scanner) + ] + }. + +setup() -> + test_util:start_couch(). + +teardown(Ctx) -> + test_util:stop_couch(Ctx). + +t_scanner(_Ctx) -> + ok.