Permalink
Browse files

Merge pull request #40 from cloudant/13421-changes-decays2

Use a separate mailbox for 'db_updated' messages

BugzID: 13421
  • Loading branch information...
kocolosk committed Jun 14, 2012
2 parents 9eac8f3 + f598244 commit ab8d4f19e4d7853e408c39e810d532825337cfe7
Showing with 124 additions and 31 deletions.
  1. +108 −0 src/fabric_db_update_listener.erl
  2. +16 −31 src/fabric_view_changes.erl
@@ -0,0 +1,108 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_update_listener).
+
+-export([go/4, start_update_notifier/1, stop/1, wait_db_updated/1]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+go(Parent, ParentRef, DbName, Timeout) ->
+ Notifiers = start_update_notifiers(DbName),
+ MonRefs = lists:usort([{rexi_server, Node} || {Node, _Ref} <- Notifiers]),
+ RexiMon = rexi_monitor:start(MonRefs),
+ %% Add calling controller node as rexi end point as this controller will
+ %% receive messages from it
+ Workers = [{Parent, ParentRef} | Notifiers],
+ try
+ receive_results(Workers, {Workers, Parent, unset}, Timeout)
+ after
+ rexi_monitor:stop(RexiMon),
+ stop_update_notifiers(Notifiers)
+ end.
+
+start_update_notifiers(DbName) ->
+ lists:map(fun(#shard{node=Node, name=Name}) ->
+ {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})}
+ end, mem3:shards(DbName)).
+
+% rexi endpoint
+start_update_notifier(DbName) ->
+ {Caller, Ref} = get(rexi_from),
+ Fun = fun({_, X}) when X == DbName ->
+ erlang:send(Caller, {Ref, db_updated}); (_) -> ok end,
+ Id = {couch_db_update_notifier, make_ref()},
+ ok = gen_event:add_sup_handler(couch_db_update, Id, Fun),
+ receive {gen_event_EXIT, Id, Reason} ->
+ rexi:reply({gen_event_EXIT, DbName, Reason})
+ end.
+
+stop_update_notifiers(Notifiers) ->
+ [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers].
+
+stop({Pid, Ref}) ->
+ erlang:send(Pid, {Ref, done}).
+
+wait_db_updated({Pid, Ref}) ->
+ erlang:send(Pid, {Ref, get_state}),
+ receive
+ {state, Pid, State} -> State
+ end.
+
+receive_results(Workers, State, Timeout) ->
+ case rexi_utils:recv(Workers, 2, fun handle_message/3, State,
+ infinity, Timeout) of
+ {timeout, {NewWorkers, Parent, waiting}} ->
+ erlang:send(Parent, {state, self(), timeout}),
+ receive_results(NewWorkers, {NewWorkers, Parent, unset}, Timeout);
+ {timeout, {NewWorkers, Parent, _State}} ->
+ receive_results(NewWorkers, {NewWorkers, Parent, timeout}, Timeout);
+ {_, NewState} ->
+ {ok, NewState}
+ end.
+
+
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, {Workers, Parent, State}) ->
+ NewWorkers = lists:filter(fun({_Node, Ref}) -> NodeRef =/= Ref end, Workers),
+ case NewWorkers of
+ [] ->
+ {error, {nodedown, <<"progress not possible">>}};
+ _ ->
+ {ok, {NewWorkers, Parent, State}}
+ end;
+handle_message({rexi_EXIT, Reason}, Worker, {Workers, Parent, State}) ->
+ NewWorkers = lists:delete(Worker,Workers),
+ case NewWorkers of
+ [] ->
+ {error, Reason};
+ _ ->
+ {ok, {NewWorkers, Parent, State}}
+ end;
+handle_message(db_updated, {_Worker, _From}, {Workers, Parent, waiting}) ->
+ % propagate message to calling controller
+ erlang:send(Parent, {state, self(), updated}),
+ {ok, {Workers, Parent, unset}};
+handle_message(db_updated, _Worker, {Workers, Parent, _State}) ->
+ {ok, {Workers, Parent, updated}};
+handle_message(get_state, {_Worker, _From}, {Workers, Parent, unset}) ->
+ {ok, {Workers, Parent, waiting}};
+handle_message(get_state, {_Worker, _From}, {Workers, Parent, State}) ->
+ erlang:send(Parent, {state, self(), State}),
+ {ok, {Workers, Parent, unset}};
+handle_message(done, _, _) ->
+ {stop, ok}.
+
+
+
@@ -1,4 +1,4 @@
-% Copyright 2010 Cloudant
+% Copyright 2012 Cloudant
%
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
@@ -14,34 +14,40 @@
-module(fabric_view_changes).
--export([go/5, start_update_notifier/1, pack_seqs/1]).
+-export([go/5, pack_seqs/1]).
-include("fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("eunit/include/eunit.hrl").
+-import(fabric_db_update_listener, [wait_db_updated/1]).
+
go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
Feed == "longpoll" ->
Args = make_changes_args(Options),
Since = get_start_seq(DbName, Args),
case validate_start_seq(DbName, Since) of
ok ->
{ok, Acc} = Callback(start, Acc0),
- Notifiers = start_update_notifiers(DbName),
{Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
+ Ref = make_ref(),
+ Parent = self(),
+ UpdateListener = {spawn_link(fabric_db_update_listener, go,
+ [Parent, Ref, DbName, Timeout]),
+ Ref},
try
keep_sending_changes(
DbName,
Args,
Callback,
Since,
Acc,
- Timeout
+ Timeout,
+ UpdateListener
)
after
- stop_update_notifiers(Notifiers),
- couch_changes:get_rest_db_updated()
+ fabric_db_update_listener:stop(UpdateListener)
end;
Error ->
Callback(Error, Acc0)
@@ -66,15 +72,15 @@ go(DbName, "normal", Options, Callback, Acc0) ->
Callback(Error, Acc0)
end.
-keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout) ->
+keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen) ->
#changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
{ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout),
#collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
LastSeq = pack_seqs(NewSeqs),
if Limit > Limit2, Feed == "longpoll" ->
Callback({stop, LastSeq}, AccOut);
true ->
- case {Heartbeat, wait_db_updated(Timeout)} of
+ case {Heartbeat, wait_db_updated(UpListen)} of
{undefined, timeout} ->
Callback({stop, LastSeq}, AccOut);
_ ->
@@ -85,7 +91,8 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout) ->
Callback,
LastSeq,
AccTimeout,
- Timeout
+ Timeout,
+ UpListen
)
end
end.
@@ -273,24 +280,6 @@ do_unpack_seqs(Opaque, DbName) ->
end
end, binary_to_term(couch_util:decodeBase64Url(Opaque))).
-start_update_notifiers(DbName) ->
- lists:map(fun(#shard{node=Node, name=Name}) ->
- {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})}
- end, mem3:shards(DbName)).
-
-% rexi endpoint
-start_update_notifier(DbName) ->
- {Caller, _} = get(rexi_from),
- Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end,
- Id = {couch_db_update_notifier, make_ref()},
- ok = gen_event:add_sup_handler(couch_db_update, Id, Fun),
- receive {gen_event_EXIT, Id, Reason} ->
- rexi:reply({gen_event_EXIT, DbName, Reason})
- end.
-
-stop_update_notifiers(Notifiers) ->
- [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers].
-
changes_row(#change{key=Seq, id=Id, value=Value, deleted=true, doc=Doc}, true) ->
{change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, Doc}]}};
changes_row(#change{key=Seq, id=Id, value=Value, deleted=true}, false) ->
@@ -306,10 +295,6 @@ find_replacement_shards(#shard{range=Range}, AllShards) ->
% TODO make this moar betta -- we might have split or merged the partition
[Shard || Shard <- AllShards, Shard#shard.range =:= Range].
-wait_db_updated(Timeout) ->
- receive db_updated -> couch_changes:get_rest_db_updated()
- after Timeout -> timeout end.
-
validate_start_seq(DbName, Seq) ->
try unpack_seqs(Seq, DbName) of _Any ->
ok

0 comments on commit ab8d4f1

Please sign in to comment.