Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: influxdb float serialization error #11223

Merged
merged 2 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -638,8 +638,10 @@ value_type([UInt, <<"u">>]) when
is_integer(UInt)
->
{uint, UInt};
value_type([Float]) when is_float(Float) ->
Float;
%% write `1`, `1.0`, `-1.0` all as float
%% see also: https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float
value_type([Number]) when is_number(Number) ->
Number;
savonarola marked this conversation as resolved.
Show resolved Hide resolved
value_type([<<"t">>]) ->
't';
value_type([<<"T">>]) ->
Expand Down
77 changes: 63 additions & 14 deletions apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -454,24 +454,26 @@ query_by_clientid(ClientId, Config) ->
{ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}),
DecodedCSV1 = [
[Field || Field <- Line, Field =/= <<>>]
|| Line <- DecodedCSV0,
Line =/= [<<>>]
|| Line <- DecodedCSV0, Line =/= [<<>>]
],
DecodedCSV2 = csv_lines_to_maps(DecodedCSV1, []),
DecodedCSV2 = csv_lines_to_maps(DecodedCSV1),
index_by_field(DecodedCSV2).

decode_csv(RawBody) ->
Lines =
[
binary:split(Line, [<<";">>], [global, trim_all])
|| Line <- binary:split(RawBody, [<<"\r\n">>], [global, trim_all])
],
csv_lines_to_maps(Lines, []).
csv_lines_to_maps([Title | Rest]) ->
csv_lines_to_maps(Rest, Title, _Acc = []);
csv_lines_to_maps([]) ->
[].

csv_lines_to_maps([Fields, Data | Rest], Acc) ->
Map = maps:from_list(lists:zip(Fields, Data)),
csv_lines_to_maps(Rest, [Map | Acc]);
csv_lines_to_maps(_Data, Acc) ->
csv_lines_to_maps([[<<"_result">> | _] = Data | RestData], Title, Acc) ->
Map = maps:from_list(lists:zip(Title, Data)),
csv_lines_to_maps(RestData, Title, [Map | Acc]);
%% ignore the csv title line
%% it's always like this:
%% [<<"result">>,<<"table">>,<<"_start">>,<<"_stop">>,
%% <<"_time">>,<<"_value">>,<<"_field">>,<<"_measurement">>, Measurement],
csv_lines_to_maps([[<<"result">> | _] = _Title | RestData], Title, Acc) ->
csv_lines_to_maps(RestData, Title, Acc);
csv_lines_to_maps([], _Title, Acc) ->
lists:reverse(Acc).

index_by_field(DecodedCSV) ->
Expand Down Expand Up @@ -768,6 +770,53 @@ t_boolean_variants(Config) ->
),
ok.

t_any_num_as_float(Config) ->
QueryMode = ?config(query_mode, Config),
Const = erlang:system_time(nanosecond),
ConstBin = integer_to_binary(Const),
TsStr = iolist_to_binary(
calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}])
),
?assertMatch(
{ok, _},
create_bridge(
Config,
#{
<<"write_syntax">> =>
<<"mqtt,clientid=${clientid}", " ",
"float_no_dp=${payload.float_no_dp},float_dp=${payload.float_dp},bar=5i ",
ConstBin/binary>>
}
)
),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = #{
%% no decimal point
float_no_dp => 123,
%% with decimal point
float_dp => 123.0
},
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"payload">> => Payload,
<<"timestamp">> => erlang:system_time(millisecond)
},
case QueryMode of
sync ->
?assertMatch({ok, 204, _}, send_message(Config, SentData)),
ok;
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500)
end,
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{float_no_dp => <<"123">>, float_dp => <<"123">>},
assert_persisted_data(ClientId, Expected, PersistedData),
TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"float_no_dp">>, PersistedData)),
TimeReturned = pad_zero(TimeReturned0),
?assertEqual(TsStr, TimeReturned).

t_bad_timestamp(Config) ->
InfluxDBType = ?config(influxdb_type, Config),
InfluxDBName = ?config(influxdb_name, Config),
Expand Down
5 changes: 5 additions & 0 deletions changes/ee/fix-11223.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
In InfluxDB bridging, if intend to write using the float data type but the placeholder represents the original value
as an integer without a decimal point during serialization, it will result in the failure of Influx Line Protocol serialization
and the inability to write to the InfluxDB bridge.

See also: [InfluxDB v2.7 Line-Protocol](https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float)