Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implements async query mode for GreptimeDB data bridge #12072

Merged
merged 10 commits into from
Dec 7, 2023
2 changes: 1 addition & 1 deletion .ci/docker-compose-file/docker-compose-greptimedb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ services:
greptimedb:
container_name: greptimedb
hostname: greptimedb
image: greptime/greptimedb:0.3.2
image: greptime/greptimedb:v0.4.4
expose:
- "4000"
- "4001"
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_greptimedb/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}},
{greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.2"}}}
{greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.6"}}}
]}.
{plugins, [rebar3_path_deps]}.
{project_plugins, [erlfmt]}.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
on_stop/2,
on_query/3,
on_batch_query/3,
on_query_async/4,
on_batch_query_async/4,
on_get_status/2
]).
-export([reply_callback/2]).

-export([
roots/0,
Expand Down Expand Up @@ -57,7 +60,7 @@

%% -------------------------------------------------------------------------------------------------
%% resource callback
callback_mode() -> always_sync.
callback_mode() -> async_if_possible.

on_start(InstId, Config) ->
%% InstID as pool would be handled by greptimedb client
Expand Down Expand Up @@ -110,6 +113,49 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client
{error, {unrecoverable_error, Reason}}
end.

on_query_async(
InstId,
{send_message, Data},
{ReplyFun, Args},
_State = #{write_syntax := SyntaxLines, client := Client}
) ->
case data_to_points(Data, SyntaxLines) of
{ok, Points} ->
?tp(
greptimedb_connector_send_query,
#{points => Points, batch => false, mode => async}
),
do_async_query(InstId, Client, Points, {ReplyFun, Args});
{error, ErrorPoints} = Err ->
?tp(
greptimedb_connector_send_query_error,
#{batch => false, mode => async, error => ErrorPoints}
),
log_error_points(InstId, ErrorPoints),
Err
end.

on_batch_query_async(
InstId,
BatchData,
{ReplyFun, Args},
#{write_syntax := SyntaxLines, client := Client}
) ->
case parse_batch_data(InstId, BatchData, SyntaxLines) of
{ok, Points} ->
?tp(
greptimedb_connector_send_query,
#{points => Points, batch => true, mode => async}
),
do_async_query(InstId, Client, Points, {ReplyFun, Args});
{error, Reason} ->
?tp(
greptimedb_connector_send_query_error,
#{batch => true, mode => async, error => Reason}
),
{error, {unrecoverable_error, Reason}}
end.

on_get_status(_InstId, #{client := Client}) ->
case greptimedb:is_alive(Client) of
true ->
Expand Down Expand Up @@ -344,6 +390,31 @@ do_query(InstId, Client, Points) ->
end
end.

do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
?SLOG(info, #{
msg => "greptimedb_write_point_async",
connector => InstId,
points => Points
}),
WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs).

reply_callback(ReplyFunAndArgs, {error, {unauth, _, _}}) ->
?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}),
Result = {error, {unrecoverable_error, <<"authorization failure">>}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
reply_callback(ReplyFunAndArgs, {error, Reason} = Error) ->
case is_unrecoverable_error(Error) of
true ->
Result = {error, {unrecoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
false ->
Result = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
end;
reply_callback(ReplyFunAndArgs, Result) ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).

%% -------------------------------------------------------------------------------------------------
%% Tags & Fields Config Trans

Expand Down
90 changes: 80 additions & 10 deletions apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,23 @@ groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{with_batch, [
{group, sync_query}
{group, sync_query},
{group, async_query}
]},
{without_batch, [
{group, sync_query}
{group, sync_query},
{group, async_query}
]},
{sync_query, [
{group, grpcv1_tcp}
%% uncomment tls when we are ready
%% {group, grpcv1_tls}
]},
{async_query, [
{group, grpcv1_tcp}
%% uncomment tls when we are ready
%% {group, grpcv1_tls}
]},
{grpcv1_tcp, TCs}
%%{grpcv1_tls, TCs}
].
Expand Down Expand Up @@ -130,6 +137,8 @@ init_per_group(GreptimedbType, Config0) when
end;
init_per_group(sync_query, Config) ->
[{query_mode, sync} | Config];
init_per_group(async_query, Config) ->
[{query_mode, async} | Config];
init_per_group(with_batch, Config) ->
[{batch_size, 100} | Config];
init_per_group(without_batch, Config) ->
Expand Down Expand Up @@ -420,6 +429,9 @@ t_start_ok(Config) ->
?check_trace(
begin
case QueryMode of
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500);
sync ->
?assertMatch({ok, _}, send_message(Config, SentData))
end,
Expand Down Expand Up @@ -666,6 +678,9 @@ t_const_timestamp(Config) ->
<<"timestamp">> => erlang:system_time(millisecond)
},
case QueryMode of
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500);
sync ->
?assertMatch({ok, _}, send_message(Config, SentData))
end,
Expand Down Expand Up @@ -709,9 +724,12 @@ t_boolean_variants(Config) ->
},
case QueryMode of
sync ->
?assertMatch({ok, _}, send_message(Config, SentData))
?assertMatch({ok, _}, send_message(Config, SentData));
async ->
?assertMatch(ok, send_message(Config, SentData))
end,
case QueryMode of
async -> ct:sleep(500);
sync -> ok
end,
PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config),
Expand Down Expand Up @@ -779,11 +797,29 @@ t_bad_timestamp(Config) ->
#{?snk_kind := greptimedb_connector_send_query_error},
10_000
),
fun(Result, _Trace) ->
fun(Result, Trace) ->
?assertMatch({_, {ok, _}}, Result),
{Return, {ok, _}} = Result,
IsBatch = BatchSize > 1,
case {QueryMode, IsBatch} of
{async, true} ->
?assertEqual(ok, Return),
?assertMatch(
[#{error := points_trans_failed}],
?of_kind(greptimedb_connector_send_query_error, Trace)
);
{async, false} ->
?assertEqual(ok, Return),
?assertMatch(
[
#{
error := [
{error, {bad_timestamp, <<"bad_timestamp">>}}
]
}
],
?of_kind(greptimedb_connector_send_query_error, Trace)
);
{sync, false} ->
?assertEqual(
{error, [
Expand Down Expand Up @@ -907,17 +943,34 @@ t_write_failure(Config) ->
{error, {resource_error, #{reason := timeout}}},
send_message(Config, SentData)
),
#{?snk_kind := greptimedb_connector_do_query_failure, action := nack},
16_000
#{?snk_kind := handle_async_reply, action := nack},
1_000
);
async ->
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := handle_async_reply},
1_000
)
end
end),
fun(Trace) ->
fun(Trace0) ->
case QueryMode of
sync ->
?assertMatch(
[#{error := _} | _],
?of_kind(greptimedb_connector_do_query_failure, Trace)
Trace = ?of_kind(handle_async_reply, Trace0),
?assertMatch([_ | _], Trace),
[#{result := Result} | _] = Trace,
?assert(
not emqx_bridge_greptimedb_connector:is_unrecoverable_error(Result),
#{got => Result}
);
async ->
Trace = ?of_kind(handle_async_reply, Trace0),
?assertMatch([_ | _], Trace),
[#{result := Result} | _] = Trace,
?assert(
not emqx_bridge_greptimedb_connector:is_unrecoverable_error(Result),
#{got => Result}
)
end,
ok
Expand Down Expand Up @@ -1029,6 +1082,23 @@ t_authentication_error_on_send_message(Config0) ->
?assertMatch(
{error, {unrecoverable_error, <<"authorization failure">>}},
send_message(Config, SentData)
);
async ->
?check_trace(
begin
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := handle_async_reply},
1_000
)
end,
fun(Trace) ->
?assertMatch(
[#{error := <<"authorization failure">>} | _],
?of_kind(greptimedb_connector_do_query_failure, Trace)
),
ok
end
)
end,
ok.
3 changes: 3 additions & 0 deletions changes/ee/feat-12072.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Supports async query mode for GreptimeDB data bridge. It provides better performance.


2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ defmodule EMQXUmbrella.MixProject do
{:crc32cer, "0.1.8", override: true},
{:supervisor3, "1.1.12", override: true},
{:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true},
{:greptimedb, github: "GreptimeTeam/greptimedb-client-erl", tag: "v0.1.2", override: true},
{:greptimedb, github: "GreptimeTeam/greptimedb-client-erl", tag: "v0.1.6", override: true},
# The following two are dependencies of rabbit_common. They are needed here to
# make mix not complain about conflicting versions
{:thoas, github: "emqx/thoas", tag: "v1.0.0", override: true},
Expand Down