Skip to content

Commit

Permalink
fix(kafka): fix result handling when sending message with invalid header
Browse files Browse the repository at this point in the history
  • Loading branch information
paulozulato committed Aug 24, 2023
1 parent 3aa1555 commit 60e6217
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 28 deletions.
107 changes: 79 additions & 28 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ on_stop(InstanceId, _State) ->
ok.

on_query(
_InstId,
InstId,
{send_message, Message},
#{
message_template := Template,
Expand All @@ -229,19 +229,34 @@ on_query(
ext_headers_tokens => KafkaExtHeadersTokens,
headers_val_encode_mode => KafkaHeadersValEncodeMode
},
KafkaMessage = render_message(Template, KafkaHeaders, Message),
?tp(
emqx_bridge_kafka_impl_producer_sync_query,
#{headers_config => KafkaHeaders, instance_id => _InstId}
),
try
{_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
ok
KafkaMessage = render_message(Template, KafkaHeaders, Message),
?tp(
emqx_bridge_kafka_impl_producer_sync_query,
#{headers_config => KafkaHeaders, instance_id => InstId}
),
do_send_msg(sync, KafkaMessage, Producers, SyncTimeout)
catch
error:{producer_down, _} = Reason ->
{error, Reason};
error:timeout ->
{error, timeout}
throw:{bad_kafka_header, _} = Error ->
?tp(
emqx_bridge_kafka_impl_producer_sync_query_failed,
#{
headers_config => KafkaHeaders,
instance_id => InstId,
reason => Error
}
),
{error, {unrecoverable_error, Error}};
throw:{bad_kafka_headers, _} = Error ->
?tp(
emqx_bridge_kafka_impl_producer_sync_query_failed,
#{
headers_config => KafkaHeaders,
instance_id => InstId,
reason => Error
}
),
{error, {unrecoverable_error, Error}}
end.

%% @doc The callback API for rule-engine (or bridge without rules)
Expand All @@ -251,7 +266,7 @@ on_query(
%% E.g. the output of rule-engine process chain
%% or the direct mapping from an MQTT message.
on_query_async(
_InstId,
InstId,
{send_message, Message},
AsyncReplyFn,
#{
Expand All @@ -267,21 +282,35 @@ on_query_async(
ext_headers_tokens => KafkaExtHeadersTokens,
headers_val_encode_mode => KafkaHeadersValEncodeMode
},
KafkaMessage = render_message(Template, KafkaHeaders, Message),
?tp(
emqx_bridge_kafka_impl_producer_async_query,
#{headers_config => KafkaHeaders, instance_id => _InstId}
),
%% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
%% * Must be a single element batch because wolff books calls, but not batch sizes
%% for counters and gauges.
Batch = [KafkaMessage],
%% The retuned information is discarded here.
%% If the producer process is down when sending, this function would
%% raise an error exception which is to be caught by the caller of this callback
{_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}),
%% this Pid is so far never used because Kafka producer is by-passing the buffer worker
{ok, Pid}.
try
KafkaMessage = render_message(Template, KafkaHeaders, Message),
?tp(
emqx_bridge_kafka_impl_producer_async_query,
#{headers_config => KafkaHeaders, instance_id => InstId}
),
do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn)
catch
throw:{bad_kafka_header, _} = Error ->
?tp(
emqx_bridge_kafka_impl_producer_async_query_failed,
#{
headers_config => KafkaHeaders,
instance_id => InstId,
reason => Error
}
),
{error, {unrecoverable_error, Error}};
throw:{bad_kafka_headers, _} = Error ->
?tp(
emqx_bridge_kafka_impl_producer_async_query_failed,
#{
headers_config => KafkaHeaders,
instance_id => InstId,
reason => Error
}
),
{error, {unrecoverable_error, Error}}
end.

compile_message_template(T) ->
KeyTemplate = maps:get(key, T, <<"${.clientid}">>),
Expand Down Expand Up @@ -337,6 +366,28 @@ render_timestamp(Template, Message) ->
erlang:system_time(millisecond)
end.

do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) ->
try
{_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
ok
catch
error:{producer_down, _} = Reason ->
{error, Reason};
error:timeout ->
{error, timeout}
end;
do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
%% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
%% * Must be a single element batch because wolff books calls, but not batch sizes
%% for counters and gauges.
Batch = [KafkaMessage],
%% The retuned information is discarded here.
%% If the producer process is down when sending, this function would
%% raise an error exception which is to be caught by the caller of this callback
{_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}),
%% this Pid is so far never used because Kafka producer is by-passing the buffer worker
{ok, Pid}.

%% Wolff producer never gives up retrying
%% so there can only be 'ok' results.
on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,90 @@ t_wrong_headers(_Config) ->
),
ok.

t_wrong_headers_from_message(Config) ->
HostsString = kafka_hosts_string_sasl(),
AuthSettings = valid_sasl_plain_settings(),
Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
Type = ?BRIDGE_TYPE,
Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
KafkaTopic = "test-topic-one-partition",
Conf = config_with_headers(#{
"authentication" => AuthSettings,
"kafka_hosts_string" => HostsString,
"kafka_topic" => KafkaTopic,
"instance_id" => ResourceId,
"kafka_headers" => <<"${payload}">>,
"producer" => #{
"kafka" => #{
"buffer" => #{
"memory_overload_protection" => false
}
}
},
"ssl" => #{}
}),
{ok, #{config := ConfigAtom1}} = emqx_bridge:create(
Type, erlang:list_to_atom(Name), Conf
),
ConfigAtom = ConfigAtom1#{bridge_name => Name},
{ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
Time1 = erlang:unique_integer(),
Payload1 = <<"wrong_header">>,
Msg1 = #{
clientid => integer_to_binary(Time1),
payload => Payload1,
timestamp => Time1
},
?assertError(
{badmatch, {error, {unrecoverable_error, {bad_kafka_headers, Payload1}}}},
send(Config, ResourceId, Msg1, State)
),
Time2 = erlang:unique_integer(),
Payload2 = <<"[{\"foo\":\"bar\"}, {\"foo2\":\"bar2\"}]">>,
Msg2 = #{
clientid => integer_to_binary(Time2),
payload => Payload2,
timestamp => Time2
},
?assertError(
{badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"foo">>, <<"bar">>}]}}}},
send(Config, ResourceId, Msg2, State)
),
Time3 = erlang:unique_integer(),
Payload3 = <<"[{\"key\":\"foo\"}, {\"value\":\"bar\"}]">>,
Msg3 = #{
clientid => integer_to_binary(Time3),
payload => Payload3,
timestamp => Time3
},
?assertError(
{badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"key">>, <<"foo">>}]}}}},
send(Config, ResourceId, Msg3, State)
),
Time4 = erlang:unique_integer(),
Payload4 = <<"[{\"key\":\"foo\", \"value\":\"bar\"}]">>,
Msg4 = #{
clientid => integer_to_binary(Time4),
payload => Payload4,
timestamp => Time4
},
?assertError(
{badmatch,
{error,
{unrecoverable_error,
{bad_kafka_header, [{<<"key">>, <<"foo">>}, {<<"value">>, <<"bar">>}]}}}},
send(Config, ResourceId, Msg4, State)
),
%% TODO: refactor those into init/end per testcase
ok = ?PRODUCER:on_stop(ResourceId, State),
?assertEqual([], supervisor:which_children(wolff_client_sup)),
?assertEqual([], supervisor:which_children(wolff_producers_sup)),
ok = emqx_bridge_resource:remove(BridgeId),
delete_all_bridges(),
ok.

%%------------------------------------------------------------------------------
%% Helper functions
%%------------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions changes/ee/fix-11508.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix message error handling on Kafka bridge when headers translate to an invalid value.

0 comments on commit 60e6217

Please sign in to comment.