Skip to content

Commit

Permalink
fix(oracle): drop support for async queries
Browse files Browse the repository at this point in the history
jamdb_oracle does not provide interface for performing async queries and
ecpool does not monitor the worker which calls jamdb_oracle, so it's
safer to keep support for sync queries only.
  • Loading branch information
paulozulato committed Apr 27, 2023
1 parent dd90b2f commit 43bb6f0
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 151 deletions.
80 changes: 0 additions & 80 deletions apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,6 @@ create_rule_and_action_http(Config) ->
%% Testcases
%%------------------------------------------------------------------------------

% Under normal operations, the bridge will be called async via
% `simple_async_query'.
t_sync_query(Config) ->
ResourceId = resource_id(Config),
?check_trace(
Expand Down Expand Up @@ -360,48 +358,6 @@ t_sync_query(Config) ->
),
ok.

t_async_query(Config) ->
Overrides = #{
<<"resource_opts">> => #{
<<"enable_batch">> => <<"false">>,
<<"batch_size">> => 1
}
},
ResourceId = resource_id(Config),
?check_trace(
begin
?assertMatch({ok, _}, create_bridge_api(Config, Overrides)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
reset_table(Config),
MsgId = erlang:unique_integer(),
Params = #{
topic => ?config(mqtt_topic, Config),
id => MsgId,
payload => ?config(oracle_name, Config),
retain => false
},
Message = {send_message, Params},
?assertMatch(
{
ok,
{ok, #{result := {ok, [{affected_rows, 1}]}}}
},
?wait_async_action(
emqx_resource:query(ResourceId, Message),
#{?snk_kind := oracle_query},
5_000
)
),
ok
end,
[]
),
ok.

t_batch_sync_query(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
Expand Down Expand Up @@ -449,42 +405,6 @@ t_batch_sync_query(Config) ->
),
ok.

t_batch_async_query(Config) ->
ResourceId = resource_id(Config),
?check_trace(
begin
?assertMatch({ok, _}, create_bridge_api(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
reset_table(Config),
MsgId = erlang:unique_integer(),
Params = #{
topic => ?config(mqtt_topic, Config),
id => MsgId,
payload => ?config(oracle_name, Config),
retain => false
},
Message = {send_message, Params},
?assertMatch(
{
ok,
{ok, #{result := {ok, [{affected_rows, 1}]}}}
},
?wait_async_action(
emqx_resource:query(ResourceId, Message),
#{?snk_kind := oracle_batch_query},
5_000
)
),
ok
end,
[]
),
ok.

t_create_via_http(Config) ->
?check_trace(
begin
Expand Down
75 changes: 4 additions & 71 deletions apps/emqx_oracle/src/emqx_oracle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
on_stop/2,
on_query/3,
on_batch_query/3,
on_query_async/4,
on_batch_query_async/4,
on_get_status/2
]).

Expand All @@ -35,7 +33,6 @@
-export([
query/3,
execute_batch/3,
do_async_reply/2,
do_get_status/1
]).

Expand All @@ -46,7 +43,6 @@
-define(ACTION_SEND_MESSAGE, send_message).

-define(SYNC_QUERY_MODE, no_handover).
-define(ASYNC_QUERY_MODE(REPLY), {handover_async, {?MODULE, do_async_reply, [REPLY]}}).

-define(ORACLE_HOST_OPTIONS, #{
default_port => ?ORACLE_DEFAULT_PORT
Expand All @@ -67,7 +63,10 @@
batch_params_tokens := params_tokens()
}.

callback_mode() -> async_if_possible.
% As ecpool is not monitoring the worker's PID when doing a handover_async, the
% request can be lost if worker crashes. Thus, it's better to force requests to
% be sync for now.
callback_mode() -> always_sync.

is_buffer_supported() -> false.

Expand Down Expand Up @@ -147,24 +146,6 @@ on_query(
Res = on_sql_query(InstId, PoolName, Type, ?SYNC_QUERY_MODE, NameOrSQL2, Data),
handle_result(Res).

on_query_async(InstId, {TypeOrKey, NameOrSQL}, Reply, State) ->
on_query_async(InstId, {TypeOrKey, NameOrSQL, []}, Reply, State);
on_query_async(
InstId, {TypeOrKey, NameOrSQL, Params} = Query, Reply, #{pool_name := PoolName} = State
) ->
?SLOG(debug, #{
msg => "oracle database connector received async sql query",
connector => InstId,
query => Query,
reply => Reply,
state => State
}),
ApplyMode = ?ASYNC_QUERY_MODE(Reply),
Type = query,
{NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
Res = on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL2, Data),
handle_result(Res).

on_batch_query(
InstId,
BatchReq,
Expand Down Expand Up @@ -207,51 +188,6 @@ on_batch_query(
{error, {unrecoverable_error, invalid_request}}
end.

on_batch_query_async(
InstId,
BatchReq,
Reply,
#{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State
) ->
case BatchReq of
[{Key, _} = Request | _] ->
BinKey = to_bin(Key),
case maps:get(BinKey, Tokens, undefined) of
undefined ->
Log = #{
connector => InstId,
first_request => Request,
state => State,
msg => "batch prepare not implemented"
},
?SLOG(error, Log),
{error, {unrecoverable_error, batch_prepare_not_implemented}};
TokenList ->
{_, Datas} = lists:unzip(BatchReq),
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
St = maps:get(BinKey, Sts),
case
on_sql_query(
InstId, PoolName, execute_batch, ?ASYNC_QUERY_MODE(Reply), St, Datas2
)
of
{ok, Results} ->
handle_batch_result(Results, 0);
Result ->
Result
end
end;
_ ->
Log = #{
connector => InstId,
request => BatchReq,
state => State,
msg => "invalid request"
},
?SLOG(error, Log),
{error, {unrecoverable_error, invalid_request}}
end.

proc_sql_params(query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(TypeOrKey, SQLOrData, Params, #{
Expand Down Expand Up @@ -429,6 +365,3 @@ handle_batch_result([{proc_result, RetCode, Reason} | _Rest], _Acc) ->
{error, {unrecoverable_error, {RetCode, Reason}}};
handle_batch_result([], Acc) ->
{ok, Acc}.

do_async_reply(Result, {ReplyFun, [Context]}) ->
ReplyFun(Context, Result).

0 comments on commit 43bb6f0

Please sign in to comment.