Permalink
Browse files

CBD-32 Faster view merging and parsing

Change-Id: Ic80456651dc25baead3b24c74bd731a15f245548
Reviewed-on: http://review.couchbase.org/17464
Reviewed-by: Volker Mische <volker.mische@gmail.com>
Reviewed-by: Bin Cui <bin.cui@gmail.com>
Reviewed-by: Damien Katz <damien@couchbase.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
Tested-by: Damien Katz <damien@couchbase.com>
  • Loading branch information...
1 parent 481fb99 commit 767de84e1d4c1b6838608e2f6c52a20996af9df9 @fdmanana fdmanana committed with Damien Katz Jun 12, 2012
Showing with 7,443 additions and 57 deletions.
  1. +5 −0 .gitignore
  2. +3 −0 Makefile.am
  3. +1 −0 configure.ac
  4. +1 −0 license.skip
  5. +7 −1 src/Makefile.am
  6. +2 −0 src/couch_index_merger/Makefile.am
  7. +113 −0 src/couch_index_merger/src/couch_http_view_streamer.erl
  8. +16 −17 src/couch_index_merger/src/couch_httpd_view_merger.erl
  9. +21 −3 src/couch_index_merger/src/couch_index_merger.erl
  10. +104 −33 src/couch_index_merger/src/couch_view_merger.erl
  11. +1 −1 src/couch_set_view/src/couch_set_view_http.erl
  12. +135 −0 src/couch_view_parser/Makefile.am
  13. +12 −0 src/couch_view_parser/couch_view_parser.app.in
  14. +950 −0 src/couch_view_parser/couch_view_parser.cc
  15. +70 −0 src/couch_view_parser/couch_view_parser.erl
  16. +137 −0 src/couch_view_parser/couch_view_parser.h
  17. +601 −0 src/couch_view_parser/couch_view_parser_nif.cc
  18. +129 −0 src/couch_view_parser/erl_nif_compat.h
  19. +958 −0 src/couch_view_parser/test/01-map-view.t
  20. +287 −0 src/couch_view_parser/test/02-reduce-view.t
  21. +22 −0 src/couch_view_parser/test/run.tpl
  22. +114 −0 src/couch_view_parser/win32/couch_view_parser.vcxproj.tpl.in
  23. +1 −0 src/couch_view_parser/win32/msbuild.bat.tpl.in
  24. +175 −0 src/couch_view_parser/yajl/yajl.c
  25. +49 −0 src/couch_view_parser/yajl/yajl_alloc.c
  26. +34 −0 src/couch_view_parser/yajl/yajl_alloc.h
  27. +103 −0 src/couch_view_parser/yajl/yajl_buf.c
  28. +57 −0 src/couch_view_parser/yajl/yajl_buf.h
  29. +69 −0 src/couch_view_parser/yajl/yajl_bytestack.h
  30. +75 −0 src/couch_view_parser/yajl/yajl_common.h
  31. +242 −0 src/couch_view_parser/yajl/yajl_encode.c
  32. +34 −0 src/couch_view_parser/yajl/yajl_encode.h
  33. +354 −0 src/couch_view_parser/yajl/yajl_gen.c
  34. +157 −0 src/couch_view_parser/yajl/yajl_gen.h
  35. +763 −0 src/couch_view_parser/yajl/yajl_lex.c
  36. +117 −0 src/couch_view_parser/yajl/yajl_lex.h
  37. +226 −0 src/couch_view_parser/yajl/yajl_parse.h
  38. +498 −0 src/couch_view_parser/yajl/yajl_parser.c
  39. +78 −0 src/couch_view_parser/yajl/yajl_parser.h
  40. +503 −0 src/couch_view_parser/yajl/yajl_tree.c
  41. +185 −0 src/couch_view_parser/yajl/yajl_tree.h
  42. +7 −0 src/couch_view_parser/yajl/yajl_version.c
  43. +23 −0 src/couch_view_parser/yajl/yajl_version.h
  44. +2 −1 src/couchdb/couch_app.erl
  45. +1 −1 test/etap/run.tpl
  46. +1 −0 utils/Makefile.am
View
@@ -74,6 +74,10 @@ src/mapreduce/mapreduce.app
src/mapreduce/.deps/
src/mapreduce/.libs/
src/mapreduce/priv
+src/couch_view_parser/couch_view_parser.app
+src/couch_view_parser/.deps/
+src/couch_view_parser/.libs/
+src/couch_view_parser/priv
src/mochiweb/mochiweb.app
src/snappy/.deps/
src/snappy/.libs/
@@ -85,6 +89,7 @@ src/snappy/*/.deps/
src/couch_set_view/ebin/
src/couch_set_view/test/run
src/couch_index_merger/ebin/
+src/couch_view_parser/test/run
src/mapreduce/test/run
test/local.ini
test/etap/.deps/
View
@@ -166,6 +166,7 @@ $(COUCHDB_PLT):
-pa src/couchdb \
-pa src/couch_set_view \
-pa src/couch_index_merger \
+ -pa src/couch_view_parser \
-pa src/mapreduce \
--apps \
compiler \
@@ -200,13 +201,15 @@ dialyzer: all $(COUCHDB_PLT)
-pa src/couchdb \
-pa src/couch_set_view \
-pa src/couch_index_merger \
+ -pa src/couch_view_parser \
-pa src/mapreduce \
-pa src/lhttpc \
-pa src/snappy \
-r \
src/couchdb \
src/couch_set_view/ebin \
src/couch_index_merger/ebin \
+ src/couch_view_parser \
src/snappy \
src/lhttpc \
src/mapreduce
View
@@ -512,6 +512,7 @@ AC_CONFIG_FILES([src/snappy/snappy-$SNAPPY_MAJOR.$SNAPPY_MINOR.$SNAPPY_PATCHLEVE
AC_CONFIG_FILES([src/ejson/Makefile])
AC_CONFIG_FILES([src/couch_set_view/Makefile])
AC_CONFIG_FILES([src/couch_index_merger/Makefile])
+AC_CONFIG_FILES([src/couch_view_parser/Makefile])
AS_IF([ test "x${ac_enable_v8}" = "xyes" ], [
AC_CONFIG_FILES([src/mapreduce/Makefile])
])
View
@@ -97,6 +97,7 @@
^src/couch_index_merger/Makefile
^src/couch_index_merger/Makefile.in
^src/couch_index_merger/ebin/.*beam
+^src/couch_view_parser/*
^stamp-h1
^test/Makefile
^test/Makefile.in
View
@@ -24,6 +24,12 @@ SUBDIRS += \
etap \
lhttpc \
mochiweb \
- snappy \
+ snappy
+
+if !WINDOWS
+SUBDIRS += couch_view_parser
+endif
+
+SUBDIRS += \
couch_set_view \
couch_index_merger
@@ -26,6 +26,7 @@ source_files = \
src/couch_index_merger.erl \
src/couch_view_merger.erl \
src/couch_view_merger_queue.erl \
+ src/couch_http_view_streamer.erl \
src/couch_httpd_view_merger.erl \
src/couch_skew.erl
@@ -34,6 +35,7 @@ compiled_files = \
ebin/couch_index_merger.beam \
ebin/couch_view_merger.beam \
ebin/couch_view_merger_queue.beam \
+ ebin/couch_http_view_streamer.beam \
ebin/couch_httpd_view_merger.beam \
ebin/couch_skew.beam
@@ -0,0 +1,113 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_http_view_streamer).
+
+-include("couch_db.hrl").
+
+-export([parse/3]).
+
+
+% Entry point: parses a streamed view response and feeds the merger queue.
+%
+% DataFun - zero-arity fun polled for more input; next_streamer_state/2
+%           handles the results {ok, Chunk} and eof.
+% Queue   - queue that receives the parsed items through
+%           couch_view_merger_queue:queue/2.
+% FromUrl - JSON-encoded once here; the encoded form is spliced verbatim
+%           into debug rows and error items.
+%
+% Returns ok. Parser errors are thrown as {error, _} by
+% next_streamer_state/2.
+parse(DataFun, Queue, FromUrl) ->
+    {ok, ParserCtx} = couch_view_parser:start_context(),
+    ok = stream_loop(ParserCtx, Queue, DataFun, ?JSON_ENCODE(FromUrl)).
+
+
+% Pulls parser states one at a time and forwards their payload to Queue,
+% recursing until next_streamer_state/2 reports {ok, done}.
+%
+% Every queue operation is asserted to return ok, so a failing queue call
+% crashes with badmatch instead of being ignored.
+stream_loop(Ctx, Queue, DataFun, Url) ->
+    Enqueue = fun(Item) ->
+        ok = couch_view_merger_queue:queue(Queue, Item)
+    end,
+    case next_streamer_state(Ctx, DataFun) of
+    {ok, done} ->
+        ok;
+    {ok, rows, RowList} ->
+        lists:foreach(
+            fun(RawRow) -> Enqueue(transform_row(RawRow, Url)) end,
+            RowList),
+        stream_loop(Ctx, Queue, DataFun, Url);
+    {ok, errors, ErrorList} ->
+        lists:foreach(
+            fun(RawError) -> Enqueue(make_error_item(RawError, Url)) end,
+            ErrorList),
+        stream_loop(Ctx, Queue, DataFun, Url);
+    {ok, row_count, CountString} ->
+        % the parser hands the count over as a string
+        Enqueue({row_count, list_to_integer(CountString)}),
+        stream_loop(Ctx, Queue, DataFun, Url);
+    {ok, debug_infos, _Ignored} ->
+        % TODO, currently broken for reduce views.
+        % View response only gets the debug_info for the merger (local) node.
+        stream_loop(Ctx, Queue, DataFun, Url)
+    end.
+
+
+% Returns the next parser state that carries something for stream_loop/4.
+%
+% When the parser asks for more input, DataFun is polled and the chunk is
+% fed in before asking again. An {error, _} from the parser is thrown.
+% Any other state is handed back to the caller unchanged.
+next_streamer_state(Ctx, DataFun) ->
+    case couch_view_parser:next_state(Ctx) of
+    {ok, need_more_data} ->
+        feed_parser(Ctx, DataFun);
+    {error, _Reason} = StateError ->
+        throw(StateError);
+    OtherState ->
+        OtherState
+    end.
+
+
+% Fetches one chunk via DataFun and pushes it into the parser.
+% eof from DataFun ends the stream with {ok, done}.
+% NOTE(review): eof is accepted even though the parser just asked for more
+% data - presumably a truncated body is meant to end the stream quietly;
+% confirm against couch_view_parser.
+feed_parser(Ctx, DataFun) ->
+    case DataFun() of
+    eof ->
+        {ok, done};
+    {ok, Chunk} ->
+        case couch_view_parser:parse_chunk(Ctx, Chunk) of
+        {error, _} = ChunkError ->
+            throw(ChunkError);
+        ok ->
+            next_streamer_state(Ctx, DataFun)
+        end
+    end.
+
+
+% Converts one parsed row into the item placed on the merger queue.
+%
+% The fields arriving from the parser are raw JSON fragments (binaries).
+% They are concatenated into the final row text, so the row itself is never
+% re-encoded. Only the key (and the doc id for map rows) is decoded.
+% Url is the JSON-encoded source URL and is only used by the rows whose
+% value is a {PartId, Node, Value} triple.
+% Clause order matters: the `error` row must be tried before the generic
+% {Key, DocId} clauses.
+
+% _all_docs error row
+transform_row({{Key, error}, Reason}, _Url) ->
+    ErrorJson = <<"{\"key\":", Key/binary, ",\"error\":", Reason/binary, "}">>,
+    {{?JSON_DECODE(Key), error}, {row_json, ErrorJson}};
+
+% map view rows
+transform_row({{Key, DocId}, Value}, _Url) when is_binary(Value) ->
+    map_row_item(Key, DocId,
+        <<"{\"key\":", Key/binary, ",\"id\":", DocId/binary,
+          ",\"value\":", Value/binary, "}">>);
+
+% map view rows with the document included
+transform_row({{Key, DocId}, Value, Doc}, _Url) when is_binary(Value) ->
+    map_row_item(Key, DocId,
+        <<"{\"key\":", Key/binary, ",\"id\":", DocId/binary,
+          ",\"value\":", Value/binary, ",\"doc\":", Doc/binary, "}">>);
+
+% map view rows carrying partition/node details; the node reported by the
+% remote side is dropped in favour of Url
+transform_row({{Key, DocId}, {PartId, _Node, Value}}, Url) ->
+    map_row_item(Key, DocId,
+        <<"{\"key\":", Key/binary, ",\"id\":", DocId/binary,
+          ",\"partition\":", PartId/binary, ",\"node\":", Url/binary,
+          ",\"value\":", Value/binary, "}">>);
+
+transform_row({{Key, DocId}, {PartId, _Node, Value}, Doc}, Url) ->
+    map_row_item(Key, DocId,
+        <<"{\"key\":", Key/binary, ",\"id\":", DocId/binary,
+          ",\"partition\":", PartId/binary, ",\"node\":", Url/binary,
+          ",\"value\":", Value/binary, ",\"doc\":", Doc/binary, "}">>);
+
+% reduce view rows
+transform_row({Key, Value}, _Url) when is_binary(Key) ->
+    ReduceJson = <<"{\"key\":", Key/binary, ",\"value\":", Value/binary, "}">>,
+    % value_json, in case rereduce needs to be done by the merger
+    {?JSON_DECODE(Key), {row_json, ReduceJson}, {value_json, Value}}.
+
+
+% Builds the queue item shared by all map view rows: the decoded
+% {Key, DocId} pair plus the already assembled row JSON.
+map_row_item(Key, DocId, RowJson) ->
+    {{?JSON_DECODE(Key), ?JSON_DECODE(DocId)}, {row_json, RowJson}}.
+
+
+% Builds the queue item for an error reported inside a remote response.
+% Url and Reason are spliced in as-is, so both must already be JSON text.
+% The node name reported by the remote side is ignored in favour of Url.
+% Returns {error, row_json, Json}, where Json is the encoded error object.
+make_error_item({_Node, Reason}, Url) ->
+    {error, row_json,
+        <<"{\"from\":", Url/binary, ",\"reason\":", Reason/binary, "}">>}.
@@ -129,7 +129,7 @@ http_sender({start, RowCount}, #sender_acc{req = Req} = SAcc) ->
Start = [<<"{\"total_rows\":">>, integer_to_list(RowCount), <<",\"rows\":[">>],
{ok, SAcc#sender_acc{resp = Resp, rows_acc = [Start], acc = <<"\r\n">>}};
-http_sender({row, Row}, SAcc) ->
+http_sender({row, Row}, SAcc) when is_binary(Row) ->
SAcc2 = maybe_flush_rows(Row, SAcc),
{ok, SAcc2#sender_acc{acc = <<",\r\n">>}};
@@ -154,20 +154,13 @@ http_sender(stop, SAcc) ->
couch_httpd:send_chunk(Resp, Buffer2),
{ok, couch_httpd:end_json_response(Resp)};
-http_sender({error, Url, Reason}, #sender_acc{on_error = continue, error_acc = ErrorAcc} = SAcc) ->
- Row = {[
- {<<"from">>, couch_index_merger:rem_passwd(Url)},
- {<<"reason">>, to_binary(Reason)}
- ]},
- ErrorAcc2 = [?JSON_ENCODE(Row) | ErrorAcc],
+http_sender({error, _, _} = Error, #sender_acc{on_error = continue, error_acc = ErrorAcc} = SAcc) ->
+ ErrorAcc2 = [make_error_row(Error) | ErrorAcc],
{ok, SAcc#sender_acc{error_acc = ErrorAcc2}};
-http_sender({error, Url, Reason}, #sender_acc{on_error = stop} = SAcc) ->
+http_sender({error, _, _} = Error, #sender_acc{on_error = stop} = SAcc) ->
#sender_acc{rows_acc = RowsAcc, req = Req, resp = Resp} = SAcc,
- Row = {[
- {<<"from">>, couch_index_merger:rem_passwd(Url)},
- {<<"reason">>, to_binary(Reason)}
- ]},
+ ErrorRow = make_error_row(Error),
case Resp of
nil ->
% we haven't started the response yet
@@ -177,26 +170,25 @@ http_sender({error, Url, Reason}, #sender_acc{on_error = stop} = SAcc) ->
Buffer1 = [
<<"{\"total_rows\":0,\"rows\":[]\r\n">>,
<<",\r\n\"errors\":[">>,
- ?JSON_ENCODE(Row),
+ ErrorRow,
<<"]">>
];
_ ->
Resp2 = Resp,
- Buffer1 = [<<"\r\n],\"errors\":[">>, ?JSON_ENCODE(Row), <<"]">>]
+ Buffer1 = [<<"\r\n],\"errors\":[">>, ErrorRow, <<"]">>]
end,
Buffer2 = [lists:reverse(RowsAcc), Buffer1, debug_info_buffer(SAcc), <<"\r\n}">>],
couch_httpd:send_chunk(Resp2, Buffer2),
couch_httpd:end_json_response(Resp2),
{stop, Resp2}.
-maybe_flush_rows(NewRow, SAcc) ->
+maybe_flush_rows(JsonRow, SAcc) ->
#sender_acc{
acc = Acc,
rows_acc = RowsAcc,
rows_acc_size = RowsAccSize
} = SAcc,
- JsonRow = ?JSON_ENCODE(NewRow),
- RowsAccSize2 = RowsAccSize + iolist_size(JsonRow),
+ RowsAccSize2 = RowsAccSize + byte_size(JsonRow),
SAcc2 = SAcc#sender_acc{
rows_acc = [[Acc, JsonRow] | RowsAcc],
rows_acc_size = RowsAccSize2
@@ -384,3 +376,10 @@ validate_on_error_param(Value) ->
Msg = io_lib:format("Invalid value (`~s`) for the parameter `on_error`."
" It must be `continue` (default) or `stop`.", [to_binary(Value)]),
throw({bad_request, Msg}).
+
+
+% Renders one entry of the response's "errors" array as encoded JSON.
+%
+% {error, row_json, Json} - Json is already encoded and is returned
+%                           untouched.
+% {error, Url, Reason}    - encoded here as {"from": ..., "reason": ...}.
+make_error_row({error, row_json, Json}) ->
+    Json;
+make_error_row({error, Url, Reason}) ->
+    % The inline code this helper replaced passed the URL through
+    % couch_index_merger:rem_passwd/1 before sending it to the client.
+    % Keep doing so: errors queued by the HTTP folder carry the request URL,
+    % and any credentials in it must not reach the response body.
+    % iolist_to_binary/1 normalises the result whether rem_passwd/1 returns
+    % a string or a binary.
+    From = iolist_to_binary(couch_index_merger:rem_passwd(Url)),
+    ?JSON_ENCODE({[{<<"from">>, From},
+                   {<<"reason">>, to_binary(Reason)}]}).
@@ -635,7 +635,6 @@ http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue) ->
end.
run_http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue) ->
- EventFun = Mod:make_event_fun(MergeParams#index_merge.http_params, Queue),
{Url, Method, Headers, Body, BaseOptions} =
Mod:http_index_folder_req_details(IndexSpec, MergeParams, DDoc),
#index_merge{
@@ -646,9 +645,17 @@ run_http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue) ->
case lhttpc:request(Url, Method, Headers, Body, Timeout, LhttpcOptions) of
{ok, {{200, _}, _RespHeaders, Pid}} when is_pid(Pid) ->
put(streamer_pid, Pid),
- DataFun = fun() -> stream_data(Pid, Timeout) end,
try
- json_stream_parse:events(DataFun, EventFun)
+ case os:type() of
+ {win32, _} ->
+ % TODO: make couch_view_parser build and run on Windows
+ EventFun = Mod:make_event_fun(MergeParams#index_merge.http_params, Queue),
+ DataFun = fun() -> stream_data(Pid, Timeout) end,
+ json_stream_parse:events(DataFun, EventFun);
+ _ ->
+ DataFun = fun() -> next_chunk(Pid, Timeout) end,
+ ok = couch_http_view_streamer:parse(DataFun, Queue, get(from_url))
+ end
catch throw:{error, Error} ->
ok = couch_view_merger_queue:queue(Queue, {error, Url, Error})
after
@@ -696,6 +703,17 @@ stream_data(Pid, Timeout) ->
end.
+% Reads the next part of the response body from the lhttpc streamer Pid.
+%
+% Returns {ok, Data} for a body part and eof once lhttpc signals the end of
+% the body (any trailers are discarded). An {error, _} result from lhttpc is
+% thrown.
+next_chunk(Pid, Timeout) ->
+    case lhttpc:get_body_part(Pid, Timeout) of
+    {error, _} = TransportError ->
+        throw(TransportError);
+    {ok, {http_eob, _Trailers}} ->
+        eof;
+    {ok, _BodyPart} = Part ->
+        Part
+    end.
+
+
stream_all(Pid, Timeout, Acc) ->
case stream_data(Pid, Timeout) of
{<<>>, _} ->
Oops, something went wrong.

0 comments on commit 767de84

Please sign in to comment.