Permalink
Browse files

Async subscribe

  • Loading branch information...
1 parent 823280a commit cbed13a6191495d181bb0c7cd7fd81733da01c40 @doubleyou committed Oct 13, 2012
Showing with 13 additions and 8 deletions.
  1. +13 −8 src/dps_channel.erl
View
@@ -152,16 +152,12 @@ handle_call({publish, Msg, TS}, {Pid, _}, State = #state{messages = Msgs, limit
?MODULE:replicate(Replicator, LastTS, TS, Msg, Limit),
{reply, ok, State#state{messages = Messages, last_ts = LastTS}};
-handle_call({subscribe, Pid, TS}, _From, State = #state{messages = Messages, tag = Tag,
- subscribers = Subscribers, last_ts = LastTS}) ->
+handle_call({subscribe, Pid, TS}, From, State = #state{
+ subscribers = Subscribers}) ->
Ref = erlang:monitor(process, Pid),
- Msgs = messages_newer(Messages, TS),
- case Msgs of
- [] -> ok;
- _ -> Pid ! {dps_msg, Tag, LastTS, Msgs};
- end,
+ spawn(fun() -> async_messages(Pid, TS, From, State) end),
NewState = State#state{subscribers = [{Pid,Ref} | Subscribers]},
- {reply, length(Msgs), NewState};
+ {noreply, NewState};
handle_call({unsubscribe, Pid}, _From, State = #state{subscribers = Subscribers}) ->
{Delete, Remain} = lists:partition(fun({P,_Ref}) -> P == Pid end, Subscribers),
@@ -181,6 +177,15 @@ handle_call(_Msg, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
+async_messages(Pid, TS, From, #state{messages = Messages, tag = Tag,
+ last_ts = LastTS}) ->
+ Msgs = messages_newer(Messages, TS),
+ case Msgs of
+ [] -> ok;
+ _ -> Pid ! {dps_msg, Tag, LastTS, Msgs}
+ end,
+ gen_server:reply(From, length(Msgs)).
+
distribute_message(Message, Subscribers, Pid) ->
[Sub ! Message || {Sub, _Ref} <- Subscribers, Sub =/= Pid].

0 comments on commit cbed13a

Please sign in to comment.