Skip to content

Commit

Permalink
Merge pull request #2891 from miamia0/fix_2857
Browse files Browse the repository at this point in the history
[bugfix] fix issue 2857:Wrong query result when use alias
  • Loading branch information
zhang2014 committed Nov 19, 2021
2 parents 940d2e5 + c616815 commit 388c9f7
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 14 deletions.
40 changes: 26 additions & 14 deletions query/src/pipelines/transforms/transform_expression_executor.rs
Expand Up @@ -74,8 +74,12 @@ impl ExpressionExecutor {

let mut column_map: HashMap<String, DataColumnWithField> = HashMap::new();

// a + 1 as b, a + 1 as c
let mut alias_map: HashMap<String, Vec<String>> = HashMap::new();
let mut alias_map: HashMap<String, DataColumnWithField> = HashMap::new();

// supported a + 1 as b, a + 1 as c
// supported a + 1 as a, a as b
// !currently not supported a+1 as c, b+1 as c
let mut alias_action_map: HashMap<String, Vec<String>> = HashMap::new();

for f in block.schema().fields().iter() {
let column =
Expand All @@ -84,13 +88,12 @@ impl ExpressionExecutor {
}

let rows = block.num_rows();

for action in self.chain.actions.iter() {
if let ExpressionAction::Alias(alias) = action {
if let Some(v) = alias_map.get_mut(&alias.arg_name) {
if let Some(v) = alias_action_map.get_mut(&alias.arg_name) {
v.push(alias.name.clone());
} else {
alias_map.insert(alias.arg_name.clone(), vec![alias.name.clone()]);
alias_action_map.insert(alias.arg_name.clone(), vec![alias.name.clone()]);
}
}

Expand Down Expand Up @@ -149,26 +152,35 @@ impl ExpressionExecutor {
}

if self.alias_project {
for (k, v) in alias_map.iter() {
for (k, v) in alias_action_map.iter() {
let column = column_map.get(k).cloned().ok_or_else(|| {
ErrorCode::LogicalError("Arguments must be prepared before alias transform")
})?;

for name in v.iter() {
column_map.insert(name.clone(), column.clone());
match alias_map.insert(name.clone(), column.clone()) {
Some(_) => Err(ErrorCode::UnImplement(format!(
"Duplicate alias name :{}",
name
))),
_ => Ok(()),
}?;
}
}
}

let mut project_columns = Vec::with_capacity(self.output_schema.fields().len());
for f in self.output_schema.fields() {
let column = column_map.get(f.name()).ok_or_else(|| {
ErrorCode::LogicalError(format!(
"Projection column: {} not exists in {:?}, there are bugs!",
f.name(),
column_map.keys()
))
})?;
let column = match alias_map.get(f.name()) {
Some(data_column) => data_column,
None => column_map.get(f.name()).ok_or_else(|| {
ErrorCode::LogicalError(format!(
"Projection column: {} not exists in {:?}, there are bugs!",
f.name(),
column_map.keys()
))
})?,
};
project_columns.push(column.column().clone());
}
// projection to remove unused columns
Expand Down
49 changes: 49 additions & 0 deletions query/src/pipelines/transforms/transform_expression_test.rs
Expand Up @@ -100,3 +100,52 @@ async fn test_transform_expression_error() -> Result<()> {

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_transform_expression_issue2857() -> Result<()> {
let ctx = crate::tests::try_create_context()?;
let test_source = crate::tests::NumberTestData::create(ctx.clone());

let mut pipeline = Pipeline::create(ctx.clone());
let source = test_source.number_source_transform_for_test(8)?;
pipeline.add_source(Arc::new(source))?;

if let PlanNode::Projection(plan) = PlanBuilder::create(test_source.number_schema_for_test()?)
.project(&[
col("number").alias("number"),
add(col("number"), lit(1u8)).alias("number1"),
])?
.build()?
{
pipeline.add_simple_transform(|| {
Ok(Box::new(ProjectionTransform::try_create(
plan.input.schema(),
plan.schema.clone(),
plan.expr.clone(),
)?))
})?;
}

let stream = pipeline.execute().await?;
let result = stream.try_collect::<Vec<_>>().await?;
let block = &result[0];
assert_eq!(block.num_columns(), 2);

let expected = vec![
"+--------+---------+",
"| number | number1 |",
"+--------+---------+",
"| 0 | 1 |",
"| 1 | 2 |",
"| 2 | 3 |",
"| 3 | 4 |",
"| 4 | 5 |",
"| 5 | 6 |",
"| 6 | 7 |",
"| 7 | 8 |",
"+--------+---------+",
];
common_datablocks::assert_blocks_sorted_eq(expected, result.as_slice());

Ok(())
}

0 comments on commit 388c9f7

Please sign in to comment.