From c0f03a42cdbd6ad3acdee9bcb5ce37e356a1a071 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 25 Dec 2023 14:57:53 +0800 Subject: [PATCH] [Feature] support create or replace pipe (backport #37658) (#37698) Co-authored-by: Murphy <96611012+mofeiatwork@users.noreply.github.com> --- .../com/starrocks/load/pipe/PipeManager.java | 24 ++++++++++++------- .../sql/ast/pipe/CreatePipeStmt.java | 9 ++++++- .../com/starrocks/sql/parser/AstBuilder.java | 10 ++++++-- .../com/starrocks/sql/parser/StarRocks.g4 | 9 ++++++- .../starrocks/load/pipe/PipeManagerTest.java | 20 ++++++++++++++-- 5 files changed, 58 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/load/pipe/PipeManager.java b/fe/fe-core/src/main/java/com/starrocks/load/pipe/PipeManager.java index 2113ebc4c7365..c3b5359761e50 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/pipe/PipeManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/pipe/PipeManager.java @@ -65,10 +65,16 @@ public void createPipe(CreatePipeStmt stmt) throws DdlException { Pair dbIdAndName = resolvePipeNameUnlock(stmt.getPipeName()); boolean existed = nameToId.containsKey(dbIdAndName); if (existed) { - if (!stmt.isIfNotExists()) { + if (!stmt.isIfNotExists() && !stmt.isReplace()) { ErrorReport.reportSemanticException(ErrorCode.ERR_PIPE_EXISTS); } - return; + if (stmt.isIfNotExists()) { + return; + } else if (stmt.isReplace()) { + LOG.info("Pipe {} already exist, replace it with a new one", stmt.getPipeName()); + Pipe pipe = pipeMap.get(nameToId.get(dbIdAndName)); + dropPipeImpl(pipe); + } } // Add pipe @@ -97,12 +103,7 @@ public void dropPipe(DropPipeStmt stmt) throws DdlException { } pipe = pipeMap.get(nameToId.get(dbAndName)); - pipe.suspend(); - pipe.destroy(); - removePipe(pipe); - - // persistence - repo.deletePipe(pipe); + dropPipeImpl(pipe); } catch (Throwable e) { LOG.error("drop pipe {} failed", pipe, e); throw e; @@ -111,6 +112,13 @@ public void dropPipe(DropPipeStmt stmt) throws DdlException { } } + private void dropPipeImpl(Pipe pipe) { + pipe.suspend(); + pipe.destroy(); + removePipe(pipe); + repo.deletePipe(pipe); + } + public void dropPipesOfDb(String dbName, long dbId) { try { lock.writeLock().lock(); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/pipe/CreatePipeStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/pipe/CreatePipeStmt.java index 99243626af161..8306a57923c9b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/pipe/CreatePipeStmt.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/pipe/CreatePipeStmt.java @@ -25,6 +25,7 @@ public class CreatePipeStmt extends DdlStmt { + private final boolean orReplace; private final boolean ifNotExists; private final PipeName pipeName; private final int insertSqlStartIndex; @@ -34,9 +35,11 @@ public class CreatePipeStmt extends DdlStmt { private TableName targetTable; private FilePipeSource pipeSource; - public CreatePipeStmt(boolean ifNotExists, PipeName pipeName, int insertSqlStartIndex, InsertStmt insertStmt, + public CreatePipeStmt(boolean ifNotExists, boolean orReplace, + PipeName pipeName, int insertSqlStartIndex, InsertStmt insertStmt, Map properties, NodePosition pos) { super(pos); + this.orReplace = orReplace; this.ifNotExists = ifNotExists; this.pipeName = pipeName; this.insertSqlStartIndex = insertSqlStartIndex; @@ -44,6 +47,10 @@ public CreatePipeStmt(boolean ifNotExists, PipeName pipeName, int insertSqlStart this.properties = properties; } + public boolean isReplace() { + return orReplace; + } + public boolean isIfNotExists() { return ifNotExists; } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java index b401a80be5d50..e02b4fe241a68 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java @@ -3801,7 +3801,12 @@ private PipeName resolvePipeName(StarRocksParser.QualifiedNameContext context) { @Override public ParseNode visitCreatePipeStatement(StarRocksParser.CreatePipeStatementContext context) { PipeName pipeName = resolvePipeName(context.qualifiedName()); - boolean ifNotExists = context.IF() != null; + boolean ifNotExists = context.ifNotExists() != null && context.ifNotExists().IF() != null; + boolean replace = context.orReplace() != null && context.orReplace().OR() != null; + + if (ifNotExists && replace) { + throw new ParsingException(PARSER_ERROR_MSG.conflictedOptions("OR REPLACE", "IF NOT EXISTS")); + } ParseNode insertNode = visit(context.insertStatement()); if (!(insertNode instanceof InsertStmt)) { throw new ParsingException(PARSER_ERROR_MSG.unsupportedStatement(insertNode.toSql()), @@ -3817,7 +3822,8 @@ public ParseNode visitCreatePipeStatement(StarRocksParser.CreatePipeStatementCon InsertStmt insertStmt = (InsertStmt) insertNode; int insertSqlIndex = context.insertStatement().start.getStartIndex(); - return new CreatePipeStmt(ifNotExists, pipeName, insertSqlIndex, insertStmt, properties, createPos(context)); + return new CreatePipeStmt(ifNotExists, replace, pipeName, insertSqlIndex, insertStmt, properties, + createPos(context)); } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 index 62212778d8e86..27fa4300a6c00 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 @@ -458,6 +458,13 @@ withRowAccessPolicy : WITH ROW ACCESS POLICY policyName=qualifiedName (ON identifierList)? ; +orReplace: + (OR REPLACE)? + ; +ifNotExists: + (IF NOT EXISTS)? + ; + createTemporaryTableStatement : CREATE TEMPORARY TABLE qualifiedName queryStatement @@ -1785,7 +1792,7 @@ showSmallFilesStatement // -------------------------------------------- Pipe Statement --------------------------------------------------------- createPipeStatement - : CREATE PIPE (IF NOT EXISTS)? qualifiedName + : CREATE orReplace PIPE ifNotExists qualifiedName properties? AS insertStatement ; diff --git a/fe/fe-core/src/test/java/com/starrocks/load/pipe/PipeManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/pipe/PipeManagerTest.java index 799ca129702ac..804ac9223f698 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/pipe/PipeManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/pipe/PipeManagerTest.java @@ -620,11 +620,27 @@ public void pipeCRUD() throws Exception { // create if not exists CreatePipeStmt createAgain = createStmt; Assert.assertThrows(SemanticException.class, () -> pm.createPipe(createAgain)); - sql = - "create pipe if not exists p_crud as insert into tbl1 select * from files('path'='fake://pipe', 'format'='parquet')"; + sql = "create pipe if not exists p_crud as insert into tbl1 " + + "select * from files('path'='fake://pipe', 'format'='parquet')"; createStmt = (CreatePipeStmt) UtFrameUtils.parseStmtWithNewParser(sql, ctx); pm.createPipe(createStmt); + // create or replace + String createOrReplaceSql = "create or replace pipe p_crud as insert into tbl1 " + + "select * from files('path'='fake://pipe', 'format'='parquet')"; + CreatePipeStmt createOrReplace = (CreatePipeStmt) UtFrameUtils.parseStmtWithNewParser(createOrReplaceSql, ctx); + long previousId = getPipe("p_crud").getId(); + pm.createPipe(createOrReplace); + Assert.assertNotEquals(previousId, getPipe("p_crud").getId()); + pipe = pm.mayGetPipe(name).get(); + + // create or replace when not exists + previousId = pipe.getId(); + dropPipe(name.getPipeName()); + pm.createPipe(createOrReplace); + pipe = pm.mayGetPipe(name).get(); + Assert.assertNotEquals(previousId, pipe.getId()); + // pause sql = "alter pipe p_crud suspend"; AlterPipeStmt pauseStmt = (AlterPipeStmt) UtFrameUtils.parseStmtWithNewParser(sql, ctx);