Data bridge target unavailable #10645
% wolff starts a socket to get metadata and closes it asynchronously (calling a spawn to
% actually close it), causing some {'EXIT', tcp_closed} errors.
process_flag(trap_exit, true),
Ret =
    case catch wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic) of
        ok ->
            ok;
        {error, unknown_topic_or_partition} ->
            unhealthy_target;
        _ ->
            error
    end,
process_flag(trap_exit, false),
Ret.
try wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic) of
    ok ->
        ok;
    {error, unknown_topic_or_partition} ->
        unhealthy_target;
    _ ->
        error
catch
    _:_ -> error
end.
?
lists:foldl(
    fun
        (_, connected) ->
            connected;
        ({_Partition, Pid}, Acc) ->
            case is_pid(Pid) andalso erlang:is_process_alive(Pid) of
                false ->
                    Acc;
                true ->
                    CheckIfTopicExists(Pid, KafkaTopic, Acc)
            end
    end,
    disconnected,
    Leaders
);
Maybe we could disentangle the two checks to make it more readable, and check the topic existence only once?
on_get_status ->
    case do_get_status(..., State) of
        {disconnected, unhealthy} -> {disconnected, State, unhealthy};
        Status -> Status
    end.
%% ...
do_get_status ->
    HealthyLeaders = lists:filter(fun(L) -> is_pid ... end, Leaders),
    case HealthyLeaders of
        [Pid | _] ->
            do_get_topic_status(Pid, Topic);
        [] ->
            disconnected
    end.
do_get_topic_status ->
    try wolff_client:check_if_topic_exists(Pid, Topic) of
        ...
    catch
        _:_ -> disconnected
    end.
delete_all_bridges(),
?check_trace(
Nit: maybe split this test case into 2.
delete_bridge(Config),
?check_trace(
Split into two separate cases?
end.

check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic) ->
    % wolff starts a socket to get metadata and closes it asynchronously (calling a spawn to
Then setting trap_exit back immediately afterwards would not help? It could happen that trap_exit is set back to false before the socket is down.
A better solution is to spawn_monitor a process to do it and drain the DOWN message.
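To make the suggestion concrete, here is a minimal sketch of the spawn_monitor approach. The module name, `run_isolated/2`, and the `isolated_result` tag are hypothetical names for illustration and are not part of the PR or of wolff. The result is carried in the helper's exit reason, so the caller only has to receive one 'DOWN' message.

```erlang
-module(isolated_call_sketch).
-export([run_isolated/2]).

%% Runs Fun in a separate, monitored (not linked) process and waits for it.
%% The result travels in the exit reason, so the single 'DOWN' message is the
%% only thing the caller has to receive. Links created inside Fun belong to
%% the helper process, not to the caller.
run_isolated(Fun, Timeout) when is_function(Fun, 0) ->
    {Pid, MRef} = spawn_monitor(fun() -> exit({isolated_result, Fun()}) end),
    receive
        {'DOWN', MRef, process, Pid, {isolated_result, Result}} ->
            {ok, Result};
        {'DOWN', MRef, process, Pid, Reason} ->
            %% The helper died before Fun returned.
            {error, Reason}
    after Timeout ->
        %% Give up: kill the helper, then drop the monitor and flush any
        %% 'DOWN' message that may already be sitting in the mailbox.
        exit(Pid, kill),
        erlang:demonitor(MRef, [flush]),
        {error, timeout}
    end.
```

With something like this, the topic check could be passed in as a zero-arity fun and the caller would not need to touch trap_exit at all.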
do_get_status(Client, #{kafka_topic := KafkaTopic} = State) ->
    CheckIfTopicExists =
        fun(Pid, Topic, Default) ->
            case wolff_client:check_if_topic_exists(Pid, Topic) of
The match on the return value of wolff_client:check_if_topic_exists is repeated; maybe create a wrapper function instead.
case do_get_topic_status(HealthyLeaders, KafkaTopic) of
    error ->
        % it can be that topic is unknown or there was some error getting the leaders (like
        % some connection issue). Therefore, let's try to get metadata for that topic directly.
        do_get_topic_status(Hosts, KafkaConfig, KafkaTopic);
In this case, I guess we could just call do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) directly rather than going through do_get_topic_status(HealthyLeaders, KafkaTopic)?
(_Pid, ok) ->
    ok;
(Pid, Acc) ->
Should stop iterating when we already know the topic is unknown.
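One way to do that, as a rough sketch: replace the fold with a small recursive helper that returns on the first decisive answer. `first_decisive/2` and the `Check` fun are hypothetical stand-ins here; `Check` is assumed to return `ok | unhealthy_target | error`, mirroring the atoms used elsewhere in this PR.

```erlang
-module(topic_fold_sketch).
-export([first_decisive/2]).

%% Check is a hypothetical fun(Pid) -> ok | unhealthy_target | error.
%% Walks the list and stops at the first decisive answer (ok or
%% unhealthy_target); only an inconclusive `error` moves on to the next pid.
first_decisive(_Check, []) ->
    error;
first_decisive(Check, [Pid | Rest]) ->
    case Check(Pid) of
        ok -> ok;
        unhealthy_target -> unhealthy_target;
        error -> first_decisive(Check, Rest)
    end.
```

If keeping lists:foldl is preferred, an extra clause that passes `unhealthy_target` through unchanged would at least skip the remaining topic checks, although the fold itself would still visit every element.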
HealthyLeaders = do_get_healthy_leaders(Client, KafkaTopic),
case do_get_topic_status(HealthyLeaders, KafkaTopic) of
Since the unknown topic error seems to take precedence over connection issues, we could check the topic first and short-circuit if it's unknown, instead of checking the leader pids.
RetPid = self(),
QueryRef = make_ref(),
CheckTopic = fun() ->
    Ret =
        case wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic) of
            ok ->
                ok;
            {error, unknown_topic_or_partition} ->
                unhealthy_target;
            _ ->
                error
        end,
    RetPid ! {QueryRef, Ret}
end,
% wolff starts a socket to get metadata and closes it asynchronously (calling a spawn to
% actually close it), causing some {'EXIT', tcp_closed} errors. Therefore, it'll be executed
% by a spawn process to avoid crashing the main process.
{Pid, Ref} = spawn_monitor(CheckTopic),
% wait process terminate
receive
    {'DOWN', Ref, process, Pid, _What} -> ok
after 5000 ->
    ?SLOG(error, #{msg => "failed to check kafka topic", pid => Pid}),
    demonitor(Ref),
    exit(Pid, kill)
end,
% check for result
receive
    {QueryRef, Result} -> Result
after 500 ->
    error
It seems like this could use emqx_utils:nolink_apply.
try
    emqx_utils:nolink_apply(fun() -> check... end, Timeout)
catch
    exit:_ -> ...;
    error:_ -> ...
end
lists:foldl(
    fun
        (WorkerPid, ok) ->
            {ok, Conn} = ecpool_worker:client(WorkerPid),
Better avoid exploding here?
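For illustration, a sketch of a non-crashing variant. Everything here is hypothetical: `check_workers/3` is not in the PR, `GetConn` stands in for a call such as ecpool_worker:client/1 and is assumed to return `{ok, Conn} | {error, Reason}`, and `CheckConn` is an assumed `fun(Conn) -> ok | {error, Reason}`.

```erlang
-module(pool_check_sketch).
-export([check_workers/3]).

%% GetConn stands in for a call such as ecpool_worker:client/1 and is assumed
%% to return {ok, Conn} | {error, Reason}. CheckConn is a hypothetical
%% fun(Conn) -> ok | {error, Reason}.
check_workers(_GetConn, _CheckConn, []) ->
    ok;
check_workers(GetConn, CheckConn, [WorkerPid | Rest]) ->
    case GetConn(WorkerPid) of
        {ok, Conn} ->
            case CheckConn(Conn) of
                ok -> check_workers(GetConn, CheckConn, Rest);
                {error, _} = Error -> Error
            end;
        {error, _} = Error ->
            %% No usable connection: report it instead of crashing with a
            %% badmatch on {ok, Conn}.
            Error
    end.
```

The point is only that a worker without a live connection turns into an error value the status check can map to a disconnected state, rather than taking down the caller.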
    wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic)
end,
try
    case emqx_utils:nolink_apply(CheckTopicFun, 500) of
Maybe 500 ms is a bit too strict and could lead to false positives?
-    case emqx_utils:nolink_apply(CheckTopicFun, 500) of
+    case emqx_utils:nolink_apply(CheckTopicFun, 5_000) of
@@ -224,7 +224,10 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
             {connected, NState};
         {error, _Reason} ->
             %% do not log error, it is logged in prepare_sql_to_conn
-            connecting
+            connecting;
+        {undefined_table, NState} ->
Nit:
-        {undefined_table, NState} ->
+        {error, {undefined_table, NState}} ->
(will require reordering with the catch-all clause above)
lists:foldl(
    fun
        (WorkerPid, ok) ->
            {ok, Conn} = ecpool_worker:client(WorkerPid),
This one is still explosive.
apps/emqx_oracle/src/emqx_oracle.erl
Outdated
lists:foldl(
    fun
        (WorkerPid, ok) ->
            {ok, Conn} = ecpool_worker:client(WorkerPid),
💣
There are still a few occurrences of {undefined_table, _} instead of {error, {undefined_table, _}}, but that's a nit that could be addressed at a later time.
fixed =)
Fixes https://emqx.atlassian.net/browse/EMQX-9026
Summary
Generated by Copilot at 6b23c31
This pull request enhances the error handling and reporting of the MySQL and PostgreSQL connectors when the target table is not created. It also updates the default SQL statement for the MySQL connector to match the new table schema. It affects the files emqx_connector_mysql.erl, emqx_connector_pgsql.erl, emqx_resource_buffer_worker.erl, and emqx_ee_bridge_mysql.erl.
PR Checklist
Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:
changes/{ce,ee}/(feat|perf|fix)-<PR-id>.en.md files
If there should be document changes, a PR to emqx-docs.git is sent, or a jira ticket is created to follow up
Schema changes are backward compatible
Checklist for CI (.github/workflows) changes
If changed package build workflow, pass this action (manual trigger)
Change log has been added to changes/ dir for user-facing artifacts update