Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: infinite recursion in mysql connector and improve mysql EE bridge tests #9571

Merged
merged 6 commits into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 19 additions & 3 deletions apps/emqx_connector/src/emqx_connector_mysql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,12 @@ on_query(
{SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
case on_sql_query(InstId, MySqlFunction, SQLOrKey2, Data, Timeout, State) of
{error, not_prepared} ->
case prepare_sql(Prepares, PoolName) of
case maybe_prepare_sql(SQLOrKey2, Prepares, PoolName) of
ok ->
?tp(
mysql_connector_on_query_prepared_sql,
#{type_or_key => TypeOrKey, sql_or_key => SQLOrKey, params => Params}
),
%% not return result, next loop will try again
on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State);
{error, Reason} ->
Expand Down Expand Up @@ -182,7 +186,7 @@ on_batch_query(
Request ->
LogMeta = #{connector => InstId, first_request => Request, state => State},
?SLOG(error, LogMeta#{msg => "invalid request"}),
{error, invald_request}
{error, invalid_request}
end.

mysql_function(sql) ->
Expand Down Expand Up @@ -256,6 +260,12 @@ init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) ->
end
end.

maybe_prepare_sql(SQLOrKey, Prepares, PoolName) ->
case maps:is_key(SQLOrKey, Prepares) of
true -> prepare_sql(Prepares, PoolName);
false -> {error, prepared_statement_invalid}
end.

prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
prepare_sql(maps:to_list(Prepares), PoolName);
prepare_sql(Prepares, PoolName) ->
Expand Down Expand Up @@ -305,6 +315,8 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) ->
?SLOG(info, LogMeta#{result => success}),
prepare_sql_to_conn(Conn, PrepareList);
{error, Reason} ->
% FIXME: we should try to differ on transient failers and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible to fix now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a shot at it but came to the conclusion that it might require a more thorough refactor. The code is hard to follow as-is and already has several layers of error coping mechanisms, and these should be unified. Created EMQX-8582 to track it.

% syntax failures. Retrying syntax failures is not very productive.
?SLOG(error, LogMeta#{result => failed, reason => Reason}),
{error, Reason}
end.
Expand Down Expand Up @@ -407,7 +419,7 @@ on_sql_query(
{ok, Conn} = ecpool_worker:client(Worker),
?tp(
mysql_connector_send_query,
#{sql_or_key => SQLOrKey, data => Data}
#{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data}
),
try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of
{error, disconnected} = Result ->
Expand All @@ -419,6 +431,10 @@ on_sql_query(
_ = exit(Conn, restart),
Result;
{error, not_prepared} = Error ->
?tp(
mysql_connector_prepare_query_failed,
#{error => not_prepared}
),
?SLOG(
warning,
LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
Expand Down
2 changes: 2 additions & 0 deletions changes/v5.0.13-en.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@

- Fix shared subscription 'sticky' strategy [#9578](https://github.com/emqx/emqx/pull/9578).
Prior to this change, a 'sticky' subscriber may continue to receive messages after unsubscribing.

- Add check to ensure that a given key is among the prepared statements on query in the mysql connector [#9571](https://github.com/emqx/emqx/pull/9571).
2 changes: 2 additions & 0 deletions changes/v5.0.13-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@

- 修复共享订阅的 'sticky' 策略 [#9578](https://github.com/emqx/emqx/pull/9578)。
在此修复前,使用 'sticky' 策略的订阅客户端可能在取消订阅后继续收到消息。

- 增强 mysql 查询流程,确保给定的查询语句在 mysql 连接器的预编译语句中 [#9571](https://github.com/emqx/emqx/pull/9571)。
olcai marked this conversation as resolved.
Show resolved Hide resolved
182 changes: 154 additions & 28 deletions lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,59 +35,58 @@

all() ->
[
{group, with_batch},
{group, without_batch}
{group, tcp},
{group, tls}
].

groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
NonBatchCases = [t_write_timeout, t_uninitialized_prepared_statement],
[
{with_batch, [
{group, sync_query}
{tcp, [
{group, with_batch},
{group, without_batch}
]},
{without_batch, [
{group, sync_query}
{tls, [
{group, with_batch},
{group, without_batch}
]},
{sync_query, [
{group, tcp},
{group, tls}
]},
{tcp, TCs},
{tls, TCs}
{with_batch, TCs -- NonBatchCases},
{without_batch, TCs}
].

init_per_group(tcp, Config0) ->
init_per_group(tcp, Config) ->
MysqlHost = os:getenv("MYSQL_TCP_HOST", "toxiproxy"),
MysqlPort = list_to_integer(os:getenv("MYSQL_TCP_PORT", "3306")),
Config = [
[
{mysql_host, MysqlHost},
{mysql_port, MysqlPort},
{enable_tls, false},
{query_mode, sync},
{proxy_name, "mysql_tcp"}
| Config0
],
common_init(Config);
init_per_group(tls, Config0) ->
| Config
];
init_per_group(tls, Config) ->
MysqlHost = os:getenv("MYSQL_TLS_HOST", "toxiproxy"),
MysqlPort = list_to_integer(os:getenv("MYSQL_TLS_PORT", "3307")),
Config = [
[
{mysql_host, MysqlHost},
{mysql_port, MysqlPort},
{enable_tls, true},
{query_mode, sync},
{proxy_name, "mysql_tls"}
| Config0
],
| Config
];
init_per_group(with_batch, Config0) ->
Config = [{enable_batch, true} | Config0],
common_init(Config);
init_per_group(without_batch, Config0) ->
Config = [{enable_batch, false} | Config0],
common_init(Config);
init_per_group(sync_query, Config) ->
[{query_mode, sync} | Config];
init_per_group(with_batch, Config) ->
[{enable_batch, true} | Config];
init_per_group(without_batch, Config) ->
[{enable_batch, false} | Config];
init_per_group(_Group, Config) ->
Config.

end_per_group(Group, Config) when Group =:= tcp; Group =:= tls ->
end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
connect_and_drop_table(Config),
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
Expand Down Expand Up @@ -224,6 +223,25 @@ send_message(Config, Payload) ->
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
emqx_bridge:send_message(BridgeID, Payload).

query_resource(Config, Request) ->
Name = ?config(mysql_name, Config),
BridgeType = ?config(mysql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request).

unprepare(Config, Key) ->
Name = ?config(mysql_name, Config),
BridgeType = ?config(mysql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
{ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID),
[
begin
{ok, Conn} = ecpool_worker:client(Worker),
ok = mysql:unprepare(Conn, Key)
end
|| {_Name, Worker} <- ecpool:workers(PoolName)
].

% We need to create and drop the test table outside of using bridges
% since a bridge expects the table to exist when enabling it. We
% therefore call the mysql module directly, in addition to using it
Expand Down Expand Up @@ -392,3 +410,111 @@ t_write_failure(Config) ->
end
),
ok.

% This test doesn't work with batch enabled since it is not possible
% to set the timeout directly for batch queries
t_write_timeout(Config) ->
ProxyName = ?config(proxy_name, Config),
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
{ok, _} = create_bridge(Config),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
Timeout = 10,
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch(
{error, {resource_error, _}},
query_resource(Config, {send_message, SentData, [], Timeout})
)
end),
ok.

t_simple_sql_query(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
Request = {sql, <<"SELECT count(1) AS T">>},
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
true -> ?assertEqual({error, batch_select_not_implemented}, Result);
false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result)
end,
ok.

t_missing_data(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
Result = send_message(Config, #{}),
case ?config(enable_batch, Config) of
true ->
?assertMatch(
{error, {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}, Result
);
false ->
?assertMatch({error, {1048, _, <<"Column 'arrived' cannot be null">>}}, Result)
end,
ok.

t_bad_sql_parameter(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
Request = {sql, <<"">>, [bad_parameter]},
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
true -> ?assertEqual({error, invalid_request}, Result);
false -> ?assertEqual({error, {invalid_params, [bad_parameter]}}, Result)
end,
ok.

t_unprepared_statement_query(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
Request = {prepared_query, unprepared_query, []},
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
true -> ?assertEqual({error, invalid_request}, Result);
false -> ?assertEqual({error, prepared_statement_invalid}, Result)
end,
ok.

%% Test doesn't work with batch enabled since batch doesn't use
%% prepared statements as such; it has its own query generation process
t_uninitialized_prepared_statement(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
unprepare(Config, send_message),
?check_trace(
begin
?assertEqual(ok, send_message(Config, SentData)),
ok
end,
fun(Trace) ->
?assert(
?strict_causality(
#{?snk_kind := mysql_connector_prepare_query_failed, error := not_prepared},
#{
?snk_kind := mysql_connector_on_query_prepared_sql,
type_or_key := send_message
},
Trace
)
),
SendQueryTrace = ?of_kind(mysql_connector_send_query, Trace),
?assertMatch([#{data := [Val, _]}, #{data := [Val, _]}], SendQueryTrace),
ReturnTrace = ?of_kind(mysql_connector_query_return, Trace),
?assertMatch([#{result := ok}], ReturnTrace),
ok
end
),
ok.