Skip to content

Commit

Permalink
Merge pull request #455 from basho/feature/csm/openx-deliverables
Browse files Browse the repository at this point in the history
Remove undeliverables from the real time queue.
  • Loading branch information
cmeiklejohn committed Dec 4, 2013
2 parents 228a4db + adfc49a commit 67486c1
Showing 1 changed file with 20 additions and 45 deletions.
65 changes: 20 additions & 45 deletions src/riak_repl2_rtq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
overload_drops = 0 :: non_neg_integer(),

cs = [],
undeliverables = [],
shutting_down=false,
qsize_bytes = 0,
word_size=erlang:system_info(wordsize)
Expand Down Expand Up @@ -384,9 +383,7 @@ ack_seq(Name, Seq, State = #state{qtab = QTab, qseq = QSeq, cs = Cs}) ->
end, {[], QSeq}, Cs),
%% Remove any entries from the ETS table before MinSeq
NewState = cleanup(QTab, MinSeq, State),
{ShrinkBy, Undeliverables} = clear_non_deliverables(QTab, UpdCs, State#state.word_size),
Undeliverables2 = union_undeliverables(NewState#state.undeliverables, Undeliverables, MinSeq),
NewState#state{cs = UpdCs, undeliverables = Undeliverables2, qsize_bytes = NewState#state.qsize_bytes - ShrinkBy}.
NewState#state{cs = UpdCs}.

%% @private
handle_info(_Msg, State) ->
Expand Down Expand Up @@ -456,10 +453,7 @@ unregister_q(Name, State = #state{qtab = QTab, cs = Cs}) ->
lists:min([Seq || #c{aseq = Seq} <- Cs2])
end,
NewState0 = cleanup(QTab, MinSeq, State),
NewState = NewState0#state{cs = Cs2},
{ShrinkBy, Undeliverables} = clear_non_deliverables(QTab, Cs2, NewState#state.word_size),
Undeliverables2 = union_undeliverables(State#state.undeliverables, Undeliverables, 0),
{ok, NewState#state{undeliverables = Undeliverables2, qsize_bytes = NewState#state.qsize_bytes - ShrinkBy}};
{ok, NewState0#state{cs = Cs2}};
false ->
{{error, not_registered}, State}
end.
Expand Down Expand Up @@ -494,28 +488,28 @@ pull(Name, DeliverFun, State = #state{qtab = QTab, qseq = QSeq, cs = Cs}) ->
CsNames = [Consumer#c.name || Consumer <- Cs],
UpdCs = case lists:keytake(Name, #c.name, Cs) of
{value, C, Cs2} ->
[maybe_pull(QTab, QSeq, C, CsNames, DeliverFun, State#state.undeliverables) | Cs2];
[maybe_pull(QTab, QSeq, C, CsNames, DeliverFun) | Cs2];
false ->
lager:info("not_registered"),
DeliverFun({error, not_registered})
end,
State#state{cs = UpdCs}.

maybe_pull(QTab, QSeq, C = #c{cseq = CSeq}, CsNames, DeliverFun, Undeliverables) ->
maybe_pull(QTab, QSeq, C = #c{cseq = CSeq}, CsNames, DeliverFun) ->
CSeq2 = CSeq + 1,
case CSeq2 =< QSeq of
true -> % something reday
case ets:lookup(QTab, CSeq2) of
[] -> % entry removed, due to previously being unroutable
C2 = C#c{skips = C#c.skips + 1, cseq = CSeq2},
maybe_pull(QTab, QSeq, C2, CsNames, DeliverFun, Undeliverables);
maybe_pull(QTab, QSeq, C2, CsNames, DeliverFun);
[QEntry] ->
QEntry2 = set_local_forwards_meta(CsNames, QEntry),
% if the item can't be delivered due to cascading rt,
% just keep trying.
case maybe_deliver_item(C#c{deliver = DeliverFun}, QEntry2) of
{skipped, C2} ->
maybe_pull(QTab, QSeq, C2, CsNames, DeliverFun, Undeliverables);
maybe_pull(QTab, QSeq, C2, CsNames, DeliverFun);
{_WorkedOrNoFun, C2} ->
C2
end
Expand Down Expand Up @@ -613,35 +607,6 @@ ets_obj_size(Obj, _) ->
update_q_size(State = #state{qsize_bytes = CurrentQSize}, Diff) ->
State#state{qsize_bytes = CurrentQSize + Diff}.

clear_non_deliverables(QTab, ActiveConsumers, WordSize) ->
Accumulator = fun(QEntry, Acc) ->
{Seq, _, _, Meta} = QEntry,
Routed = case orddict:find(routed_clusters, Meta) of
error -> [];
{ok, V} -> V
end,
RoutableActives = [AC || AC <- ActiveConsumers, AC#c.aseq < Seq, not lists:member(AC#c.name, Routed)],
if
RoutableActives == [] ->
[Seq | Acc];
true ->
Acc
end
end,
ToDelete = ets:foldl(Accumulator, [], QTab),
DeleteFun = fun(Key, Acc) ->
[{Key, _NumItems, Bin, _Meta}] = ets:lookup(QTab, Key),
Size = ets_obj_size(Bin, WordSize),
ets:delete(QTab, Key),
Acc + Size
end,
Shrink = lists:foldl(DeleteFun, 0, ToDelete),
{Shrink, ToDelete}.

union_undeliverables(SeqSet1, SeqSet2, MinSeq) ->
Undeliverables = ordsets:union(SeqSet1, SeqSet2),
lists:filter(fun(E) -> E >= MinSeq end, Undeliverables).

%% Trim the queue if necessary
trim_q(State = #state{max_bytes = undefined}) ->
State;
Expand Down Expand Up @@ -674,11 +639,21 @@ trim_q(State = #state{qtab = QTab, qseq = QSeq, max_bytes = MaxBytes}) ->
end.

trim_q_entries(QTab, MaxBytes, Cs, State) ->
{Cs2, State2, Entries, Objects} = trim_q_entries(QTab, MaxBytes, Cs, State, 0, 0),
if
Entries + Objects > 0 ->
lager:notice("Dropped ~p objects in ~p entries due to reaching maximum queue size of ~p bytes", [Objects, Entries, MaxBytes]);
true ->
ok
end,
{Cs2, State2}.

trim_q_entries(QTab, MaxBytes, Cs, State, Entries, Objects) ->
case ets:first(QTab) of
'$end_of_table' ->
{Cs, State};
{Cs, State, Entries, Objects};
TrimSeq ->
[{_, _, Bin, _Meta}] = ets:lookup(QTab, TrimSeq),
[{_, NumObjects, Bin, _Meta}] = ets:lookup(QTab, TrimSeq),
ShrinkSize = ets_obj_size(Bin, State),
NewState = update_q_size(State, -ShrinkSize),
ets:delete(QTab, TrimSeq),
Expand All @@ -693,9 +668,9 @@ trim_q_entries(QTab, MaxBytes, Cs, State) ->
%% Rinse and repeat until meet the target or the queue is empty
case qbytes(QTab, NewState) > MaxBytes of
true ->
trim_q_entries(QTab, MaxBytes, Cs2, NewState);
trim_q_entries(QTab, MaxBytes, Cs2, NewState, Entries + 1, Objects + NumObjects);
_ ->
{Cs2, NewState}
{Cs2, NewState, Entries + 1, Objects + NumObjects}
end
end.

Expand Down

0 comments on commit 67486c1

Please sign in to comment.