Permalink
Browse files

More efficient communication with the view server

This change makes the communication between the Erlang VM and
an external view server (e.g. couchjs) more efficient by
writing a series of commands to the port and only then reading all
the responses from the external view server. This minimizes the
amount of time each endpoint spends blocked reading from the port.

COUCHDB-1334
  • Loading branch information...
1 parent 7299264 commit a851c6e5150d14221ca018587d76214856c1555a @fdmanana fdmanana committed Nov 6, 2011
@@ -130,32 +130,42 @@ map_docs(Parent, State0) ->
couch_query_servers:stop_doc_map(State0#mrst.qserver),
couch_work_queue:close(State0#mrst.write_queue);
{ok, Dequeued} ->
- % Run all the non deleted docs through the view engine and
- % then pass the results on to the writer process.
State1 = case State0#mrst.qserver of
nil -> start_query_server(State0);
_ -> State0
end,
- QServer = State1#mrst.qserver,
- DocFun = fun
- ({nil, Seq, _}, {SeqAcc, Results}) ->
- {erlang:max(Seq, SeqAcc), Results};
- ({Id, Seq, deleted}, {SeqAcc, Results}) ->
- {erlang:max(Seq, SeqAcc), [{Id, []} | Results]};
- ({Id, Seq, Doc}, {SeqAcc, Results}) ->
- {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
- {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]}
- end,
- FoldFun = fun(Docs, Acc) ->
- update_task(length(Docs)),
- lists:foldl(DocFun, Acc, Docs)
- end,
- Results = lists:foldl(FoldFun, {0, []}, Dequeued),
- couch_work_queue:queue(State1#mrst.write_queue, Results),
+ {ok, MapResults} = compute_map_results(State1, Dequeued),
+ couch_work_queue:queue(State1#mrst.write_queue, MapResults),
map_docs(Parent, State1)
end.
+% Runs all the non deleted docs of the dequeued batches through the view
+% engine in one call and returns the results for the writer process.
+%
+% Dequeued is a list of batches; each batch is a list of {Id, Seq, Doc}
+% entries:
+%   {nil, Seq, _}       - only advances the highest seen sequence
+%   {Id, Seq, deleted}  - yields {Id, []} without consulting the view engine
+%   {_Id, Seq, Doc}     - Doc is sent to the view engine
+%
+% Returns {ok, {MaxSeq, Results}} where Results holds the {Id, []} entries
+% of deleted docs followed by the {Id, MapResults} entries of mapped docs.
+compute_map_results(#mrst{qserver = Qs}, Dequeued) ->
+    DocFun = fun
+        ({nil, Seq, _}, {SeqAcc, AccDel, AccNotDel}) ->
+            {erlang:max(Seq, SeqAcc), AccDel, AccNotDel};
+        ({Id, Seq, deleted}, {SeqAcc, AccDel, AccNotDel}) ->
+            {erlang:max(Seq, SeqAcc), [{Id, []} | AccDel], AccNotDel};
+        ({_Id, Seq, Doc}, {SeqAcc, AccDel, AccNotDel}) ->
+            {erlang:max(Seq, SeqAcc), AccDel, [Doc | AccNotDel]}
+    end,
+    FoldFun = fun(Batch, Acc) ->
+        % Report the size of every dequeued batch, as the code did before
+        % the docs were mapped in bulk, so that nil-Id entries (which
+        % produce no result) are still counted.
+        update_task(length(Batch)),
+        lists:foldl(DocFun, Acc, Batch)
+    end,
+    {MaxSeq, DeletedResults, Docs} =
+        lists:foldl(FoldFun, {0, [], []}, Dequeued),
+    % A single call for all docs; the match asserts a successful reply.
+    {ok, MapResultList} = couch_query_servers:map_docs_raw(Qs, Docs),
+    % zipwith pairs each doc with the result at the same position and
+    % fails if the two lists differ in length.
+    NotDeletedResults = lists:zipwith(
+        fun(#doc{id = Id}, MapResults) -> {Id, MapResults} end,
+        Docs,
+        MapResultList),
+    {ok, {MaxSeq, DeletedResults ++ NotDeletedResults}}.
+
+
write_results(Parent, State) ->
case couch_work_queue:dequeue(State#mrst.write_queue) of
closed ->
@@ -42,7 +42,7 @@
-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
handle_info/2]).
--export([set_timeout/2, prompt/2]).
+-export([set_timeout/2, prompt/2, prompt_many/2]).
-define(STATE, native_proc_state).
-record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}).
@@ -62,6 +62,15 @@ set_timeout(Pid, TimeOut) ->
% Synchronously forwards a command (which must be a list) to the native
% process Pid and returns whatever the server replies.
prompt(Pid, Data) when is_list(Data) ->
    Request = {prompt, Data},
    gen_server:call(Pid, Request).
+% Prompts the native process once per command, in list order, and returns
+% {ok, Results} with the results in the same order as DataList.
+prompt_many(Pid, DataList) ->
+    Results = lists:map(fun(Data) -> prompt(Pid, Data) end, DataList),
+    {ok, Results}.
+
% gen_server callbacks
% Starts with an empty ddocs dict; every other evstate field keeps its
% record default.
init([]) ->
    InitialState = #evstate{ddocs = dict:new()},
    {ok, InitialState}.
@@ -14,7 +14,7 @@
-behaviour(gen_server).
-export([start_link/1, start_link/2, start_link/3, stop/1]).
--export([set_timeout/2, prompt/2]).
+-export([set_timeout/2, prompt/2, prompt_many/2]).
-export([send/2, writeline/2, readline/1, writejson/2, readjson/1]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
@@ -57,6 +57,40 @@ prompt(Pid, Data) ->
throw(Error)
end.
+% Writes every command in DataList to the external process first and only
+% then reads back one response per command.
+%
+% The calling process temporarily becomes the owner of the port so that it
+% can talk to it directly; ownership is handed back to Pid afterwards, even
+% when writing or reading fails.
+%
+% Returns {ok, Responses}, one response per command, in command order.
+prompt_many(Pid, DataList) ->
+    OsProc = gen_server:call(Pid, get_os_proc, infinity),
+    Port = OsProc#os_proc.port,
+    true = port_connect(Port, self()),
+    try
+        send_many(OsProc, DataList),
+        receive_many(length(DataList), OsProc, [])
+    after
+        restore_port_owner(Port, Pid),
+        % port_connect/2 leaves the previous owner linked to the port, so
+        % the caller has to drop its own link explicitly.
+        unlink(Port),
+        drop_port_messages(Port)
+    end.
+
+% Hands the port back to the OS process gen_server.
+% port_connect/2 raises badarg when the OsProc Pid is dead or the port was
+% closed by the readline function on error/timeout; only that error is
+% tolerated, anything else propagates.
+restore_port_owner(Port, Pid) ->
+    try
+        port_connect(Port, Pid)
+    catch
+        error:badarg -> ok
+    end.
+
+% Writes each command of DataList, in order, using the writer fun stored
+% in the os_proc record. Returns ok.
+send_many(OsProc, DataList) ->
+    lists:foreach(
+        fun(Data) -> (OsProc#os_proc.writer)(OsProc, Data) end,
+        DataList).
+
+% Reads N responses with the reader fun stored in the os_proc record.
+% Acc holds already collected responses in reverse order; the result is
+% {ok, Responses} with the oldest response first.
+receive_many(N, OsProc, Acc) ->
+    Lines = [(OsProc#os_proc.reader)(OsProc) || _ <- lists:seq(1, N)],
+    % lists:reverse/2 reverses Acc and appends the freshly read lines.
+    {ok, lists:reverse(Acc, Lines)}.
+
+% Discards every {Port, _} message currently in the caller's mailbox.
+% The zero timeout makes this return ok as soon as no such message is left
+% instead of waiting for new ones.
+drop_port_messages(Port) ->
+    receive
+        {Port, _Discarded} -> drop_port_messages(Port)
+    after 0 -> ok
+    end.
+
% Utility functions for reading and writing
% in custom functions
writeline(OsProc, Data) when is_record(OsProc, os_proc) ->
@@ -175,6 +209,8 @@ terminate(_Reason, #os_proc{port=Port}) ->
catch port_close(Port),
ok.
+handle_call(get_os_proc, _From, OsProc) ->
+ {reply, OsProc, OsProc};
handle_call({set_timeout, TimeOut}, _From, OsProc) ->
{reply, ok, OsProc#os_proc{timeout=TimeOut}};
handle_call({prompt, Data}, _From, OsProc) ->
@@ -16,7 +16,7 @@
-export([start_link/0, config_change/1]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
--export([start_doc_map/3, map_docs/2, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]).
+-export([start_doc_map/3, map_docs/2, map_docs_raw/2, stop_doc_map/1, raw_to_ejson/1]).
-export([reduce/3, rereduce/3,validate_doc_update/5]).
-export([filter_docs/5]).
-export([filter_view/3]).
@@ -33,6 +33,7 @@
lang,
ddoc_keys = [],
prompt_fun,
+ prompt_many_fun,
set_timeout_fun,
stop_fun
}).
@@ -83,10 +84,15 @@ map_docs(Proc, Docs) ->
Docs),
{ok, Results}.
-map_doc_raw(Proc, Doc) ->
- Json = couch_doc:to_json_obj(Doc, []),
- {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}.
-
+% Converts every doc into a [<<"map_doc">>, EJson] command and passes the
+% whole command list, in one call, to the prompt_many fun configured for
+% the process. Returns whatever that fun returns.
+map_docs_raw(Proc, DocList) ->
+    {Mod, Fun} = Proc#proc.prompt_many_fun,
+    Commands = [
+        [<<"map_doc">>, couch_doc:to_json_obj(Doc, [])]
+        || Doc <- DocList
+    ],
+    Mod:Fun(Proc#proc.pid, Commands).
stop_doc_map(nil) ->
ok;
@@ -479,6 +485,7 @@ new_process(Langs, LangLimits, Lang) ->
pid=Pid,
% Called via proc_prompt, proc_set_timeout, and proc_stop
prompt_fun={Mod, prompt},
+ prompt_many_fun={Mod, prompt_many},
set_timeout_fun={Mod, set_timeout},
stop_fun={Mod, stop}}};
_ ->

0 comments on commit a851c6e

Please sign in to comment.