Skip to content

Commit

Permalink
Merge pull request #11031 from paulozulato/fix-influx-wrong-username
Browse files Browse the repository at this point in the history
fix(influxdb): check authentication
  • Loading branch information
paulozulato committed Jun 14, 2023
2 parents 41004e8 + a573b9b commit 36d6350
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 14 deletions.
2 changes: 1 addition & 1 deletion apps/emqx_bridge_influxdb/rebar.config
@@ -1,7 +1,7 @@
{erl_opts, [debug_info]}.

{deps, [
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}},
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.10"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
Expand Down
51 changes: 39 additions & 12 deletions apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl
Expand Up @@ -150,7 +150,7 @@ on_batch_query_async(
end.

on_get_status(_InstId, #{client := Client}) ->
case influxdb:is_alive(Client) of
case influxdb:is_alive(Client) andalso ok =:= influxdb:check_auth(Client) of
true ->
connected;
false ->
Expand Down Expand Up @@ -262,17 +262,32 @@ do_start_client(
{ok, Client} ->
case influxdb:is_alive(Client, true) of
true ->
State = #{
client => Client,
write_syntax => to_config(Lines, Precision)
},
?SLOG(info, #{
msg => "starting influxdb connector success",
connector => InstId,
client => redact_auth(Client),
state => redact_auth(State)
}),
{ok, State};
case influxdb:check_auth(Client) of
ok ->
State = #{
client => Client,
write_syntax => to_config(Lines, Precision)
},
?SLOG(info, #{
msg => "starting influxdb connector success",
connector => InstId,
client => redact_auth(Client),
state => redact_auth(State)
}),
{ok, State};
Error ->
?tp(influxdb_connector_start_failed, #{error => auth_error}),
?SLOG(warning, #{
msg => "failed_to_start_influxdb_connector",
error => Error,
connector => InstId,
client => redact_auth(Client),
reason => auth_error
}),
%% no leak
_ = influxdb:stop_client(Client),
{error, influxdb_client_auth_error}
end;
{false, Reason} ->
?tp(influxdb_connector_start_failed, #{
error => influxdb_client_not_alive, reason => Reason
Expand Down Expand Up @@ -388,6 +403,14 @@ do_query(InstId, Client, Points) ->
connector => InstId,
points => Points
});
{error, {401, _, _}} ->
?tp(influxdb_connector_do_query_failure, #{error => <<"authorization failure">>}),
?SLOG(error, #{
msg => "influxdb_authorization_failed",
client => redact_auth(Client),
connector => InstId
}),
{error, {unrecoverable_error, <<"authorization failure">>}};
{error, Reason} = Err ->
?tp(influxdb_connector_do_query_failure, #{error => Reason}),
?SLOG(error, #{
Expand Down Expand Up @@ -421,6 +444,10 @@ reply_callback(ReplyFunAndArgs, {error, Reason} = Error) ->
Result = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
end;
reply_callback(ReplyFunAndArgs, {ok, 401, _, _}) ->
?tp(influxdb_connector_do_query_failure, #{error => <<"authorization failure">>}),
Result = {error, {unrecoverable_error, <<"authorization failure">>}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
reply_callback(ReplyFunAndArgs, Result) ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).

Expand Down
128 changes: 128 additions & 0 deletions apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl
Expand Up @@ -1058,3 +1058,131 @@ t_missing_field(Config) ->
end
),
ok.

t_authentication_error(Config0) ->
InfluxDBType = ?config(influxdb_type, Config0),
InfluxConfig0 = proplists:get_value(influxdb_config, Config0),
InfluxConfig =
case InfluxDBType of
apiv1 -> InfluxConfig0#{<<"password">> => <<"wrong_password">>};
apiv2 -> InfluxConfig0#{<<"token">> => <<"wrong_token">>}
end,
Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}),
?check_trace(
begin
?wait_async_action(
create_bridge(Config),
#{?snk_kind := influxdb_connector_start_failed},
10_000
)
end,
fun(Trace) ->
?assertMatch(
[#{error := auth_error} | _],
?of_kind(influxdb_connector_start_failed, Trace)
),
ok
end
),
ok.

t_authentication_error_on_get_status(Config0) ->
ResourceId = resource_id(Config0),

% Fake initialization to simulate credential update after bridge was created.
emqx_common_test_helpers:with_mock(
influxdb,
check_auth,
fun(_) ->
ok
end,
fun() ->
InfluxDBType = ?config(influxdb_type, Config0),
InfluxConfig0 = proplists:get_value(influxdb_config, Config0),
InfluxConfig =
case InfluxDBType of
apiv1 -> InfluxConfig0#{<<"password">> => <<"wrong_password">>};
apiv2 -> InfluxConfig0#{<<"token">> => <<"wrong_token">>}
end,
Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}),
{ok, _} = create_bridge(Config),
?retry(
_Sleep = 1_000,
_Attempts = 10,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
)
end
),

% Now back to wrong credentials
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
ok.

t_authentication_error_on_send_message(Config0) ->
ResourceId = resource_id(Config0),
QueryMode = proplists:get_value(query_mode, Config0, sync),
InfluxDBType = ?config(influxdb_type, Config0),
InfluxConfig0 = proplists:get_value(influxdb_config, Config0),
InfluxConfig =
case InfluxDBType of
apiv1 -> InfluxConfig0#{<<"password">> => <<"wrong_password">>};
apiv2 -> InfluxConfig0#{<<"token">> => <<"wrong_token">>}
end,
Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}),

% Fake initialization to simulate credential update after bridge was created.
emqx_common_test_helpers:with_mock(
influxdb,
check_auth,
fun(_) ->
ok
end,
fun() ->
{ok, _} = create_bridge(Config),
?retry(
_Sleep = 1_000,
_Attempts = 10,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
)
end
),

% Now back to wrong credentials
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = #{
int_key => -123,
bool => true,
float_key => 24.5,
uint_key => 123
},
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"timestamp">> => erlang:system_time(millisecond),
<<"payload">> => Payload
},
case QueryMode of
sync ->
?assertMatch(
{error, {unrecoverable_error, <<"authorization failure">>}},
send_message(Config, SentData)
);
async ->
?check_trace(
begin
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := handle_async_reply},
1_000
)
end,
fun(Trace) ->
?assertMatch(
[#{error := <<"authorization failure">>} | _],
?of_kind(influxdb_connector_do_query_failure, Trace)
),
ok
end
)
end,
ok.
1 change: 1 addition & 0 deletions changes/ee/fix-11031.en.md
@@ -0,0 +1 @@
Fixed credential validation when creating bridge and checking status for InfluxDB Bridges.
2 changes: 1 addition & 1 deletion mix.exs
Expand Up @@ -193,7 +193,7 @@ defmodule EMQXUmbrella.MixProject do
defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
[
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.9", override: true},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.10", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.7.5"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0"},
Expand Down

0 comments on commit 36d6350

Please sign in to comment.