Skip to content
Permalink
Browse files

Webhook event actions

  • Loading branch information...
terry-xiaoyu committed Apr 18, 2019
1 parent 20356ec commit 8367e02f5ccafc7df9600c258348461a67c171bd
Showing with 91 additions and 19 deletions.
  1. +91 −17 src/emqx_web_hook_actions.erl
  2. +0 −2 src/emqx_web_hook_app.erl
@@ -17,53 +17,86 @@

-include_lib("emqx/include/emqx.hrl").

-resource_type(#{name => 'web_hook',
-define(RESOURCE_TYPE_WEBHOOK, 'web_hook').
-define(RESOURCE_CONFIG_SPEC, #{url => string, headers => json,
method => ['GET','PUT','POST','DELETE']}).

-define(JSON_REQ(URL, HEADERS, BODY), {(URL), (HEADERS), "application/json", (BODY)}).

-resource_type(#{name => ?RESOURCE_TYPE_WEBHOOK,
schema => "emqx_web_hook.url",
create => on_resource_create,
params => ?RESOURCE_CONFIG_SPEC,
description => "WebHook Resource"
}).

-rule_action(#{name => forward_action,
-rule_action(#{name => publish_action,
for => 'message.publish',
func => forward_action,
params => #{url => string, '$resource' => web_hook},
description => "Forward a MQTT message"
func => forward_publish_action,
params => #{'$resource' => ?RESOURCE_TYPE_WEBHOOK},
type => ?RESOURCE_TYPE_WEBHOOK,
description => "Forward MQTT messages to Web Server"
}).

-rule_action(#{name => event_action,
for => any,
func => forward_event_action,
params => #{'$resource' => ?RESOURCE_TYPE_WEBHOOK,
template => json},
type => ?RESOURCE_TYPE_WEBHOOK,
description => "Forward Events to Web Server"
}).

-type(action_fun() :: fun((Data :: map()) -> Result :: any())).
-type(action_fun() :: fun((Data :: map(), Envs :: map()) -> Result :: any())).

-export_type([action_fun/0]).

-export([on_resource_create/2]).
-export([forward_action/1]).

-export([ forward_publish_action/1
, forward_event_action/1
]).

-export([feed_template/2]).

%%------------------------------------------------------------------------------
%% Actions for web hook
%%------------------------------------------------------------------------------

-spec(on_resource_create(binary(), map()) -> map()).
on_resource_create(_Name, Conf) ->
validate_resource_config(Conf, ?RESOURCE_CONFIG_SPEC),
Conf.

%% An action that forwards messages to a remote web server.
-spec(forward_action(#{url := string()}) -> action_fun()).
forward_action(_Args = #{url := Url}) ->
fun(Data = #{}) ->
http_request(Url, Data)
%% An action that forwards publish messages to a remote web server.
-spec(forward_publish_action(#{url := string()}) -> action_fun()).
forward_publish_action(Params) ->
#{url := Url, headers := Headers, method := Method}
= parse_action_params(Params),
fun(Selected, _Envs) ->
http_request(Url, Headers, Method, Selected)
end.

%% An action that forwards events to a remote web server.
-spec(forward_event_action(#{url := string()}) -> action_fun()).
forward_event_action(Params) ->
#{url := Url, headers := Headers, method := Method, template := Template}
= parse_action_params(Params),
fun(_Selected, Envs) ->
http_request(Url, Headers, Method, feed_template(Template, Envs))
end.

%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------

http_request(Url, Params) ->
logger:debug("[WebHook] HTTP request URL: ~s, params: ~p", [Url, Params]),
case http_request(post, {str(Url), [], "application/json", jsx:encode(Params)},
http_request(Url, Headers, Method, Params) ->
logger:debug("[WebHook Action] ~s to ~s, headers: ~s, body: ~p", [Method, Url, Headers, Params]),
case http_request(Method, ?JSON_REQ(Url, Headers, jsx:encode(Params)),
[{timeout, 5000}], [], 0) of
{ok, _} -> ok;
{error, Reason} ->
logger:error("[WebHook] HTTP request error: ~p", [Reason]),
ok %% TODO: return ok?
logger:error("[WebHook Action] HTTP request error: ~p", [Reason])
end.

http_request(Method, Req, HTTPOpts, Opts, Times) ->
@@ -75,5 +108,46 @@ http_request(Method, Req, HTTPOpts, Opts, Times) ->
Other -> Other
end.

validate_resource_config(_Config, _ConfigSepc) ->
%% erlang:error(invaild_config)
ok.

parse_action_params(Params = #{url := Url}) ->
#{url => str(Url),
headers => headers(maps:get(headers, Params, undefined)),
method => method(maps:get(method, Params, <<"POST">>)),
template => maps:get(template, Params, undefined)}.

method(GET) when GET == <<"GET">>; GET == <<"get">> -> get;
method(POST) when POST == <<"POST">>; POST == <<"post">> -> post;
method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put;
method(DEL) when DEL == <<"DELETE">>; DEL == <<"delete">> -> delete.

headers(undefined) -> [];
headers(Headers) ->
[{str(K), str(V)} || {K, V} <- Headers].

feed_template(undefined, Envs) ->
maps:with([event, client_id, username], Envs);
feed_template(Template, Envs) when is_list(Template) ->
lists:foldr(
fun({K, V}, Acc) ->
[{K, feed_template(V, Envs)} | Acc];
(V, Acc) ->
[feed_template(V, Envs) | Acc]
end, [], Template);
feed_template(<<"${", Bin/binary>>, Envs) ->
Val = binary:part(Bin, {0, byte_size(Bin)-1}),
feed_val(Val, Envs);
feed_template(Bin, _Envs) ->
Bin.

feed_val(Val, Envs) ->
try V = binary_to_existing_atom(Val, utf8),
maps:get(V, Envs, null)
catch error:badarg ->
null
end.

str(Str) when is_list(Str) -> Str;
str(Bin) when is_binary(Bin) -> binary_to_list(Bin).
@@ -26,8 +26,6 @@

start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_web_hook_sup:start_link(),
%% Contribute resource types and actions
emqx_rule_engine:register_provider(?APP),
emqx_web_hook:load(),
emqx_web_hook_cfg:register(),
{ok, Sup}.

0 comments on commit 8367e02

Please sign in to comment.
You can’t perform that action at this time.