From ce83079c6b855b2a579e4cff3e1538e8b19821f1 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 10 Nov 2023 09:51:43 +0800 Subject: [PATCH 1/5] feat(sysk): integrated Nari Syskeeper 2000 as a new bridge backend --- .../src/schema/emqx_bridge_enterprise.erl | 35 +- apps/emqx_bridge_syskeeper/BSL.txt | 94 +++++ apps/emqx_bridge_syskeeper/README.md | 30 ++ apps/emqx_bridge_syskeeper/doc/protocol_v1.md | 370 ++++++++++++++++++ .../emqx_bridge_syskeeper/doc/protocol_v1.org | 80 ++++ apps/emqx_bridge_syskeeper/docker-ct | 1 + .../include/emqx_bridge_syskeeper.hrl | 15 + apps/emqx_bridge_syskeeper/rebar.config | 6 + .../src/emqx_bridge_syskeeper.app.src | 13 + .../src/emqx_bridge_syskeeper.erl | 117 ++++++ .../src/emqx_bridge_syskeeper_client.erl | 180 +++++++++ .../src/emqx_bridge_syskeeper_connector.erl | 262 +++++++++++++ .../src/emqx_bridge_syskeeper_frame.erl | 163 ++++++++ .../src/emqx_bridge_syskeeper_frame_v1.erl | 70 ++++ .../src/emqx_bridge_syskeeper_proxy.erl | 100 +++++ .../emqx_bridge_syskeeper_proxy_server.erl | 251 ++++++++++++ apps/emqx_machine/priv/reboot_lists.eterm | 3 +- mix.exs | 3 +- rebar.config.erl | 1 + rel/i18n/emqx_bridge_syskeeper.hocon | 45 +++ .../emqx_bridge_syskeeper_connector.hocon | 21 + rel/i18n/emqx_bridge_syskeeper_proxy.hocon | 45 +++ 22 files changed, 1899 insertions(+), 6 deletions(-) create mode 100644 apps/emqx_bridge_syskeeper/BSL.txt create mode 100644 apps/emqx_bridge_syskeeper/README.md create mode 100644 apps/emqx_bridge_syskeeper/doc/protocol_v1.md create mode 100644 apps/emqx_bridge_syskeeper/doc/protocol_v1.org create mode 100644 apps/emqx_bridge_syskeeper/docker-ct create mode 100644 apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl create mode 100644 apps/emqx_bridge_syskeeper/rebar.config create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_client.erl create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame.erl create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl create mode 100644 rel/i18n/emqx_bridge_syskeeper.hocon create mode 100644 rel/i18n/emqx_bridge_syskeeper_connector.hocon create mode 100644 rel/i18n/emqx_bridge_syskeeper_proxy.hocon diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index 93951cca08..a160ecd336 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -50,7 +50,9 @@ api_schemas(Method) -> api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method), api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer"), api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_grpc_v1"), - api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_producer") + api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_producer"), + api_ref(emqx_bridge_syskeeper, <<"syskeeper">>, Method), + api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method) ]. schema_modules() -> @@ -78,7 +80,9 @@ schema_modules() -> emqx_bridge_rabbitmq, emqx_bridge_kinesis, emqx_bridge_greptimedb, - emqx_bridge_azure_event_hub + emqx_bridge_azure_event_hub, + emqx_bridge_syskeeper, + emqx_bridge_syskeeper_proxy ]. examples(Method) -> @@ -126,7 +130,9 @@ resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer; resource_type(greptimedb) -> emqx_bridge_greptimedb_connector; %% We use AEH's Kafka interface. -resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer. +resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer; +resource_type(syskeeper) -> emqx_bridge_syskeeper_connector; +resource_type(syskeeper_proxy) -> emqx_bridge_syskeeper_proxy_server. %% For bridges that need to override connector configurations. bridge_impl_module(BridgeType) when is_binary(BridgeType) -> @@ -215,7 +221,8 @@ fields(bridges) -> influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs() ++ - kinesis_structs() ++ greptimedb_structs() ++ azure_event_hub_structs(). + kinesis_structs() ++ greptimedb_structs() ++ azure_event_hub_structs() ++ + syskeeper_structs(). mongodb_structs() -> [ @@ -428,6 +435,26 @@ azure_event_hub_structs() -> )} ]. +syskeeper_structs() -> + [ + {syskeeper, + mk( + hoconsc:map(name, ref(emqx_bridge_syskeeper, "config")), + #{ + desc => <<"Syskeeper bridge config ">>, + required => false + } + )}, + {syskeeper_proxy, + mk( + hoconsc:map(name, ref(emqx_bridge_syskeeper_proxy, "config")), + #{ + desc => <<"Syskeeper proxy server config">>, + required => false + } + )} + ]. + api_ref(Module, Type, Method) -> {Type, ref(Module, Method)}. diff --git a/apps/emqx_bridge_syskeeper/BSL.txt b/apps/emqx_bridge_syskeeper/BSL.txt new file mode 100644 index 0000000000..0acc0e6965 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_syskeeper/README.md b/apps/emqx_bridge_syskeeper/README.md new file mode 100644 index 0000000000..328fd488e1 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/README.md @@ -0,0 +1,30 @@ +# EMQX Syskeeper Bridge + +Nari Syskeeper 2000 is a one-way Physical Isolation Net Gap. + +The application is used to connect EMQX and Syskeeper. +Users can create a rule and quickly ingest IoT data to the Syskeeper by leveraging +[EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html). + +# Documentation + +- Refer to [Rules engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) + for the EMQX rules engine introduction. + +# HTTP APIs + +- Several APIs are provided for bridge management, which includes create bridge, + update bridge, get bridge, stop or restart bridge and list bridges etc. + + Refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges) + for more detailed information. + + +# Contributing + +Please see our [contributing.md](../../CONTRIBUTING.md). + + +# License + +EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt). diff --git a/apps/emqx_bridge_syskeeper/doc/protocol_v1.md b/apps/emqx_bridge_syskeeper/doc/protocol_v1.md new file mode 100644 index 0000000000..ca73c300dd --- /dev/null +++ b/apps/emqx_bridge_syskeeper/doc/protocol_v1.md @@ -0,0 +1,370 @@ + +# Table of Contents + +1. [Packet Format](#orgb2a43d1) +2. [Common Header](#org5ca4c69) + 1. [Types](#org240efb3) + 2. [Shared Flags](#org804fcce) +3. [Handshake Packet](#org6a73ea8) +4. [Forward Packet](#org39c753e) + 1. [Flags](#org5177d26) + 2. [Payload](#orgb29cbd7) + 1. [Message Content map structure](#org75acfe6) +5. [Heartbeat Packet](#org388b69a) + + + + +# Packet Format + + + + + + + + + + + + + + + + + + +
+  bytes  + +   0   + +   1   + +   2   + +   3   + +         5         + +     6 .. end     +
+         + +     variable length     + +   common header   + +     payload      +
+ +The length of the remaining part(common header + payload) is indicated by the Length Header of each packet + + + + +# Common Header + + + + + + + + + + + + + + + + + + + +
+  bits  + +   0   + +   1   + +   2   + +   3   + +   4   + +   5   + +   6   + +   7   +
+        + +       packet type       + +      shared flags       +
+ + + + +## Types + + + + + + + + + + + + + + + + + + + +
+    type    + +    usage    +
+     0      + +  handshake  +
+     1      + +   forward   +
+     2      + +  heartbeat  +
+ + + + +## Shared Flags + +The usage of each bit is determined by the type of packet + + + + +# Handshake Packet + + + + + + + + + + + + + +
+  bytes  + +        0        + +        1        +
+         + +  common header  + +     version     +
+ + + + +# Forward Packet + + + + + + + + + + + + + + + + + + + + + + + + + +
+  bits  + +  0  + +  1  + +  2  + +  3  + +  4  + +  5  + +  6  + +   7   + +     ...     +
+       
+       
+        +
+                
+   packet type  
+                 +
+             + +  ACK  + +            
+   payload  
+             +
+   forward flags   +
+ + + + +## Flags + + + + + + + + + + + +
+  flag  + +                    usage                    +
+  ACK   + +       This packet need a ACK response       +
+ + + + +## Payload + + + + + + + + + + + + + + + + + +
+  bytes  + +   0   + +   ..    + +   n   + +  n+1  + +  ..   + +   x   +
+         + +   Content Length    + +  Message Content  +
+ +- Content length is a variable length number. +- Message content is a list in an opaque binary format whose element is a map structure + + + + +### Message Content map structure + + { + id: "0006081CCFF3D48F03C10000058B0000", // unique message id + qos: 1, + flags: {dup: false, retain: false}, + from: "clientid", + topic: "t/1", + payload: "hello, world", + timestamp: 1697786555281 + } + + + + +# Heartbeat Packet + + + + + + + + + + + +
+  bytes  + +        0        +
+         + +  common header  +
+ diff --git a/apps/emqx_bridge_syskeeper/doc/protocol_v1.org b/apps/emqx_bridge_syskeeper/doc/protocol_v1.org new file mode 100644 index 0000000000..12d0fe8501 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/doc/protocol_v1.org @@ -0,0 +1,80 @@ +* Packet Format + +-------+-----+-----+-----+-----+-----------------+----------------+ + | bytes | 0 | 1 | 2 | 3 | 5 | 6 .. end | + +-------+-----+-----+-----+-----+-----------------+----------------+ + | | variable length | common header | payload | + +-------+-----------------------+-----------------+----------------+ + + The length of the remaining part(common header + payload) is indicated by the Length Header of each packet + +* Common Header + +------+-----+-----+-----+-----+-----+-----+-----+-----+ + | bits | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | + +------+-----+-----+-----+-----+-----+-----+-----+-----+ + | | packet type | shared flags | + +------+-----------------------+-----------------------+ +** Types + +----------+-----------+ + | type | usage | + +----------+-----------+ + | 0 | handshake | + +----------+-----------+ + | 1 | forward | + +----------+-----------+ + | 2 | heartbeat | + +----------+-----------+ +** Shared Flags + The usage of each bit is determined by the type of packet +* Handshake Packet + +-------+---------------+---------------+ + | bytes | 0 | 1 | + +-------+---------------+---------------+ + | | common header | version | + +-------+---------------+---------------+ +* Forward Packet + +------+---+---+---+---+---+---+---+-----+-----------+ + | bits | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | ... | + +------+---+---+---+---+---+---+---+-----+-----------+ + | | | | ACK | | + | | packet type +-----------+-----+ payload | + | | | forward flags | | + +------+---------------+-----------------+-----------+ + +** Flags + +------+-------------------------------------------+ + | flag | usage | + +------+-------------------------------------------+ + | ACK | This packet need a ACK response | + +------+-------------------------------------------+ + +** Payload + +-------+-----+-------+-----+-----+-----+-----+ + | bytes | 0 | .. | n | n+1 | .. | x | + +-------+-----+-------+-----+-----+-----+-----+ + | | Content Length | Message Content | + +-------+-------------------+-----------------+ + + + Content length is a variable length number. + + Message content is a list in an opaque binary format whose element is a map structure + +*** Message Content map structure + +#+begin_src json + { + id: "0006081CCFF3D48F03C10000058B0000", // unique message id + qos: 1, + flags: {dup: false, retain: false}, + from: "clientid", + topic: "t/1", + payload: "hello, world", + timestamp: 1697786555281 + } +#+end_src + +* Heartbeat Packet + + +-------+---------------+ + | bytes | 0 | + +-------+---------------+ + | | common header | + +-------+---------------+ diff --git a/apps/emqx_bridge_syskeeper/docker-ct b/apps/emqx_bridge_syskeeper/docker-ct new file mode 100644 index 0000000000..80f0d394b7 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/docker-ct @@ -0,0 +1 @@ +toxiproxy diff --git a/apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl b/apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl new file mode 100644 index 0000000000..b381ebf504 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl @@ -0,0 +1,15 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-ifndef(EMQX_BRIDGE_SYSKEEPER). +-define(EMQX_BRIDGE_SYSKEEPER, true). + +-define(TYPE_HANDSHAKE, 0). +-define(TYPE_FORWARD, 1). +-define(TYPE_HEARTBEAT, 2). + +-type packet_type() :: handshake | forward | heartbeat. +-type packet_data() :: none | binary() | [binary()]. +-type packet_type_val() :: ?TYPE_HANDSHAKE..?TYPE_HEARTBEAT. + +-endif. diff --git a/apps/emqx_bridge_syskeeper/rebar.config b/apps/emqx_bridge_syskeeper/rebar.config new file mode 100644 index 0000000000..31879d9cee --- /dev/null +++ b/apps/emqx_bridge_syskeeper/rebar.config @@ -0,0 +1,6 @@ +%% -*- mode: erlang; -*- +{erl_opts, [debug_info]}. +{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}} + , {emqx_resource, {path, "../../apps/emqx_resource"}} + , {emqx_bridge, {path, "../../apps/emqx_bridge"}} + ]}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src new file mode 100644 index 0000000000..a8f5338679 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src @@ -0,0 +1,13 @@ +{application, emqx_bridge_syskeeper, [ + {description, "EMQX Enterprise Bridge"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + emqx_resource + ]}, + {env, []}, + {modules, []}, + {links, []} +]}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl new file mode 100644 index 0000000000..9e0ec4eedd --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl @@ -0,0 +1,117 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_syskeeper). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1, + values/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% ------------------------------------------------------------------------------------------------- +%% api +conn_bridge_examples(Method) -> + [ + #{ + <<"syskeeper">> => #{ + summary => <<"Syskeeper Bridge">>, + value => values(Method) + } + } + ]. + +values(_Method) -> + #{ + enable => true, + type => syskeeper, + name => <<"foo">>, + server => <<"127.0.0.1:9092">>, + ack_mode => <<"no_ack">>, + ack_timeout => <<"10s">>, + pool_size => 16, + target_topic => <<"${topic}">>, + target_qos => <<"-1">>, + template => <<"${payload}">>, + resource_opts => #{ + worker_pool_size => 16, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => sync, + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + } + }. + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_syskeeper". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {target_topic, + mk( + binary(), + #{desc => ?DESC("target_topic"), default => <<"${topic}">>} + )}, + {target_qos, + mk( + range(-1, 2), + #{desc => ?DESC("target_qos"), default => -1} + )}, + {template, + mk( + binary(), + #{desc => ?DESC("template"), default => <<"${payload}">>} + )}, + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ] ++ emqx_bridge_syskeeper_connector:fields(config); +fields("creation_opts") -> + emqx_resource_schema:create_opts([{request_ttl, #{default => infinity}}]); +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Syskeeper using `", string:to_upper(Method), "` method."]; +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- + +type_field() -> + {type, mk(enum([syskeeper]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_client.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_client.erl new file mode 100644 index 0000000000..18822886f3 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_client.erl @@ -0,0 +1,180 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_syskeeper_client). + +-behaviour(gen_server). + +%% API +-export([ + start_link/1, + forward/3, + heartbeat/2 +]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3, + format_status/2 +]). + +-include("emqx_bridge_syskeeper.hrl"). + +-type state() :: #{ + ack_mode := need_ack | no_ack, + ack_timeout := timer:time(), + socket := undefined | inet:socket(), + frame_state := emqx_bridge_syskeeper_frame:state(), + last_error := undefined | tuple() +}. + +-type send_result() :: {ok, state()} | {error, term()}. + +%% ------------------------------------------------------------------------------------------------- +%% API +forward(Pid, Msg, Timeout) -> + call(Pid, {?FUNCTION_NAME, Msg}, Timeout). + +heartbeat(Pid, Timeout) -> + ok =:= call(Pid, ?FUNCTION_NAME, Timeout). + +%% ------------------------------------------------------------------------------------------------- +%% Starts Bridge which transfer data to Syskeeper + +start_link(Options) -> + gen_server:start_link(?MODULE, Options, []). + +%% ------------------------------------------------------------------------------------------------- +%%% gen_server callbacks + +%% Initialize syskeeper client +init(#{ack_timeout := AckTimeout, ack_mode := AckMode} = Options) -> + erlang:process_flag(trap_exit, true), + connect(Options, #{ + ack_timeout => AckTimeout, + ack_mode => AckMode, + socket => undefined, + last_error => undefined, + frame_state => emqx_bridge_syskeeper_frame:make_state_with_conf(Options) + }). + +handle_call({forward, Msgs}, _From, State) -> + Result = send_packet(forward, Msgs, State), + handle_reply_result(Result, State); +handle_call(heartbeat, _From, State) -> + Result = send_ack_packet(heartbeat, none, State), + handle_reply_result(Result, State); +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({tcp_closed, _} = Reason, State) -> + {noreply, State#{socket := undefined, last_error := Reason}}; +handle_info({last_error, _, _} = Reason, State) -> + {noreply, State#{socket := undefined, last_error := Reason}}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #{socket := Socket} = _State) -> + close_socket(Socket), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +-spec format_status( + Opt :: normal | terminate, + Status :: list() +) -> Status :: term(). +format_status(_Opt, Status) -> + Status. + +%% ------------------------------------------------------------------------------------------------ +connect( + #{ + hostname := Host, + port := Port + }, + State +) -> + case + gen_tcp:connect(Host, Port, [ + {active, true}, + {mode, binary}, + {nodelay, true} + ]) + of + {ok, Socket} -> + send_ack_packet(handshake, none, State#{socket := Socket}); + {error, Reason} -> + {stop, Reason} + end. + +-spec send_ack_packet(packet_type(), packet_data(), state()) -> send_result(). +send_ack_packet(Type, Data, State) -> + send_packet(Type, Data, State, true). + +-spec send_packet(packet_type(), packet_data(), state()) -> send_result(). +send_packet(Type, Data, State) -> + send_packet(Type, Data, State, false). + +-spec send_packet(packet_type(), packet_data(), state(), boolean()) -> send_result(). +send_packet(_Type, _Data, #{socket := undefined, last_error := Reason}, _Force) -> + {error, Reason}; +send_packet(Type, Data, #{frame_state := FrameState} = State, Force) -> + Packet = emqx_bridge_syskeeper_frame:encode(Type, Data, FrameState), + case socket_send(Packet, State) of + ok -> + wait_ack(State, Force); + {error, _} = Error -> + Error + end. + +-spec socket_send(binary() | [binary()], state()) -> ok | {error, _Reason}. +socket_send(Bin, State) when is_binary(Bin) -> + socket_send([Bin], State); +socket_send(Bins, #{socket := Socket}) -> + Map = fun(Data) -> + Len = erlang:byte_size(Data), + VarLen = emqx_bridge_syskeeper_frame:serialize_variable_byte_integer(Len), + <> + end, + gen_tcp:send(Socket, lists:map(Map, Bins)). + +-spec wait_ack(state(), boolean()) -> send_result(). +wait_ack(#{ack_timeout := AckTimeout, ack_mode := AckMode} = State, Force) when + AckMode =:= need_ack; Force +-> + receive + {tcp, _Socket, <<16#FF>>} -> + {ok, State}; + {tcp_closed, _} = Reason -> + {error, Reason}; + {tcp_error, _, _} = Reason -> + {error, Reason} + after AckTimeout -> + {error, wait_ack_timeout} + end; +wait_ack(State, _Force) -> + {ok, State}. + +close_socket(undefined) -> + ok; +close_socket(Socket) -> + catch gen_tcp:close(Socket), + ok. + +call(Pid, Msg, Timeout) -> + gen_server:call(Pid, Msg, Timeout). + +handle_reply_result({ok, _}, State) -> + {reply, ok, State}; +handle_reply_result({error, Reason}, State) -> + {reply, {error, {recoverable_error, Reason}}, State#{last_error := Reason}}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl new file mode 100644 index 0000000000..219d4d0d23 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -0,0 +1,262 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_syskeeper_connector). + +-behaviour(emqx_resource). + +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-export([roots/0, fields/1]). + +%% `emqx_resource' API +-export([ + callback_mode/0, + query_mode/1, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_get_status/2 +]). + +-export([ + connect/1 +]). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-define(SYSKEEPER_HOST_OPTIONS, #{ + default_port => 9092 +}). + +-define(EXTRA_CALL_TIMEOUT, 2000). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon schema +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields(config) -> + [ + {server, server()}, + {ack_mode, + mk( + enum([need_ack, no_ack]), + #{desc => ?DESC(ack_mode), default => <<"no_ack">>} + )}, + {ack_timeout, + mk( + emqx_schema:timeout_duration_ms(), + #{desc => ?DESC(ack_timeout), default => <<"10s">>} + )}, + {pool_size, fun + (default) -> + 16; + (Other) -> + emqx_connector_schema_lib:pool_size(Other) + end} + ]. + +server() -> + Meta = #{desc => ?DESC("server")}, + emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS). + +%% ------------------------------------------------------------------------------------------------- +%% `emqx_resource' API + +callback_mode() -> always_sync. + +query_mode(_) -> sync. + +on_start( + InstanceId, + #{ + server := Server, + pool_size := PoolSize, + ack_timeout := AckTimeout, + target_topic := TargetTopic, + target_qos := TargetQoS + } = Config +) -> + ?SLOG(info, #{ + msg => "starting_syskeeper_connector", + connector => InstanceId, + config => redact(Config) + }), + + HostCfg = emqx_schema:parse_server(Server, ?SYSKEEPER_HOST_OPTIONS), + + Options = [ + {options, + maps:merge( + HostCfg, + maps:with([ack_mode, ack_timeout], Config) + )}, + {pool_size, PoolSize} + ], + + State = #{ + pool_name => InstanceId, + target_qos => TargetQoS, + ack_timeout => AckTimeout, + templates => parse_template(Config), + target_topic_tks => emqx_placeholder:preproc_tmpl(TargetTopic) + }, + case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of + ok -> + {ok, State}; + Error -> + Error + end. + +on_stop(InstanceId, _State) -> + ?SLOG(info, #{ + msg => "stopping_syskeeper_connector", + connector => InstanceId + }), + emqx_resource_pool:stop(InstanceId). + +on_query(InstanceId, {send_message, _} = Query, State) -> + do_query(InstanceId, [Query], State); +on_query(_InstanceId, Query, _State) -> + {error, {unrecoverable_error, {invalid_request, Query}}}. + +%% we only support batch insert +on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) -> + do_query(InstanceId, Query, State); +on_batch_query(_InstanceId, Query, _State) -> + {error, {unrecoverable_error, {invalid_request, Query}}}. + +on_get_status(_InstanceId, #{pool_name := Pool, ack_timeout := AckTimeout}) -> + Health = emqx_resource_pool:health_check_workers( + Pool, {emqx_bridge_syskeeper_client, heartbeat, [AckTimeout + ?EXTRA_CALL_TIMEOUT]} + ), + status_result(Health). + +status_result(true) -> connected; +status_result(false) -> connecting; +status_result({error, _}) -> connecting. + +%% ------------------------------------------------------------------------------------------------- +%% Helper fns + +do_query( + InstanceId, + Query, + #{pool_name := PoolName, ack_timeout := AckTimeout} = State +) -> + ?TRACE( + "QUERY", + "syskeeper_connector_received", + #{connector => InstanceId, query => Query, state => State} + ), + + Result = + case try_apply_template(Query, State) of + {ok, Msg} -> + ecpool:pick_and_do( + PoolName, + {emqx_bridge_syskeeper_client, forward, [Msg, AckTimeout + ?EXTRA_CALL_TIMEOUT]}, + no_handover + ); + Error -> + Error + end, + + case Result of + {error, Reason} -> + ?tp( + syskeeper_connector_query_return, + #{error => Reason} + ), + %% ?SLOG(error, #{ + %% msg => "syskeeper_connector_do_query_failed", + %% connector => InstanceId, + %% query => Query, + %% reason => Reason + %% }), + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; + _ -> + %% ?tp( + %% syskeeper_connector_query_return, + %% #{result => Result} + %% ), + Result + end. + +connect(Opts) -> + Options = proplists:get_value(options, Opts), + emqx_bridge_syskeeper_client:start_link(Options). + +parse_template(Config) -> + Templates = + case maps:get(template, Config, undefined) of + undefined -> #{}; + <<>> -> #{}; + Template -> #{send_message => Template} + end, + + parse_template(maps:to_list(Templates), #{}). + +parse_template([{Key, H} | T], Templates) -> + ParamsTks = emqx_placeholder:preproc_tmpl(H), + parse_template( + T, + Templates#{Key => ParamsTks} + ); +parse_template([], Templates) -> + Templates. + +try_apply_template([{Type, _} | _] = Datas, #{templates := Templates} = State) -> + case maps:find(Type, Templates) of + {ok, Template} -> + {ok, apply_template(Datas, Template, State)}; + _ -> + {error, {unrecoverable_error, {invalid_request, Datas}}} + end. + +apply_template(Datas, Template, State) -> + lists:map( + fun({_, Data}) -> + do_apply_template(Data, Template, State) + end, + Datas + ). + +do_apply_template(#{id := Id, qos := QoS, clientid := From} = Data, Template, #{ + target_qos := TargetQoS, target_topic_tks := TargetTopicTks +}) -> + Msg = maps:with([qos, flags, topic, payload, timestamp], Data), + Topic = emqx_placeholder:proc_tmpl(TargetTopicTks, Msg), + Msg#{ + id => emqx_guid:from_hexstr(Id), + qos := + case TargetQoS of + -1 -> + QoS; + _ -> + TargetQoS + end, + from => From, + topic := Topic, + payload := format_data(Template, Msg) + }. + +format_data([], Msg) -> + emqx_utils_json:encode(Msg); +format_data(Tokens, Msg) -> + emqx_placeholder:proc_tmpl(Tokens, Msg). + +redact(Data) -> + emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end). diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame.erl new file mode 100644 index 0000000000..d2f8febb94 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame.erl @@ -0,0 +1,163 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% @doc EMQ X Bridge Sysk Frame +%%-------------------------------------------------------------------- + +-module(emqx_bridge_syskeeper_frame). + +%% API +-export([ + versions/0, + current_version/0, + make_state_with_conf/1, + make_state/1, + encode/3, + parse/2, + parse_handshake/1 +]). + +-export([ + bool2int/1, + int2bool/1, + marshaller/1, + serialize_variable_byte_integer/1, + parse_variable_byte_integer/1 +]). + +-export_type([state/0, versions/0, handshake/0, forward/0, packet/0]). + +-include("emqx_bridge_syskeeper.hrl"). + +-type state() :: #{ + handler := atom(), + version := versions(), + ack => boolean() +}. + +-type versions() :: 1. + +-type handshake() :: #{type := handshake, version := versions()}. +-type forward() :: #{type := forward, ack := boolean(), messages := list(map())}. +-type heartbeat() :: #{type := heartbeat}. + +-type packet() :: + handshake() + | forward() + | heartbeat(). + +-callback version() -> versions(). +-callback encode(packet_type_val(), packet_data(), state()) -> binary(). +-callback parse(packet_type(), binary(), state()) -> packet(). + +-define(HIGHBIT, 2#10000000). +-define(LOWBITS, 2#01111111). +-define(MULTIPLIER_MAX, 16#200000). + +-export_type([packet_type/0]). + +%%------------------------------------------------------------------- +%%% API +%%------------------------------------------------------------------- +-spec versions() -> list(versions()). +versions() -> + [1]. + +-spec current_version() -> versions(). +current_version() -> + 1. + +-spec make_state_with_conf(map()) -> state(). +make_state_with_conf(#{ack_mode := Mode}) -> + State = make_state(current_version()), + State#{ack => Mode =:= need_ack}. + +-spec make_state(versions()) -> state(). +make_state(Version) -> + case lists:member(Version, versions()) of + true -> + Handler = erlang:list_to_existing_atom( + io_lib:format("emqx_bridge_syskeeper_frame_v~B", [Version]) + ), + #{ + handler => Handler, + version => Version + }; + _ -> + erlang:throw({unsupport_version, Version}) + end. + +-spec encode(packet_type(), term(), state()) -> binary(). +encode(Type, Data, #{handler := Handler} = State) -> + Handler:encode(packet_type_val(Type), Data, State). + +-spec parse(binary(), state()) -> _. +parse(<> = Bin, #{handler := Handler} = State) -> + Type = to_packet_type(TypeVal), + Handler:parse(Type, Bin, State). + +parse_handshake(Data) -> + State = make_state(1), + parse_handshake(Data, State). + +parse_handshake(Data, #{version := Version} = State) -> + case parse(Data, State) of + {ok, #{type := handshake, version := Version} = Shake} -> + {ok, {State, Shake}}; + {ok, #{type := handshake, version := NewVersion}} -> + State2 = make_state(NewVersion), + parse_handshake(Data, State2); + Error -> + Error + end. + +bool2int(true) -> + 1; +bool2int(_) -> + 0. + +int2bool(1) -> + true; +int2bool(_) -> + false. + +marshaller(Item) when is_binary(Item) -> + erlang:binary_to_term(Item); +marshaller(Item) -> + erlang:term_to_binary(Item). + +serialize_variable_byte_integer(N) when N =< ?LOWBITS -> + <<0:1, N:7>>; +serialize_variable_byte_integer(N) -> + <<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>. + +parse_variable_byte_integer(Bin) -> + parse_variable_byte_integer(Bin, 1, 0). + +%%------------------------------------------------------------------- +%%% Internal functions +%%------------------------------------------------------------------- +to_packet_type(?TYPE_HANDSHAKE) -> + handshake; +to_packet_type(?TYPE_FORWARD) -> + forward; +to_packet_type(?TYPE_HEARTBEAT) -> + heartbeat. + +packet_type_val(handshake) -> + ?TYPE_HANDSHAKE; +packet_type_val(forward) -> + ?TYPE_FORWARD; +packet_type_val(heartbeat) -> + ?TYPE_HEARTBEAT. + +parse_variable_byte_integer(<<1:1, _Len:7, _Rest/binary>>, Multiplier, _Value) when + Multiplier > ?MULTIPLIER_MAX +-> + {error, malformed_variable_byte_integer}; +parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) -> + parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier); +parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> + {ok, Value + Len * Multiplier, Rest}; +parse_variable_byte_integer(<<>>, _Multiplier, _Value) -> + {error, incomplete}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl new file mode 100644 index 0000000000..b1c35c68b6 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl @@ -0,0 +1,70 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% @doc EMQ X Bridge Sysk Frame version 1 +%%-------------------------------------------------------------------- + +-module(emqx_bridge_syskeeper_frame_v1). + +%% API +-export([ + version/0, + encode/3, + parse/3 +]). + +-behaviour(emqx_bridge_syskeeper_frame). + +-include("emqx_bridge_syskeeper.hrl"). + +-define(B2I(X), emqx_bridge_syskeeper_frame:bool2int((X))). +-define(I2B(X), emqx_bridge_syskeeper_frame:int2bool((X))). + +-import(emqx_bridge_syskeeper_frame, [ + serialize_variable_byte_integer/1, parse_variable_byte_integer/1, marshaller/1 +]). + +%%------------------------------------------------------------------- +%%% API +%%------------------------------------------------------------------- +version() -> + 1. + +encode(?TYPE_HANDSHAKE = Type, _, _) -> + Version = version(), + <>; +encode(?TYPE_FORWARD = Type, Messages, #{ack := Ack}) -> + encode_forward(Messages, Type, Ack); +encode(?TYPE_HEARTBEAT = Type, _, _) -> + <>. + +-dialyzer({nowarn_function, parse/3}). +parse(handshake, <<_:4, _:4, Version:8>>, _) -> + {ok, #{type => handshake, version => Version}}; +parse(forward, Bin, _) -> + parse_forward(Bin); +parse(heartbeat, <<_:4, _:4>>, _) -> + {ok, #{type => heartbeat}}. + +%%------------------------------------------------------------------- +%%% Internal functions +%%------------------------------------------------------------------- +encode_forward(Messages, Type, Ack) -> + AckVal = ?B2I(Ack), + Data = marshaller(Messages), + Len = erlang:byte_size(Data), + LenVal = serialize_variable_byte_integer(Len), + <>. + +parse_forward(<<_:4, AckVal:4, Bin/binary>>) -> + case parse_variable_byte_integer(Bin) of + {ok, Len, Rest} -> + <> = Rest, + {ok, #{ + type => forward, + ack => ?I2B(AckVal), + messages => emqx_bridge_syskeeper_frame:marshaller(MsgBin) + }}; + Error -> + Error + end. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl new file mode 100644 index 0000000000..fcdcbac858 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl @@ -0,0 +1,100 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_syskeeper_proxy). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1, + values/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(SYSKEEPER_HOST_OPTIONS, #{ + default_port => 9092 +}). + +%% ------------------------------------------------------------------------------------------------- +%% api +conn_bridge_examples(Method) -> + [ + #{ + <<"syskeeper_proxy">> => #{ + summary => <<"Syskeeper Bridge Proxy">>, + value => values(Method) + } + } + ]. + +values(_Method) -> + #{ + enable => true, + type => syskeeper_proxy, + name => <<"foo">>, + listen => <<"127.0.0.1:9092">>, + acceptors => 16, + handshake_timeout => <<"16s">> + }. + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_syskeeper_proxy". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {listen, listen()}, + {acceptors, + mk( + non_neg_integer(), + #{desc => ?DESC("acceptors"), default => 16} + )}, + {handshake_timeout, + mk( + emqx_schema:timeout_duration_ms(), + #{desc => ?DESC(handshake_timeout), default => <<"10s">>} + )} + ]; +fields("creation_opts") -> + emqx_resource_schema:create_opts([{worker_pool_size, #{default => 1}}]); +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Syskeeper Proxy using `", string:to_upper(Method), "` method."]; +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); +desc(_) -> + undefined. + +listen() -> + Meta = #{desc => ?DESC("listen")}, + emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS). + +%% ------------------------------------------------------------------------------------------------- + +type_field() -> + {type, mk(enum([syskeeper_proxy]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl new file mode 100644 index 0000000000..50a49a0f38 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl @@ -0,0 +1,251 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_syskeeper_proxy_server). + +-behaviour(gen_statem). + +-include_lib("emqx/include/logger.hrl"). + +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + +%% `emqx_resource' API +-export([ + query_mode/1, + on_start/2, + on_stop/2, + on_get_status/2 +]). + +%% API +-export([start_link/3]). + +%% gen_statem callbacks +-export([callback_mode/0, init/1, terminate/3, code_change/4]). +-export([handle_event/4]). + +-type state() :: wait_ready | handshake | running. +-type data() :: #{ + transport := atom(), + socket := inet:socket(), + frame_state := + undefined + | emqx_bridge_sysk_frame:state(), + buffer := binary(), + conf := map() +}. + +-define(DEFAULT_PORT, 9092). + +%% ------------------------------------------------------------------------------------------------- +%% emqx_resource + +query_mode(_) -> + no_queries. + +on_start( + InstanceId, + #{ + listen := Server, + acceptors := Acceptors + } = Config +) -> + ?SLOG(info, #{ + msg => "starting_syskeeper_connector", + connector => InstanceId, + config => Config + }), + + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, #{ + default_port => ?DEFAULT_PORT + }), + ListenOn = {Host, Port}, + + Options = [ + {acceptors, Acceptors}, + {tcp_options, [{mode, binary}, {reuseaddr, true}, {nodelay, true}]} + ], + MFArgs = {?MODULE, start_link, [maps:with([handshake_timeout], Config)]}, + ok = emqx_resource:allocate_resource(InstanceId, listen_on, ListenOn), + + case esockd:open(?MODULE, ListenOn, Options, MFArgs) of + {ok, _} -> + {ok, #{listen_on => ListenOn}}; + Error -> + Error + end. + +on_stop(InstanceId, _State) -> + ?SLOG(info, #{ + msg => "stopping_syskeeper_connector", + connector => InstanceId + }), + case emqx_resource:get_allocated_resources(InstanceId) of + #{listen_on := ListenOn} -> + esockd:close(?MODULE, ListenOn); + _ -> + ok + end. + +on_get_status(_InstanceId, #{listen_on := ListenOn}) -> + try + _ = esockd:listener({?MODULE, ListenOn}), + connected + catch + _:_ -> + disconnected + end. + +%% ------------------------------------------------------------------------------------------------- +-spec start_link(atom(), inet:socket(), map()) -> + {ok, Pid :: pid()} + | ignore + | {error, Error :: term()}. +start_link(Transport, Socket, Conf) -> + gen_statem:start_link(?MODULE, [Transport, Socket, Conf], []). + +%% ------------------------------------------------------------------------------------------------- +%% gen_statem callbacks + +-spec callback_mode() -> gen_statem:callback_mode_result(). +callback_mode() -> handle_event_function. + +%% ------------------------------------------------------------------------------------------------- +-spec init(Args :: term()) -> + gen_statem:init_result(term()). +init([Transport, Socket, Conf]) -> + {ok, wait_ready, + #{ + transport => Transport, + socket => Socket, + conf => Conf, + buffer => <<>>, + frame_state => undefined + }, + {next_event, internal, wait_ready}}. + +handle_event(internal, wait_ready, wait_ready, Data) -> + wait_ready(Data); +handle_event(state_timeout, handshake_timeout, handshake, _Data) -> + %% ?LOG(error, "Handshake tiemout~n", []), + {stop, normal}; +handle_event(internal, try_parse, running, Data) -> + try_parse(running, Data); +handle_event(info, {tcp, _Socket, Bin}, State, Data) -> + try_parse(State, combine_buffer(Bin, Data)); +handle_event(info, {tcp_closed, _}, _State, _Data) -> + {stop, normal}; +handle_event(info, {tcp_error, _, _Reason}, _State, _Data) -> + %% ?LOG(warning, "TCP error, reason:~p~n", [Reason]), + {stop, normal}; +handle_event(_Event, _Content, _State, _Data) -> + %% ?LOG(warning, "Unexpected event:~p, Context:~p, State:~p~n", [Event, Content, State]), + keep_state_and_data. + +-spec terminate(Reason :: term(), State :: state(), Data :: data()) -> + any(). +terminate(_Reason, _State, _Data) -> + ok. + +code_change(_OldVsn, State, Data, _Extra) -> + {ok, State, Data}. + +%% ------------------------------------------------------------------------------------------------- +%%% Internal functions +send(#{transport := Transport, socket := Socket}, Bin) -> + Transport:send(Socket, Bin). + +ack(Data) -> + ack(Data, true). + +ack(Data, false) -> + send(Data, <<0>>); +ack(Data, true) -> + send(Data, <<16#FF>>). + +wait_ready( + #{ + transport := Transport, + socket := RawSocket, + conf := #{handshake_timeout := Timeout} + } = + Data +) -> + case Transport:wait(RawSocket) of + {ok, Socket} -> + Transport:setopts(Socket, [{active, true}]), + {next_state, handshake, + Data#{ + socket => Socket, + frame_state => undefined + }, + {state_timeout, Timeout, handshake_timeout}}; + {error, Reason} -> + ok = Transport:fast_close(RawSocket), + {stop, Reason} + end. + +combine_buffer(Bin, #{buffer := Buffer} = Data) -> + Data#{buffer := <>}. + +try_parse(State, #{buffer := Bin} = Data) -> + case emqx_bridge_syskeeper_frame:parse_variable_byte_integer(Bin) of + {ok, Len, Rest} -> + case Rest of + <> -> + Data2 = Data#{buffer := Rest2}, + Result = parse(Payload, Data2), + handle_parse_result(Result, State, Data2); + _ -> + {keep_state, Data} + end; + {error, incomplete} -> + {keep_state, Data}; + {error, _Reason} -> + %% ?LOG(warning, "Parse error, reason:~p, buffer:~p~n", [Reason, Bin]), + {stop, parse_error} + end. + +%% maybe handshake +parse(Bin, #{frame_state := undefined}) -> + emqx_bridge_syskeeper_frame:parse_handshake(Bin); +parse(Bin, #{frame_state := State}) -> + emqx_bridge_syskeeper_frame:parse(Bin, State). + +do_forward(Ack, Messages, Data) -> + lists:foreach( + fun(Message) -> + Msg = emqx_message:from_map(Message#{headers => #{}, extra => #{}}), + _ = emqx_broker:safe_publish(Msg) + end, + Messages + ), + case Ack of + true -> + ack(Data); + _ -> + ok + end. + +handle_parse_result({ok, Msg}, State, Data) -> + handle_packet(Msg, State, Data); +handle_parse_result({error, _Reason} = Error, State, Data) -> + handle_parse_error(Error, State, #{buffer := _Bin} = Data), + %% ?LOG(warning, "Parse error, state:~p, reason:~p, buffer:~p~n", [State, Reason, Bin]), + {stop, parse_error}. + +handle_parse_error(_, handshake, Data) -> + ack(Data, false); +handle_parse_error(_, _, _) -> + ok. + +handle_packet({FrameState, _Shake}, handshake, Data) -> + ack(Data), + {next_state, running, Data#{frame_state := FrameState}, {next_event, internal, try_parse}}; +handle_packet(#{type := forward, ack := Ack, messages := Messages}, running, Data) -> + do_forward(Ack, Messages, Data), + try_parse(running, Data); +handle_packet(#{type := heartbeat}, running, Data) -> + ack(Data), + try_parse(running, Data). diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index bb9fc91a6c..190ef1afa6 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -127,7 +127,8 @@ emqx_dashboard_sso, emqx_audit, emqx_gateway_gbt32960, - emqx_gateway_ocpp + emqx_gateway_ocpp, + emqx_bridge_syskeeper ], %% must always be of type `load' ce_business_apps => diff --git a/mix.exs b/mix.exs index 1e6b37d18c..23cef80e3e 100644 --- a/mix.exs +++ b/mix.exs @@ -217,7 +217,8 @@ defmodule EMQXUmbrella.MixProject do :emqx_dashboard_sso, :emqx_audit, :emqx_gateway_gbt32960, - :emqx_gateway_ocpp + :emqx_gateway_ocpp, + :emqx_bridge_syskeeper ]) end diff --git a/rebar.config.erl b/rebar.config.erl index 54ce0d6c31..93ad9fa992 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -113,6 +113,7 @@ is_community_umbrella_app("apps/emqx_dashboard_sso") -> false; is_community_umbrella_app("apps/emqx_audit") -> false; is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false; is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false; +is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false; is_community_umbrella_app(_) -> true. is_jq_supported() -> diff --git a/rel/i18n/emqx_bridge_syskeeper.hocon b/rel/i18n/emqx_bridge_syskeeper.hocon new file mode 100644 index 0000000000..7cf1953179 --- /dev/null +++ b/rel/i18n/emqx_bridge_syskeeper.hocon @@ -0,0 +1,45 @@ +emqx_bridge_syskeeper { + +config_enable.desc: +"""Enable or disable this bridge""" + +config_enable.label: +"""Enable Or Disable Bridge""" + +desc_config.desc: +"""Configuration for a Syskeeper bridge""" + +desc_config.label: +"""Syskeeper Bridge Configuration""" + +desc_name.desc: +"""Bridge name.""" + +desc_name.label: +"""Bridge Name""" + +desc_type.desc: +"""The Bridge Type""" + +desc_type.label: +"""Bridge Type""" + +template.desc: +"""Template""" + +template.label: +"""Template""" + +target_topic.desc: +"""The topic for the forwarded message""" + +target_topic.label: +"""Target Topic""" + +target_qos.desc: +"""The QoS for the forwarded message, -1 is for the original topic""" + +target_qos.label: +"""Target QoS""" + +} diff --git a/rel/i18n/emqx_bridge_syskeeper_connector.hocon b/rel/i18n/emqx_bridge_syskeeper_connector.hocon new file mode 100644 index 0000000000..a057e6647e --- /dev/null +++ b/rel/i18n/emqx_bridge_syskeeper_connector.hocon @@ -0,0 +1,21 @@ +emqx_bridge_syskeeper_connector { + +server.desc: +"""The address of the Syskeeper proxy server""" + +server.label: +"""Server""" + +ack_mode.desc: +"""Specify whether the proxy server should reply with an acknowledgement for the message forwarding, can be:
- need_ack
- no_ack
""" + +ack_mode.label: +"""Acknowledgement Mode""" + +ack_timeout.desc: +"""The maximum time to wait for an acknowledgement from the proxy server""" + +ack_timeout.label: +"""Acknowledgement Timeout""" + +} diff --git a/rel/i18n/emqx_bridge_syskeeper_proxy.hocon b/rel/i18n/emqx_bridge_syskeeper_proxy.hocon new file mode 100644 index 0000000000..f6c519216a --- /dev/null +++ b/rel/i18n/emqx_bridge_syskeeper_proxy.hocon @@ -0,0 +1,45 @@ +emqx_bridge_syskeeper_proxy { + +config_enable.desc: +"""Enable or disable this bridge""" + +config_enable.label: +"""Enable Or Disable Bridge""" + +desc_config.desc: +"""Configuration for a Syskeeper proxy bridge""" + +desc_config.label: +"""Syskeeper Proxy Bridge Configuration""" + +desc_name.desc: +"""Bridge name""" + +desc_name.label: +"""Bridge Name""" + +desc_type.desc: +"""The Bridge Type""" + +desc_type.label: +"""Bridge Type""" + +listen.desc: +"""The listening address for this Syskeeper proxy server""" + +listen.label: +"""Listen Address""" + +acceptors.desc: +"""The number of the acceptors""" + +acceptors.label: +"""Acceptors""" + +handshake_timeout.desc: +"""The maximum to wait for the handshake when a connection is created""" + +handshake_timeout.label: +"""Handshake Timeout""" + +} From e93216fa628b046d2925432a158792bdf6537eca Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 10 Nov 2023 09:52:32 +0800 Subject: [PATCH 2/5] chore(sysk): fix spellchecks & update change & more logs --- .../src/emqx_bridge_syskeeper_connector.erl | 25 ++++----- .../src/emqx_bridge_syskeeper_frame_v1.erl | 2 +- .../emqx_bridge_syskeeper_proxy_server.erl | 52 ++++++++++++++----- changes/ee/feat-11795.en.md | 1 + scripts/spellcheck/dicts/emqx.txt | 1 + 5 files changed, 54 insertions(+), 27 deletions(-) create mode 100644 changes/ee/feat-11795.en.md diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index 219d4d0d23..8088dcdd01 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -87,7 +87,7 @@ on_start( ?SLOG(info, #{ msg => "starting_syskeeper_connector", connector => InstanceId, - config => redact(Config) + config => Config }), HostCfg = emqx_schema:parse_server(Server, ?SYSKEEPER_HOST_OPTIONS), @@ -175,12 +175,12 @@ do_query( syskeeper_connector_query_return, #{error => Reason} ), - %% ?SLOG(error, #{ - %% msg => "syskeeper_connector_do_query_failed", - %% connector => InstanceId, - %% query => Query, - %% reason => Reason - %% }), + ?SLOG(error, #{ + msg => "syskeeper_connector_do_query_failed", + connector => InstanceId, + query => Query, + reason => Reason + }), case Reason of ecpool_empty -> {error, {recoverable_error, Reason}}; @@ -188,10 +188,10 @@ do_query( Result end; _ -> - %% ?tp( - %% syskeeper_connector_query_return, - %% #{result => Result} - %% ), + ?tp( + syskeeper_connector_query_return, + #{result => Result} + ), Result end. @@ -257,6 +257,3 @@ format_data([], Msg) -> emqx_utils_json:encode(Msg); format_data(Tokens, Msg) -> emqx_placeholder:proc_tmpl(Tokens, Msg). - -redact(Data) -> - emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end). diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl index b1c35c68b6..200730659a 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl @@ -63,7 +63,7 @@ parse_forward(<<_:4, AckVal:4, Bin/binary>>) -> {ok, #{ type => forward, ack => ?I2B(AckVal), - messages => emqx_bridge_syskeeper_frame:marshaller(MsgBin) + messages => marshaller(MsgBin) }}; Error -> Error diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl index 50a49a0f38..057d7579c5 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl @@ -52,7 +52,7 @@ on_start( } = Config ) -> ?SLOG(info, #{ - msg => "starting_syskeeper_connector", + msg => "starting_syskeeper_proxy_server", connector => InstanceId, config => Config }), @@ -78,7 +78,7 @@ on_start( on_stop(InstanceId, _State) -> ?SLOG(info, #{ - msg => "stopping_syskeeper_connector", + msg => "stopping_syskeeper_proxy_server", connector => InstanceId }), case emqx_resource:get_allocated_resources(InstanceId) of @@ -127,8 +127,11 @@ init([Transport, Socket, Conf]) -> handle_event(internal, wait_ready, wait_ready, Data) -> wait_ready(Data); -handle_event(state_timeout, handshake_timeout, handshake, _Data) -> - %% ?LOG(error, "Handshake tiemout~n", []), +handle_event(state_timeout, handshake_timeout, handshake, Data) -> + ?SLOG(info, #{ + msg => "syskeeper_proxy_server_handshake_timeout", + data => Data + }), {stop, normal}; handle_event(internal, try_parse, running, Data) -> try_parse(running, Data); @@ -136,11 +139,21 @@ handle_event(info, {tcp, _Socket, Bin}, State, Data) -> try_parse(State, combine_buffer(Bin, Data)); handle_event(info, {tcp_closed, _}, _State, _Data) -> {stop, normal}; -handle_event(info, {tcp_error, _, _Reason}, _State, _Data) -> - %% ?LOG(warning, "TCP error, reason:~p~n", [Reason]), +handle_event(info, {tcp_error, Error, Reason}, _State, _Data) -> + ?SLOG(warning, #{ + msg => "syskeeper_proxy_server_tcp_error", + error => Error, + reason => Reason + }), {stop, normal}; -handle_event(_Event, _Content, _State, _Data) -> - %% ?LOG(warning, "Unexpected event:~p, Context:~p, State:~p~n", [Event, Content, State]), +handle_event(Event, Content, State, Data) -> + ?SLOG(warning, #{ + msg => "syskeeper_proxy_server_unexpected_event", + event => Event, + content => Content, + state => State, + data => Data + }), keep_state_and_data. -spec terminate(Reason :: term(), State :: state(), Data :: data()) -> @@ -183,6 +196,11 @@ wait_ready( {state_timeout, Timeout, handshake_timeout}}; {error, Reason} -> ok = Transport:fast_close(RawSocket), + ?SLOG(error, #{ + msg => "syskeeper_proxy_server_listen_error", + transport => Transport, + reason => Reason + }), {stop, Reason} end. @@ -202,8 +220,13 @@ try_parse(State, #{buffer := Bin} = Data) -> end; {error, incomplete} -> {keep_state, Data}; - {error, _Reason} -> - %% ?LOG(warning, "Parse error, reason:~p, buffer:~p~n", [Reason, Bin]), + {error, Reason} -> + ?SLOG(error, #{ + msg => "syskeeper_proxy_server_try_parse_error", + state => State, + data => Data, + reason => Reason + }), {stop, parse_error} end. @@ -230,9 +253,14 @@ do_forward(Ack, Messages, Data) -> handle_parse_result({ok, Msg}, State, Data) -> handle_packet(Msg, State, Data); -handle_parse_result({error, _Reason} = Error, State, Data) -> +handle_parse_result({error, Reason} = Error, State, Data) -> handle_parse_error(Error, State, #{buffer := _Bin} = Data), - %% ?LOG(warning, "Parse error, state:~p, reason:~p, buffer:~p~n", [State, Reason, Bin]), + ?SLOG(error, #{ + msg => "syskeeper_proxy_server_parse_result_error", + state => State, + data => Data, + reason => Reason + }), {stop, parse_error}. handle_parse_error(_, handshake, Data) -> diff --git a/changes/ee/feat-11795.en.md b/changes/ee/feat-11795.en.md new file mode 100644 index 0000000000..e8ac13f669 --- /dev/null +++ b/changes/ee/feat-11795.en.md @@ -0,0 +1 @@ +Integrated Nari Syskeeper 2000 as a new bridge backend. diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index a3dd4a00b3..5630404c35 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -294,3 +294,4 @@ OCPP dnstream upstream priv +Syskeeper From abc7233a778c0dcfc96f9678085d4b0acdc24370 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 23 Oct 2023 18:09:12 +0800 Subject: [PATCH 3/5] test(sysk): add test suite for syskeeper --- .../src/emqx_bridge_syskeeper_connector.erl | 31 +- .../test/emqx_bridge_syskeeper_SUITE.erl | 290 ++++++++++++++++++ 2 files changed, 312 insertions(+), 9 deletions(-) create mode 100644 apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index 8088dcdd01..3a60519ffa 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -221,25 +221,30 @@ parse_template([], Templates) -> try_apply_template([{Type, _} | _] = Datas, #{templates := Templates} = State) -> case maps:find(Type, Templates) of {ok, Template} -> - {ok, apply_template(Datas, Template, State)}; + apply_template(Datas, Template, State); _ -> {error, {unrecoverable_error, {invalid_request, Datas}}} end. apply_template(Datas, Template, State) -> - lists:map( - fun({_, Data}) -> - do_apply_template(Data, Template, State) - end, - Datas - ). + apply_template(Datas, Template, State, []). + +apply_template([{_, Data} | T], Template, State, Acc) -> + case do_apply_template(Data, Template, State) of + {ok, Msg} -> + apply_template(T, Template, State, [Msg | Acc]); + Error -> + Error + end; +apply_template([], _Template, _State, Acc) -> + {ok, lists:reverse(Acc)}. do_apply_template(#{id := Id, qos := QoS, clientid := From} = Data, Template, #{ target_qos := TargetQoS, target_topic_tks := TargetTopicTks }) -> Msg = maps:with([qos, flags, topic, payload, timestamp], Data), Topic = emqx_placeholder:proc_tmpl(TargetTopicTks, Msg), - Msg#{ + {ok, Msg#{ id => emqx_guid:from_hexstr(Id), qos := case TargetQoS of @@ -251,7 +256,15 @@ do_apply_template(#{id := Id, qos := QoS, clientid := From} = Data, Template, #{ from => From, topic := Topic, payload := format_data(Template, Msg) - }. + }}; +do_apply_template(Data, Template, State) -> + ?SLOG(info, #{ + msg => "syskeeper_connector_apply_template_error", + data => Data, + template => Template, + state => State + }), + {error, {unrecoverable_error, {invalid_data, Data}}}. format_data([], Msg) -> emqx_utils_json:encode(Msg); diff --git a/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl b/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl new file mode 100644 index 0000000000..21886a90c9 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl @@ -0,0 +1,290 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_syskeeper_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(HOST, "127.0.0.1"). +-define(PORT, 9092). +-define(ACK_TIMEOUT, <<"3s">>). +-define(SYSKEEPER_NAME, <<"syskeeper">>). +-define(SYSKEEPER_PROXY_NAME, <<"syskeeper_proxy">>). +-define(BATCH_SIZE, 3). +-define(TOPIC, <<"syskeeper/message">>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, lifecycle}, + {group, need_ack}, + {group, no_ack} + ]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + Lifecycle = [t_setup_via_config, t_setup_via_http_api, t_get_status], + Write = TCs -- Lifecycle, + BatchingGroups = [{group, with_batch}, {group, without_batch}], + [ + {need_ack, BatchingGroups}, + {no_ack, BatchingGroups}, + {with_batch, Write}, + {without_batch, Write}, + {lifecycle, Lifecycle} + ]. + +init_per_group(need_ack, Config) -> + [{ack_mode, need_ack} | Config]; +init_per_group(no_ack, Config) -> + [{ack_mode, no_ack} | Config]; +init_per_group(with_batch, Config0) -> + [{enable_batch, true} | Config0]; +init_per_group(without_batch, Config0) -> + [{enable_batch, false} | Config0]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_suite(Config) -> + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge, emqx_bridge_syskeeper]), + _ = emqx_bridge_enterprise:module_info(), + emqx_mgmt_api_test_util:init_suite(), + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]). + +init_per_testcase(_Testcase, Config) -> + snabbkaffe:start_trace(), + Config. + +end_per_testcase(_Testcase, _Config) -> + ok = snabbkaffe:stop(), + delete_bridge(syskeeper, ?SYSKEEPER_NAME), + delete_bridge(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ +syskeeper_config(Config) -> + AckMode = proplists:get_value(ack_mode, Config, no_ack), + BatchSize = + case proplists:get_value(enable_batch, Config, false) of + true -> ?BATCH_SIZE; + false -> 1 + end, + ConfigString = + io_lib:format( + "bridges.~s.~s {\n" + " enable = true\n" + " server = \"~ts\"\n" + " ack_mode = ~p\n" + " ack_timeout = \"~ts\"\n" + " resource_opts = {\n" + " request_ttl = 500ms\n" + " batch_size = ~b\n" + " }\n" + "}", + [ + syskeeper, + ?SYSKEEPER_NAME, + server(), + AckMode, + ?ACK_TIMEOUT, + BatchSize + ] + ), + {?SYSKEEPER_NAME, parse_and_check(ConfigString, syskeeper, ?SYSKEEPER_NAME)}. + +syskeeper_proxy_config(_Config) -> + ConfigString = + io_lib:format( + "bridges.~s.~s {\n" + " enable = true\n" + " listen = \"~ts\"\n" + " acceptors = 1\n" + "}", + [ + syskeeper_proxy, + ?SYSKEEPER_PROXY_NAME, + server() + ] + ), + {?SYSKEEPER_PROXY_NAME, parse_and_check(ConfigString, syskeeper_proxy, ?SYSKEEPER_PROXY_NAME)}. + +parse_and_check(ConfigString, BridgeType0, Name) -> + BridgeType = to_bin(BridgeType0), + ct:pal("ConfigString:~ts~n", [ConfigString]), + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf, + Config. + +create_bridge(Type, Name, Conf) -> + emqx_bridge:create(Type, Name, Conf). + +delete_bridge(Type, Name) -> + emqx_bridge:remove(Type, Name). + +create_both_bridge(Config) -> + {ProxyName, ProxyConf} = syskeeper_proxy_config(Config), + ?assertMatch( + {ok, _}, + create_bridge(syskeeper_proxy, ProxyName, ProxyConf) + ), + {Name, Conf} = syskeeper_config(Config), + ?assertMatch( + {ok, _}, + create_bridge(syskeeper, Name, Conf) + ). + +create_bridge_http(Params) -> + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; + Error -> Error + end. + +send_message(_Config, Payload) -> + Name = ?SYSKEEPER_NAME, + BridgeType = syskeeper, + BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), + emqx_bridge:send_message(BridgeID, Payload). + +to_bin(List) when is_list(List) -> + unicode:characters_to_binary(List, utf8); +to_bin(Atom) when is_atom(Atom) -> + erlang:atom_to_binary(Atom); +to_bin(Bin) when is_binary(Bin) -> + Bin. + +to_str(Atom) when is_atom(Atom) -> + erlang:atom_to_list(Atom). + +server() -> + erlang:iolist_to_binary(io_lib:format("~ts:~B", [?HOST, ?PORT])). + +make_message() -> + Message = emqx_message:make(?MODULE, ?TOPIC, ?SYSKEEPER_NAME), + Id = emqx_guid:to_hexstr(emqx_guid:gen()), + From = emqx_message:from(Message), + Msg = emqx_message:to_map(Message), + Msg#{id => Id, clientid => From}. + +receive_msg() -> + receive + {deliver, ?TOPIC, Msg} -> + {ok, Msg} + after 500 -> + {error, no_message} + end. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ +t_setup_via_config(Config) -> + {Name, Conf} = syskeeper_proxy_config(Config), + ?assertMatch( + {ok, _}, + create_bridge(syskeeper_proxy, Name, Conf) + ), + ?assertMatch( + X when is_pid(X), + esockd:listener({emqx_bridge_syskeeper_proxy_server, {?HOST, ?PORT}}) + ), + delete_bridge(syskeeper_proxy, Name), + ?assertError( + not_found, + esockd:listener({emqx_bridge_syskeeper_proxy_server, {?HOST, ?PORT}}) + ). + +t_setup_via_http_api(Config) -> + {Name, ProxyConf0} = syskeeper_proxy_config(Config), + ProxyConf = ProxyConf0#{ + <<"name">> => Name, + <<"type">> => syskeeper_proxy + }, + ?assertMatch( + {ok, _}, + create_bridge_http(ProxyConf) + ), + + ?assertMatch( + X when is_pid(X), + esockd:listener({emqx_bridge_syskeeper_proxy_server, {?HOST, ?PORT}}) + ), + + delete_bridge(syskeeper_proxy, Name), + + ?assertError( + not_found, + esockd:listener({emqx_bridge_syskeeper_proxy_server, {?HOST, ?PORT}}) + ). + +t_get_status(Config) -> + create_both_bridge(Config), + SyskeeperId = emqx_bridge_resource:resource_id(syskeeper, ?SYSKEEPER_NAME), + ProxyId = emqx_bridge_resource:resource_id(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(SyskeeperId)), + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ProxyId)), + delete_bridge(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), + ?retry( + _Sleep = 500, + _Attempts = 10, + ?assertMatch({ok, connecting}, emqx_resource_manager:health_check(SyskeeperId)) + ). + +t_write_failure(Config) -> + create_both_bridge(Config), + delete_bridge(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), + SentData = make_message(), + Result = + ?wait_async_action( + send_message(Config, SentData), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + ?assertMatch({{error, {resource_error, _}}, _}, Result). + +t_invalid_data(Config) -> + create_both_bridge(Config), + {_, {ok, #{result := Result}}} = + ?wait_async_action( + send_message(Config, #{}), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + ?assertMatch({error, {unrecoverable_error, {invalid_data, _}}}, Result). + +t_forward(Config) -> + emqx_broker:subscribe(?TOPIC), + create_both_bridge(Config), + SentData = make_message(), + {_, {ok, #{result := _Result}}} = + ?wait_async_action( + send_message(Config, SentData), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + ?retry( + 500, + 10, + ?assertMatch({ok, _}, receive_msg()) + ), + emqx_broker:unsubscribe(?TOPIC), + ok. From 548e39e799c23877f873e3e5622ddc55884c53b2 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 7 Nov 2023 10:27:18 +0800 Subject: [PATCH 4/5] refactor(sysk): refactor the syskeeper to v2 style --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 10 +- .../src/schema/emqx_bridge_enterprise.erl | 35 +--- .../src/schema/emqx_bridge_v2_enterprise.erl | 16 +- .../src/schema/emqx_bridge_v2_schema.erl | 7 +- .../src/emqx_bridge_syskeeper.erl | 62 +++--- .../src/emqx_bridge_syskeeper_connector.erl | 186 ++++++++++++------ .../src/emqx_bridge_syskeeper_proxy.erl | 46 +++-- .../test/emqx_bridge_syskeeper_SUITE.erl | 185 ++++++++++++----- .../src/schema/emqx_connector_ee_schema.erl | 30 ++- .../src/schema/emqx_connector_schema.erl | 4 +- .../emqx_bridge_syskeeper_connector.hocon | 12 ++ rel/i18n/emqx_bridge_syskeeper_proxy.hocon | 8 +- 12 files changed, 414 insertions(+), 187 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 5e42b4881b..c8aff2f8a7 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -804,7 +804,9 @@ bridge_v2_type_to_connector_type(kafka) -> bridge_v2_type_to_connector_type(kafka_producer) -> kafka_producer; bridge_v2_type_to_connector_type(azure_event_hub_producer) -> - azure_event_hub_producer. + azure_event_hub_producer; +bridge_v2_type_to_connector_type(syskeeper_forwarder) -> + syskeeper_forwarder. %%==================================================================== %% Data backup API @@ -1031,7 +1033,9 @@ bridge_v1_type_to_bridge_v2_type(kafka) -> bridge_v1_type_to_bridge_v2_type(kafka_producer) -> kafka_producer; bridge_v1_type_to_bridge_v2_type(azure_event_hub_producer) -> - azure_event_hub_producer. + azure_event_hub_producer; +bridge_v1_type_to_bridge_v2_type(syskeeper_forwarder) -> + syskeeper_forwarder. %% This function should return true for all inputs that are bridge V1 types for %% bridges that have been refactored to bridge V2s, and for all all bridge V2 @@ -1044,6 +1048,8 @@ is_bridge_v2_type(<<"kafka">>) -> true; is_bridge_v2_type(<<"azure_event_hub_producer">>) -> true; +is_bridge_v2_type(<<"syskeeper_forwarder">>) -> + true; is_bridge_v2_type(_) -> false. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index a160ecd336..93951cca08 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -50,9 +50,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method), api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer"), api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_grpc_v1"), - api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_producer"), - api_ref(emqx_bridge_syskeeper, <<"syskeeper">>, Method), - api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method) + api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_producer") ]. schema_modules() -> @@ -80,9 +78,7 @@ schema_modules() -> emqx_bridge_rabbitmq, emqx_bridge_kinesis, emqx_bridge_greptimedb, - emqx_bridge_azure_event_hub, - emqx_bridge_syskeeper, - emqx_bridge_syskeeper_proxy + emqx_bridge_azure_event_hub ]. examples(Method) -> @@ -130,9 +126,7 @@ resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer; resource_type(greptimedb) -> emqx_bridge_greptimedb_connector; %% We use AEH's Kafka interface. -resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer; -resource_type(syskeeper) -> emqx_bridge_syskeeper_connector; -resource_type(syskeeper_proxy) -> emqx_bridge_syskeeper_proxy_server. +resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer. %% For bridges that need to override connector configurations. bridge_impl_module(BridgeType) when is_binary(BridgeType) -> @@ -221,8 +215,7 @@ fields(bridges) -> influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs() ++ - kinesis_structs() ++ greptimedb_structs() ++ azure_event_hub_structs() ++ - syskeeper_structs(). + kinesis_structs() ++ greptimedb_structs() ++ azure_event_hub_structs(). mongodb_structs() -> [ @@ -435,26 +428,6 @@ azure_event_hub_structs() -> )} ]. -syskeeper_structs() -> - [ - {syskeeper, - mk( - hoconsc:map(name, ref(emqx_bridge_syskeeper, "config")), - #{ - desc => <<"Syskeeper bridge config ">>, - required => false - } - )}, - {syskeeper_proxy, - mk( - hoconsc:map(name, ref(emqx_bridge_syskeeper_proxy, "config")), - #{ - desc => <<"Syskeeper proxy server config">>, - required => false - } - )} - ]. - api_ref(Module, Type, Method) -> {Type, ref(Module, Method)}. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl index 54448f07d3..ac07135451 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl @@ -28,7 +28,8 @@ examples(Method) -> schema_modules() -> [ emqx_bridge_kafka, - emqx_bridge_azure_event_hub + emqx_bridge_azure_event_hub, + emqx_bridge_syskeeper ]. fields(actions) -> @@ -51,13 +52,24 @@ action_structs() -> desc => <<"Azure Event Hub Actions Config">>, required => false } + )}, + {syskeeper_forwarder, + mk( + hoconsc:map(name, ref(emqx_bridge_syskeeper, config)), + #{ + desc => <<"Syskeeper forwarder Bridge V2 Config">>, + required => false + } )} ]. api_schemas(Method) -> [ api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_bridge_v2"), - api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_bridge_v2") + api_ref( + emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_bridge_v2" + ), + api_ref(emqx_bridge_syskeeper, <<"syskeeper_forwarder">>, Method) ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index d6d8eb9a10..f0973f260a 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -134,8 +134,11 @@ desc(_) -> schema_homogeneous_test() -> case lists:filtermap( - fun({_Name, Schema}) -> - is_bad_schema(Schema) + fun + ({syskeeper_forwarder, _Schema}) -> + false; + ({_Name, Schema}) -> + is_bad_schema(Schema) end, fields(actions) ) diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl index 9e0ec4eedd..9e520d095f 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl @@ -11,7 +11,7 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - conn_bridge_examples/1, + bridge_v2_examples/1, values/1 ]). @@ -24,35 +24,46 @@ %% ------------------------------------------------------------------------------------------------- %% api -conn_bridge_examples(Method) -> +bridge_v2_examples(Method) -> [ #{ - <<"syskeeper">> => #{ - summary => <<"Syskeeper Bridge">>, + <<"syskeeper_forwarder">> => #{ + summary => <<"Syskeeper Forwarder Bridge">>, value => values(Method) } } ]. -values(_Method) -> +values(get) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values(post) + ); +values(post) -> + maps:merge( + #{ + name => <<"syskeeper_forwarder">>, + type => <<"syskeeper_forwarder">> + }, + values(put) + ); +values(put) -> #{ enable => true, - type => syskeeper, - name => <<"foo">>, - server => <<"127.0.0.1:9092">>, - ack_mode => <<"no_ack">>, - ack_timeout => <<"10s">>, - pool_size => 16, + connector => <<"syskeeper_forwarder">>, target_topic => <<"${topic}">>, target_qos => <<"-1">>, template => <<"${payload}">>, resource_opts => #{ - worker_pool_size => 16, - health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - batch_size => ?DEFAULT_BATCH_SIZE, - batch_time => ?DEFAULT_BATCH_TIME, - query_mode => sync, - max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + worker_pool_size => 16 } }. @@ -62,9 +73,14 @@ namespace() -> "bridge_syskeeper". roots() -> []. -fields("config") -> +fields(config) -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {description, emqx_schema:description_schema()}, + {connector, + mk(binary(), #{ + desc => ?DESC(emqx_connector_schema, "connector_field"), required => true + })}, {target_topic, mk( binary(), @@ -89,17 +105,17 @@ fields("config") -> desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) } )} - ] ++ emqx_bridge_syskeeper_connector:fields(config); + ]; fields("creation_opts") -> emqx_resource_schema:create_opts([{request_ttl, #{default => infinity}}]); fields("post") -> - [type_field(), name_field() | fields("config")]; + [type_field(), name_field() | fields(config)]; fields("put") -> - fields("config"); + fields(config); fields("get") -> emqx_bridge_schema:status_fields() ++ fields("post"). -desc("config") -> +desc(config) -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Syskeeper using `", string:to_upper(Method), "` method."]; @@ -111,7 +127,7 @@ desc(_) -> %% ------------------------------------------------------------------------------------------------- type_field() -> - {type, mk(enum([syskeeper]), #{required => true, desc => ?DESC("desc_type")})}. + {type, mk(enum([syskeeper_forwarder]), #{required => true, desc => ?DESC("desc_type")})}. name_field() -> {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index 3a60519ffa..5c52017fd4 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -1,4 +1,4 @@ -%%-------------------------------------------------------------------- +%-------------------------------------------------------------------- %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- @@ -12,7 +12,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --export([roots/0, fields/1]). +-export([roots/0, fields/1, desc/1, connector_examples/1]). %% `emqx_resource' API -export([ @@ -22,7 +22,11 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). -export([ @@ -37,6 +41,48 @@ -define(EXTRA_CALL_TIMEOUT, 2000). +%% ------------------------------------------------------------------------------------------------- +%% api +connector_examples(Method) -> + [ + #{ + <<"syskeeper_forwarder">> => #{ + summary => <<"Syskeeper Forwarder Connector">>, + value => values(Method) + } + } + ]. + +values(get) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values(post) + ); +values(post) -> + maps:merge( + #{ + name => <<"syskeeper_forwarder">>, + type => <<"syskeeper_forwarder">> + }, + values(put) + ); +values(put) -> + #{ + enable => true, + server => <<"127.0.0.1:9092">>, + ack_mode => <<"no_ack">>, + ack_timeout => <<"10s">>, + pool_size => 16 + }. + %% ------------------------------------------------------------------------------------------------- %% Hocon schema roots() -> @@ -44,6 +90,8 @@ roots() -> fields(config) -> [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {description, emqx_schema:description_schema()}, {server, server()}, {ack_mode, mk( @@ -61,12 +109,31 @@ fields(config) -> (Other) -> emqx_connector_schema_lib:pool_size(Other) end} - ]. + ]; +fields("post") -> + [type_field(), name_field() | fields(config)]; +fields("put") -> + fields(config); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc(config) -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Syskeeper Proxy using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. server() -> Meta = #{desc => ?DESC("server")}, emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS). +type_field() -> + {type, mk(enum([syskeeper_forwarder]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. + %% ------------------------------------------------------------------------------------------------- %% `emqx_resource' API @@ -79,9 +146,7 @@ on_start( #{ server := Server, pool_size := PoolSize, - ack_timeout := AckTimeout, - target_topic := TargetTopic, - target_qos := TargetQoS + ack_timeout := AckTimeout } = Config ) -> ?SLOG(info, #{ @@ -103,10 +168,8 @@ on_start( State = #{ pool_name => InstanceId, - target_qos => TargetQoS, ack_timeout => AckTimeout, - templates => parse_template(Config), - target_topic_tks => emqx_placeholder:preproc_tmpl(TargetTopic) + channels => #{} }, case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> @@ -122,13 +185,13 @@ on_stop(InstanceId, _State) -> }), emqx_resource_pool:stop(InstanceId). -on_query(InstanceId, {send_message, _} = Query, State) -> +on_query(InstanceId, {_MessageTag, _} = Query, State) -> do_query(InstanceId, [Query], State); on_query(_InstanceId, Query, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. %% we only support batch insert -on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) -> +on_batch_query(InstanceId, [{_MessageTag, _} | _] = Query, State) -> do_query(InstanceId, Query, State); on_batch_query(_InstanceId, Query, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. @@ -143,13 +206,46 @@ status_result(true) -> connected; status_result(false) -> connecting; status_result({error, _}) -> connecting. +on_add_channel(_InstanceId, #{channels := Channels} = OldState, ChannelId, #{ + target_topic := TargetTopic, + target_qos := TargetQoS, + template := Template +}) -> + case maps:is_key(ChannelId, Channels) of + true -> + {error, already_exists}; + _ -> + Channel = #{ + target_qos => TargetQoS, + target_topic => emqx_placeholder:preproc_tmpl(TargetTopic), + template => emqx_placeholder:preproc_tmpl(Template) + }, + Channels2 = Channels#{ChannelId => Channel}, + {ok, OldState#{channels => Channels2}} + end. + +on_remove_channel(_InstanceId, #{channels := Channels} = OldState, ChannelId) -> + Channels2 = maps:remove(ChannelId, Channels), + {ok, OldState#{channels => Channels2}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + +on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> + case maps:is_key(ChannelId, Channels) of + true -> + connected; + _ -> + {error, not_exists} + end. + %% ------------------------------------------------------------------------------------------------- %% Helper fns do_query( InstanceId, Query, - #{pool_name := PoolName, ack_timeout := AckTimeout} = State + #{pool_name := PoolName, ack_timeout := AckTimeout, channels := Channels} = State ) -> ?TRACE( "QUERY", @@ -158,7 +254,7 @@ do_query( ), Result = - case try_apply_template(Query, State) of + case try_render_message(Query, Channels) of {ok, Msg} -> ecpool:pick_and_do( PoolName, @@ -199,48 +295,26 @@ connect(Opts) -> Options = proplists:get_value(options, Opts), emqx_bridge_syskeeper_client:start_link(Options). -parse_template(Config) -> - Templates = - case maps:get(template, Config, undefined) of - undefined -> #{}; - <<>> -> #{}; - Template -> #{send_message => Template} - end, - - parse_template(maps:to_list(Templates), #{}). - -parse_template([{Key, H} | T], Templates) -> - ParamsTks = emqx_placeholder:preproc_tmpl(H), - parse_template( - T, - Templates#{Key => ParamsTks} - ); -parse_template([], Templates) -> - Templates. - -try_apply_template([{Type, _} | _] = Datas, #{templates := Templates} = State) -> - case maps:find(Type, Templates) of - {ok, Template} -> - apply_template(Datas, Template, State); +try_render_message(Datas, Channels) -> + try_render_message(Datas, Channels, []). + +try_render_message([{MessageTag, Data} | T], Channels, Acc) -> + case maps:find(MessageTag, Channels) of + {ok, Channel} -> + case render_message(Data, Channel) of + {ok, Msg} -> + try_render_message(T, Channels, [Msg | Acc]); + Error -> + Error + end; _ -> - {error, {unrecoverable_error, {invalid_request, Datas}}} - end. - -apply_template(Datas, Template, State) -> - apply_template(Datas, Template, State, []). - -apply_template([{_, Data} | T], Template, State, Acc) -> - case do_apply_template(Data, Template, State) of - {ok, Msg} -> - apply_template(T, Template, State, [Msg | Acc]); - Error -> - Error + {error, {unrecoverable_error, {invalid_message_tag, MessageTag}}} end; -apply_template([], _Template, _State, Acc) -> +try_render_message([], _Channels, Acc) -> {ok, lists:reverse(Acc)}. -do_apply_template(#{id := Id, qos := QoS, clientid := From} = Data, Template, #{ - target_qos := TargetQoS, target_topic_tks := TargetTopicTks +render_message(#{id := Id, qos := QoS, clientid := From} = Data, #{ + target_qos := TargetQoS, target_topic := TargetTopicTks, template := Template }) -> Msg = maps:with([qos, flags, topic, payload, timestamp], Data), Topic = emqx_placeholder:proc_tmpl(TargetTopicTks, Msg), @@ -257,13 +331,7 @@ do_apply_template(#{id := Id, qos := QoS, clientid := From} = Data, Template, #{ topic := Topic, payload := format_data(Template, Msg) }}; -do_apply_template(Data, Template, State) -> - ?SLOG(info, #{ - msg => "syskeeper_connector_apply_template_error", - data => Data, - template => Template, - state => State - }), +render_message(Data, _Channel) -> {error, {unrecoverable_error, {invalid_data, Data}}}. format_data([], Msg) -> diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl index fcdcbac858..1968022c12 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl @@ -11,7 +11,7 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - conn_bridge_examples/1, + connector_examples/1, values/1 ]). @@ -28,21 +28,40 @@ %% ------------------------------------------------------------------------------------------------- %% api -conn_bridge_examples(Method) -> +connector_examples(Method) -> [ #{ <<"syskeeper_proxy">> => #{ - summary => <<"Syskeeper Bridge Proxy">>, + summary => <<"Syskeeper Proxy Connector">>, value => values(Method) } } ]. -values(_Method) -> +values(get) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values(post) + ); +values(post) -> + maps:merge( + #{ + name => <<"syskeeper_proxy">>, + type => <<"syskeeper_proxy">> + }, + values(put) + ); +values(put) -> #{ enable => true, - type => syskeeper_proxy, - name => <<"foo">>, listen => <<"127.0.0.1:9092">>, acceptors => 16, handshake_timeout => <<"16s">> @@ -50,13 +69,14 @@ values(_Method) -> %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions -namespace() -> "bridge_syskeeper_proxy". +namespace() -> "connector_syskeeper_proxy". roots() -> []. -fields("config") -> +fields(config) -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {description, emqx_schema:description_schema()}, {listen, listen()}, {acceptors, mk( @@ -69,21 +89,17 @@ fields("config") -> #{desc => ?DESC(handshake_timeout), default => <<"10s">>} )} ]; -fields("creation_opts") -> - emqx_resource_schema:create_opts([{worker_pool_size, #{default => 1}}]); fields("post") -> - [type_field(), name_field() | fields("config")]; + [type_field(), name_field() | fields(config)]; fields("put") -> - fields("config"); + fields(config); fields("get") -> emqx_bridge_schema:status_fields() ++ fields("post"). -desc("config") -> +desc(config) -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Syskeeper Proxy using `", string:to_upper(Method), "` method."]; -desc("creation_opts" = Name) -> - emqx_resource_schema:desc(Name); desc(_) -> undefined. diff --git a/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl b/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl index 21886a90c9..10bbf4d59d 100644 --- a/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl +++ b/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl @@ -13,7 +13,8 @@ -define(HOST, "127.0.0.1"). -define(PORT, 9092). --define(ACK_TIMEOUT, <<"3s">>). +-define(ACK_TIMEOUT, 2000). +-define(HANDSHAKE_TIMEOUT, 10000). -define(SYSKEEPER_NAME, <<"syskeeper">>). -define(SYSKEEPER_PROXY_NAME, <<"syskeeper_proxy">>). -define(BATCH_SIZE, 3). @@ -32,7 +33,13 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), - Lifecycle = [t_setup_via_config, t_setup_via_http_api, t_get_status], + Lifecycle = [ + t_setup_proxy_via_config, + t_setup_proxy_via_http_api, + t_setup_forwarder_via_config, + t_setup_forwarder_via_http_api, + t_get_status + ], Write = TCs -- Lifecycle, BatchingGroups = [{group, with_batch}, {group, without_batch}], [ @@ -58,14 +65,21 @@ end_per_group(_Group, _Config) -> ok. init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge, emqx_bridge_syskeeper]), + ok = emqx_common_test_helpers:start_apps([ + emqx_conf, + emqx_connector, + emqx_bridge, + emqx_bridge_syskeeper + ]), _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), Config. end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]). + ok = emqx_common_test_helpers:stop_apps([ + emqx_bridge_syskeeper, emqx_bridge, emqx_connector, emqx_conf + ]). init_per_testcase(_Testcase, Config) -> snabbkaffe:start_trace(), @@ -73,15 +87,15 @@ init_per_testcase(_Testcase, Config) -> end_per_testcase(_Testcase, _Config) -> ok = snabbkaffe:stop(), - delete_bridge(syskeeper, ?SYSKEEPER_NAME), - delete_bridge(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), + delete_bridge(syskeeper_forwarder, ?SYSKEEPER_NAME), + delete_connectors(syskeeper_forwarder, ?SYSKEEPER_NAME), + delete_connectors(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), ok. %%------------------------------------------------------------------------------ %% Helper fns %%------------------------------------------------------------------------------ syskeeper_config(Config) -> - AckMode = proplists:get_value(ack_mode, Config, no_ack), BatchSize = case proplists:get_value(enable_batch, Config, false) of true -> ?BATCH_SIZE; @@ -89,82 +103,126 @@ syskeeper_config(Config) -> end, ConfigString = io_lib:format( - "bridges.~s.~s {\n" + "actions.~s.~s {\n" " enable = true\n" - " server = \"~ts\"\n" - " ack_mode = ~p\n" - " ack_timeout = \"~ts\"\n" + " connector = ~ts\n" " resource_opts = {\n" " request_ttl = 500ms\n" " batch_size = ~b\n" " }\n" "}", [ - syskeeper, + syskeeper_forwarder, + ?SYSKEEPER_NAME, + ?SYSKEEPER_NAME, + BatchSize + ] + ), + {?SYSKEEPER_NAME, parse_bridge_and_check(ConfigString, syskeeper_forwarder, ?SYSKEEPER_NAME)}. + +syskeeper_connector_config(Config) -> + AckMode = proplists:get_value(ack_mode, Config, no_ack), + ConfigString = + io_lib:format( + "connectors.~s.~s {\n" + " enable = true\n" + " server = \"~ts\"\n" + " ack_mode = ~p\n" + " ack_timeout = ~p\n" + " pool_size = 1\n" + "}", + [ + syskeeper_forwarder, ?SYSKEEPER_NAME, server(), AckMode, - ?ACK_TIMEOUT, - BatchSize + ?ACK_TIMEOUT ] ), - {?SYSKEEPER_NAME, parse_and_check(ConfigString, syskeeper, ?SYSKEEPER_NAME)}. + {?SYSKEEPER_NAME, + parse_connectors_and_check(ConfigString, syskeeper_forwarder, ?SYSKEEPER_NAME)}. syskeeper_proxy_config(_Config) -> ConfigString = io_lib:format( - "bridges.~s.~s {\n" + "connectors.~s.~s {\n" " enable = true\n" " listen = \"~ts\"\n" " acceptors = 1\n" + " handshake_timeout = ~p\n" "}", [ syskeeper_proxy, ?SYSKEEPER_PROXY_NAME, - server() + server(), + ?HANDSHAKE_TIMEOUT ] ), - {?SYSKEEPER_PROXY_NAME, parse_and_check(ConfigString, syskeeper_proxy, ?SYSKEEPER_PROXY_NAME)}. + {?SYSKEEPER_PROXY_NAME, + parse_connectors_and_check(ConfigString, syskeeper_proxy, ?SYSKEEPER_PROXY_NAME)}. -parse_and_check(ConfigString, BridgeType0, Name) -> - BridgeType = to_bin(BridgeType0), - ct:pal("ConfigString:~ts~n", [ConfigString]), +parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) -> + Type = to_bin(Type0), {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), - hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), - #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf, + hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}), + #{RootKey := #{Type := #{Name := Config}}} = RawConf, Config. +parse_bridge_and_check(ConfigString, BridgeType, Name) -> + parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name). + +parse_connectors_and_check(ConfigString, ConnectorType, Name) -> + Config = parse_and_check( + ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name + ), + emqx_utils_maps:safe_atom_key_map(Config). + create_bridge(Type, Name, Conf) -> - emqx_bridge:create(Type, Name, Conf). + emqx_bridge_v2:create(Type, Name, Conf). delete_bridge(Type, Name) -> - emqx_bridge:remove(Type, Name). + emqx_bridge_v2:remove(Type, Name). create_both_bridge(Config) -> {ProxyName, ProxyConf} = syskeeper_proxy_config(Config), + {ConnectorName, ConnectorConf} = syskeeper_connector_config(Config), + {Name, Conf} = syskeeper_config(Config), ?assertMatch( {ok, _}, - create_bridge(syskeeper_proxy, ProxyName, ProxyConf) + create_connectors(syskeeper_proxy, ProxyName, ProxyConf) ), - {Name, Conf} = syskeeper_config(Config), + timer:sleep(1000), ?assertMatch( {ok, _}, - create_bridge(syskeeper, Name, Conf) - ). + create_connectors(syskeeper_forwarder, ConnectorName, ConnectorConf) + ), + timer:sleep(1000), + ?assertMatch({ok, _}, create_bridge(syskeeper_forwarder, Name, Conf)). create_bridge_http(Params) -> - Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + call_create_http("actions", Params). + +create_connectors_http(Params) -> + call_create_http("connectors", Params). + +call_create_http(Root, Params) -> + Path = emqx_mgmt_api_test_util:api_path([Root]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; Error -> Error end. +create_connectors(Type, Name, Conf) -> + emqx_connector:create(Type, Name, Conf). + +delete_connectors(Type, Name) -> + emqx_connector:remove(Type, Name). + send_message(_Config, Payload) -> Name = ?SYSKEEPER_NAME, - BridgeType = syskeeper, - BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), - emqx_bridge:send_message(BridgeID, Payload). + BridgeType = syskeeper_forwarder, + emqx_bridge_v2:send_message(BridgeType, Name, Payload, #{}). to_bin(List) when is_list(List) -> unicode:characters_to_binary(List, utf8); @@ -197,23 +255,23 @@ receive_msg() -> %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ -t_setup_via_config(Config) -> +t_setup_proxy_via_config(Config) -> {Name, Conf} = syskeeper_proxy_config(Config), ?assertMatch( {ok, _}, - create_bridge(syskeeper_proxy, Name, Conf) + create_connectors(syskeeper_proxy, Name, Conf) ), ?assertMatch( X when is_pid(X), esockd:listener({emqx_bridge_syskeeper_proxy_server, {?HOST, ?PORT}}) ), - delete_bridge(syskeeper_proxy, Name), + delete_connectors(syskeeper_proxy, Name), ?assertError( not_found, esockd:listener({emqx_bridge_syskeeper_proxy_server, {?HOST, ?PORT}}) ). -t_setup_via_http_api(Config) -> +t_setup_proxy_via_http_api(Config) -> {Name, ProxyConf0} = syskeeper_proxy_config(Config), ProxyConf = ProxyConf0#{ <<"name">> => Name, @@ -221,7 +279,7 @@ t_setup_via_http_api(Config) -> }, ?assertMatch( {ok, _}, - create_bridge_http(ProxyConf) + create_connectors_http(ProxyConf) ), ?assertMatch( @@ -229,29 +287,64 @@ t_setup_via_http_api(Config) -> esockd:listener({emqx_bridge_syskeeper_proxy_server, {?HOST, ?PORT}}) ), - delete_bridge(syskeeper_proxy, Name), + delete_connectors(syskeeper_proxy, Name), ?assertError( not_found, esockd:listener({emqx_bridge_syskeeper_proxy_server, {?HOST, ?PORT}}) ). +t_setup_forwarder_via_config(Config) -> + {ConnectorName, ConnectorConf} = syskeeper_connector_config(Config), + {Name, Conf} = syskeeper_config(Config), + ?assertMatch( + {ok, _}, + create_connectors(syskeeper_forwarder, ConnectorName, ConnectorConf) + ), + ?assertMatch({ok, _}, create_bridge(syskeeper_forwarder, Name, Conf)). + +t_setup_forwarder_via_http_api(Config) -> + {ConnectorName, ConnectorConf0} = syskeeper_connector_config(Config), + {Name, Conf0} = syskeeper_config(Config), + + ConnectorConf = ConnectorConf0#{ + <<"name">> => ConnectorName, + <<"type">> => syskeeper_forwarder + }, + + Conf = Conf0#{ + <<"name">> => Name, + <<"type">> => syskeeper_forwarder + }, + + ?assertMatch( + {ok, _}, + create_connectors_http(ConnectorConf) + ), + + ?assertMatch( + {ok, _}, + create_bridge_http(Conf) + ). + t_get_status(Config) -> create_both_bridge(Config), - SyskeeperId = emqx_bridge_resource:resource_id(syskeeper, ?SYSKEEPER_NAME), - ProxyId = emqx_bridge_resource:resource_id(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(SyskeeperId)), - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ProxyId)), - delete_bridge(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), + ?assertMatch( + #{status := connected}, emqx_bridge_v2:health_check(syskeeper_forwarder, ?SYSKEEPER_NAME) + ), + delete_connectors(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), ?retry( _Sleep = 500, _Attempts = 10, - ?assertMatch({ok, connecting}, emqx_resource_manager:health_check(SyskeeperId)) + ?assertMatch( + #{status := connecting}, + emqx_bridge_v2:health_check(syskeeper_forwarder, ?SYSKEEPER_NAME) + ) ). t_write_failure(Config) -> create_both_bridge(Config), - delete_bridge(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), + delete_connectors(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), SentData = make_message(), Result = ?wait_async_action( diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index c8ec8e1be8..19b9fa2444 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -25,6 +25,10 @@ resource_type(kafka_producer) -> %% We use AEH's Kafka interface. resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer; +resource_type(syskeeper_forwarder) -> + emqx_bridge_syskeeper_connector; +resource_type(syskeeper_proxy) -> + emqx_bridge_syskeeper_proxy_server; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -56,6 +60,22 @@ connector_structs() -> desc => <<"Azure Event Hub Connector Config">>, required => false } + )}, + {syskeeper_forwarder, + mk( + hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)), + #{ + desc => <<"Syskeeper Connector Config">>, + required => false + } + )}, + {syskeeper_proxy, + mk( + hoconsc:map(name, ref(emqx_bridge_syskeeper_proxy, config)), + #{ + desc => <<"Syskeeper Proxy Connector Config">>, + required => false + } )} ]. @@ -74,7 +94,9 @@ examples(Method) -> schema_modules() -> [ emqx_bridge_kafka, - emqx_bridge_azure_event_hub + emqx_bridge_azure_event_hub, + emqx_bridge_syskeeper_connector, + emqx_bridge_syskeeper_proxy ]. api_schemas(Method) -> @@ -82,7 +104,11 @@ api_schemas(Method) -> %% We need to map the `type' field of a request (binary) to a %% connector schema module. api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), - api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_connector") + api_ref( + emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_connector" + ), + api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), + api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method) ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 22eb523be0..e4308ac549 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -60,7 +60,9 @@ enterprise_fields_connectors() -> []. -endif. connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; -connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]. +connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]; +connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; +connector_type_to_bridge_types(syskeeper_proxy) -> []. actions_config_name() -> <<"actions">>. diff --git a/rel/i18n/emqx_bridge_syskeeper_connector.hocon b/rel/i18n/emqx_bridge_syskeeper_connector.hocon index a057e6647e..3e93458e0c 100644 --- a/rel/i18n/emqx_bridge_syskeeper_connector.hocon +++ b/rel/i18n/emqx_bridge_syskeeper_connector.hocon @@ -1,5 +1,17 @@ emqx_bridge_syskeeper_connector { +desc_config.desc: +"""Configuration for a Syskeeper forwarder connector""" + +desc_config.label: +"""Syskeeper Forwarder Connector Configuration""" + +config_enable.desc: +"""Enable or disable this connector""" + +config_enable.label: +"""Enable Or Disable connector""" + server.desc: """The address of the Syskeeper proxy server""" diff --git a/rel/i18n/emqx_bridge_syskeeper_proxy.hocon b/rel/i18n/emqx_bridge_syskeeper_proxy.hocon index f6c519216a..dc96486d18 100644 --- a/rel/i18n/emqx_bridge_syskeeper_proxy.hocon +++ b/rel/i18n/emqx_bridge_syskeeper_proxy.hocon @@ -1,16 +1,16 @@ emqx_bridge_syskeeper_proxy { config_enable.desc: -"""Enable or disable this bridge""" +"""Enable or disable this connector""" config_enable.label: -"""Enable Or Disable Bridge""" +"""Enable Or Disable Connector""" desc_config.desc: -"""Configuration for a Syskeeper proxy bridge""" +"""Configuration for a Syskeeper proxy connector""" desc_config.label: -"""Syskeeper Proxy Bridge Configuration""" +"""Syskeeper Proxy Connector Configuration""" desc_name.desc: """Bridge name""" From 447c933ff8221a975d5e8083011f19f83a0c0083 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 10 Nov 2023 10:42:53 +0800 Subject: [PATCH 5/5] fix(syskeeper): homogenize the Syskeeper bridge and update license data --- apps/emqx_bridge/src/emqx_bridge.erl | 3 +- .../src/schema/emqx_bridge_v2_schema.erl | 7 +--- .../include/emqx_bridge_syskeeper.hrl | 2 +- .../src/emqx_bridge_syskeeper.app.src | 2 +- .../src/emqx_bridge_syskeeper.erl | 41 ++++++++++++------- .../src/emqx_bridge_syskeeper_connector.erl | 17 +++++--- .../test/emqx_bridge_syskeeper_SUITE.erl | 5 +++ rel/i18n/emqx_bridge_syskeeper.hocon | 11 +++++ 8 files changed, 61 insertions(+), 27 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 8098072c00..7ffe58f6a2 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -93,7 +93,8 @@ T == iotdb; T == kinesis_producer; T == greptimedb; - T == azure_event_hub_producer + T == azure_event_hub_producer; + T == syskeeper_forwarder ). -define(ROOT_KEY, bridges). diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index f0973f260a..d6d8eb9a10 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -134,11 +134,8 @@ desc(_) -> schema_homogeneous_test() -> case lists:filtermap( - fun - ({syskeeper_forwarder, _Schema}) -> - false; - ({_Name, Schema}) -> - is_bad_schema(Schema) + fun({_Name, Schema}) -> + is_bad_schema(Schema) end, fields(actions) ) diff --git a/apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl b/apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl index b381ebf504..4e14fafb04 100644 --- a/apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl +++ b/apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -ifndef(EMQX_BRIDGE_SYSKEEPER). -define(EMQX_BRIDGE_SYSKEEPER, true). diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src index a8f5338679..3c7995cb7a 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src @@ -1,5 +1,5 @@ {application, emqx_bridge_syskeeper, [ - {description, "EMQX Enterprise Bridge"}, + {description, "EMQX Enterprise Data bridge for Syskeeper"}, {vsn, "0.1.0"}, {registered, []}, {applications, [ diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl index 9e520d095f..55e3d08b9e 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_syskeeper). @@ -59,9 +59,11 @@ values(put) -> #{ enable => true, connector => <<"syskeeper_forwarder">>, - target_topic => <<"${topic}">>, - target_qos => <<"-1">>, - template => <<"${payload}">>, + parameters => #{ + target_topic => <<"${topic}">>, + target_qos => <<"-1">>, + template => <<"${payload}">> + }, resource_opts => #{ worker_pool_size => 16 } @@ -69,7 +71,7 @@ values(put) -> %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions -namespace() -> "bridge_syskeeper". +namespace() -> "syskeeper". roots() -> []. @@ -81,6 +83,24 @@ fields(config) -> mk(binary(), #{ desc => ?DESC(emqx_connector_schema, "connector_field"), required => true })}, + {parameters, + mk( + ref(?MODULE, "parameters"), + #{required => true, desc => ?DESC("parameters")} + )}, + {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})}, + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ]; +fields("parameters") -> + [ {target_topic, mk( binary(), @@ -95,15 +115,6 @@ fields(config) -> mk( binary(), #{desc => ?DESC("template"), default => <<"${payload}">>} - )}, - {resource_opts, - mk( - ref(?MODULE, "creation_opts"), - #{ - required => false, - default => #{}, - desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) - } )} ]; fields("creation_opts") -> @@ -119,6 +130,8 @@ desc(config) -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Syskeeper using `", string:to_upper(Method), "` method."]; +desc("parameters") -> + ?DESC("parameters"); desc("creation_opts" = Name) -> emqx_resource_schema:desc(Name); desc(_) -> diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index 5c52017fd4..c267ee5210 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -206,11 +206,18 @@ status_result(true) -> connected; status_result(false) -> connecting; status_result({error, _}) -> connecting. -on_add_channel(_InstanceId, #{channels := Channels} = OldState, ChannelId, #{ - target_topic := TargetTopic, - target_qos := TargetQoS, - template := Template -}) -> +on_add_channel( + _InstanceId, + #{channels := Channels} = OldState, + ChannelId, + #{ + parameters := #{ + target_topic := TargetTopic, + target_qos := TargetQoS, + template := Template + } + } +) -> case maps:is_key(ChannelId, Channels) of true -> {error, already_exists}; diff --git a/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl b/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl index 10bbf4d59d..54330ea376 100644 --- a/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl +++ b/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl @@ -106,6 +106,11 @@ syskeeper_config(Config) -> "actions.~s.~s {\n" " enable = true\n" " connector = ~ts\n" + " parameters = {\n" + " target_topic = \"${topic}\"\n" + " target_qos = -1\n" + " template = \"${payload}\"\n" + " },\n" " resource_opts = {\n" " request_ttl = 500ms\n" " batch_size = ~b\n" diff --git a/rel/i18n/emqx_bridge_syskeeper.hocon b/rel/i18n/emqx_bridge_syskeeper.hocon index 7cf1953179..289dd22c58 100644 --- a/rel/i18n/emqx_bridge_syskeeper.hocon +++ b/rel/i18n/emqx_bridge_syskeeper.hocon @@ -42,4 +42,15 @@ target_qos.desc: target_qos.label: """Target QoS""" +parameters.desc: +"""Syskeeper data bridge parameters""" + +parameters.label: +"""Parameters""" + +mqtt_topic.desc: +"""MQTT topic or topic filter as data source (bridge input). If rule action is used as data source, this config should be left empty, otherwise messages will be duplicated in Syskeeper.""" + +mqtt_topic.label: +"""Source MQTT Topic""" }