Skip to content

Commit

Permalink
Merge pull request #12488 from kjellwinblad/kjell/refactor/rocketmq_b…
Browse files Browse the repository at this point in the history
…ridge/EMQX-11467

feat: refactor RocketMQ bridge to connector and action
  • Loading branch information
kjellwinblad committed Feb 13, 2024
2 parents 6fbb6f6 + 976099f commit 3668273
Show file tree
Hide file tree
Showing 10 changed files with 309 additions and 44 deletions.
1 change: 1 addition & 0 deletions apps/emqx_bridge/src/emqx_action_info.erl
Expand Up @@ -94,6 +94,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info,
emqx_bridge_oracle_action_info,
emqx_bridge_rocketmq_action_info,
emqx_bridge_influxdb_action_info,
emqx_bridge_cassandra_action_info,
emqx_bridge_mysql_action_info,
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src
@@ -1,9 +1,9 @@
{application, emqx_bridge_rocketmq, [
{description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.1.4"},
{vsn, "0.1.5"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, []},
{env, [{emqx_action_info_modules, [emqx_bridge_rocketmq_action_info]}]},
{modules, []},
{links, []}
]}.
162 changes: 150 additions & 12 deletions apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl
Expand Up @@ -8,12 +8,7 @@
-include_lib("emqx_bridge/include/emqx_bridge.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").

-import(hoconsc, [mk/2, enum/1, ref/2]).

-export([
conn_bridge_examples/1,
values/1
]).
-import(hoconsc, [mk/2, enum/1]).

-export([
namespace/0,
Expand All @@ -22,6 +17,14 @@
desc/1
]).

-export([
bridge_v2_examples/1,
connector_examples/1,
conn_bridge_examples/1
]).

-define(CONNECTOR_TYPE, rocketmq).
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
-define(DEFAULT_TEMPLATE, <<>>).
-define(DEFFAULT_REQ_TIMEOUT, <<"15s">>).

Expand All @@ -33,14 +36,14 @@ conn_bridge_examples(Method) ->
#{
<<"rocketmq">> => #{
summary => <<"RocketMQ Bridge">>,
value => values(Method)
value => conn_bridge_example_values(Method)
}
}
].

values(get) ->
values(post);
values(post) ->
conn_bridge_example_values(get) ->
conn_bridge_example_values(post);
conn_bridge_example_values(post) ->
#{
enable => true,
type => rocketmq,
Expand All @@ -58,15 +61,142 @@ values(post) ->
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
}
};
values(put) ->
values(post).
conn_bridge_example_values(put) ->
conn_bridge_example_values(post).

connector_examples(Method) ->
[
#{
<<"rocketmq">> =>
#{
summary => <<"RocketMQ Connector">>,
value => emqx_connector_schema:connector_values(
Method, ?CONNECTOR_TYPE, connector_values()
)
}
}
].

connector_values() ->
#{
<<"enable">> => true,
<<"servers">> => <<"127.0.0.1:9876">>,
<<"pool_size">> => 8,
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"15s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
}.

bridge_v2_examples(Method) ->
[
#{
<<"rocketmq">> =>
#{
summary => <<"RocketMQ Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
)
}
}
].

action_values() ->
#{
<<"parameters">> => #{
<<"topic">> => <<"TopicTest">>,
<<"template">> => ?DEFAULT_TEMPLATE,
<<"refresh_interval">> => <<"3s">>,
<<"send_buffer">> => <<"1024KB">>,
<<"sync_timeout">> => <<"3s">>
}
}.

%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
namespace() -> "bridge_rocketmq".

roots() -> [].

fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
emqx_connector_schema:api_fields(
Field,
?CONNECTOR_TYPE,
fields("config_connector") -- emqx_connector_schema:common_fields()
);
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(rocketmq_action));
fields(action) ->
{?ACTION_TYPE,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(?MODULE, rocketmq_action)),
#{
desc => <<"RocketMQ Action Config">>,
required => false
}
)};
fields(rocketmq_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
hoconsc:mk(
hoconsc:ref(?MODULE, action_parameters),
#{
required => true,
desc => ?DESC("action_parameters")
}
)
);
fields(action_parameters) ->
Parameters =
[
{template,
mk(
binary(),
#{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE}
)}
] ++ emqx_bridge_rocketmq_connector:fields(config),
lists:foldl(
fun(Key, Acc) ->
proplists:delete(Key, Acc)
end,
Parameters,
[
servers,
pool_size,
auto_reconnect,
access_key,
secret_key,
security_token
]
);
fields("config_connector") ->
Config =
emqx_connector_schema:common_fields() ++
emqx_bridge_rocketmq_connector:fields(config) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
lists:foldl(
fun(Key, Acc) ->
proplists:delete(Key, Acc)
end,
Config,
[
topic,
sync_timeout,
refresh_interval,
send_buffer,
auto_reconnect
]
);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields("config") ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
Expand Down Expand Up @@ -94,6 +224,14 @@ desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for RocketMQ using `", string:to_upper(Method), "` method."];
desc("config_connector") ->
?DESC("config_connector");
desc(rocketmq_action) ->
?DESC("rocketmq_action");
desc(action_parameters) ->
?DESC("action_parameters");
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_) ->
undefined.

Expand Down
22 changes: 22 additions & 0 deletions apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_action_info.erl
@@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

-module(emqx_bridge_rocketmq_action_info).

-behaviour(emqx_action_info).

-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).

bridge_v1_type_name() -> rocketmq.

action_type_name() -> rocketmq.

connector_type_name() -> rocketmq.

schema_module() -> emqx_bridge_rocketmq.

0 comments on commit 3668273

Please sign in to comment.