Skip to content

Commit

Permalink
Merge pull request #12386 from SergeTupchiy/EMQX-11457-bridge-v2-grep…
Browse files Browse the repository at this point in the history
…timedb

refactor greptimedb bridge (split to actions and connectors)
  • Loading branch information
SergeTupchiy committed Jan 26, 2024
2 parents c93fabb + a56a5e9 commit 017386f
Show file tree
Hide file tree
Showing 14 changed files with 371 additions and 120 deletions.
3 changes: 2 additions & 1 deletion apps/emqx_bridge/src/emqx_action_info.erl
Expand Up @@ -101,7 +101,8 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_redis_action_info,
emqx_bridge_iotdb_action_info,
emqx_bridge_es_action_info,
emqx_bridge_opents_action_info
emqx_bridge_opents_action_info,
emqx_bridge_greptimedb_action_info
].
-else.
hard_coded_action_info_modules_ee() ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_greptimedb/rebar.config
Expand Up @@ -6,7 +6,7 @@
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}},
{greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.6"}}}
{greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.7"}}}
]}.
{plugins, [rebar3_path_deps]}.
{project_plugins, [erlfmt]}.
Expand Up @@ -8,7 +8,7 @@
emqx_resource,
greptimedb
]},
{env, []},
{env, [{emqx_action_info_modules, [emqx_bridge_greptimedb_action_info]}]},
{modules, []},
{links, []}
]}.
136 changes: 106 additions & 30 deletions apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl
Expand Up @@ -10,17 +10,23 @@

-import(hoconsc, [mk/2, enum/1, ref/2]).

-export([
conn_bridge_examples/1
]).

-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).

%% Examples
-export([
bridge_v2_examples/1,
conn_bridge_examples/1,
connector_examples/1
]).

-define(CONNECTOR_TYPE, greptimedb).
-define(ACTION_TYPE, greptimedb).

%% -------------------------------------------------------------------------------------------------
%% api

Expand All @@ -29,44 +35,67 @@ conn_bridge_examples(Method) ->
#{
<<"greptimedb">> => #{
summary => <<"Greptimedb HTTP API V2 Bridge">>,
value => values("greptimedb", Method)
value => bridge_v1_values(Method)
}
}
].

values(Protocol, get) ->
values(Protocol, post);
values("greptimedb", post) ->
SupportUint = <<"uint_value=${payload.uint_key}u,">>,
TypeOpts = #{
bucket => <<"example_bucket">>,
org => <<"examlpe_org">>,
token => <<"example_token">>,
server => <<"127.0.0.1:4001">>
bridge_v2_examples(Method) ->
ParamsExample = #{
parameters => #{
write_syntax => write_syntax_value(), precision => ms
}
},
values(common, "greptimedb", SupportUint, TypeOpts);
values(Protocol, put) ->
values(Protocol, post).
[
#{
<<"greptimedb">> => #{
summary => <<"GreptimeDB Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, greptimedb, greptimedb, ParamsExample
)
}
}
].

values(common, Protocol, SupportUint, TypeOpts) ->
CommonConfigs = #{
type => list_to_atom(Protocol),
connector_examples(Method) ->
[
#{
<<"greptimedb">> => #{
summary => <<"GreptimeDB Connector">>,
value => emqx_connector_schema:connector_values(
Method, greptimedb, connector_values(Method)
)
}
}
].

bridge_v1_values(_Method) ->
#{
type => greptimedb,
name => <<"demo">>,
enable => true,
local_topic => <<"local/topic/#">>,
write_syntax =>
<<"${topic},clientid=${clientid}", " ", "payload=${payload},",
"${clientid}_int_value=${payload.int_key}i,", SupportUint/binary,
"bool=${payload.bool}">>,
write_syntax => write_syntax_value(),
precision => ms,
resource_opts => #{
batch_size => 100,
batch_time => <<"20ms">>
},
username => <<"example_username">>,
password => <<"******">>,
dbname => <<"example_db">>,
server => <<"127.0.0.1:4001">>,
ssl => #{enable => false}
},
maps:merge(TypeOpts, CommonConfigs).
}.

connector_values(Method) ->
maps:without([write_syntax, precision], bridge_v1_values(Method)).

write_syntax_value() ->
<<"${topic},clientid=${clientid}", " ", "payload=${payload},",
"${clientid}_int_value=${payload.int_key}i,",
"uint_value=${payload.uint_key}u,"
"bool=${payload.bool}">>.

%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
Expand All @@ -80,11 +109,50 @@ fields("put_grpc_v1") ->
method_fields(put, greptimedb);
fields("get_grpc_v1") ->
method_fields(get, greptimedb);
fields(Type) when
Type == greptimedb
->
fields(greptimedb = Type) ->
greptimedb_bridge_common_fields() ++
connector_fields(Type).
connector_fields(Type);
%% Actions
fields(action) ->
{greptimedb,
mk(
hoconsc:map(name, ref(?MODULE, greptimedb_action)),
#{desc => <<"GreptimeDB Action Config">>, required => false}
)};
fields(greptimedb_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
mk(ref(?MODULE, action_parameters), #{
required => true, desc => ?DESC(action_parameters)
})
);
fields(action_parameters) ->
[
{write_syntax, fun write_syntax/1},
emqx_bridge_greptimedb_connector:precision_field()
];
%% Connectors
fields("config_connector") ->
emqx_connector_schema:common_fields() ++
emqx_bridge_greptimedb_connector:fields("connector") ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
Fields =
emqx_bridge_greptimedb_connector:fields("connector") ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
%$ Bridge v2
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(greptimedb_action)).

method_fields(post, ConnectorType) ->
greptimedb_bridge_common_fields() ++
Expand Down Expand Up @@ -122,6 +190,14 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for Greptimedb using `", string:to_upper(Method), "` method."];
desc(greptimedb) ->
?DESC(emqx_bridge_greptimedb_connector, "greptimedb");
desc(greptimedb_action) ->
?DESC(greptimedb_action);
desc(action_parameters) ->
?DESC(action_parameters);
desc("config_connector") ->
?DESC("desc_config");
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_) ->
undefined.

Expand Down
@@ -0,0 +1,58 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_greptimedb_action_info).

-behaviour(emqx_action_info).

-export([
action_type_name/0,
bridge_v1_config_to_action_config/2,
bridge_v1_config_to_connector_config/1,
bridge_v1_type_name/0,
connector_action_config_to_bridge_v1_config/2,
connector_type_name/0,
schema_module/0
]).

-import(emqx_utils_conv, [bin/1]).

-define(SCHEMA_MODULE, emqx_bridge_greptimedb).
-define(GREPTIMEDB_TYPE, greptimedb).

action_type_name() -> ?GREPTIMEDB_TYPE.
bridge_v1_type_name() -> ?GREPTIMEDB_TYPE.
connector_type_name() -> ?GREPTIMEDB_TYPE.

schema_module() -> ?SCHEMA_MODULE.

bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ActionTopLevelKeys = schema_keys(greptimedb_action),
ActionParametersKeys = schema_keys(action_parameters),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
ActionConfig#{<<"connector">> => ConnectorName}
).

bridge_v1_config_to_connector_config(BridgeV1Config) ->
ConnectorKeys = schema_keys("config_connector"),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_connector_schema:project_to_connector_resource_opts/1,
maps:with(ConnectorKeys, BridgeV1Config)
).

connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) ->
emqx_action_info:connector_action_config_to_bridge_v1_config(
ConnectorRawConf, ActionRawConf
).

make_config_map(PickKeys, IndentKeys, Config) ->
Conf0 = maps:with(PickKeys, Config),
emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).

schema_keys(Name) ->
[bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].

0 comments on commit 017386f

Please sign in to comment.