Permalink
Browse files

Merge remote-tracking branch 'origin/2.0.1'

Conflicts:
	src/mapreduce/test/01-map.t

Change-Id: I1c8f67f97d1c6f0db05094745a67d715dae3a215
  • Loading branch information...
2 parents 7f5b21e + 059b672 commit a5e20cf9b255580394a362d229d3c74aa3c229b5 @fdmanana fdmanana committed Jan 28, 2013
@@ -472,25 +472,38 @@ load_doc(Db, PartitionId, DocInfo, MapQueue, Group, InitialBuild) ->
do_maps(Group, MapQueue, WriteQueue) ->
+ #set_view_group{
+ set_name = SetName,
+ name = DDocId,
+ type = Type
+ } = Group,
case couch_work_queue:dequeue(MapQueue) of
closed ->
couch_work_queue:close(WriteQueue);
{ok, Queue, _QueueSize} ->
+ ViewCount = length(Group#set_view_group.views),
Items = lists:foldr(
fun({Seq, #doc{id = Id, deleted = true}, PartitionId}, Acc) ->
Item = {Seq, Id, PartitionId, []},
[Item | Acc];
({Seq, #doc{id = Id, deleted = false} = Doc, PartitionId}, Acc) ->
try
{ok, Result} = couch_set_view_mapreduce:map(Doc),
- Item = {Seq, Id, PartitionId, Result},
+ {Result2, _} = lists:foldr(
+ fun({error, Reason}, {AccRes, Pos}) ->
+ ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping"
+ " document `~s` for view `~s`: ~s",
+ Args = [SetName, Type, DDocId, Id,
+ view_name(Group, Pos), couch_util:to_binary(Reason)],
+ ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
+ {[[] | AccRes], Pos - 1};
+ (KVs, {AccRes, Pos}) ->
+ {[KVs | AccRes], Pos - 1}
+ end,
+ {[], ViewCount}, Result),
+ Item = {Seq, Id, PartitionId, Result2},
[Item | Acc]
catch _:{error, Reason} ->
- #set_view_group{
- set_name = SetName,
- name = DDocId,
- type = Type
- } = Group,
ErrorMsg = "Bucket `~s`, ~s group `~s`, error mapping document `~s`: ~s",
Args = [SetName, Type, DDocId, Id, couch_util:to_binary(Reason)],
?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
@@ -503,6 +516,17 @@ do_maps(Group, MapQueue, WriteQueue) ->
end.
+view_name(#set_view_group{views = Views}, ViewPos) ->
+ V = lists:nth(ViewPos, Views),
+ case V#set_view.map_names of
+ [] ->
+ [{Name, _} | _] = V#set_view.reduce_funs;
+ [Name | _] ->
+ ok
+ end,
+ Name.
+
+
do_writes(Acc) ->
#writer_acc{
kvs = Kvs,
@@ -39,13 +39,13 @@ update_btree(Bt, FilePath, BufferSize, PurgeFun, PurgeAcc) ->
(catch file:advise(Fd, 0, 0, sequential)),
try
update_btree_loop(
- Fd, Bt, BufferSize, PurgeFun, PurgeAcc, [], 0, 0, 0)
+ Fd, Bt, BufferSize, PurgeFun, PurgeAcc, [], 0, 0, 0, 0, 0)
after
ok = file:close(Fd)
end.
update_btree_loop(Fd, Bt, BufferSize, PurgeFun, PurgeAcc,
- Acc, AccSize, Inserted, Deleted) ->
+ Acc, AccSize, Inserted, Deleted, FlushStartOffset, FlushEndOffset) ->
case file:read(Fd, 4) of
{ok, <<Len:32>>} ->
{ok, ActionBin} = file:read(Fd, Len),
@@ -60,21 +60,26 @@ update_btree_loop(Fd, Bt, BufferSize, PurgeFun, PurgeAcc,
Inserted2 = Inserted + 1,
Deleted2 = Deleted
end,
+ FlushEndOffset2 = FlushEndOffset + 4 + Len,
case AccSize2 >= BufferSize of
true ->
+ _ = file:advise(Fd, FlushStartOffset, FlushEndOffset2, dont_need),
Actions = lists:reverse(Acc2),
{ok, [], PurgeAcc2, Bt2} = couch_btree:query_modify_raw(
Bt, Actions, PurgeFun, PurgeAcc),
ok = couch_file:flush(Bt#btree.fd),
update_btree_loop(Fd, Bt2, BufferSize,
- PurgeFun, PurgeAcc2, [], 0, Inserted2, Deleted2);
+ PurgeFun, PurgeAcc2, [], 0,
+ Inserted2, Deleted2, FlushEndOffset2, FlushEndOffset2);
false ->
update_btree_loop(Fd, Bt, BufferSize,
- PurgeFun, PurgeAcc, Acc2, AccSize2, Inserted2, Deleted2)
+ PurgeFun, PurgeAcc, Acc2, AccSize2,
+ Inserted2, Deleted2, FlushStartOffset, FlushEndOffset2)
end;
eof when Acc == [] ->
{ok, PurgeAcc, Bt, Inserted, Deleted};
eof ->
+ _ = file:advise(Fd, FlushStartOffset, FlushEndOffset, dont_need),
Actions = lists:reverse(Acc),
{ok, [], PurgeAcc2, Bt2} = couch_btree:query_modify_raw(Bt, Actions, PurgeFun, PurgeAcc),
ok = couch_file:flush(Bt#btree.fd),
@@ -58,7 +58,7 @@ num_docs() -> 1000.
main(_) ->
test_util:init_code_path(),
- etap:plan(17),
+ etap:plan(21),
case (catch test()) of
ok ->
etap:end_tests();
@@ -77,6 +77,8 @@ test() ->
etap:diag("Testing map function with a runtime error"),
test_map_runtime_error(),
+ etap:diag("Testing map function with a runtime error for a group of a views"),
+ test_map_runtime_error_multiple_views(),
etap:diag("Testing map function with invalid syntax"),
test_map_syntax_error(),
@@ -222,6 +224,64 @@ test_map_runtime_error() ->
couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()).
+test_map_runtime_error_multiple_views() ->
+ couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
+ couch_set_view_test_util:create_set_dbs(test_set_name(), num_set_partitions()),
+
+ DDocId = <<"_design/test">>,
+ DDoc = {[
+ {<<"meta">>, {[{<<"id">>, DDocId}]}},
+ {<<"json">>, {[
+ {<<"views">>, {[
+ {<<"test1">>, {[
+ {<<"map">>, <<"function(doc, meta) { emit(doc.value, 1); }">>}
+ ]}},
+ {<<"test2">>, {[
+ {<<"map">>, <<"function(doc, meta) { emit(doc.value.foo.bar, 2); }">>}
+ ]}},
+ {<<"test3">>, {[
+ {<<"map">>, <<"function(doc, meta) { emit(doc.value, 3); }">>}
+ ]}}
+ ]}}
+ ]}}
+ ]},
+ populate_set(DDoc, 4),
+
+ ok = configure_view_group(DDocId, [0, 1, 2, 3], []),
+ GroupPid = couch_set_view:get_group_pid(test_set_name(), DDocId),
+ MonRef = erlang:monitor(process, GroupPid),
+
+ QueryResult2 = (catch query_map_view(DDocId, <<"test2">>, false)),
+ etap:is(QueryResult2, {ok, []}, "Map view test2 query returned 0 rows"),
+
+ QueryResult1 = (catch query_map_view(DDocId, <<"test1">>, false)),
+ ExpectedRows1 = [
+ {{{json, <<"1">>}, <<"doc1">>}, {json, <<"1">>}},
+ {{{json, <<"2">>}, <<"doc2">>}, {json, <<"1">>}},
+ {{{json, <<"3">>}, <<"doc3">>}, {json, <<"1">>}},
+ {{{json, <<"4">>}, <<"doc4">>}, {json, <<"1">>}}
+ ],
+ etap:is(QueryResult1, {ok, ExpectedRows1}, "Map view test1 query returned 4 rows"),
+
+ QueryResult3 = (catch query_map_view(DDocId, <<"test3">>, false)),
+ ExpectedRows3 = [
+ {{{json, <<"1">>}, <<"doc1">>}, {json, <<"3">>}},
+ {{{json, <<"2">>}, <<"doc2">>}, {json, <<"3">>}},
+ {{{json, <<"3">>}, <<"doc3">>}, {json, <<"3">>}},
+ {{{json, <<"4">>}, <<"doc4">>}, {json, <<"3">>}}
+ ],
+ etap:is(QueryResult3, {ok, ExpectedRows3}, "Map view test3 query returned 4 rows"),
+
+ receive
+ {'DOWN', MonRef, _, _, _} ->
+ etap:bail("view group died")
+ after 5000 ->
+ etap:is(is_process_alive(GroupPid), true, "View group is still alive")
+ end,
+ couch_util:shutdown_sync(GroupPid),
+ couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()).
+
+
test_map_syntax_error() ->
couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
couch_set_view_test_util:create_set_dbs(test_set_name(), num_set_partitions()),
@@ -492,7 +552,7 @@ query_map_view(DDocId, ViewName, Stale) ->
},
{ok, _, Rows} = couch_set_view:fold(Group, View, FoldFun, [], ViewArgs),
couch_set_view:release_group(Group),
- {ok, Rows}.
+ {ok, lists:reverse(Rows)}.
query_reduce_view(DDocId, ViewName, Stale) ->
@@ -394,8 +394,12 @@ btree_by_seq_split(#doc_info{id=Id, local_seq=Seq, rev={RevPos, RevId},
{<<Seq:48>>, Val}.
btree_by_seq_join(<<Seq:48>>, Val) ->
- <<SizeId:12,SizeBody:28,Deleted:1,Bp:47,RevPos:48,Meta:8,
+ <<SizeId:12,SizeBody:28,Deleted:1,Bp:47,RevPos0:48,Meta:8,
Id:SizeId/binary,RevId/binary>> = Val,
+ % It is possible when upgrading from 1.8.x to 2.0.0 to create items
+ % on disk with a revision number of 0, which is not a valid revision
+ % number. We fix this on load here and in ep-engine.
+ RevPos = case RevPos0 of 0 -> 1; X -> X end,
#doc_info{
id = binary:copy(Id),
local_seq = Seq,
@@ -412,7 +416,11 @@ btree_by_id_split(#doc_info{id=Id, local_seq=Seq, rev={RevPos,RevId},
{Id, Val}.
btree_by_id_join(Id, Bin) ->
- <<Seq:48,Size:32,DeletedBit:1,Bp:47,RevPos:48,Meta:8,RevId/binary>> = Bin,
+ <<Seq:48,Size:32,DeletedBit:1,Bp:47,RevPos0:48,Meta:8,RevId/binary>> = Bin,
+ % It is possible when upgrading from 1.8.x to 2.0.0 to create items
+ % on disk with a revision number of 0, which is not a valid revision
+ % number. We fix this on load here and in ep-engine.
+ RevPos = case RevPos0 of 0 -> 1; X -> X end,
#doc_info{
id = Id,
local_seq = Seq,
@@ -175,14 +175,27 @@ do_maps(Group, MapQueue, WriteQueue, AccItems, AccItemsSize) ->
end,
couch_work_queue:close(WriteQueue);
{ok, Queue, QueueSize} ->
+ ViewCount = length(Group#group.views),
Items = lists:foldr(
fun({Seq, #doc{id = Id, deleted = true}}, Acc) ->
Item = {Seq, Id, []},
[Item | Acc];
({Seq, #doc{id = Id, deleted = false} = Doc}, Acc) ->
try
{ok, Result} = couch_view_mapreduce:map(Doc),
- Item = {Seq, Id, Result},
+ {Result2, _} = lists:foldr(
+ fun({error, Reason}, {AccRes, Pos}) ->
+ ErrorMsg = "View group `~s`, error mapping document "
+ " `~s` for view `~s`: ~s",
+ Args = [Group#group.name, Id, view_name(Group, Pos),
+ couch_util:to_binary(Reason)],
+ ?LOG_MAPREDUCE_ERROR(ErrorMsg, Args),
+ {[[] | AccRes], Pos - 1};
+ (KVs, {AccRes, Pos}) ->
+ {[KVs | AccRes], Pos - 1}
+ end,
+ {[], ViewCount}, Result),
+ Item = {Seq, Id, Result2},
[Item | Acc]
catch _:{error, Reason} ->
ErrorMsg = "View group `~s`, error mapping document `~s`: ~s",
@@ -205,6 +218,17 @@ do_maps(Group, MapQueue, WriteQueue, AccItems, AccItemsSize) ->
end.
+view_name(#group{views = Views}, ViewPos) ->
+ V = lists:nth(ViewPos, Views),
+ case V#view.map_names of
+ [] ->
+ [{Name, _} | _] = V#view.reduce_funs;
+ [Name | _] ->
+ ok
+ end,
+ Name.
+
+
do_writes(Parent, Owner, Group, WriteQueue, InitialBuild, ViewEmptyKVs, Acc) ->
case couch_work_queue:dequeue(WriteQueue) of
closed ->
@@ -29,7 +29,8 @@ mapreduce_cxx_srcs = \
mapreduce_cxx_hdrs = \
erl_nif_compat.h \
- mapreduce.h
+ mapreduce.h \
+ nif_stl_allocator.h
endif
mapreduce_file_collection = \
mapreduce.app.in \
Oops, something went wrong.

0 comments on commit a5e20cf

Please sign in to comment.