Skip to content

Commit

Permalink
Retool to use gen_event instead of our own callback.
Browse files Browse the repository at this point in the history
I don't know if it's really an improvement, but hey, it was kind of
fun to do, and this way we can have multiple callbacks fairly easily.
  • Loading branch information
davidw committed Jun 4, 2010
1 parent 5d5fb1c commit fa49f0f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 16 deletions.
53 changes: 39 additions & 14 deletions src/twitter_stream.erl
Expand Up @@ -33,8 +33,8 @@


%% API
-export([start_link/2]).
-export([fetch/2]).
-export([start_link/1]).
-export([fetch/2, add_handler/1, add_handler/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
Expand All @@ -55,8 +55,18 @@
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link(Url, Callback) ->
gen_server:start_link(?SERVER, ?MODULE, [Url, Callback], []).
start_link(Url) ->
gen_server:start_link(?SERVER, ?MODULE, [Url], []).

%% Add a handler to receive callbacks.

add_handler(Handler) ->
gen_server:call(?MODULE, {add_handler, Handler}).

%% Add a handler with its own eventmanager.

add_handler(EventManager, Handler) ->
gen_server:call(?MODULE, {add_handler, EventManager, Handler}).


%%====================================================================
Expand All @@ -70,10 +80,10 @@ start_link(Url, Callback) ->
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([Url, Callback]) ->
init([Url]) ->
{ok, EventMgr} = gen_event:start_link(),
gen_server:cast(?MODULE, {fetch, Url, 1}),
{ok, #state{eventmgr = EventMgr, callback = Callback, sleep = 1, url = Url}}.
{ok, #state{eventmgr = [EventMgr], sleep = 1, url = Url}}.

%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
Expand All @@ -85,6 +95,14 @@ init([Url, Callback]) ->
%% Description: Handling call messages
%%--------------------------------------------------------------------

handle_call({add_handler, EventManager, _Handler}, _From, State) ->
EventManList = State#state.eventmgr ++ [EventManager],
{reply, ok, State#state{eventmgr = EventManList}};

handle_call({add_handler, Handler}, _From, State) ->
ok = gen_event:add_handler(State#state.eventmgr, Handler, []),
{reply, ok, State};

handle_call(_Request, _From, State) ->
{reply, ok, State}.

Expand Down Expand Up @@ -136,10 +154,13 @@ handle_info({http, Rest}, State) ->
%% Ignore it if it's empty.
{_RequestId, stream, Data} when Data == <<"\r\n">> ->
ok;

%% This is where we actually do something!
{_RequestId, stream, Data} ->
%% Call the gen_event stuff...
{M, F, A} = State#state.callback,
spawn(M, F, [A ++ [fill_status_rec(mochijson2:decode(Data))]]);
Tweet = fill_status_rec(mochijson2:decode(Data)),
notify_all_managers(State, {tweet, Tweet});
% {M, F, A} = State#state.callback,
% spawn(M, F, [A ++ [Tweet]]);
%% end of streaming data - we just restart in this case,
%% because we want a never-ending stream.
{_RequestId, stream_end, Headers} ->
Expand Down Expand Up @@ -169,11 +190,6 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------


%% 3 arg version expects url of the form http://user:password@stream.twitter.com/1/statuses/sample.json
%% retry - number of times the stream is reconnected
%% sleep - secs to sleep between retries.
Expand Down Expand Up @@ -244,3 +260,12 @@ fill_status_rec(Tweet) ->
user = fill_user_rec(element(2, lists:keyfind(<<"user">>, 1, Data)))
},
Status.

%% Cycle through the list of event managers and send the message to
%% all of them.

notify_all_managers(State, Message) ->
lists:foreach(fun(Manager) ->
gen_event:notify(Manager, Message)
end,
State#state.eventmgr).
4 changes: 2 additions & 2 deletions src/twitter_stream_sup.erl
Expand Up @@ -8,8 +8,8 @@

%% returns the Spec for the supervisor (called by the
%% supervisor:start_link above)
init([Url, Callback]) ->
init([Url]) ->
Spec = [{twitter_stream,
{twitter_stream, start_link, [Url, Callback]}, % this gets run via apply(M, F, A)
{twitter_stream, start_link, [Url]}, % this gets run via apply(M, F, A)
transient, 2000, worker, [twitter_stream]}],
{ok, {{one_for_one, 10, 60}, Spec}}.

0 comments on commit fa49f0f

Please sign in to comment.