Skip to content

Commit

Permalink
fix(kafka): use client config for topic existence check
Browse files Browse the repository at this point in the history
Prior to this fix, Kafka producer config was used as client config
  • Loading branch information
zmstone committed Oct 27, 2023
1 parent f2c9739 commit 5f17a8f
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 110 deletions.
107 changes: 54 additions & 53 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
Expand Up @@ -102,6 +102,7 @@ on_start(InstId, Config) ->
client_id => ClientId,
resource_id => ResourceId,
hosts => Hosts,
client_config => ClientConfig,
installed_bridge_v2s => #{}
}}.

Expand All @@ -110,14 +111,15 @@ on_add_channel(
#{
client_id := ClientId,
hosts := Hosts,
client_config := ClientConfig,
installed_bridge_v2s := InstalledBridgeV2s
} = OldState,
BridgeV2Id,
BridgeV2Config
) ->
%% The following will throw an exception if the bridge producers fails to start
{ok, BridgeV2State} = create_producers_for_bridge_v2(
InstId, BridgeV2Id, ClientId, Hosts, BridgeV2Config
InstId, BridgeV2Id, ClientId, Hosts, ClientConfig, BridgeV2Config
),
NewInstalledBridgeV2s = maps:put(BridgeV2Id, BridgeV2State, InstalledBridgeV2s),
%% Update state
Expand All @@ -129,15 +131,17 @@ create_producers_for_bridge_v2(
BridgeV2Id,
ClientId,
Hosts,
ClientConfig,
#{
bridge_type := BridgeType,
kafka := #{
message := MessageTemplate,
topic := KafkaTopic,
sync_query_timeout := SyncQueryTimeout
} = KafkaConfig
kafka := KafkaConfig
}
) ->
#{
message := MessageTemplate,
topic := KafkaTopic,
sync_query_timeout := SyncQueryTimeout
} = KafkaConfig,
KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
Expand All @@ -150,7 +154,7 @@ create_producers_for_bridge_v2(
_ ->
string:equal(TestIdStart, InstId)
end,
ok = check_topic_status(Hosts, KafkaConfig, KafkaTopic),
ok = check_topic_status(Hosts, ClientConfig, KafkaTopic),
ok = check_if_healthy_leaders(ClientId, KafkaTopic),
WolffProducerConfig = producers_config(
BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun, BridgeV2Id
Expand Down Expand Up @@ -488,15 +492,16 @@ on_get_channel_status(
#{
client_id := ClientId,
hosts := Hosts,
client_config := ClientConfig,
installed_bridge_v2s := Channels
} = _State
) ->
ChannelState = maps:get(ChannelId, Channels),
#{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels),
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
case wolff_client:check_connectivity(Pid) of
ok ->
try check_leaders_and_topic(Pid, Hosts, ChannelState) of
try check_leaders_and_topic(ClientId, Pid, Hosts, ClientConfig, KafkaTopic) of
ok ->
connected
catch
Expand All @@ -511,19 +516,31 @@ on_get_channel_status(
end.

check_leaders_and_topic(
Client,
ClientId,
ClientPid,
Hosts,
#{
kafka_config := KafkaConfig,
kafka_topic := KafkaTopic
} = _ChannelState
ClientConfig,
KafkaTopic
) ->
check_if_healthy_leaders(Client, KafkaTopic),
check_topic_status(Hosts, KafkaConfig, KafkaTopic).
check_topic_status(Hosts, ClientConfig, KafkaTopic),
do_check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic).

check_if_healthy_leaders(Client, KafkaTopic) when is_pid(Client) ->
check_if_healthy_leaders(ClientId, KafkaTopic) when is_binary(ClientId) ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
do_check_if_healthy_leaders(ClientId, Pid, KafkaTopic);
{error, Reason} ->
throw(#{
error => cannot_find_kafka_client,
reason => Reason,
kafka_client => ClientId,
kafka_topic => KafkaTopic
})
end.

do_check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid) ->
Leaders =
case wolff_client:get_leader_connections(Client, KafkaTopic) of
case wolff_client:get_leader_connections(ClientPid, KafkaTopic) of
{ok, LeadersToCheck} ->
%% Kafka is considered healthy as long as any of the partition leader is reachable.
lists:filtermap(
Expand All @@ -540,50 +557,34 @@ check_if_healthy_leaders(Client, KafkaTopic) when is_pid(Client) ->
end,
case Leaders of
[] ->
throw(
iolist_to_binary(
io_lib:format("Could not find any healthy partion leader for topic ~s", [
KafkaTopic
])
)
);
throw(#{
error => no_connected_partition_leader,
kafka_client => ClientId,
kafka_topic => KafkaTopic
});
_ ->
ok
end;
check_if_healthy_leaders(ClientId, KafkaTopic) ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
check_if_healthy_leaders(Pid, KafkaTopic);
{error, _Reason} ->
throw(iolist_to_binary(io_lib:format("Could not find Kafka client: ~p", [ClientId])))
end.

check_topic_status(Hosts, KafkaConfig, KafkaTopic) ->
CheckTopicFun =
fun() ->
wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic)
end,
try
case emqx_utils:nolink_apply(CheckTopicFun, 5_000) of
ok ->
ok;
{error, unknown_topic_or_partition} ->
throw(
iolist_to_binary(io_lib:format("Unknown topic or partition ~s", [KafkaTopic]))
);
_ ->
ok
end
catch
error:_:_ ->
%% Some other error not related to unknown_topic_or_partition
ok
check_topic_status(Hosts, ClientConfig, KafkaTopic) ->
%% TODO: change to call wolff:check_if_topic_exists when type spec is fixed for this function
case wolff_client:check_if_topic_exists(Hosts, ClientConfig#{nolink => true}, KafkaTopic) of
ok ->
ok;
{error, unknown_topic_or_partition} ->
throw(#{error => unknown_kafka_topic, topic => KafkaTopic});
{error, Reason} ->
throw(#{
error => failed_to_check_topic_status,
reason => Reason,
kafka_topic => KafkaTopic
})
end.

ssl(#{enable := true} = SSL) ->
emqx_tls_lib:to_client_opts(SSL);
ssl(_) ->
[].
false.

producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id) ->
#{
Expand Down
111 changes: 54 additions & 57 deletions apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl
Expand Up @@ -168,17 +168,17 @@ t_publish_no_auth(CtConfig) ->
t_publish_no_auth_key_dispatch(CtConfig) ->
publish_with_and_without_ssl(CtConfig, "none", #{"partition_strategy" => "key_dispatch"}).

% t_publish_sasl_plain(CtConfig) ->
% publish_with_and_without_ssl(CtConfig, valid_sasl_plain_settings()).
t_publish_sasl_plain(CtConfig) ->
publish_with_and_without_ssl(CtConfig, valid_sasl_plain_settings()).

% t_publish_sasl_scram256(CtConfig) ->
% publish_with_and_without_ssl(CtConfig, valid_sasl_scram256_settings()).
t_publish_sasl_scram256(CtConfig) ->
publish_with_and_without_ssl(CtConfig, valid_sasl_scram256_settings()).

% t_publish_sasl_scram512(CtConfig) ->
% publish_with_and_without_ssl(CtConfig, valid_sasl_scram512_settings()).
t_publish_sasl_scram512(CtConfig) ->
publish_with_and_without_ssl(CtConfig, valid_sasl_scram512_settings()).

% t_publish_sasl_kerberos(CtConfig) ->
% publish_with_and_without_ssl(CtConfig, valid_sasl_kerberos_settings()).
t_publish_sasl_kerberos(CtConfig) ->
publish_with_and_without_ssl(CtConfig, valid_sasl_kerberos_settings()).

%%------------------------------------------------------------------------------
%% Test cases for REST api
Expand All @@ -187,20 +187,21 @@ t_publish_no_auth_key_dispatch(CtConfig) ->
t_kafka_bridge_rest_api_plain_text(_CtConfig) ->
kafka_bridge_rest_api_all_auth_methods(false).

% t_kafka_bridge_rest_api_ssl(_CtConfig) ->
% kafka_bridge_rest_api_all_auth_methods(true).
t_kafka_bridge_rest_api_ssl(_CtConfig) ->
kafka_bridge_rest_api_all_auth_methods(true).

kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
emqx_logger:set_log_level(debug),
NormalHostsString =
case UseSSL of
true -> kafka_hosts_string_ssl();
false -> kafka_hosts_string()
end,
% SASLHostsString =
% case UseSSL of
% true -> kafka_hosts_string_ssl_sasl();
% false -> kafka_hosts_string_sasl()
% end,
SASLHostsString =
case UseSSL of
true -> kafka_hosts_string_ssl_sasl();
false -> kafka_hosts_string_sasl()
end,
BinifyMap = fun(Map) ->
maps:from_list([
{erlang:iolist_to_binary(K), erlang:iolist_to_binary(V)}
Expand All @@ -210,7 +211,7 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
SSLSettings =
case UseSSL of
true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())};
false -> #{}
false -> #{<<"ssl">> => BinifyMap(#{"enable" => "false"})}
end,
kafka_bridge_rest_api_helper(
maps:merge(
Expand All @@ -221,42 +222,42 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
SSLSettings
)
),
% kafka_bridge_rest_api_helper(
% maps:merge(
% #{
% <<"bootstrap_hosts">> => SASLHostsString,
% <<"authentication">> => BinifyMap(valid_sasl_plain_settings())
% },
% SSLSettings
% )
% ),
% kafka_bridge_rest_api_helper(
% maps:merge(
% #{
% <<"bootstrap_hosts">> => SASLHostsString,
% <<"authentication">> => BinifyMap(valid_sasl_scram256_settings())
% },
% SSLSettings
% )
% ),
% kafka_bridge_rest_api_helper(
% maps:merge(
% #{
% <<"bootstrap_hosts">> => SASLHostsString,
% <<"authentication">> => BinifyMap(valid_sasl_scram512_settings())
% },
% SSLSettings
% )
% ),
% kafka_bridge_rest_api_helper(
% maps:merge(
% #{
% <<"bootstrap_hosts">> => SASLHostsString,
% <<"authentication">> => BinifyMap(valid_sasl_kerberos_settings())
% },
% SSLSettings
% )
% ),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_plain_settings())
},
SSLSettings
)
),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_scram256_settings())
},
SSLSettings
)
),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_scram512_settings())
},
SSLSettings
)
),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_kerberos_settings())
},
SSLSettings
)
),
ok.

%% So that we can check if new atoms are created when they are not supposed to be created
Expand Down Expand Up @@ -328,11 +329,7 @@ kafka_bridge_rest_api_helper(Config) ->
}
}
},
CreateBody =
case maps:is_key(<<"ssl">>, Config) of
true -> CreateBodyTmp#{<<"ssl">> => maps:get(<<"ssl">>, Config)};
false -> CreateBodyTmp
end,
CreateBody = CreateBodyTmp#{<<"ssl">> => maps:get(<<"ssl">>, Config)},
{ok, 201, _Data} = http_post(BridgesParts, CreateBody),
%% Check that the new bridge is in the list of bridges
true = MyKafkaBridgeExists(),
Expand Down

0 comments on commit 5f17a8f

Please sign in to comment.