Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

264 lines (230 sloc) 8.478 kb
-module(couch_proc_manager).
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([start_link/0, get_proc_count/0, new_proc/2, new_proc/4]).
-include("couch_db.hrl").
-record(state, {tab}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
get_proc_count() ->
gen_server:call(?MODULE, get_proc_count).
init([]) ->
process_flag(trap_exit, true),
{ok, #state{tab = ets:new(procs, [ordered_set, {keypos, #proc.pid}])}}.
handle_call(get_table, _From, State) ->
{reply, State#state.tab, State};
handle_call(get_proc_count, _From, State) ->
{reply, ets:info(State#state.tab, size), State};
handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
{Client, _} = From,
Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
IterFun = fun(Proc, Acc) ->
case lists:member(DDocKey, Proc#proc.ddoc_keys) of
true ->
{stop, assign_proc(State#state.tab, Client, Proc)};
false ->
{ok, Acc}
end
end,
TeachFun = fun(Proc0, Acc) ->
try
{ok, Proc1} = teach_ddoc(DDoc, DDocKey, Proc0),
{stop, assign_proc(State#state.tab, Client, Proc1)}
catch _:_ ->
{ok, Acc}
end
end,
try iter_procs(State#state.tab, Lang, IterFun, nil) of
{not_found, _} ->
case iter_procs(State#state.tab, Lang, TeachFun, nil) of
{not_found, _} ->
spawn_link(?MODULE, new_proc, [From, Lang, DDoc, DDocKey]),
{noreply, State};
{ok, Proc} ->
{reply, {ok, Proc, get_query_server_config()}, State}
end;
{ok, Proc} ->
{reply, {ok, Proc, get_query_server_config()}, State}
catch error:Reason ->
?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
{reply, {error, Reason}, State}
end;
handle_call({get_proc, Lang}, {Client, _} = From, State) ->
IterFun = fun(Proc, _Acc) ->
{stop, assign_proc(State#state.tab, Client, Proc)}
end,
try iter_procs(State#state.tab, Lang, IterFun, nil) of
{not_found, _} ->
spawn_link(?MODULE, new_proc, [From, Lang]),
{noreply, State};
{ok, Proc} ->
{reply, {ok, Proc, get_query_server_config()}, State}
catch error:Reason ->
?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
{reply, {error, Reason}, State}
end;
handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
erlang:demonitor(Ref, [flush]),
% We need to check if the process is alive here, as the client could be
% handing us a #proc{} with a dead one. We would have already removed the
% #proc{} from our own table, so the alternative is to do a lookup in the
% table before the insert. Don't know which approach is cheaper.
return_proc(State#state.tab, Proc),
{reply, true, State};
handle_call(_Call, _From, State) ->
{reply, ignored, State}.
handle_cast({os_proc_idle, Pid}, #state{tab=Tab}=State) ->
Limit = couch_config:get("query_server_config", "os_process_soft_limit", "100"),
case ets:info(Tab, size) > list_to_integer(Limit) of
true ->
ets:delete(Tab, Pid),
case is_process_alive(Pid) of
true ->
unlink(Pid),
gen_server:cast(Pid, stop);
_ ->
ok
end;
_ ->
ok
end,
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'EXIT', _, {ok, Proc0, {Client,_} = From}}, State) ->
link(Proc0#proc.pid),
Proc = assign_proc(State#state.tab, Client, Proc0),
gen_server:reply(From, {ok, Proc, get_query_server_config()}),
{noreply, State};
handle_info({'EXIT', Pid, Reason}, State) ->
?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]),
ets:delete(State#state.tab, Pid),
{noreply, State};
handle_info({'DOWN', Ref, _, _, _Reason}, State) ->
case ets:match_object(State#state.tab, #proc{client=Ref, _='_'}) of
[] ->
ok;
[#proc{} = Proc] ->
return_proc(State#state.tab, Proc)
end,
{noreply, State};
handle_info(_Msg, State) ->
{noreply, State}.
terminate(_Reason, #state{tab=Tab}) ->
ets:foldl(fun(#proc{pid=P}, _) -> couch_util:shutdown_sync(P) end, 0, Tab),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
iter_procs(Tab, Lang, Fun, Acc) when is_binary(Lang) ->
iter_procs(Tab, binary_to_list(Lang), Fun, Acc);
iter_procs(Tab, Lang, Fun, Acc) ->
Pattern = #proc{lang=Lang, client=nil, _='_'},
MSpec = [{Pattern, [], ['$_']}],
case ets:select_reverse(Tab, MSpec, 25) of
'$end_of_table' ->
{not_found, Acc};
Continuation ->
iter_procs(Continuation, Fun, Acc)
end.
iter_procs({[], Continuation0}, Fun, Acc) ->
case ets:select_reverse(Continuation0) of
'$end_of_table' ->
{not_found, Acc};
Continuation1 ->
iter_procs(Continuation1, Fun, Acc)
end;
iter_procs({[Proc | Rest], Continuation}, Fun, Acc0) ->
case Fun(Proc, Acc0) of
{ok, Acc1} ->
iter_procs({Rest, Continuation}, Fun, Acc1);
{stop, Acc1} ->
{ok, Acc1}
end.
new_proc(From, Lang) ->
case new_proc_int(From, Lang) of
{ok, Proc} ->
exit({ok, Proc, From});
Error ->
gen_server:reply(From, {error, Error})
end.
new_proc(From, Lang, DDoc, DDocKey) ->
case new_proc_int(From, Lang) of
{ok, NewProc} ->
case proc_with_ddoc(DDoc, DDocKey, [NewProc]) of
{ok, Proc} ->
exit({ok, Proc, From});
{error, Reason} ->
gen_server:reply(From, {error, Reason})
end;
Error ->
gen_server:reply(From, {error, Error})
end.
new_proc_int(From, Lang) when is_binary(Lang) ->
new_proc_int(From, binary_to_list(Lang));
new_proc_int(From, Lang) when is_list(Lang) ->
case couch_config:get("query_servers", Lang) of
undefined ->
case couch_config:get("native_query_servers", Lang) of
undefined ->
gen_server:reply(From, {unknown_query_language, Lang});
SpecStr ->
{ok, {M,F,A}} = couch_util:parse_term(SpecStr),
{ok, Pid} = apply(M, F, A),
make_proc(Pid, Lang, M)
end;
Command ->
{ok, Pid} = couch_os_process:start_link(Command),
make_proc(Pid, Lang, couch_os_process)
end.
make_proc(Pid, Lang, Mod) ->
Proc = #proc{
lang = Lang,
pid = Pid,
prompt_fun = {Mod, prompt},
set_timeout_fun = {Mod, set_timeout},
stop_fun = {Mod, stop}
},
unlink(Pid),
{ok, Proc}.
assign_proc(Tab, Client, #proc{client=nil}=Proc0) ->
Proc = Proc0#proc{client = erlang:monitor(process, Client)},
ets:insert(Tab, Proc),
Proc.
return_proc(Tab, #proc{pid=Pid} = Proc) ->
case is_process_alive(Pid) of true ->
gen_server:cast(Pid, garbage_collect),
ets:insert(Tab, Proc#proc{client=nil});
false ->
ets:delete(Tab, Pid)
end.
get_query_server_config() ->
Limit = couch_config:get("query_server_config", "reduce_limit", "true"),
{[{<<"reduce_limit">>, list_to_atom(Limit)}]}.
proc_with_ddoc(DDoc, DDocKey, Procs) ->
Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end,
case lists:dropwhile(Filter, Procs) of
[DDocProc|_] ->
{ok, DDocProc};
[] ->
teach_any_proc(DDoc, DDocKey, Procs)
end.
teach_any_proc(DDoc, DDocKey, [Proc|Rest]) ->
try
teach_ddoc(DDoc, DDocKey, Proc)
catch _:_ ->
teach_any_proc(DDoc, DDocKey, Rest)
end;
teach_any_proc(_, _, []) ->
{error, noproc}.
teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
% send ddoc over the wire
% we only share the rev with the client we know to update code
% but it only keeps the latest copy, per each ddoc, around.
true = couch_query_servers:proc_prompt(Proc, [<<"ddoc">>, <<"new">>,
DDocId, couch_doc:to_json_obj(DDoc, [])]),
% we should remove any other ddocs keys for this docid
% because the query server overwrites without the rev
Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
% add ddoc to the proc
{ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
Jump to Line
Something went wrong with that request. Please try again.