fix(cubestore): fix index schema used for partition pruning (#2125)
The code was an incorrect attempt to handle aliasing in the intermediate
version. The logical plan actually has the qualifiers stripped, so this
handling is not required.

Moreover, the code contained an error that could cause comparisons of
different columns during partition pruning.
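
Below is a minimal sketch of how such a mismatch can arise, in plain Rust. The column names, the projection, and the simplified schema-building are illustrative assumptions rather than the actual cubestore code, and it assumes partition min/max bounds are stored in the index's sort-key order.

// Illustrative sketch only (not cubestore types). Assumes partition min/max
// bounds are laid out in the index's sort-key order: (tenant_id, day).
fn main() {
    let index_sort_key = ["tenant_id", "day"];

    // Hypothetical query that only projects `day`.
    let projected = ["day"];

    // Old behaviour (simplified): the pruning schema kept only the projected
    // index columns, so `day` ended up at position 0 ...
    let old_schema: Vec<&str> = index_sort_key
        .iter()
        .copied()
        .filter(|c| projected.contains(c))
        .collect();
    assert_eq!(old_schema, ["day"]);
    // ... while the partition's min/max row still stores `tenant_id` at
    // position 0, so a filter on `day` could be checked against `tenant_id`
    // bounds.

    // New behaviour: always take the sort-key prefix of the index columns,
    // so schema positions line up with the partition bounds.
    let new_schema: Vec<&str> = index_sort_key.to_vec();
    assert_eq!(new_schema, ["tenant_id", "day"]);
}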
ilya-biryukov committed Feb 17, 2021
1 parent c4c009c commit 99635be
Showing 1 changed file with 10 additions and 35 deletions.
45 changes: 10 additions & 35 deletions rust/cubestore/src/queryplanner/serialized_plan.rs
@@ -624,10 +624,8 @@ impl SerializedPlan {
             .get_active_partitions_and_chunks_by_index_id_for_select(index.get_id())
             .await?;

-        let partition_filter = PartitionFilter::extract(
-            &partition_filter_schema(&projection, &table, &index),
-            filters,
-        );
+        let partition_filter =
+            PartitionFilter::extract(&partition_filter_schema(&index), filters);
         log::trace!("Extracted partition filter is {:?}", partition_filter);
         let candidate_partitions = partitions.len();
         let mut pruned_partitions = 0;
@@ -999,37 +997,14 @@ impl SerializedPlan {
     }
 }

-fn partition_filter_schema(
-    projection: &Option<Vec<usize>>,
-    table: &IdRow<Table>,
-    index: &IdRow<Index>,
-) -> arrow::datatypes::Schema {
-    let mapped_projection = projection.as_ref().map(|p| {
-        CubeTable::project_to_index_positions(&CubeTable::project_to_table(table, p), index)
-            .into_iter()
-            .map(|i| i.unwrap())
-            .collect::<Vec<_>>()
-    });
-
-    let sort_key_len = index.get_row().sort_key_size() as usize;
-
-    let index_cols = index.get_row().columns();
+fn partition_filter_schema(index: &IdRow<Index>) -> arrow::datatypes::Schema {
     let schema_fields: Vec<Field>;
-    if let Some(p) = &mapped_projection {
-        schema_fields = index_cols
-            .iter()
-            .enumerate()
-            .filter_map(|(i, f)| p.iter().find(|p_i| *p_i == &i).map(|_| f.clone().into()))
-            .take(sort_key_len)
-            .collect();
-    } else {
-        schema_fields = index_cols
-            .iter()
-            .map(|c| c.clone().into())
-            .take(sort_key_len)
-            .collect()
-    };
-
-    assert!(schema_fields.len() <= sort_key_len);
+    schema_fields = index
+        .get_row()
+        .columns()
+        .iter()
+        .map(|c| c.clone().into())
+        .take(index.get_row().sort_key_size() as usize)
+        .collect();
     arrow::datatypes::Schema::new(schema_fields)
 }
