Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data bridge target unavailable #10645

Merged
merged 4 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/emqx_bridge_kafka/rebar.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}}
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.6"}}}
, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}}
, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0"}}}
, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}
Expand Down
75 changes: 55 additions & 20 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ on_start(InstId, Config) ->
sasl => emqx_bridge_kafka_impl:sasl(Auth),
ssl => ssl(SSL)
},
case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of
unhealthy_target ->
throw(unhealthy_target);
_ ->
ok
end,
case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
{ok, _} ->
?SLOG(info, #{
Expand Down Expand Up @@ -108,7 +114,9 @@ on_start(InstId, Config) ->
kafka_topic => KafkaTopic,
producers => Producers,
resource_id => ResourceId,
sync_query_timeout => SyncQueryTimeout
sync_query_timeout => SyncQueryTimeout,
hosts => Hosts,
kafka_config => KafkaConfig
}};
{error, Reason2} ->
?SLOG(error, #{
Expand All @@ -131,6 +139,7 @@ on_start(InstId, Config) ->
client_id => ClientId
}
),

throw(
"Failed to start Kafka client. Please check the logs for errors and check"
" the connection parameters."
Expand Down Expand Up @@ -294,34 +303,60 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
%% Note: since wolff client has its own replayq that is not managed by
%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise,
%% `emqx_resource_manager' will kill the wolff producers and messages might be lost.
on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) ->
on_get_status(_InstId, #{client_id := ClientId} = State) ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
do_get_status(Pid, KafkaTopic);
case do_get_status(Pid, State) of
ok -> connected;
unhealthy_target -> {disconnected, State, unhealthy_target};
error -> connecting
end;
{error, _Reason} ->
connecting
end.

do_get_status(Client, KafkaTopic) ->
%% TODO: add a wolff_producers:check_connectivity
do_get_status(Client, #{kafka_topic := KafkaTopic, hosts := Hosts, kafka_config := KafkaConfig}) ->
case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of
unhealthy_target ->
unhealthy_target;
_ ->
case do_get_healthy_leaders(Client, KafkaTopic) of
[] -> error;
_ -> ok
end
end.

do_get_healthy_leaders(Client, KafkaTopic) ->
case wolff_client:get_leader_connections(Client, KafkaTopic) of
{ok, Leaders} ->
%% Kafka is considered healthy as long as any of the partition leader is reachable
case
lists:any(
fun({_Partition, Pid}) ->
is_pid(Pid) andalso erlang:is_process_alive(Pid)
end,
Leaders
)
of
true ->
connected;
false ->
connecting
end;
%% Kafka is considered healthy as long as any of the partition leader is reachable.
lists:filtermap(
fun({_Partition, Pid}) ->
case is_pid(Pid) andalso erlang:is_process_alive(Pid) of
true -> {true, Pid};
_ -> false
end
end,
Leaders
);
{error, _} ->
connecting
[]
end.

do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) ->
CheckTopicFun =
fun() ->
wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic)
end,
try
case emqx_utils:nolink_apply(CheckTopicFun, 5_000) of
ok -> ok;
{error, unknown_topic_or_partition} -> unhealthy_target;
_ -> error
end
catch
_:_ ->
error
end.

ssl(#{enable := true} = SSL) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,38 @@ t_failed_creation_then_fix(Config) ->
delete_all_bridges(),
ok.

t_table_removed(_Config) ->
HostsString = kafka_hosts_string_sasl(),
AuthSettings = valid_sasl_plain_settings(),
Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
Type = ?BRIDGE_TYPE,
Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
KafkaTopic = "undefined-test-topic",
Conf = config(#{
"authentication" => AuthSettings,
"kafka_hosts_string" => HostsString,
"kafka_topic" => KafkaTopic,
"instance_id" => ResourceId,
"producer" => #{
"kafka" => #{
"buffer" => #{
"memory_overload_protection" => false
}
}
},
"ssl" => #{}
}),
{ok, #{config := ValidConfigAtom1}} = emqx_bridge:create(
Type, erlang:list_to_atom(Name), Conf
),
ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name},
?assertThrow(_, ?PRODUCER:on_start(ResourceId, ValidConfigAtom)),
ok = emqx_bridge_resource:remove(BridgeId),
delete_all_bridges(),
ok.

%%------------------------------------------------------------------------------
%% Helper functions
%%------------------------------------------------------------------------------
Expand Down
80 changes: 69 additions & 11 deletions apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -179,18 +179,39 @@ sql_drop_table() ->
sql_check_table_exist() ->
"SELECT COUNT(*) FROM user_tables WHERE table_name = 'MQTT_TEST'".

new_jamdb_connection(Config) ->
JamdbOpts = [
{host, ?config(oracle_host, Config)},
{port, ?config(oracle_port, Config)},
{user, "system"},
{password, "oracle"},
{sid, ?SID}
],
jamdb_oracle:start(JamdbOpts).

close_jamdb_connection(Conn) ->
jamdb_oracle:stop(Conn).

reset_table(Config) ->
ResourceId = resource_id(Config),
drop_table_if_exists(Config),
{ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query(
ResourceId, {sql, sql_create_table()}
),
{ok, Conn} = new_jamdb_connection(Config),
try
ok = drop_table_if_exists(Conn),
{ok, [{proc_result, 0, _}]} = jamdb_oracle:sql_query(Conn, sql_create_table())
after
close_jamdb_connection(Conn)
end,
ok.

drop_table_if_exists(Conn) when is_pid(Conn) ->
{ok, [{proc_result, 0, _}]} = jamdb_oracle:sql_query(Conn, sql_drop_table()),
ok;
drop_table_if_exists(Config) ->
ResourceId = resource_id(Config),
{ok, [{proc_result, 0, _}]} =
emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}),
{ok, Conn} = new_jamdb_connection(Config),
try
ok = drop_table_if_exists(Conn)
after
close_jamdb_connection(Conn)
end,
ok.

oracle_config(TestCase, _ConnectionType, Config) ->
Expand All @@ -216,7 +237,7 @@ oracle_config(TestCase, _ConnectionType, Config) ->
" pool_size = 1\n"
" sql = \"~s\"\n"
" resource_opts = {\n"
" health_check_interval = \"5s\"\n"
" health_check_interval = \"15s\"\n"
" request_ttl = \"30s\"\n"
" query_mode = \"async\"\n"
" batch_size = 3\n"
Expand Down Expand Up @@ -349,13 +370,13 @@ t_sync_query(Config) ->
ResourceId = resource_id(Config),
?check_trace(
begin
reset_table(Config),
?assertMatch({ok, _}, create_bridge_api(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
reset_table(Config),
MsgId = erlang:unique_integer(),
Params = #{
topic => ?config(mqtt_topic, Config),
Expand All @@ -381,13 +402,13 @@ t_batch_sync_query(Config) ->
BridgeId = bridge_id(Config),
?check_trace(
begin
reset_table(Config),
?assertMatch({ok, _}, create_bridge_api(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 30,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
reset_table(Config),
MsgId = erlang:unique_integer(),
Params = #{
topic => ?config(mqtt_topic, Config),
Expand Down Expand Up @@ -464,6 +485,7 @@ t_start_stop(Config) ->
ResourceId = resource_id(Config),
?check_trace(
begin
reset_table(Config),
?assertMatch({ok, _}, create_bridge(Config)),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.
Expand Down Expand Up @@ -515,6 +537,7 @@ t_on_get_status(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config),
reset_table(Config),
?assertMatch({ok, _}, create_bridge(Config)),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.
Expand Down Expand Up @@ -547,10 +570,45 @@ t_no_sid_nor_service_name(Config0) ->
),
ok.

t_missing_table(Config) ->
ResourceId = resource_id(Config),
?check_trace(
begin
drop_table_if_exists(Config),
?assertMatch({ok, _}, create_bridge_api(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertMatch(
{ok, Status} when Status =:= disconnected orelse Status =:= connecting,
emqx_resource_manager:health_check(ResourceId)
)
),
MsgId = erlang:unique_integer(),
Params = #{
topic => ?config(mqtt_topic, Config),
id => MsgId,
payload => ?config(oracle_name, Config),
retain => true
},
Message = {send_message, Params},
?assertMatch(
{error, {resource_error, #{reason := not_connected}}},
emqx_resource:simple_sync_query(ResourceId, Message)
),
ok
end,
fun(Trace) ->
?assertNotMatch([], ?of_kind(oracle_undefined_table, Trace)),
ok
end
).

t_table_removed(Config) ->
ResourceId = resource_id(Config),
?check_trace(
begin
reset_table(Config),
?assertMatch({ok, _}, create_bridge_api(Config)),
?retry(
_Sleep = 1_000,
Expand Down
67 changes: 67 additions & 0 deletions apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ query_resource(Config, Request) ->
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).

query_resource_sync(Config, Request) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).

query_resource_async(Config, Request) ->
query_resource_async(Config, Request, _Opts = #{}).

Expand Down Expand Up @@ -634,3 +640,64 @@ t_nasty_sql_string(Config) ->
1_000
),
?assertEqual(Payload, connect_and_get_payload(Config)).

t_missing_table(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),

?check_trace(
begin
connect_and_drop_table(Config),
?assertMatch({ok, _}, create_bridge(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertMatch(
{ok, Status} when Status == connecting orelse Status == disconnected,
emqx_resource_manager:health_check(ResourceID)
)
),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
Timeout = 1000,
?assertMatch(
{error, {resource_error, #{reason := unhealthy_target}}},
query_resource(Config, {send_message, SentData, [], Timeout})
),
ok
end,
fun(Trace) ->
?assertMatch([_, _, _], ?of_kind(pgsql_undefined_table, Trace)),
ok
end
),
connect_and_create_table(Config),
ok.

t_table_removed(Config) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?check_trace(
begin
connect_and_create_table(Config),
?assertMatch({ok, _}, create_bridge(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
),
connect_and_drop_table(Config),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
?assertMatch(
{error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}},
query_resource_sync(Config, {send_message, SentData, []})
),
ok
end,
[]
),
connect_and_create_table(Config),
ok.