Skip to content

Commit

Permalink
test(influxdb): write raw as float for all number value
Browse files Browse the repository at this point in the history
  • Loading branch information
JimMoen committed Jul 10, 2023
1 parent e9f1d7f commit e30bc6a
Showing 1 changed file with 63 additions and 14 deletions.
77 changes: 63 additions & 14 deletions apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -454,24 +454,26 @@ query_by_clientid(ClientId, Config) ->
{ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}),
DecodedCSV1 = [
[Field || Field <- Line, Field =/= <<>>]
|| Line <- DecodedCSV0,
Line =/= [<<>>]
|| Line <- DecodedCSV0, Line =/= [<<>>]
],
DecodedCSV2 = csv_lines_to_maps(DecodedCSV1, []),
DecodedCSV2 = csv_lines_to_maps(DecodedCSV1),
index_by_field(DecodedCSV2).

decode_csv(RawBody) ->
Lines =
[
binary:split(Line, [<<";">>], [global, trim_all])
|| Line <- binary:split(RawBody, [<<"\r\n">>], [global, trim_all])
],
csv_lines_to_maps(Lines, []).
csv_lines_to_maps([Title | Rest]) ->
csv_lines_to_maps(Rest, Title, _Acc = []);
csv_lines_to_maps([]) ->
[].

csv_lines_to_maps([Fields, Data | Rest], Acc) ->
Map = maps:from_list(lists:zip(Fields, Data)),
csv_lines_to_maps(Rest, [Map | Acc]);
csv_lines_to_maps(_Data, Acc) ->
csv_lines_to_maps([[<<"_result">> | _] = Data | RestData], Title, Acc) ->
Map = maps:from_list(lists:zip(Title, Data)),
csv_lines_to_maps(RestData, Title, [Map | Acc]);
%% ignore the csv title line
%% it's always like this:
%% [<<"result">>,<<"table">>,<<"_start">>,<<"_stop">>,
%% <<"_time">>,<<"_value">>,<<"_field">>,<<"_measurement">>, Measurement],
csv_lines_to_maps([[<<"result">> | _] = _Title | RestData], Title, Acc) ->
csv_lines_to_maps(RestData, Title, Acc);
csv_lines_to_maps([], _Title, Acc) ->
lists:reverse(Acc).

index_by_field(DecodedCSV) ->
Expand Down Expand Up @@ -768,6 +770,53 @@ t_boolean_variants(Config) ->
),
ok.

t_any_num_as_float(Config) ->
QueryMode = ?config(query_mode, Config),
Const = erlang:system_time(nanosecond),
ConstBin = integer_to_binary(Const),
TsStr = iolist_to_binary(
calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}])
),
?assertMatch(
{ok, _},
create_bridge(
Config,
#{
<<"write_syntax">> =>
<<"mqtt,clientid=${clientid}", " ",
"float_no_dp=${payload.float_no_dp},float_dp=${payload.float_dp},bar=5i ",
ConstBin/binary>>
}
)
),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = #{
%% no decimal point
float_no_dp => 123,
%% with decimal point
float_dp => 123.0
},
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"payload">> => Payload,
<<"timestamp">> => erlang:system_time(millisecond)
},
case QueryMode of
sync ->
?assertMatch({ok, 204, _}, send_message(Config, SentData)),
ok;
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500)
end,
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{float_no_dp => <<"123">>, float_dp => <<"123">>},
assert_persisted_data(ClientId, Expected, PersistedData),
TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"float_no_dp">>, PersistedData)),
TimeReturned = pad_zero(TimeReturned0),
?assertEqual(TsStr, TimeReturned).

t_bad_timestamp(Config) ->
InfluxDBType = ?config(influxdb_type, Config),
InfluxDBName = ?config(influxdb_name, Config),
Expand Down

0 comments on commit e30bc6a

Please sign in to comment.