Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(query): reduce deserialization times of prewhere. #7551

Merged
merged 2 commits into from Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/query/storages/fuse/src/io/read/block_reader.rs
Expand Up @@ -87,6 +87,10 @@ impl BlockReader {
}))
}

pub fn schema(&self) -> DataSchemaRef {
self.projected_schema.clone()
}

fn to_array_iter(
metas: Vec<&ColumnMeta>,
chunks: Vec<Vec<u8>>,
Expand Down
55 changes: 38 additions & 17 deletions src/query/storages/fuse/src/operations/read.rs
Expand Up @@ -154,11 +154,16 @@ impl FuseTable {

type DataChunks = Vec<(usize, Vec<u8>)>;

struct PrewhereData {
data_block: DataBlock,
filter: ColumnRef,
}

enum State {
ReadDataPrewhere(PartInfoPtr),
ReadDataRemain(PartInfoPtr, DataChunks, ColumnRef),
ReadDataRemain(PartInfoPtr, PrewhereData),
PrewhereFilter(PartInfoPtr, DataChunks),
Deserialize(PartInfoPtr, DataChunks, Option<ColumnRef>),
Deserialize(PartInfoPtr, DataChunks, Option<PrewhereData>),
Generated(Option<PartInfoPtr>, DataBlock),
Finish,
}
Expand Down Expand Up @@ -285,7 +290,7 @@ impl Processor for FuseTableSource {
match self.state {
State::Finish => Ok(Event::Finished),
State::ReadDataPrewhere(_) => Ok(Event::Async),
State::ReadDataRemain(_, _, _) => Ok(Event::Async),
State::ReadDataRemain(_, _) => Ok(Event::Async),
State::PrewhereFilter(_, _) => Ok(Event::Sync),
State::Deserialize(_, _, _) => Ok(Event::Sync),
State::Generated(_, _) => Err(ErrorCode::LogicalError("It's a bug.")),
Expand All @@ -294,9 +299,28 @@ impl Processor for FuseTableSource {

fn process(&mut self) -> Result<()> {
match std::mem::replace(&mut self.state, State::Finish) {
State::Deserialize(part, chunks, filter) => {
let data_block = if let Some(filter) = filter {
let block = self.output_reader.deserialize(part, chunks)?;
State::Deserialize(part, chunks, prewhere_data) => {
let data_block = if let Some(PrewhereData {
data_block: mut prewhere_blocks,
filter,
}) = prewhere_data
{
let block = if chunks.is_empty() {
prewhere_blocks
} else if let Some(remain_reader) = self.remain_reader.as_ref() {
let remain_block = remain_reader.deserialize(part, chunks)?;
for (col, field) in remain_block
.columns()
.iter()
.zip(remain_block.schema().fields())
{
prewhere_blocks =
prewhere_blocks.add_column(col.clone(), field.clone())?;
}
prewhere_blocks.resort(self.output_reader.schema())?
} else {
return Err(ErrorCode::LogicalError("It's a bug. Need remain reader"));
};
// the last step of prewhere
DataBlock::filter_block(block, &filter)?
} else {
Expand All @@ -308,12 +332,10 @@ impl Processor for FuseTableSource {
}
State::PrewhereFilter(part, chunks) => {
// deserialize prewhere data block first
let block = self
.prewhere_reader
.deserialize(part.clone(), chunks.clone())?;
let data_block = self.prewhere_reader.deserialize(part.clone(), chunks)?;
if let Some(filter) = self.prewhere_filter.as_ref() {
// do filter
let res = filter.execute(&block)?;
let res = filter.execute(&data_block)?;
let filter = DataBlock::cast_to_nonull_boolean(res.column(0))?;
// shortcut, if predicates is const boolean (or can be cast to boolean)
if !DataBlock::filter_exists(&filter)? {
Expand All @@ -324,10 +346,11 @@ impl Processor for FuseTableSource {
}
if self.remain_reader.is_none() {
// shortcut, we don't need to read remain data
let block = DataBlock::filter_block(block, &filter)?;
let block = DataBlock::filter_block(data_block, &filter)?;
self.generate_one_block(block)?;
} else {
self.state = State::ReadDataRemain(part, chunks, filter);
self.state =
State::ReadDataRemain(part, PrewhereData { data_block, filter });
}
Ok(())
} else {
Expand All @@ -353,12 +376,10 @@ impl Processor for FuseTableSource {
}
Ok(())
}
State::ReadDataRemain(part, prewhere_chunks, filter) => {
State::ReadDataRemain(part, prewhere_data) => {
if let Some(remain_reader) = self.remain_reader.as_ref() {
let mut chunks = remain_reader.read_columns_data(part.clone()).await?;
// merge two parts of chunks
chunks.extend(prewhere_chunks);
self.state = State::Deserialize(part, chunks, Some(filter));
let chunks = remain_reader.read_columns_data(part.clone()).await?;
self.state = State::Deserialize(part, chunks, Some(prewhere_data));
Ok(())
} else {
return Err(ErrorCode::LogicalError("It's a bug. No remain reader"));
Expand Down