Skip to content

Commit

Permalink
fix: cant replace source conf
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongwencool committed Mar 15, 2024
1 parent 228b2c1 commit 43d9c12
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 25 deletions.
58 changes: 33 additions & 25 deletions apps/emqx_conf/src/emqx_conf_cli.erl
Expand Up @@ -245,10 +245,11 @@ load_config_from_raw(RawConf0, Opts) ->
case check_config(RawConf1) of
{ok, RawConf} ->
%% It has been ensured that the connector is always the first configuration to be updated.
%% However, when deleting the connector, we need to clean up the dependent actions first;
%% However, when deleting the connector, we need to clean up the dependent actions/sources first;
%% otherwise, the deletion will fail.
%% notice: we can't create a action before connector.
uninstall_actions(RawConf, Opts),
%% notice: we can't create a action/sources before connector.
uninstall(<<"actions">>, RawConf, Opts),
uninstall(<<"sources">>, RawConf, Opts),
Error =
lists:filtermap(
fun({K, V}) ->
Expand Down Expand Up @@ -288,27 +289,33 @@ load_config_from_raw(RawConf0, Opts) ->
{error, Errors}
end.

uninstall_actions(#{<<"actions">> := New}, #{mode := replace}) ->
Old = emqx_conf:get_raw([<<"actions">>], #{}),
#{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old),
maps:foreach(
fun({Type, Name}, _) ->
case emqx_bridge_v2:remove(Type, Name) of
ok ->
ok;
{error, Reason} ->
?SLOG(error, #{
msg => "failed_to_remove_action",
type => Type,
name => Name,
error => Reason
})
end
end,
Removed
);
%% we don't delete things when in merge mode or without actions key.
uninstall_actions(_RawConf, _) ->
uninstall(ActionOrSource, Conf, #{mode := replace}) ->
case maps:find(ActionOrSource, Conf) of
{ok, New} ->
Old = emqx_conf:get_raw([ActionOrSource], #{}),
ActionOrSourceAtom = binary_to_existing_atom(ActionOrSource),
#{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old),
maps:foreach(
fun({Type, Name}, _) ->
case emqx_bridge_v2:remove(ActionOrSourceAtom, Type, Name) of
ok ->
ok;
{error, Reason} ->
?SLOG(error, #{
msg => "failed_to_remove",
type => Type,
name => Name,
error => Reason
})
end
end,
Removed
);
error ->
ok
end;
%% we don't delete things when in merge mode or without actions/sources key.
uninstall(_, _RawConf, _) ->
ok.

update_config_cluster(
Expand Down Expand Up @@ -481,7 +488,8 @@ filter_readonly_config(Raw) ->
end.

reload_config(AllConf, Opts) ->
uninstall_actions(AllConf, Opts),
uninstall(<<"actions">>, AllConf, Opts),
uninstall(<<"sources">>, AllConf, Opts),
Fold = fun({Key, Conf}, Acc) ->
case update_config_local(Key, Conf, Opts) of
ok ->
Expand Down
123 changes: 123 additions & 0 deletions apps/emqx_conf/test/emqx_conf_cli_SUITE.erl
Expand Up @@ -219,6 +219,36 @@ t_reload_etc_emqx_conf_not_persistent(Config) ->
),
ok.

t_replace_action_source(Config) ->
Action = rabbitmq_action(),
Source = rabbitmq_source(),
Connector = rabbitmq_connector(),
Rabbitmq = #{
<<"actions">> => Action,
<<"sources">> => Source,
<<"connectors">> => Connector
},
ConfBin0 = hocon_pp:do(Rabbitmq, #{}),
ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config),
?assertMatch(ok, emqx_conf_cli:conf(["load", "--replace", ConfFile0])),
?assertEqual(Action, emqx_config:get_raw([actions])),
?assertEqual(Source, emqx_config:get_raw([sources])),
?assertEqual(Connector, emqx_config:get_raw([connectors])),

Empty = #{
<<"actions">> => #{},
<<"sources">> => #{},
<<"connectors">> => #{}
},
ConfBin1 = hocon_pp:do(Empty, #{}),
ConfFile1 = prepare_conf_file(?FUNCTION_NAME, ConfBin1, Config),
?assertMatch(ok, emqx_conf_cli:conf(["load", "--replace", ConfFile1])),

?assertEqual(#{}, emqx_config:get_raw([actions])),
?assertEqual(#{}, emqx_config:get_raw([sources])),
?assertEqual(#{}, emqx_config:get_raw([connectors])),
ok.

base_conf() ->
#{
<<"cluster">> => emqx_conf:get_raw([cluster]),
Expand All @@ -235,3 +265,96 @@ changed(node) ->
};
changed(rpc) ->
#{<<"mode">> => <<"sync">>}.

rabbitmq_connector() ->
#{
<<"rabbitmq">> =>
#{
<<"my_connector">> =>
#{
<<"enable">> => true,
<<"description">> => <<>>,
<<"heartbeat">> => <<"30s">>,
<<"password">> => <<"rabbitmq">>,
<<"pool_size">> => 8,
<<"port">> => 5672,
<<"resource_opts">> =>
#{
<<"health_check_interval">> => <<"15s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
},
<<"server">> => <<"localhost">>,
<<"ssl">> =>
#{
<<"ciphers">> => [],
<<"depth">> => 10,
<<"enable">> => false,
<<"hibernate_after">> => <<"5s">>,
<<"log_level">> => <<"notice">>,
<<"reuse_sessions">> => true,
<<"secure_renegotiate">> => true,
<<"verify">> => <<"verify_peer">>,
<<"versions">> =>
[<<"tlsv1.3">>, <<"tlsv1.2">>]
},
<<"timeout">> => <<"5s">>,
<<"username">> => <<"rabbitmq">>,
<<"virtual_host">> => <<"/">>
}
}
}.

rabbitmq_action() ->
#{
<<"rabbitmq">> =>
#{
<<"action">> =>
#{
<<"connector">> => <<"my_connector">>,
<<"description">> => <<>>,
<<"enable">> => true,
<<"parameters">> =>
#{
<<"delivery_mode">> => <<"non_persistent">>,
<<"exchange">> => <<"test">>,
<<"payload_template">> => <<>>,
<<"publish_confirmation_timeout">> => <<"30s">>,
<<"routing_key">> => <<"test">>,
<<"wait_for_publish_confirmations">> => true
},
<<"resource_opts">> =>
#{
<<"batch_size">> => 10,
<<"batch_time">> => <<"10ms">>,
<<"health_check_interval">> => <<"15s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"query_mode">> => <<"async">>,
<<"request_ttl">> => <<"45s">>,
<<"worker_pool_size">> => 16
}
}
}
}.

rabbitmq_source() ->
#{
<<"rabbitmq">> =>
#{
<<"source">> =>
#{
<<"connector">> => <<"my_connector">>,
<<"description">> => <<>>,
<<"enable">> => true,
<<"parameters">> =>
#{
<<"no_ack">> => true,
<<"queue">> => <<"test">>,
<<"wait_for_publish_confirmations">> => true
},
<<"resource_opts">> =>
#{<<"health_check_interval">> => <<"15s">>}
}
}
}.
2 changes: 2 additions & 0 deletions apps/emqx_connector/src/emqx_connector.erl
Expand Up @@ -473,6 +473,8 @@ ensure_no_channels(Configs) ->
fun({Type, ConnectorName}) ->
fun(_) ->
case emqx_connector_resource:get_channels(Type, ConnectorName) of
{error, not_found} ->
ok;
{ok, []} ->
ok;
{ok, Channels} ->
Expand Down
1 change: 1 addition & 0 deletions changes/ce/fix-12715.en.md
@@ -0,0 +1 @@
Fix replacing sources crash if connector has active channels

0 comments on commit 43d9c12

Please sign in to comment.