Skip to content

Commit

Permalink
hstream: fix incorrect metadata of querys with namespace (#1744)
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed Jan 18, 2024
1 parent 05db1f0 commit 43b96cb
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 25 deletions.
30 changes: 15 additions & 15 deletions hstream/src/HStream/Server/Core/Query.hs
Expand Up @@ -175,10 +175,11 @@ createQueryWithNamespace'
_ -> throw $ HE.WrongExecutionPlan "Create query only support select / create stream as select statements"
#else
parseAndRefine createQueryRequestSql >>= \rSQL -> case rSQL of
RQCreate (RCreateAs stream select rOptions) ->
hstreamCodegen (RQCreate (RCreateAs (namespace <> stream)
(modifySelect namespace select)
rOptions)) >>= \case
RQCreate (RCreateAs stream select rOptions) -> do
let rSQLWithNamespace = RQCreate (RCreateAs (namespace <> stream)
(modifySelect namespace select)
rOptions)
hstreamCodegen rSQLWithNamespace >>= \case
CreateBySelectPlan sources sink builder factor persist -> do
-- validate names
mapM_ (validateNameAndThrow ResStream) sources
Expand Down Expand Up @@ -206,7 +207,7 @@ createQueryWithNamespace'
}
-- update metadata
let relatedStreams = (sources, sink)
qInfo <- P.createInsertQueryInfo createQueryRequestQueryName createQueryRequestSql relatedStreams rSQL serverID metaHandle
qInfo <- P.createInsertQueryInfo createQueryRequestQueryName createQueryRequestSql relatedStreams rSQLWithNamespace serverID metaHandle
-- run core task
consumerClosed <- newTVarIO False
createQueryAndRun sc QueryRunner {
Expand All @@ -221,10 +222,9 @@ createQueryWithNamespace'
_ -> throw $ HE.WrongExecutionPlan "Create query only support create stream/view <name> as select statements"

RQInsert (RInsertSel streamName rSel) -> do
(hstreamCodegen $ RQInsert (RInsertSel
(namespace <> streamName)
(modifySelect namespace rSel))
) >>= \case
let rSQLWithNamespace = RQInsert (RInsertSel (namespace <> streamName)
(modifySelect namespace rSel))
hstreamCodegen rSQLWithNamespace >>= \case
InsertBySelectPlan srcs sink builder persist -> do
-- validate names
traverse_ (validateNameAndThrow ResStream) (sink : srcs)
Expand All @@ -250,7 +250,7 @@ createQueryWithNamespace'
-- update metadata
qInfo <- P.createInsertQueryInfo createQueryRequestQueryName createQueryRequestSql
(srcs, sink)
rSQL serverID metaHandle
rSQLWithNamespace serverID metaHandle
-- run core task
consumerClosed <- newTVarIO False
createQueryAndRun sc QueryRunner
Expand All @@ -264,13 +264,13 @@ createQueryWithNamespace'
hstreamQueryToQuery metaHandle qInfo
_ -> throw $ HE.WrongExecutionPlan "Insert by Select query only supports `INSERT INTO <stream_name> SELECT FROM STREAM`"

RQCreate (RCreateView view select) ->
hstreamCodegen (RQCreate (RCreateView (namespace <> view)
(modifySelect namespace select)
)) >>= \case
RQCreate (RCreateView view select) -> do
let rSQLWithNamespace = RQCreate (RCreateView (namespace <> view)
(modifySelect namespace select))
hstreamCodegen rSQLWithNamespace >>= \case
CreateViewPlan sources sink view builder persist -> do
validateNameAndThrow ResView view
Core.createView' sc view sources sink builder persist createQueryRequestSql rSQL createQueryRequestQueryName
Core.createView' sc view sources sink builder persist createQueryRequestSql rSQLWithNamespace createQueryRequestQueryName
>>= hstreamViewToQuery metaHandle
_ -> throw $ HE.WrongExecutionPlan "Create query only support create stream/view <name> as select statements"
_ -> throw $ HE.WrongExecutionPlan "Create query only support create stream/view <name> as select statements"
Expand Down
21 changes: 11 additions & 10 deletions hstream/src/HStream/Server/Core/QueryNew.hs
Expand Up @@ -174,10 +174,11 @@ createQueryWithNamespace'
_ -> throw $ HE.WrongExecutionPlan "Create query only support select / create stream as select statements"
#else
parseAndBind createQueryRequestSql (P.getSchema metaHandle) >>= \bSQL -> case bSQL of
BoundQCreate (BoundCreateAs stream select rOptions) ->
hstreamCodegen (BoundQCreate (BoundCreateAs (namespace <> stream)
(modifySelect namespace select)
rOptions)) (P.getSchema metaHandle) >>= \case
BoundQCreate (BoundCreateAs stream select rOptions) -> do
let bSQLWithNamespace = BoundQCreate (BoundCreateAs (namespace <> stream)
(modifySelect namespace select)
rOptions)
hstreamCodegen bSQLWithNamespace (P.getSchema metaHandle) >>= \case
CreateBySelectPlan sources sink schema builder factor persist -> do
-- validate names
mapM_ (validateNameAndThrow ResStream) sources
Expand Down Expand Up @@ -206,7 +207,7 @@ createQueryWithNamespace'
P.registerSchema metaHandle schema
-- update metadata
let relatedStreams = (sources, sink)
qInfo <- P.createInsertQueryInfo createQueryRequestQueryName createQueryRequestSql relatedStreams bSQL serverID metaHandle
qInfo <- P.createInsertQueryInfo createQueryRequestQueryName createQueryRequestSql relatedStreams bSQLWithNamespace serverID metaHandle
-- run core task
consumerClosed <- newTVarIO False
createQueryAndRun sc QueryRunner {
Expand Down Expand Up @@ -264,13 +265,13 @@ createQueryWithNamespace'
hstreamQueryToQuery metaHandle qInfo
_ -> throw $ HE.WrongExecutionPlan "Insert by Select query only supports `INSERT INTO <stream_name> SELECT FROM STREAM`"
-}
BoundQCreate (BoundCreateView view select) ->
hstreamCodegen (BoundQCreate (BoundCreateView (namespace <> view)
(modifySelect namespace select)
)) (P.getSchema metaHandle) >>= \case
BoundQCreate (BoundCreateView view select) -> do
let bSQLWithNamespace = BoundQCreate (BoundCreateView (namespace <> view)
(modifySelect namespace select))
hstreamCodegen bSQLWithNamespace (P.getSchema metaHandle) >>= \case
CreateViewPlan sources sink view schema builder persist -> do
validateNameAndThrow ResView view
Core.createView' sc view sources sink schema builder persist createQueryRequestSql bSQL createQueryRequestQueryName
Core.createView' sc view sources sink schema builder persist createQueryRequestSql bSQLWithNamespace createQueryRequestQueryName
>>= hstreamViewToQuery metaHandle
_ -> throw $ HE.WrongExecutionPlan "Create query only support create stream/view <name> as select statements"
_ -> throw $ HE.WrongExecutionPlan "Create query only support create stream/view <name> as select statements"
Expand Down

0 comments on commit 43b96cb

Please sign in to comment.