Skip to content

Commit a851c6e

Browse files
committed
More efficient communication with the view server
This change makes the communication between the Erlang VM and an external view server (e.g. couchjs) more efficient by writing a series of commands into the port and only then reading all the responses from the external view server. This minimizes the amount of time each endpoint spends blocked reading from the port. COUCHDB-1334
1 parent 7299264 commit a851c6e

File tree

4 files changed

+87
-25
lines changed

4 files changed

+87
-25
lines changed

src/couch_mrview/src/couch_mrview_updater.erl

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -130,32 +130,42 @@ map_docs(Parent, State0) ->
130130
couch_query_servers:stop_doc_map(State0#mrst.qserver),
131131
couch_work_queue:close(State0#mrst.write_queue);
132132
{ok, Dequeued} ->
133-
% Run all the non deleted docs through the view engine and
134-
% then pass the results on to the writer process.
135133
State1 = case State0#mrst.qserver of
136134
nil -> start_query_server(State0);
137135
_ -> State0
138136
end,
139-
QServer = State1#mrst.qserver,
140-
DocFun = fun
141-
({nil, Seq, _}, {SeqAcc, Results}) ->
142-
{erlang:max(Seq, SeqAcc), Results};
143-
({Id, Seq, deleted}, {SeqAcc, Results}) ->
144-
{erlang:max(Seq, SeqAcc), [{Id, []} | Results]};
145-
({Id, Seq, Doc}, {SeqAcc, Results}) ->
146-
{ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
147-
{erlang:max(Seq, SeqAcc), [{Id, Res} | Results]}
148-
end,
149-
FoldFun = fun(Docs, Acc) ->
150-
update_task(length(Docs)),
151-
lists:foldl(DocFun, Acc, Docs)
152-
end,
153-
Results = lists:foldl(FoldFun, {0, []}, Dequeued),
154-
couch_work_queue:queue(State1#mrst.write_queue, Results),
137+
{ok, MapResults} = compute_map_results(State1, Dequeued),
138+
couch_work_queue:queue(State1#mrst.write_queue, MapResults),
155139
map_docs(Parent, State1)
156140
end.
157141

158142

143+
% Computes the map results for one dequeued batch.
%
% Dequeued is a list of lists of {Id, Seq, Doc} entries:
%   - entries whose id is nil only contribute their sequence number;
%   - entries whose doc is the atom deleted yield {Id, []} and are not
%     sent to the view server;
%   - all remaining docs are handed to the view server in one
%     couch_query_servers:map_docs_raw/2 call.
%
% Returns {ok, {MaxSeq, Results}}, where MaxSeq is the highest sequence seen
% (starting from 0) and Results lists the deleted entries first, followed by
% {Id, MapResults} for every mapped doc. The number of results is reported
% through update_task/1.
compute_map_results(#mrst{qserver = Qs}, Dequeued) ->
    Classify = fun
        ({nil, Seq, _}, {MaxSoFar, Deleted, Live}) ->
            {erlang:max(Seq, MaxSoFar), Deleted, Live};
        ({Id, Seq, deleted}, {MaxSoFar, Deleted, Live}) ->
            {erlang:max(Seq, MaxSoFar), [{Id, []} | Deleted], Live};
        ({_Id, Seq, Doc}, {MaxSoFar, Deleted, Live}) ->
            {erlang:max(Seq, MaxSoFar), Deleted, [Doc | Live]}
    end,
    % Folding over the concatenated batches visits the entries in the same
    % order as folding batch by batch.
    {MaxSeq, DeletedResults, LiveDocs} =
        lists:foldl(Classify, {0, [], []}, lists:append(Dequeued)),
    {ok, RawResults} = couch_query_servers:map_docs_raw(Qs, LiveDocs),
    % LiveDocs and RawResults are pairwise aligned: the docs were sent in
    % exactly this order.
    PairWithId = fun(#doc{id = Id}, MapResults) -> {Id, MapResults} end,
    MappedResults = lists:zipwith(PairWithId, LiveDocs, RawResults),
    AllMapResults = DeletedResults ++ MappedResults,
    update_task(length(AllMapResults)),
    {ok, {MaxSeq, AllMapResults}}.
167+
168+
159169
write_results(Parent, State) ->
160170
case couch_work_queue:dequeue(State#mrst.write_queue) of
161171
closed ->

src/couchdb/couch_native_process.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242

4343
-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
4444
handle_info/2]).
45-
-export([set_timeout/2, prompt/2]).
45+
-export([set_timeout/2, prompt/2, prompt_many/2]).
4646

4747
-define(STATE, native_proc_state).
4848
-record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}).
@@ -62,6 +62,15 @@ set_timeout(Pid, TimeOut) ->
6262
prompt(Pid, Data) when is_list(Data) ->
6363
gen_server:call(Pid, {prompt, Data}).
6464

65+
% Prompts the native process once per command in DataList, in list order,
% by calling prompt/2 for each element.
% Returns {ok, Results} with one result per command, in the same order as
% DataList.
prompt_many(Pid, DataList) ->
    {ok, [prompt(Pid, Data) || Data <- DataList]}.
73+
6574
% gen_server callbacks
6675
init([]) ->
6776
{ok, #evstate{ddocs=dict:new()}}.

src/couchdb/couch_os_process.erl

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
-behaviour(gen_server).
1515

1616
-export([start_link/1, start_link/2, start_link/3, stop/1]).
17-
-export([set_timeout/2, prompt/2]).
17+
-export([set_timeout/2, prompt/2, prompt_many/2]).
1818
-export([send/2, writeline/2, readline/1, writejson/2, readjson/1]).
1919
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
2020

@@ -57,6 +57,40 @@ prompt(Pid, Data) ->
5757
throw(Error)
5858
end.
5959

60+
% Sends every command in DataList to the OS process and only afterwards
% reads the responses, one per command.
%
% The os_proc state is fetched from the gen_server identified by Pid and the
% port owner is switched to the calling process for the duration of the
% exchange, so the writes and reads happen in the caller rather than in the
% gen_server. Ownership is handed back to Pid afterwards, whether or not the
% exchange succeeded.
%
% Returns {ok, Responses}, with the responses in the order they were read.
prompt_many(Pid, DataList) ->
    OsProc = gen_server:call(Pid, get_os_proc, infinity),
    Port = OsProc#os_proc.port,
    true = port_connect(Port, self()),
    try
        send_many(OsProc, DataList),
        receive_many(length(DataList), OsProc, [])
    after
        % Handing the port back can raise badarg when the OsProc Pid is dead
        % or the port was already closed by the readline function on
        % error/timeout; that failure is deliberately ignored.
        (catch port_connect(Port, Pid)),
        unlink(Port),
        % Discard any port messages still sitting in the caller's mailbox.
        drop_port_messages(Port)
    end.
73+
74+
% Writes each command to the OS process, in list order, using the writer fun
% stored in the os_proc record. The writer's return value is ignored.
% Returns ok.
send_many(_OsProc, []) ->
    ok;
send_many(#os_proc{writer = Writer} = OsProc, Commands) ->
    lists:foreach(fun(Data) -> Writer(OsProc, Data) end, Commands).
79+
80+
% Reads N responses from the OS process using the reader fun stored in the
% os_proc record.
% Acc holds already collected responses in reverse order; the new responses
% are placed after them. Returns {ok, Responses} in reading order.
receive_many(0, _OsProc, Acc) ->
    {ok, lists:reverse(Acc)};
receive_many(N, #os_proc{reader = Reader} = OsProc, Acc) ->
    % The comprehension evaluates left to right, so the reader is invoked
    % exactly N times, one response after the other.
    Lines = [Reader(OsProc) || _ <- lists:seq(1, N)],
    {ok, lists:reverse(Acc, Lines)}.
85+
86+
% Removes every {Port, _} message currently in the calling process's mailbox
% without blocking; other messages are left untouched. Returns ok.
drop_port_messages(Port) ->
    receive
        {Port, _Discarded} -> drop_port_messages(Port)
    after 0 ->
        % Nothing (more) from this port is queued right now.
        ok
    end.
93+
6094
% Utility functions for reading and writing
6195
% in custom functions
6296
writeline(OsProc, Data) when is_record(OsProc, os_proc) ->
@@ -175,6 +209,8 @@ terminate(_Reason, #os_proc{port=Port}) ->
175209
catch port_close(Port),
176210
ok.
177211

212+
handle_call(get_os_proc, _From, OsProc) ->
213+
{reply, OsProc, OsProc};
178214
handle_call({set_timeout, TimeOut}, _From, OsProc) ->
179215
{reply, ok, OsProc#os_proc{timeout=TimeOut}};
180216
handle_call({prompt, Data}, _From, OsProc) ->

src/couchdb/couch_query_servers.erl

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
-export([start_link/0, config_change/1]).
1717

1818
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
19-
-export([start_doc_map/3, map_docs/2, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]).
19+
-export([start_doc_map/3, map_docs/2, map_docs_raw/2, stop_doc_map/1, raw_to_ejson/1]).
2020
-export([reduce/3, rereduce/3,validate_doc_update/5]).
2121
-export([filter_docs/5]).
2222
-export([filter_view/3]).
@@ -33,6 +33,7 @@
3333
lang,
3434
ddoc_keys = [],
3535
prompt_fun,
36+
prompt_many_fun,
3637
set_timeout_fun,
3738
stop_fun
3839
}).
@@ -83,10 +84,15 @@ map_docs(Proc, Docs) ->
8384
Docs),
8485
{ok, Results}.
8586

86-
map_doc_raw(Proc, Doc) ->
87-
Json = couch_doc:to_json_obj(Doc, []),
88-
{ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}.
89-
87+
% Runs every doc in DocList through the query server process Proc.
%
% Each doc is converted with couch_doc:to_json_obj/2 and wrapped in a
% [<<"map_doc">>, EJson] command; the whole command list is then passed in
% one call to the {Module, Function} stored in the proc's prompt_many_fun
% field, together with the proc's pid. The return value of that call is
% returned unchanged.
map_docs_raw(Proc, DocList) ->
    {Mod, Fun} = Proc#proc.prompt_many_fun,
    Commands = [
        [<<"map_doc">>, couch_doc:to_json_obj(Doc, [])]
        || Doc <- DocList
    ],
    Mod:Fun(Proc#proc.pid, Commands).
9096

9197
stop_doc_map(nil) ->
9298
ok;
@@ -479,6 +485,7 @@ new_process(Langs, LangLimits, Lang) ->
479485
pid=Pid,
480486
% Called via proc_prompt, proc_set_timeout, and proc_stop
481487
prompt_fun={Mod, prompt},
488+
prompt_many_fun={Mod, prompt_many},
482489
set_timeout_fun={Mod, set_timeout},
483490
stop_fun={Mod, stop}}};
484491
_ ->

0 commit comments

Comments
 (0)