Skip to content

Commit

Permalink
Merge pull request #7551 from RinChanNOWWW/improve_prewhere
Browse files Browse the repository at this point in the history
refactor(query): reduce deserialization times of prewhere.
  • Loading branch information
mergify[bot] committed Sep 9, 2022
2 parents 8202fed + c68bca7 commit 6024eaf
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 17 deletions.
4 changes: 4 additions & 0 deletions src/query/storages/fuse/src/io/read/block_reader.rs
Expand Up @@ -87,6 +87,10 @@ impl BlockReader {
}))
}

/// Returns a clone of this reader's `projected_schema` handle.
pub fn schema(&self) -> DataSchemaRef {
self.projected_schema.clone()
}

fn to_array_iter(
metas: Vec<&ColumnMeta>,
chunks: Vec<Vec<u8>>,
Expand Down
55 changes: 38 additions & 17 deletions src/query/storages/fuse/src/operations/read.rs
Expand Up @@ -154,11 +154,16 @@ impl FuseTable {

/// Raw column data as produced by a reader's `read_columns_data` and consumed by
/// `deserialize`; each entry pairs a `usize` with the bytes read.
/// NOTE(review): the `usize` is presumably a column index — confirm against `BlockReader`.
type DataChunks = Vec<(usize, Vec<u8>)>;

/// Result of the prewhere step, carried through `State::ReadDataRemain` and
/// `State::Deserialize` so the prewhere columns are not deserialized a second time.
struct PrewhereData {
// Block already deserialized by the prewhere reader (not yet filtered).
data_block: DataBlock,
// Non-null boolean column obtained by evaluating the prewhere filter on `data_block`;
// applied via `DataBlock::filter_block` as the last step of prewhere.
filter: ColumnRef,
}

/// Processing states of `FuseTableSource`.
///
/// In the visible `event` match, the `ReadData*` states report `Event::Async`,
/// `PrewhereFilter` and `Deserialize` report `Event::Sync`, and `Finish` reports
/// `Event::Finished`.
enum State {
ReadDataPrewhere(PartInfoPtr),
ReadDataRemain(PartInfoPtr, DataChunks, ColumnRef),
ReadDataRemain(PartInfoPtr, PrewhereData),
PrewhereFilter(PartInfoPtr, DataChunks),
Deserialize(PartInfoPtr, DataChunks, Option<ColumnRef>),
Deserialize(PartInfoPtr, DataChunks, Option<PrewhereData>),
Generated(Option<PartInfoPtr>, DataBlock),
Finish,
}
Expand Down Expand Up @@ -285,7 +290,7 @@ impl Processor for FuseTableSource {
match self.state {
State::Finish => Ok(Event::Finished),
State::ReadDataPrewhere(_) => Ok(Event::Async),
State::ReadDataRemain(_, _, _) => Ok(Event::Async),
State::ReadDataRemain(_, _) => Ok(Event::Async),
State::PrewhereFilter(_, _) => Ok(Event::Sync),
State::Deserialize(_, _, _) => Ok(Event::Sync),
State::Generated(_, _) => Err(ErrorCode::LogicalError("It's a bug.")),
Expand All @@ -294,9 +299,28 @@ impl Processor for FuseTableSource {

fn process(&mut self) -> Result<()> {
match std::mem::replace(&mut self.state, State::Finish) {
State::Deserialize(part, chunks, filter) => {
let data_block = if let Some(filter) = filter {
let block = self.output_reader.deserialize(part, chunks)?;
State::Deserialize(part, chunks, prewhere_data) => {
let data_block = if let Some(PrewhereData {
data_block: mut prewhere_blocks,
filter,
}) = prewhere_data
{
let block = if chunks.is_empty() {
prewhere_blocks
} else if let Some(remain_reader) = self.remain_reader.as_ref() {
let remain_block = remain_reader.deserialize(part, chunks)?;
for (col, field) in remain_block
.columns()
.iter()
.zip(remain_block.schema().fields())
{
prewhere_blocks =
prewhere_blocks.add_column(col.clone(), field.clone())?;
}
prewhere_blocks.resort(self.output_reader.schema())?
} else {
return Err(ErrorCode::LogicalError("It's a bug. Need remain reader"));
};
// the last step of prewhere
DataBlock::filter_block(block, &filter)?
} else {
Expand All @@ -308,12 +332,10 @@ impl Processor for FuseTableSource {
}
State::PrewhereFilter(part, chunks) => {
// deserialize prewhere data block first
let block = self
.prewhere_reader
.deserialize(part.clone(), chunks.clone())?;
let data_block = self.prewhere_reader.deserialize(part.clone(), chunks)?;
if let Some(filter) = self.prewhere_filter.as_ref() {
// do filter
let res = filter.execute(&block)?;
let res = filter.execute(&data_block)?;
let filter = DataBlock::cast_to_nonull_boolean(res.column(0))?;
// shortcut: if the predicate is a constant boolean (or can be cast to boolean)
if !DataBlock::filter_exists(&filter)? {
Expand All @@ -324,10 +346,11 @@ impl Processor for FuseTableSource {
}
if self.remain_reader.is_none() {
// shortcut, we don't need to read remain data
let block = DataBlock::filter_block(block, &filter)?;
let block = DataBlock::filter_block(data_block, &filter)?;
self.generate_one_block(block)?;
} else {
self.state = State::ReadDataRemain(part, chunks, filter);
self.state =
State::ReadDataRemain(part, PrewhereData { data_block, filter });
}
Ok(())
} else {
Expand All @@ -353,12 +376,10 @@ impl Processor for FuseTableSource {
}
Ok(())
}
State::ReadDataRemain(part, prewhere_chunks, filter) => {
State::ReadDataRemain(part, prewhere_data) => {
if let Some(remain_reader) = self.remain_reader.as_ref() {
let mut chunks = remain_reader.read_columns_data(part.clone()).await?;
// merge two parts of chunks
chunks.extend(prewhere_chunks);
self.state = State::Deserialize(part, chunks, Some(filter));
let chunks = remain_reader.read_columns_data(part.clone()).await?;
self.state = State::Deserialize(part, chunks, Some(prewhere_data));
Ok(())
} else {
return Err(ErrorCode::LogicalError("It's a bug. No remain reader"));
Expand Down

1 comment on commit 6024eaf

@vercel
Copy link

@vercel vercel bot commented on 6024eaf Sep 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend-git-main-databend.vercel.app
databend.rs
databend-databend.vercel.app

Please sign in to comment.