[BugFix] Keep update statement output column's type consistent with target table's columns #41969

Merged
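
What the change does, in brief: the UPDATE planner builds a logical plan for the statement's source query and previously handed that plan's output columns to the table sink as-is, so an assignment whose expression type differed from the target column's type (for example SET k5 = 5 on a BOOLEAN column, as in the regression test below) could reach the sink with the wrong type. The planner now inserts a projection that wraps each mismatched output column in a cast to the target column's type. A minimal sketch of the idea, using hypothetical simplified stand-ins rather than the actual optimizer classes:

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the planner's column references and projections;
// the committed code uses ColumnRefOperator, CastOperator and
// LogicalProjectOperator from the StarRocks optimizer.
record ColumnRef(String name, String type) {}
record Projection(ColumnRef output, String expression) {}

class CastProjectionSketch {
    // For each output column of the source query, either pass it through
    // unchanged or wrap it in a cast so its type matches the target column.
    static List<Projection> alignTypes(List<ColumnRef> outputs, List<String> targetTypes) {
        List<Projection> projections = new ArrayList<>();
        for (int i = 0; i < outputs.size(); i++) {
            ColumnRef out = outputs.get(i);
            String target = targetTypes.get(i);
            if (out.type().equals(target)) {
                projections.add(new Projection(out, out.name()));
            } else {
                ColumnRef casted = new ColumnRef(out.name(), target);
                projections.add(new Projection(casted, "CAST(" + out.name() + " AS " + target + ")"));
            }
        }
        return projections;
    }
}

The committed version additionally rewrites each cast with FoldConstantsRule and swaps the casted refs into the plan's output column list, so everything downstream of the new projection sees the target types.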
91 changes: 78 additions & 13 deletions fe/fe-core/src/main/java/com/starrocks/sql/UpdatePlanner.java
@@ -14,6 +14,7 @@

package com.starrocks.sql;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.starrocks.analysis.DescriptorTable;
import com.starrocks.analysis.SlotDescriptor;
@@ -41,25 +42,45 @@
import com.starrocks.sql.optimizer.base.ColumnRefFactory;
import com.starrocks.sql.optimizer.base.ColumnRefSet;
import com.starrocks.sql.optimizer.base.PhysicalPropertySet;
import com.starrocks.sql.optimizer.operator.logical.LogicalProjectOperator;
import com.starrocks.sql.optimizer.operator.scalar.CastOperator;
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
import com.starrocks.sql.optimizer.rewrite.ScalarOperatorRewriter;
import com.starrocks.sql.optimizer.rewrite.scalar.FoldConstantsRule;
import com.starrocks.sql.optimizer.rewrite.scalar.ScalarOperatorRewriteRule;
import com.starrocks.sql.optimizer.statistics.ColumnDict;
import com.starrocks.sql.optimizer.statistics.IDictManager;
import com.starrocks.sql.optimizer.transformer.LogicalPlan;
import com.starrocks.sql.optimizer.transformer.OptExprBuilder;
import com.starrocks.sql.optimizer.transformer.RelationTransformer;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.sql.plan.PlanFragmentBuilder;
import com.starrocks.thrift.TPartialUpdateMode;
import com.starrocks.thrift.TResultSinkType;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class UpdatePlanner {

public ExecPlan plan(UpdateStmt updateStmt, ConnectContext session) {
QueryRelation query = updateStmt.getQueryStatement().getQueryRelation();
List<String> colNames = query.getColumnOutputNames();
ColumnRefFactory columnRefFactory = new ColumnRefFactory();
LogicalPlan logicalPlan = new RelationTransformer(columnRefFactory, session).transform(query);

List<ColumnRefOperator> outputColumns = logicalPlan.getOutputColumn();
Table targetTable = updateStmt.getTable();

// 1. Cast output column types to the target table's column types
OptExprBuilder optExprBuilder = logicalPlan.getRootBuilder();
optExprBuilder = castOutputColumnsTypeToTargetColumns(columnRefFactory, targetTable,
colNames, outputColumns, optExprBuilder);

// TODO: remove forceDisablePipeline when all the operators support pipeline engine.
boolean isEnablePipeline = session.getSessionVariable().isEnablePipelineEngine();
boolean canUsePipeline = isEnablePipeline && DataSink.canTableSinkUsePipeline(updateStmt.getTable());
@@ -72,23 +93,23 @@ public ExecPlan plan(UpdateStmt updateStmt, ConnectContext session) {
// Non-query statements must assign scan ranges per driver sequence, a strategy that local shuffle agg cannot use.
session.getSessionVariable().setEnableLocalShuffleAgg(false);

-Table table = updateStmt.getTable();
-long tableId = table.getId();
+long tableId = targetTable.getId();
Optimizer optimizer = new Optimizer();
optimizer.setUpdateTableId(tableId);

OptExpression optimizedPlan = optimizer.optimize(
session,
-logicalPlan.getRoot(),
+optExprBuilder.getRoot(),
new PhysicalPropertySet(),
-new ColumnRefSet(logicalPlan.getOutputColumn()),
+new ColumnRefSet(outputColumns),
columnRefFactory);
ExecPlan execPlan = PlanFragmentBuilder.createPhysicalPlan(optimizedPlan, session,
-logicalPlan.getOutputColumn(), columnRefFactory, colNames, TResultSinkType.MYSQL_PROTOCAL, false);
+outputColumns, columnRefFactory, colNames, TResultSinkType.MYSQL_PROTOCAL, false);
DescriptorTable descriptorTable = execPlan.getDescTbl();
TupleDescriptor olapTuple = descriptorTable.createTupleDescriptor();

List<Pair<Integer, ColumnDict>> globalDicts = Lists.newArrayList();
-for (Column column : table.getFullSchema()) {
+for (Column column : targetTable.getFullSchema()) {
if (updateStmt.usePartialUpdate() && !column.isGeneratedColumn() &&
!updateStmt.isAssignmentColumn(column.getName()) && !column.isKey()) {
// When using partial update, skip columns which aren't key columns and aren't assigned, except for
@@ -109,12 +130,12 @@ public ExecPlan plan(UpdateStmt updateStmt, ConnectContext session) {
}
olapTuple.computeMemLayout();

-if (table instanceof OlapTable) {
+if (targetTable instanceof OlapTable) {
List<Long> partitionIds = Lists.newArrayList();
-for (Partition partition : table.getPartitions()) {
+for (Partition partition : targetTable.getPartitions()) {
partitionIds.add(partition.getId());
}
-OlapTable olapTable = (OlapTable) table;
+OlapTable olapTable = (OlapTable) targetTable;
DataSink dataSink =
new OlapTableSink(olapTable, olapTuple, partitionIds, olapTable.writeQuorum(),
olapTable.enableReplicatedStorage(), false, olapTable.supportedAutomaticPartition());
@@ -139,11 +160,11 @@ public ExecPlan plan(UpdateStmt updateStmt, ConnectContext session) {
} catch (UserException e) {
throw new SemanticException(e.getMessage());
}
-} else if (table instanceof SystemTable) {
-DataSink dataSink = new SchemaTableSink((SystemTable) table);
+} else if (targetTable instanceof SystemTable) {
+DataSink dataSink = new SchemaTableSink((SystemTable) targetTable);
execPlan.getFragments().get(0).setSink(dataSink);
} else {
throw new SemanticException("Unsupported table type: " + table.getClass().getName());
throw new SemanticException("Unsupported table type: " + targetTable.getClass().getName());
}
if (canUsePipeline) {
PlanFragment sinkFragment = execPlan.getFragments().get(0);
@@ -153,7 +174,7 @@ public ExecPlan plan(UpdateStmt updateStmt, ConnectContext session) {
sinkFragment
.setPipelineDop(ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
}
-if (table instanceof OlapTable) {
+if (targetTable instanceof OlapTable) {
sinkFragment.setHasOlapTableSink();
}
sinkFragment.setForceSetTableSinkDop();
@@ -170,4 +191,48 @@ public ExecPlan plan(UpdateStmt updateStmt, ConnectContext session) {
}
}
}

/**
 * Cast the output columns' types to the target table's column types.
 * @param columnRefFactory column ref factory of the update statement.
 * @param targetTable target table of the update statement.
 * @param colNames output column names of the update statement.
 * @param outputColumns output columns of the update statement.
 * @param root root of the update statement's logical plan.
 * @return the new logical plan root, with a cast projection added.
 */
private static OptExprBuilder castOutputColumnsTypeToTargetColumns(ColumnRefFactory columnRefFactory,
Table targetTable,
List<String> colNames,
List<ColumnRefOperator> outputColumns,
OptExprBuilder root) {
Map<ColumnRefOperator, ScalarOperator> columnRefMap = new HashMap<>();
ScalarOperatorRewriter rewriter = new ScalarOperatorRewriter();
List<ScalarOperatorRewriteRule> rewriteRules = Arrays.asList(new FoldConstantsRule());
Preconditions.checkState(colNames.size() == outputColumns.size(), "Column names' size %s should be equal " +
"to output column refs' size %s", colNames.size(), outputColumns.size());

for (int columnIdx = 0; columnIdx < outputColumns.size(); ++columnIdx) {
ColumnRefOperator outputColumn = outputColumns.get(columnIdx);
String colName = colNames.get(columnIdx);
// It's safe to use getColumn directly, because column names are case-insensitive and consistent with the table's schema.
Column column = targetTable.getColumn(colName);
Preconditions.checkState(column != null, "Column %s not found in table %s", colName,
targetTable.getName());
if (!column.getType().matchesType(outputColumn.getType())) {

[Review comment, Contributor] check the type can cast to target type?
[Author reply] Add it. But this should not happen since it will fail in the analyzer phase.
[Author reply] Add more tests.

// This should always be true, but check here to avoid writing the wrong column type.
if (!column.getType().isFullyCompatible(outputColumn.getType())) {
throw new SemanticException(String.format("Output column type %s is not compatible with table column type %s",
outputColumn.getType(), column.getType()));
}
ColumnRefOperator k = columnRefFactory.create(column.getName(), column.getType(), column.isAllowNull());
ScalarOperator castOperator = new CastOperator(column.getType(), outputColumn, true);
columnRefMap.put(k, rewriter.rewrite(castOperator, rewriteRules));
outputColumns.set(columnIdx, k);
} else {
columnRefMap.put(outputColumn, outputColumn);
}
}
return root.withNewRoot(new LogicalProjectOperator(new HashMap<>(columnRefMap)));
}
}
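
Two details of castOutputColumnsTypeToTargetColumns are easy to miss. First, as the review exchange above suggested, the cast is guarded by isFullyCompatible, a defensive check for a condition the analyzer should already have rejected. Second, the inserted CastOperator is immediately rewritten with FoldConstantsRule, so a literal assignment can be folded into a constant of the target column's type at plan time. A rough sketch of that folding step, with hypothetical stand-in types rather than the actual rewriter API:

// Hypothetical mini expression model for illustrating constant folding of casts.
sealed interface Expr permits Literal, Cast {}
record Literal(Object value, String type) implements Expr {}
record Cast(Expr child, String targetType) implements Expr {}

class FoldConstantsSketch {
    // Fold CAST(<literal> AS <type>) into a literal of the target type at plan
    // time; any other expression is left for runtime evaluation.
    static Expr fold(Expr e) {
        if (e instanceof Cast c && c.child() instanceof Literal lit) {
            return new Literal(coerce(lit.value(), c.targetType()), c.targetType());
        }
        return e;
    }

    // Stand-in coercion for one case: a string literal assigned to a TINYINT
    // column must parse as a number, otherwise the statement is rejected.
    static Object coerce(Object value, String targetType) {
        if (value instanceof String s && targetType.equals("TINYINT")) {
            try {
                return Byte.parseByte(s);
            } catch (NumberFormatException ex) {
                throw new IllegalArgumentException("Invalid number format: " + s);
            }
        }
        return value;
    }
}

In the regression test below, the bad literal in SET k6 = 'xxx' is rejected with exactly this kind of error ("Invalid number format: xxx"), though per the author's comment it is the analyzer, not this fold, that catches it.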
84 changes: 84 additions & 0 deletions test/sql/test_dml/R/test_update
@@ -0,0 +1,84 @@
-- name: test_update
drop table if exists primary_key_with_null;
-- result:
-- !result
CREATE TABLE `primary_key_with_null` (
`k1` date,
`k2` datetime,
`k3` varchar(20),
`k4` varchar(20),
`k5` boolean,
`k6` tinyint,
`k7` smallint,
`k8` int,
`K9` bigint,
`K10` largeint,
`K11` float,
`K12` double,
`K13` decimal(27,9)
) PRIMARY KEY(`k1`, `k2`, `k3`)
DISTRIBUTED BY HASH(`k1`, `k2`, `k3`) BUCKETS 3
PROPERTIES ( "replication_num" = "1");
-- result:
-- !result
INSERT INTO primary_key_with_null VALUES
('2020-10-22','2020-10-23 12:12:12','k1','k4',0,1,2,3,4,5,1.1,1.12,2.889)
,('2020-10-23','2020-10-23 12:12:12','k2','k4',0,0,2,3,4,5,1.1,1.12,2.889)
,('2020-10-24','2020-10-23 12:12:12','k3','k4',0,1,2,3,4,5,1.1,1.12,2.889)
,('2020-10-25','2020-10-23 12:12:12','k4','k4',0,1,2,3,4,NULL,NULL,NULL,2.889);
-- result:
-- !result
UPDATE primary_key_with_null SET `k4` = 'update_k4', `k5` = 1, `k6` = 1, `k7` = 7, `k8` = 0, `k9` = 9, `k10` = 10, `k11` = 1.0, `k12` = 2.0,`k13` = 3.0 WHERE `k3` ="k3" and `k1`='2020-10-22' and `k2`='2020-10-23 12:12:12';
-- result:
-- !result
select * from primary_key_with_null order by k1, k2, k3;
-- result:
2020-10-22 2020-10-23 12:12:12 k1 k4 0 1 2 3 4 5 1.1 1.12 2.889000000
2020-10-23 2020-10-23 12:12:12 k2 k4 0 0 2 3 4 5 1.1 1.12 2.889000000
2020-10-24 2020-10-23 12:12:12 k3 k4 0 1 2 3 4 5 1.1 1.12 2.889000000
2020-10-25 2020-10-23 12:12:12 k4 k4 0 1 2 3 4 None None None 2.889000000
-- !result
UPDATE primary_key_with_null SET `k4` = 'update_k4', `k5` = 1, `k6` = 1, `k7` = 7, `k8` = 0 WHERE `k3` ="k3" and `k1`='2020-10-22' and `k2`='2020-10-22 00:00:00';
-- result:
-- !result
select * from primary_key_with_null order by k1, k2, k3;
-- result:
2020-10-22 2020-10-23 12:12:12 k1 k4 0 1 2 3 4 5 1.1 1.12 2.889000000
2020-10-23 2020-10-23 12:12:12 k2 k4 0 0 2 3 4 5 1.1 1.12 2.889000000
2020-10-24 2020-10-23 12:12:12 k3 k4 0 1 2 3 4 5 1.1 1.12 2.889000000
2020-10-25 2020-10-23 12:12:12 k4 k4 0 1 2 3 4 None None None 2.889000000
-- !result
UPDATE primary_key_with_null SET `K4` = 'update_k4', `K5` = 2, `K6` = 1, `k7` = 7, `k8` = 0 WHERE `k3` ="k3" and `k1`='2020-10-22' and `k2`='2020-10-22 00:00:00';
-- result:
-- !result
select * from primary_key_with_null order by k1, k2, k3;
-- result:
2020-10-22 2020-10-23 12:12:12 k1 k4 0 1 2 3 4 5 1.1 1.12 2.889000000
2020-10-23 2020-10-23 12:12:12 k2 k4 0 0 2 3 4 5 1.1 1.12 2.889000000
2020-10-24 2020-10-23 12:12:12 k3 k4 0 1 2 3 4 5 1.1 1.12 2.889000000
2020-10-25 2020-10-23 12:12:12 k4 k4 0 1 2 3 4 None None None 2.889000000
-- !result
UPDATE primary_key_with_null SET `K4` = 'update_k4', `K5` = 'INVALID', `K6` = 'xxx', `k7` = 7, `k8` = 0 WHERE `k3` ="k3" and `k1`='2020-10-22' and `k2`='2020-10-22 00:00:00';
-- result:
E: (1064, 'Getting analyzing error. Detail message: Invalid number format: xxx.')
-- !result
select * from primary_key_with_null order by k1, k2, k3;
-- result:
2020-10-22 2020-10-23 12:12:12 k1 k4 0 1 2 3 4 5 1.1 1.12 2.889000000
2020-10-23 2020-10-23 12:12:12 k2 k4 0 0 2 3 4 5 1.1 1.12 2.889000000
2020-10-24 2020-10-23 12:12:12 k3 k4 0 1 2 3 4 5 1.1 1.12 2.889000000
2020-10-25 2020-10-23 12:12:12 k4 k4 0 1 2 3 4 None None None 2.889000000
-- !result
UPDATE primary_key_with_null SET `k5` = 5;
-- result:
-- !result
select * from primary_key_with_null order by k1, k2, k3;
-- result:
2020-10-22 2020-10-23 12:12:12 k1 k4 1 1 2 3 4 5 1.1 1.12 2.889000000
2020-10-23 2020-10-23 12:12:12 k2 k4 1 0 2 3 4 5 1.1 1.12 2.889000000
2020-10-24 2020-10-23 12:12:12 k3 k4 1 1 2 3 4 5 1.1 1.12 2.889000000
2020-10-25 2020-10-23 12:12:12 k4 k4 1 1 2 3 4 None None None 2.889000000
-- !result
drop table if exists primary_key_with_null;
-- result:
-- !result
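
The final UPDATE in this file is the scenario the PR title describes: k5 is a BOOLEAN column, SET k5 = 5 assigns an integer literal to it, and every row ends up storing 1. A tiny check of the assumed INT-to-BOOLEAN cast semantics (non-zero maps to true, rendered as 1), inferred from the expected results above:

public class IntToBooleanCheck {
    // Assumed semantics, inferred from the expected results: casting a
    // non-zero integer to BOOLEAN yields true, which is rendered as 1.
    static int toBooleanBit(long v) {
        return v != 0 ? 1 : 0;
    }

    public static void main(String[] args) {
        System.out.println(toBooleanBit(5)); // 1, matching every k5 after SET k5 = 5
        System.out.println(toBooleanBit(0)); // 0
    }
}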
43 changes: 43 additions & 0 deletions test/sql/test_dml/T/test_update
@@ -0,0 +1,43 @@
-- name: test_update
drop table if exists primary_key_with_null;
CREATE TABLE `primary_key_with_null` (
`k1` date,
`k2` datetime,
`k3` varchar(20),
`k4` varchar(20),
`k5` boolean,
`k6` tinyint,
`k7` smallint,
`k8` int,
`K9` bigint,
`K10` largeint,
`K11` float,
`K12` double,
`K13` decimal(27,9)
) PRIMARY KEY(`k1`, `k2`, `k3`)
DISTRIBUTED BY HASH(`k1`, `k2`, `k3`) BUCKETS 3
PROPERTIES ( "replication_num" = "1");

INSERT INTO primary_key_with_null VALUES
('2020-10-22','2020-10-23 12:12:12','k1','k4',0,1,2,3,4,5,1.1,1.12,2.889)
,('2020-10-23','2020-10-23 12:12:12','k2','k4',0,0,2,3,4,5,1.1,1.12,2.889)
,('2020-10-24','2020-10-23 12:12:12','k3','k4',0,1,2,3,4,5,1.1,1.12,2.889)
,('2020-10-25','2020-10-23 12:12:12','k4','k4',0,1,2,3,4,NULL,NULL,NULL,2.889);

UPDATE primary_key_with_null SET `k4` = 'update_k4', `k5` = 1, `k6` = 1, `k7` = 7, `k8` = 0, `k9` = 9, `k10` = 10, `k11` = 1.0, `k12` = 2.0,`k13` = 3.0 WHERE `k3` ="k3" and `k1`='2020-10-22' and `k2`='2020-10-23 12:12:12';
select * from primary_key_with_null order by k1, k2, k3;

UPDATE primary_key_with_null SET `k4` = 'update_k4', `k5` = 1, `k6` = 1, `k7` = 7, `k8` = 0 WHERE `k3` ="k3" and `k1`='2020-10-22' and `k2`='2020-10-22 00:00:00';
select * from primary_key_with_null order by k1, k2, k3;

-- test column case-insensitive
UPDATE primary_key_with_null SET `K4` = 'update_k4', `K5` = 2, `K6` = 1, `k7` = 7, `k8` = 0 WHERE `k3` ="k3" and `k1`='2020-10-22' and `k2`='2020-10-22 00:00:00';
select * from primary_key_with_null order by k1, k2, k3;

-- invalid column type
UPDATE primary_key_with_null SET `K4` = 'update_k4', `K5` = 'INVALID', `K6` = 'xxx', `k7` = 7, `k8` = 0 WHERE `k3` ="k3" and `k1`='2020-10-22' and `k2`='2020-10-22 00:00:00';
select * from primary_key_with_null order by k1, k2, k3;

UPDATE primary_key_with_null SET `k5` = 5;
select * from primary_key_with_null order by k1, k2, k3;
drop table if exists primary_key_with_null;