Skip to content

Commit

Permalink
Merge pull request #12072 from killme2008/feature/greptimedb-async-write
Browse files Browse the repository at this point in the history
feat: implements async query mode for GreptimeDB data bridge
  • Loading branch information
HJianBo committed Dec 7, 2023
2 parents 9b612cb + 9920841 commit bd985e5
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .ci/docker-compose-file/docker-compose-greptimedb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ services:
greptimedb:
container_name: greptimedb
hostname: greptimedb
image: greptime/greptimedb:0.3.2
image: greptime/greptimedb:v0.4.4
expose:
- "4000"
- "4001"
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_greptimedb/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}},
{greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.2"}}}
{greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.6"}}}
]}.
{plugins, [rebar3_path_deps]}.
{project_plugins, [erlfmt]}.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
on_stop/2,
on_query/3,
on_batch_query/3,
on_query_async/4,
on_batch_query_async/4,
on_get_status/2
]).
-export([reply_callback/2]).

-export([
roots/0,
Expand Down Expand Up @@ -57,7 +60,7 @@

%% -------------------------------------------------------------------------------------------------
%% resource callback
callback_mode() -> always_sync.
callback_mode() -> async_if_possible.

on_start(InstId, Config) ->
%% InstID as pool would be handled by greptimedb client
Expand Down Expand Up @@ -110,6 +113,49 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client
{error, {unrecoverable_error, Reason}}
end.

on_query_async(
InstId,
{send_message, Data},
{ReplyFun, Args},
_State = #{write_syntax := SyntaxLines, client := Client}
) ->
case data_to_points(Data, SyntaxLines) of
{ok, Points} ->
?tp(
greptimedb_connector_send_query,
#{points => Points, batch => false, mode => async}
),
do_async_query(InstId, Client, Points, {ReplyFun, Args});
{error, ErrorPoints} = Err ->
?tp(
greptimedb_connector_send_query_error,
#{batch => false, mode => async, error => ErrorPoints}
),
log_error_points(InstId, ErrorPoints),
Err
end.

on_batch_query_async(
InstId,
BatchData,
{ReplyFun, Args},
#{write_syntax := SyntaxLines, client := Client}
) ->
case parse_batch_data(InstId, BatchData, SyntaxLines) of
{ok, Points} ->
?tp(
greptimedb_connector_send_query,
#{points => Points, batch => true, mode => async}
),
do_async_query(InstId, Client, Points, {ReplyFun, Args});
{error, Reason} ->
?tp(
greptimedb_connector_send_query_error,
#{batch => true, mode => async, error => Reason}
),
{error, {unrecoverable_error, Reason}}
end.

on_get_status(_InstId, #{client := Client}) ->
case greptimedb:is_alive(Client) of
true ->
Expand Down Expand Up @@ -344,6 +390,31 @@ do_query(InstId, Client, Points) ->
end
end.

do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
?SLOG(info, #{
msg => "greptimedb_write_point_async",
connector => InstId,
points => Points
}),
WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs).

reply_callback(ReplyFunAndArgs, {error, {unauth, _, _}}) ->
?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}),
Result = {error, {unrecoverable_error, <<"authorization failure">>}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
reply_callback(ReplyFunAndArgs, {error, Reason} = Error) ->
case is_unrecoverable_error(Error) of
true ->
Result = {error, {unrecoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
false ->
Result = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
end;
reply_callback(ReplyFunAndArgs, Result) ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).

%% -------------------------------------------------------------------------------------------------
%% Tags & Fields Config Trans

Expand Down
90 changes: 80 additions & 10 deletions apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,23 @@ groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{with_batch, [
{group, sync_query}
{group, sync_query},
{group, async_query}
]},
{without_batch, [
{group, sync_query}
{group, sync_query},
{group, async_query}
]},
{sync_query, [
{group, grpcv1_tcp}
%% uncomment tls when we are ready
%% {group, grpcv1_tls}
]},
{async_query, [
{group, grpcv1_tcp}
%% uncomment tls when we are ready
%% {group, grpcv1_tls}
]},
{grpcv1_tcp, TCs}
%%{grpcv1_tls, TCs}
].
Expand Down Expand Up @@ -130,6 +137,8 @@ init_per_group(GreptimedbType, Config0) when
end;
init_per_group(sync_query, Config) ->
[{query_mode, sync} | Config];
init_per_group(async_query, Config) ->
[{query_mode, async} | Config];
init_per_group(with_batch, Config) ->
[{batch_size, 100} | Config];
init_per_group(without_batch, Config) ->
Expand Down Expand Up @@ -420,6 +429,9 @@ t_start_ok(Config) ->
?check_trace(
begin
case QueryMode of
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500);
sync ->
?assertMatch({ok, _}, send_message(Config, SentData))
end,
Expand Down Expand Up @@ -666,6 +678,9 @@ t_const_timestamp(Config) ->
<<"timestamp">> => erlang:system_time(millisecond)
},
case QueryMode of
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500);
sync ->
?assertMatch({ok, _}, send_message(Config, SentData))
end,
Expand Down Expand Up @@ -709,9 +724,12 @@ t_boolean_variants(Config) ->
},
case QueryMode of
sync ->
?assertMatch({ok, _}, send_message(Config, SentData))
?assertMatch({ok, _}, send_message(Config, SentData));
async ->
?assertMatch(ok, send_message(Config, SentData))
end,
case QueryMode of
async -> ct:sleep(500);
sync -> ok
end,
PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config),
Expand Down Expand Up @@ -779,11 +797,29 @@ t_bad_timestamp(Config) ->
#{?snk_kind := greptimedb_connector_send_query_error},
10_000
),
fun(Result, _Trace) ->
fun(Result, Trace) ->
?assertMatch({_, {ok, _}}, Result),
{Return, {ok, _}} = Result,
IsBatch = BatchSize > 1,
case {QueryMode, IsBatch} of
{async, true} ->
?assertEqual(ok, Return),
?assertMatch(
[#{error := points_trans_failed}],
?of_kind(greptimedb_connector_send_query_error, Trace)
);
{async, false} ->
?assertEqual(ok, Return),
?assertMatch(
[
#{
error := [
{error, {bad_timestamp, <<"bad_timestamp">>}}
]
}
],
?of_kind(greptimedb_connector_send_query_error, Trace)
);
{sync, false} ->
?assertEqual(
{error, [
Expand Down Expand Up @@ -907,17 +943,34 @@ t_write_failure(Config) ->
{error, {resource_error, #{reason := timeout}}},
send_message(Config, SentData)
),
#{?snk_kind := greptimedb_connector_do_query_failure, action := nack},
16_000
#{?snk_kind := handle_async_reply, action := nack},
1_000
);
async ->
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := handle_async_reply},
1_000
)
end
end),
fun(Trace) ->
fun(Trace0) ->
case QueryMode of
sync ->
?assertMatch(
[#{error := _} | _],
?of_kind(greptimedb_connector_do_query_failure, Trace)
Trace = ?of_kind(handle_async_reply, Trace0),
?assertMatch([_ | _], Trace),
[#{result := Result} | _] = Trace,
?assert(
not emqx_bridge_greptimedb_connector:is_unrecoverable_error(Result),
#{got => Result}
);
async ->
Trace = ?of_kind(handle_async_reply, Trace0),
?assertMatch([_ | _], Trace),
[#{result := Result} | _] = Trace,
?assert(
not emqx_bridge_greptimedb_connector:is_unrecoverable_error(Result),
#{got => Result}
)
end,
ok
Expand Down Expand Up @@ -1029,6 +1082,23 @@ t_authentication_error_on_send_message(Config0) ->
?assertMatch(
{error, {unrecoverable_error, <<"authorization failure">>}},
send_message(Config, SentData)
);
async ->
?check_trace(
begin
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := handle_async_reply},
1_000
)
end,
fun(Trace) ->
?assertMatch(
[#{error := <<"authorization failure">>} | _],
?of_kind(greptimedb_connector_do_query_failure, Trace)
),
ok
end
)
end,
ok.
3 changes: 3 additions & 0 deletions changes/ee/feat-12072.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Supports async query mode for GreptimeDB data bridge. It provides better performance.


2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ defmodule EMQXUmbrella.MixProject do
{:crc32cer, "0.1.8", override: true},
{:supervisor3, "1.1.12", override: true},
{:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true},
{:greptimedb, github: "GreptimeTeam/greptimedb-client-erl", tag: "v0.1.2", override: true},
{:greptimedb, github: "GreptimeTeam/greptimedb-client-erl", tag: "v0.1.6", override: true},
# The following two are dependencies of rabbit_common. They are needed here to
# make mix not complain about conflicting versions
{:thoas, github: "emqx/thoas", tag: "v1.0.0", override: true},
Expand Down

0 comments on commit bd985e5

Please sign in to comment.