Skip to content
Permalink
Browse files

Support cluster

  • Loading branch information...
terry-xiaoyu committed May 30, 2019
1 parent 6bb64e3 commit 3da7fed60d92c9a994c2aed5f34509c0d0d4eff4
@@ -20,61 +20,82 @@
-type(rule_name() :: binary()).

-type(resource_id() :: binary()).
-type(resource_name() :: binary()).
-type(action_instance_id() :: binary()).

-type(action_name() :: atom()).
-type(resource_type_name() :: atom()).

-type(descr() :: #{en := binary(), zh => binary()}).

-type(mf() :: {Module::atom(), Fun::atom()}).

-type(hook() :: atom() | 'any').

-define(descr, #{en => <<>>, zh => <<>>}).

-record(action,
{ name :: action_name()
, for :: hook()
, app :: atom()
, types = [] :: list(resource_type_name())
, module :: module()
, on_create :: mf()
, on_destroy :: maybe(mf())
, params_spec :: #{atom() => term()} %% params specs
, title = ?descr :: descr()
, description = ?descr :: descr()
}).

-record(action_instance,
{ id :: action_instance_id()
, name :: action_name()
, args :: #{atom() => term()} %% the args got from API for initializing action_instance
}).

-record(rule,
{ id :: rule_id()
, for :: hook()
, rawsql :: binary()
, selects :: list()
, conditions :: tuple()
, actions :: list()
, actions :: list(#action_instance{})
, enabled :: boolean()
, description :: binary()
}).

-record(action,
{ name :: action_name()
, for :: hook()
, app :: atom()
, types = [] :: list(resource_name())
, module :: module()
, func :: atom()
, params :: #{atom() => term()}
, title :: descr()
, description :: descr()
}).

-record(resource,
{ id :: resource_id()
, type :: resource_type_name()
, config :: #{}
, params :: #{}
, config :: #{} %% the configs got from API for initializing resource
, description :: binary()
}).

-record(resource_type,
{ name :: resource_type_name()
, provider :: atom()
, params :: #{}
, on_create :: {Module::atom(), Fun::atom()}
, on_destroy :: {Module::atom(), Fun::atom()}
, title :: descr()
, description :: descr()
, params_spec :: #{atom() => term()} %% params specs
, on_create :: mf()
, on_destroy :: mf()
, title = ?descr :: descr()
, description = ?descr :: descr()
}).

-record(rule_hooks,
{ hook :: atom()
, rule_id :: rule_id()
}).

-record(resource_params,
{ id :: resource_id()
, params :: #{} %% the params got after initializing the resource
}).

-record(action_instance_params,
{ id :: action_instance_id()
, params :: #{} %% the params got after initializing the action
, apply :: fun((Data::map(), Envs::map()) -> any()) %% the func got after initializing the action
}).

%% Arithmetic operators
-define(is_arith(Op), (Op =:= '+' orelse
Op =:= '-' orelse
@@ -33,7 +33,7 @@
-rule_action(#{name => inspect,
for => '$any',
types => [],
func => inspect,
create => on_action_create_inspect,
params => #{},
title => #{en => <<"Inspect (debug)">>,
zh => <<"检查 (调试)"/utf8>>},
@@ -44,7 +44,7 @@
-rule_action(#{name => republish,
for => 'message.publish',
types => [],
func => republish,
create => on_action_create_republish,
params => ?REPUBLISH_PARAMS_SPEC,
title => #{en => <<"Republish">>,
zh => <<"消息重新发布"/utf8>>},
@@ -58,8 +58,8 @@

-export([on_resource_create/2]).

-export([ inspect/1
, republish/1
-export([ on_action_create_inspect/1
, on_action_create_republish/1
]).

%%------------------------------------------------------------------------------
@@ -70,8 +70,8 @@
on_resource_create(_Name, Conf) ->
Conf.

-spec(inspect(Params :: map()) -> action_fun()).
inspect(Params) ->
-spec(on_action_create_inspect(Params :: map()) -> action_fun()).
on_action_create_inspect(Params) ->
fun(Selected, Envs) ->
io:format("[inspect]~n"
"\tSelected Data: ~p~n"
@@ -80,9 +80,9 @@ inspect(Params) ->
end.

%% A Demo Action.
-spec(republish(#{binary() := emqx_topic:topic()})
-spec(on_action_create_republish(#{binary() := emqx_topic:topic()})
-> action_fun()).
republish(#{<<"target_topic">> := TargetTopic}) ->
on_action_create_republish(#{<<"target_topic">> := TargetTopic}) ->
fun(Selected, #{qos := QoS, from := Client,
flags := Flags, headers := Headers}) ->
?LOG(debug, "[republish] republish to: ~p, Payload: ~p",
@@ -101,7 +101,7 @@ republish(#{<<"target_topic">> := TargetTopic}) ->
end.

republish_from(Client) ->
C = bin(Client), <<"built_in:republish:", C/binary>>.
C = bin(Client), <<"action:republish:", C/binary>>.

bin(Bin) when is_binary(Bin) -> Bin;
bin(Atom) when is_atom(Atom) ->

0 comments on commit 3da7fed

Please sign in to comment.
You can’t perform that action at this time.