Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor greptimedb bridge (split to actions and connectors) #12386

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/emqx_bridge/src/emqx_action_info.erl
Expand Up @@ -101,7 +101,8 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_redis_action_info,
emqx_bridge_iotdb_action_info,
emqx_bridge_es_action_info,
emqx_bridge_opents_action_info
emqx_bridge_opents_action_info,
emqx_bridge_greptimedb_action_info
].
-else.
hard_coded_action_info_modules_ee() ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_greptimedb/rebar.config
Expand Up @@ -6,7 +6,7 @@
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}},
{greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.6"}}}
{greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.7"}}}
]}.
{plugins, [rebar3_path_deps]}.
{project_plugins, [erlfmt]}.
Expand Up @@ -8,7 +8,7 @@
emqx_resource,
greptimedb
]},
{env, []},
{env, [{emqx_action_info_modules, [emqx_bridge_greptimedb_action_info]}]},
{modules, []},
{links, []}
]}.
136 changes: 106 additions & 30 deletions apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl
Expand Up @@ -10,17 +10,23 @@

-import(hoconsc, [mk/2, enum/1, ref/2]).

-export([
conn_bridge_examples/1
]).

-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).

%% Examples
-export([
bridge_v2_examples/1,
conn_bridge_examples/1,
connector_examples/1
]).

-define(CONNECTOR_TYPE, greptimedb).
-define(ACTION_TYPE, greptimedb).

%% -------------------------------------------------------------------------------------------------
%% api

Expand All @@ -29,44 +35,67 @@ conn_bridge_examples(Method) ->
#{
<<"greptimedb">> => #{
summary => <<"Greptimedb HTTP API V2 Bridge">>,
value => values("greptimedb", Method)
value => bridge_v1_values(Method)
}
}
].

values(Protocol, get) ->
values(Protocol, post);
values("greptimedb", post) ->
SupportUint = <<"uint_value=${payload.uint_key}u,">>,
TypeOpts = #{
bucket => <<"example_bucket">>,
org => <<"examlpe_org">>,
token => <<"example_token">>,
server => <<"127.0.0.1:4001">>
bridge_v2_examples(Method) ->
ParamsExample = #{
parameters => #{
write_syntax => write_syntax_value(), precision => ms
}
},
values(common, "greptimedb", SupportUint, TypeOpts);
values(Protocol, put) ->
values(Protocol, post).
[
#{
<<"greptimedb">> => #{
summary => <<"GreptimeDB Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, greptimedb, greptimedb, ParamsExample
)
}
}
].

values(common, Protocol, SupportUint, TypeOpts) ->
CommonConfigs = #{
type => list_to_atom(Protocol),
connector_examples(Method) ->
[
#{
<<"greptimedb">> => #{
summary => <<"GreptimeDB Connector">>,
value => emqx_connector_schema:connector_values(
Method, greptimedb, connector_values(Method)
)
}
}
].

bridge_v1_values(_Method) ->
#{
type => greptimedb,
name => <<"demo">>,
enable => true,
local_topic => <<"local/topic/#">>,
write_syntax =>
<<"${topic},clientid=${clientid}", " ", "payload=${payload},",
"${clientid}_int_value=${payload.int_key}i,", SupportUint/binary,
"bool=${payload.bool}">>,
write_syntax => write_syntax_value(),
precision => ms,
resource_opts => #{
batch_size => 100,
batch_time => <<"20ms">>
},
username => <<"example_username">>,
password => <<"******">>,
dbname => <<"example_db">>,
server => <<"127.0.0.1:4001">>,
ssl => #{enable => false}
},
maps:merge(TypeOpts, CommonConfigs).
}.

connector_values(Method) ->
maps:without([write_syntax, precision], bridge_v1_values(Method)).

write_syntax_value() ->
<<"${topic},clientid=${clientid}", " ", "payload=${payload},",
"${clientid}_int_value=${payload.int_key}i,",
"uint_value=${payload.uint_key}u,"
"bool=${payload.bool}">>.

%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
Expand All @@ -80,11 +109,50 @@ fields("put_grpc_v1") ->
method_fields(put, greptimedb);
fields("get_grpc_v1") ->
method_fields(get, greptimedb);
fields(Type) when
Type == greptimedb
->
fields(greptimedb = Type) ->
greptimedb_bridge_common_fields() ++
connector_fields(Type).
connector_fields(Type);
%% Actions
fields(action) ->
{greptimedb,
mk(
hoconsc:map(name, ref(?MODULE, greptimedb_action)),
#{desc => <<"GreptimeDB Action Config">>, required => false}
)};
fields(greptimedb_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
mk(ref(?MODULE, action_parameters), #{
required => true, desc => ?DESC(action_parameters)
})
);
fields(action_parameters) ->
[
{write_syntax, fun write_syntax/1},
emqx_bridge_greptimedb_connector:precision_field()
];
%% Connectors
fields("config_connector") ->
emqx_connector_schema:common_fields() ++
emqx_bridge_greptimedb_connector:fields("connector") ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
Fields =
emqx_bridge_greptimedb_connector:fields("connector") ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
%$ Bridge v2
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(greptimedb_action)).

method_fields(post, ConnectorType) ->
greptimedb_bridge_common_fields() ++
Expand Down Expand Up @@ -122,6 +190,14 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for Greptimedb using `", string:to_upper(Method), "` method."];
desc(greptimedb) ->
?DESC(emqx_bridge_greptimedb_connector, "greptimedb");
desc(greptimedb_action) ->
?DESC(greptimedb_action);
desc(action_parameters) ->
?DESC(action_parameters);
desc("config_connector") ->
?DESC("desc_config");
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_) ->
undefined.

Expand Down
@@ -0,0 +1,58 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_greptimedb_action_info).

-behaviour(emqx_action_info).

-export([
action_type_name/0,
bridge_v1_config_to_action_config/2,
bridge_v1_config_to_connector_config/1,
bridge_v1_type_name/0,
connector_action_config_to_bridge_v1_config/2,
connector_type_name/0,
schema_module/0
]).

-import(emqx_utils_conv, [bin/1]).

-define(SCHEMA_MODULE, emqx_bridge_greptimedb).
-define(GREPTIMEDB_TYPE, greptimedb).

action_type_name() -> ?GREPTIMEDB_TYPE.
bridge_v1_type_name() -> ?GREPTIMEDB_TYPE.
connector_type_name() -> ?GREPTIMEDB_TYPE.

schema_module() -> ?SCHEMA_MODULE.

bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ActionTopLevelKeys = schema_keys(greptimedb_action),
ActionParametersKeys = schema_keys(action_parameters),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
ActionConfig#{<<"connector">> => ConnectorName}
).

bridge_v1_config_to_connector_config(BridgeV1Config) ->
ConnectorKeys = schema_keys("config_connector"),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_connector_schema:project_to_connector_resource_opts/1,
maps:with(ConnectorKeys, BridgeV1Config)
).

connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) ->
emqx_action_info:connector_action_config_to_bridge_v1_config(
ConnectorRawConf, ActionRawConf
).

make_config_map(PickKeys, IndentKeys, Config) ->
Conf0 = maps:with(PickKeys, Config),
emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).

schema_keys(Name) ->
[bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].