From 2d693402c57e7f2d1c352179536c3f9488d8a585 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Wed, 17 Jan 2024 09:56:59 +0200 Subject: [PATCH 1/2] refactor: split greptimedb bridge to actions and connectors --- apps/emqx_bridge/src/emqx_action_info.erl | 3 +- apps/emqx_bridge_greptimedb/rebar.config | 2 +- .../src/emqx_bridge_greptimedb.app.src | 2 +- .../src/emqx_bridge_greptimedb.erl | 136 +++++++++--- .../emqx_bridge_greptimedb_action_info.erl | 58 +++++ .../src/emqx_bridge_greptimedb_connector.erl | 206 ++++++++++++------ .../test/emqx_bridge_greptimedb_SUITE.erl | 15 +- ...emqx_bridge_greptimedb_connector_SUITE.erl | 28 ++- .../src/schema/emqx_connector_ee_schema.erl | 16 +- .../src/schema/emqx_connector_schema.erl | 4 +- changes/ee/feat-12386.en.md | 1 + mix.exs | 2 +- rel/i18n/emqx_bridge_greptimedb.hocon | 15 ++ 13 files changed, 368 insertions(+), 120 deletions(-) create mode 100644 apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_action_info.erl create mode 100644 changes/ee/feat-12386.en.md diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 7dce9d7cb5..b495fa6718 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -101,7 +101,8 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_redis_action_info, emqx_bridge_iotdb_action_info, emqx_bridge_es_action_info, - emqx_bridge_opents_action_info + emqx_bridge_opents_action_info, + emqx_bridge_greptimedb_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge_greptimedb/rebar.config b/apps/emqx_bridge_greptimedb/rebar.config index 170ced1e79..bb37de16ed 100644 --- a/apps/emqx_bridge_greptimedb/rebar.config +++ b/apps/emqx_bridge_greptimedb/rebar.config @@ -6,7 +6,7 @@ {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}}, - {greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.6"}}} + {greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.7"}}} ]}. {plugins, [rebar3_path_deps]}. {project_plugins, [erlfmt]}. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src index 0875d13ba0..b3ac508ad2 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src @@ -8,7 +8,7 @@ emqx_resource, greptimedb ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_greptimedb_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl index f5ae714d72..cf3586c738 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl @@ -10,10 +10,6 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). --export([ - conn_bridge_examples/1 -]). - -export([ namespace/0, roots/0, @@ -21,6 +17,16 @@ desc/1 ]). +%% Examples +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 +]). + +-define(CONNECTOR_TYPE, greptimedb). +-define(ACTION_TYPE, greptimedb). + %% ------------------------------------------------------------------------------------------------- %% api @@ -29,44 +35,67 @@ conn_bridge_examples(Method) -> #{ <<"greptimedb">> => #{ summary => <<"Greptimedb HTTP API V2 Bridge">>, - value => values("greptimedb", Method) + value => bridge_v1_values(Method) } } ]. -values(Protocol, get) -> - values(Protocol, post); -values("greptimedb", post) -> - SupportUint = <<"uint_value=${payload.uint_key}u,">>, - TypeOpts = #{ - bucket => <<"example_bucket">>, - org => <<"examlpe_org">>, - token => <<"example_token">>, - server => <<"127.0.0.1:4001">> +bridge_v2_examples(Method) -> + ParamsExample = #{ + parameters => #{ + write_syntax => write_syntax_value(), precision => ms + } }, - values(common, "greptimedb", SupportUint, TypeOpts); -values(Protocol, put) -> - values(Protocol, post). + [ + #{ + <<"greptimedb">> => #{ + summary => <<"GreptimeDB Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, greptimedb, greptimedb, ParamsExample + ) + } + } + ]. -values(common, Protocol, SupportUint, TypeOpts) -> - CommonConfigs = #{ - type => list_to_atom(Protocol), +connector_examples(Method) -> + [ + #{ + <<"greptimedb">> => #{ + summary => <<"GreptimeDB Connector">>, + value => emqx_connector_schema:connector_values( + Method, greptimedb, connector_values(Method) + ) + } + } + ]. + +bridge_v1_values(_Method) -> + #{ + type => greptimedb, name => <<"demo">>, enable => true, local_topic => <<"local/topic/#">>, - write_syntax => - <<"${topic},clientid=${clientid}", " ", "payload=${payload},", - "${clientid}_int_value=${payload.int_key}i,", SupportUint/binary, - "bool=${payload.bool}">>, + write_syntax => write_syntax_value(), precision => ms, resource_opts => #{ batch_size => 100, batch_time => <<"20ms">> }, + username => <<"example_username">>, + password => <<"******">>, + dbname => <<"example_db">>, server => <<"127.0.0.1:4001">>, ssl => #{enable => false} - }, - maps:merge(TypeOpts, CommonConfigs). + }. + +connector_values(Method) -> + maps:without([write_syntax, precision], bridge_v1_values(Method)). + +write_syntax_value() -> + <<"${topic},clientid=${clientid}", " ", "payload=${payload},", + "${clientid}_int_value=${payload.int_key}i,", + "uint_value=${payload.uint_key}u," + "bool=${payload.bool}">>. %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions @@ -80,11 +109,50 @@ fields("put_grpc_v1") -> method_fields(put, greptimedb); fields("get_grpc_v1") -> method_fields(get, greptimedb); -fields(Type) when - Type == greptimedb --> +fields(greptimedb = Type) -> greptimedb_bridge_common_fields() ++ - connector_fields(Type). + connector_fields(Type); +%% Actions +fields(action) -> + {greptimedb, + mk( + hoconsc:map(name, ref(?MODULE, greptimedb_action)), + #{desc => <<"GreptimeDB Action Config">>, required => false} + )}; +fields(greptimedb_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk(ref(?MODULE, action_parameters), #{ + required => true, desc => ?DESC(action_parameters) + }) + ); +fields(action_parameters) -> + [ + {write_syntax, fun write_syntax/1}, + emqx_bridge_greptimedb_connector:precision_field() + ]; +%% Connectors +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + emqx_bridge_greptimedb_connector:fields("connector") ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + Fields = + emqx_bridge_greptimedb_connector:fields("connector") ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); +%$ Bridge v2 +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(greptimedb_action)). method_fields(post, ConnectorType) -> greptimedb_bridge_common_fields() ++ @@ -122,6 +190,14 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Greptimedb using `", string:to_upper(Method), "` method."]; desc(greptimedb) -> ?DESC(emqx_bridge_greptimedb_connector, "greptimedb"); +desc(greptimedb_action) -> + ?DESC(greptimedb_action); +desc(action_parameters) -> + ?DESC(action_parameters); +desc("config_connector") -> + ?DESC("desc_config"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_action_info.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_action_info.erl new file mode 100644 index 0000000000..c128e7101a --- /dev/null +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_action_info.erl @@ -0,0 +1,58 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_greptimedb_action_info). + +-behaviour(emqx_action_info). + +-export([ + action_type_name/0, + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + bridge_v1_type_name/0, + connector_action_config_to_bridge_v1_config/2, + connector_type_name/0, + schema_module/0 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(SCHEMA_MODULE, emqx_bridge_greptimedb). +-define(GREPTIMEDB_TYPE, greptimedb). + +action_type_name() -> ?GREPTIMEDB_TYPE. +bridge_v1_type_name() -> ?GREPTIMEDB_TYPE. +connector_type_name() -> ?GREPTIMEDB_TYPE. + +schema_module() -> ?SCHEMA_MODULE. + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(greptimedb_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ConnectorKeys = schema_keys("config_connector"), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + maps:with(ConnectorKeys, BridgeV1Config) + ). + +connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) -> + emqx_action_info:connector_action_config_to_bridge_v1_config( + ConnectorRawConf, ActionRawConf + ). + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). + +schema_keys(Name) -> + [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))]. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index af42dac525..0016af4631 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -4,7 +4,7 @@ -module(emqx_bridge_greptimedb_connector). -include_lib("emqx_connector/include/emqx_connector.hrl"). - +-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -19,6 +19,10 @@ callback_mode/0, on_start/2, on_stop/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channel_status/3, + on_get_channels/1, on_query/3, on_batch_query/3, on_query_async/4, @@ -34,6 +38,8 @@ desc/1 ]). +-export([precision_field/0]). + %% only for test -ifdef(TEST). -export([is_unrecoverable_error/1]). @@ -62,6 +68,38 @@ %% resource callback callback_mode() -> async_if_possible. +on_add_channel( + _InstanceId, + #{channels := Channels} = OldState, + ChannelId, + #{parameters := Parameters} = ChannelConfig0 +) -> + #{write_syntax := WriteSyntaxTmpl} = Parameters, + Precision = maps:get(precision, Parameters, ms), + ChannelConfig = maps:merge( + Parameters, + ChannelConfig0#{ + precision => Precision, + write_syntax => to_config(WriteSyntaxTmpl, Precision) + } + ), + {ok, OldState#{ + channels => Channels#{ChannelId => ChannelConfig} + }}. + +on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> + NewState = State#{channels => maps:remove(ChannelId, Channels)}, + {ok, NewState}. + +on_get_channel_status(InstanceId, _ChannelId, State) -> + case on_get_status(InstanceId, State) of + ?status_connected -> ?status_connected; + _ -> ?status_connecting + end. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + on_start(InstId, Config) -> %% InstID as pool would be handled by greptimedb client %% so there is no need to allocate pool_name here @@ -78,8 +116,13 @@ on_stop(InstId, _State) -> ok end. -on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) -> - case data_to_points(Data, SyntaxLines) of +on_query(InstId, {Channel, Message}, State) -> + #{ + channels := #{Channel := #{write_syntax := SyntaxLines}}, + client := Client, + dbname := DbName + } = State, + case data_to_points(Message, DbName, SyntaxLines) of {ok, Points} -> ?tp( greptimedb_connector_send_query, @@ -97,8 +140,13 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c %% Once a Batched Data trans to points failed. %% This batch query failed -on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) -> - case parse_batch_data(InstId, BatchData, SyntaxLines) of +on_batch_query(InstId, [{Channel, _} | _] = BatchData, State) -> + #{ + channels := #{Channel := #{write_syntax := SyntaxLines}}, + client := Client, + dbname := DbName + } = State, + case parse_batch_data(InstId, DbName, BatchData, SyntaxLines) of {ok, Points} -> ?tp( greptimedb_connector_send_query, @@ -113,13 +161,13 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client {error, {unrecoverable_error, Reason}} end. -on_query_async( - InstId, - {send_message, Data}, - {ReplyFun, Args}, - _State = #{write_syntax := SyntaxLines, client := Client} -) -> - case data_to_points(Data, SyntaxLines) of +on_query_async(InstId, {Channel, Message}, {ReplyFun, Args}, State) -> + #{ + channels := #{Channel := #{write_syntax := SyntaxLines}}, + client := Client, + dbname := DbName + } = State, + case data_to_points(Message, DbName, SyntaxLines) of {ok, Points} -> ?tp( greptimedb_connector_send_query, @@ -135,13 +183,13 @@ on_query_async( Err end. -on_batch_query_async( - InstId, - BatchData, - {ReplyFun, Args}, - #{write_syntax := SyntaxLines, client := Client} -) -> - case parse_batch_data(InstId, BatchData, SyntaxLines) of +on_batch_query_async(InstId, [{Channel, _} | _] = BatchData, {ReplyFun, Args}, State) -> + #{ + channels := #{Channel := #{write_syntax := SyntaxLines}}, + client := Client, + dbname := DbName + } = State, + case parse_batch_data(InstId, DbName, BatchData, SyntaxLines) of {ok, Points} -> ?tp( greptimedb_connector_send_query, @@ -159,9 +207,9 @@ on_batch_query_async( on_get_status(_InstId, #{client := Client}) -> case greptimedb:is_alive(Client) of true -> - connected; + ?status_connected; false -> - disconnected + ?status_disconnected end. %% ------------------------------------------------------------------------------------------------- @@ -179,22 +227,36 @@ roots() -> }} ]. +fields("connector") -> + [server_field()] ++ + credentials_fields() ++ + emqx_connector_schema_lib:ssl_fields(); +%% ============ begin: schema for old bridge configs ============ fields(common) -> [ - {server, server()}, - {precision, - %% The greptimedb only supports these 4 precision - mk(enum([ns, us, ms, s]), #{ - required => false, default => ms, desc => ?DESC("precision") - })} + server_field(), + precision_field() ]; fields(greptimedb) -> fields(common) ++ - [ - {dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})}, - {username, mk(binary(), #{desc => ?DESC("username")})}, - {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})} - ] ++ emqx_connector_schema_lib:ssl_fields(). + credentials_fields() ++ + emqx_connector_schema_lib:ssl_fields(). +%% ============ end: schema for old bridge configs ============ + +desc(common) -> + ?DESC("common"); +desc(greptimedb) -> + ?DESC("greptimedb"). + +precision_field() -> + {precision, + %% The greptimedb only supports these 4 precision + mk(enum([ns, us, ms, s]), #{ + required => false, default => ms, desc => ?DESC("precision") + })}. + +server_field() -> + {server, server()}. server() -> Meta = #{ @@ -205,10 +267,12 @@ server() -> }, emqx_schema:servers_sc(Meta, ?GREPTIMEDB_HOST_OPTIONS). -desc(common) -> - ?DESC("common"); -desc(greptimedb) -> - ?DESC("greptimedb"). +credentials_fields() -> + [ + {dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})}, + {username, mk(binary(), #{desc => ?DESC("username")})}, + {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})} + ]. %% ------------------------------------------------------------------------------------------------- %% internal functions @@ -243,9 +307,8 @@ start_client(InstId, Config) -> do_start_client( InstId, ClientConfig, - Config = #{write_syntax := Lines} + Config ) -> - Precision = maps:get(precision, Config, ms), case greptimedb:start_client(ClientConfig) of {ok, Client} -> case greptimedb:is_alive(Client, true) of @@ -253,7 +316,7 @@ do_start_client( State = #{ client => Client, dbname => proplists:get_value(dbname, ClientConfig, ?DEFAULT_DB), - write_syntax => to_config(Lines, Precision) + channels => #{} }, ?SLOG(info, #{ msg => "starting_greptimedb_connector_success", @@ -314,8 +377,7 @@ client_config( {pool, InstId}, {pool_type, random}, {auto_reconnect, ?AUTO_RECONNECT_S}, - {gprc_options, grpc_config()}, - {timeunit, maps:get(precision, Config, ms)} + {gprc_options, grpc_config()} ] ++ protocol_config(Config). protocol_config( @@ -469,10 +531,10 @@ to_maps_config(K, V, Res) -> %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Data Trans -parse_batch_data(InstId, BatchData, SyntaxLines) -> +parse_batch_data(InstId, DbName, BatchData, SyntaxLines) -> {Points, Errors} = lists:foldl( - fun({send_message, Data}, {ListOfPoints, ErrAccIn}) -> - case data_to_points(Data, SyntaxLines) of + fun({_, Data}, {ListOfPoints, ErrAccIn}) -> + case data_to_points(Data, DbName, SyntaxLines) of {ok, Points} -> {[Points | ListOfPoints], ErrAccIn}; {error, ErrorPoints} -> @@ -496,21 +558,25 @@ parse_batch_data(InstId, BatchData, SyntaxLines) -> {error, points_trans_failed} end. --spec data_to_points(map(), [ - #{ - fields := [{binary(), binary()}], - measurement := binary(), - tags := [{binary(), binary()}], - timestamp := emqx_placeholder:tmpl_token() | integer(), - precision := {From :: ts_precision(), To :: ts_precision()} - } -]) -> {ok, [map()]} | {error, term()}. -data_to_points(Data, SyntaxLines) -> - lines_to_points(Data, SyntaxLines, [], []). +-spec data_to_points( + map(), + binary(), + [ + #{ + fields := [{binary(), binary()}], + measurement := binary(), + tags := [{binary(), binary()}], + timestamp := emqx_placeholder:tmpl_token() | integer(), + precision := {From :: ts_precision(), To :: ts_precision()} + } + ] +) -> {ok, [map()]} | {error, term()}. +data_to_points(Data, DbName, SyntaxLines) -> + lines_to_points(Data, DbName, SyntaxLines, [], []). %% When converting multiple rows data into Greptimedb Line Protocol, they are considered to be strongly correlated. %% And once a row fails to convert, all of them are considered to have failed. -lines_to_points(_, [], Points, ErrorPoints) -> +lines_to_points(_Data, _DbName, [], Points, ErrorPoints) -> case ErrorPoints of [] -> {ok, Points}; @@ -518,23 +584,27 @@ lines_to_points(_, [], Points, ErrorPoints) -> %% ignore trans succeeded points {error, ErrorPoints} end; -lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when +lines_to_points( + Data, DbName, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc +) when is_list(Ts) -> TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, case parse_timestamp(emqx_placeholder:proc_tmpl(Ts, Data, TransOptions)) of {ok, TsInt} -> Item1 = Item#{timestamp => TsInt}, - continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc); + continue_lines_to_points(Data, DbName, Item1, Rest, ResultPointsAcc, ErrorPointsAcc); {error, BadTs} -> - lines_to_points(Data, Rest, ResultPointsAcc, [ + lines_to_points(Data, DbName, Rest, ResultPointsAcc, [ {error, {bad_timestamp, BadTs}} | ErrorPointsAcc ]) end; -lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when +lines_to_points( + Data, DbName, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc +) when is_integer(Ts) -> - continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc). + continue_lines_to_points(Data, DbName, Item, Rest, ResultPointsAcc, ErrorPointsAcc). parse_timestamp([TsInt]) when is_integer(TsInt) -> {ok, TsInt}; @@ -546,30 +616,32 @@ parse_timestamp([TsBin]) -> {error, TsBin} end. -continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) -> - case line_to_point(Data, Item) of +continue_lines_to_points(Data, DbName, Item, Rest, ResultPointsAcc, ErrorPointsAcc) -> + case line_to_point(Data, DbName, Item) of {_, [#{fields := Fields}]} when map_size(Fields) =:= 0 -> %% greptimedb client doesn't like empty field maps... ErrorPointsAcc1 = [{error, no_fields} | ErrorPointsAcc], - lines_to_points(Data, Rest, ResultPointsAcc, ErrorPointsAcc1); + lines_to_points(Data, DbName, Rest, ResultPointsAcc, ErrorPointsAcc1); Point -> - lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc) + lines_to_points(Data, DbName, Rest, [Point | ResultPointsAcc], ErrorPointsAcc) end. line_to_point( Data, + DbName, #{ measurement := Measurement, tags := Tags, fields := Fields, timestamp := Ts, - precision := Precision + precision := {_, ToPrecision} = Precision } = Item ) -> {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), TableName = emqx_placeholder:proc_tmpl(Measurement, Data), - {TableName, [ + Metric = #{dbname => DbName, table => TableName, timeunit => ToPrecision}, + {Metric, [ maps:without([precision, measurement], Item#{ tags => EncodedTags, fields => EncodedFields, diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl index 73223892de..fb6639b68a 100644 --- a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl @@ -452,10 +452,7 @@ t_start_ok(Config) -> [#{points := [Point0]}] = Trace, {Measurement, [Point]} = Point0, ct:pal("sent point: ~p", [Point]), - ?assertMatch( - <<_/binary>>, - Measurement - ), + ?assertMatch(#{dbname := _, table := _, timeunit := _}, Measurement), ?assertMatch( #{ fields := #{}, @@ -481,7 +478,6 @@ t_start_stop(Config) -> BridgeName = ?config(bridge_name, Config), BridgeConfig = ?config(bridge_config, Config), StopTracePoint = greptimedb_client_stopped, - ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), ?check_trace( begin ProbeRes0 = emqx_bridge_testlib:probe_bridge_api( @@ -491,6 +487,7 @@ t_start_stop(Config) -> ), ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), ?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)), + ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. @@ -554,6 +551,7 @@ t_start_stop(Config) -> ok end, fun(Trace) -> + ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), %% one for probe, two for real ?assertMatch( [_, #{instance_id := ResourceId}, #{instance_id := ResourceId}], @@ -568,10 +566,7 @@ t_start_already_started(Config) -> Type = greptimedb_type_bin(?config(greptimedb_type, Config)), Name = ?config(greptimedb_name, Config), GreptimedbConfigString = ?config(greptimedb_config_string, Config), - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), + ?assertMatch({ok, _}, create_bridge(Config)), ResourceId = resource_id(Config), TypeAtom = binary_to_atom(Type), NameAtom = binary_to_atom(Name), @@ -1036,7 +1031,6 @@ t_missing_field(Config) -> ok. t_authentication_error_on_send_message(Config0) -> - ResourceId = resource_id(Config0), QueryMode = proplists:get_value(query_mode, Config0, sync), GreptimedbType = ?config(greptimedb_type, Config0), GreptimeConfig0 = proplists:get_value(greptimedb_config, Config0), @@ -1055,6 +1049,7 @@ t_authentication_error_on_send_message(Config0) -> end, fun() -> {ok, _} = create_bridge(Config), + ResourceId = resource_id(Config0), ?retry( _Sleep = 1_000, _Attempts = 10, diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl index a4acf5b4e9..bb8bca17dc 100644 --- a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl @@ -65,7 +65,7 @@ t_lifecycle(Config) -> Port = ?config(greptimedb_tcp_port, Config), perform_lifecycle_check( <<"emqx_bridge_greptimedb_connector_SUITE">>, - greptimedb_config(Host, Port) + greptimedb_connector_config(Host, Port) ). perform_lifecycle_check(PoolName, InitialConfig) -> @@ -75,6 +75,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> % expects this FullConfig = CheckedConfig#{write_syntax => greptimedb_write_syntax()}, {ok, #{ + id := ResourceId, state := #{client := #{pool := ReturnedPoolName}} = State, status := InitialStatus }} = emqx_resource:create_local( @@ -92,8 +93,13 @@ perform_lifecycle_check(PoolName, InitialConfig) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + %% install actions to the connector + ActionConfig = greptimedb_action_config(), + ChannelId = <<"test_channel">>, + ?assertEqual(ok, emqx_resource_manager:add_channel(ResourceId, ChannelId, ActionConfig)), + ?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)), % % Perform query as further check that the resource is working as expected - ?assertMatch({ok, _}, emqx_resource:query(PoolName, test_query())), + ?assertMatch({ok, _}, emqx_resource:query(PoolName, test_query(ChannelId))), ?assertEqual(ok, emqx_resource:stop(PoolName)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. @@ -115,7 +121,9 @@ perform_lifecycle_check(PoolName, InitialConfig) -> {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), - ?assertMatch({ok, _}, emqx_resource:query(PoolName, test_query())), + ?assertEqual(ok, emqx_resource_manager:add_channel(ResourceId, ChannelId, ActionConfig)), + ?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)), + ?assertMatch({ok, _}, emqx_resource:query(PoolName, test_query(ChannelId))), % Stop and remove the resource in one go. ?assertEqual(ok, emqx_resource:remove_local(PoolName)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), @@ -126,7 +134,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> % %% Helpers % %%------------------------------------------------------------------------------ -greptimedb_config(Host, Port) -> +greptimedb_connector_config(Host, Port) -> Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])), ResourceConfig = #{ <<"dbname">> => <<"public">>, @@ -136,6 +144,14 @@ greptimedb_config(Host, Port) -> }, #{<<"config">> => ResourceConfig}. +greptimedb_action_config() -> + #{ + parameters => #{ + write_syntax => greptimedb_write_syntax(), + precision => ms + } + }. + greptimedb_write_syntax() -> [ #{ @@ -146,8 +162,8 @@ greptimedb_write_syntax() -> } ]. -test_query() -> - {send_message, #{ +test_query(ChannelId) -> + {ChannelId, #{ <<"clientid">> => <<"something">>, <<"payload">> => #{bool => true}, <<"topic">> => <<"connector_test">>, diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 8e81b12ea1..95c9d2991f 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -58,6 +58,8 @@ resource_type(elasticsearch) -> emqx_bridge_es_connector; resource_type(opents) -> emqx_bridge_opents_connector; +resource_type(greptimedb) -> + emqx_bridge_greptimedb_connector; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -225,6 +227,14 @@ connector_structs() -> desc => <<"OpenTSDB Connector Config">>, required => false } + )}, + {greptimedb, + mk( + hoconsc:map(name, ref(emqx_bridge_greptimedb, "config_connector")), + #{ + desc => <<"GreptimeDB Connector Config">>, + required => false + } )} ]. @@ -247,7 +257,8 @@ schema_modules() -> emqx_bridge_redis_schema, emqx_bridge_iotdb_connector, emqx_bridge_es_connector, - emqx_bridge_opents_connector + emqx_bridge_opents_connector, + emqx_bridge_greptimedb ]. api_schemas(Method) -> @@ -279,7 +290,8 @@ api_schemas(Method) -> api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"), api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method), api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method), - api_ref(emqx_bridge_opents_connector, <<"opents">>, Method) + api_ref(emqx_bridge_opents_connector, <<"opents">>, Method), + api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector") ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index d5c450529c..b7c4d9f743 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -160,7 +160,9 @@ connector_type_to_bridge_types(iotdb) -> connector_type_to_bridge_types(elasticsearch) -> [elasticsearch]; connector_type_to_bridge_types(opents) -> - [opents]. + [opents]; +connector_type_to_bridge_types(greptimedb) -> + [greptimedb]. actions_config_name(action) -> <<"actions">>; actions_config_name(source) -> <<"sources">>. diff --git a/changes/ee/feat-12386.en.md b/changes/ee/feat-12386.en.md new file mode 100644 index 0000000000..b12e2f24fd --- /dev/null +++ b/changes/ee/feat-12386.en.md @@ -0,0 +1 @@ +Split GreptimeDB bridge into connector and action components. diff --git a/mix.exs b/mix.exs index bb79c3204d..4b689b15f2 100644 --- a/mix.exs +++ b/mix.exs @@ -209,7 +209,7 @@ defmodule EMQXUmbrella.MixProject do {:crc32cer, "0.1.8", override: true}, {:supervisor3, "1.1.12", override: true}, {:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true}, - {:greptimedb, github: "GreptimeTeam/greptimedb-client-erl", tag: "v0.1.6", override: true}, + {:greptimedb, github: "GreptimeTeam/greptimedb-client-erl", tag: "v0.1.7", override: true}, # The following two are dependencies of rabbit_common. They are needed here to # make mix not complain about conflicting versions {:thoas, github: "emqx/thoas", tag: "v1.0.0", override: true}, diff --git a/rel/i18n/emqx_bridge_greptimedb.hocon b/rel/i18n/emqx_bridge_greptimedb.hocon index 977e6e0646..9ca36acb38 100644 --- a/rel/i18n/emqx_bridge_greptimedb.hocon +++ b/rel/i18n/emqx_bridge_greptimedb.hocon @@ -47,4 +47,19 @@ Please note that a placeholder for an integer value must be annotated with a suf write_syntax.label: """Write Syntax""" +action_parameters.label: +"""Action Parameters""" +action_parameters.desc: +"""Additional parameters specific to this action type""" + +connector.label: +"""GreptimeDB Connector""" +connector.desc: +"""GreptimeDB Connector Configs""" + +greptimedb_action.label: +"""GreptimeDB Action""" +greptimedb_action.desc: +"""Action to interact with a GreptimeDB connector""" + } From a56a5e9c3cc358441b128f71a38a55de8b5bd6cf Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Wed, 24 Jan 2024 21:10:30 +0200 Subject: [PATCH 2/2] chore: add copyright --- .../src/emqx_bridge_influxdb_action_info.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl index 00a6c5510f..5864daf501 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl @@ -1,3 +1,6 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- -module(emqx_bridge_influxdb_action_info). -behaviour(emqx_action_info).