Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle escaped characters in InfluxDB data bridge write_syntax #10165

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/ee/feat-10165.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Support escaped special characters in InfluxDB data bridge write_syntax.
This update allows the use of escaped special characters in string elements, in accordance with the InfluxDB line protocol.
184 changes: 141 additions & 43 deletions lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_influxdb).

-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_bridge/include/emqx_bridge.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
Expand Down Expand Up @@ -169,53 +170,150 @@ write_syntax(_) ->
undefined.

to_influx_lines(RawLines) ->
Lines = string:tokens(str(RawLines), "\n"),
lists:reverse(lists:foldl(fun converter_influx_line/2, [], Lines)).

converter_influx_line(Line, AccIn) ->
case string:tokens(str(Line), " ") of
[MeasurementAndTags, Fields, Timestamp] ->
append_influx_item(MeasurementAndTags, Fields, Timestamp, AccIn);
[MeasurementAndTags, Fields] ->
append_influx_item(MeasurementAndTags, Fields, undefined, AccIn);
_ ->
throw("Bad InfluxDB Line Protocol schema")
try
influx_lines(str(RawLines), [])
catch
_:Reason:Stacktrace ->
Msg = lists:flatten(
io_lib:format("Unable to parse InfluxDB line protocol: ~p", [RawLines])
),
?SLOG(error, #{msg => Msg, error_reason => Reason, stacktrace => Stacktrace}),
throw(Msg)
end.

append_influx_item(MeasurementAndTags, Fields, Timestamp, Acc) ->
{Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags),
[
#{
measurement => Measurement,
tags => kv_pairs(Tags),
fields => kv_pairs(string:tokens(Fields, ",")),
timestamp => Timestamp
}
| Acc
].
%% Characters that may appear backslash-escaped in a measurement name.
-define(MEASUREMENT_ESC_CHARS, [$,, $\s]).
%% Characters that may appear backslash-escaped in tag keys/values and field keys.
-define(TAG_FIELD_KEY_ESC_CHARS, [$,, $=, $\s]).
%% Characters that may appear backslash-escaped inside a quoted field value.
%% Note: backslash itself is escapable only in this context.
-define(FIELD_VAL_ESC_CHARS, [$", $\\]).
% Common separator for both tags and fields
-define(SEP, $\s).
%% Separates the measurement name from the (optional) tag set.
-define(MEASUREMENT_TAG_SEP, $,).
%% Separates a key from its value in tag and field pairs.
-define(KEY_SEP, $=).
%% Separates consecutive tag pairs / field pairs.
-define(VAL_SEP, $,).
%% Match pattern asserting a non-empty list.
-define(NON_EMPTY, [_ | _]).

split_measurement_and_tags(Subject) ->
case string:tokens(Subject, ",") of
[] ->
throw("Bad Measurement schema");
[Measurement] ->
{Measurement, []};
[Measurement | Tags] ->
{Measurement, Tags}
end.
%% Fold over the raw payload, collecting one point map per protocol line.
%% The final match with ?NON_EMPTY crashes when nothing was parsed; the
%% crash is caught and reported by to_influx_lines/1.
influx_lines([] = _Remaining, Acc) ->
    ?NON_EMPTY = lists:reverse(Acc);
influx_lines(Remaining, Acc) ->
    Trimmed = string:trim(Remaining, leading, "\s\n"),
    {NextAcc, Rest} = influx_line(Trimmed, Acc),
    influx_lines(Rest, NextAcc).

kv_pairs(Pairs) ->
kv_pairs(Pairs, []).
kv_pairs([], Acc) ->
lists:reverse(Acc);
kv_pairs([Pair | Rest], Acc) ->
case string:tokens(Pair, "=") of
[K, V] ->
%% Reduplicated keys will be overwritten. Follows InfluxDB Line Protocol.
kv_pairs(Rest, [{K, V} | Acc]);
_ ->
throw(io_lib:format("Bad InfluxDB Line Protocol Key Value pair: ~p", Pair))
end.
%% Parse a single line-protocol point and prepend it to Acc.
%% Measurement and field set are mandatory (asserted non-empty below);
%% tags and timestamp are optional.
influx_line([], Acc) ->
    {Acc, []};
influx_line(Input, Acc) ->
    {?NON_EMPTY = Name, AfterMeasurement} = measurement(Input),
    {TagPairs, AfterTags} = tags(AfterMeasurement),
    {?NON_EMPTY = FieldPairs, AfterFields} = influx_fields(AfterTags),
    {Ts, Rest} = timestamp(AfterFields),
    Point = #{
        measurement => Name,
        tags => TagPairs,
        fields => FieldPairs,
        timestamp => Ts
    },
    {[Point | Acc], Rest}.

%% The measurement name ends at the first unescaped ',' (tags follow)
%% or ' ' (fields follow).
measurement(Input) ->
    StopChars = [?MEASUREMENT_TAG_SEP, ?SEP],
    unescape(?MEASUREMENT_ESC_CHARS, StopChars, Input, []).

%% Tags are present only when the measurement is followed by a ','.
tags([?MEASUREMENT_TAG_SEP | Rest]) ->
    tags1(Rest, []);
tags(Other) ->
    {[], Other}.

%% An empty rest is invalid (fields are required after tags), but the
%% recursion stops here and the failure surfaces later in field parsing.
tags1([] = Rest, Parsed) ->
    {lists:reverse(Parsed), Rest};
%% Requiring at least one parsed tag here makes inputs such as
%% "m, field=field_val" (separator but no tags) invalid.
tags1([?SEP | _] = Rest, ?NON_EMPTY = Parsed) ->
    {lists:reverse(Parsed), Rest};
tags1(Rest, Parsed) ->
    {Pair, Remaining} = tag(Rest),
    tags1(Remaining, [Pair | Parsed]).

%% Parse one "key=value" tag; both parts must be non-empty.
tag(Input) ->
    {?NON_EMPTY = K, AfterKey} = key(Input),
    {?NON_EMPTY = V, AfterVal} = tag_val(AfterKey),
    {{K, V}, AfterVal}.

%% A tag value ends at the next pair separator or at the space before
%% the field set; a trailing pair separator is consumed.
tag_val(Input) ->
    {V, Rest} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP], Input, []),
    {V, strip_l(Rest, ?VAL_SEP)}.

%% The field set must be preceded by a space; any other prefix makes this
%% single clause fail (function_clause), surfacing the malformed line.
influx_fields([?SEP | Rest]) ->
    Trimmed = string:trim(Rest, leading, "\s"),
    fields1(Trimmed, []).

%% The timestamp is optional, so the field set may end at a space,
%% a newline, or the very end of the input.
fields1([C | _] = Rest, Acc) when C =:= ?SEP; C =:= $\n ->
    {lists:reverse(Acc), Rest};
fields1([] = Rest, Acc) ->
    {lists:reverse(Acc), Rest};
fields1(Rest, Acc) ->
    {Pair, Remaining} = field(Rest),
    fields1(Remaining, [Pair | Acc]).

%% Parse one "key=value" field; the key must be non-empty, while the
%% value's emptiness rules are enforced by field_val/1.
field(Input) ->
    {?NON_EMPTY = K, AfterKey} = key(Input),
    {V, AfterVal} = field_val(AfterKey),
    {{K, V}, AfterVal}.

field_val([$" | Rest]) ->
    %% Quoted value: consume up to the closing quote, whose presence is
    %% asserted by the match below. A quoted value may be empty.
    {Val, [$" | AfterQuote]} = unescape(?FIELD_VAL_ESC_CHARS, [$"], Rest, []),
    {Val, strip_l(AfterQuote, ?VAL_SEP)};
field_val(Input) ->
    %% Unquoted value should not be un-escaped according to InfluxDB protocol,
    %% as it can only hold float, integer, uinteger or boolean value.
    %% However, as templates are possible, un-escaping is applied here,
    %% which also helps to detect some invalid lines, e.g.: "m,tag=1 field= ${timestamp}"
    {Val, Rest} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP, $\n], Input, []),
    {?NON_EMPTY = Val, strip_l(Rest, ?VAL_SEP)}.

timestamp([?SEP | Rest0]) ->
    Rest1 = string:trim(Rest0, leading, "\s"),
    %% Similarly to unquoted field values, run the timestamp through
    %% unescape/4 to validate and handle potentially escaped characters
    %% in a template.
    {Token, Rest2} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?SEP, $\n], Rest1, []),
    {timestamp1(Token), Rest2};
timestamp(Input) ->
    %% No separator means no timestamp on this line.
    {undefined, Input}.

%% Normalize an empty timestamp token to 'undefined'.
timestamp1(Token) ->
    case Token of
        ?NON_EMPTY -> Token;
        _ -> undefined
    end.

%% Shared between tag keys and field keys: read up to '=' and drop it.
key(Input) ->
    {K, Rest} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?KEY_SEP], Input, []),
    {K, strip_l(Rest, ?KEY_SEP)}.

%% Drop a single separator character between pairs. A separator that is
%% trailing (followed by a space or ending the input, e.g.
%% "m,tag=val, field=val") is left in place so parsing fails later.
strip_l([Sep, Next | Rest], Sep) when Next =/= ?SEP ->
    [Next | Rest];
strip_l(Input, _Sep) ->
    Input.

%% Scan the input, resolving backslash escapes, until a character from
%% StopChars is reached (or the input ends). Returns {Consumed, Rest}
%% with Rest still starting at the stop character.
%%
%% Deliberately partial: an unknown escape sequence (in contexts where
%% backslash itself is escapable) or a bare escapable character hits a
%% 'case' with no matching clause and crashes; to_influx_lines/1
%% catches and reports the failure.
unescape(EscChars, StopChars, [$\\, Next | Rest], Acc0) ->
    BackslashEscapable = lists:member($\\, EscChars),
    Acc =
        case lists:member(Next, EscChars) of
            %% Recognized sequence: emit the bare character.
            true -> [Next | Acc0];
            %% Unknown sequence: keep the backslash verbatim, but only
            %% where backslash itself is not escapable.
            false when not BackslashEscapable -> [Next, $\\ | Acc0]
        end,
    unescape(EscChars, StopChars, Rest, Acc);
unescape(EscChars, StopChars, [Ch | Rest] = Tail, Acc) ->
    Escapable = lists:member(Ch, EscChars),
    case lists:member(Ch, StopChars) of
        true -> {lists:reverse(Acc), Tail};
        false when not Escapable -> unescape(EscChars, StopChars, Rest, [Ch | Acc])
    end;
unescape(_EscChars, _StopChars, [] = Tail, Acc) ->
    {lists:reverse(Acc), Tail}.

str(A) when is_atom(A) ->
atom_to_list(A);
Expand Down