Skip to content

Commit

Permalink
Merge pull request #9571 from olcai/improve-mysql-bridge-test
Browse files Browse the repository at this point in the history
fix: infinite recursion in mysql connector and improve mysql EE bridge tests
  • Loading branch information
zmstone committed Dec 22, 2022
2 parents 0baeb68 + 13942f5 commit c678770
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 31 deletions.
22 changes: 19 additions & 3 deletions apps/emqx_connector/src/emqx_connector_mysql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,12 @@ on_query(
{SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
case on_sql_query(InstId, MySqlFunction, SQLOrKey2, Data, Timeout, State) of
{error, not_prepared} ->
case prepare_sql(Prepares, PoolName) of
case maybe_prepare_sql(SQLOrKey2, Prepares, PoolName) of
ok ->
?tp(
mysql_connector_on_query_prepared_sql,
#{type_or_key => TypeOrKey, sql_or_key => SQLOrKey, params => Params}
),
%% not return result, next loop will try again
on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State);
{error, Reason} ->
Expand Down Expand Up @@ -182,7 +186,7 @@ on_batch_query(
Request ->
LogMeta = #{connector => InstId, first_request => Request, state => State},
?SLOG(error, LogMeta#{msg => "invalid request"}),
{error, invald_request}
{error, invalid_request}
end.

mysql_function(sql) ->
Expand Down Expand Up @@ -256,6 +260,12 @@ init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) ->
end
end.

maybe_prepare_sql(SQLOrKey, Prepares, PoolName) ->
case maps:is_key(SQLOrKey, Prepares) of
true -> prepare_sql(Prepares, PoolName);
false -> {error, prepared_statement_invalid}
end.

prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
prepare_sql(maps:to_list(Prepares), PoolName);
prepare_sql(Prepares, PoolName) ->
Expand Down Expand Up @@ -305,6 +315,8 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) ->
?SLOG(info, LogMeta#{result => success}),
prepare_sql_to_conn(Conn, PrepareList);
{error, Reason} ->
% FIXME: we should try to differ on transient failers and
% syntax failures. Retrying syntax failures is not very productive.
?SLOG(error, LogMeta#{result => failed, reason => Reason}),
{error, Reason}
end.
Expand Down Expand Up @@ -407,7 +419,7 @@ on_sql_query(
{ok, Conn} = ecpool_worker:client(Worker),
?tp(
mysql_connector_send_query,
#{sql_or_key => SQLOrKey, data => Data}
#{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data}
),
try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of
{error, disconnected} = Result ->
Expand All @@ -419,6 +431,10 @@ on_sql_query(
_ = exit(Conn, restart),
Result;
{error, not_prepared} = Error ->
?tp(
mysql_connector_prepare_query_failed,
#{error => not_prepared}
),
?SLOG(
warning,
LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
Expand Down
2 changes: 2 additions & 0 deletions changes/v5.0.13-en.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@

- Fix shared subscription 'sticky' strategy [#9578](https://github.com/emqx/emqx/pull/9578).
Prior to this change, a 'sticky' subscriber may continue to receive messages after unsubscribing.

- Add check to ensure that a given key is among the prepared statements on query in the mysql connector [#9571](https://github.com/emqx/emqx/pull/9571).
2 changes: 2 additions & 0 deletions changes/v5.0.13-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@

- 修复共享订阅的 'sticky' 策略 [#9578](https://github.com/emqx/emqx/pull/9578)
在此修复前,使用 'sticky' 策略的订阅客户端可能在取消订阅后继续收到消息。

- 增强 mysql 查询流程,确保给定的查询语句在 mysql 连接器的预编译语句中 [#9571](https://github.com/emqx/emqx/pull/9571)
182 changes: 154 additions & 28 deletions lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,59 +35,58 @@

all() ->
[
{group, with_batch},
{group, without_batch}
{group, tcp},
{group, tls}
].

groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
NonBatchCases = [t_write_timeout, t_uninitialized_prepared_statement],
[
{with_batch, [
{group, sync_query}
{tcp, [
{group, with_batch},
{group, without_batch}
]},
{without_batch, [
{group, sync_query}
{tls, [
{group, with_batch},
{group, without_batch}
]},
{sync_query, [
{group, tcp},
{group, tls}
]},
{tcp, TCs},
{tls, TCs}
{with_batch, TCs -- NonBatchCases},
{without_batch, TCs}
].

init_per_group(tcp, Config0) ->
init_per_group(tcp, Config) ->
MysqlHost = os:getenv("MYSQL_TCP_HOST", "toxiproxy"),
MysqlPort = list_to_integer(os:getenv("MYSQL_TCP_PORT", "3306")),
Config = [
[
{mysql_host, MysqlHost},
{mysql_port, MysqlPort},
{enable_tls, false},
{query_mode, sync},
{proxy_name, "mysql_tcp"}
| Config0
],
common_init(Config);
init_per_group(tls, Config0) ->
| Config
];
init_per_group(tls, Config) ->
MysqlHost = os:getenv("MYSQL_TLS_HOST", "toxiproxy"),
MysqlPort = list_to_integer(os:getenv("MYSQL_TLS_PORT", "3307")),
Config = [
[
{mysql_host, MysqlHost},
{mysql_port, MysqlPort},
{enable_tls, true},
{query_mode, sync},
{proxy_name, "mysql_tls"}
| Config0
],
| Config
];
init_per_group(with_batch, Config0) ->
Config = [{enable_batch, true} | Config0],
common_init(Config);
init_per_group(without_batch, Config0) ->
Config = [{enable_batch, false} | Config0],
common_init(Config);
init_per_group(sync_query, Config) ->
[{query_mode, sync} | Config];
init_per_group(with_batch, Config) ->
[{enable_batch, true} | Config];
init_per_group(without_batch, Config) ->
[{enable_batch, false} | Config];
init_per_group(_Group, Config) ->
Config.

end_per_group(Group, Config) when Group =:= tcp; Group =:= tls ->
end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
connect_and_drop_table(Config),
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
Expand Down Expand Up @@ -224,6 +223,25 @@ send_message(Config, Payload) ->
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
emqx_bridge:send_message(BridgeID, Payload).

query_resource(Config, Request) ->
Name = ?config(mysql_name, Config),
BridgeType = ?config(mysql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request).

unprepare(Config, Key) ->
Name = ?config(mysql_name, Config),
BridgeType = ?config(mysql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
{ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID),
[
begin
{ok, Conn} = ecpool_worker:client(Worker),
ok = mysql:unprepare(Conn, Key)
end
|| {_Name, Worker} <- ecpool:workers(PoolName)
].

% We need to create and drop the test table outside of using bridges
% since a bridge expects the table to exist when enabling it. We
% therefore call the mysql module directly, in addition to using it
Expand Down Expand Up @@ -392,3 +410,111 @@ t_write_failure(Config) ->
end
),
ok.

% This test doesn't work with batch enabled since it is not possible
% to set the timeout directly for batch queries
t_write_timeout(Config) ->
ProxyName = ?config(proxy_name, Config),
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
{ok, _} = create_bridge(Config),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
Timeout = 10,
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch(
{error, {resource_error, _}},
query_resource(Config, {send_message, SentData, [], Timeout})
)
end),
ok.

t_simple_sql_query(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
Request = {sql, <<"SELECT count(1) AS T">>},
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
true -> ?assertEqual({error, batch_select_not_implemented}, Result);
false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result)
end,
ok.

t_missing_data(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
Result = send_message(Config, #{}),
case ?config(enable_batch, Config) of
true ->
?assertMatch(
{error, {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}, Result
);
false ->
?assertMatch({error, {1048, _, <<"Column 'arrived' cannot be null">>}}, Result)
end,
ok.

t_bad_sql_parameter(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
Request = {sql, <<"">>, [bad_parameter]},
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
true -> ?assertEqual({error, invalid_request}, Result);
false -> ?assertEqual({error, {invalid_params, [bad_parameter]}}, Result)
end,
ok.

t_unprepared_statement_query(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
Request = {prepared_query, unprepared_query, []},
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
true -> ?assertEqual({error, invalid_request}, Result);
false -> ?assertEqual({error, prepared_statement_invalid}, Result)
end,
ok.

%% Test doesn't work with batch enabled since batch doesn't use
%% prepared statements as such; it has its own query generation process
t_uninitialized_prepared_statement(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
unprepare(Config, send_message),
?check_trace(
begin
?assertEqual(ok, send_message(Config, SentData)),
ok
end,
fun(Trace) ->
?assert(
?strict_causality(
#{?snk_kind := mysql_connector_prepare_query_failed, error := not_prepared},
#{
?snk_kind := mysql_connector_on_query_prepared_sql,
type_or_key := send_message
},
Trace
)
),
SendQueryTrace = ?of_kind(mysql_connector_send_query, Trace),
?assertMatch([#{data := [Val, _]}, #{data := [Val, _]}], SendQueryTrace),
ReturnTrace = ?of_kind(mysql_connector_query_return, Trace),
?assertMatch([#{result := ok}], ReturnTrace),
ok
end
),
ok.

0 comments on commit c678770

Please sign in to comment.