Skip to content

Commit

Permalink
Merge pull request #10997 from kjellwinblad/kjell/fix/clickhouse_lost…
Browse files Browse the repository at this point in the history
…_messages/EMQX-10217

fix: lost messages when clickhouse closes while sending messages
  • Loading branch information
kjellwinblad committed Jun 9, 2023
2 parents b930f4c + 1b3af2a commit 43292c0
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -450,10 +450,14 @@ execute_sql_in_clickhouse_server_using_connection(Connection, SQL) ->

%% This function transforms the result received from clickhouse to something
%% that is a little bit more readable and creates approprieate log messages
transform_and_log_clickhouse_result({ok, 200, <<"">>} = _ClickhouseResult, _, _) ->
transform_and_log_clickhouse_result({ok, ResponseCode, <<"">>} = _ClickhouseResult, _, _) when
ResponseCode =:= 200; ResponseCode =:= 204
->
snabbkaffe_log_return(ok),
ok;
transform_and_log_clickhouse_result({ok, 200, Data}, _, _) ->
transform_and_log_clickhouse_result({ok, ResponseCode, Data}, _, _) when
ResponseCode =:= 200; ResponseCode =:= 204
->
Result = {ok, Data},
snabbkaffe_log_return(Result),
Result;
Expand All @@ -464,13 +468,58 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) ->
sql => SQL,
reason => ClickhouseErrorResult
}),
case ClickhouseErrorResult of
{error, ecpool_empty} ->
{error, {recoverable_error, ecpool_empty}};
_ ->
{error, ClickhouseErrorResult}
case is_recoverable_error(ClickhouseErrorResult) of
%% TODO: The hackeny errors that the clickhouse library forwards are
%% very loosely defined. We should try to make sure that the following
%% handles all error cases that we need to handle as recoverable_error
true ->
?SLOG(warning, #{
msg => "clickhouse connector: sql query failed (recoverable)",
recoverable_error => true,
connector => ResourceID,
sql => SQL,
reason => ClickhouseErrorResult
}),
to_recoverable_error(ClickhouseErrorResult);
false ->
?SLOG(error, #{
msg => "clickhouse connector: sql query failed (unrecoverable)",
recoverable_error => false,
connector => ResourceID,
sql => SQL,
reason => ClickhouseErrorResult
}),
to_error_tuple(ClickhouseErrorResult)
end.

to_recoverable_error({error, Reason}) ->
{error, {recoverable_error, Reason}};
to_recoverable_error(Error) ->
{error, {recoverable_error, Error}}.

to_error_tuple({error, Reason}) ->
{error, {unrecoverable_error, Reason}};
to_error_tuple(Error) ->
{error, {unrecoverable_error, Error}}.

is_recoverable_error({error, Reason}) ->
is_recoverable_error_reason(Reason);
is_recoverable_error(_) ->
false.

is_recoverable_error_reason(ecpool_empty) ->
true;
is_recoverable_error_reason(econnrefused) ->
true;
is_recoverable_error_reason(closed) ->
true;
is_recoverable_error_reason({closed, _PartialBody}) ->
true;
is_recoverable_error_reason(disconnected) ->
true;
is_recoverable_error_reason(_) ->
false.

snabbkaffe_log_return(_Result) ->
?tp(
clickhouse_connector_query_return,
Expand Down
1 change: 1 addition & 0 deletions changes/ee/fix-10997.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The ClickHouse bridge had a problem that could cause messages to be dropped when the ClickHouse server is closed while sending messages even when the request_ttl is set to infinity. This has been fixed by treating errors due to a closed connection as recoverable errors.

0 comments on commit 43292c0

Please sign in to comment.