Skip to content


Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: ce7204b7eb
Fetching contributors…

Cannot retrieve contributors at this time

207 lines (167 sloc) 6.8 kb
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-export([start_link/0, get_index/4, get_index/3, get_index/2]).
-export([config_change/2, update_notify/1]).
-export([init/1, terminate/2, code_change/3]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
-define(BY_SIG, couchdb_indexes_by_sig).
-define(BY_PID, couchdb_indexes_by_pid).
-define(BY_DB, couchdb_indexes_by_db).
-record(st, {root_dir}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
get_index(Module, DbName, DDoc) ->
get_index(Module, DbName, DDoc, nil).
get_index(Module, DbName, DDoc, Fun) when is_binary(DbName) ->
couch_util:with_db(DbName, fun(Db) ->
get_index(Module, Db, DDoc, Fun)
get_index(Module, Db, DDoc, Fun) when is_binary(DDoc) ->
case couch_db:open_doc(Db, DDoc, [ejson_body]) of
{ok, Doc} -> get_index(Module, Db, Doc, Fun);
Error -> Error
get_index(Module, Db, DDoc, Fun) when is_function(Fun, 1) ->
{ok, InitState} = Module:init(Db, DDoc),
{ok, FunResp} = Fun(InitState),
{ok, Pid} = get_index(Module, InitState),
{ok, Pid, FunResp};
get_index(Module, Db, DDoc, _Fun) ->
{ok, InitState} = Module:init(Db, DDoc),
get_index(Module, InitState).
get_index(Module, IdxState) ->
DbName = Module:get(db_name, IdxState),
Sig = Module:get(signature, IdxState),
case ets:lookup(?BY_SIG, {DbName, Sig}) of
[{_, Pid}] when is_pid(Pid) ->
{ok, Pid};
_ ->
Args = {Module, IdxState, DbName, Sig},
gen_server:call(?MODULE, {get_index, Args}, infinity)
init([]) ->
process_flag(trap_exit, true),
couch_config:register(fun ?MODULE:config_change/2),
ets:new(?BY_SIG, [protected, set, named_table]),
ets:new(?BY_PID, [private, set, named_table]),
ets:new(?BY_DB, [protected, bag, named_table]),
couch_db_update_notifier:start_link(fun ?MODULE:update_notify/1),
RootDir = couch_index_util:root_dir(),
% Deprecation warning if it wasn't index_dir
case couch_config:get("couchdb", "index_dir") of
undefined ->
Msg = "Deprecation warning: 'view_index_dir' is now 'index_dir'",
?LOG_ERROR(Msg, []);
_ -> ok
{ok, #st{root_dir=RootDir}}.
terminate(_Reason, _State) ->
Pids = [Pid || {Pid, _} <- ets:tab2list(?BY_PID)],
lists:map(fun couch_util:shutdown_sync/1, Pids),
handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) ->
case ets:lookup(?BY_SIG, {DbName, Sig}) of
[] ->
spawn_link(fun() -> new_index(Args) end),
ets:insert(?BY_SIG, {{DbName, Sig}, [From]}),
{noreply, State};
[{_, Waiters}] when is_list(Waiters) ->
ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}),
{noreply, State};
[{_, Pid}] when is_pid(Pid) ->
{reply, {ok, Pid}, State}
handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
[{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
[gen_server:reply(From, {ok, Pid}) || From <- Waiters],
add_to_ets(DbName, Sig, DDocId, Pid),
{reply, ok, State};
handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
[{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
[gen_server:reply(From, Error) || From <- Waiters],
ets:delete(?BY_SIG, {DbName, Sig}),
{reply, ok, State};
handle_call({reset_indexes, DbName}, _From, State) ->
reset_indexes(DbName, State#st.root_dir),
{reply, ok, State}.
handle_cast({reset_indexes, DbName}, State) ->
reset_indexes(DbName, State#st.root_dir),
{noreply, State}.
handle_info({'EXIT', Pid, Reason}, Server) ->
case ets:lookup(?BY_PID, Pid) of
[{Pid, {DbName, Sig}}] ->
[{DbName, {DDocId, Sig}}] =
ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
rem_from_ets(DbName, Sig, DDocId, Pid);
[] when Reason /= normal ->
_Else ->
{noreply, Server}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
new_index({Mod, IdxState, DbName, Sig}) ->
DDocId = Mod:get(idx_name, IdxState),
case couch_index:start_link({Mod, IdxState}) of
{ok, Pid} ->
ok = gen_server:call(
?MODULE, {async_open, {DbName, DDocId, Sig}, {ok, Pid}}),
Error ->
ok = gen_server:call(
?MODULE, {async_error, {DbName, DDocId, Sig}, Error})
reset_indexes(DbName, Root) ->
% shutdown all the updaters and clear the files, the db got changed
Fun = fun({_, {DDocId, Sig}}) ->
[{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
rem_from_ets(DbName, Sig, DDocId, Pid)
lists:foreach(Fun, ets:lookup(?BY_DB, DbName)),
Path = couch_index_util:index_dir("", DbName),
couch_file:nuke_dir(Root, Path).
add_to_ets(DbName, Sig, DDocId, Pid) ->
ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
rem_from_ets(DbName, Sig, DDocId, Pid) ->
ets:delete(?BY_SIG, {DbName, Sig}),
ets:delete(?BY_PID, Pid),
ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}).
config_change("couchdb", "view_index_dir") ->
exit(whereis(?MODULE), config_change);
config_change("couchdb", "index_dir") ->
exit(whereis(?MODULE), config_change).
update_notify({deleted, DbName}) ->
gen_server:cast(?MODULE, {reset_indexes, DbName});
update_notify({created, DbName}) ->
gen_server:cast(?MODULE, {reset_indexes, DbName});
update_notify({ddoc_updated, {DbName, DDocId}}) ->
fun({_DbName, {_DDocId, Sig}}) ->
case ets:lookup(?BY_SIG, {DbName, Sig}) of
[{_, IndexPid}] ->
(catch gen_server:cast(IndexPid, ddoc_updated));
[] ->
ets:match_object(?BY_DB, {DbName, {DDocId, '$1'}}));
update_notify(_) ->
Jump to Line
Something went wrong with that request. Please try again.