diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index c50495b3e8..d6462a0b65 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -245,10 +245,11 @@ load_config_from_raw(RawConf0, Opts) -> case check_config(RawConf1) of {ok, RawConf} -> %% It has been ensured that the connector is always the first configuration to be updated. - %% However, when deleting the connector, we need to clean up the dependent actions first; + %% However, when deleting the connector, we need to clean up the dependent actions/sources first; %% otherwise, the deletion will fail. - %% notice: we can't create a action before connector. - uninstall_actions(RawConf, Opts), + %% notice: we can't create a action/sources before connector. + uninstall(<<"actions">>, RawConf, Opts), + uninstall(<<"sources">>, RawConf, Opts), Error = lists:filtermap( fun({K, V}) -> @@ -288,27 +289,33 @@ load_config_from_raw(RawConf0, Opts) -> {error, Errors} end. -uninstall_actions(#{<<"actions">> := New}, #{mode := replace}) -> - Old = emqx_conf:get_raw([<<"actions">>], #{}), - #{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old), - maps:foreach( - fun({Type, Name}, _) -> - case emqx_bridge_v2:remove(Type, Name) of - ok -> - ok; - {error, Reason} -> - ?SLOG(error, #{ - msg => "failed_to_remove_action", - type => Type, - name => Name, - error => Reason - }) - end - end, - Removed - ); -%% we don't delete things when in merge mode or without actions key. -uninstall_actions(_RawConf, _) -> +uninstall(ActionOrSource, Conf, #{mode := replace}) -> + case maps:find(ActionOrSource, Conf) of + {ok, New} -> + Old = emqx_conf:get_raw([ActionOrSource], #{}), + ActionOrSourceAtom = binary_to_existing_atom(ActionOrSource), + #{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old), + maps:foreach( + fun({Type, Name}, _) -> + case emqx_bridge_v2:remove(ActionOrSourceAtom, Type, Name) of + ok -> + ok; + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_remove", + type => Type, + name => Name, + error => Reason + }) + end + end, + Removed + ); + error -> + ok + end; +%% we don't delete things when in merge mode or without actions/sources key. +uninstall(_, _RawConf, _) -> ok. update_config_cluster( @@ -481,7 +488,8 @@ filter_readonly_config(Raw) -> end. reload_config(AllConf, Opts) -> - uninstall_actions(AllConf, Opts), + uninstall(<<"actions">>, AllConf, Opts), + uninstall(<<"sources">>, AllConf, Opts), Fold = fun({Key, Conf}, Acc) -> case update_config_local(Key, Conf, Opts) of ok -> diff --git a/apps/emqx_conf/test/emqx_conf_cli_SUITE.erl b/apps/emqx_conf/test/emqx_conf_cli_SUITE.erl index 78a5fb5d63..b1dc4c205d 100644 --- a/apps/emqx_conf/test/emqx_conf_cli_SUITE.erl +++ b/apps/emqx_conf/test/emqx_conf_cli_SUITE.erl @@ -219,6 +219,36 @@ t_reload_etc_emqx_conf_not_persistent(Config) -> ), ok. +t_replace_action_source(Config) -> + Action = rabbitmq_action(), + Source = rabbitmq_source(), + Connector = rabbitmq_connector(), + Rabbitmq = #{ + <<"actions">> => Action, + <<"sources">> => Source, + <<"connectors">> => Connector + }, + ConfBin0 = hocon_pp:do(Rabbitmq, #{}), + ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config), + ?assertMatch(ok, emqx_conf_cli:conf(["load", "--replace", ConfFile0])), + ?assertEqual(Action, emqx_config:get_raw([actions])), + ?assertEqual(Source, emqx_config:get_raw([sources])), + ?assertEqual(Connector, emqx_config:get_raw([connectors])), + + Empty = #{ + <<"actions">> => #{}, + <<"sources">> => #{}, + <<"connectors">> => #{} + }, + ConfBin1 = hocon_pp:do(Empty, #{}), + ConfFile1 = prepare_conf_file(?FUNCTION_NAME, ConfBin1, Config), + ?assertMatch(ok, emqx_conf_cli:conf(["load", "--replace", ConfFile1])), + + ?assertEqual(#{}, emqx_config:get_raw([actions])), + ?assertEqual(#{}, emqx_config:get_raw([sources])), + ?assertEqual(#{}, emqx_config:get_raw([connectors])), + ok. + base_conf() -> #{ <<"cluster">> => emqx_conf:get_raw([cluster]), @@ -235,3 +265,96 @@ changed(node) -> }; changed(rpc) -> #{<<"mode">> => <<"sync">>}. + +rabbitmq_connector() -> + #{ + <<"rabbitmq">> => + #{ + <<"my_connector">> => + #{ + <<"enable">> => true, + <<"description">> => <<>>, + <<"heartbeat">> => <<"30s">>, + <<"password">> => <<"rabbitmq">>, + <<"pool_size">> => 8, + <<"port">> => 5672, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">> + }, + <<"server">> => <<"localhost">>, + <<"ssl">> => + #{ + <<"ciphers">> => [], + <<"depth">> => 10, + <<"enable">> => false, + <<"hibernate_after">> => <<"5s">>, + <<"log_level">> => <<"notice">>, + <<"reuse_sessions">> => true, + <<"secure_renegotiate">> => true, + <<"verify">> => <<"verify_peer">>, + <<"versions">> => + [<<"tlsv1.3">>, <<"tlsv1.2">>] + }, + <<"timeout">> => <<"5s">>, + <<"username">> => <<"rabbitmq">>, + <<"virtual_host">> => <<"/">> + } + } + }. + +rabbitmq_action() -> + #{ + <<"rabbitmq">> => + #{ + <<"action">> => + #{ + <<"connector">> => <<"my_connector">>, + <<"description">> => <<>>, + <<"enable">> => true, + <<"parameters">> => + #{ + <<"delivery_mode">> => <<"non_persistent">>, + <<"exchange">> => <<"test">>, + <<"payload_template">> => <<>>, + <<"publish_confirmation_timeout">> => <<"30s">>, + <<"routing_key">> => <<"test">>, + <<"wait_for_publish_confirmations">> => true + }, + <<"resource_opts">> => + #{ + <<"batch_size">> => 10, + <<"batch_time">> => <<"10ms">>, + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"query_mode">> => <<"async">>, + <<"request_ttl">> => <<"45s">>, + <<"worker_pool_size">> => 16 + } + } + } + }. + +rabbitmq_source() -> + #{ + <<"rabbitmq">> => + #{ + <<"source">> => + #{ + <<"connector">> => <<"my_connector">>, + <<"description">> => <<>>, + <<"enable">> => true, + <<"parameters">> => + #{ + <<"no_ack">> => true, + <<"queue">> => <<"test">>, + <<"wait_for_publish_confirmations">> => true + }, + <<"resource_opts">> => + #{<<"health_check_interval">> => <<"15s">>} + } + } + }. diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index bf9a960d56..159e05f9b0 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -473,6 +473,8 @@ ensure_no_channels(Configs) -> fun({Type, ConnectorName}) -> fun(_) -> case emqx_connector_resource:get_channels(Type, ConnectorName) of + {error, not_found} -> + ok; {ok, []} -> ok; {ok, Channels} -> diff --git a/changes/ce/fix-12715.en.md b/changes/ce/fix-12715.en.md new file mode 100644 index 0000000000..ed3ec38fcf --- /dev/null +++ b/changes/ce/fix-12715.en.md @@ -0,0 +1 @@ +Fix replacing sources crash if connector has active channels