From 79e9f8ab53e18d096c5a9dee192eb503f6356dd2 Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Wed, 9 Aug 2023 17:04:15 -0400 Subject: [PATCH] Implement a background ddoc scanner [WIP] - plugin API completed - db / shard / ddoc traversal completed - ddoc compilation check implemented - make traversal skipping / and limits configurable TODO: - feed sample documents to map and reduce function and compare results - add tests - more plugins? The app scans all the dbs and docs. It has a plugin system to allow gathering various things from a cluster. The first use is to scan all the javascript design docs and run them through the new QuickJS javascript engine. Other possible uses: - Detect features used in design docs - Gather total db and view sizes - Scan for document features (docs of certain sizes, contained certain fields and values). - Scan replication dbs/doc features The plugins are managed as individual process by the couch_scanner_server with the start_link/1 and stop/1 functions. After a plugin runner is spawned, the only thing couch_scanner_server does is wait for it to exit. The plugin runner process may exit normally, crash, or exit with {shutdown, {reschedule, TSec}} if they want to reschedule to run again at some point the future (next day, a week later, etc). After the process starts, it will load and validate the plugin module. Then, it will start scanning all the dbs and docs on the local node. Shard ranges will be scanned only on one of the cluster nodes to avoid duplicating work. For instance, if there are 2 shard ranges, 0-7, 8-f, with copies on nodes n1, n2, n3. Then, 0-7 might be scanned on n1 only, and 8-f on n3. The plugin API is the following (as OTP callback definitions): ```erlang -callback start(EJson :: #{}) -> {ok, St :: term()} | {reschedule, TSec :: integer()}. -callback resume(Json :: #{}) -> {ok, St :: term()} | {reschedule, TSec :: integer()}. -callback stop(St :: term()) -> {ok | {reschedule, TSec :: integer()}, EJson :: #{}}. -callback checkpoint(St :: term()) -> {ok, EJson :: #{}}. -callback db(St :: term(), DbName :: binary()) -> {ok | skip | stop, St1 :: term()}. -callback ddoc(St :: term(), DbName :: binary(), #doc{}) -> {ok | stop, St1 :: term()}. -callback shards(St :: term(), [#shard{}]) -> {[#shard{}], St1 :: term()}. -callback db_opened(St :: term(), Db :: term()) -> {ok, St :: term()}. -callback doc_id(St :: term(), DocId :: binary(), Db :: term()) -> {ok | skip | stop, St1 :: term()}. -callback doc(St :: term(), Db :: term(), #doc{}) -> {ok | stop, St1 :: term()}. -callback db_closing(St :: term(), Db :: term()) -> {ok, St1 :: term()}. ``` --- rebar.config.script | 1 + rel/reltool.config | 2 + src/couch_scanner/README.md | 4 + src/couch_scanner/src/couch_scanner.app.src | 29 ++ src/couch_scanner/src/couch_scanner.erl | 20 + src/couch_scanner/src/couch_scanner_app.erl | 23 + .../src/couch_scanner_checkpoint.erl | 105 ++++ .../src/couch_scanner_plugin.erl | 489 ++++++++++++++++++ .../src/couch_scanner_plugin.hrl | 21 + .../src/couch_scanner_plugin_quickjs.erl | 361 +++++++++++++ .../src/couch_scanner_server.erl | 217 ++++++++ src/couch_scanner/src/couch_scanner_sup.erl | 34 ++ .../test/eunit/couch_scanner_test.erl | 34 ++ 13 files changed, 1340 insertions(+) create mode 100644 src/couch_scanner/README.md create mode 100644 src/couch_scanner/src/couch_scanner.app.src create mode 100644 src/couch_scanner/src/couch_scanner.erl create mode 100644 src/couch_scanner/src/couch_scanner_app.erl create mode 100644 src/couch_scanner/src/couch_scanner_checkpoint.erl create mode 100644 src/couch_scanner/src/couch_scanner_plugin.erl create mode 100644 src/couch_scanner/src/couch_scanner_plugin.hrl create mode 100644 src/couch_scanner/src/couch_scanner_plugin_quickjs.erl create mode 100644 src/couch_scanner/src/couch_scanner_server.erl create mode 100644 src/couch_scanner/src/couch_scanner_sup.erl create mode 100644 src/couch_scanner/test/eunit/couch_scanner_test.erl diff --git a/rebar.config.script b/rebar.config.script index 1bc63675519..60ae4484258 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -142,6 +142,7 @@ SubDirs = [ "src/smoosh", "src/weatherreport", "src/couch_prometheus", + "src/couch_scanner", "rel" ]. diff --git a/rel/reltool.config b/rel/reltool.config index 5cd070ccfe9..9751659df53 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -63,6 +63,7 @@ snappy, weatherreport, couch_prometheus, + couch_scanner, %% extra nouveau, @@ -127,6 +128,7 @@ {app, snappy, [{incl_cond, include}]}, {app, weatherreport, [{incl_cond, include}]}, {app, couch_prometheus, [{incl_cond, include}]}, + {app, couch_scanner, [{incl_cond, include}]}, %% extra {app, nouveau, [{incl_cond, include}]}, diff --git a/src/couch_scanner/README.md b/src/couch_scanner/README.md new file mode 100644 index 00000000000..9b8bf66b724 --- /dev/null +++ b/src/couch_scanner/README.md @@ -0,0 +1,4 @@ +Couch Scanner +================ + +Traverse all dbs periodically and emit various reports diff --git a/src/couch_scanner/src/couch_scanner.app.src b/src/couch_scanner/src/couch_scanner.app.src new file mode 100644 index 00000000000..961e9e80e31 --- /dev/null +++ b/src/couch_scanner/src/couch_scanner.app.src @@ -0,0 +1,29 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{application, couch_scanner, [ + {description, "CouchDB Scanner"}, + {vsn, git}, + {registered, [ + couch_scanner_server + ]}, + {applications, [ + kernel, + stdlib, + crypto, + config, + couch_log, + couch_stats, + fabric + ]}, + {mod, {couch_scanner_app, []}} +]}. diff --git a/src/couch_scanner/src/couch_scanner.erl b/src/couch_scanner/src/couch_scanner.erl new file mode 100644 index 00000000000..ca901dd609a --- /dev/null +++ b/src/couch_scanner/src/couch_scanner.erl @@ -0,0 +1,20 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_scanner). + +-export([ + status/0 +]). + +status() -> + couch_scanner_server:status(). diff --git a/src/couch_scanner/src/couch_scanner_app.erl b/src/couch_scanner/src/couch_scanner_app.erl new file mode 100644 index 00000000000..23c4093dc41 --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_app.erl @@ -0,0 +1,23 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_scanner_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + couch_scanner_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/couch_scanner/src/couch_scanner_checkpoint.erl b/src/couch_scanner/src/couch_scanner_checkpoint.erl new file mode 100644 index 00000000000..acf94248e5e --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_checkpoint.erl @@ -0,0 +1,105 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_scanner_checkpoint). + +-export([ + write/2, + read/1, + reset/1 +]). + +-include_lib("couch/include/couch_db.hrl"). + +write(<>, #{} = State) -> + with_db(fun(Db) -> update_doc(Db, doc_id(Plugin), State) end). + +read(<>) -> + with_db(fun(Db) -> load_doc(Db, doc_id(Plugin)) end). + +reset(<>) -> + with_db(fun(Db) -> delete_doc(Db, doc_id(Plugin)) end). + +% Private functions + +doc_id(Plugin) -> + % Dashes are more conventional for doc ids + Plugin1 = binary:replace(Plugin, <<"_">>, <<"-">>, [global]), + <>. + +delete_doc(Db, DocId) -> + case couch_db:open_doc(Db, DocId, []) of + {ok, #doc{revs = {_, RevList}}} -> + {ok, _} = couch_db:delete_doc(Db, DocId, RevList), + ok; + {not_found, _} -> + ok + end. + +update_doc(Db, DocId, #{} = Body) -> + EJsonBody = ?JSON_DECODE(?JSON_ENCODE(Body#{<<"_id">> => DocId})), + Doc = couch_doc:from_json_obj(EJsonBody), + case couch_db:open_doc(Db, DocId, []) of + {ok, #doc{revs = Revs}} -> + {ok, _} = couch_db:update_doc(Db, Doc#doc{revs = Revs}, []); + {not_found, _} -> + {ok, _} = couch_db:update_doc(Db, Doc, []) + end, + ok. + +load_doc(Db, DocId) -> + case couch_db:open_doc(Db, DocId, [ejson_body]) of + {ok, #doc{body = EJsonBody}} -> + ?JSON_DECODE(?JSON_ENCODE(EJsonBody), [return_maps]); + {not_found, _} -> + not_found + end. + +with_db(Fun) when is_function(Fun, 1) -> + DbName = config:get("mem3", "shards_db", "_dbs"), + case mem3_util:ensure_exists(DbName) of + {ok, Db} -> + try + Fun(Db) + after + catch couch_db:close(Db) + end; + Else -> + throw(Else) + end. + +-ifdef(TEST). + +-include_lib("couch/include/couch_eunit.hrl"). + +couch_scanner_checkpoint_test_() -> + { + foreach, + fun test_util:start_couch/0, + fun test_util:stop_couch/1, + [ + ?TDEF_FE(t_read_write_reset) + ] + }. + +t_read_write_reset(_) -> + Plugin = <<"scanner_plugin_abc">>, + ok = reset(Plugin), + ?assertEqual(ok, write(Plugin, #{<<"foo">> => 1})), + ?assertEqual(#{<<"foo">> => 1}, read(Plugin)), + ?assertEqual(ok, write(Plugin, #{<<"bar">> => 2})), + ?assertEqual(#{<<"bar">> => 2}, read(Plugin)), + ?assertEqual(not_found, read(<<"scanner_plugin_other">>)), + ?assertEqual(ok, reset(Plugin)), + ?assertEqual(not_found, read(Plugin)). + +-endif. diff --git a/src/couch_scanner/src/couch_scanner_plugin.erl b/src/couch_scanner/src/couch_scanner_plugin.erl new file mode 100644 index 00000000000..cd5f1d89204 --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_plugin.erl @@ -0,0 +1,489 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under1 +% the License. + +% Scanner plugin runner process +% +% This is the process which is spawned and run for each enabled plugin. +% +% A number of these processes are managed by the couch_scanner_server via +% start_link/1 and stop/1 functions. After a plugin runner is spawned, the only +% thing couch_scanner_server does is wait for it to exit. +% +% The plugin runner process may exit normally, crash, or exit with {shutdown, +% {reschedule, TSec}} if they want to reschedule to run again at some point the +% future (next day, a week later, etc). +% +% After the process starts, it will load and validate the plugin module. Then, +% it will start scanning all the dbs and docs on the local node. Shard ranges +% will be scanned only on one of the cluster nodes to avoid duplicating work. +% For instance, if there are 2 shard ranges, 0-7, 8-f, with copies on nodes n1, +% n2, n3. Then, 0-7 might be scanned on n1 only, and 8-f on n3. +% +% The plugin API defined in the behavior definition section. +% +% The start/1 function is called when the plugin starts running. It returns +% some context (St), which can be any Erlang term. All subsequent function +% calls will be called with the same St object, and may return an updated +% version of it. +% +% If the plugin hasn't finished runing and has resumed running after the node +% was restarted or an error happened, the resume/1 function will be called. +% That's the difference between start and resume: start/1 is called when the +% scan starts from the beginning (first db, first shard, ...), and resume/1 is +% called when the scanning hasn't finished and has to continue. +% +% The checkpoint/1 callback is periodically called to checkpoint the scanning +% progress. start/1 and resume/1 function will be called with the last saved +% checkpoint map value. +% +% The stop/1 callback is called when the scan has finished. The stop callback +% should return ok or {reschedule, TSec} and final checkpoint map. The last +% checkoint will be written and then, if {reschedule, TSec} is specified, it +% will be rescheduled to re-run again at the specified time. +% +% As the cluster dbs, shards, ddocs and individual docs are discovered during +% scanning, the appropriate callbacks will be called. Most callbacks, besides +% the updated St object, can reply with ok, skip or stop tags. The meaning of +% those are: +% +% * ok - continue to the next object +% +% * skip - skip the current object and don't scan its internal (ex: skip a db and +% don't scan its ddocs, but continue with the next db) +% +% * stop - stop scanning any remaining objects of that type (ex: don't scan any more dbs) +% + +-module(couch_scanner_plugin). + +-export([ + % Main plugin process API + spawn_link/1, + stop/1, + % Utility functions + new_scan_id/0, + log/5, + tsec/0, + ejson_map/1, + config_match_patterns/2, + % Internal export + run/1 +]). + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). + +% Behaviour callback definitions + +-callback start(EJson :: #{}) -> + {ok, St :: term()} | {reschedule, TSec :: integer()}. +-callback resume(Json :: #{}) -> + {ok, St :: term()} | {reschedule, TSec :: integer()}. +-callback stop(St :: term()) -> + {ok | {reschedule, TSec :: integer()}, EJson :: #{}}. +-callback checkpoint(St :: term()) -> + {ok, EJson :: #{}}. +-callback db(St :: term(), DbName :: binary()) -> + {ok | skip | stop, St1 :: term()}. +-callback ddoc(St :: term(), DbName :: binary(), #doc{}) -> + {ok | stop, St1 :: term()}. +-callback shards(St :: term(), [#shard{}]) -> + {[#shard{}], St1 :: term()}. +-callback db_opened(St :: term(), Db :: term()) -> + {ok, St :: term()}. +-callback doc_id(St :: term(), DocId :: binary(), Db :: term()) -> + {ok | skip | stop, St1 :: term()}. +-callback doc(St :: term(), Db :: term(), #doc{}) -> + {ok | stop, St1 :: term()}. +-callback db_closing(St :: term(), Db :: term()) -> + {ok, St1 :: term()}. + +-define(CHECKPOINT_INTERVAL_SEC, 10). +-define(STOP_TIMEOUT_MSEC, 5000). + +-record(st, { + id, + mod, + pst, + cursor, + shards_db, + db, + checkpoint_tsec = 0, + skip_dbs, + skip_ddocs, + skip_docs +}). + +spawn_link(<>) -> + proc_lib:spawn_link(?MODULE, run, [Id]). + +stop(Pid) when is_pid(Pid) -> + unlink(Pid), + Ref = erlang:monitor(process, Pid), + Pid ! stop, + receive + {'DOWN', Ref, _, _, _} -> ok + after ?STOP_TIMEOUT_MSEC -> + exit(Pid, kill), + receive + {'DOWN', Ref, _, _, _} -> ok + end + end, + ok. + +% Utilities used by plugins + +new_scan_id() -> + TSec = integer_to_binary(erlang:system_time(second)), + Rand = string:lowercase(binary:encode_hex(crypto:strong_rand_bytes(6))), + <>. + +log(Level, Mod, Fmt, Args, #{} = Meta) when + is_atom(Level), is_atom(Mod), is_list(Fmt), is_list(Args) +-> + {MetaFmt, MetaArgs} = log_format_meta(Mod, Meta), + couch_log:Level(lists:flatten([MetaFmt, Fmt]), MetaArgs ++ Args). + +tsec() -> + erlang:system_time(second). + +ejson_map(Obj) -> + jiffy:decode(jiffy:encode(Obj), [return_maps]). + +config_match_patterns(Module, Type) -> + Section = atom_to_list(Module) ++ "." ++ Type, + Fun = fun + ({K, "true"}, Acc) -> [list_to_binary(K) | Acc]; + ({_, _}, Acc) -> Acc + end, + Items = lists:foldl(Fun, [], config:get(Section)), + case Items of + [] -> undefined; + [<<_/binary>> | _] -> binary:compile_pattern(Items) + end. + +% Main run function + +run(<>) -> + Mod = plugin_mod(Id), + St = #st{id = Id, mod = Mod}, + St1 = init_config(St), + St2 = init_from_checkpoint(St1), + St3 = scan_dbs(St2), + finalize(St3). + +% Private functions + +scan_dbs(#st{cursor = Cursor} = St) -> + DbsDbName = mem3_sync:shards_db(), + ioq:set_io_priority({system, DbsDbName}), + {ok, Db} = mem3_util:ensure_exists(DbsDbName), + St1 = St#st{shards_db = Db}, + Opts = [{start_key, Cursor}], + try + {ok, St2} = couch_db:fold_docs(Db, fun scan_dbs_fold/2, St1, Opts), + St2#st{shards_db = undefined} + after + couch_db:close(Db) + end. + +scan_dbs_fold(#full_doc_info{} = FDI, #st{shards_db = Db} = Acc) -> + Acc1 = Acc#st{cursor = FDI#full_doc_info.id}, + Acc2 = maybe_checkpoint(Acc1), + case couch_db:open_doc(Db, FDI, [ejson_body]) of + {ok, #doc{id = <<"_design/", _/binary>>}} -> + {ok, Acc2}; + {ok, #doc{id = DbName, body = Body}} -> + scan_db(shards(DbName, Body), Acc2) + end. + +scan_db([], #st{} = St) -> + {ok, St}; +scan_db([_ | _] = Shards, #st{} = St) -> + #st{cursor = DbName, mod = Mod, pst = PSt, skip_dbs = Skip} = St, + case match_skip_pat(DbName, Skip) of + false -> + {Go, PSt1} = Mod:db(PSt, DbName), + St1 = St#st{pst = PSt1}, + case Go of + ok -> + St2 = fold_ddocs(fun scan_ddocs_fold/2, St1), + {Shards1, St3} = shards_callback(St2, Shards), + St4 = scan_shards(Shards1, St3), + {ok, St4}; + skip -> + {ok, St1}; + stop -> + {stop, St1} + end; + true -> + {ok, St} + end. + +scan_ddocs_fold({meta, _}, #st{} = Acc) -> + {ok, Acc}; +scan_ddocs_fold({row, RowProps}, #st{} = Acc) -> + DDoc = couch_util:get_value(doc, RowProps), + scan_ddoc(couch_doc:from_json_obj(DDoc), Acc); +scan_ddocs_fold(complete, #st{} = Acc) -> + {ok, Acc}; +scan_ddocs_fold({error, Error}, _Acc) -> + exit({shutdown, {scan_ddocs_fold, Error}}). + +scan_shards([], #st{} = St) -> + St; +scan_shards([#shard{} = Shard | Rest], #st{} = St) -> + St1 = maybe_checkpoint(St), + St2 = scan_docs(St1, Shard), + scan_shards(Rest, St2). + +scan_ddoc(#doc{id = DDocId} = DDoc, #st{} = St) -> + #st{cursor = DbName, mod = Mod, pst = PSt, skip_ddocs = Skip} = St, + case match_skip_pat(DDocId, Skip) of + false -> + {Go, PSt1} = Mod:ddoc(PSt, DbName, DDoc), + St1 = St#st{pst = PSt1}, + case Go of + ok -> {ok, St1}; + skip -> {ok, St1}; + stop -> {stop, St1} + end; + true -> + {ok, St} + end. + +scan_docs(#st{} = St, #shard{name = ShardDbName}) -> + case couch_db:open_int(ShardDbName, [?ADMIN_CTX]) of + {ok, Db} -> + St1 = St#st{db = Db}, + St2 = db_opened_callback(St1), + {ok, St3} = couch_db:fold_docs(Db, fun scan_docs_fold/2, St2, []), + St4 = db_closing_callback(St3), + couch_db:close(Db), + St4#st{db = undefined}; + {not_found, _} -> + St + end. + +scan_docs_fold(#full_doc_info{id = Id} = FDI, #st{} = St) -> + #st{db = Db, mod = Mod, pst = PSt, skip_dbs = Skip} = St, + case match_skip_pat(Id, Skip) of + false -> + {Go, PSt1} = Mod:doc_id(PSt, Id, Db), + St1 = St#st{pst = PSt1}, + case Go of + ok -> scan_doc(FDI, St1); + skip -> {ok, St1}; + stop -> {stop, St1} + end; + true -> + {ok, St} + end. + +scan_doc(#full_doc_info{} = FDI, #st{} = St) -> + St1 = maybe_checkpoint(St), + #st{db = Db, mod = Mod, pst = PSt} = St1, + {ok, #doc{} = Doc} = couch_db:open_doc(Db, FDI, [ejson_body]), + {Go, PSt1} = Mod:doc(PSt, Db, Doc), + case Go of + ok -> {ok, St1#st{pst = PSt1}}; + stop -> {stop, St1#st{pst = PSt1}} + end. + +maybe_checkpoint(#st{checkpoint_tsec = LastCheckpointTSec} = St) -> + receive + stop -> + checkpoint(St), + exit({shutdown, stop}) + after 0 -> + ok + end, + erlang:garbage_collect(), + case tsec() - LastCheckpointTSec > ?CHECKPOINT_INTERVAL_SEC of + true -> checkpoint(St); + false -> St + end. + +checkpoint(#st{} = St) -> + #st{id = Id, mod = Mod, pst = PSt, cursor = Cursor} = St, + JsonPSt = checkpoint_callback(Mod, PSt), + EJson = #{ + <<"cursor">> => Cursor, + <<"pst">> => JsonPSt, + <<"state">> => <<"running">> + }, + ok = couch_scanner_checkpoint:write(Id, EJson), + St#st{checkpoint_tsec = tsec()}. + +init_from_checkpoint(#st{id = Id, mod = Mod} = St) -> + case couch_scanner_checkpoint:read(Id) of + #{<<"state">> := <<"running">>, <<"cursor">> := Cur, <<"pst">> := EJson} -> + PSt1 = resume_callback(Mod, EJson), + St#st{pst = PSt1, cursor = Cur, checkpoint_tsec = tsec()}; + not_found -> + PSt1 = start_callback(Mod, #{}), + Cur = <<>>, + ok = start_checkpoint(Id, Mod, Cur, PSt1), + St#st{pst = PSt1, cursor = Cur, checkpoint_tsec = tsec()}; + #{<<"state">> := <<"finished">>, <<"pst">> := EJson} -> + PSt1 = start_callback(Mod, EJson), + Cur = <<>>, + ok = start_checkpoint(Id, Mod, Cur, PSt1), + St#st{pst = PSt1, cursor = Cur, checkpoint_tsec = tsec()} + end. + +start_callback(Mod, EJson) when is_atom(Mod) -> + case Mod:start(EJson) of + {ok, PSt} -> PSt; + {reschedule, TSec} -> exit({shutdown, {reschedule, TSec}}) + end. + +resume_callback(Mod, EJson) when is_atom(Mod) -> + case Mod:resume(EJson) of + {ok, PSt} -> PSt; + {reschedule, TSec} -> exit({shutdown, {reschedule, TSec}}) + end. + +db_opened_callback(#st{pst = PSt, mod = Mod, db = Db} = St) -> + {ok, PSt1} = Mod:db_opened(PSt, Db), + St#st{pst = PSt1}. + +db_closing_callback(#st{pst = PSt, mod = Mod, db = Db} = St) -> + {ok, PSt1} = Mod:db_closing(PSt, Db), + St#st{pst = PSt1}. + +shards_callback(#st{pst = PSt, mod = Mod} = St, Shards) -> + {Shards1, PSt1} = Mod:shards(PSt, Shards), + {Shards1, St#st{pst = PSt1}}. + +finalize(#st{id = Id, mod = Mod, pst = PSt}) -> + {Go, #{} = PStEJson} = Mod:stop(PSt), + EJson = #{ + <<"cursor">> => <<>>, + <<"pst">> => ejson_map(PStEJson), + <<"state">> => <<"finished">> + }, + ok = couch_scanner_checkpoint:write(Id, EJson), + case Go of + ok -> ok; + {reschedule, TSec} -> exit({shutdown, {reschedule, TSec}}) + end. + +start_checkpoint(Id, Mod, Cur, PSt1) -> + PStEJson = checkpoint_callback(Mod, PSt1), + EJson = #{ + <<"cursor">> => Cur, + <<"pst">> => PStEJson, + <<"state">> => <<"running">> + }, + ok = couch_scanner_checkpoint:write(Id, EJson). + +checkpoint_callback(Mod, PSt) -> + {ok, #{} = PStEJson} = Mod:checkpoint(PSt), + ejson_map(PStEJson). + +plugin_mod(<>) -> + Mod = binary_to_atom(Plugin), + case code:ensure_loaded(Mod) of + {module, _} -> + check_callbacks(Mod), + Mod; + {error, Error} -> + error({?MODULE, {missing_plugin_module, Mod, Error}}) + end. + +check_callbacks(Mod) when is_atom(Mod) -> + Cbks = [ + {start, 1}, + {resume, 1}, + {stop, 1}, + {checkpoint, 1}, + {db, 2}, + {ddoc, 3}, + {shards, 2}, + {db_opened, 2}, + {doc_id, 3}, + {doc, 3}, + {db_closing, 2} + ], + Fun = fun({F, A}) -> + case erlang:function_exported(Mod, F, A) of + true -> ok; + false -> error({?MODULE, {undefined_plugin_fun, Mod, F, A}}) + end + end, + lists:foreach(Fun, Cbks). + +shards(DbName, {Props = [_ | _]}) -> + Shards = lists:sort(mem3_util:build_shards(DbName, Props)), + Fun = fun({R, SList}) -> + case mem3_util:rotate_list({DbName, R}, SList) of + [#shard{node = N} = S | _] when N =:= node() -> + {true, S}; + [_ | _] -> + false + end + end, + lists:filtermap(Fun, shards_by_range(lists:sort(Shards))). + +shards_by_range(Shards) -> + Fun = fun(#shard{range = R} = S, Acc) -> orddict:append(R, S, Acc) end, + Dict = lists:foldl(Fun, orddict:new(), Shards), + orddict:to_list(Dict). + +fold_ddocs(Fun, #st{cursor = DbName} = Acc) -> + QArgs = #mrargs{ + include_docs = true, + extra = [{namespace, <<"_design">>}] + }, + {ok, Acc1} = fabric:all_docs(DbName, [?ADMIN_CTX], Fun, Acc, QArgs), + Acc1. + +init_config(#st{mod = Mod} = St) -> + St#st{ + skip_dbs = config_match_patterns(Mod, "skip_dbs"), + skip_ddocs = config_match_patterns(Mod, "skip_ddocs"), + skip_docs = config_match_patterns(Mod, "skip_docs") + }. + +match_skip_pat(<<_/binary>>, undefined) -> + false; +match_skip_pat(<<_/binary>> = Bin, Pat) -> + binary:match(Bin, Pat) /= nomatch. + +log_format_meta(Mod, #{} = Meta) -> + SId = {"s:~s ", maps:get(sid, Meta, undefined)}, + Fun = {"f:~s ", maps:get(fn, Meta, undefined)}, + Db = {"db:~s ", format_db(maps:get(db, Meta, undefined))}, + DDocId = {"ddoc:~s ", maps:get(ddoc, Meta, undefined)}, + DocId = {"doc:~s ", maps:get(doc, Meta, undefined)}, + FmtArgs = [{"~s ", Mod}, SId, Fun, Db, DDocId, DocId], + lists:unzip([{Fmt, Arg} || {Fmt, Arg} <- FmtArgs, Arg /= undefined]). + +format_db(undefined) -> + undefined; +format_db(Db) when is_list(Db) -> + format_db(list_to_binary(Db)); +format_db(#shard{dbname = Db, range = [B, E]}) -> + {BStr, EStr} = {hex(B), hex(E)}, + <>; +format_db(Db) when is_tuple(Db) -> + format_db(couch_db:name(Db)); +format_db(<<"shards/", B:8/binary, "-", E:8/binary, "/", Rest/binary>>) -> + [Db, _] = binary:split(Rest, <<".">>), + <>; +format_db(<>) -> + Db. + +hex(Val) when is_integer(Val) -> + string:lowercase(erlang:integer_to_binary(Val, 16)). diff --git a/src/couch_scanner/src/couch_scanner_plugin.hrl b/src/couch_scanner/src/couch_scanner_plugin.hrl new file mode 100644 index 00000000000..1a0db3da7af --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_plugin.hrl @@ -0,0 +1,21 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +-define(LOG(LVL, FMT, ARGS, META), couch_scanner_plugin:log(LVL, ?MODULE, FMT, ARGS, maps:merge(META, #{fn => ?FUNCTION_NAME}))). +-define(INFO(FMT, ARGS, META), ?LOG(info, FMT, ARGS, META)). +-define(INFO(FMT, ARGS), ?LOG(info, FMT, ARGS, #{})). +-define(INFO(META), ?LOG(info, "", [], META)). +-define(INFO(), ?LOG(info, "", [], #{})). +-define(WARN(FMT, ARGS, META), ?LOG(warn, FMT, ARGS, META)). diff --git a/src/couch_scanner/src/couch_scanner_plugin_quickjs.erl b/src/couch_scanner/src/couch_scanner_plugin_quickjs.erl new file mode 100644 index 00000000000..2c82bd99586 --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_plugin_quickjs.erl @@ -0,0 +1,361 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_scanner_plugin_quickjs). +-behaviour(couch_scanner_plugin). + +-export([ + start/1, + resume/1, + stop/1, + checkpoint/1, + db/2, + ddoc/3, + shards/2, + db_opened/2, + doc_id/3, + doc/3, + db_closing/2 +]). + +-include("couch_scanner_plugin.hrl"). + +-record(st, { + sid, + ddocs = #{}, + docs = [], + qjs_proc, + sm_proc, + ddoc_cnt = 0, + doc_cnt = 0, + doc_step = 0, + max_ddocs = 100, + max_shards = 4, + max_docs = 1000, + max_step = 1000, + batch_size = 100 +}). + +% DDoc fields + +-define(FILTERS, <<"filters">>). +-define(VIEWS, <<"views">>). +-define(MAP, <<"map">>). +-define(REDUCE, <<"reduce">>). +-define(LIB, <<"lib">>). +-define(VDU, <<"validate_doc_update">>). + +% Behavior callbacks + +start(#{} = JsonSt) -> + St = #st{sid = couch_scanner_plugin:new_scan_id()}, + case JsonSt of + #{<<"sid">> := OldScanId, <<"finished_at">> := OldTSec} -> + Msg = "old sid::~s finished at:~p", + Args = [OldScanId, OldTSec], + ?INFO(Msg, Args, #{sid => St#st.sid}); + #{} -> + ?INFO("No old scan results found", [], #{sid => St#st.sid}) + end, + % {reschedule, TSec} + {ok, init_config(St)}. + +resume(#{} = JsonSt) -> + #{<<"sid">> := SId} = JsonSt, + ?INFO(#{sid => SId}), + St = #st{sid = SId}, + % {reschedule, TSec} + {ok, init_config(St)}. + +stop(#st{sid = SId}) -> + TSec = couch_scanner_plugin:tsec(), + FinalJsonSt = #{<<"sid">> => SId, <<"finished_at">> => TSec}, + ?INFO("finished at ~p", [TSec], #{sid => SId}), + % {{reschedule, TSec}, #{}} + {ok, FinalJsonSt}. + +checkpoint(#st{sid = SId}) -> + ?INFO(#{sid => SId}), + {ok, #{<<"sid">> => SId}}. + +db(#st{sid = SId} = St, DbName) -> + St1 = reset_per_db_state(St), + ?INFO(#{sid => SId, db => DbName}), + {ok, St1}. + +ddoc(#st{ddoc_cnt = C, max_ddocs = M} = St, _, _) when C > M -> + {stop, St}; +ddoc(#st{sid = SId} = St, DbName, #doc{id = DDocId} = DDoc) -> + ?INFO(#{sid => SId, db => DbName, ddoc => DDocId}), + #st{ddoc_cnt = Cnt} = St, + St1 = St#st{ddoc_cnt = Cnt + 1}, + #doc{body = {Props = [_ | _]}} = DDoc, + case couch_util:get_value(<<"language">>, Props, <<"javascript">>) of + <<"javascript">> -> {ok, process_ddoc(St1, DbName, DDoc)}; + _ -> {ok, St1} + end. + +shards(#st{max_shards = Max, ddocs = DDocs} = St, Shards) -> + case {map_size(DDocs), lists:sublist(Shards, Max)} of + {0, _} -> {[], St}; + {_, []} -> {[], St}; + {_, Shards1} -> {Shards1, St} + end. + +db_opened(#st{} = St, Db) -> + #st{max_docs = MaxDocs, max_step = MaxStep} = St, + {ok, DocTotal} = couch_db:get_doc_count(Db), + Step = min(MaxStep, max(1, DocTotal div MaxDocs)), + {ok, St#st{doc_cnt = 0, doc_step = Step, docs = []}}. + +doc_id(#st{doc_cnt = C, max_docs = M} = St, _, _) when C > M -> + {stop, St}; +doc_id(#st{doc_cnt = C, doc_step = S} = St, _, _) when C rem S == 0 -> + {skip, St#st{doc_cnt = C + 1}}; +doc_id(#st{doc_cnt = C} = St, _, _) -> + {ok, St#st{doc_cnt = C + 1}}. + +doc(#st{docs = Docs, batch_size = B} = St, _Db, Doc) when length(Docs) < B -> + {ok, St#st{docs = [Doc | Docs]}}; +doc(#st{docs = Docs} = St, Db, Doc) -> + St1 = process_docs(St#st{docs = []}, Db, lists:reverse([Doc | Docs])), + % {stop, St} + {ok, St1}. + +db_closing(#st{docs = Docs} = St, Db) -> + St1 = process_docs(St, Db, Docs), + {ok, St1#st{doc_cnt = 0, doc_step = 0, docs = []}}. + +% Private + +process_docs(#st{} = St, Db, Docs) -> + #st{sid = SId, ddocs = DDocs} = St, + Meta = #{sid => SId, db => Db}, + ?INFO("docs:~p ddocs:~p", [length(Docs), map_size(DDocs)], Meta), + St. + +process_ddoc(#st{} = St, DbName, #doc{} = DDoc0) -> + #st{sid = SId, ddocs = DDocs} = St, + #doc{id = DDocId, body = Body} = DDoc0, + DDoc = couch_scanner_plugin:ejson_map(Body), + case map_size(DDoc) > 0 of + true -> + St1 = start_or_reset_procs(St), + try + lib_validate(St1, maps:get(?LIB, DDoc, undefined)), + views_validate(St1, maps:get(?VIEWS, DDoc, undefined)), + filters_validate(St1, maps:get(?FILTERS, DDoc, undefined)), + vdu_validate(St1, maps:get(?VDU, DDoc, undefined)), + reset_procs(St1), + teach_ddoc_validate(St1, DDocId, DDoc), + St1#st{ddocs = DDocs#{DDocId => DDoc}} + catch + throw:{validate, Error} -> + Meta = #{sid => SId, db => DbName, ddoc => DDocId}, + ?WARN("Validation failed ~p", [Error], Meta), + St1 + end; + false -> + St + end. + +reset_per_db_state(#st{qjs_proc = QjsProc, sm_proc = SmProc} = St) -> + proc_stop(QjsProc), + proc_stop(SmProc), + St#st{ + ddocs = #{}, + docs = [], + qjs_proc = undefined, + sm_proc = undefined, + ddoc_cnt = 0 + }. + +start_or_reset_procs(#st{} = St) -> + St1 = start_or_reset_qjs_proc(St), + start_or_reset_sm_proc(St1). + +start_or_reset_qjs_proc(#st{qjs_proc = undefined} = St) -> + Cmd = couch_quickjs:mainjs_cmd(), + {ok, Pid} = couch_os_process:start_link(Cmd), + Proc = proc(Pid), + true = proc_reset(Proc), + St#st{qjs_proc = Proc}; +start_or_reset_qjs_proc(#st{qjs_proc = #proc{} = Proc} = St) -> + try + true = proc_reset(Proc), + St + catch + _:_ -> + proc_stop(Proc), + start_or_reset_qjs_proc(St#st{qjs_proc = undefined}) + end. + +start_or_reset_sm_proc(#st{sm_proc = undefined} = St) -> + Cmd = os:getenv("COUCHDB_QUERY_SERVER_JAVASCRIPT"), + {ok, Pid} = couch_os_process:start_link(Cmd), + Proc = proc(Pid), + true = proc_reset(Proc), + St#st{sm_proc = Proc}; +start_or_reset_sm_proc(#st{sm_proc = #proc{} = Proc} = St) -> + try + true = proc_reset(Proc), + St + catch + _:_ -> + proc_stop(Proc), + start_or_reset_sm_proc(St#st{sm_proc = undefined}) + end. + +lib_validate(#st{}, undefined) -> + ok; +lib_validate(#st{qjs_proc = Qjs, sm_proc = Sm}, #{} = Lib) -> + QjsRes = add_lib(Qjs, Lib), + SmRes = add_lib(Sm, Lib), + case QjsRes == SmRes of + true -> ok; + false -> throw({validate, {add_lib, QjsRes, SmRes}}) + end. + +add_lib(#proc{} = Proc, #{} = Lib) -> + try + true = prompt(Proc, [<<"add_lib">>, Lib]), + ok + catch + Tag:Err -> + {error, {Tag, Err}} + end. + +views_validate(#st{}, undefined) -> + ok; +views_validate(#st{} = St, #{} = Views) -> + Fun = fun(Name, #{} = View) -> view_validate(St, Name, View) end, + maps:foreach(Fun, Views). + +view_validate(#st{qjs_proc = Qjs, sm_proc = Sm}, Name, View) -> + #{?MAP := MapSrc} = View, + QjsMapRes = add_fun(Qjs, MapSrc), + SmMapRes = add_fun(Sm, MapSrc), + case QjsMapRes == SmMapRes of + true -> ok; + false -> throw({validate, {add_fun, map, Name, QjsMapRes, SmMapRes}}) + end, + RedSrc = maps:get(?REDUCE, View, undefined), + QjsRedRes = add_fun(Qjs, RedSrc), + SmRedRes = add_fun(Sm, RedSrc), + case QjsRedRes == SmRedRes of + true -> ok; + false -> throw({validate, {add_fun, red, Name, QjsRedRes, SmRedRes}}) + end. + +add_fun(#proc{}, undefined) -> + ok; +add_fun(#proc{}, <<"_", _/binary>>) -> + % Built-in reduce likely + ok; +add_fun(#proc{} = Proc, <<_/binary>> = FunSrc) -> + try + true = prompt(Proc, [<<"add_fun">>, FunSrc]), + ok + catch + Tag:Err -> + {error, {Tag, Err}} + end. + +filters_validate(#st{}, undefined) -> + ok; +filters_validate(#st{} = St, #{} = Filters) -> + Fun = fun(Name, Filter) -> filter_validate(St, Name, Filter) end, + maps:foreach(Fun, Filters). + +filter_validate(#st{qjs_proc = Qjs, sm_proc = Sm}, Name, Filter) -> + QjsRes = add_fun(Qjs, Filter), + SmRes = add_fun(Sm, Filter), + case QjsRes == SmRes of + true -> ok; + false -> throw({validate, {filter, Name, QjsRes, SmRes}}) + end. + +vdu_validate(#st{}, undefined) -> + ok; +vdu_validate(#st{qjs_proc = Qjs, sm_proc = Sm}, VDU) -> + QjsRes = add_fun(Qjs, VDU), + SmRes = add_fun(Sm, VDU), + case QjsRes == SmRes of + true -> ok; + false -> throw({validate, {vdu, QjsRes, SmRes}}) + end. + +reset_procs(#st{qjs_proc = #proc{} = QjsProc, sm_proc = #proc{} = SmProc}) -> + true = proc_reset(QjsProc), + true = proc_reset(SmProc), + ok. + +proc(Pid) -> + #proc{ + pid = Pid, + prompt_fun = {couch_os_process, prompt}, + set_timeout_fun = {couch_os_process, set_timeout}, + stop_fun = {couch_os_process, stop} + }. + +prompt(#proc{} = Proc, Prompt) -> + couch_query_servers:proc_prompt(Proc, Prompt). + +proc_set_timeout(Proc, Timeout) -> + {Mod, Func} = Proc#proc.set_timeout_fun, + apply(Mod, Func, [Proc#proc.pid, Timeout]). + +teach_ddoc_validate(#st{qjs_proc = Qjs, sm_proc = Sm}, DDocId, DDoc) -> + QjsRes = teach_ddoc(Qjs, DDocId, DDoc), + SmRes = teach_ddoc(Sm, DDocId, DDoc), + case QjsRes == SmRes of + true -> ok; + false -> throw({validate, {teach_ddoc, DDocId, QjsRes, SmRes}}) + end. + +teach_ddoc(#proc{} = Proc, DDocId, DDoc) -> + try + true = prompt(Proc, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) + catch + Tag:Err -> + {error, {Tag, Err}} + end. + +proc_reset(#proc{} = Proc) -> + Timeout = config:get_integer("couchdb", "os_process_timeout", 5000), + Cfg = [{<<"reduce_limit">>, true}, {<<"timeout">>, Timeout}], + Result = prompt(Proc, [<<"reset">>, {Cfg}]), + proc_set_timeout(Proc, Timeout), + Result. + +proc_stop(undefined) -> + ok; +proc_stop(#proc{pid = Pid} = Proc) -> + unlink(Pid), + Ref = monitor(process, Pid), + {Mod, Func} = Proc#proc.stop_fun, + apply(Mod, Func, [Pid]), + receive + {'DOWN', Ref, _, _, _} -> ok + end. + +init_config(#st{} = St) -> + St#st{ + max_ddocs = cfg_int("max_ddocs", St#st.max_ddocs), + max_shards = cfg_int("max_shards", St#st.max_shards), + max_docs = cfg_int("max_docs", St#st.max_docs), + max_step = cfg_int("max_step", St#st.max_step), + batch_size = cfg_int("batch_size", St#st.batch_size) + }. + +cfg_int(Key, Default) when is_list(Key), is_integer(Default) -> + config:get_integer(atom_to_list(?MODULE), Key, Default). diff --git a/src/couch_scanner/src/couch_scanner_server.erl b/src/couch_scanner/src/couch_scanner_server.erl new file mode 100644 index 00000000000..c818f9ed66c --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_server.erl @@ -0,0 +1,217 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +% Scanner plugin server. +% +% This is gen_server starts, stops and reschedules plugin processes. +% + +-module(couch_scanner_server). + +-export([ + start_link/0, + status/0 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2 +]). + +-define(SCHED_INTERVAL_SEC, 5). +-define(PENALTY_BASE_SEC, 60). +-define(MAX_PENALTY_SEC, 8 * 3600). +-define(HEAL_THRESHOLD_SEC, 5 * 60). + +-record(sched, { + start_time = 0, + error_count = 0, + reschedule = 0 +}). + +-record(st, { + pids = #{}, + scheduling = #{}, + tref +}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +status() -> + gen_server:call(?MODULE, status, infinity). + +% Private + +init(_Args) -> + process_flag(trap_exit, true), + {ok, #st{}, interval_msec()}. + +terminate(_Reason, #st{pids = Pids} = St) -> + ToStop = maps:keys(Pids), + lists:foldl(fun stop_plugin/2, St, ToStop), + ok. + +handle_call(status, _From, #st{} = St) -> + Fun = fun(_, Sched) -> + #{ + start_time => Sched#sched.start_time, + error_count => Sched#sched.error_count, + reschedule => Sched#sched.reschedule + } + end, + SchedMaps = maps:map(Fun, St#st.scheduling), + {reply, #{pids => St#st.pids, scheduling => SchedMaps}, St}; +handle_call(Msg, _From, #st{} = St) -> + couch_log:error("~p : unknown call ~p", [?MODULE, Msg]), + {reply, {error, {invalid_call, Msg}}, St}. + +handle_cast(Msg, #st{} = St) -> + couch_log:error("~p : unknown cast ~p", [?MODULE, Msg]), + {noreply, St}. + +handle_info(timeout, #st{} = St) -> + St1 = + case in_maintenance() of + true -> stop_in_maintenance(St); + false -> start_stop_cfg(St) + end, + {noreply, schedule_timeout(St1)}; +handle_info({'EXIT', Pid, Reason}, #st{pids = Pids} = St) -> + case maps:filter(fun(_, P) -> P =:= Pid end, Pids) of + Map when map_size(Map) == 1 -> + [{Id, Pid}] = maps:to_list(Map), + St1 = St#st{pids = maps:remove(Id, Pids)}, + {noreply, handle_exit(Id, Reason, St1)}; + Map when map_size(Map) == 0 -> + {noreply, St} + end; +handle_info(Msg, St) -> + couch_log:error("~p : unknown info message ~p", [?MODULE, Msg]), + {noreply, St}. + +stop_in_maintenance(#st{pids = Pids} = St) -> + case map_size(Pids) > 0 of + true -> + couch_log:info("~p stopping in maintenance mode", [?MODULE]), + lists:foldl(fun stop_plugin/2, St, maps:keys(Pids)); + false -> + St + end. + +start_stop_cfg(#st{pids = Pids, scheduling = Scheduling} = St) -> + PluginIds = plugins(), + RunningIds = maps:keys(Pids), + ToStart = PluginIds -- RunningIds, + ToStop = RunningIds -- PluginIds, + St1 = lists:foldl(fun stop_plugin/2, St, ToStop), + lists:foreach(fun couch_scanner_checkpoint:reset/1, ToStop), + ToRemove = maps:keys(Scheduling) -- PluginIds, + St2 = St1#st{scheduling = maps:without(ToRemove, Scheduling)}, + lists:foldl(fun start_plugin/2, St2, ToStart). + +stop_plugin(Id, #st{} = St) -> + #st{pids = Pids, scheduling = Scheduling} = St, + {Pid, Pids1} = maps:take(Id, Pids), + #{Id := #sched{} = Sched} = Scheduling, + couch_log:info("~p : stopping ~s", [?MODULE, Id]), + ok = couch_scanner_plugin:stop(Pid), + Sched1 = Sched#sched{start_time = 0, reschedule = 0}, + Scheduling1 = Scheduling#{Id := Sched1}, + St#st{pids = Pids1, scheduling = Scheduling1}. + +start_plugin(Id, #st{pids = Pids, scheduling = Scheduling} = St) -> + Sched = maps:get(Id, Scheduling, #sched{}), + NowSec = tsec(), + case NowSec >= Sched#sched.reschedule of + true -> + couch_log:info("~p : starting ~s", [?MODULE, Id]), + Pids1 = Pids#{Id => couch_scanner_plugin:spawn_link(Id)}, + Sched1 = Sched#sched{ + start_time = NowSec, + reschedule = 0 + }, + Scheduling1 = Scheduling#{Id => Sched1}, + St#st{pids = Pids1, scheduling = Scheduling1}; + false -> + St + end. + +plugins() -> + Fun = fun + ({K, "true"}, Acc) -> + FullName = "couch_scanner_plugin_" ++ K, + [list_to_binary(FullName) | Acc]; + ({_, _}, Acc) -> + Acc + end, + lists:foldl(Fun, [], config:get("couch_scanner_plugins")). + +handle_exit(Id, Reason, #st{} = St) -> + #st{scheduling = Scheduling} = St, + #{Id := Sched} = Scheduling, + Sched1 = sched_exit_update(Id, Sched, Reason), + St#st{scheduling = Scheduling#{Id := Sched1}}. + +sched_exit_update(Id, #sched{} = Sched, {shutdown, {reschedule, TSec}}) -> + couch_log:notice("~p : ~s rescheduled after ~p", [?MODULE, Id, TSec]), + Sched#sched{start_time = 0, error_count = 0, reschedule = TSec}; +sched_exit_update(Id, #sched{} = Sched, Norm) when + Norm == shutdown; Norm == normal +-> + couch_log:notice("~p : ~s finished", [?MODULE, Id]), + Sched#sched{start_time = 0, error_count = 0, reschedule = infinity}; +sched_exit_update(Id, #sched{} = Sched, Error) -> + couch_log:error("~p : ~s exited with error ~p", [?MODULE, Id, Error]), + #sched{start_time = StartSec, error_count = ErrorCount} = Sched, + Sched1 = Sched#sched{start_time = 0}, + NowSec = tsec(), + % If process has been running successfully for a while without crashing + % reset (forgive) its previous errors. + case NowSec - StartSec =< heal_threshold_sec() of + true -> penalize(NowSec, Sched1#sched{error_count = 1}); + false -> penalize(NowSec, Sched1#sched{error_count = ErrorCount + 1}) + end. + +penalize(NowSec, #sched{error_count = ErrorCount} = Sched) -> + PenaltySec = ?PENALTY_BASE_SEC * (1 bsl (ErrorCount - 1)), + PenaltySec1 = min(PenaltySec, max_penalty_sec()), + Sched#sched{reschedule = NowSec + PenaltySec1}. + +in_maintenance() -> + "false" /= config:get("couchdb", "maintenance_mode", "false"). + +tsec() -> + erlang:system_time(second). + +schedule_timeout(#st{tref = TRef} = St) -> + case TRef of + undefined -> ok; + _ when is_reference(TRef) -> erlang:cancel_timer(TRef) + end, + TRef1 = erlang:send_after(interval_msec(), self(), timeout), + St#st{tref = TRef1}. + +interval_msec() -> + 1000 * cfg_int("sched_interval_sec", ?SCHED_INTERVAL_SEC). + +heal_threshold_sec() -> + cfg_int("heal_threshold_sec", ?HEAL_THRESHOLD_SEC). + +max_penalty_sec() -> + cfg_int("max_penalty_sec", ?MAX_PENALTY_SEC). + +cfg_int(Key, Default) when is_list(Key), is_integer(Default) -> + config:get_integer("couch_scanner", Key, Default). diff --git a/src/couch_scanner/src/couch_scanner_sup.erl b/src/couch_scanner/src/couch_scanner_sup.erl new file mode 100644 index 00000000000..7baf0037a9d --- /dev/null +++ b/src/couch_scanner/src/couch_scanner_sup.erl @@ -0,0 +1,34 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_scanner_sup). + +-behaviour(supervisor). + +-export([ + start_link/0, + init/1 +]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + Children = [ + #{ + id => couch_scanner_server, + start => {couch_scanner_server, start_link, []}, + shutdown => 5000 + } + ], + SupFlags = #{strategy => rest_for_one, intensity => 25, period => 1}, + {ok, {SupFlags, Children}}. diff --git a/src/couch_scanner/test/eunit/couch_scanner_test.erl b/src/couch_scanner/test/eunit/couch_scanner_test.erl new file mode 100644 index 00000000000..5598271090c --- /dev/null +++ b/src/couch_scanner/test/eunit/couch_scanner_test.erl @@ -0,0 +1,34 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_scanner_test). + +-include_lib("couch/include/couch_eunit.hrl"). + +couch_scanner_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(t_scanner) + ] + }. + +setup() -> + test_util:start_couch(). + +teardown(Ctx) -> + test_util:stop_couch(Ctx). + +t_scanner(_Ctx) -> + ok.