Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] support create or replace pipe #37658

Merged
merged 1 commit into from Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 16 additions & 8 deletions fe/fe-core/src/main/java/com/starrocks/load/pipe/PipeManager.java
Expand Up @@ -65,10 +65,16 @@ public void createPipe(CreatePipeStmt stmt) throws DdlException {
Pair<Long, String> dbIdAndName = resolvePipeNameUnlock(stmt.getPipeName());
boolean existed = nameToId.containsKey(dbIdAndName);
if (existed) {
if (!stmt.isIfNotExists()) {
if (!stmt.isIfNotExists() && !stmt.isReplace()) {
ErrorReport.reportSemanticException(ErrorCode.ERR_PIPE_EXISTS);
}
return;
if (stmt.isIfNotExists()) {
return;
} else if (stmt.isReplace()) {
LOG.info("Pipe {} already exist, replace it with a new one", stmt.getPipeName());
Pipe pipe = pipeMap.get(nameToId.get(dbIdAndName));
dropPipeImpl(pipe);
}
}

// Add pipe
Expand Down Expand Up @@ -97,12 +103,7 @@ public void dropPipe(DropPipeStmt stmt) throws DdlException {
}
pipe = pipeMap.get(nameToId.get(dbAndName));

pipe.suspend();
pipe.destroy();
removePipe(pipe);

// persistence
repo.deletePipe(pipe);
dropPipeImpl(pipe);
} catch (Throwable e) {
LOG.error("drop pipe {} failed", pipe, e);
throw e;
Expand All @@ -111,6 +112,13 @@ public void dropPipe(DropPipeStmt stmt) throws DdlException {
}
}

private void dropPipeImpl(Pipe pipe) {
pipe.suspend();
pipe.destroy();
removePipe(pipe);
repo.deletePipe(pipe);
}

public void dropPipesOfDb(String dbName, long dbId) {
try {
lock.writeLock().lock();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
Attempting to replace a Pipe may fail if pipeMap.get(nameToId.get(dbIdAndName)) returns null, leading to a NullPointerException.

You can modify the code like this:

@@ -74,8 +74,13 @@
                 } else if (stmt.isReplace()) {
                     LOG.info("Pipe {} already exist, replace it with a new one", stmt.getPipeName());
                     Long pipeId = nameToId.get(dbIdAndName.getFirst());
-                    Pipe pipe = pipeMap.get(pipeId);
+                    Pipe pipe = pipeMap.get(dbIdAndName.getFirst());
+                    if (pipe == null) {
+                        throw new DdlException("Cannot replace non-existing pipe with name: " + dbIdAndName.getSecond());
+                    }
                     dropPipeImpl(pipe);
+                }
             }
 
             // Add pipe

In the modification above, I've added a null check for pipe before attempting to replace it. If pipe is null, an exception is thrown indicating that the pipe cannot be replaced because it doesn't exist. This avoids the potential NullPointerException. Additionally, make sure to use the correct key when obtaining pipe from pipeMap; the key should be derived from the resolved dbIdAndName, specifically the first element (dbId) in case dbIdAndName is a pair of database ID and name as the context suggests.

Expand Down
Expand Up @@ -25,6 +25,7 @@

public class CreatePipeStmt extends DdlStmt {

private final boolean orReplace;
private final boolean ifNotExists;
private final PipeName pipeName;
private final int insertSqlStartIndex;
Expand All @@ -34,16 +35,22 @@ public class CreatePipeStmt extends DdlStmt {
private TableName targetTable;
private FilePipeSource pipeSource;

public CreatePipeStmt(boolean ifNotExists, PipeName pipeName, int insertSqlStartIndex, InsertStmt insertStmt,
public CreatePipeStmt(boolean ifNotExists, boolean orReplace,
PipeName pipeName, int insertSqlStartIndex, InsertStmt insertStmt,
Map<String, String> properties, NodePosition pos) {
super(pos);
this.orReplace = orReplace;
this.ifNotExists = ifNotExists;
this.pipeName = pipeName;
this.insertSqlStartIndex = insertSqlStartIndex;
this.insertStmt = insertStmt;
this.properties = properties;
}

public boolean isReplace() {
return orReplace;
}

public boolean isIfNotExists() {
return ifNotExists;
}
Expand Down
Expand Up @@ -3934,7 +3934,12 @@ private PipeName resolvePipeName(StarRocksParser.QualifiedNameContext context) {
@Override
public ParseNode visitCreatePipeStatement(StarRocksParser.CreatePipeStatementContext context) {
PipeName pipeName = resolvePipeName(context.qualifiedName());
boolean ifNotExists = context.IF() != null;
boolean ifNotExists = context.ifNotExists() != null && context.ifNotExists().IF() != null;
boolean replace = context.orReplace() != null && context.orReplace().OR() != null;

if (ifNotExists && replace) {
throw new ParsingException(PARSER_ERROR_MSG.conflictedOptions("OR REPLACE", "IF NOT EXISTS"));
}
ParseNode insertNode = visit(context.insertStatement());
if (!(insertNode instanceof InsertStmt)) {
throw new ParsingException(PARSER_ERROR_MSG.unsupportedStatement(insertNode.toSql()),
Expand All @@ -3950,7 +3955,8 @@ public ParseNode visitCreatePipeStatement(StarRocksParser.CreatePipeStatementCon
InsertStmt insertStmt = (InsertStmt) insertNode;
int insertSqlIndex = context.insertStatement().start.getStartIndex();

return new CreatePipeStmt(ifNotExists, pipeName, insertSqlIndex, insertStmt, properties, createPos(context));
return new CreatePipeStmt(ifNotExists, replace, pipeName, insertSqlIndex, insertStmt, properties,
createPos(context));
}

@Override
Expand Down
Expand Up @@ -471,6 +471,13 @@ withRowAccessPolicy
: WITH ROW ACCESS POLICY policyName=qualifiedName (ON identifierList)?
;

orReplace:
(OR REPLACE)?
;
ifNotExists:
(IF NOT EXISTS)?
;

createTemporaryTableStatement
: CREATE TEMPORARY TABLE qualifiedName
queryStatement
Expand Down Expand Up @@ -1852,7 +1859,7 @@ showSmallFilesStatement
// -------------------------------------------- Pipe Statement ---------------------------------------------------------

createPipeStatement
: CREATE PIPE (IF NOT EXISTS)? qualifiedName
: CREATE orReplace PIPE ifNotExists qualifiedName
properties?
AS insertStatement
;
Expand Down
Expand Up @@ -620,11 +620,27 @@ public void pipeCRUD() throws Exception {
// create if not exists
CreatePipeStmt createAgain = createStmt;
Assert.assertThrows(SemanticException.class, () -> pm.createPipe(createAgain));
sql =
"create pipe if not exists p_crud as insert into tbl1 select * from files('path'='fake://pipe', 'format'='parquet')";
sql = "create pipe if not exists p_crud as insert into tbl1 " +
"select * from files('path'='fake://pipe', 'format'='parquet')";
createStmt = (CreatePipeStmt) UtFrameUtils.parseStmtWithNewParser(sql, ctx);
pm.createPipe(createStmt);

// create or replace
String createOrReplaceSql = "create or replace pipe p_crud as insert into tbl1 " +
"select * from files('path'='fake://pipe', 'format'='parquet')";
CreatePipeStmt createOrReplace = (CreatePipeStmt) UtFrameUtils.parseStmtWithNewParser(createOrReplaceSql, ctx);
long previousId = getPipe("p_crud").getId();
pm.createPipe(createOrReplace);
Assert.assertNotEquals(previousId, getPipe("p_crud").getId());
pipe = pm.mayGetPipe(name).get();

// create or replace when not exists
previousId = pipe.getId();
dropPipe(name.getPipeName());
pm.createPipe(createOrReplace);
pipe = pm.mayGetPipe(name).get();
Assert.assertNotEquals(previousId, pipe.getId());

// pause
sql = "alter pipe p_crud suspend";
AlterPipeStmt pauseStmt = (AlterPipeStmt) UtFrameUtils.parseStmtWithNewParser(sql, ctx);
Expand Down