Permalink
Browse files

Use the new MapReduce component added to CouchDB

Computing the spatial results for a document is now done
by the new MapReduce component (an Erlang NIF) added to
CouchDB, that is, it no longer uses CouchJS.
CouchJS was removed from CouchDB.

Change-Id: I13b9f3fe675aa7680cc43fdbce1afa377e179ebb
Reviewed-on: http://review.couchbase.org/14152
Tested-by: Volker Mische <volker.mische@gmail.com>
Reviewed-by: Volker Mische <volker.mische@gmail.com>
Reviewed-by: Damien Katz <damien@couchbase.com>
  • Loading branch information...
1 parent ee8c395 commit c47805edd8abc93fe9b372fd1d175bc0a8e4e095 @fdmanana fdmanana committed with Damien Katz Mar 21, 2012
View
@@ -15,7 +15,7 @@ builddir:
buildandtest: all test
runtests:
- ERL_FLAGS="-pa ${COUCH_SRC} -pa ${COUCH_SRC}/../etap -pa ${COUCH_SRC}/../snappy -pa ${COUCH_SRC}/../../test/etap -pa ${COUCH_SRC}/../couch_set_view/ebin -pa ${COUCH_SRC}/../mochiweb -pa ${COUCH_SRC}/../ibrowse -pa ${COUCH_SRC}/../erlang-oauth -pa ${COUCH_SRC}/../ejson" prove ./test/*.t
+ ERL_FLAGS="-pa ${COUCH_SRC} -pa ${COUCH_SRC}/../etap -pa ${COUCH_SRC}/../snappy -pa ${COUCH_SRC}/../../test/etap -pa ${COUCH_SRC}/../couch_set_view/ebin -pa ${COUCH_SRC}/../mochiweb -pa ${COUCH_SRC}/../ibrowse -pa ${COUCH_SRC}/../erlang-oauth -pa ${COUCH_SRC}/../ejson -pa ${COUCH_SRC}/../mapreduce" prove ./test/*.t
check: clean builddir compileforcheck runtests
rm -rf build
@@ -29,13 +29,15 @@ couchTests.spatial = function(debug) {
}, */
spatial : {
basicIndex : stringFun(function(doc) {
- emit({
- type: "Point",
- coordinates: [doc.loc[0], doc.loc[1]]
- }, doc.string);
+ if (doc.loc) {
+ emit({
+ type: "Point",
+ coordinates: [doc.loc[0], doc.loc[1]]
+ }, doc.string);
+ }
}),
dontEmitAll : stringFun(function(doc) {
- if (doc._id>5) {
+ if (doc._id > 5 && doc.loc) {
emit({
type: "Point",
coordinates: [doc.loc[0], doc.loc[1]]
@@ -26,8 +26,7 @@
lib,
id_btree=nil, % the back-index
current_seq=0,
- purge_seq=0,
- query_server=nil
+ purge_seq=0
% waiting_delayed_commit=nil
}).
@@ -503,8 +503,7 @@ get_group_info(State) ->
reset_group(#spatial_group{indexes=Indexes}=Group) ->
Indexes2 = [Index#spatial{treepos=nil,treeheight=0} || Index <- Indexes],
- Group#spatial_group{db=nil,fd=nil,query_server=nil,current_seq=0,
- indexes=Indexes2}.
+ Group#spatial_group{db=nil,fd=nil,current_seq=0,indexes=Indexes2}.
reset_file(Db, Fd, DbName, #spatial_group{sig=Sig,name=Name} = Group) ->
?LOG_DEBUG("Resetting spatial group index \"~s\" in db ~s", [Name, DbName]),
@@ -33,7 +33,8 @@ update(Owner, Group) ->
#spatial_group{
db = #db{name=DbName} = Db,
name = GroupName,
- current_seq = Seq
+ current_seq = Seq,
+ indexes = Indexes
%purge_seq = PurgeSeq
} = Group,
% XXX vmx: what are purges? when do they happen?
@@ -65,6 +66,8 @@ update(Owner, Group) ->
{total_changes, TotalChanges}
]),
couch_task_status:set_update_frequency(500),
+ {ok, MapCtx} = mapreduce:start_map_context([I#spatial.def || I <- Indexes]),
+ EmptyResults = [[] || _ <- Indexes],
{ok, _, {_,{UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}}}
= couch_db:enum_docs_since(Db, Seq,
@@ -75,20 +78,20 @@ update(Owner, Group) ->
{changes_done, ChangesProcessed}
]),
%?LOG_DEBUG("enum_doc_since: ~p", [Acc]),
- {ok, {ChangesProcessed+1, process_doc(Db, Owner, DocInfo, Acc)}}
+ Acc2 = process_doc(Db, Owner, MapCtx, EmptyResults, DocInfo, Acc),
+ {ok, {ChangesProcessed+1, Acc2}}
end, {0, {[], Group, IndexEmptyKVs, []}}, []),
%?LOG_DEBUG("enum_doc_since results: ~p~n~p~n~p", [UncomputedDocs, ViewKVsToAdd, DocIdViewIdKeys]),
- {Group4, Results} = spatial_compute(Group3, UncomputedDocs),
+ Results = spatial_docs(MapCtx, UncomputedDocs, EmptyResults),
% Output is way to huge
- %?LOG_DEBUG("spatial_compute results: ~p", [Results]),
+ %?LOG_DEBUG("spatial_docs results: ~p", [Results]),
{ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(
UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
- couch_query_servers:stop_doc_map(Group4#spatial_group.query_server),
NewSeq = couch_db:get_update_seq(Db),
-?LOG_DEBUG("new seq num: ~p", [NewSeq]),
- {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2,
+ ?LOG_DEBUG("new seq num: ~p", [NewSeq]),
+ {ok, Group4} = write_changes(Group3, ViewKVsToAdd2, DocIdViewIdKeys2,
NewSeq),
- exit({new_group, Group5#spatial_group{query_server=nil}}).
+ exit({new_group, Group4}).
@@ -130,55 +133,43 @@ view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{Vie
NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc,
view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc).
-
-% Pendant to couch_view_updater:view_compute/2
-spatial_compute(Group, []) ->
- {Group, []};
-spatial_compute(#spatial_group{def_lang=DefLang, lib=Lib, query_server=QueryServerIn}=Group, Docs) ->
- {ok, QueryServer} =
- case QueryServerIn of
- nil -> % spatial funs not started
- Functions = [Index#spatial.def || Index <- Group#spatial_group.indexes],
- couch_query_servers:start_doc_map(DefLang, Functions, Lib);
- _ ->
- {ok, QueryServerIn}
- end,
- {ok, Results} = spatial_docs(QueryServer, Docs),
- {Group#spatial_group{query_server=QueryServer}, Results}.
-
-% Pendant to couch_query_servers:map_docs/2
-spatial_docs(Proc, Docs) ->
- % send the documents
- Results = lists:map(
- fun(Doc) ->
- Json = couch_doc:to_json_obj(Doc, []),
-
- % NOTE vmx: perhaps should map_doc renamed to something more
- % general as it can be used for most indexers
- FunsResults = couch_query_servers:proc_prompt(Proc, [<<"map_doc">>, Json]),
- % the results are a json array of function map yields like this:
- % [FunResults1, FunResults2 ...]
- % where funresults is are json arrays of key value pairs:
- % [[Geom1, Value1], [Geom2, Value2]]
- % Convert the key, value pairs to tuples like
- % [{Bbox1, {Geom1, Value1}}, {Bbox, {Geom2, Value2}}]
- lists:map(
- fun(FunRs) ->
- case FunRs of
- [] -> [];
- % do some post-processing of the result documents
- FunRs -> process_results(FunRs)
- end
- end,
- FunsResults)
- end,
- Docs),
- {ok, Results}.
+spatial_docs(MapCtx, Docs, EmptyResults) ->
+ spatial_docs(MapCtx, Docs, EmptyResults, []).
+
+spatial_docs(_MapCtx, [], _EmptyResults, Acc) ->
+ lists:reverse(Acc);
+spatial_docs(MapCtx, [Doc | RestDocs], EmptyResults, Acc) ->
+ JsonDoc = couch_doc:to_raw_json_binary(Doc),
+ % NOTE vmx: perhaps should map_doc renamed to something more
+ % general as it can be used for most indexers
+ case mapreduce:map_doc(MapCtx, JsonDoc) of
+ {ok, FunsResults} ->
+ % the results are a json array of function map yields like this:
+ % [FunResults1, FunResults2 ...]
+ % where funresults is are json arrays of key value pairs:
+ % [{Geom1Json, Value1Json}, {Geom2Json, Value2Json}]
+ % Convert the key, value pairs to tuples like
+ % [{Bbox1, {Geom1, Value1}}, {Bbox2, {Geom2, Value2}}]
+ SpatialResults = lists:map(
+ fun(FunRs) ->
+ case FunRs of
+ [] -> [];
+ % do some post-processing of the result documents
+ FunRs -> process_results(FunRs)
+ end
+ end,
+ FunsResults),
+ spatial_docs(MapCtx, RestDocs, EmptyResults, [SpatialResults | Acc]);
+ {error, Reason} ->
+ ?LOG_ERROR("Error computing spatial result for document `~s`: ~p",
+ [Doc#doc.id, Reason]),
+ spatial_docs(MapCtx, RestDocs, EmptyResults, [EmptyResults | Acc])
+ end.
% This fun computes once for each document
% This is from an old revision (796805) of couch_view_updater
-process_doc(Db, Owner, DocInfo, {Docs, Group, IndexKVs, DocIdIndexIdKeys}) ->
+process_doc(Db, Owner, MapCtx, EmptyResults, DocInfo, {Docs, Group, IndexKVs, DocIdIndexIdKeys}) ->
#spatial_group{ design_options = DesignOptions } = Group,
#doc_info{id=DocId, deleted=Deleted} = DocInfo,
LocalSeq = proplists:get_value(<<"local_seq">>,
@@ -204,10 +195,10 @@ process_doc(Db, Owner, DocInfo, {Docs, Group, IndexKVs, DocIdIndexIdKeys}) ->
case couch_util:should_flush() of
true ->
- {Group1, Results} = spatial_compute(Group, Docs2),
+ Results = spatial_docs(MapCtx, Docs2, EmptyResults),
{ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2,
Results, IndexKVs, DocIdIndexIdKeys2),
- {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3,
+ {ok, Group2} = write_changes(Group, ViewKVs3, DocIdViewIdKeys3,
DocInfo#doc_info.local_seq),
if is_pid(Owner) ->
ok = gen_server:cast(Owner, {partial_update, self(), Group2});
@@ -282,7 +273,9 @@ process_results(Results) ->
[process_result(Result)|Acc]
end, [], Results).
-process_result([{Geo}|[Value]]) ->
+process_result({K, V}) ->
+ {Geo} = ?JSON_DECODE(K),
+ Value = ?JSON_DECODE(V),
Type = proplists:get_value(<<"type">>, Geo),
Bbox = case Type of
<<"GeometryCollection">> ->
View
@@ -14,6 +14,7 @@
% the License.
-define(MOD, couch_spatial_updater).
+-define(JSON_ENCODE(V), ejson:encode(V)).
main(_) ->
code:add_pathz(filename:dirname(escript:script_name())),
@@ -118,7 +119,7 @@ test_process_result_geometrycollection() ->
{[{<<"type">>,<<"LineString">>},
{<<"coordinates">>,[[101.0,0.0],[102.0,1.0]]}]}]}]},
{Bbox, {Geom, <<"somedoc">>}} = ?MOD:process_result(
- [Geojson, <<"somedoc">>]),
+ {?JSON_ENCODE(Geojson), ?JSON_ENCODE(<<"somedoc">>)}),
etap:is(Geom,
{'GeometryCollection', [
{'Point', [100.0,0.0]},
@@ -161,7 +162,7 @@ test_process_result_point() ->
Geojson = {[{<<"type">>,<<"Point">>},
{<<"coordinates">>,[100.0,0.0]}]},
{Bbox, {Geom, <<"somedoc">>}} = ?MOD:process_result(
- [Geojson, <<"somedoc">>]),
+ {?JSON_ENCODE(Geojson), ?JSON_ENCODE(<<"somedoc">>)}),
etap:is(Geom, {'Point', [100.0,0.0]},
"Point was processed correctly"),
etap:is(Bbox, {100.0, 0.0, 100.0, 0.0},
@@ -172,7 +173,7 @@ test_process_result_point_bbox() ->
{<<"coordinates">>,[100.0,0.0]},
{<<"bbox">>,[100.0,0.0,105.54,8.614]}]},
{Bbox, {Geom, <<"somedoc">>}} = ?MOD:process_result(
- [Geojson, <<"somedoc">>]),
+ {?JSON_ENCODE(Geojson), ?JSON_ENCODE(<<"somedoc">>)}),
etap:is(Geom, {'Point', [100.0,0.0]},
"Point was processed correctly (with pre set bounding box)"),
etap:is(Bbox, {100.0, 0.0, 105.54, 8.614},
@@ -183,7 +184,7 @@ test_process_result_linestring() ->
Geojson = {[{<<"type">>,<<"LineString">>},
{<<"coordinates">>,[[101.0,0.0],[102.0,1.0]]}]},
{Bbox, {Geom, <<"somedoc">>}} = ?MOD:process_result(
- [Geojson, <<"somedoc">>]),
+ {?JSON_ENCODE(Geojson), ?JSON_ENCODE(<<"somedoc">>)}),
etap:is(Geom, {'LineString', [[101.0,0.0],[102.0,1.0]]},
"LineString was processed correctly"),
etap:is(Bbox, {101.0, 0.0, 102.0, 1.0},
@@ -195,7 +196,7 @@ test_process_result_linestring_toosmallbbox() ->
{<<"coordinates">>,[[101.0,0.0],[102.0,1.0]]},
{<<"bbox">>,[101.0,0.0,101.54,0.614]}]},
{Bbox, {Geom, <<"somedoc">>}} = ?MOD:process_result(
- [Geojson, <<"somedoc">>]),
+ {?JSON_ENCODE(Geojson), ?JSON_ENCODE(<<"somedoc">>)}),
etap:is(Geom, {'LineString', [[101.0,0.0],[102.0,1.0]]},
"LineString was processed correctly (with too small bounding box)"),
etap:is(Bbox, {101.0, 0.0, 101.54, 0.614},

0 comments on commit c47805e

Please sign in to comment.