Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/2.0.2'
Browse files Browse the repository at this point in the history
* origin/2.0.2:
  MB-7882 No id btree lookups for dynamically added partitions
  MB-7873 Ensure blocked indexer tasks die if updater dies
  More precise compaction time calculation
  Simplify max seq numbers calculation
  Remove no longer used checkpoint logic

Change-Id: I16956c85c3f1aff39210660cc65ec11ca0817bb3
  • Loading branch information
fdmanana committed Mar 8, 2013
2 parents 919cd68 + 37acc04 commit 94aabf4
Showing 1 changed file with 16 additions and 47 deletions.
63 changes: 16 additions & 47 deletions src/couch_set_view/src/couch_set_view_updater.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
% initial index build
-define(MAX_SORT_BUFFER_SIZE, 1048576).

% incremental updates
-define(CHECKPOINT_WRITE_INTERVAL, 60000000).

% Same as in couch_btree.erl
-define(KEY_BITS, 12).
-define(MAX_KEY_SIZE, ((1 bsl ?KEY_BITS) - 1)).
Expand All @@ -52,7 +49,7 @@
tmp_dir = nil,
sort_files = nil,
sort_file_workers = [],
new_partitions = [],
initial_seqs,
write_queue_size,
max_tmp_files,
max_insert_batch_size,
Expand Down Expand Up @@ -191,7 +188,6 @@ update(WriterAcc, ActiveParts, PassiveParts, BlockedTime,
end),

Parent = self(),
unlink(BarrierEntryPid),
Writer = spawn_link(fun() ->
DDocIds = couch_set_view_util:get_ddoc_ids_with_sig(SetName, Group),
couch_task_status:add_task([
Expand All @@ -216,7 +212,7 @@ update(WriterAcc, ActiveParts, PassiveParts, BlockedTime,
view_empty_kvs = ViewEmptyKVs,
compactor_running = CompactorRunning,
write_queue_size = couch_util:get_value(max_size, WriteQueueOptions),
new_partitions = [P || {P, Seq} <- ?set_seqs(Group), Seq == 0]
initial_seqs = ?set_seqs(Group)
}),
TmpDir = WriterAcc2#writer_acc.tmp_dir,
case CompactorRunning of
Expand Down Expand Up @@ -583,33 +579,23 @@ flush_writes(#writer_acc{initial_build = false} = Acc0) ->
owner = Owner,
last_seqs = LastSeqs
} = Acc0,
{ViewKVs, DocIdViewIdKeys, NewPartIdSeqs} =
process_map_results(Kvs, ViewEmptyKVs, orddict:new()),
NewLastSeqs = orddict:merge(
fun(_, S1, S2) -> erlang:max(S1, S2) end,
LastSeqs,
NewPartIdSeqs),
{ViewKVs, DocIdViewIdKeys, NewLastSeqs} =
process_map_results(Kvs, ViewEmptyKVs, LastSeqs),
Acc1 = Acc0#writer_acc{last_seqs = NewLastSeqs},
Acc = write_to_tmp_batch_files(ViewKVs, DocIdViewIdKeys, Acc1),
#writer_acc{group = NewGroup} = Acc,
case ?set_seqs(NewGroup) =/= ?set_seqs(Group) of
true ->
Acc2 = checkpoint(Acc),
case (Acc#writer_acc.state =:= updating_active) andalso
lists:any(fun({PartId, _}) ->
((1 bsl PartId) band ?set_pbitmask(Group) =/= 0)
end, NewLastSeqs) of
lists:any(fun({PartId, _}) ->
((1 bsl PartId) band ?set_pbitmask(Group) =/= 0)
end, NewLastSeqs) of
true ->
Acc2 = checkpoint(Acc),
notify_owner(Owner, {state, updating_passive}, Parent),
Acc2#writer_acc{state = updating_passive};
false ->
case Acc#writer_acc.final_batch orelse
(?set_cbitmask(NewGroup) /= ?set_cbitmask(Group)) of
true ->
checkpoint(Acc);
false ->
maybe_checkpoint(Acc)
end
Acc2
end;
false ->
Acc
Expand Down Expand Up @@ -897,22 +883,21 @@ view_insert_doc_query_results(DocId, PartitionId, [ResultKVs | RestResults],
write_to_tmp_batch_files(ViewKeyValuesToAdd, DocIdViewIdKeys, WriterAcc) ->
#writer_acc{
sort_files = SortFiles,
group = #set_view_group{id_btree = IdBtree} = Group,
new_partitions = NewParts
group = #set_view_group{id_btree = IdBtree} = Group
} = WriterAcc,

{AddDocIdViewIdKeys0, RemoveDocIds, LookupDocIds} = lists:foldr(
fun({DocId, {PartId, [] = _ViewIdKeys}}, {A, B, C}) ->
BackKey = make_back_index_key(DocId, PartId),
case lists:member(PartId, NewParts) of
case is_new_partition(PartId, WriterAcc) of
true ->
{A, [BackKey | B], C};
false ->
{A, [BackKey | B], [BackKey | C]}
end;
({DocId, {PartId, _ViewIdKeys}} = KvPairs, {A, B, C}) ->
BackKey = make_back_index_key(DocId, PartId),
case lists:member(PartId, NewParts) of
case is_new_partition(PartId, WriterAcc) of
true ->
{[KvPairs | A], B, C};
false ->
Expand Down Expand Up @@ -1003,6 +988,10 @@ write_to_tmp_batch_files(ViewKeyValuesToAdd, DocIdViewIdKeys, WriterAcc) ->
maybe_update_btrees(WriterAcc2).


is_new_partition(PartId, #writer_acc{initial_seqs = InitialSeqs}) ->
couch_util:get_value(PartId, InitialSeqs, 0) == 0.


maybe_update_btrees(WriterAcc0) ->
#writer_acc{
view_empty_kvs = ViewEmptyKVs,
Expand Down Expand Up @@ -1294,23 +1283,6 @@ update_task(NumChanges) ->
]).


maybe_checkpoint(WriterAcc) ->
Before = get(last_header_commit_ts),
Now = os:timestamp(),
case (Before == undefined) orelse
(timer:now_diff(Now, Before) >= ?CHECKPOINT_WRITE_INTERVAL) of
true ->
NewWriterAcc = checkpoint(WriterAcc),
put(last_header_commit_ts, Now),
NewWriterAcc;
false ->
NewWriterAcc = maybe_fix_group(WriterAcc),
#writer_acc{owner = Owner, parent = Parent, group = Group} = NewWriterAcc,
Owner ! {partial_update, Parent, Group},
NewWriterAcc
end.


checkpoint(#writer_acc{owner = Owner, parent = Parent, group = Group} = Acc) ->
#set_view_group{
set_name = SetName,
Expand All @@ -1324,9 +1296,6 @@ checkpoint(#writer_acc{owner = Owner, parent = Parent, group = Group} = Acc) ->
Acc#writer_acc{group = NewGroup}.


maybe_fix_group(#writer_acc{group = Group} = Acc) ->
NewGroup = maybe_fix_group(Group),
Acc#writer_acc{group = NewGroup};
maybe_fix_group(#set_view_group{index_header = Header} = Group) ->
receive
{new_passive_partitions, Parts} ->
Expand Down

0 comments on commit 94aabf4

Please sign in to comment.