Skip to content
Permalink
Browse files

Fix SQL where clause cannot handle integers

  • Loading branch information...
terry-xiaoyu committed Apr 24, 2019
1 parent e3bf5b4 commit c9c7616f86019657861dff408854e9c5238d666b
Showing with 143 additions and 51 deletions.
  1. +75 −28 src/emqx_rule_runtime.erl
  2. +68 −23 src/emqx_rule_sqlparser.erl
@@ -128,32 +128,29 @@ apply_rules([#rule{enabled = false}|More], Input) ->
apply_rules([Rule = #rule{name = Name, for = 'message.publish', topics = Filters}|More], Input) ->
Topic = get_value(topic, Input),
try match_topic(Topic, Filters)
andalso apply_publish_rule(Rule, Input)
andalso apply_rule(Rule, Input)
catch
_:Error:StkTrace ->
?LOG(error, "Apply 'message.publish' rule ~s error: ~p. Statcktrace:~n~p",
?LOG(error, "Apply message.publish rule ~s error: ~p. Statcktrace:~n~p",
[Name, Error, StkTrace])
end,
apply_rules(More, Input);
apply_rules([Rule = #rule{name = Name, for = Hooks}|More], Input) ->
try apply_envent_rule(Rule, Input)
apply_rules([Rule = #rule{name = Name}|More], Input) ->
try apply_rule(Rule, Input)
catch
_:Error:StkTrace ->
?LOG(error, "Apply the ~s rule ~s error: ~p. Statcktrace:~n~p",
[Name, Hooks, Error, StkTrace])
?LOG(error, "Apply rule ~s error: ~p. Statcktrace:~n~p",
[Name, Error, StkTrace])
end,
apply_rules(More, Input).

apply_publish_rule(#rule{selects = Selects,
conditions = Conditions,
actions = Actions}, Input) ->
Selected = select_and_transform(Selects, Input),
apply_rule(#rule{selects = Selects,
conditions = Conditions,
actions = Actions}, Input) ->
Selected = select_and_transform(Selects, rule_input(Input)),
match_conditions(Conditions, Selected)
andalso take_actions(Actions, Selected, Input).

apply_envent_rule(#rule{actions = Actions}, Input) ->
take_actions(Actions, undefined, Input).

%% Step1 -> Match topic with filters
match_topic(_Topic, []) ->
true;
@@ -184,32 +181,42 @@ match_conditions({'and', L, R}, Data) ->
match_conditions(L, Data) andalso match_conditions(R, Data);
match_conditions({'or', L, R}, Data) ->
match_conditions(L, Data) orelse match_conditions(R, Data);
match_conditions({'=', L, R}, Data) ->
eval(L, Data) == eval(R, Data);
match_conditions({'>', L, R}, Data) ->
eval(L, Data) > eval(R, Data);
match_conditions({'<', L, R}, Data) ->
eval(L, Data) < eval(R, Data);
match_conditions({'<=', L, R}, Data) ->
eval(L, Data) =< eval(R, Data);
match_conditions({'>=', L, R}, Data) ->
eval(L, Data) >= eval(R, Data);
match_conditions({NotEq, L, R}, Data)
when NotEq =:= '<>'; NotEq =:= '!=' ->
eval(L, Data) =/= eval(R, Data);
match_conditions({'not', Var}, Data) ->
case eval(Var, Data) of
Bool when is_boolean(Bool) ->
not Bool;
_other -> false
end;
%%match_conditions({'like', Var, Pattern}, Data) ->
%% match_like(eval(Var, Data), Pattern);
match_conditions({in, Var, {list, Vals}}, Data) ->
lists:member(eval(Var, Data), [eval(V, Data) || V <- Vals]);
match_conditions({Op, L, R}, Data) when ?is_comp(Op) ->
compare(Op, eval(L, Data), eval(R, Data));
%%match_conditions({'like', Var, Pattern}, Data) ->
%% match_like(eval(Var, Data), Pattern);
match_conditions({}, _Data) ->
true.

%% comparing numbers against strings
compare(Op, L, R) when is_number(L), is_binary(R) ->
do_compare(Op, L, number(R));
compare(Op, L, R) when is_binary(L), is_number(R) ->
do_compare(Op, number(L), R);
compare(Op, L, R) ->
do_compare(Op, L, R).

do_compare('=', L, R) -> L == R;
do_compare('>', L, R) -> L > R;
do_compare('<', L, R) -> L < R;
do_compare('<=', L, R) -> L =< R;
do_compare('>=', L, R) -> L >= R;
do_compare('<>', L, R) -> L /= R;
do_compare('!=', L, R) -> L /= R.

number(Bin) ->
try binary_to_integer(Bin)
catch error:badarg -> binary_to_float(Bin)
end.

%% Step4 -> Take actions
take_actions(Actions, Selected, Envs) ->
lists:foreach(fun(Action) -> take_action(Action, Selected, Envs) end, Actions).
@@ -286,3 +293,43 @@ stop(_Env) ->
emqx:unhook('message.deliver', fun ?MODULE:on_message_deliver/3),
emqx:unhook('message.acked', fun ?MODULE:on_message_acked/3).

%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------

rule_input(Input) ->
rule_input(Input, #{}).
rule_input(Input = #{id := Id}, Result) ->
rule_input(maps:remove(id, Input),
Result#{id => emqx_guid:to_hexstr(Id)});
rule_input(Input = #{from := From}, Result) ->
rule_input(maps:remove(from, Input),
Result#{client_id => From});
rule_input(Input = #{headers := Headers}, Result) ->
Username = maps:get(username, Headers, null),
Peername = peername(maps:get(peername, Headers, undefined)),
rule_input(maps:remove(headers, Input),
maps:merge(Result, #{username => Username,
peername => Peername}));
rule_input(Input = #{timestamp := Timestamp}, Result) ->
rule_input(maps:remove(timestamp, Input),
Result#{timestamp => emqx_time:now_ms(Timestamp)});
rule_input(Input = #{peername := Peername}, Result) ->
rule_input(maps:remove(peername, Input),
Result#{peername => peername(Peername)});
rule_input(Input = #{connattrs := Conn}, Result) ->
ConnAt = maps:get(connected_at, Conn, null),
rule_input(maps:remove(connattrs, Input),
maps:merge(Result, #{connected_at => emqx_time:now_ms(ConnAt),
clean_start => maps:get(clean_start, Conn, null),
is_bridge => maps:get(is_bridge, Conn, null),
keepalive => maps:get(keepalive, Conn, null),
proto_ver => maps:get(proto_ver, Conn, null)
}));
rule_input(Input, Result) ->
maps:merge(Result, Input).

peername(undefined) ->
null;
peername({IPAddr, Port}) ->
list_to_binary(inet:ntoa(IPAddr) ++ ":" ++ integer_to_list(Port)).
@@ -65,39 +65,75 @@ select_where(#select{where = Where}) ->
Where.

preprocess(#select{fields = Fields, from = Topics, where = Conditions}) ->
#select{fields = [preprocess_field(Field) || Field <- Fields],
Selected = [preprocess_field(Field) || Field <- Fields],
#select{fields = Selected,
from = [unquote(Topic) || Topic <- Topics],
where = preprocess_condition(Conditions)}.
where = preprocess_condition(Conditions, Selected)}.

preprocess_field(<<"*">>) ->
'*';
preprocess_field({'as', Field, Alias}) when is_binary(Alias) ->
{'as', transform_field(Field), transform_alias(Alias)};
{'as', transform_non_const_field(Field), transform_alias(Alias)};
preprocess_field(Field) ->
transform_field(Field).

preprocess_condition({Op, L, R}) when ?is_logical(Op) orelse ?is_comp(Op) ->
{Op, preprocess_condition(L), preprocess_condition(R)};
preprocess_condition({in, Field, {list, Vals}}) ->
{in, transform_field(Field), {list, [transform_field(Val) || Val <- Vals]}};
preprocess_condition({'not', X}) ->
{'not', preprocess_condition(X)};
preprocess_condition({}) ->
transform_non_const_field(Field).

preprocess_condition({Op, L, R}, Selected) when ?is_logical(Op) orelse ?is_comp(Op) ->
{Op, preprocess_condition(L, Selected), preprocess_condition(R, Selected)};
preprocess_condition({in, Field, {list, Vals}}, Selected) ->
{in, transform_field(Field, Selected), {list, [transform_field(Val, Selected) || Val <- Vals]}};
preprocess_condition({'not', X}, Selected) ->
{'not', preprocess_condition(X, Selected)};
preprocess_condition({}, _Selected) ->
{};
preprocess_condition(Field) ->
transform_field(Field).
preprocess_condition(Field, Selected) ->
transform_field(Field, Selected).

transform_field({const, Val}) when is_number(Val); is_binary(Val) ->
transform_field({const, Val}, _Selected) ->
{const, Val};
transform_field(<<"payload.", Attr/binary>>) ->
transform_field(<<Q, Val/binary>>, _Selected) when Q =:= $'; Q =:= $" ->
{const, binary:part(Val, {0, byte_size(Val)-1})};
transform_field(Val, Selected) ->
case is_selected_field(Val, Selected) of
false -> {const, Val};
true -> do_transform_field(Val, Selected)
end.
do_transform_field(<<"payload.", Attr/binary>>, _Selected) ->
{payload, parse_nested(Attr)};
transform_field(Var) when is_binary(Var) ->
{var, parse_nested(unquote(Var))};
transform_field({Op, Arg1, Arg2}) when ?is_arith(Op) ->
{Op, transform_field(Arg1), transform_field(Arg2)};
transform_field({'fun', Name, Args}) when is_binary(Name) ->
do_transform_field({Op, Arg1, Arg2}, Selected) when ?is_arith(Op) ->
{Op, transform_field(Arg1, Selected), transform_field(Arg2, Selected)};
do_transform_field(Var, _Selected) when is_binary(Var) ->
{var, parse_nested(Var)};
do_transform_field({'fun', Name, Args}, Selected) when is_binary(Name) ->
Fun = list_to_existing_atom(binary_to_list(Name)),
{'fun', Fun, [transform_field(Arg) || Arg <- Args]}.
{'fun', Fun, [transform_field(Arg, Selected) || Arg <- Args]}.

transform_non_const_field(<<"payload.", Attr/binary>>) ->
{payload, parse_nested(Attr)};
transform_non_const_field({Op, Arg1, Arg2}) when ?is_arith(Op) ->
{Op, transform_non_const_field(Arg1), transform_non_const_field(Arg2)};
transform_non_const_field(Var) when is_binary(Var) ->
{var, parse_nested(Var)};
transform_non_const_field({'fun', Name, Args}) when is_binary(Name) ->
Fun = list_to_existing_atom(binary_to_list(Name)),
{'fun', Fun, [transform_non_const_field(Arg) || Arg <- Args]}.

is_selected_field(_Val, []) ->
false;
is_selected_field(_Val, ['*' | _Selected]) ->
true;
is_selected_field(Val, [{as, _, Val} | _Selected]) ->
true;
is_selected_field(Val, [{payload, Val} | _Selected]) ->
true;
is_selected_field(Val, [{payload, Nested} | _Selected]) when is_list(Nested) ->
NestedFields = join_str(Nested, <<".">>),
Val =:= <<"payload.", NestedFields/binary>>;
is_selected_field(Val, [{var, Val} | _Selected]) ->
true;
is_selected_field(Val, [{var, Nested} | _Selected]) when is_list(Nested) ->
Val =:= join_str(Nested, <<".">>);
is_selected_field(Val, [_ | Selected]) ->
is_selected_field(Val, Selected).

transform_alias(Alias) ->
parse_nested(unquote(Alias)).
@@ -109,5 +145,14 @@ parse_nested(Attr) ->
end.

unquote(Topic) ->
string:trim(Topic, both, "\"").
string:trim(Topic, both, "\"'").

join_str([], _Delim) ->
<<>>;
join_str([F | Fields], Delim) ->
join_str(Fields, Delim, F).

join_str([], _Delim, Joined) ->
Joined;
join_str([F | Fields], Delim, Joined) ->
join_str(Fields, Delim, <<Joined/binary, Delim/binary, F/binary>>).

0 comments on commit c9c7616

Please sign in to comment.
You can’t perform that action at this time.