Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rel/overlay/etc/default.ini
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ view_index_dir = {{view_index_dir}}
;
;time_seq_min_time = 1754006400

; Clustered index cleanup deduplication hold-off. How long to wait before
; running cleanup for each clustered db.
;index_cleanup_delay_msec = 30000

[bt_engine_cache]
; Memory used for btree engine cache. This is a cache for top levels of
; database btrees (id tree, seq tree) and a few terms from the db header. Value
Expand Down
3 changes: 2 additions & 1 deletion src/couch/src/couch_secondary_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ init([]) ->
{query_servers, {couch_proc_manager, start_link, []}},
{vhosts, {couch_httpd_vhost, start_link, []}},
{uuids, {couch_uuids, start, []}},
{disk_manager, {couch_disk_monitor, start_link, []}}
{disk_manager, {couch_disk_monitor, start_link, []}},
{couch_index_cleanup, {couch_index_cleanup, start_link, []}}
] ++ couch_index_servers(),

MaybeHttp =
Expand Down
98 changes: 98 additions & 0 deletions src/couch_index/src/couch_index_cleanup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_index_cleanup).
-behaviour(gen_server).

-export([
start_link/0,
schedule/1
]).

-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2
]).

-export([
handle_db_event/3
]).

-define(DEFAULT_DELAY_MSEC, 30000).

-record(st, {
pending = #{} :: #{binary() => reference()}
}).

% Start the cleanup server as a locally registered singleton named after
% this module, so schedule/1 and remote fanout casts can reach it by name.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

% Schedule a deduplicated index cleanup for the clustered db DbName.
% The `fanout` tag tells the server to also broadcast the request to the
% other nodes hosting shards of this db (they receive a `no_fanout`
% request, preventing re-broadcast loops).
schedule(DbName) when is_binary(DbName) ->
    gen_server:cast(?MODULE, {schedule, DbName, fanout}).

% Subscribe to db events for all dbs; ddoc update events are routed to
% handle_db_event/3. Crashes (via the match on {ok, _}) if the listener
% cannot be linked, so the supervisor restarts us cleanly.
init([]) ->
    {ok, _} = couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
    {ok, #st{}}.

% No synchronous API is exposed; any call is a programming error, so stop
% with a tagged reason rather than silently replying.
handle_call(Msg, _From, #st{} = St) ->
    {stop, {invalid_call, Msg}, {invalid_call, Msg}, St}.

% Handle a cleanup request. If a cleanup for DbName is already pending, the
% request is dropped (deduplication). Otherwise, optionally fan the request
% out to other shard-hosting nodes, then start the hold-off timer; the
% actual cleanup runs when the {run_cleanup, DbName} message fires.
%
% NOTE: the original pasted source had review-tool residue ("Comment
% thread" lines) embedded here; it is removed as it is not valid Erlang.
handle_cast({schedule, DbName, Mode}, #st{pending = Pending} = St) ->
    case maps:is_key(DbName, Pending) of
        true ->
            % Already scheduled within the hold-off window; coalesce.
            {noreply, St};
        false ->
            % Only the node that saw the original event fans out; remote
            % nodes get `no_fanout` so broadcasts do not cascade.
            case Mode of
                fanout -> fanout(DbName);
                no_fanout -> ok
            end,
            TRef = erlang:send_after(delay_msec(), self(), {run_cleanup, DbName}),
            {noreply, St#st{pending = Pending#{DbName => TRef}}}
    end;
% Any other cast is a programming error; stop with a tagged reason.
handle_cast(Msg, St) ->
    {stop, {invalid_cast, Msg}, St}.

% Hold-off timer fired: perform the cleanup for DbName in a throwaway
% process so slow or failing cleanups never block this server, and clear
% the pending entry so a new cleanup can be scheduled immediately.
handle_info({run_cleanup, DbName}, #st{pending = Pending} = St) ->
    DoCleanup = fun() ->
        try
            fabric:cleanup_index_files_this_node(DbName)
        catch
            Class:Reason:Stack ->
                WArgs = [?MODULE, DbName, Class, Reason, Stack],
                couch_log:warning("~p: cleanup ~s failed ~p:~p~n~p", WArgs)
        end
    end,
    spawn(DoCleanup),
    {noreply, St#st{pending = maps:remove(DbName, Pending)}};
% Any other message is a programming error; stop with a tagged reason.
handle_info(Msg, St) ->
    {stop, {invalid_info, Msg}, St}.

% couch_event callback: react only to design-doc updates on clustered
% ("shards/...") dbs by scheduling a deduplicated cleanup for the
% clustered db name; all other events are ignored.
handle_db_event(<<"shards/", _/binary>> = ShardName, {ddoc_updated, _DDocId}, St) ->
    % Clustered dbs only
    ok = schedule(mem3:dbname(ShardName)),
    {ok, St};
handle_db_event(_DbName, _Event, St) ->
    {ok, St}.

% Broadcast the cleanup request to every other node hosting shards of
% DbName (deduplicated with lists:usort/1, excluding the local node).
% Best-effort: if the shard map cannot be read (e.g. the db was just
% deleted), skip the fanout — but log at debug level instead of silently
% swallowing the error, so failures remain observable.
fanout(DbName) ->
    try mem3:shards(DbName) of
        Shards ->
            Nodes = lists:usort([mem3:node(S) || S <- Shards]) -- [node()],
            Args = {schedule, DbName, no_fanout},
            lists:foreach(fun(N) -> gen_server:cast({?MODULE, N}, Args) end, Nodes)
    catch
        Class:Reason ->
            DArgs = [?MODULE, DbName, Class, Reason],
            couch_log:debug("~p: fanout for ~s skipped ~p:~p", DArgs)
    end.

% Hold-off delay before running a scheduled cleanup, in milliseconds.
% Read from the [couchdb] index_cleanup_delay_msec config setting,
% defaulting to ?DEFAULT_DELAY_MSEC (30000).
delay_msec() ->
    config:get_integer("couchdb", "index_cleanup_delay_msec", ?DEFAULT_DELAY_MSEC).
68 changes: 23 additions & 45 deletions src/couch_index/src/couch_index_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
-export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1, openers/1]).
-export([aggregate_queue_len/0, names/0]).

% Cluster cleanup helpers (used by couch_mrview_cleanup)
-export([shard_entries/1, shard_index_pid/2, forget_ddoc_binding/3]).

% Exported for callbacks
-export([
handle_config_change/5,
Expand Down Expand Up @@ -358,51 +361,9 @@ handle_db_event(DbName, created, St) ->
handle_db_event(DbName, deleted, St) ->
gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
%% this handle_db_event function must not crash (or it takes down the couch_index_server)
try
DDocResult = couch_util:with_db(DbName, fun(Db) ->
couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
end),
LocalShards = mem3:local_shards(mem3:dbname(DbName)),
DbShards = [mem3:name(Sh) || Sh <- LocalShards],
lists:foreach(
fun(DbShard) ->
lists:foreach(
fun({_DbShard, {_DDocId, Sig}}) ->
% check if there are other ddocs with the same Sig for the same db
SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}),
if
length(SigDDocs) > 1 ->
% remove records from by_db for this DDoc
Args = [DbShard, DDocId, Sig],
gen_server:cast(St#st.server_name, {rem_from_ets, Args});
true ->
% single DDoc with this Sig - close couch_index processes
case ets:lookup(St#st.by_sig, {DbShard, Sig}) of
[{_, IndexPid}] ->
(catch gen_server:cast(
IndexPid, {ddoc_updated, DDocResult}
));
[] ->
[]
end
end
end,
ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
)
end,
DbShards
),
{ok, St}
catch
Class:Reason:Stack ->
couch_log:warning("~p: handle_db_event ~p for db ~p, reason ~p, stack ~p", [
?MODULE, Class, DbName, Reason, Stack
]),
gen_server:cast(St#st.server_name, {rem_from_ets, [DbName, Reason]}),
{ok, St}
end;
handle_db_event(<<"shards/", _/binary>>, {ddoc_updated, _DDocId}, St) ->
%% Cluster dbs cleanup is handled by couch_index_cleanup
{ok, St};
handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
lists:foreach(
fun({_DbName, {_DDocId, Sig}}) ->
Expand Down Expand Up @@ -437,6 +398,23 @@ by_db(Arg) ->
openers(Arg) ->
name("couchdb_indexes_openers", Arg).

% Return {DDocId, Sig} entries for a shard. Used by cluster cleanup
% Return all {DDocId, Sig} entries registered for a shard in the by_db
% table. Used by cluster cleanup to find which signatures are referenced.
shard_entries(ShardName) when is_binary(ShardName) ->
    Matches = ets:match_object(by_db(ShardName), {ShardName, '_'}),
    lists:map(fun({_Shard, DDocSig}) -> DDocSig end, Matches).

% Return indexer Pid for {ShardName, Sig} or not_found
% Look up the indexer process registered under {ShardName, Sig} in the
% by_sig table. Returns {ok, Pid} when one exists, not_found otherwise.
shard_index_pid(ShardName, Sig) when is_binary(ShardName) ->
    Table = by_sig(ShardName),
    case ets:lookup(Table, {ShardName, Sig}) of
        [{_Key, Pid}] when is_pid(Pid) -> {ok, Pid};
        _Other -> not_found
    end.

% Remove {ShardName, {DDocId, Sig}} row from by_db. The indexer process is left
% as is. This is for removing one of the ddocs pointing to the same sig
% Remove the {ShardName, {DDocId, Sig}} row from by_db via an async cast to
% the owning index server. The indexer process is left as is. This is for
% removing one of several ddocs pointing to the same sig.
forget_ddoc_binding(ShardName, DDocId, Sig) when is_binary(ShardName) ->
    gen_server:cast(server_name(ShardName), {rem_from_ets, [ShardName, DDocId, Sig]}).

name(BaseName, Arg) when is_list(Arg) ->
name(BaseName, ?l2b(Arg));
name(BaseName, Arg) when is_binary(Arg) ->
Expand Down
178 changes: 0 additions & 178 deletions src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl

This file was deleted.

Loading