Skip to content

Commit

Permalink
[Feature] support create or replace pipe (backport #37658) (#37698)
Browse files Browse the repository at this point in the history
Co-authored-by: Murphy <96611012+mofeiatwork@users.noreply.github.com>
  • Loading branch information
mergify[bot] and mofeiatwork committed Dec 25, 2023
1 parent d595471 commit c0f03a4
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 14 deletions.
24 changes: 16 additions & 8 deletions fe/fe-core/src/main/java/com/starrocks/load/pipe/PipeManager.java
Expand Up @@ -65,10 +65,16 @@ public void createPipe(CreatePipeStmt stmt) throws DdlException {
Pair<Long, String> dbIdAndName = resolvePipeNameUnlock(stmt.getPipeName());
boolean existed = nameToId.containsKey(dbIdAndName);
if (existed) {
if (!stmt.isIfNotExists()) {
if (!stmt.isIfNotExists() && !stmt.isReplace()) {
ErrorReport.reportSemanticException(ErrorCode.ERR_PIPE_EXISTS);
}
return;
if (stmt.isIfNotExists()) {
return;
} else if (stmt.isReplace()) {
LOG.info("Pipe {} already exist, replace it with a new one", stmt.getPipeName());
Pipe pipe = pipeMap.get(nameToId.get(dbIdAndName));
dropPipeImpl(pipe);
}
}

// Add pipe
Expand Down Expand Up @@ -97,12 +103,7 @@ public void dropPipe(DropPipeStmt stmt) throws DdlException {
}
pipe = pipeMap.get(nameToId.get(dbAndName));

pipe.suspend();
pipe.destroy();
removePipe(pipe);

// persistence
repo.deletePipe(pipe);
dropPipeImpl(pipe);
} catch (Throwable e) {
LOG.error("drop pipe {} failed", pipe, e);
throw e;
Expand All @@ -111,6 +112,13 @@ public void dropPipe(DropPipeStmt stmt) throws DdlException {
}
}

private void dropPipeImpl(Pipe pipe) {
pipe.suspend();
pipe.destroy();
removePipe(pipe);
repo.deletePipe(pipe);
}

public void dropPipesOfDb(String dbName, long dbId) {
try {
lock.writeLock().lock();
Expand Down
Expand Up @@ -25,6 +25,7 @@

public class CreatePipeStmt extends DdlStmt {

private final boolean orReplace;
private final boolean ifNotExists;
private final PipeName pipeName;
private final int insertSqlStartIndex;
Expand All @@ -34,16 +35,22 @@ public class CreatePipeStmt extends DdlStmt {
private TableName targetTable;
private FilePipeSource pipeSource;

public CreatePipeStmt(boolean ifNotExists, PipeName pipeName, int insertSqlStartIndex, InsertStmt insertStmt,
public CreatePipeStmt(boolean ifNotExists, boolean orReplace,
PipeName pipeName, int insertSqlStartIndex, InsertStmt insertStmt,
Map<String, String> properties, NodePosition pos) {
super(pos);
this.orReplace = orReplace;
this.ifNotExists = ifNotExists;
this.pipeName = pipeName;
this.insertSqlStartIndex = insertSqlStartIndex;
this.insertStmt = insertStmt;
this.properties = properties;
}

public boolean isReplace() {
return orReplace;
}

public boolean isIfNotExists() {
return ifNotExists;
}
Expand Down
Expand Up @@ -3801,7 +3801,12 @@ private PipeName resolvePipeName(StarRocksParser.QualifiedNameContext context) {
@Override
public ParseNode visitCreatePipeStatement(StarRocksParser.CreatePipeStatementContext context) {
PipeName pipeName = resolvePipeName(context.qualifiedName());
boolean ifNotExists = context.IF() != null;
boolean ifNotExists = context.ifNotExists() != null && context.ifNotExists().IF() != null;
boolean replace = context.orReplace() != null && context.orReplace().OR() != null;

if (ifNotExists && replace) {
throw new ParsingException(PARSER_ERROR_MSG.conflictedOptions("OR REPLACE", "IF NOT EXISTS"));
}
ParseNode insertNode = visit(context.insertStatement());
if (!(insertNode instanceof InsertStmt)) {
throw new ParsingException(PARSER_ERROR_MSG.unsupportedStatement(insertNode.toSql()),
Expand All @@ -3817,7 +3822,8 @@ public ParseNode visitCreatePipeStatement(StarRocksParser.CreatePipeStatementCon
InsertStmt insertStmt = (InsertStmt) insertNode;
int insertSqlIndex = context.insertStatement().start.getStartIndex();

return new CreatePipeStmt(ifNotExists, pipeName, insertSqlIndex, insertStmt, properties, createPos(context));
return new CreatePipeStmt(ifNotExists, replace, pipeName, insertSqlIndex, insertStmt, properties,
createPos(context));
}

@Override
Expand Down
Expand Up @@ -458,6 +458,13 @@ withRowAccessPolicy
: WITH ROW ACCESS POLICY policyName=qualifiedName (ON identifierList)?
;

orReplace:
(OR REPLACE)?
;
ifNotExists:
(IF NOT EXISTS)?
;

createTemporaryTableStatement
: CREATE TEMPORARY TABLE qualifiedName
queryStatement
Expand Down Expand Up @@ -1785,7 +1792,7 @@ showSmallFilesStatement
// -------------------------------------------- Pipe Statement ---------------------------------------------------------

createPipeStatement
: CREATE PIPE (IF NOT EXISTS)? qualifiedName
: CREATE orReplace PIPE ifNotExists qualifiedName
properties?
AS insertStatement
;
Expand Down
Expand Up @@ -620,11 +620,27 @@ public void pipeCRUD() throws Exception {
// create if not exists
CreatePipeStmt createAgain = createStmt;
Assert.assertThrows(SemanticException.class, () -> pm.createPipe(createAgain));
sql =
"create pipe if not exists p_crud as insert into tbl1 select * from files('path'='fake://pipe', 'format'='parquet')";
sql = "create pipe if not exists p_crud as insert into tbl1 " +
"select * from files('path'='fake://pipe', 'format'='parquet')";
createStmt = (CreatePipeStmt) UtFrameUtils.parseStmtWithNewParser(sql, ctx);
pm.createPipe(createStmt);

// create or replace
String createOrReplaceSql = "create or replace pipe p_crud as insert into tbl1 " +
"select * from files('path'='fake://pipe', 'format'='parquet')";
CreatePipeStmt createOrReplace = (CreatePipeStmt) UtFrameUtils.parseStmtWithNewParser(createOrReplaceSql, ctx);
long previousId = getPipe("p_crud").getId();
pm.createPipe(createOrReplace);
Assert.assertNotEquals(previousId, getPipe("p_crud").getId());
pipe = pm.mayGetPipe(name).get();

// create or replace when not exists
previousId = pipe.getId();
dropPipe(name.getPipeName());
pm.createPipe(createOrReplace);
pipe = pm.mayGetPipe(name).get();
Assert.assertNotEquals(previousId, pipe.getId());

// pause
sql = "alter pipe p_crud suspend";
AlterPipeStmt pauseStmt = (AlterPipeStmt) UtFrameUtils.parseStmtWithNewParser(sql, ctx);
Expand Down

0 comments on commit c0f03a4

Please sign in to comment.