Skip to content
Browse files

added webhook suppor

  • Loading branch information...
1 parent 09e0320 commit 7fd37be12ab3ddf0c9df297ab2fe2063eb6fad7d @dergraf committed
Showing with 119 additions and 7 deletions.
  1. +1 −0 rebar.config
  2. +23 −7 src/opendata_handler.erl
  3. +2 −0 src/opendata_pubsub_app.erl
  4. +93 −0 src/opendata_webhook_handler.erl
View
1 rebar.config
@@ -2,6 +2,7 @@
{erl_opts, [{parse_transform, lager_transform}]}.
{deps, [
{lager, "1.0.*", {git, "git://github.com/basho/lager.git", "HEAD"}},
+ {ibrowse, "3.0.*", {git, "git://github.com/cmullaparthi/ibrowse.git", "HEAD"}},
{gproc, "0.2.*", {git, "git://github.com/uwiger/gproc.git", "HEAD"}},
{cowboy, "0.5.*", {git, "git://github.com/extend/cowboy", "HEAD"}}
]}.
View
30 src/opendata_handler.erl
@@ -5,7 +5,7 @@
-export([websocket_init/3, websocket_handle/3,
websocket_info/3, websocket_terminate/3]).
--record(state, {service}).
+-record(state, {service, subscriptions=[]}).
init({_Any, http}, Req, []) ->
case cowboy_http_req:header('Upgrade', Req) of
@@ -75,12 +75,28 @@ websocket_init(_Any, Req, []) ->
Req2 = cowboy_http_req:compact(Req),
{ok, Req2, #state{service=Service}, hibernate}.
-websocket_handle({text, <<"subscribe:", Topic/binary>>}, Req, #state{service=Svc} = State) ->
- opendata_pubsub:subscribe(Svc, Topic),
- {reply, {text, <<"ok">>}, Req, State, hibernate};
-websocket_handle({text, <<"unsubscribe:", Topic/binary>>}, Req, #state{service=Svc} = State) ->
- opendata_pubsub:unsubscribe(Svc, Topic),
- {reply, {text, <<"ok">>}, Req, State, hibernate};
+websocket_handle({text, <<"subscribe:", Topic/binary>>}, Req, #state{service=Svc, subscriptions=Subs} = State) ->
+ NewSubs =
+ case lists:member(Topic, Subs) of
+ false ->
+ opendata_pubsub:subscribe(Svc, Topic),
+ catch opendata_webhook_handler:incr_topic_counter(Svc, Topic),
+ Subs ++ [Topic];
+ true ->
+ Subs
+ end,
+ {reply, {text, <<"ok">>}, Req, State#state{subscriptions=NewSubs}, hibernate};
+websocket_handle({text, <<"unsubscribe:", Topic/binary>>}, Req, #state{service=Svc, subscriptions=Subs} = State) ->
+ NewSubs =
+ case lists:member(Topic, Subs) of
+ false ->
+ opendata_pubsub:unsubscribe(Svc, Topic),
+ catch opendata_webhook_handler:decr_topic_counter(Svc, Topic),
+ Subs -- [Topic];
+ true ->
+ Subs
+ end,
+ {reply, {text, <<"ok">>}, Req, State#state{subscriptions=NewSubs}, hibernate};
websocket_handle({text, Msg}, Req, State) ->
{reply, {text, << "You said: ", Msg/binary >>}, Req, State, hibernate};
View
2 src/opendata_pubsub_app.erl
@@ -13,6 +13,7 @@ start() ->
application:start(crypto),
application:start(public_key),
application:start(ssl),
+ application:start(ibrowse),
application:start(cowboy),
application:start(gproc),
application:start(opendata_pubsub).
@@ -21,6 +22,7 @@ start(_StartType, _StartArgs) ->
Dispatch = [
{'_', [
{[<<"pubsub">>, '_'], opendata_handler, []},
+ {[<<"demohook">>], opendata_demohook, []},
{'_', default_handler, []}
]}
],
View
93 src/opendata_webhook_handler.erl
@@ -0,0 +1,93 @@
+-module(opendata_webhook_handler).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_hook/3,
+ stop_hook/1,
+ incr_topic_counter/2,
+ decr_topic_counter/2]).
+
+%% gen_server callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
+-record(state, {name, hook, topics=ets:new(topics, []), interval}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+start_hook(HookName, WebHook, Interval) ->
+ gen_server:start({local, list_to_atom(HookName)}, ?MODULE, [HookName, WebHook, Interval], []).
+
+stop_hook(HookName) ->
+ gen_server:call(HookName, stop).
+incr_topic_counter(HookName, Topic) when is_binary(HookName) ->
+ incr_topic_counter(binary_to_list(HookName), Topic);
+incr_topic_counter(HookName, Topic) when is_list(HookName) ->
+ incr_topic_counter(list_to_atom(HookName), Topic);
+incr_topic_counter(HookName, Topic) ->
+ gen_server:cast(HookName, {incr_topic_counter, Topic}).
+
+decr_topic_counter(HookName, Topic) when is_binary(HookName) ->
+ decr_topic_counter(binary_to_list(HookName), Topic);
+decr_topic_counter(HookName, Topic) when is_list(HookName) ->
+ decr_topic_counter(list_to_atom(HookName), Topic);
+decr_topic_counter(HookName, Topic) ->
+ gen_server:cast(HookName, {decr_topic_counter, Topic}).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+init([HookName, WebHook, Interval]) ->
+ timer:send_interval(Interval, trigger),
+ {ok, #state{name=HookName, hook=WebHook, interval=Interval}}.
+
+handle_call(_Req, _From, State) ->
+ {reply, ok, State}.
+
+handle_cast({incr_topic_counter, Topic}, #state{topics=Topics} = State) ->
+ case catch ets:update_counter(Topics, Topic, 1) of
+ {'EXIT', {badarg, _}} ->
+ ets:insert(Topics, {Topic, 1});
+ _ ->
+ ok
+ end,
+ {noreply, State};
+handle_cast({decr_topic_counter, Topic}, #state{topics=Topics} = State) ->
+ case catch ets:update_counter(Topics, Topic, -1) of
+ {'EXIT', {badarg, _}} ->
+ ok; %% should not happen, but it's ok anyway
+ Val when Val =< 0 ->
+ ets:delete(Topics, Topic);
+ _ ->
+ ok
+ end,
+ {noreply, State}.
+
+handle_info(trigger, #state{name=HookName, hook=Hook, topics=Topics} = State) ->
+ ListOfTopics = lists:flatten(ets:match(Topics, {'$1', '_'})),
+ Url = list_to_binary([Hook, <<"?topics=">>, [[T, <<",">>] || T <- ListOfTopics]]),
+ {ok, _Status, _ResponseHeaders, ResponseBody} = ibrowse:send_req(binary_to_list(Url), [], get),
+ %% TODO: check Response Status
+ %% TODO: parse Response , one message / topic
+ BHook = list_to_binary(HookName),
+ spawn(fun()->
+ [opendata_pubsub:publish(BHook, Topic, ResponseBody)||Topic <- ListOfTopics]
+ end),
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================

0 comments on commit 7fd37be

Please sign in to comment.
Something went wrong with that request. Please try again.