From fa49f0f0c664f9661878937f45f5d813e4b7891a Mon Sep 17 00:00:00 2001 From: "David N. Welton" Date: Fri, 4 Jun 2010 17:44:51 +0200 Subject: [PATCH] Retool to use gen_event instead of our own callback. I don't know if it's really an improvement, but hey, it was kind of fun to do, and this way we can have multiple callbacks fairly easily. --- src/twitter_stream.erl | 53 ++++++++++++++++++++++++++++---------- src/twitter_stream_sup.erl | 4 +-- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/src/twitter_stream.erl b/src/twitter_stream.erl index 30e6113..dd3f106 100644 --- a/src/twitter_stream.erl +++ b/src/twitter_stream.erl @@ -33,8 +33,8 @@ %% API --export([start_link/2]). --export([fetch/2]). +-export([start_link/1]). +-export([fetch/2, add_handler/1, add_handler/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -55,8 +55,18 @@ %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% Description: Starts the server %%-------------------------------------------------------------------- -start_link(Url, Callback) -> - gen_server:start_link(?SERVER, ?MODULE, [Url, Callback], []). +start_link(Url) -> + gen_server:start_link(?SERVER, ?MODULE, [Url], []). + +%% Add a handler to receive callbacks. + +add_handler(Handler) -> + gen_server:call(?MODULE, {add_handler, Handler}). + +%% Add a handler with its own eventmanager. + +add_handler(EventManager, Handler) -> + gen_server:call(?MODULE, {add_handler, EventManager, Handler}). %%==================================================================== @@ -70,10 +80,10 @@ start_link(Url, Callback) -> %% {stop, Reason} %% Description: Initiates the server %%-------------------------------------------------------------------- -init([Url, Callback]) -> +init([Url]) -> {ok, EventMgr} = gen_event:start_link(), gen_server:cast(?MODULE, {fetch, Url, 1}), - {ok, #state{eventmgr = EventMgr, callback = Callback, sleep = 1, url = Url}}. + {ok, #state{eventmgr = [EventMgr], sleep = 1, url = Url}}. %%-------------------------------------------------------------------- %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | @@ -85,6 +95,14 @@ init([Url, Callback]) -> %% Description: Handling call messages %%-------------------------------------------------------------------- +handle_call({add_handler, EventManager, _Handler}, _From, State) -> + EventManList = State#state.eventmgr ++ [EventManager], + {reply, ok, State#state{eventmgr = EventManList}}; + +handle_call({add_handler, Handler}, _From, State) -> + ok = gen_event:add_handler(State#state.eventmgr, Handler, []), + {reply, ok, State}; + handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -136,10 +154,13 @@ handle_info({http, Rest}, State) -> %% Ignore it if it's empty. {_RequestId, stream, Data} when Data == <<"\r\n">> -> ok; + + %% This is where we actually do something! {_RequestId, stream, Data} -> - %% Call the gen_event stuff... - {M, F, A} = State#state.callback, - spawn(M, F, [A ++ [fill_status_rec(mochijson2:decode(Data))]]); + Tweet = fill_status_rec(mochijson2:decode(Data)), + notify_all_managers(State, {tweet, Tweet}); +% {M, F, A} = State#state.callback, +% spawn(M, F, [A ++ [Tweet]]); %% end of streaming data - we just restart in this case, %% because we want a never-ending stream. {_RequestId, stream_end, Headers} -> @@ -169,11 +190,6 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- - - %% 3 arg version expects url of the form http://user:password@stream.twitter.com/1/statuses/sample.json %% retry - number of times the stream is reconnected %% sleep - secs to sleep between retries. @@ -244,3 +260,12 @@ fill_status_rec(Tweet) -> user = fill_user_rec(element(2, lists:keyfind(<<"user">>, 1, Data))) }, Status. + +%% Cycle through the list of event managers and send the message to +%% all of them. + +notify_all_managers(State, Message) -> + lists:foreach(fun(Manager) -> + gen_event:notify(Manager, Message) + end, + State#state.eventmgr). diff --git a/src/twitter_stream_sup.erl b/src/twitter_stream_sup.erl index 090dad7..974c631 100644 --- a/src/twitter_stream_sup.erl +++ b/src/twitter_stream_sup.erl @@ -8,8 +8,8 @@ %% returns the Spec for the supervisor (called by the %% supervisor:start_link above) -init([Url, Callback]) -> +init([Url]) -> Spec = [{twitter_stream, - {twitter_stream, start_link, [Url, Callback]}, % this gets run via apply(M, F, A) + {twitter_stream, start_link, [Url]}, % this gets run via apply(M, F, A) transient, 2000, worker, [twitter_stream]}], {ok, {{one_for_one, 10, 60}, Spec}}.