Skip to content

Commit

Permalink
Merge pull request #9824 from RinChanNOWWW/parquet-topk-opt
Browse files Browse the repository at this point in the history
feat(storage): `ParquetTable` support topk optimization.
  • Loading branch information
BohuTANG committed Feb 5, 2023
2 parents 2ba16d2 + c29ad67 commit 54a475a
Show file tree
Hide file tree
Showing 17 changed files with 876 additions and 539 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions src/query/catalog/src/plan/projection.rs
Expand Up @@ -74,6 +74,34 @@ impl Projection {
};
Ok(column_nodes)
}

pub fn add_col(&mut self, col: usize) {
match self {
Projection::Columns(indices) => {
if indices.contains(&col) {
return;
}
indices.push(col);
indices.sort();
}
Projection::InnerColumns(path_indices) => {
path_indices.entry(col).or_insert(vec![col]);
}
}
}

pub fn remove_col(&mut self, col: usize) {
match self {
Projection::Columns(indices) => {
if let Some(pos) = indices.iter().position(|x| *x == col) {
indices.remove(pos);
}
}
Projection::InnerColumns(path_indices) => {
path_indices.remove(&col);
}
}
}
}

impl core::fmt::Debug for Projection {
Expand Down
1 change: 1 addition & 0 deletions src/query/catalog/src/plan/pushdown.rs
Expand Up @@ -72,6 +72,7 @@ impl PushDownInfo {
}

if let RemoteExpr::<String>::ColumnRef { id, .. } = &order.0 {
// TODO: support sub column of nested type.
let field = schema.field_with_name(id).unwrap();
let data_type: DataType = field.data_type().into();
if !support(&data_type) {
Expand Down
7 changes: 7 additions & 0 deletions src/query/expression/src/kernels/topk.rs
Expand Up @@ -25,6 +25,7 @@ use crate::with_number_mapped_type;
use crate::Column;
use crate::Scalar;

#[derive(Clone)]
pub struct TopKSorter {
data: Vec<Scalar>,
limit: usize,
Expand Down Expand Up @@ -152,6 +153,12 @@ impl TopKSorter {
fn make_heap<T, F>(v: &mut [T], is_less: &mut F)
where F: FnMut(&T, &T) -> bool {
let len = v.len();

if len < 2 {
// no need to adjust heap
return;
}

let mut parent = (len - 2) / 2;

loop {
Expand Down
4 changes: 4 additions & 0 deletions src/query/storages/parquet/Cargo.toml
Expand Up @@ -21,11 +21,15 @@ common-functions = { path = "../../functions" }
common-meta-app = { path = "../../../meta/app" }
common-meta-types = { path = "../../../meta/types" }
common-pipeline-core = { path = "../../pipeline/core" }
common-pipeline-sources = { path = "../../pipeline/sources" }
common-storage = { path = "../../../common/storage" }

storages-common-index = { path = "../common/index" }
storages-common-pruner = { path = "../common/pruner" }
storages-common-table-meta = { path = "../common/table-meta" }

async-trait = { version = "0.1.57", package = "async-trait-fn" }
backon = "0.2"
chrono = { workspace = true }
futures = "0.3.24"
opendal = { workspace = true }
Expand Down

1 comment on commit 54a475a

@vercel
Copy link

@vercel vercel bot commented on 54a475a Feb 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend-databend.vercel.app
databend.vercel.app
databend.rs

Please sign in to comment.