Permalink
Browse files

MB-6860: Fix spatial index file descriptors leaks

File descriptors of spatial indexes leaked whenever a design
document was updated. The issue (and fix) is similar to the
Apache CouchDB issue COUCHDB-1309 [1].

In Couchbase the design document only lives in the master database,
hence the ddoc_updated event is only triggered for that database,
but we need to clean up the spatial indexes for all vBuckets.

The solution is to save not only a reference of the spatial view
group but also a reference to the database that contains the
design document. This way a cleanup on all vBuckets can be done.

[1] https://issues.apache.org/jira/browse/COUCHDB-1309

Change-Id: Ia9fc277f163a7adbce8a47b0a4212d6ec606858c
Reviewed-on: http://review.couchbase.org/21592
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Filipe David Borba Manana <fdmanana@gmail.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
  • Loading branch information...
1 parent 7a4cdc5 commit 9668209e09e447c560e7e59c1306c86de229f38f @vmx vmx committed with Farshid Ghods Oct 12, 2012
@@ -36,22 +36,74 @@ start_link() ->
% gen_server init callback for the couch_spatial server.
% Creates the three named ETS tables, registers a couch_db_update_notifier
% callback that forwards database and design document events, enables
% trap_exit, and returns a #spatial{} state holding the index root directory.
init([]) ->
RootDir = couch_config:get("couchdb", "view_index_dir"),
- ets:new(couch_spatial_groups_by_db, [bag, private, named_table]),
% The table becomes `protected` instead of `private`; presumably so that the
% notifier callback below can read it from another process -- TODO confirm.
% Entries have the shape {{DbName, ForeignDbName}, {DDocId, Sig}}.
+ ets:new(couch_spatial_groups_by_db, [bag, protected, named_table]),
ets:new(spatial_group_servers_by_sig, [set, protected, named_table]),
ets:new(couch_spatial_groups_by_updater, [set, private, named_table]),
+
+ couch_db_update_notifier:start_link(
+ fun({deleted, DbName}) ->
+ gen_server:cast(couch_spatial, {reset_indexes, DbName});
+ ({created, DbName}) ->
+ gen_server:cast(couch_spatial, {reset_indexes, DbName});
+ ({ddoc_updated, {DbName, #doc{id = DDocId} = DDoc}}) ->
+ % In Couchbase, the ddoc_updated event is only triggered on the
+ % master database. This means that we need to keep track of all
+ % the spatial indexes on the vBucket databases. Hence the
+ % `couch_spatial_groups_by_db` ets table keeps track not only of
+ % the databases the spatial groups belong to, but also of the
+ % master database which contains the design document
+ % (`ForeignDbName`).
+ % In case there is no foreign database (non-Couchbase spatial
+ % groups), the database the group belongs to is used (as it
+ % contains the design document).
+ % When the ddoc_updated event is triggered, we loop through the
+ % `couch_spatial_groups_by_db` ets table matching the
+ % *databases that contain the design document* to get all the
+ % groups that belong to that one (which is one for every vBucket
+ % in Couchbase).
+ case ets:match_object(couch_spatial_groups_by_db,
+ {{'_', DbName}, {DDocId, '$1'}}) of
+ [] ->
+ ok;
+ Groups ->
+ lists:foreach(fun({{DbName1, _}, {_, Sig}}) ->
+ update_group(DbName1, DDoc, Sig)
+ end, Groups)
+ end;
+ (_Else) ->
+ ok
+ end),
process_flag(trap_exit, true),
+ ok = couch_file:init_delete_dir(RootDir),
{ok, #spatial{root_dir=RootDir}}.
-add_to_ets(Pid, DbName, Sig) ->
% Notify the group server registered for {DbName, Sig} that its design
% document changed. The new signature is <<>> when the design document was
% deleted, otherwise it is recomputed from the document. Nothing happens if
% no entry exists for {DbName, Sig}.
+update_group(DbName, DDoc, Sig) ->
+ case ets:lookup(spatial_group_servers_by_sig, {DbName, Sig}) of
+ [{_, GroupPid}] ->
% NOTE(review): while a group is still being started, this ETS slot holds a
% list of waiting callers instead of a pid (see the get_group_server
% handle_call clause). The `catch` around the cast below would swallow the
% resulting failure -- confirm that silently skipping is intended.
+ NewSig = case DDoc#doc.deleted of
+ true ->
+ <<>>;
+ false ->
+ DDoc2 = couch_doc:with_ejson_body(DDoc),
+ list_to_binary(couch_spatial_group:get_signature(DDoc2))
+ end,
+ (catch gen_server:cast(GroupPid, {ddoc_updated, NewSig}));
+ [] ->
+ ok
+ end.
+
% Register a running group server in all three ETS tables:
%   couch_spatial_groups_by_updater: Pid -> {DbName, Sig}
%   spatial_group_servers_by_sig:    {DbName, Sig} -> Pid
%   couch_spatial_groups_by_db:      {DbName, ForeignDbName} -> {DDocId, Sig}
% Each insert is asserted to return true.
+add_to_ets(Pid, DbName, ForeignDbName, DDocId, Sig) ->
true = ets:insert(couch_spatial_groups_by_updater, {Pid, {DbName, Sig}}),
true = ets:insert(spatial_group_servers_by_sig, {{DbName, Sig}, Pid}),
- true = ets:insert(couch_spatial_groups_by_db, {DbName, Sig}).
+ true = ets:insert(
+ couch_spatial_groups_by_db, {{DbName, ForeignDbName}, {DDocId, Sig}}).
% Counterpart of add_to_ets: remove a group server from all three ETS tables.
% The by-db table is a bag, so the exact object is removed with
% ets:delete_object/2 rather than deleting every object under the key.
-delete_from_ets(Pid, DbName, Sig) ->
+delete_from_ets(Pid, DbName, ForeignDbName, DDocId, Sig) ->
true = ets:delete(couch_spatial_groups_by_updater, Pid),
true = ets:delete(spatial_group_servers_by_sig, {DbName, Sig}),
- true = ets:delete_object(couch_spatial_groups_by_db, {DbName, Sig}).
+ true = ets:delete_object(
+ couch_spatial_groups_by_db, {{DbName, ForeignDbName}, {DDocId, Sig}}).
% For foreign Design Documents (stored in a different DB)
get_group_server({DbName, GroupDbName}, GroupId) when is_binary(GroupId) ->
@@ -150,29 +202,28 @@ cleanup_index_files(Db) ->
DeleteFiles).
% Remove the spatial index directory of a database, i.e.
% RootDir ++ "/." ++ DbName ++ "_design". RootDir is passed twice: once as
% the first argument of nuke_dir and once as the prefix of the directory
% path to remove.
delete_index_dir(RootDir, DbName) ->
- couch_view:nuke_dir(RootDir ++ "/." ++ ?b2l(DbName) ++ "_design").
+ geocouch_duplicates:nuke_dir(
+ RootDir, RootDir ++ "/." ++ ?b2l(DbName) ++ "_design").
% Return the paths of all "*.spatial" files in the design directory of Db
% below the configured view_index_dir.
list_index_files(Db) ->
% call server to fetch the index files
% NOTE(review): no server call is made here; the directory is read straight
% from the configuration -- the comment above looks outdated.
RootDir = couch_config:get("couchdb", "view_index_dir"),
filelib:wildcard(RootDir ++ "/." ++ ?b2l(couch_db:name(Db)) ++
"_design"++"/*.spatial").
-% XXX NOTE vmx: I don't know when this case happens
% Shut down every spatial group server registered for DbName, drop their ETS
% entries, and delete the database's index directory and "_temp" entry.
do_reset_indexes(DbName, Root) ->
% shutdown all the updaters and clear the files, the db got changed
- Names = ets:lookup(couch_spatial_groups_by_db, DbName),
% NOTE(review): ets:lookup/2 compares keys as plain terms; '_' is only a
% wildcard in match patterns (ets:match_object/2, ets:select/2). With keys of
% the form {DbName, ForeignDbName}, the lookup below only finds entries whose
% foreign database is literally the atom '_', so Names is probably always [].
% Presumably ets:match_object(Table, {{DbName, '_'}, '_'}) was intended --
% TODO confirm.
+ Names = ets:lookup(couch_spatial_groups_by_db, {DbName, '_'}),
lists:foreach(
- fun({_DbName, Sig}) ->
+ fun({{_DbName, ForeignDbName}, {DDocId, Sig}}) ->
?LOG_DEBUG("Killing update process for spatial group ~s. in database ~s.", [Sig, DbName]),
[{_, Pid}] = ets:lookup(spatial_group_servers_by_sig, {DbName, Sig}),
- exit(Pid, kill),
- receive {'EXIT', Pid, _} ->
- delete_from_ets(Pid, DbName, Sig)
- end
% The manual kill + receive is replaced by a synchronous shutdown helper.
+ couch_util:shutdown_sync(Pid),
+ delete_from_ets(Pid, DbName, ForeignDbName, DDocId, Sig)
end, Names),
delete_index_dir(Root, DbName),
- file:delete(Root ++ "/." ++ ?b2l(DbName) ++ "_temp").
% RootDelDir is read from the same config key that init/1 uses for Root.
+ RootDelDir = couch_config:get("couchdb", "view_index_dir"),
+ couch_file:delete(RootDelDir, Root ++ "/." ++ ?b2l(DbName) ++ "_temp").
% counterpart in couch_view is get_map_view/4
get_spatial_index(Db, GroupId, Name, Stale) ->
@@ -201,36 +252,47 @@ get_spatial_index0(Name, [#spatial{index_names=IndexNames}=Index|Rest]) ->
% gen_server terminate callback; performs no cleanup and returns ok.
terminate(_Reason, _Srv) ->
ok.
% gen_server call handler.
%
% {get_group_server, DbName, Group}: the old code started the group server
% inside this callback. The new code spawns a monitored helper process
% running new_group/3, parks the caller in a wait list stored under
% {DbName, Sig} in spatial_group_servers_by_sig and returns noreply; the
% waiting callers are answered by the 'DOWN' handle_info clause.
% The ETS value under {DbName, Sig} is therefore either a list of waiting
% callers (start in progress) or the pid of the running group server.
%
% {reset_indexes, DbName}: synchronously resets all indexes of DbName and
% replies ok.
-handle_call({get_group_server, DbName,
- #spatial_group{name=GroupId,sig=Sig}=Group}, _From,
- #spatial{root_dir=Root}=Server) ->
+handle_call({get_group_server, DbName, #spatial_group{sig=Sig} = Group}, From,
+ #spatial{root_dir=Root} = Server) ->
case ets:lookup(spatial_group_servers_by_sig, {DbName, Sig}) of
[] ->
- ?LOG_DEBUG("Spawning new group server for spatial group ~s in database ~s.",
- [GroupId, DbName]),
- case (catch couch_spatial_group:start_link({Root, DbName, Group})) of
- {ok, NewPid} ->
- add_to_ets(NewPid, DbName, Sig),
- {reply, {ok, NewPid}, Server};
- {error, invalid_view_seq} ->
- do_reset_indexes(DbName, Root),
- case (catch couch_spatial_group:start_link({Root, DbName, Group})) of
- {ok, NewPid} ->
- add_to_ets(NewPid, DbName, Sig),
- {reply, {ok, NewPid}, Server};
- Error ->
- {reply, Error, Server}
- end;
- Error ->
- {reply, Error, Server}
- end;
% First request for this group: start it asynchronously and begin a wait list.
+ spawn_monitor(fun() -> new_group(Root, DbName, Group) end),
+ ets:insert(spatial_group_servers_by_sig, {{DbName, Sig}, [From]}),
+ {noreply, Server};
% A start is already in progress: queue this caller as well.
+ [{_, WaitList}] when is_list(WaitList) ->
+ ets:insert(spatial_group_servers_by_sig, {{DbName, Sig}, [From | WaitList]}),
+ {noreply, Server};
[{_, ExistingPid}] ->
{reply, {ok, ExistingPid}, Server}
- end.
+ end;
+handle_call({reset_indexes, DbName}, _From, #spatial{root_dir=Root}=Server) ->
+ do_reset_indexes(DbName, Root),
+ {reply, ok, Server}.
+
% gen_server cast handler: asynchronous variant of the reset_indexes call.
% It is sent by the update notifier callback registered in init/1 for
% `created` and `deleted` database events.
+handle_cast({reset_indexes, DbName}, #spatial{root_dir=Root}=Server) ->
+ do_reset_indexes(DbName, Root),
+ {noreply, Server}.
+
% Start a spatial group server. Runs in the helper process spawned with
% spawn_monitor by the get_group_server handle_call clause. The outcome is
% handed back to the couch_spatial server through the exit reason
% {DbName, ForeignDbName, GroupId, Sig, Result}, which arrives there as a
% 'DOWN' message.
+new_group(Root, DbName, Group) ->
+ #spatial_group{
+ name = GroupId,
+ sig = Sig,
+ dbname = ForeignDbName
+ } = Group,
+ ?LOG_DEBUG("Spawning new group server for spatial group ~s in database ~s.",
+ [GroupId, DbName]),
+ case (catch couch_spatial_group:start_link({Root, DbName, Group})) of
+ {ok, NewPid} ->
% Unlink first so that the exit of this helper does not take the new group
% server down with it; the 'DOWN' handler links the server to NewPid again.
+ unlink(NewPid),
+ exit({DbName, ForeignDbName, GroupId, Sig, {ok, NewPid}});
+ {error, invalid_view_seq} ->
% The reset goes through the couch_spatial server; this is safe because the
% call is made from the helper process, not from inside a server callback.
% NOTE(review): the retry is unbounded -- if the reset does not cure the
% invalid_view_seq error, this recurses forever. TODO confirm that is
% acceptable.
+ ok = gen_server:call(couch_spatial, {reset_indexes, DbName}),
+ new_group(Root, DbName, Group);
+ Error ->
+ exit({DbName, ForeignDbName, GroupId, Sig, Error})
+ end.
-handle_cast(foo,State) ->
- {noreply, State}.
% Cleanup on exit, e.g. resetting the group information stored in ETS tables
handle_info({'EXIT', FromPid, Reason}, Server) ->
@@ -242,12 +304,20 @@ handle_info({'EXIT', FromPid, Reason}, Server) ->
exit(Reason);
true -> ok
end;
- [{_, {DbName, GroupId}}] ->
- delete_from_ets(FromPid, DbName, GroupId)
+ [{_, {DbName, Sig}}] ->
+ [{{DbName, ForeignDbName}, {DDocId, Sig}}] = ets:match_object(
+ couch_spatial_groups_by_db, {{DbName, '_'}, {'$1', Sig}}),
+ delete_from_ets(FromPid, DbName, ForeignDbName, DDocId, Sig)
end,
{noreply, Server};
% 'DOWN' from a helper process spawned for new_group/3. The exit reason
% carries the result of the start attempt. Every caller parked in the wait
% list is answered, and on success the server links to the new group server
% and registers it in the ETS tables, which replaces the wait list by the pid.
% NOTE(review): when Reply is not {ok, Pid}, the wait list entry stays in
% spatial_group_servers_by_sig. Later get_group_server calls then match the
% is_list(WaitList) clause and are queued without a new start attempt, so
% they would never be answered -- TODO confirm and delete the entry on error.
% NOTE(review): this clause replaces the former catch-all clause; whether
% other unexpected messages are still handled cannot be seen from this hunk.
-handle_info(_Msg, Server) ->
+handle_info({'DOWN', _, _, _, {DbName, ForeignDbName, DDocId, Sig, Reply}}, Server) ->
+ [{_, WaitList}] = ets:lookup(spatial_group_servers_by_sig, {DbName, Sig}),
+ [gen_server:reply(From, Reply) || From <- WaitList],
+ case Reply of {ok, NewPid} ->
+ link(NewPid),
+ add_to_ets(NewPid, DbName, ForeignDbName, DDocId, Sig);
+ _ -> ok end,
{noreply, Server}.
code_change(_OldVsn, State, _Extra) ->
@@ -18,6 +18,9 @@
% XXX vmx 2011-11-30: CouchDB doesn't need 'db' any more.
% Check if/why GeoCouch still needs it.
db=nil,
+ % Store the name of the database where the design doc came
+ % from. This is used to determine if a foreign design doc was used
+ dbname = nil,
fd=nil,
name, % design document ID
def_lang,
@@ -35,7 +35,8 @@
compactor_pid=nil,
waiting_commit=false,
waiting_list=[],
- ref_counter=nil
+ ref_counter=nil,
+ shutdown=false
}).
@@ -244,8 +245,31 @@ handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid}
true -> ok
end,
{noreply, State#group_state{group=NewGroup, waiting_commit=true}};
-handle_cast(_Msg, State) ->
- {noreply, State}.
+
% The design document of this group was updated (or deleted, in which case
% NewSig is <<>>, see update_group/3 in couch_spatial).
%   - Signature unchanged: keep running and clear a pending shutdown flag.
%   - Signature changed, no waiting clients: stop this group server.
%   - Signature changed, clients waiting: set the shutdown flag; the server
%     is stopped by the updater 'EXIT' handler once the wait list is empty.
+handle_cast({ddoc_updated, NewSig}, State) ->
+ #group_state{
+ waiting_list = Waiters,
+ group = #spatial_group{sig = CurSig} = Group
+ } = State,
+ ?LOG_INFO("Spatial view group `~s`, signature `~s', "
+ "design document was updated~n"
+ " new signature: ~s~n"
+ " shutdown flag: ~s~n"
+ " waiting clients: ~p~n",
+ [Group#spatial_group.name, couch_util:to_hex(?b2l(CurSig)),
+ couch_util:to_hex(?b2l(NewSig)), State#group_state.shutdown,
+ length(Waiters)]),
% CurSig is already bound, so the first clause matches only an unchanged
% signature.
+ case NewSig of
+ CurSig ->
+ {noreply, State#group_state{shutdown = false}};
+ _ ->
+ case Waiters of
+ [] ->
+ {stop, normal, State};
+ _ ->
+ {noreply, State#group_state{shutdown = true}}
+ end
+ end.
handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
{ok, Db} = couch_db:open_int(DbName, []),
@@ -272,6 +296,7 @@ handle_info({'EXIT', FromPid, {new_group, #spatial_group{db=Db}=Group}},
updater_pid=UpPid,
ref_counter=RefCounter,
waiting_list=WaitList,
+ shutdown=Shutdown,
waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
ok = couch_db:close(Db),
if not WaitingCommit ->
@@ -280,8 +305,13 @@ handle_info({'EXIT', FromPid, {new_group, #spatial_group{db=Db}=Group}},
end,
case reply_with_group(Group, WaitList, [], RefCounter) of
[] ->
- {noreply, State#group_state{waiting_commit=true, waiting_list=[],
- group=Group#spatial_group{db=nil}, updater_pid=nil}};
+ case Shutdown of
+ true ->
+ {stop, normal, State};
+ false ->
+ {noreply, State#group_state{waiting_commit=true, waiting_list=[],
+ group=Group#spatial_group{db=nil}, updater_pid=nil}}
+ end;
StillWaiting ->
% we still have some waiters, reopen the database and reupdate the index
{ok, Db2} = couch_db:open_int(DbName, []),
@@ -344,7 +374,8 @@ open_db_group(DbName, GroupId) ->
case couch_db:open_doc(Db, GroupId, [ejson_body]) of
{ok, Doc} ->
couch_db:close(Db),
- {ok, design_doc_to_spatial_group(Doc)};
+ DbGroup = design_doc_to_spatial_group(Doc),
+ {ok, DbGroup#spatial_group{dbname=DbName}};
Else ->
couch_db:close(Db),
Else
@@ -453,7 +484,7 @@ get_index_header_data(#spatial_group{current_seq=Seq, purge_seq=PurgeSeq,
}.
% Delete the index file of the group identified by GroupSig in DbName.
% The deletion now goes through couch_file:delete/2, which takes RootDir in
% addition to the path, instead of calling file:delete/1 directly.
delete_index_file(RootDir, DbName, GroupSig) ->
- file:delete(index_file_name(RootDir, DbName, GroupSig)).
+ couch_file:delete(RootDir, index_file_name(RootDir, DbName, GroupSig)).
index_file_name(RootDir, DbName, GroupSig) ->
couch_view_group:design_root(RootDir, DbName) ++
@@ -18,7 +18,8 @@
-include("couch_db.hrl").
-export([start_list_resp/6, send_non_empty_chunk/2, sort_lib/1,
- make_arity_3_fun/1, parse_int_param/1, parse_positive_int_param/1]).
+ make_arity_3_fun/1, parse_int_param/1, parse_positive_int_param/1,
+ nuke_dir/2]).
% From couch_httpd_show
start_list_resp(QServer, LName, Req, Db, Head, Etag) ->
@@ -104,3 +105,28 @@ parse_positive_int_param(Val) ->
Msg = io_lib:format(Fmt, [Val]),
throw({query_parse_error, ?l2b(Msg)})
end.
+
+% From couch_view
% Recursively delete Dir and everything in it. Returns ok, also when Dir
% does not exist. Every entry is deleted with couch_file:delete/3; an
% {error, eperm} result is taken to mean that the entry is a directory,
% which is then removed recursively.
% NOTE(review): any other error from couch_file:delete/3 or file:del_dir/1
% (e.g. eacces, enotempty), and any file:list_dir/1 error other than enoent,
% is not matched and crashes the caller with case_clause -- confirm this is
% the intended failure mode.
+nuke_dir(RootDelDir, Dir) ->
+ case file:list_dir(Dir) of
+ {error, enoent} -> ok; % doesn't exist
+ {ok, Files} ->
+ lists:foreach(
+ fun(File)->
+ Full = Dir ++ "/" ++ File,
+ case couch_file:delete(RootDelDir, Full, false) of
+ ok -> ok;
+ % Entry doesn't exist (any more)
+ {error, enoent} -> ok;
+ {error, eperm} ->
+ ok = nuke_dir(RootDelDir, Full)
+ end
+ end,
+ Files),
+ case file:del_dir(Dir) of
+ ok -> ok;
+ % Directory doesn't exist (might have been deleted by some other
+ % process already)
+ {error, enoent} -> ok
+ end
+ end.

0 comments on commit 9668209

Please sign in to comment.