Skip to content
Permalink
Browse files

Fix column validation failure in select and foreach (#120)

  • Loading branch information
terry-xiaoyu committed Nov 14, 2019
1 parent 0cb68c0 commit 6a1267cb1530d00972899ecb3abb7a3220e28175
Showing with 101 additions and 11 deletions.
  1. +23 −10 src/emqx_rule_sqlparser.erl
  2. +78 −1 test/emqx_rule_engine_SUITE.erl
@@ -105,22 +105,32 @@ select_where(#select{where = Where}) ->

preprocess(#select{fields = Fields, is_foreach = IsForeach, doeach = DoEach, incase = InCase, from = Hooks, where = Conditions}) ->
Froms = [hook(unquote(H)) || H <- Hooks],
FixedColumns = fixed_columns(Froms),
Selected = [preprocess_field(Field, FixedColumns) || Field <- Fields],
FullColumns = as_columns(Selected) ++ FixedColumns,
{SelectedFileds, KnownColmuns1} = preprocess_columns(Fields, fixed_columns(Froms)),
{SelectedEach, KnownColmuns2} = preprocess_columns(DoEach, KnownColmuns1),
#select{is_foreach = IsForeach,
fields = Selected,
doeach = [preprocess_field(Each, FullColumns) || Each <- DoEach],
incase = preprocess_condition(InCase, FullColumns),
fields = SelectedFileds,
doeach = SelectedEach,
incase = preprocess_condition(InCase, KnownColmuns2),
from = Froms,
where = preprocess_condition(Conditions, FullColumns)}.
where = preprocess_condition(Conditions, KnownColmuns2)}.

preprocess_columns(Fields, KnownColumns) ->
lists:foldl(
fun(Field, {Slct0, KnwnClmn0}) ->
case preprocess_field(Field, KnwnClmn0) of
{Slct1, no_as_column} ->
{Slct0 ++ [Slct1], KnwnClmn0};
{Slct1, AsColumn} ->
{Slct0 ++ [Slct1], KnwnClmn0 ++ [AsColumn]}
end
end, {[], KnownColumns}, Fields).

preprocess_field(<<"*">>, _Columns) ->
'*';
{'*', no_as_column};
preprocess_field({'as', Field, Alias}, Columns) when is_binary(Alias) ->
{'as', transform_select_field(Field, Columns), transform_alias(Alias)};
{{'as', transform_select_field(Field, Columns), transform_alias(Alias)}, head(transform_alias(Alias))};
preprocess_field(Field, Columns) ->
transform_select_field(Field, Columns).
{transform_select_field(Field, Columns), no_as_column}.

preprocess_condition({Op, L, R}, Columns) when ?is_logical(Op) orelse ?is_comp(Op) ->
{Op, preprocess_condition(L, Columns), preprocess_condition(R, Columns)};
@@ -225,6 +235,9 @@ escape(Var) -> Var.
unquote(Topic) ->
string:trim(Topic, both, "\"'").

head([H | _]) -> H;
head(Var) -> Var.

hook(<<"client.connected">>) ->
'client.connected';
hook(<<"client.disconnected">>) ->
@@ -85,6 +85,7 @@ groups() ->
[t_events,
t_sqlselect_0,
t_sqlselect_01,
t_sqlselect_02,
t_sqlselect_1,
t_sqlselect_2,
t_sqlselect_3,
@@ -94,6 +95,7 @@ groups() ->
t_sqlparse_foreach_4,
t_sqlparse_foreach_5,
t_sqlparse_foreach_6,
t_sqlparse_foreach_7,
t_sqlparse_case_when_1,
t_sqlparse_case_when_2,
t_sqlparse_case_when_3
@@ -647,6 +649,55 @@ client_disconnected(Client) ->
ok.

t_sqlselect_0(_Config) ->
%% Verify SELECT with and without 'AS'
Sql = "select * "
"from \"message.publish\" "
"where topic =~ 't/#' and payload.cmd.info = 'tt'",
?assertMatch({ok,#{payload := <<"{\"cmd\": {\"info\":\"tt\"}}">>,
event := 'message.publish'}},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> =>
#{<<"payload">> =>
<<"{\"cmd\": {\"info\":\"tt\"}}">>,
<<"topic">> => <<"t/a">>}})),
Sql2 = "select payload.cmd as cmd, event "
"from \"message.publish\" "
"where topic =~ 't/#' and cmd.info = 'tt'",
?assertMatch({ok,#{cmd := #{<<"info">> := <<"tt">>}, event := 'message.publish'}},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql2,
<<"ctx">> =>
#{<<"payload">> =>
<<"{\"cmd\": {\"info\":\"tt\"}}">>,
<<"topic">> => <<"t/a">>}})),
Sql3 = "select payload.cmd as cmd, cmd.info as info, event "
"from \"message.publish\" "
"where topic =~ 't/#' and cmd.info = 'tt' and info = 'tt'",
?assertMatch({ok,#{cmd := #{<<"info">> := <<"tt">>},
info := <<"tt">>,
event := 'message.publish'}},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql3,
<<"ctx">> =>
#{<<"payload">> =>
<<"{\"cmd\": {\"info\":\"tt\"}}">>,
<<"topic">> => <<"t/a">>}})),
%% cascaded as
Sql4 = "select payload.cmd as cmd, cmd.info as meta.info, event "
"from \"message.publish\" "
"where topic =~ 't/#' and cmd.info = 'tt' and meta.info = 'tt'",
?assertMatch({ok,#{cmd := #{<<"info">> := <<"tt">>},
meta := #{info := <<"tt">>},
event := 'message.publish'}},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql4,
<<"ctx">> =>
#{<<"payload">> =>
<<"{\"cmd\": {\"info\":\"tt\"}}">>,
<<"topic">> => <<"t/a">>}})).

t_sqlselect_01(_Config) ->
ok = emqx_rule_engine:load_providers(),
TopicRule = create_simple_repub_rule(
<<"t2">>,
@@ -683,7 +734,7 @@ t_sqlselect_0(_Config) ->
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule).

t_sqlselect_01(_Config) ->
t_sqlselect_02(_Config) ->
ok = emqx_rule_engine:load_providers(),
TopicRule = create_simple_repub_rule(
<<"t2">>,
@@ -973,6 +1024,32 @@ t_sqlparse_foreach_6(_Config) ->
?assert(Zid1 == 5 orelse Zid1 == 15),
?assert(Zid2 == 5 orelse Zid2 == 15).

t_sqlparse_foreach_7(_Config) ->
%% Verify foreach-do-incase and cascaded AS
Sql = "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, c.info as info "
"do info.cmd as msg_type, info.name as name "
"incase is_not_null(info.cmd) "
"from \"message.publish\" "
"where topic =~ 't/#' and s.page = '2' ",
Payload = <<"{\"sensors\": {\"page\": 2, \"collection\": {\"info\":[{\"name\":\"cmd1\", \"cmd\":\"1\"}, {\"cmd\":\"2\"}]} } }">>,
?assertMatch({ok,[#{name := <<"cmd1">>, msg_type := <<"1">>}, #{msg_type := <<"2">>}]},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> =>
#{<<"payload">> => Payload,
<<"topic">> => <<"t/a">>}})),
Sql2 = "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, c.info as info "
"do info.cmd as msg_type, info.name as name "
"incase is_not_null(info.cmd) "
"from \"message.publish\" "
"where topic =~ 't/#' and s.page = '3' ",
?assertMatch({error, nomatch},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql2,
<<"ctx">> =>
#{<<"payload">> => Payload,
<<"topic">> => <<"t/a">>}})).

t_sqlparse_case_when_1(_Config) ->
%% case-when-else clause
Sql = "select "

0 comments on commit 6a1267c

Please sign in to comment.
You can’t perform that action at this time.