Skip to content
Permalink
Browse files

Add CASE/WHEN

  • Loading branch information
terry-xiaoyu committed Nov 5, 2019
1 parent a962e36 commit 40e68e9607198613cc93d001488d40b2bfb4f23e
Showing with 101 additions and 20 deletions.
  1. +36 −1 src/emqx_rule_runtime.erl
  2. +32 −15 src/emqx_rule_sqlparser.erl
  3. +1 −0 src/emqx_rule_sqltester.erl
  4. +32 −4 test/emqx_rule_engine_SUITE.erl
@@ -36,6 +36,7 @@

-export([ apply_rule/2
, columns/1
, clear_rule_payload/0
]).

-import(emqx_rule_maps,
@@ -123,7 +124,7 @@ rules_for(Hook) ->

-spec(apply_rules(list(emqx_rule_engine:rule()), map()) -> ok).
apply_rules([], _Input) ->
erlang:erase(rule_payload),
clear_rule_payload(),
ok;
apply_rules([#rule{enabled = false}|More], Input) ->
apply_rules(More, Input);
@@ -181,6 +182,9 @@ apply_rule(#rule{id = RuleId,
{error, nomatch}
end.

clear_rule_payload() ->
erlang:erase(rule_payload).

%% Step1 -> Select and transform data
select_and_transform(Fields, Input) ->
select_and_transform(Fields, Input, #{}).
@@ -317,6 +321,10 @@ eval({const, Val}, _Input) ->
Val;
eval({Op, L, R}, Input) when ?is_arith(Op) ->
apply_func(Op, [eval(L, Input), eval(R, Input)], Input);
eval({'case', undefined, CaseClauses, ElseClauses}, Input) ->
eval_case_clauses(CaseClauses, ElseClauses, Input);
eval({'case', CaseOn, CaseClauses, ElseClauses}, Input) ->
eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input);
eval({'fun', Name, Args}, Input) ->
apply_func(Name, [eval(Arg, Input) || Arg <- Args], Input).

@@ -332,6 +340,33 @@ alias({const, Val}) ->
Val;
alias(_) -> undefined.

eval_case_clauses([], ElseClauses, Input) ->
case ElseClauses of
undefined -> undefined;
_ -> eval(ElseClauses, Input)
end;
eval_case_clauses([{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
case match_conditions(Cond, Input) of
true ->
eval(Clause, Input);
_ ->
eval_case_clauses(CaseClauses, ElseClauses, Input)
end.

eval_switch_clauses(_CaseOn, [], ElseClauses, Input) ->
case ElseClauses of
undefined -> undefined;
_ -> eval(ElseClauses, Input)
end;
eval_switch_clauses(CaseOn, [{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
ConResult = eval(Cond, Input),
case eval(CaseOn, Input) of
ConResult ->
eval(Clause, Input);
_ ->
eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input)
end.

apply_func(Name, Args, Input) when is_atom(Name) ->
case erlang:apply(emqx_rule_funcs, Name, Args) of
Func when is_function(Func) ->
@@ -104,22 +104,23 @@ select_where(#select{where = Where}) ->
Where.

preprocess(#select{fields = Fields, is_foreach = IsForeach, doeach = DoEach, incase = InCase, from = Hooks, where = Conditions}) ->
Selected = [preprocess_field(Field) || Field <- Fields],
Froms = [hook(unquote(H)) || H <- Hooks],
FullColumns = as_columns(Selected) ++ fixed_columns(Froms),
FixedColumns = fixed_columns(Froms),
Selected = [preprocess_field(Field, FixedColumns) || Field <- Fields],
FullColumns = as_columns(Selected) ++ FixedColumns,
#select{is_foreach = IsForeach,
fields = Selected,
doeach = [preprocess_field(Each) || Each <- DoEach],
doeach = [preprocess_field(Each, FullColumns) || Each <- DoEach],
incase = preprocess_condition(InCase, FullColumns),
from = Froms,
where = preprocess_condition(Conditions, FullColumns)}.

preprocess_field(<<"*">>) ->
preprocess_field(<<"*">>, _Columns) ->
'*';
preprocess_field({'as', Field, Alias}) when is_binary(Alias) ->
{'as', transform_select_field(Field), transform_alias(Alias)};
preprocess_field(Field) ->
transform_select_field(Field).
preprocess_field({'as', Field, Alias}, Columns) when is_binary(Alias) ->
{'as', transform_select_field(Field, Columns), transform_alias(Alias)};
preprocess_field(Field, Columns) ->
transform_select_field(Field, Columns).

preprocess_condition({Op, L, R}, Columns) when ?is_logical(Op) orelse ?is_comp(Op) ->
{Op, preprocess_condition(L, Columns), preprocess_condition(R, Columns)};
@@ -151,15 +152,31 @@ do_transform_field({'fun', Name, Args}, Columns) when is_binary(Name) ->
Fun = list_to_existing_atom(binary_to_list(Name)),
{'fun', Fun, [transform_field(Arg, Columns) || Arg <- Args]}.

transform_select_field({const, Val}) ->
transform_select_field({const, Val}, _Columns) ->
{const, Val};
transform_select_field({Op, Arg1, Arg2}) when ?is_arith(Op) ->
{Op, transform_select_field(Arg1), transform_select_field(Arg2)};
transform_select_field(Var) when is_binary(Var) ->
{var, escape(parse_nested(Var))};
transform_select_field({'fun', Name, Args}) when is_binary(Name) ->
transform_select_field({Op, Arg1, Arg2}, Columns) when ?is_arith(Op) ->
{Op, transform_select_field(Arg1, Columns), transform_select_field(Arg2, Columns)};
transform_select_field(Var, Columns) when is_binary(Var) ->
{var, validate_var(escape(parse_nested(Var)), Columns)};
transform_select_field({'case', CaseOn, CaseClauses, ElseClause}, Columns) ->
{'case', transform_caseon(CaseOn, Columns),
transform_case_clause(CaseClauses, Columns),
transform_caseelse(ElseClause, Columns)};
transform_select_field({'fun', Name, Args}, Columns) when is_binary(Name) ->
Fun = list_to_existing_atom(binary_to_list(Name)),
{'fun', Fun, [transform_select_field(Arg) || Arg <- Args]}.
{'fun', Fun, [transform_select_field(Arg, Columns) || Arg <- Args]}.

transform_caseon(<<>>, _Columns) -> undefined;
transform_caseon(CaseOn, Columns) ->
transform_select_field(CaseOn, Columns).

transform_case_clause(CaseClauses, Columns) ->
[{preprocess_condition(Cond, Columns), transform_select_field(Return, Columns)}
|| {Cond, Return} <- CaseClauses].

transform_caseelse({}, _Columns) -> undefined;
transform_caseelse(ElseClause, Columns) ->
transform_select_field(ElseClause, Columns).

validate_var(Var, SupportedColumns) ->
case {Var, lists:member(Var, SupportedColumns)} of
@@ -43,6 +43,7 @@ test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
#action_instance_params{id = ActInstId,
params = #{},
apply = sql_test_action()}),
emqx_rule_runtime:clear_rule_payload(),
emqx_rule_runtime:apply_rule(Rule, FullContext)
of
{ok, Data} -> {ok, flatten(Data)};
@@ -88,7 +88,8 @@ groups() ->
t_sqlselect_1,
t_sqlselect_2,
t_sqlselect_3,
t_sqlparse_foreach
t_sqlparse_foreach,
t_sqlparse_case_when
]}
].

@@ -807,15 +808,42 @@ t_sqlparse_foreach(_Config) ->
"incase is_not_null(s.cmd) "
"from \"message.publish\" "
"where topic =~ 't/#'",
{ok, Select} = emqx_rule_sqlparser:parse_select(Sql),
ct:log("======Select: ~p", [Select]),
Res = emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> =>
#{<<"payload">> =>
<<"{\"sensors\": [{\"cmd\":\"1\"}, {\"cmd\":\"2\"}]}">>,
<<"topic">> => <<"t/a">>}}),
ct:log("======Result: ~p", [Res]),
?assertMatch({ok,[#{msg_type := <<"1">>},#{msg_type := <<"2">>}]}, Res).

t_sqlparse_case_when(_Config) ->
Sql = "select "
" case when payload.x < 0 then 0 "
" when payload.x > 7 then 7 "
" else payload.x "
" end as y "
"from \"message.publish\" "
"where topic =~ 't/#'",
?assertMatch({ok, #{y := 1}}, emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 1}">>,
<<"topic">> => <<"t/a">>}})),
?assertMatch({ok, #{y := 0}}, emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 0}">>,
<<"topic">> => <<"t/a">>}})),
?assertMatch({ok, #{y := 0}}, emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> => #{<<"payload">> => <<"{\"x\": -1}">>,
<<"topic">> => <<"t/a">>}})),
?assertMatch({ok, #{y := 7}}, emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 7}">>,
<<"topic">> => <<"t/a">>}})),
?assertMatch({ok, #{y := 7}}, emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 8}">>,
<<"topic">> => <<"t/a">>}})),
ok.

%%------------------------------------------------------------------------------

0 comments on commit 40e68e9

Please sign in to comment.
You can’t perform that action at this time.