Skip to content

Commit

Permalink
hstream-sql: modify grammar for time window (#1329)
Browse files Browse the repository at this point in the history
  • Loading branch information
Time-Hu committed Mar 28, 2023
1 parent f0c4152 commit bc31f85
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 21 deletions.
14 changes: 9 additions & 5 deletions hstream-sql/etc/SQL-v1.cf
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,17 @@ SelectItemWildcard. SelectItem ::= "*" ;

-- From
DFrom. From ::= "FROM" TableRef ;

TableRefAs. TableRef ::= TableRef1 "AS" HIdent ;

TableRefCrossJoin. TableRef1 ::= TableRef1 JoinTypeWithoutCond "JOIN" TableRef2 "WITHIN" "(" Interval ")" ;
TableRefNaturalJoin. TableRef1 ::= TableRef1 "NATURAL" JoinTypeWithCond "JOIN" TableRef2 "WITHIN" "(" Interval ")" ;
TableRefJoinOn. TableRef1 ::= TableRef1 JoinTypeWithCond "JOIN" TableRef2 "ON" ValueExpr "WITHIN" "(" Interval ")" ;
TableRefJoinUsing. TableRef1 ::= TableRef1 JoinTypeWithCond "JOIN" TableRef2 "USING" "(" [ColName] ")" "WITHIN" "(" Interval ")" ;

TableRefTumbling. TableRef1 ::= "TUMBLE" "(" HIdent "," Interval ")" ;
TableRefHopping. TableRef1 ::= "HOP" "(" HIdent "," Interval "," Interval ")" ;
TableRefSession. TableRef1 ::= "SESSION" "(" HIdent "," Interval ")" ;

TableRefIdent. TableRef2 ::= HIdent ;
TableRefSubquery. TableRef2 ::= "(" Select ")" ;
coercions TableRef 2 ;
Expand All @@ -138,13 +142,13 @@ DWhere. Where ::= "WHERE" ValueExpr ;

-- GroupBy

DTumbling. TimeWindow ::= "TUMBLING" "(" Interval ")" ;
DHopping. TimeWindow ::= "HOPPING" "(" Interval "," Interval ")" ;
DSession. TimeWindow ::= "SESSION" "(" Interval ")" ;
-- DTumbling. TimeWindow ::= "TUMBLING" "(" Interval ")" ;
-- DHopping. TimeWindow ::= "HOPPING" "(" Interval "," Interval ")" ;
-- DSession. TimeWindow ::= "SESSION" "(" Interval ")" ;

DGroupByEmpty. GroupBy ::= "" ;
DGroupBy. GroupBy ::= "GROUP" "BY" [ColName] ;
DGroupByWin. GroupBy ::= "GROUP" "BY" [ColName] TimeWindow ;
-- DGroupByWin. GroupBy ::= "GROUP" "BY" [ColName] TimeWindow ;

-- Having
DHavingEmpty. Having ::= "" ;
Expand Down
28 changes: 20 additions & 8 deletions hstream-sql/src/HStream/SQL/AST.hs
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ data RTableRef = RTableRefSimple StreamName (Maybe StreamName)
| RTableRefNaturalJoin RTableRef RJoinType RTableRef RInterval (Maybe StreamName)
| RTableRefJoinOn RTableRef RJoinType RTableRef RValueExpr RInterval (Maybe StreamName)
| RTableRefJoinUsing RTableRef RJoinType RTableRef [Text] RInterval (Maybe StreamName)
| RTableRefWindowed StreamName WindowType (Maybe StreamName)
#endif
deriving (Show, Eq)
setRTableRefAlias :: RTableRef -> StreamName -> RTableRef
Expand All @@ -703,6 +704,7 @@ setRTableRefAlias ref alias = case ref of
RTableRefNaturalJoin r1 typ r2 t _ -> RTableRefNaturalJoin r1 typ r2 t (Just alias)
RTableRefJoinOn r1 typ r2 e t _ -> RTableRefJoinOn r1 typ r2 e t (Just alias)
RTableRefJoinUsing r1 typ r2 cols t _ -> RTableRefJoinUsing r1 typ r2 cols t (Just alias)
RTableRefWindowed r win _ -> RTableRefWindowed r win (Just alias)
#endif

data RJoinType = InnerJoin | LeftJoin | RightJoin | FullJoin
Expand Down Expand Up @@ -750,6 +752,9 @@ instance Refine TableRef where
where extractStreamNameFromColName col = case col of
ColNameSimple _ colIdent -> refine colIdent
ColNameStream pos _ _ -> throwImpossible
refine (TableRefTumbling _ ref interval) = RTableRefWindowed (refine ref) (Tumbling (refine interval)) Nothing
refine (TableRefHopping _ ref len hop) = RTableRefWindowed (refine ref) (Hopping (refine len) (refine hop)) Nothing
refine (TableRefSession _ ref interval) = RTableRefWindowed (refine ref) (Session (refine interval)) Nothing
#endif

#ifdef HStreamUseV2Engine
Expand Down Expand Up @@ -785,11 +790,6 @@ instance Refine GroupBy where
L.map (\col -> let (RExprCol _ m_stream field) = refine col
in (m_stream, field)) cols
#else
type instance RefinedType TimeWindow = WindowType
instance Refine TimeWindow where
refine (DTumbling _ i) = Tumbling (refine i)
refine (DHopping _ i1 i2) = Hopping (refine i1) (refine i2)
refine (DSession _ i) = Session (refine i)

data RGroupBy = RGroupByEmpty
| RGroupBy [(Maybe StreamName, FieldName)] (Maybe WindowType)
Expand All @@ -801,9 +801,9 @@ instance Refine GroupBy where
(L.map (\col -> let (RExprCol _ m_stream field) = refine col
in (m_stream, field)) cols
) Nothing
refine (DGroupByWin pos cols win) =
let (RGroupBy tups Nothing) = refine (DGroupBy pos cols)
in RGroupBy tups (Just $ refine win)
-- refine (DGroupByWin pos cols win) =
-- let (RGroupBy tups Nothing) = refine (DGroupBy pos cols)
-- in RGroupBy tups (Just $ refine win)
#endif

---- Hav
Expand All @@ -816,11 +816,23 @@ instance Refine Having where
refine (DHaving _ expr) = RHaving (refine expr)

---- SELECT

data RSelect = RSelect RSel RFrom RWhere RGroupBy RHaving deriving (Show, Eq)
type instance RefinedType Select = RSelect
#ifdef HStreamUseV2Engine
instance Refine Select where
refine (DSelect _ sel frm whr grp hav) =
RSelect (refine sel) (refine frm) (refine whr) (refine grp) (refine hav)
#else
instance Refine Select where
refine (DSelect _ sel frm whr grp hav) =
case refine frm of
RFrom (RTableRefWindowed r win alias)->
let newFrm = RFrom (RTableRefSimple r alias) in
let newGrp = case refine grp of RGroupBy x _ -> RGroupBy x (Just win); x -> x in
RSelect (refine sel) newFrm (refine whr) newGrp (refine hav)
rfrm -> RSelect (refine sel) rfrm (refine whr) (refine grp) (refine hav)
#endif

---- EXPLAIN
type RExplain = RSelect
Expand Down
24 changes: 16 additions & 8 deletions hstream-sql/src/HStream/SQL/Internal/Validate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,9 @@ instance Validate TableRef where
validate r@(TableRefSubquery _ select) = validate select >> return r
#else
instance Validate TableRef where
validate r@(TableRefTumbling _ ref interval) = validate ref >> validate interval >> return r
validate r@(TableRefHopping _ ref interval1 interval2) = validate ref >> validate interval1 >> validate interval2 >> return r
validate r@(TableRefSession _ ref interval) = validate ref >> validate interval >> return r
validate r@(TableRefAs _ ref hIdent) = validate ref >> validate hIdent >> return r
validate r@(TableRefCrossJoin _ ref1 _ ref2 i) = validate ref1 >> validate ref2 >> validate i >> return r
validate r@(TableRefNaturalJoin _ ref1 _ ref2 i) = validate ref1 >> validate ref2 >> validate i >> return r
Expand Down Expand Up @@ -668,28 +671,33 @@ instance Validate GroupBy where
(DGroupByEmpty _) -> return grp
(DGroupBy _ cols) -> mapM_ validate cols >> return grp
#else
instance Validate TimeWindow where
validate win = case win of
(DTumbling _ i) -> validate i >> return win
(DHopping _ i1 i2) -> validate i1 >> validate i2 >> return win
(DSession _ i) -> validate i >> return win

instance Validate GroupBy where
validate grp = case grp of
(DGroupByEmpty _) -> return grp
(DGroupBy _ cols) -> mapM_ validate cols >> return grp
(DGroupByWin _ cols win) -> mapM_ validate cols >> validate win >> return grp
#endif

-- Having
-- 1. ValueExpr in it should be legal
instance Validate Having where
validate hav@(DHavingEmpty _) = Right hav
validate hav@(DHaving _ expr) = validate expr >> return hav

---- Select

instance Validate Select where
validate select@(DSelect _ sel@(DSel selPos selList) frm@(DFrom _ refs) whr grp hav) = do
#ifndef HStreamUseV2Engine
case grp of
DGroupByEmpty pos -> case refs of
TableRefTumbling {} -> Left $ buildSQLException ParseException pos
"Time window function `TUMBLE` requires a `GROUP BY` CLAUSE"
TableRefHopping {} -> Left $ buildSQLException ParseException pos
"Time window function `HOP` requires a `GROUP BY` CLAUSE"
TableRefSession {} -> Left $ buildSQLException ParseException pos
"Time window function `SESSION` requires a `GROUP BY` CLAUSE"
_ -> pure ()
_ -> pure ()
#endif
void $ validate sel
void $ validate frm
void $ validate whr
Expand Down

0 comments on commit bc31f85

Please sign in to comment.