Skip to content

Commit

Permalink
Revert "Revert "More efficient set view indexing""
Browse files Browse the repository at this point in the history
This reverts commit 7dd003e.

It also makes sure the optimized code path is not triggered
only for Windows.

Change-Id: Iac62e9539265f22fc98293da18f1aad905a872f6
Reviewed-on: http://review.couchbase.org/12115
Reviewed-by: Bin Cui <bin.cui@gmail.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
  • Loading branch information
fdmanana committed Jan 6, 2012
1 parent 51f34c3 commit ae7e175
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 9 deletions.
72 changes: 67 additions & 5 deletions src/couch_set_view/src/couch_set_view_updater.erl
Expand Up @@ -17,7 +17,10 @@
-include("couch_db.hrl").
-include_lib("couch_set_view/include/couch_set_view.hrl").

-define(QUEUE_ITEMS, 500).
-define(QUEUE_MAX_ITEMS, 500).
-define(QUEUE_MAX_SIZE, 100 * 1024).
-define(MIN_FLUSH_BATCH_SIZE, 500).
-define(MIN_MAP_BATCH_SIZE, 500).
-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).

-record(writer_acc, {
Expand Down Expand Up @@ -48,12 +51,17 @@ update(Owner, Group, NewSeqs) ->
0, lists:zip(NewSeqs, SinceSeqs)),

{ok, MapQueue} = couch_work_queue:new(
[{max_size, 100000}, {max_items, ?QUEUE_ITEMS}]),
[{max_size, ?QUEUE_MAX_SIZE}, {max_items, ?QUEUE_MAX_ITEMS}]),
{ok, WriteQueue} = couch_work_queue:new(
[{max_size, 100000}, {max_items, ?QUEUE_ITEMS}]),
[{max_size, ?QUEUE_MAX_SIZE}, {max_items, ?QUEUE_MAX_ITEMS}]),

spawn_link(fun() ->
do_maps(add_query_server(Group), MapQueue, WriteQueue)
case can_do_batched_maps(Group) of
true ->
do_batched_maps(add_query_server(Group), MapQueue, WriteQueue, []);
false->
do_maps(add_query_server(Group), MapQueue, WriteQueue)
end
end),

Parent = self(),
Expand Down Expand Up @@ -258,6 +266,18 @@ load_doc(Db, PartitionId, DocInfo, MapQueue, DocOpts, IncludeDesign) ->
end
end.


can_do_batched_maps(#set_view_group{def_lang = <<"erlang">>}) ->
true;
can_do_batched_maps(_Group) ->
case os:type() of
{win32, _} ->
false;
_ ->
true
end.


do_maps(#set_view_group{query_server = Qs} = Group, MapQueue, WriteQueue) ->
case couch_work_queue:dequeue(MapQueue) of
closed ->
Expand All @@ -278,14 +298,56 @@ do_maps(#set_view_group{query_server = Qs} = Group, MapQueue, WriteQueue) ->
end.


% TODO: batch by byte size as well, not just changes #
do_batched_maps(#set_view_group{query_server = Qs} = Group, MapQueue, WriteQueue, Acc) ->
case couch_work_queue:dequeue(MapQueue) of
closed ->
compute_map_results(Group, WriteQueue, Acc),
couch_work_queue:close(WriteQueue),
couch_query_servers:stop_doc_map(Qs);
{ok, Queue} ->
Acc2 = Acc ++ Queue,
case length(Acc2) >= ?MIN_MAP_BATCH_SIZE of
true ->
compute_map_results(Group, WriteQueue, Acc2),
do_batched_maps(Group, MapQueue, WriteQueue, []);
false ->
do_batched_maps(Group, MapQueue, WriteQueue, Acc2)
end
end.


compute_map_results(_Group, _WriteQueue, []) ->
ok;
compute_map_results(#set_view_group{query_server = Qs}, WriteQueue, Queue) ->
{Deleted, NotDeleted} = lists:partition(
fun({_Seq, Doc, _PartId}) -> Doc#doc.deleted end,
Queue),
NotDeletedDocs = [Doc || {_Seq, Doc, _PartId} <- NotDeleted],
{ok, MapResultList} = couch_query_servers:map_docs_raw(Qs, NotDeletedDocs),
lists:foreach(
fun({MapResults, {Seq, Doc, PartId}}) ->
Item = {Seq, Doc#doc.id, PartId, MapResults},
ok = couch_work_queue:queue(WriteQueue, Item)
end,
lists:zip(MapResultList, NotDeleted)),
lists:foreach(
fun({Seq, #doc{id = Id, deleted = true}, PartId}) ->
Item = {Seq, Id, PartId, []},
ok = couch_work_queue:queue(WriteQueue, Item)
end,
Deleted).


% TODO: batch by byte size as well, not just changes #
do_writes(#writer_acc{kvs = Kvs, write_queue = WriteQueue} = Acc) ->
case couch_work_queue:dequeue(WriteQueue) of
closed ->
#writer_acc{group = NewGroup} = flush_writes(Acc#writer_acc{final_batch = true}),
NewGroup;
{ok, Queue} ->
Kvs2 = Kvs ++ Queue,
case length(Kvs2) >= ?QUEUE_ITEMS of
case length(Kvs2) >= ?MIN_FLUSH_BATCH_SIZE of
true ->
Acc2 = flush_writes(Acc#writer_acc{kvs = Kvs2});
false ->
Expand Down
11 changes: 10 additions & 1 deletion src/couchdb/couch_native_process.erl
Expand Up @@ -42,7 +42,7 @@

-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
handle_info/2]).
-export([set_timeout/2, prompt/2]).
-export([set_timeout/2, prompt/2, prompt_many/2]).

-define(STATE, native_proc_state).
-record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}).
Expand All @@ -62,6 +62,15 @@ set_timeout(Pid, TimeOut) ->
prompt(Pid, Data) when is_list(Data) ->
gen_server:call(Pid, {prompt, Data}).

prompt_many(Pid, DataList) ->
prompt_many(Pid, DataList, []).

prompt_many(_Pid, [], Acc) ->
{ok, lists:reverse(Acc)};
prompt_many(Pid, [Data | Rest], Acc) ->
Result = prompt(Pid, Data),
prompt_many(Pid, Rest, [Result | Acc]).

% gen_server callbacks
init([]) ->
{ok, #evstate{ddocs=dict:new()}}.
Expand Down
34 changes: 31 additions & 3 deletions src/couchdb/couch_os_process.erl
Expand Up @@ -14,13 +14,13 @@
-behaviour(gen_server).

-export([start_link/1, start_link/2, start_link/3, stop/1]).
-export([set_timeout/2, prompt/2]).
-export([set_timeout/2, prompt/2, prompt_many/2]).
-export([send/2, writeline/2, readline/1, writejson/2, readjson/1]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).

-include("couch_db.hrl").

-define(PORT_OPTIONS, [stream, {line, 1024}, binary, exit_status, hide]).
-define(PORT_OPTIONS, [stream, {line, 4096}, binary, exit_status, hide]).

-record(os_proc,
{command,
Expand All @@ -42,7 +42,7 @@ stop(Pid) ->

% Read/Write API
set_timeout(Pid, TimeOut) when is_integer(TimeOut) ->
ok = gen_server:call(Pid, {set_timeout, TimeOut}).
ok = gen_server:call(Pid, {set_timeout, TimeOut}, infinity).

% Used by couch_db_update_notifier.erl
send(Pid, Data) ->
Expand All @@ -57,6 +57,32 @@ prompt(Pid, Data) ->
throw(Error)
end.

prompt_many(Pid, DataList) ->
OsProc = gen_server:call(Pid, get_os_proc, infinity),
true = port_connect(OsProc#os_proc.port, self()),
try
send_many(OsProc, DataList),
receive_many(length(DataList), OsProc, [])
after
% Can throw badarg error, when OsProc Pid is dead or port was closed
% by the readline function on error/timeout.
(catch port_connect(OsProc#os_proc.port, Pid)),
unlink(OsProc#os_proc.port),
drop_port_messages(OsProc#os_proc.port)
end.

send_many(_OsProc, []) ->
ok;
send_many(#os_proc{writer = Writer} = OsProc, [Data | Rest]) ->
Writer(OsProc, Data),
send_many(OsProc, Rest).

receive_many(0, _OsProc, Acc) ->
{ok, lists:reverse(Acc)};
receive_many(N, #os_proc{reader = Reader} = OsProc, Acc) ->
Line = Reader(OsProc),
receive_many(N - 1, OsProc, [Line | Acc]).

drop_port_messages(Port) ->
receive
{Port, _} ->
Expand Down Expand Up @@ -176,6 +202,8 @@ terminate(_Reason, #os_proc{port=Port}) ->
catch port_close(Port),
ok.

handle_call(get_os_proc, _From, OsProc) ->
{reply, OsProc, OsProc};
handle_call({set_timeout, TimeOut}, _From, OsProc) ->
{reply, ok, OsProc#os_proc{timeout=TimeOut}};
handle_call({prompt, Data}, _From, OsProc) ->
Expand Down
13 changes: 13 additions & 0 deletions src/couchdb/couch_query_servers.erl
Expand Up @@ -20,6 +20,7 @@
-export([reduce/3, rereduce/3,validate_doc_update/5]).
-export([filter_docs/5]).
-export([filter_view/3]).
-export([map_docs_raw/2]).

-export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]).

Expand All @@ -33,6 +34,7 @@
lang,
ddoc_keys = [],
prompt_fun,
prompt_many_fun,
set_timeout_fun,
stop_fun
}).
Expand Down Expand Up @@ -83,6 +85,16 @@ map_docs(Proc, Docs) ->
Docs),
{ok, Results}.

map_docs_raw(Proc, DocList) ->
{Mod, Fun} = Proc#proc.prompt_many_fun,
CommandList = lists:map(
fun(Doc) ->
EJson = couch_doc:to_json_obj(Doc, []),
[<<"map_doc">>, EJson]
end,
DocList),
Mod:Fun(Proc#proc.pid, CommandList).

map_doc_raw(Proc, Doc) ->
Json = couch_doc:to_json_obj(Doc, []),
{ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}.
Expand Down Expand Up @@ -479,6 +491,7 @@ new_process(Langs, LangLimits, Lang) ->
pid=Pid,
% Called via proc_prompt, proc_set_timeout, and proc_stop
prompt_fun={Mod, prompt},
prompt_many_fun={Mod, prompt_many},
set_timeout_fun={Mod, set_timeout},
stop_fun={Mod, stop}}};
_ ->
Expand Down

0 comments on commit ae7e175

Please sign in to comment.