From 4d92802382948dea5912276dc5c29042dce46c37 Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Tue, 31 Jan 2023 21:58:26 +0800 Subject: [PATCH 1/7] read_partition add logic about topk optimization. --- Cargo.lock | 1 + src/query/storages/parquet/Cargo.toml | 2 + .../storages/parquet/src/parquet_part.rs | 27 +++--- .../parquet/src/parquet_table/partition.rs | 8 ++ src/query/storages/parquet/src/pruning.rs | 85 ++++++++++++++++--- 5 files changed, 99 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e149fad8ae5f..a9d06a3adcca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2290,6 +2290,7 @@ dependencies = [ "futures", "opendal", "serde", + "storages-common-index", "storages-common-pruner", "storages-common-table-meta", "typetag", diff --git a/src/query/storages/parquet/Cargo.toml b/src/query/storages/parquet/Cargo.toml index b9f854de41ad..268e4b3e6dbf 100644 --- a/src/query/storages/parquet/Cargo.toml +++ b/src/query/storages/parquet/Cargo.toml @@ -22,6 +22,8 @@ common-meta-app = { path = "../../../meta/app" } common-meta-types = { path = "../../../meta/types" } common-pipeline-core = { path = "../../pipeline/core" } common-storage = { path = "../../../common/storage" } + +storages-common-index = { path = "../common/index" } storages-common-pruner = { path = "../common/pruner" } storages-common-table-meta = { path = "../common/table-meta" } diff --git a/src/query/storages/parquet/src/parquet_part.rs b/src/query/storages/parquet/src/parquet_part.rs index 93d26ca1d8e3..55c1ac92c06d 100644 --- a/src/query/storages/parquet/src/parquet_part.rs +++ b/src/query/storages/parquet/src/parquet_part.rs @@ -26,12 +26,14 @@ use common_catalog::plan::PartInfoPtr; use common_catalog::table::ColumnId; use common_exception::ErrorCode; use common_exception::Result; +use common_expression::Scalar; #[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)] pub struct ColumnMeta { pub offset: u64, pub length: u64, pub compression: Compression, + pub min_max: Option<(Scalar, Scalar)>, } #[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)] @@ -40,6 +42,17 @@ pub struct ParquetRowGroupPart { pub num_rows: usize, pub column_metas: HashMap, pub row_selection: Option>, + /// If all row group parts have min/max stats. This is used for topk push down optimization. + /// + /// If there is one row group part does not have min/max stats, we cannot conduct topk push down optimization. 
+ pub all_have_minmax: bool, +} + +impl ParquetRowGroupPart { + pub fn convert_to_part_info(mut self, all_have_minmax: bool) -> PartInfoPtr { + self.all_have_minmax = all_have_minmax; + Arc::new(Box::new(self)) + } } #[typetag::serde(name = "parquet_row_group")] @@ -63,20 +76,6 @@ impl PartInfo for ParquetRowGroupPart { } impl ParquetRowGroupPart { - pub fn create( - location: String, - num_rows: usize, - column_metas: HashMap, - row_selection: Option>, - ) -> Arc> { - Arc::new(Box::new(ParquetRowGroupPart { - location, - num_rows, - column_metas, - row_selection, - })) - } - pub fn from_part(info: &PartInfoPtr) -> Result<&ParquetRowGroupPart> { match info.as_any().downcast_ref::() { Some(part_ref) => Ok(part_ref), diff --git a/src/query/storages/parquet/src/parquet_table/partition.rs b/src/query/storages/parquet/src/parquet_table/partition.rs index 42cf070ca204..afd915aa75c1 100644 --- a/src/query/storages/parquet/src/parquet_table/partition.rs +++ b/src/query/storages/parquet/src/parquet_table/partition.rs @@ -21,6 +21,8 @@ use common_catalog::plan::PushDownInfo; use common_catalog::table_context::TableContext; use common_exception::Result; use common_functions::scalars::BUILTIN_FUNCTIONS; +use storages_common_index::Index; +use storages_common_index::RangeIndex; use storages_common_pruner::RangePrunerCreator; use super::table::arrow_to_table_schema; @@ -50,6 +52,11 @@ impl ParquetTable { Projection::Columns(indices) }; + let top_k = push_down + .as_ref() + .map(|p| p.top_k(&self.table_info.schema(), RangeIndex::supported_type)) + .unwrap_or_default(); + // Currently, arrow2 doesn't support reading stats of a inner column of a nested type. // Therefore, if there is inner fields in projection, we skip the row group pruning. let skip_pruning = matches!(projection, Projection::InnerColumns(_)); @@ -111,6 +118,7 @@ impl ParquetTable { columns_to_read, column_nodes: projected_column_nodes, skip_pruning, + top_k, }; pruner.read_and_prune_partitions().await diff --git a/src/query/storages/parquet/src/pruning.rs b/src/query/storages/parquet/src/pruning.rs index bb567800bfa3..4c5299d0d52f 100644 --- a/src/query/storages/parquet/src/pruning.rs +++ b/src/query/storages/parquet/src/pruning.rs @@ -31,6 +31,7 @@ use common_base::base::tokio; use common_catalog::plan::PartStatistics; use common_catalog::plan::Partitions; use common_catalog::plan::PartitionsShuffleKind; +use common_catalog::plan::TopK; use common_exception::ErrorCode; use common_exception::Result; use common_expression::Expr; @@ -64,6 +65,11 @@ pub struct PartitionPruner { pub column_nodes: ColumnNodes, /// Whether to skip pruning. pub skip_pruning: bool, + /// top k information from pushed down information. + pub top_k: Option, + // TODO: use limit information for pruning + // /// Limit of this query. If there is order by and filter, it will not be used (assign to `usize::MAX`). + // pub limit: usize, } impl PartitionPruner { @@ -79,6 +85,7 @@ impl PartitionPruner { columns_to_read, column_nodes, skip_pruning, + top_k, } = self; // part stats @@ -128,6 +135,10 @@ impl PartitionPruner { }; // 2. Use file meta to prune row groups or pages. + + // If one row group does not have stats, we cannot use the stats for topk optimization. 
+ let mut all_have_minmax = true; + for (file_id, file_meta) in file_metas.iter().enumerate() { partitions_total += file_meta.row_groups.len(); let mut row_group_pruned = vec![false; file_meta.row_groups.len()]; @@ -138,7 +149,7 @@ impl PartitionPruner { .any(|c| c.metadata().statistics.is_none()) }); - if row_group_pruner.is_some() && !skip_pruning && !no_stats { + let row_group_stats = if row_group_pruner.is_some() && !skip_pruning && !no_stats { let pruner = row_group_pruner.as_ref().unwrap(); // If collecting stats fails or `should_keep` is true, we still read the row group. // Otherwise, the row group will be pruned. @@ -152,11 +163,21 @@ impl PartitionPruner { { row_group_pruned[idx] = !pruner.should_keep(stats); } + Some(row_group_stats) + } else { + None } - } + } else if top_k.is_some() { + collect_row_group_stats(column_nodes, &file_meta.row_groups).ok() + } else { + None + }; + + // If one row group does not have stats, we cannot use the stats for topk optimization. + all_have_minmax &= row_group_stats.is_some(); - for (idx, rg) in file_meta.row_groups.iter().enumerate() { - if row_group_pruned[idx] { + for (rg_idx, rg) in file_meta.row_groups.iter().enumerate() { + if row_group_pruned[rg_idx] { continue; } @@ -185,22 +206,66 @@ impl PartitionPruner { for index in columns_to_read { let c = &rg.columns()[*index]; let (offset, length) = c.byte_range(); - column_metas.insert(*index as u32, ColumnMeta { + + let min_max = row_group_stats.as_ref().map(|stats| { + let stat = stats[rg_idx].get(&(*index as u32)).unwrap(); + (stat.min.clone(), stat.max.clone()) + }); + + column_metas.insert(*index, ColumnMeta { offset, length, compression: c.compression(), + min_max, }); } - partitions.push(ParquetRowGroupPart::create( - locations[file_id].clone(), - rg.num_rows(), + partitions.push(ParquetRowGroupPart { + location: locations[file_id].clone(), + num_rows: rg.num_rows(), column_metas, row_selection, - )) + all_have_minmax: false, + }) } } + // 3. Check if can conduct topk push down optimization. + // Only all row groups have min/max stats can we use topk optimization. + // If we can use topk optimization, we should use `PartitionsShuffleKind::Seq`. + let partition_kind = if let (Some(top_k), true) = (top_k, all_have_minmax) { + partitions.sort_by(|a, b| { + let (a_min, a_max) = a + .column_metas + .get(&(top_k.column_id as usize)) + .unwrap() + .min_max + .as_ref() + .unwrap(); + let (b_min, b_max) = b + .column_metas + .get(&(top_k.column_id as usize)) + .unwrap() + .min_max + .as_ref() + .unwrap(); + + if top_k.asc { + (a_min.as_ref(), a_max.as_ref()).cmp(&(b_min.as_ref(), b_max.as_ref())) + } else { + (b_max.as_ref(), b_min.as_ref()).cmp(&(a_max.as_ref(), a_min.as_ref())) + } + }); + PartitionsShuffleKind::Seq + } else { + PartitionsShuffleKind::Mod + }; + + let partitions = partitions + .into_iter() + .map(|p| p.convert_to_part_info(all_have_minmax)) + .collect(); + Ok(( PartStatistics::new_estimated( read_rows, @@ -208,7 +273,7 @@ impl PartitionPruner { partitions_scanned, partitions_total, ), - Partitions::create(PartitionsShuffleKind::Mod, partitions), + Partitions::create(partition_kind, partitions), )) } } From eeccbe8746119ceda76ae78fcdf17f7ff4d8a5e9 Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Wed, 1 Feb 2023 21:33:59 +0800 Subject: [PATCH 2/7] Split deserialize to another pipeline node. 
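
The source processors are now pure I/O stages: they take partitions, fetch the raw
column chunks, and emit them as a `ParquetSourceMeta` (the parts plus their
`IndexedReaders`) attached to an empty block. All CPU-bound work (deserialization,
prewhere evaluation, filtering) moves into the new `ParquetDeserializeTransform`,
so decoding parallelism can be tuned independently of the number of I/O requests.
Roughly, the read pipeline is now wired as below; this is a simplified sketch of
the `read.rs` changes in this patch, with schema setup and error handling elided:

    if self.operator.metadata().can_blocking() {
        pipeline.add_source(
            |output| SyncParquetSource::create(ctx.clone(), output, source_reader.clone()),
            max_threads,
        )?;
    } else {
        pipeline.add_source(
            |output| AsyncParquetSource::create(ctx.clone(), output, source_reader.clone()),
            max_io_requests,
        )?;
        // Shrink back to CPU parallelism before the deserialize stage.
        pipeline.resize(std::cmp::min(max_threads, max_io_requests))?;
    }
    pipeline.add_transform(|input, output| {
        ParquetDeserializeTransform::create(
            ctx.clone(),
            input,
            output,
            src_schema.clone(),
            output_schema.clone(),
            prewhere_info.clone(),
            remain_reader.clone(),
        )
    })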
--- Cargo.lock | 2 + .../storages/hive/hive/src/hive_table.rs | 2 +- src/query/storages/parquet/Cargo.toml | 2 + .../parquet/src/deserialize_transform.rs | 322 ++++++++++++++++++ src/query/storages/parquet/src/lib.rs | 1 + .../parquet/src/parquet_reader/mod.rs | 2 + .../parquet/src/parquet_reader/reader.rs | 82 +++++ .../storages/parquet/src/parquet_source.rs | 181 +++++++++- .../parquet/src/parquet_table/read.rs | 204 +++++------ 9 files changed, 669 insertions(+), 129 deletions(-) create mode 100644 src/query/storages/parquet/src/deserialize_transform.rs diff --git a/Cargo.lock b/Cargo.lock index a9d06a3adcca..e2339d60d920 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2275,6 +2275,7 @@ name = "common-storages-parquet" version = "0.1.0" dependencies = [ "async-trait-fn", + "backon", "chrono", "common-arrow", "common-base", @@ -2285,6 +2286,7 @@ dependencies = [ "common-meta-app", "common-meta-types", "common-pipeline-core", + "common-pipeline-sources", "common-sql", "common-storage", "futures", diff --git a/src/query/storages/hive/hive/src/hive_table.rs b/src/query/storages/hive/hive/src/hive_table.rs index a7dee2236674..6ee263686469 100644 --- a/src/query/storages/hive/hive/src/hive_table.rs +++ b/src/query/storages/hive/hive/src/hive_table.rs @@ -312,7 +312,7 @@ impl HiveTable { ) { (true, _) | (_, None) => { let projection = - PushDownInfo::projection_of_push_downs(&plan.schema(), &plan.push_downs); + PushDownInfo::projection_of_push_downs(&plan.table_schema(), &plan.push_downs); HiveBlockReader::create( self.dal.clone(), self.table_info.schema(), diff --git a/src/query/storages/parquet/Cargo.toml b/src/query/storages/parquet/Cargo.toml index 268e4b3e6dbf..5869230e796d 100644 --- a/src/query/storages/parquet/Cargo.toml +++ b/src/query/storages/parquet/Cargo.toml @@ -21,6 +21,7 @@ common-functions = { path = "../../functions" } common-meta-app = { path = "../../../meta/app" } common-meta-types = { path = "../../../meta/types" } common-pipeline-core = { path = "../../pipeline/core" } +common-pipeline-sources = { path = "../../pipeline/sources" } common-storage = { path = "../../../common/storage" } storages-common-index = { path = "../common/index" } @@ -28,6 +29,7 @@ storages-common-pruner = { path = "../common/pruner" } storages-common-table-meta = { path = "../common/table-meta" } async-trait = { version = "0.1.57", package = "async-trait-fn" } +backon = "0.2" chrono = { workspace = true } futures = "0.3.24" opendal = { workspace = true } diff --git a/src/query/storages/parquet/src/deserialize_transform.rs b/src/query/storages/parquet/src/deserialize_transform.rs new file mode 100644 index 000000000000..b855c1906c7e --- /dev/null +++ b/src/query/storages/parquet/src/deserialize_transform.rs @@ -0,0 +1,322 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; + +use common_base::base::Progress; +use common_base::base::ProgressValues; +use common_catalog::plan::PartInfoPtr; +use common_catalog::table_context::TableContext; +use common_exception::Result; +use common_expression::filter_helper::FilterHelpers; +use common_expression::DataBlock; +use common_expression::DataSchemaRef; +use common_expression::Evaluator; +use common_expression::Expr; +use common_expression::FunctionContext; +use common_functions::scalars::BUILTIN_FUNCTIONS; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; + +use crate::parquet_part::ParquetRowGroupPart; +use crate::parquet_reader::IndexedReaders; +use crate::parquet_reader::ParquetReader; +use crate::parquet_source::ParquetSourceMeta; + +pub struct ParquetPrewhereInfo { + pub func_ctx: FunctionContext, + pub reader: Arc, + pub filter: Expr, +} + +pub struct ParquetDeserializeTransform { + // Used for pipeline operations + scan_progress: Arc, + input: Arc, + output: Arc, + output_data: Option, + + // data from input + parts: VecDeque, + data_readers: VecDeque, + + src_schema: DataSchemaRef, + output_schema: DataSchemaRef, + + // Used for prewhere reading and filtering + prewhere_info: Arc>, + + // Used for remain reading + remain_reader: Arc, + // Used for top k optimization + // top_k: Option<(TopK, TopKSorter, usize)>, + // top_k_finished: bool, +} + +impl ParquetDeserializeTransform { + pub fn create( + ctx: Arc, + input: Arc, + output: Arc, + src_schema: DataSchemaRef, + output_schema: DataSchemaRef, + prewhere_info: Arc>, + remain_reader: Arc, + ) -> Result { + let scan_progress = ctx.get_scan_progress(); + + // let top_k = plan + // .push_downs + // .as_ref() + // .map(|p| p.top_k(table_schema.as_ref(), RangeIndex::supported_type)) + // .unwrap_or_default(); + + // let top_k = top_k.map(|top_k| { + // let index = src_schema.index_of(top_k.order_by.name()).unwrap(); + // let sorter = TopKSorter::new(top_k.limit, top_k.asc); + + // if !prewhere_columns.contains(&index) { + // prewhere_columns.push(index); + // prewhere_columns.sort(); + // } + // (top_k, sorter, index) + // }); + + Ok(ProcessorPtr::create(Box::new( + ParquetDeserializeTransform { + scan_progress, + input, + output, + output_data: None, + + parts: VecDeque::new(), + data_readers: VecDeque::new(), + + src_schema, + output_schema, + + prewhere_info, + remain_reader, + }, + ))) + } + + fn add_block(&mut self, data_block: DataBlock) -> Result<()> { + let rows = data_block.num_rows(); + if rows == 0 { + return Ok(()); + } + let progress_values = ProgressValues { + rows, + bytes: data_block.memory_size(), + }; + self.scan_progress.incr(&progress_values); + self.output_data = Some(data_block); + Ok(()) + } + + // /// check topk should return finished or not + // fn check_topn(&mut self) { + // if let Some((_, sorter, _)) = &mut self.top_k { + // if let Some(next_part) = self.parts.front() { + // let next_part = next_part + // .as_any() + // .downcast_ref::() + // .unwrap(); + + // if let Some(sort_min_max) = &next_part.sort_min_max { + // self.top_k_finished = sorter.never_match(sort_min_max); + // } + // } + // } + // } +} + +impl Processor for ParquetDeserializeTransform { + fn name(&self) -> String { + String::from("ParquetDeserializeTransform") + } 
+ + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input.finish(); + return Ok(Event::Finished); + } + + if !self.output.can_push() { + self.input.set_not_need_data(); + return Ok(Event::NeedConsume); + } + + if let Some(data_block) = self.output_data.take() { + self.output.push_data(Ok(data_block)); + return Ok(Event::NeedConsume); + } + + if !self.data_readers.is_empty() { + if !self.input.has_data() { + self.input.set_need_data(); + } + return Ok(Event::Sync); + } + + // if self.top_k_finished { + // self.input.finish(); + // self.output.finish(); + // return Ok(Event::Finished); + // } + + if self.input.has_data() { + let mut data_block = self.input.pull_data().unwrap()?; + let mut source_meta = data_block.take_meta().unwrap(); + let source_meta = source_meta + .as_mut_any() + .downcast_mut::() + .unwrap(); + + self.parts = VecDeque::from(std::mem::take(&mut source_meta.parts)); + + // self.check_topn(); + // if self.top_k_finished { + // self.input.finish(); + // self.output.finish(); + // return Ok(Event::Finished); + // } + self.data_readers = VecDeque::from(std::mem::take(&mut source_meta.readers)); + return Ok(Event::Sync); + } + + if self.input.is_finished() { + self.output.finish(); + return Ok(Event::Finished); + } + + self.input.set_need_data(); + Ok(Event::NeedData) + } + + fn process(&mut self) -> Result<()> { + if let Some(mut readers) = self.data_readers.pop_front() { + let part = self.parts.pop_front().unwrap(); + let part = ParquetRowGroupPart::from_part(&part)?; + + // this means it's empty projection + if readers.is_empty() { + let _ = self.data_readers.pop_front(); + let part = self.parts.pop_front().unwrap(); + + let part = ParquetRowGroupPart::from_part(&part)?; + let data_block = DataBlock::new(vec![], part.num_rows); + self.add_block(data_block)?; + return Ok(()); + } + + // Step 1: Check TOP_K, if prewhere_columns contains not only TOP_K, we can check if TOP_K column can satisfy the heap. + // if self.prewhere_columns.len() > 1 { + // if let Some((top_k, sorter, index)) = self.top_k.as_mut() { + // let chunk = chunks.get_mut(*index).unwrap(); + // let array = chunk.1.next_array()?; + // self.read_columns.push(*index); + + // let data_type = top_k.order_by.datatype().into(); + // let col = Column::from_arrow(array.a_s_ref(), &data_type); + + // arrays.push((chunk.0, array)); + // if sorter.never_match_any(&col) { + // return; + // } + // } + // } + + // Step 2: Read Prewhere columns and get the filter + + let data_block = match self.prewhere_info.as_ref() { + Some(ParquetPrewhereInfo { + func_ctx, + reader, + filter, + }) => { + let chunks = reader.read_from_readers(&mut readers)?; + let mut prewhere_block = reader.deserialize(part, chunks, None)?; + let evaluator = Evaluator::new(&prewhere_block, *func_ctx, &BUILTIN_FUNCTIONS); + let result = evaluator + .run(filter) + .map_err(|e| e.add_message("eval prewhere filter failed:"))?; + let filter = FilterHelpers::cast_to_nonull_boolean(&result).unwrap(); + + // Step 3: Apply the filter, if it's all filtered, we can skip the remain columns. 
+ if FilterHelpers::is_all_unset(&filter) { + return Ok(()); + } + + // Step 4: Apply the filter to topk and update the bitmap, this will filter more results + // let filter = if let Some((_, sorter, index)) = &mut self.top_k { + // let index_prewhere = self + // .prewhere_columns + // .iter() + // .position(|x| x == index) + // .unwrap(); + // let top_k_column = prewhere_block + // .get_by_offset(index_prewhere) + // .value + // .as_column() + // .unwrap(); + + // let mut bitmap = + // FilterHelpers::filter_to_bitmap(filter, prewhere_block.num_rows()); + // sorter.push_column(top_k_column, &mut bitmap); + // Value::Column(bitmap.into()) + // } else { + // filter + // }; + + // if FilterHelpers::is_all_unset(&filter) { + // return Self::skip_chunks_page(&self.read_columns, chunks); + // } + + let chunks = self.remain_reader.read_from_readers(&mut readers)?; + + // TODO: the filter may be able to pushed down + let remain_block = self.remain_reader.deserialize(part, chunks, None)?; + + // Combine two blocks. + for col in remain_block.columns() { + prewhere_block.add_column(col.clone()); + } + + let block = remain_block.resort(&self.src_schema, &self.output_schema)?; + block.filter_boolean_value(filter) + } + None => { + let chunks = self.remain_reader.read_from_readers(&mut readers)?; + self.remain_reader.deserialize(part, chunks, None) + } + }?; + + // Step 5: Add the block to output data + self.add_block(data_block)?; + } + + Ok(()) + } +} diff --git a/src/query/storages/parquet/src/lib.rs b/src/query/storages/parquet/src/lib.rs index 0a4ca39d4187..488fe56a3f41 100644 --- a/src/query/storages/parquet/src/lib.rs +++ b/src/query/storages/parquet/src/lib.rs @@ -15,6 +15,7 @@ #![allow(clippy::uninlined_format_args)] #![deny(unused_crate_dependencies)] +mod deserialize_transform; mod parquet_part; mod parquet_reader; mod parquet_source; diff --git a/src/query/storages/parquet/src/parquet_reader/mod.rs b/src/query/storages/parquet/src/parquet_reader/mod.rs index 1f39e096e13d..6f0884220775 100644 --- a/src/query/storages/parquet/src/parquet_reader/mod.rs +++ b/src/query/storages/parquet/src/parquet_reader/mod.rs @@ -16,5 +16,7 @@ mod deserialize; mod filter; mod reader; +pub use reader::DataReader; pub use reader::IndexedChunk; +pub use reader::IndexedReaders; pub use reader::ParquetReader; diff --git a/src/query/storages/parquet/src/parquet_reader/reader.rs b/src/query/storages/parquet/src/parquet_reader/reader.rs index 98c652971810..7a7bdf132e57 100644 --- a/src/query/storages/parquet/src/parquet_reader/reader.rs +++ b/src/query/storages/parquet/src/parquet_reader/reader.rs @@ -21,7 +21,9 @@ use common_arrow::arrow::io::parquet::write::to_parquet_schema; use common_arrow::parquet::metadata::ColumnDescriptor; use common_arrow::schema_projection as ap; use common_base::base::tokio; +use common_catalog::plan::PartInfoPtr; use common_catalog::plan::Projection; +use common_exception::ErrorCode; use common_exception::Result; use common_expression::DataSchema; use common_expression::DataSchemaRef; @@ -31,7 +33,25 @@ use opendal::Operator; use crate::parquet_part::ParquetRowGroupPart; use crate::parquet_table::arrow_to_table_schema; +pub struct DataReader { + bytes: usize, + inner: Box, +} + +impl DataReader { + pub fn new(inner: Box, bytes: usize) -> Self { + Self { inner, bytes } + } + + pub fn read_to_end(&mut self) -> Result> { + let mut data = Vec::with_capacity(self.bytes); + self.inner.read_to_end(&mut data)?; + Ok(data) + } +} + pub type IndexedChunk = (usize, Vec); +pub type IndexedReaders = 
HashMap; /// The reader to parquet files with a projected schema. /// @@ -149,6 +169,68 @@ impl ParquetReader { )) } + pub fn read_from_readers(&self, readers: &mut IndexedReaders) -> Result> { + let mut chunks = Vec::with_capacity(self.columns_to_read.len()); + + for index in &self.columns_to_read { + let reader = readers.get_mut(index).unwrap(); + let data = reader.read_to_end()?; + + chunks.push((*index, data)); + } + + Ok(chunks) + } + + pub fn readers_from_blocking_io(&self, part: PartInfoPtr) -> Result { + let part = ParquetRowGroupPart::from_part(&part)?; + + let mut readers: HashMap = + HashMap::with_capacity(self.columns_to_read.len()); + + for index in &self.columns_to_read { + let obj = self.operator.object(&part.location); + let meta = &part.column_metas[index]; + let reader = obj.blocking_range_reader(meta.offset..meta.offset + meta.length)?; + readers.insert( + *index, + DataReader::new(Box::new(reader), meta.length as usize), + ); + } + Ok(readers) + } + + pub async fn readers_from_non_blocking_io(&self, part: PartInfoPtr) -> Result { + use backon::ExponentialBackoff; + use backon::Retryable; + + let part = ParquetRowGroupPart::from_part(&part)?; + + let mut join_handlers = Vec::with_capacity(self.columns_to_read.len()); + let obj = self.operator.object(&part.location); + + for index in self.columns_to_read.iter() { + let meta = &part.column_metas[index]; + let obj = obj.clone(); + let (offset, length) = (meta.offset, meta.length); + + join_handlers.push(async move { + let data = { || async { obj.range_read(offset..offset + length).await } } + .retry(ExponentialBackoff::default()) + .when(|err| err.is_temporary()) + .await?; + Ok::<_, ErrorCode>(( + *index, + DataReader::new(Box::new(std::io::Cursor::new(data)), length as usize), + )) + }); + } + + let res = futures::future::try_join_all(join_handlers).await?; + + Ok(res.into_iter().collect()) + } + /// Read columns data of one row group. pub fn sync_read_columns(&self, part: &ParquetRowGroupPart) -> Result> { let mut chunks = Vec::with_capacity(self.columns_to_read.len()); diff --git a/src/query/storages/parquet/src/parquet_source.rs b/src/query/storages/parquet/src/parquet_source.rs index 022068a8cce8..dbc96c89ecaa 100644 --- a/src/query/storages/parquet/src/parquet_source.rs +++ b/src/query/storages/parquet/src/parquet_source.rs @@ -13,11 +13,14 @@ // limitations under the License. 
use std::any::Any; +use std::fmt::Debug; +use std::fmt::Formatter; use std::sync::Arc; use common_arrow::arrow::bitmap::Bitmap; use common_arrow::arrow::bitmap::MutableBitmap; use common_arrow::parquet::indexes::Interval; +use common_base::base::tokio; use common_base::base::Progress; use common_base::base::ProgressValues; use common_catalog::plan::ParquetReadOptions; @@ -27,6 +30,7 @@ use common_exception::ErrorCode; use common_exception::Result; use common_expression::filter_helper::FilterHelpers; use common_expression::types::BooleanType; +use common_expression::BlockMetaInfo; use common_expression::DataBlock; use common_expression::DataSchemaRef; use common_expression::Evaluator; @@ -37,12 +41,181 @@ use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_core::processors::Processor; +use common_pipeline_sources::processors::sources::SyncSource; +use common_pipeline_sources::processors::sources::SyncSourcer; +use serde::Deserializer; +use serde::Serializer; use crate::parquet_part::ParquetRowGroupPart; use crate::parquet_reader::IndexedChunk; +use crate::parquet_reader::IndexedReaders; use crate::parquet_reader::ParquetReader; use crate::parquet_source::State::Generated; +pub struct ParquetSourceMeta { + pub parts: Vec, + /// The readers' order is the same of column nodes of the source schema. + pub readers: Vec, +} + +impl Debug for ParquetSourceMeta { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ParquetSourceMeta") + .field("part", &self.parts) + .finish() + } +} + +impl serde::Serialize for ParquetSourceMeta { + fn serialize(&self, _: S) -> common_exception::Result + where S: Serializer { + unimplemented!("Unimplemented serialize ParquetSourceMeta") + } +} + +impl<'de> serde::Deserialize<'de> for ParquetSourceMeta { + fn deserialize(_: D) -> common_exception::Result + where D: Deserializer<'de> { + unimplemented!("Unimplemented deserialize ParquetSourceMeta") + } +} + +#[typetag::serde(name = "parquet_source")] +impl BlockMetaInfo for ParquetSourceMeta { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } + + fn clone_self(&self) -> Box { + unimplemented!("Unimplemented clone ParquetSourceMeta") + } + + fn equals(&self, _: &Box) -> bool { + unimplemented!("Unimplemented equals ParquetSourceMeta") + } +} + +pub struct SyncParquetSource { + ctx: Arc, + block_reader: Arc, +} + +impl SyncParquetSource { + pub fn create( + ctx: Arc, + output: Arc, + block_reader: Arc, + ) -> Result { + SyncSourcer::create(ctx.clone(), output, SyncParquetSource { ctx, block_reader }) + } +} + +impl SyncSource for SyncParquetSource { + const NAME: &'static str = "SyncParquetSource"; + + fn generate(&mut self) -> Result> { + match self.ctx.get_partition() { + None => Ok(None), + Some(part) => Ok(Some(DataBlock::empty_with_meta(Box::new( + ParquetSourceMeta { + parts: vec![part.clone()], + readers: vec![self.block_reader.readers_from_blocking_io(part)?], + }, + )))), + } + } +} + +pub struct AsyncParquetSource { + finished: bool, + batch_size: usize, + ctx: Arc, + block_reader: Arc, + + output: Arc, + output_data: Option<(Vec, Vec)>, +} + +impl AsyncParquetSource { + pub fn create( + ctx: Arc, + output: Arc, + block_reader: Arc, + ) -> Result { + let batch_size = ctx.get_settings().get_storage_fetch_part_num()? 
as usize; + Ok(ProcessorPtr::create(Box::new(AsyncParquetSource { + ctx, + output, + batch_size, + block_reader, + finished: false, + output_data: None, + }))) + } +} + +#[async_trait::async_trait] +impl Processor for AsyncParquetSource { + fn name(&self) -> String { + String::from("AsyncParquetSource") + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.finished { + self.output.finish(); + return Ok(Event::Finished); + } + + if self.output.is_finished() { + return Ok(Event::Finished); + } + + if !self.output.can_push() { + return Ok(Event::NeedConsume); + } + + if let Some((parts, readers)) = self.output_data.take() { + let output = DataBlock::empty_with_meta(Box::new(ParquetSourceMeta { parts, readers })); + self.output.push_data(Ok(output)); + } + + Ok(Event::Async) + } + + async fn async_process(&mut self) -> Result<()> { + let parts = self.ctx.get_partitions(self.batch_size); + + if !parts.is_empty() { + let mut readers = Vec::with_capacity(parts.len()); + for part in &parts { + let part = part.clone(); + let block_reader = self.block_reader.clone(); + + readers.push(async move { + let handler = tokio::spawn(async move { + block_reader.readers_from_non_blocking_io(part).await + }); + handler.await.unwrap() + }); + } + + self.output_data = Some((parts, futures::future::try_join_all(readers).await?)); + return Ok(()); + } + + self.finished = true; + Ok(()) + } +} + struct PrewhereData { data_block: DataBlock, filter: Value, @@ -63,7 +236,7 @@ enum State { Finish, } -pub struct ParquetSource { +pub struct OldParquetSource { state: State, progress_values: ProgressValues, @@ -87,7 +260,7 @@ pub struct ParquetSource { read_options: ParquetReadOptions, } -impl ParquetSource { +impl OldParquetSource { pub fn create( ctx: Arc, output: Arc, @@ -103,7 +276,7 @@ impl ParquetSource { ) -> Result { let scan_progress = ctx.get_scan_progress(); - Ok(ProcessorPtr::create(Box::new(ParquetSource { + Ok(ProcessorPtr::create(Box::new(OldParquetSource { ctx, output, scan_progress, @@ -290,7 +463,7 @@ impl ParquetSource { } #[async_trait::async_trait] -impl Processor for ParquetSource { +impl Processor for OldParquetSource { fn name(&self) -> String { "ParquetSource".to_string() } diff --git a/src/query/storages/parquet/src/parquet_table/read.rs b/src/query/storages/parquet/src/parquet_table/read.rs index abc6ac72b675..9e4de4b001f4 100644 --- a/src/query/storages/parquet/src/parquet_table/read.rs +++ b/src/query/storages/parquet/src/parquet_table/read.rs @@ -18,71 +18,39 @@ use common_catalog::plan::DataSourcePlan; use common_catalog::plan::Projection; use common_catalog::plan::PushDownInfo; use common_catalog::table_context::TableContext; +use common_exception::ErrorCode; use common_exception::Result; use common_expression::ConstantFolder; use common_expression::DataSchema; +use common_expression::DataSchemaRefExt; use common_expression::Expr; use common_expression::FunctionContext; +use common_expression::RemoteExpr; use common_functions::scalars::BUILTIN_FUNCTIONS; use common_pipeline_core::Pipeline; use super::ParquetTable; +use crate::deserialize_transform::ParquetDeserializeTransform; +use crate::deserialize_transform::ParquetPrewhereInfo; use crate::parquet_reader::ParquetReader; -use crate::parquet_source::ParquetSource; +use crate::parquet_source::AsyncParquetSource; +use crate::parquet_source::SyncParquetSource; impl ParquetTable { pub fn create_reader(&self, projection: Projection) -> Result> { ParquetReader::create(self.operator.clone(), 
self.arrow_schema.clone(), projection) } - // Build the prewhere reader. - fn build_prewhere_reader(&self, plan: &DataSourcePlan) -> Result> { - match PushDownInfo::prewhere_of_push_downs(&plan.push_downs) { - None => { - let projection = - PushDownInfo::projection_of_push_downs(&plan.schema(), &plan.push_downs); - self.create_reader(projection) - } - Some(v) => self.create_reader(v.prewhere_columns), - } - } - - // Build the prewhere filter expression. - fn build_prewhere_filter_expr( - &self, + fn build_filter( ctx: FunctionContext, - plan: &DataSourcePlan, + filter: &RemoteExpr, schema: &DataSchema, - ) -> Result>> { - Ok( - match PushDownInfo::prewhere_of_push_downs(&plan.push_downs) { - None => Arc::new(None), - Some(v) => { - let expr = v - .filter - .as_expr(&BUILTIN_FUNCTIONS) - .project_column_ref(|name| schema.index_of(name).unwrap()); - let (expr, _) = ConstantFolder::fold(&expr, ctx, &BUILTIN_FUNCTIONS); - Arc::new(Some(expr)) - } - }, - ) - } - - // Build the remain reader. - fn build_remain_reader(&self, plan: &DataSourcePlan) -> Result>> { - Ok( - match PushDownInfo::prewhere_of_push_downs(&plan.push_downs) { - None => Arc::new(None), - Some(v) => { - if v.remain_columns.is_empty() { - Arc::new(None) - } else { - Arc::new(Some((*self.create_reader(v.remain_columns)?).clone())) - } - } - }, - ) + ) -> Expr { + let expr = filter + .as_expr(&BUILTIN_FUNCTIONS) + .project_column_ref(|name| schema.index_of(name).unwrap()); + let (expr, _) = ConstantFolder::fold(&expr, ctx, &BUILTIN_FUNCTIONS); + expr } #[inline] @@ -92,103 +60,91 @@ impl ParquetTable { plan: &DataSourcePlan, pipeline: &mut Pipeline, ) -> Result<()> { - // If there is a `PrewhereInfo`, the final output should be `PrehwereInfo.output_columns`. - // `PrewhereInfo.output_columns` should be a subset of `PushDownInfo.projection`. - let output_projection = match PushDownInfo::prewhere_of_push_downs(&plan.push_downs) { - None => { - PushDownInfo::projection_of_push_downs(&self.table_info.schema(), &plan.push_downs) - } - Some(v) => v.output_columns, - }; + let source_projection = + PushDownInfo::projection_of_push_downs(&self.table_info.schema(), &plan.push_downs); + + // The front of the src_fields are prewhere columns (if exist). + // The back of the src_fields are remain columns. + let mut src_fields = Vec::with_capacity(source_projection.len()); + + // The schema of the data block `read_data` output. + let output_schema: Arc = Arc::new(plan.schema().into()); - let prewhere_reader = self.build_prewhere_reader(plan)?; - let prewhere_filter = self.build_prewhere_filter_expr( - ctx.get_function_context()?, - plan, - prewhere_reader.output_schema(), + // Build the reader for parquet source. + let source_reader = ParquetReader::create( + self.operator.clone(), + self.arrow_schema.clone(), + source_projection, )?; - let remain_reader = self.build_remain_reader(plan)?; - - // Build three kinds of schemas. - // The schemas are used for `DataBlock::resort` to remove columns that are not needed. - // Remove columns before `DataBlock::filter` can reduce memory copy. - // 1. The final output schema. - let output_schema = Arc::new(DataSchema::from( - &output_projection.project_schema(&plan.source_info.schema()), - )); - // 2. The schema after filter. Remove columns read by prewhere reader but will not be output. 
- let output_fields = output_schema.fields(); - let prewhere_schema = prewhere_reader.output_schema(); - let remain_fields = if let Some(reader) = remain_reader.as_ref() { - reader.output_schema().fields().clone() + + let push_down_prewhere = PushDownInfo::prewhere_of_push_downs(&plan.push_downs); + + // Build remain reader. + // If there is no prewhere filter, remain reader is the same as source reader (no prewhere phase, deserialize directly). + let remain_reader = if let Some(p) = &push_down_prewhere { + ParquetReader::create( + self.operator.clone(), + self.arrow_schema.clone(), + p.remain_columns.clone(), + )? } else { - vec![] + source_reader.clone() }; - let mut after_filter_fields = Vec::with_capacity(output_fields.len() - remain_fields.len()); - // Ensure the order of fields in `after_filter_fields` is the same as `output_fields`. - // It will reduce the resort times. - for field in output_fields { - if prewhere_schema.field_with_name(field.name()).is_ok() { - after_filter_fields.push(field.clone()); - } - } - // 3. The schema after add remain columns. - let mut after_remain_fields = after_filter_fields.clone(); - after_remain_fields.extend(remain_fields); - let after_filter_schema = Arc::new(DataSchema::new(after_filter_fields)); - let after_remain_schema = Arc::new(DataSchema::new(after_remain_fields)); + // Build prewhere info. + + let prewhere_info = Arc::new( + PushDownInfo::prewhere_of_push_downs(&plan.push_downs) + .map(|p| { + let reader = ParquetReader::create( + self.operator.clone(), + self.arrow_schema.clone(), + p.prewhere_columns, + )?; + src_fields.extend_from_slice(reader.output_schema.fields()); + let func_ctx = ctx.get_function_context()?; + let filter = Self::build_filter(func_ctx, &p.filter, &reader.output_schema); + Ok::<_, ErrorCode>(ParquetPrewhereInfo { + func_ctx, + reader, + filter, + }) + }) + .transpose()?, + ); + + src_fields.extend_from_slice(remain_reader.output_schema.fields()); + let src_schema = DataSchemaRefExt::create(src_fields); let max_threads = ctx.get_settings().get_max_threads()? as usize; // Add source pipe. if self.operator.metadata().can_blocking() { pipeline.add_source( - |output| { - ParquetSource::create( - ctx.clone(), - output, - ( - after_filter_schema.clone(), - after_remain_schema.clone(), - output_schema.clone(), - ), - prewhere_reader.clone(), - prewhere_filter.clone(), - remain_reader.clone(), - self.read_options, - ) - }, + |output| SyncParquetSource::create(ctx.clone(), output, source_reader.clone()), max_threads, - ) + )?; } else { let max_io_requests = std::cmp::max( max_threads, ctx.get_settings().get_max_storage_io_requests()? as usize, ); - pipeline.add_source( - |output| { - ParquetSource::create( - ctx.clone(), - output, - ( - after_filter_schema.clone(), - after_remain_schema.clone(), - output_schema.clone(), - ), - prewhere_reader.clone(), - prewhere_filter.clone(), - remain_reader.clone(), - self.read_options, - ) - }, + |output| AsyncParquetSource::create(ctx.clone(), output, source_reader.clone()), max_io_requests, )?; - - // Resize pipeline to max threads. 
-            let resize_to = std::cmp::min(max_threads, max_io_requests);
-            pipeline.resize(resize_to)
+            pipeline.resize(std::cmp::min(max_threads, max_io_requests))?;
         }
+
+        pipeline.add_transform(|input, output| {
+            ParquetDeserializeTransform::create(
+                ctx.clone(),
+                input,
+                output,
+                src_schema.clone(),
+                output_schema.clone(),
+                prewhere_info.clone(),
+                remain_reader.clone(),
+            )
+        })
     }
 }

From df3c79cdca596f63fc11da55db5bb1db180fba6e Mon Sep 17 00:00:00 2001
From: RinChanNOWWW
Date: Thu, 2 Feb 2023 20:12:52 +0800
Subject: [PATCH 3/7] Push down filter to remain reader.

---
 .../parquet/src/deserialize_transform.rs      | 89 ++++++++++++++++---
 .../storages/parquet/src/parquet_part.rs      |  3 +-
 .../parquet/src/parquet_reader/deserialize.rs |  2 +-
 .../parquet/src/parquet_reader/reader.rs      |  4 +-
 4 files changed, 79 insertions(+), 19 deletions(-)

diff --git a/src/query/storages/parquet/src/deserialize_transform.rs b/src/query/storages/parquet/src/deserialize_transform.rs
index b855c1906c7e..3f71fc749702 100644
--- a/src/query/storages/parquet/src/deserialize_transform.rs
+++ b/src/query/storages/parquet/src/deserialize_transform.rs
@@ -16,17 +16,24 @@ use std::any::Any;
 use std::collections::VecDeque;
 use std::sync::Arc;
 
+use common_arrow::arrow::bitmap::Bitmap;
+use common_arrow::arrow::bitmap::MutableBitmap;
+use common_arrow::parquet::indexes::Interval;
 use common_base::base::Progress;
 use common_base::base::ProgressValues;
 use common_catalog::plan::PartInfoPtr;
 use common_catalog::table_context::TableContext;
 use common_exception::Result;
 use common_expression::filter_helper::FilterHelpers;
+use common_expression::types::DataType;
+use common_expression::BlockEntry;
 use common_expression::DataBlock;
 use common_expression::DataSchemaRef;
 use common_expression::Evaluator;
 use common_expression::Expr;
 use common_expression::FunctionContext;
+use common_expression::Scalar;
+use common_expression::Value;
 use common_functions::scalars::BUILTIN_FUNCTIONS;
 use common_pipeline_core::processors::port::InputPort;
 use common_pipeline_core::processors::port::OutputPort;
@@ -219,6 +226,10 @@ impl Processor for ParquetDeserializeTransform {
         if let Some(mut readers) = self.data_readers.pop_front() {
             let part = self.parts.pop_front().unwrap();
             let part = ParquetRowGroupPart::from_part(&part)?;
+            let row_selection = part
+                .row_selection
+                .as_ref()
+                .map(|sel| intervals_to_bitmap(sel, part.num_rows));
 
             // this means it's empty projection
             if readers.is_empty() {
@@ -257,7 +268,8 @@ impl Processor for ParquetDeserializeTransform {
                     filter,
                 }) => {
                     let chunks = reader.read_from_readers(&mut readers)?;
-                    let mut prewhere_block = reader.deserialize(part, chunks, None)?;
+                    let mut prewhere_block =
+                        reader.deserialize(part, chunks, row_selection.clone())?;
                     let evaluator = Evaluator::new(&prewhere_block, *func_ctx, &BUILTIN_FUNCTIONS);
                     let result = evaluator
                         .run(filter)
@@ -269,7 +281,18 @@ impl Processor for ParquetDeserializeTransform {
                         return Ok(());
                     }
 
-                    // Step 4: Apply the filter to topk and update the bitmap, this will filter more results
+                    // Step 4: Remove columns that are not needed for output. Use dummy columns to replace them.
+ let mut columns = prewhere_block.columns().to_vec(); + for (col, f) in columns.iter_mut().zip(reader.output_schema().fields()) { + if !self.output_schema.has_field(f.name()) { + *col = BlockEntry { + data_type: DataType::Null, + value: Value::Scalar(Scalar::Null), + }; + } + } + + // Step 5: Apply the filter to topk and update the bitmap, this will filter more results // let filter = if let Some((_, sorter, index)) = &mut self.top_k { // let index_prewhere = self // .prewhere_columns @@ -294,29 +317,67 @@ impl Processor for ParquetDeserializeTransform { // return Self::skip_chunks_page(&self.read_columns, chunks); // } + // Step 6: Read remain columns. let chunks = self.remain_reader.read_from_readers(&mut readers)?; - - // TODO: the filter may be able to pushed down - let remain_block = self.remain_reader.deserialize(part, chunks, None)?; - - // Combine two blocks. - for col in remain_block.columns() { - prewhere_block.add_column(col.clone()); + if row_selection.is_some() { + let remain_block = + self.remain_reader + .deserialize(part, chunks, row_selection)?; + + // Combine two blocks. + for col in remain_block.columns() { + prewhere_block.add_column(col.clone()); + } + + let block = prewhere_block.resort(&self.src_schema, &self.output_schema)?; + block.filter_boolean_value(filter) + } else { + // filter prewhere columns first. + let mut prewhere_block = + prewhere_block.filter_boolean_value(filter.clone())?; + // If row_selection is None, we can push down the prewhere filter to remain data deserialization. + let remain_block = match filter { + Value::Column(bitmap) => { + self.remain_reader.deserialize(part, chunks, Some(bitmap))? + } + _ => self.remain_reader.deserialize(part, chunks, None)?, // all true + }; + for col in remain_block.columns() { + prewhere_block.add_column(col.clone()); + } + + prewhere_block.resort(&self.src_schema, &self.output_schema) } - - let block = remain_block.resort(&self.src_schema, &self.output_schema)?; - block.filter_boolean_value(filter) } None => { let chunks = self.remain_reader.read_from_readers(&mut readers)?; - self.remain_reader.deserialize(part, chunks, None) + self.remain_reader.deserialize(part, chunks, row_selection) } }?; - // Step 5: Add the block to output data self.add_block(data_block)?; } Ok(()) } } + +/// Convert intervals to a bitmap. The `intervals` represents the row selection across `num_rows`. 
+fn intervals_to_bitmap(interval: &[Interval], num_rows: usize) -> Bitmap {
+    debug_assert!(
+        interval.is_empty()
+            || interval.last().unwrap().start + interval.last().unwrap().length < num_rows
+    );
+
+    let mut bitmap = MutableBitmap::with_capacity(num_rows);
+    let mut offset = 0;
+
+    for intv in interval {
+        bitmap.extend_constant(intv.start - offset, false);
+        bitmap.extend_constant(intv.length, true);
+        offset = intv.start + intv.length;
+    }
+    bitmap.extend_constant(num_rows - offset, false);
+
+    bitmap.into()
+}
diff --git a/src/query/storages/parquet/src/parquet_part.rs b/src/query/storages/parquet/src/parquet_part.rs
index 55c1ac92c06d..b3e79c72e235 100644
--- a/src/query/storages/parquet/src/parquet_part.rs
+++ b/src/query/storages/parquet/src/parquet_part.rs
@@ -23,7 +23,6 @@ use common_arrow::parquet::compression::Compression;
 use common_arrow::parquet::indexes::Interval;
 use common_catalog::plan::PartInfo;
 use common_catalog::plan::PartInfoPtr;
-use common_catalog::table::ColumnId;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::Scalar;
@@ -40,7 +39,7 @@ pub struct ColumnMeta {
 pub struct ParquetRowGroupPart {
     pub location: String,
     pub num_rows: usize,
-    pub column_metas: HashMap<ColumnId, ColumnMeta>,
+    pub column_metas: HashMap<usize, ColumnMeta>,
     pub row_selection: Option<Vec<Interval>>,
     /// If all row group parts have min/max stats. This is used for topk push down optimization.
     ///
diff --git a/src/query/storages/parquet/src/parquet_reader/deserialize.rs b/src/query/storages/parquet/src/parquet_reader/deserialize.rs
index 94a2a6bfdcba..a6f9ef45aed6 100644
--- a/src/query/storages/parquet/src/parquet_reader/deserialize.rs
+++ b/src/query/storages/parquet/src/parquet_reader/deserialize.rs
@@ -65,7 +65,7 @@ impl ParquetReader {
         let mut chunks = Vec::with_capacity(indices.len());
         for index in indices {
             // in `read_parquet` function, there is no `TableSchema`, so index treated as column id
-            let column_meta = &part.column_metas[&(*index as u32)];
+            let column_meta = &part.column_metas[index];
             let cnt = cnt_map.get_mut(index).unwrap();
             *cnt -= 1;
             let column_chunk = if cnt > &mut 0 {
diff --git a/src/query/storages/parquet/src/parquet_reader/reader.rs b/src/query/storages/parquet/src/parquet_reader/reader.rs
index 7a7bdf132e57..0128169c8e2b 100644
--- a/src/query/storages/parquet/src/parquet_reader/reader.rs
+++ b/src/query/storages/parquet/src/parquet_reader/reader.rs
@@ -238,7 +238,7 @@ impl ParquetReader {
         for index in &self.columns_to_read {
             let obj = self.operator.object(&part.location);
             // in `read_parquet` function, there is no `TableSchema`, so index treated as column id
-            let meta = &part.column_metas[&(*index as u32)];
+            let meta = &part.column_metas[index];
             let chunk = obj.blocking_range_read(meta.offset..meta.offset + meta.length)?;
 
             chunks.push((*index, chunk));
@@ -253,7 +253,7 @@ impl ParquetReader {
 
         for &index in &self.columns_to_read {
             // in `read_parquet` function, there is no `TableSchema`, so index treated as column id
-            let meta = &part.column_metas[&(index as u32)];
+            let meta = &part.column_metas[&index];
             let obj = self.operator.object(&part.location);
             let range = meta.offset..meta.offset + meta.length;
             chunks.push(async move {

From 695f3b24c6a206592c25ec19d9a23ba3eedac38c Mon Sep 17 00:00:00 2001
From: RinChanNOWWW
Date: Thu, 2 Feb 2023 20:38:47 +0800
Subject: [PATCH 4/7] Remove old parquet source.
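
With `SyncParquetSource`/`AsyncParquetSource` plus `ParquetDeserializeTransform` in
place, the old state-machine source (renamed `OldParquetSource` earlier in this
series) and the row-group read helpers it relied on (`ParquetReader::sync_read_columns`,
`read_columns` and `support_blocking`) are dead code, so they are deleted here. The
`plan.table_schema()` change in `hive_table.rs` from the earlier commit is also
reverted back to `plan.schema()`.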
--- .../storages/hive/hive/src/hive_table.rs | 2 +- .../parquet/src/deserialize_transform.rs | 2 +- .../parquet/src/parquet_reader/reader.rs | 46 -- .../storages/parquet/src/parquet_source.rs | 415 ------------------ 4 files changed, 2 insertions(+), 463 deletions(-) diff --git a/src/query/storages/hive/hive/src/hive_table.rs b/src/query/storages/hive/hive/src/hive_table.rs index 6ee263686469..a7dee2236674 100644 --- a/src/query/storages/hive/hive/src/hive_table.rs +++ b/src/query/storages/hive/hive/src/hive_table.rs @@ -312,7 +312,7 @@ impl HiveTable { ) { (true, _) | (_, None) => { let projection = - PushDownInfo::projection_of_push_downs(&plan.table_schema(), &plan.push_downs); + PushDownInfo::projection_of_push_downs(&plan.schema(), &plan.push_downs); HiveBlockReader::create( self.dal.clone(), self.table_info.schema(), diff --git a/src/query/storages/parquet/src/deserialize_transform.rs b/src/query/storages/parquet/src/deserialize_transform.rs index 3f71fc749702..4c3c4fab5901 100644 --- a/src/query/storages/parquet/src/deserialize_transform.rs +++ b/src/query/storages/parquet/src/deserialize_transform.rs @@ -283,7 +283,7 @@ impl Processor for ParquetDeserializeTransform { // Step 4: Remove columns that are not needed for output. Use dummy column to replce them. let mut columns = prewhere_block.columns().to_vec(); - for (col, f) in columns.iter_mut().zip(reader.output_schema().fields()) { + for (col, f) in columns.iter_mut().zip(reader.output_schema.fields()) { if !self.output_schema.has_field(f.name()) { *col = BlockEntry { data_type: DataType::Null, diff --git a/src/query/storages/parquet/src/parquet_reader/reader.rs b/src/query/storages/parquet/src/parquet_reader/reader.rs index 0128169c8e2b..844c148e68e6 100644 --- a/src/query/storages/parquet/src/parquet_reader/reader.rs +++ b/src/query/storages/parquet/src/parquet_reader/reader.rs @@ -20,7 +20,6 @@ use common_arrow::arrow::datatypes::Schema as ArrowSchema; use common_arrow::arrow::io::parquet::write::to_parquet_schema; use common_arrow::parquet::metadata::ColumnDescriptor; use common_arrow::schema_projection as ap; -use common_base::base::tokio; use common_catalog::plan::PartInfoPtr; use common_catalog::plan::Projection; use common_exception::ErrorCode; @@ -117,10 +116,6 @@ impl ParquetReader { })) } - pub fn output_schema(&self) -> &DataSchema { - &self.output_schema - } - /// Project the schema and get the needed column leaves. #[allow(clippy::type_complexity)] pub fn do_projection( @@ -230,45 +225,4 @@ impl ParquetReader { Ok(res.into_iter().collect()) } - - /// Read columns data of one row group. - pub fn sync_read_columns(&self, part: &ParquetRowGroupPart) -> Result> { - let mut chunks = Vec::with_capacity(self.columns_to_read.len()); - - for index in &self.columns_to_read { - let obj = self.operator.object(&part.location); - // in `read_parquet` function, there is no `TableSchema`, so index treated as column id - let meta = &part.column_metas[index]; - let chunk = obj.blocking_range_read(meta.offset..meta.offset + meta.length)?; - - chunks.push((*index, chunk)); - } - - Ok(chunks) - } - - /// Read columns data of one row group (but async). 
- pub async fn read_columns(&self, part: &ParquetRowGroupPart) -> Result> { - let mut chunks = Vec::with_capacity(self.columns_to_read.len()); - - for &index in &self.columns_to_read { - // in `read_parquet` function, there is no `TableSchema`, so index treated as column id - let meta = &part.column_metas[&index]; - let obj = self.operator.object(&part.location); - let range = meta.offset..meta.offset + meta.length; - chunks.push(async move { - tokio::spawn(async move { obj.range_read(range).await.map(|chunk| (index, chunk)) }) - .await - .unwrap() - }); - } - - let chunks = futures::future::try_join_all(chunks).await?; - Ok(chunks) - } - - #[inline] - pub fn support_blocking(&self) -> bool { - self.operator.metadata().can_blocking() - } } diff --git a/src/query/storages/parquet/src/parquet_source.rs b/src/query/storages/parquet/src/parquet_source.rs index dbc96c89ecaa..8e9d496bd5d9 100644 --- a/src/query/storages/parquet/src/parquet_source.rs +++ b/src/query/storages/parquet/src/parquet_source.rs @@ -17,26 +17,12 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; -use common_arrow::arrow::bitmap::Bitmap; -use common_arrow::arrow::bitmap::MutableBitmap; -use common_arrow::parquet::indexes::Interval; use common_base::base::tokio; -use common_base::base::Progress; -use common_base::base::ProgressValues; -use common_catalog::plan::ParquetReadOptions; use common_catalog::plan::PartInfoPtr; use common_catalog::table_context::TableContext; -use common_exception::ErrorCode; use common_exception::Result; -use common_expression::filter_helper::FilterHelpers; -use common_expression::types::BooleanType; use common_expression::BlockMetaInfo; use common_expression::DataBlock; -use common_expression::DataSchemaRef; -use common_expression::Evaluator; -use common_expression::Expr; -use common_expression::Value; -use common_functions::scalars::BUILTIN_FUNCTIONS; use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::processor::ProcessorPtr; @@ -46,11 +32,8 @@ use common_pipeline_sources::processors::sources::SyncSourcer; use serde::Deserializer; use serde::Serializer; -use crate::parquet_part::ParquetRowGroupPart; -use crate::parquet_reader::IndexedChunk; use crate::parquet_reader::IndexedReaders; use crate::parquet_reader::ParquetReader; -use crate::parquet_source::State::Generated; pub struct ParquetSourceMeta { pub parts: Vec, @@ -215,401 +198,3 @@ impl Processor for AsyncParquetSource { Ok(()) } } - -struct PrewhereData { - data_block: DataBlock, - filter: Value, -} - -/// The states for [`ParquetSource`]. The states will recycle for each row group of a parquet file. -enum State { - ReadDataPrewhere(Option), - ReadDataRemain(PartInfoPtr, PrewhereData, Option), - PrewhereFilter(PartInfoPtr, Vec, Option), - Deserialize( - PartInfoPtr, - Vec, - Option, - Option, - ), - Generated(Option, DataBlock), - Finish, -} - -pub struct OldParquetSource { - state: State, - progress_values: ProgressValues, - - ctx: Arc, - scan_progress: Arc, - output: Arc, - - // The schemas are used for `DataBlock::resort` to remove columns that are not needed. - // Remove columns before `DataBlock::filter` can reduce memory copy. - /// The schema after prewhere filter. (Remove columns not output) - after_filter_schema: DataSchemaRef, - /// The schema after add remain columns. 
- after_remain_schema: DataSchemaRef, - /// The final output schema - output_schema: DataSchemaRef, - - prewhere_reader: Arc, - prewhere_filter: Arc>, - remain_reader: Arc>, - - read_options: ParquetReadOptions, -} - -impl OldParquetSource { - pub fn create( - ctx: Arc, - output: Arc, - (after_filter_schema, after_remain_schema, output_schema): ( - DataSchemaRef, - DataSchemaRef, - DataSchemaRef, - ), - prewhere_reader: Arc, - prewhere_filter: Arc>, - remain_reader: Arc>, - read_options: ParquetReadOptions, - ) -> Result { - let scan_progress = ctx.get_scan_progress(); - - Ok(ProcessorPtr::create(Box::new(OldParquetSource { - ctx, - output, - scan_progress, - progress_values: ProgressValues::default(), - state: State::ReadDataPrewhere(None), - after_filter_schema, - after_remain_schema, - output_schema, - prewhere_reader, - prewhere_filter, - remain_reader, - read_options, - }))) - } - - fn do_prewhere_filter( - &mut self, - part: PartInfoPtr, - raw_chunks: Vec, - row_selection: Option, - ) -> Result<()> { - let rg_part = ParquetRowGroupPart::from_part(&part)?; - // deserialize prewhere data block first - let data_block = if let Some(row_selection) = &row_selection { - self.prewhere_reader - .deserialize(rg_part, raw_chunks, Some(row_selection.clone()))? - } else { - self.prewhere_reader - .deserialize(rg_part, raw_chunks, None)? - }; - - self.progress_values.rows = data_block.num_rows(); - self.progress_values.bytes = data_block.memory_size(); - - if let Some(filter) = self.prewhere_filter.as_ref() { - // do filter - let func_ctx = self.ctx.get_function_context()?; - let evaluator = Evaluator::new(&data_block, func_ctx, &BUILTIN_FUNCTIONS); - - let res = evaluator - .run(filter) - .map_err(|e| e.add_message("eval prewhere filter failed:"))?; - let filter = FilterHelpers::cast_to_nonull_boolean(&res).ok_or_else(|| { - ErrorCode::BadArguments( - "Result of filter expression cannot be converted to boolean.", - ) - })?; - - let all_filtered = match &filter { - Value::Scalar(v) => !v, - Value::Column(bitmap) => bitmap.unset_bits() == bitmap.len(), - }; - - if all_filtered { - // shortcut: - // all rows in this block are filtered out - // turn to begin the next state cycle. - // Generate a empty block. - self.state = Generated( - self.ctx.get_partition(), - DataBlock::empty_with_schema(self.output_schema.clone()), - ); - return Ok(()); - } - - let block_removed_columns = data_block.resort( - &self.prewhere_reader.output_schema, - &self.after_filter_schema, - )?; - - let filtered_block = match &filter { - Value::Scalar(_) => block_removed_columns, - Value::Column(bitmap) => { - DataBlock::filter_with_bitmap(block_removed_columns, bitmap)? - } - }; - - if self.remain_reader.is_none() { - // shortcut, we don't need to read remain data - self.state = Generated(self.ctx.get_partition(), filtered_block); - } else { - self.state = State::ReadDataRemain( - part, - PrewhereData { - data_block: filtered_block, - filter, - }, - row_selection, - ); - } - Ok(()) - } else { - Err(ErrorCode::Internal( - "It's a bug. 
No need to do prewhere filter", - )) - } - } - - fn do_deserialize( - &mut self, - part: PartInfoPtr, - raw_chunks: Vec, - prewhere_data: Option, - row_selection: Option, - ) -> Result<()> { - let rg_part = ParquetRowGroupPart::from_part(&part)?; - let output_block = if let Some(PrewhereData { - data_block: mut prewhere_block, - filter, - }) = prewhere_data - { - let block = if raw_chunks.is_empty() { - prewhere_block - } else if let Some(remain_reader) = self.remain_reader.as_ref() { - // If reach in this branch, it means `read_options.do_prewhere = true` - let remain_block = match filter { - Value::Scalar(_) => { - // The case of all filtered is already covered in `do_prewhere_filter`. - // don't need filter - let block = remain_reader.deserialize(rg_part, raw_chunks, None)?; - - self.progress_values.bytes += block.memory_size(); - - block - } - Value::Column(bitmap) => { - if !self.read_options.push_down_bitmap() || bitmap.unset_bits() == 0 { - let block = if let Some(row_selection) = &row_selection { - remain_reader.deserialize( - rg_part, - raw_chunks, - Some(row_selection.clone()), - )? - } else { - remain_reader.deserialize(rg_part, raw_chunks, None)? - }; - - self.progress_values.bytes += block.memory_size(); - - DataBlock::filter_with_bitmap(block, &bitmap)? - } else { - let block = - remain_reader.deserialize(rg_part, raw_chunks, Some(bitmap))?; - - self.progress_values.bytes += block.memory_size(); - - block - } - } - }; - - assert_eq!( - prewhere_block.num_rows(), - remain_block.num_rows(), - "prewhere and remain blocks should have same row number. (prewhere: {}, remain: {})", - prewhere_block.num_rows(), - remain_block.num_rows() - ); - - // Combine two blocks. - for col in remain_block.columns() { - prewhere_block.add_column(col.clone()); - } - prewhere_block - } else { - return Err(ErrorCode::Internal("It's a bug. Need remain reader")); - }; - block.resort(&self.after_remain_schema, &self.output_schema)? - } else { - // There is only prewhere reader. - assert!(self.remain_reader.is_none()); - let block = self - .prewhere_reader - .deserialize(rg_part, raw_chunks, None)?; - - self.progress_values.rows = block.num_rows(); - self.progress_values.bytes = block.memory_size(); - - block.resort(&self.prewhere_reader.output_schema, &self.output_schema)? 
- }; - self.state = State::Generated(self.ctx.get_partition(), output_block); - Ok(()) - } -} - -#[async_trait::async_trait] -impl Processor for OldParquetSource { - fn name(&self) -> String { - "ParquetSource".to_string() - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } - - fn event(&mut self) -> Result { - if matches!(self.state, State::ReadDataPrewhere(None)) { - self.state = match self.ctx.get_partition() { - None => State::Finish, - Some(part) => State::ReadDataPrewhere(Some(part)), - } - } - - if matches!(self.state, State::Finish) { - self.output.finish(); - return Ok(Event::Finished); - } - - if self.output.is_finished() { - return Ok(Event::Finished); - } - - if !self.output.can_push() { - return Ok(Event::NeedConsume); - } - - if matches!(self.state, State::Generated(_, _)) { - if let Generated(part, data_block) = std::mem::replace(&mut self.state, State::Finish) { - if let Some(part) = part { - self.state = State::ReadDataPrewhere(Some(part)); - } - self.scan_progress.incr(&self.progress_values); - self.progress_values = ProgressValues::default(); - self.output.push_data(Ok(data_block)); - return Ok(Event::NeedConsume); - } - } - - match self.state { - State::Finish => Ok(Event::Finished), - State::PrewhereFilter(_, _, _) | State::Deserialize(_, _, _, _) => Ok(Event::Sync), - State::ReadDataPrewhere(_) | State::ReadDataRemain(_, _, _) => { - if self.prewhere_reader.support_blocking() { - Ok(Event::Sync) - } else { - Ok(Event::Async) - } - } - State::Generated(_, _) => Err(ErrorCode::Internal("It's a bug.")), - } - } - - fn process(&mut self) -> Result<()> { - match std::mem::replace(&mut self.state, State::Finish) { - State::ReadDataPrewhere(Some(part)) => { - let rg_part = ParquetRowGroupPart::from_part(&part)?; - let row_selection = rg_part - .row_selection - .as_ref() - .map(|sel| intervals_to_bitmap(sel, rg_part.num_rows)); - let chunks = self.prewhere_reader.sync_read_columns(rg_part)?; - if self.prewhere_filter.is_some() { - self.state = State::PrewhereFilter(part, chunks, row_selection); - } else { - // If there is no prewhere filter, it means there is only the prewhere reader. - assert!(self.remain_reader.is_none()); - // So all the needed columns are read. - self.state = State::Deserialize(part, chunks, None, row_selection) - } - Ok(()) - } - State::ReadDataRemain(part, prewhere_data, row_selection) => { - if let Some(remain_reader) = self.remain_reader.as_ref() { - let rg_part = ParquetRowGroupPart::from_part(&part)?; - let chunks = remain_reader.sync_read_columns(rg_part)?; - self.state = - State::Deserialize(part, chunks, Some(prewhere_data), row_selection); - Ok(()) - } else { - Err(ErrorCode::Internal("It's a bug. 
No remain reader")) - } - } - State::PrewhereFilter(part, chunks, row_selection) => { - self.do_prewhere_filter(part, chunks, row_selection) - } - State::Deserialize(part, chunks, prewhere_data, row_selection) => { - self.do_deserialize(part, chunks, prewhere_data, row_selection) - } - _ => Err(ErrorCode::Internal("It's a bug.")), - } - } - - async fn async_process(&mut self) -> Result<()> { - match std::mem::replace(&mut self.state, State::Finish) { - State::ReadDataPrewhere(Some(part)) => { - let rg_part = ParquetRowGroupPart::from_part(&part)?; - let row_selection = rg_part - .row_selection - .as_ref() - .map(|sel| intervals_to_bitmap(sel, rg_part.num_rows)); - let chunks = self.prewhere_reader.read_columns(rg_part).await?; - if self.prewhere_filter.is_some() { - self.state = State::PrewhereFilter(part, chunks, row_selection); - } else { - // If there is no prewhere filter, it means there is only the prewhere reader. - assert!(self.remain_reader.is_none()); - // So all the needed columns are read. - self.state = State::Deserialize(part, chunks, None, row_selection) - } - Ok(()) - } - State::ReadDataRemain(part, prewhere_data, row_selection) => { - if let Some(remain_reader) = self.remain_reader.as_ref() { - let rg_part = ParquetRowGroupPart::from_part(&part)?; - let chunks = remain_reader.read_columns(rg_part).await?; - self.state = - State::Deserialize(part, chunks, Some(prewhere_data), row_selection); - Ok(()) - } else { - Err(ErrorCode::Internal("It's a bug. No remain reader")) - } - } - _ => Err(ErrorCode::Internal("It's a bug.")), - } - } -} - -/// Convert intervals to a bitmap. The `intervals` represents the row selection across `num_rows`. -fn intervals_to_bitmap(interval: &[Interval], num_rows: usize) -> Bitmap { - debug_assert!( - interval.is_empty() - || interval.last().unwrap().start + interval.last().unwrap().length < num_rows - ); - - let mut bitmap = MutableBitmap::with_capacity(num_rows); - let mut offset = 0; - - for intv in interval { - bitmap.extend_constant(intv.start - offset, false); - bitmap.extend_constant(intv.length, true); - offset = intv.start + intv.length; - } - bitmap.extend_constant(num_rows - offset, false); - - bitmap.into() -} From f148f26b0976cf976c8ae74cfd67610212fbe4c9 Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Thu, 2 Feb 2023 22:07:29 +0800 Subject: [PATCH 5/7] Fix bug. 
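
The bug: when a nested column is needed by both the prewhere phase and
the remain phase, the same DataReader is consumed twice, and the second
read returned nothing. read_all (previously read_to_end) therefore
rewinds the underlying reader before reading. A minimal standalone
sketch of the idea, not the exact code: the SeekRead/DataReader names
follow this patch, while the Cursor-based harness and the Send + Sync
bounds are only assumptions for illustration.

    use std::io::{Cursor, Read, Result, Seek};

    /// Any reader that can both read and seek can back a DataReader.
    trait SeekRead: Read + Seek {}
    impl<T: Read + Seek> SeekRead for T {}

    struct DataReader {
        bytes: usize,
        inner: Box<dyn SeekRead + Send + Sync>,
    }

    impl DataReader {
        fn new(inner: Box<dyn SeekRead + Send + Sync>, bytes: usize) -> Self {
            Self { inner, bytes }
        }

        /// Rewind before reading so the same reader can be consumed
        /// once in the prewhere phase and again in the remain phase.
        fn read_all(&mut self) -> Result<Vec<u8>> {
            let mut data = Vec::with_capacity(self.bytes);
            self.inner.rewind()?;
            self.inner.read_to_end(&mut data)?;
            Ok(data)
        }
    }

    fn main() -> Result<()> {
        let raw = b"column chunk bytes".to_vec();
        let mut reader = DataReader::new(Box::new(Cursor::new(raw)), 18);
        let first = reader.read_all()?;
        let second = reader.read_all()?;
        // Without the rewind, `second` would be empty.
        assert_eq!(first, second);
        Ok(())
    }
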
--- .../parquet/src/deserialize_transform.rs | 4 ---- .../parquet/src/parquet_reader/reader.rs | 22 +++++++++++++++---- .../storages/parquet/src/parquet_source.rs | 4 ++-- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/query/storages/parquet/src/deserialize_transform.rs b/src/query/storages/parquet/src/deserialize_transform.rs index 4c3c4fab5901..89130e3a612a 100644 --- a/src/query/storages/parquet/src/deserialize_transform.rs +++ b/src/query/storages/parquet/src/deserialize_transform.rs @@ -233,10 +233,6 @@ impl Processor for ParquetDeserializeTransform { // this means it's empty projection if readers.is_empty() { - let _ = self.data_readers.pop_front(); - let part = self.parts.pop_front().unwrap(); - - let part = ParquetRowGroupPart::from_part(&part)?; let data_block = DataBlock::new(vec![], part.num_rows); self.add_block(data_block)?; return Ok(()); diff --git a/src/query/storages/parquet/src/parquet_reader/reader.rs b/src/query/storages/parquet/src/parquet_reader/reader.rs index 844c148e68e6..598a7b680823 100644 --- a/src/query/storages/parquet/src/parquet_reader/reader.rs +++ b/src/query/storages/parquet/src/parquet_reader/reader.rs @@ -32,18 +32,32 @@ use opendal::Operator; use crate::parquet_part::ParquetRowGroupPart; use crate::parquet_table::arrow_to_table_schema; +pub trait SeekRead: std::io::Read + std::io::Seek {} + +impl SeekRead for T where T: std::io::Read + std::io::Seek {} + pub struct DataReader { bytes: usize, - inner: Box, + inner: Box, } impl DataReader { - pub fn new(inner: Box, bytes: usize) -> Self { + pub fn new(inner: Box, bytes: usize) -> Self { Self { inner, bytes } } - pub fn read_to_end(&mut self) -> Result> { + pub fn read_all(&mut self) -> Result> { let mut data = Vec::with_capacity(self.bytes); + // `DataReader` might be reused if there is nested-type data, example: + // Table: t Tuple(a int, b int); + // Query: select t from table where t:a > 1; + // The query will create two readers: Reader(a), Reader(b). + // Prewhere phase: Reader(a).read_all(); + // Remain phase: Reader(a).read_all(); Reader(b).read_all(); + // If we don't seek to the start of the reader, the second read_all will read nothing. + self.inner.rewind()?; + // TODO(1): don't seek and read, but reuse the data (reduce IO). + // TODO(2): for nested types, merge sub columns into one column (reduce deserialization). 
self.inner.read_to_end(&mut data)?; Ok(data) } @@ -169,7 +183,7 @@ impl ParquetReader { for index in &self.columns_to_read { let reader = readers.get_mut(index).unwrap(); - let data = reader.read_to_end()?; + let data = reader.read_all()?; chunks.push((*index, data)); } diff --git a/src/query/storages/parquet/src/parquet_source.rs b/src/query/storages/parquet/src/parquet_source.rs index 8e9d496bd5d9..859c7e0da8ad 100644 --- a/src/query/storages/parquet/src/parquet_source.rs +++ b/src/query/storages/parquet/src/parquet_source.rs @@ -27,8 +27,8 @@ use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_core::processors::Processor; -use common_pipeline_sources::processors::sources::SyncSource; -use common_pipeline_sources::processors::sources::SyncSourcer; +use common_pipeline_sources::SyncSource; +use common_pipeline_sources::SyncSourcer; use serde::Deserializer; use serde::Serializer; From 4f4288bb6d38e27850baac7275a415567a094bd0 Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Fri, 3 Feb 2023 21:09:49 +0800 Subject: [PATCH 6/7] Top k optimiztion. --- src/query/catalog/src/plan/projection.rs | 28 +++ src/query/catalog/src/plan/pushdown.rs | 1 + src/query/expression/src/kernels/topk.rs | 7 + .../parquet/src/deserialize_transform.rs | 161 ++++++++---------- .../storages/parquet/src/parquet_part.rs | 9 +- .../parquet/src/parquet_table/read.rs | 77 ++++++--- src/query/storages/parquet/src/pruning.rs | 12 +- 7 files changed, 175 insertions(+), 120 deletions(-) diff --git a/src/query/catalog/src/plan/projection.rs b/src/query/catalog/src/plan/projection.rs index a9a47536a838..f2ad806f3526 100644 --- a/src/query/catalog/src/plan/projection.rs +++ b/src/query/catalog/src/plan/projection.rs @@ -74,6 +74,34 @@ impl Projection { }; Ok(column_nodes) } + + pub fn add_col(&mut self, col: usize) { + match self { + Projection::Columns(indices) => { + if indices.contains(&col) { + return; + } + indices.push(col); + indices.sort(); + } + Projection::InnerColumns(path_indices) => { + path_indices.entry(col).or_insert(vec![col]); + } + } + } + + pub fn remove_col(&mut self, col: usize) { + match self { + Projection::Columns(indices) => { + if let Some(pos) = indices.iter().position(|x| *x == col) { + indices.remove(pos); + } + } + Projection::InnerColumns(path_indices) => { + path_indices.remove(&col); + } + } + } } impl core::fmt::Debug for Projection { diff --git a/src/query/catalog/src/plan/pushdown.rs b/src/query/catalog/src/plan/pushdown.rs index fd4f5511d9c4..9249aefe2c90 100644 --- a/src/query/catalog/src/plan/pushdown.rs +++ b/src/query/catalog/src/plan/pushdown.rs @@ -72,6 +72,7 @@ impl PushDownInfo { } if let RemoteExpr::::ColumnRef { id, .. } = &order.0 { + // TODO: support sub column of nested type. 
let field = schema.field_with_name(id).unwrap(); let data_type: DataType = field.data_type().into(); if !support(&data_type) { diff --git a/src/query/expression/src/kernels/topk.rs b/src/query/expression/src/kernels/topk.rs index 74f32160e1e8..6c0b0782fd61 100644 --- a/src/query/expression/src/kernels/topk.rs +++ b/src/query/expression/src/kernels/topk.rs @@ -24,6 +24,7 @@ use crate::with_number_mapped_type; use crate::Column; use crate::Scalar; +#[derive(Clone)] pub struct TopKSorter { data: Vec, limit: usize, @@ -147,6 +148,12 @@ impl TopKSorter { fn make_heap(v: &mut [T], is_less: &mut F) where F: FnMut(&T, &T) -> bool { let len = v.len(); + + if len < 2 { + // no need to adjust heap + return; + } + let mut parent = (len - 2) / 2; loop { diff --git a/src/query/storages/parquet/src/deserialize_transform.rs b/src/query/storages/parquet/src/deserialize_transform.rs index 89130e3a612a..8c6df927c6a9 100644 --- a/src/query/storages/parquet/src/deserialize_transform.rs +++ b/src/query/storages/parquet/src/deserialize_transform.rs @@ -33,6 +33,7 @@ use common_expression::Evaluator; use common_expression::Expr; use common_expression::FunctionContext; use common_expression::Scalar; +use common_expression::TopKSorter; use common_expression::Value; use common_functions::scalars::BUILTIN_FUNCTIONS; use common_pipeline_core::processors::port::InputPort; @@ -46,10 +47,12 @@ use crate::parquet_reader::IndexedReaders; use crate::parquet_reader::ParquetReader; use crate::parquet_source::ParquetSourceMeta; +#[derive(Clone)] pub struct ParquetPrewhereInfo { pub func_ctx: FunctionContext, pub reader: Arc, pub filter: Expr, + pub top_k: Option<(usize, TopKSorter)>, /* the usize is the index of the column in ParquetReader.schema */ } pub struct ParquetDeserializeTransform { @@ -67,13 +70,12 @@ pub struct ParquetDeserializeTransform { output_schema: DataSchemaRef, // Used for prewhere reading and filtering - prewhere_info: Arc>, + prewhere_info: Option, // Used for remain reading remain_reader: Arc, // Used for top k optimization - // top_k: Option<(TopK, TopKSorter, usize)>, - // top_k_finished: bool, + top_k_finished: bool, } impl ParquetDeserializeTransform { @@ -83,28 +85,11 @@ impl ParquetDeserializeTransform { output: Arc, src_schema: DataSchemaRef, output_schema: DataSchemaRef, - prewhere_info: Arc>, + prewhere_info: Option, remain_reader: Arc, ) -> Result { let scan_progress = ctx.get_scan_progress(); - // let top_k = plan - // .push_downs - // .as_ref() - // .map(|p| p.top_k(table_schema.as_ref(), RangeIndex::supported_type)) - // .unwrap_or_default(); - - // let top_k = top_k.map(|top_k| { - // let index = src_schema.index_of(top_k.order_by.name()).unwrap(); - // let sorter = TopKSorter::new(top_k.limit, top_k.asc); - - // if !prewhere_columns.contains(&index) { - // prewhere_columns.push(index); - // prewhere_columns.sort(); - // } - // (top_k, sorter, index) - // }); - Ok(ProcessorPtr::create(Box::new( ParquetDeserializeTransform { scan_progress, @@ -120,6 +105,8 @@ impl ParquetDeserializeTransform { prewhere_info, remain_reader, + + top_k_finished: false, }, ))) } @@ -138,21 +125,25 @@ impl ParquetDeserializeTransform { Ok(()) } - // /// check topk should return finished or not - // fn check_topn(&mut self) { - // if let Some((_, sorter, _)) = &mut self.top_k { - // if let Some(next_part) = self.parts.front() { - // let next_part = next_part - // .as_any() - // .downcast_ref::() - // .unwrap(); - - // if let Some(sort_min_max) = &next_part.sort_min_max { - // self.top_k_finished = 
sorter.never_match(sort_min_max); - // } - // } - // } - // } + /// check topk should return finished or not + fn check_topn(&mut self) { + if let Some(ParquetPrewhereInfo { + top_k: Some((_, sorter)), + .. + }) = &mut self.prewhere_info.as_mut() + { + if let Some(next_part) = self.parts.front() { + let next_part = next_part + .as_any() + .downcast_ref::() + .unwrap(); + + if let Some(sort_min_max) = &next_part.sort_min_max { + self.top_k_finished = sorter.never_match(sort_min_max); + } + } + } + } } impl Processor for ParquetDeserializeTransform { @@ -187,11 +178,11 @@ impl Processor for ParquetDeserializeTransform { return Ok(Event::Sync); } - // if self.top_k_finished { - // self.input.finish(); - // self.output.finish(); - // return Ok(Event::Finished); - // } + if self.top_k_finished { + self.input.finish(); + self.output.finish(); + return Ok(Event::Finished); + } if self.input.has_data() { let mut data_block = self.input.pull_data().unwrap()?; @@ -203,12 +194,13 @@ impl Processor for ParquetDeserializeTransform { self.parts = VecDeque::from(std::mem::take(&mut source_meta.parts)); - // self.check_topn(); - // if self.top_k_finished { - // self.input.finish(); - // self.output.finish(); - // return Ok(Event::Finished); - // } + self.check_topn(); + if self.top_k_finished { + self.input.finish(); + self.output.finish(); + return Ok(Event::Finished); + } + self.data_readers = VecDeque::from(std::mem::take(&mut source_meta.readers)); return Ok(Event::Sync); } @@ -238,34 +230,29 @@ impl Processor for ParquetDeserializeTransform { return Ok(()); } - // Step 1: Check TOP_K, if prewhere_columns contains not only TOP_K, we can check if TOP_K column can satisfy the heap. - // if self.prewhere_columns.len() > 1 { - // if let Some((top_k, sorter, index)) = self.top_k.as_mut() { - // let chunk = chunks.get_mut(*index).unwrap(); - // let array = chunk.1.next_array()?; - // self.read_columns.push(*index); - - // let data_type = top_k.order_by.datatype().into(); - // let col = Column::from_arrow(array.a_s_ref(), &data_type); - - // arrays.push((chunk.0, array)); - // if sorter.never_match_any(&col) { - // return; - // } - // } - // } - - // Step 2: Read Prewhere columns and get the filter - - let data_block = match self.prewhere_info.as_ref() { + let data_block = match self.prewhere_info.as_mut() { Some(ParquetPrewhereInfo { func_ctx, reader, filter, + top_k, }) => { let chunks = reader.read_from_readers(&mut readers)?; let mut prewhere_block = reader.deserialize(part, chunks, row_selection.clone())?; + // Step 1: Check TOP_K, if prewhere_columns contains not only TOP_K, we can check if TOP_K column can satisfy the heap. 
+ if let Some((index, sorter)) = top_k { + let col = prewhere_block + .get_by_offset(*index) + .value + .as_column() + .unwrap(); + if sorter.never_match_any(col) { + return Ok(()); + } + } + + // Step 2: Read Prewhere columns and get the filter let evaluator = Evaluator::new(&prewhere_block, *func_ctx, &BUILTIN_FUNCTIONS); let result = evaluator .run(filter) @@ -289,29 +276,23 @@ impl Processor for ParquetDeserializeTransform { } // Step 5: Apply the filter to topk and update the bitmap, this will filter more results - // let filter = if let Some((_, sorter, index)) = &mut self.top_k { - // let index_prewhere = self - // .prewhere_columns - // .iter() - // .position(|x| x == index) - // .unwrap(); - // let top_k_column = prewhere_block - // .get_by_offset(index_prewhere) - // .value - // .as_column() - // .unwrap(); - - // let mut bitmap = - // FilterHelpers::filter_to_bitmap(filter, prewhere_block.num_rows()); - // sorter.push_column(top_k_column, &mut bitmap); - // Value::Column(bitmap.into()) - // } else { - // filter - // }; - - // if FilterHelpers::is_all_unset(&filter) { - // return Self::skip_chunks_page(&self.read_columns, chunks); - // } + let filter = if let Some((index, sorter)) = top_k { + let top_k_column = prewhere_block + .get_by_offset(*index) + .value + .as_column() + .unwrap(); + let mut bitmap = + FilterHelpers::filter_to_bitmap(filter, prewhere_block.num_rows()); + sorter.push_column(top_k_column, &mut bitmap); + Value::Column(bitmap.into()) + } else { + filter + }; + + if FilterHelpers::is_all_unset(&filter) { + return Ok(()); + } // Step 6: Read remain columns. let chunks = self.remain_reader.read_from_readers(&mut readers)?; diff --git a/src/query/storages/parquet/src/parquet_part.rs b/src/query/storages/parquet/src/parquet_part.rs index b3e79c72e235..6ce834f05043 100644 --- a/src/query/storages/parquet/src/parquet_part.rs +++ b/src/query/storages/parquet/src/parquet_part.rs @@ -41,15 +41,12 @@ pub struct ParquetRowGroupPart { pub num_rows: usize, pub column_metas: HashMap, pub row_selection: Option>, - /// If all row group parts have min/max stats. This is used for topk push down optimization. - /// - /// If there is one row group part does not have min/max stats, we cannot conduct topk push down optimization. 
- pub all_have_minmax: bool, + + pub sort_min_max: Option<(Scalar, Scalar)>, } impl ParquetRowGroupPart { - pub fn convert_to_part_info(mut self, all_have_minmax: bool) -> PartInfoPtr { - self.all_have_minmax = all_have_minmax; + pub fn convert_to_part_info(self) -> PartInfoPtr { Arc::new(Box::new(self)) } } diff --git a/src/query/storages/parquet/src/parquet_table/read.rs b/src/query/storages/parquet/src/parquet_table/read.rs index 9e4de4b001f4..49daf4f42e8e 100644 --- a/src/query/storages/parquet/src/parquet_table/read.rs +++ b/src/query/storages/parquet/src/parquet_table/read.rs @@ -26,8 +26,12 @@ use common_expression::DataSchemaRefExt; use common_expression::Expr; use common_expression::FunctionContext; use common_expression::RemoteExpr; +use common_expression::TableSchemaRef; +use common_expression::TopKSorter; use common_functions::scalars::BUILTIN_FUNCTIONS; use common_pipeline_core::Pipeline; +use storages_common_index::Index; +use storages_common_index::RangeIndex; use super::ParquetTable; use crate::deserialize_transform::ParquetDeserializeTransform; @@ -60,8 +64,9 @@ impl ParquetTable { plan: &DataSourcePlan, pipeline: &mut Pipeline, ) -> Result<()> { + let table_schema: TableSchemaRef = self.table_info.schema(); let source_projection = - PushDownInfo::projection_of_push_downs(&self.table_info.schema(), &plan.push_downs); + PushDownInfo::projection_of_push_downs(&table_schema, &plan.push_downs); // The front of the src_fields are prewhere columns (if exist). // The back of the src_fields are remain columns. @@ -77,7 +82,31 @@ impl ParquetTable { source_projection, )?; - let push_down_prewhere = PushDownInfo::prewhere_of_push_downs(&plan.push_downs); + // build top k information + let top_k = plan + .push_downs + .as_ref() + .map(|p| p.top_k(&table_schema, RangeIndex::supported_type)) + .unwrap_or_default(); + + // Build prewhere info. + let mut push_down_prewhere = PushDownInfo::prewhere_of_push_downs(&plan.push_downs); + + let top_k = if let Some((prewhere, top_k)) = push_down_prewhere.as_mut().zip(top_k) { + // If there is a top k, we need to add the top k columns to the prewhere columns. + if let RemoteExpr::::ColumnRef { id, .. } = + &plan.push_downs.as_ref().unwrap().order_by[0].0 + { + let index = table_schema.index_of(id)?; + prewhere.remain_columns.remove_col(index); + prewhere.prewhere_columns.add_col(index); + Some((id.clone(), top_k)) + } else { + None + } + } else { + None + }; // Build remain reader. // If there is no prewhere filter, remain reader is the same as source reader (no prewhere phase, deserialize directly). @@ -91,27 +120,30 @@ impl ParquetTable { source_reader.clone() }; - // Build prewhere info. 
- - let prewhere_info = Arc::new( - PushDownInfo::prewhere_of_push_downs(&plan.push_downs) - .map(|p| { - let reader = ParquetReader::create( - self.operator.clone(), - self.arrow_schema.clone(), - p.prewhere_columns, - )?; - src_fields.extend_from_slice(reader.output_schema.fields()); - let func_ctx = ctx.get_function_context()?; - let filter = Self::build_filter(func_ctx, &p.filter, &reader.output_schema); - Ok::<_, ErrorCode>(ParquetPrewhereInfo { - func_ctx, - reader, - filter, - }) + let prewhere_info = push_down_prewhere + .map(|p| { + let reader = ParquetReader::create( + self.operator.clone(), + self.arrow_schema.clone(), + p.prewhere_columns, + )?; + src_fields.extend_from_slice(reader.output_schema.fields()); + let func_ctx = ctx.get_function_context()?; + let filter = Self::build_filter(func_ctx, &p.filter, &reader.output_schema); + let top_k = top_k.map(|(name, top_k)| { + ( + reader.output_schema.index_of(&name).unwrap(), + TopKSorter::new(top_k.limit, top_k.asc), + ) + }); + Ok::<_, ErrorCode>(ParquetPrewhereInfo { + func_ctx, + reader, + filter, + top_k, }) - .transpose()?, - ); + }) + .transpose()?; src_fields.extend_from_slice(remain_reader.output_schema.fields()); let src_schema = DataSchemaRefExt::create(src_fields); @@ -135,6 +167,7 @@ impl ParquetTable { )?; pipeline.resize(std::cmp::min(max_threads, max_io_requests))?; } + pipeline.add_transform(|input, output| { ParquetDeserializeTransform::create( ctx.clone(), diff --git a/src/query/storages/parquet/src/pruning.rs b/src/query/storages/parquet/src/pruning.rs index 4c5299d0d52f..52ceec83d7e8 100644 --- a/src/query/storages/parquet/src/pruning.rs +++ b/src/query/storages/parquet/src/pruning.rs @@ -225,7 +225,7 @@ impl PartitionPruner { num_rows: rg.num_rows(), column_metas, row_selection, - all_have_minmax: false, + sort_min_max: None, }) } } @@ -256,6 +256,14 @@ impl PartitionPruner { (b_max.as_ref(), b_min.as_ref()).cmp(&(a_max.as_ref(), a_min.as_ref())) } }); + for part in partitions.iter_mut() { + part.sort_min_max = part + .column_metas + .get(&(top_k.column_id as usize)) + .unwrap() + .min_max + .clone(); + } PartitionsShuffleKind::Seq } else { PartitionsShuffleKind::Mod @@ -263,7 +271,7 @@ impl PartitionPruner { let partitions = partitions .into_iter() - .map(|p| p.convert_to_part_info(all_have_minmax)) + .map(|p| p.convert_to_part_info()) .collect(); Ok(( From c29ad67e3d706da5b6342cc49e0c5615644bd883 Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Sat, 4 Feb 2023 12:20:58 +0800 Subject: [PATCH 7/7] Fix bugs. 
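
The main fixes: the row selection is no longer pushed down into
deserialization when a column chunk has a dictionary page (per the new
ColumnMeta comment, predicates cannot be pushed down to
dictionary-encoded chunks), the top-k column id is mapped to its offset
in the projected schema before looking up row-group stats, per-column
min/max is only kept for the pushed-down top-k column, and stats
collection is skipped entirely when any chunk lacks statistics. A rough
sketch of the dictionary guard with simplified stand-ins: RowGroupPart,
push_down_selection and the Vec<u64> selection type are invented here
for illustration; only has_dictionary and the all(..) check mirror the
patch.

    use std::collections::HashMap;

    /// Simplified stand-in for the per-column metadata in a part.
    struct ColumnMeta {
        has_dictionary: bool,
    }

    /// Simplified stand-in for a row-group part.
    struct RowGroupPart {
        column_metas: HashMap<usize, ColumnMeta>,
    }

    /// A row selection may only be pushed down to the deserializer when
    /// none of the column chunks being read is dictionary-encoded.
    fn push_down_selection(
        part: &RowGroupPart,
        column_ids: &[usize],
        row_selection: Option<Vec<u64>>,
    ) -> Option<Vec<u64>> {
        let can_push_down = column_ids
            .iter()
            .all(|id| !part.column_metas[id].has_dictionary);
        if can_push_down { row_selection } else { None }
    }

    fn main() {
        let part = RowGroupPart {
            column_metas: HashMap::from([
                (0, ColumnMeta { has_dictionary: false }),
                (1, ColumnMeta { has_dictionary: true }),
            ]),
        };
        // Column 1 has a dictionary page, so the selection is dropped.
        assert!(push_down_selection(&part, &[0, 1], Some(vec![3, 4])).is_none());
        assert!(push_down_selection(&part, &[0], Some(vec![3, 4])).is_some());
    }
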
--- .../parquet/src/deserialize_transform.rs | 47 ++++++++++++------- .../storages/parquet/src/parquet_part.rs | 3 ++ .../parquet/src/parquet_table/partition.rs | 8 ++++ src/query/storages/parquet/src/pruning.rs | 23 +++++---- .../08_00_parquet/08_00_03_ontime.result | 20 ++++++++ .../08_00_parquet/08_00_03_ontime.sh | 9 +++- 6 files changed, 84 insertions(+), 26 deletions(-) diff --git a/src/query/storages/parquet/src/deserialize_transform.rs b/src/query/storages/parquet/src/deserialize_transform.rs index 8c6df927c6a9..be8ccfc0ac29 100644 --- a/src/query/storages/parquet/src/deserialize_transform.rs +++ b/src/query/storages/parquet/src/deserialize_transform.rs @@ -238,8 +238,18 @@ impl Processor for ParquetDeserializeTransform { top_k, }) => { let chunks = reader.read_from_readers(&mut readers)?; - let mut prewhere_block = - reader.deserialize(part, chunks, row_selection.clone())?; + + // only if there is not dictionary page, we can push down the row selection + let can_push_down = chunks + .iter() + .all(|(id, _)| !part.column_metas[id].has_dictionary); + let push_down = if can_push_down { + row_selection.clone() + } else { + None + }; + + let mut prewhere_block = reader.deserialize(part, chunks, push_down)?; // Step 1: Check TOP_K, if prewhere_columns contains not only TOP_K, we can check if TOP_K column can satisfy the heap. if let Some((index, sorter)) = top_k { let col = prewhere_block @@ -264,18 +274,7 @@ impl Processor for ParquetDeserializeTransform { return Ok(()); } - // Step 4: Remove columns that are not needed for output. Use dummy column to replce them. - let mut columns = prewhere_block.columns().to_vec(); - for (col, f) in columns.iter_mut().zip(reader.output_schema.fields()) { - if !self.output_schema.has_field(f.name()) { - *col = BlockEntry { - data_type: DataType::Null, - value: Value::Scalar(Scalar::Null), - }; - } - } - - // Step 5: Apply the filter to topk and update the bitmap, this will filter more results + // Step 4: Apply the filter to topk and update the bitmap, this will filter more results let filter = if let Some((index, sorter)) = top_k { let top_k_column = prewhere_block .get_by_offset(*index) @@ -294,12 +293,26 @@ impl Processor for ParquetDeserializeTransform { return Ok(()); } + // Step 5 Remove columns that are not needed for output. Use dummy column to replce them. + let mut columns = prewhere_block.columns().to_vec(); + for (col, f) in columns.iter_mut().zip(reader.output_schema.fields()) { + if !self.output_schema.has_field(f.name()) { + *col = BlockEntry { + data_type: DataType::Null, + value: Value::Scalar(Scalar::Null), + }; + } + } + // Step 6: Read remain columns. let chunks = self.remain_reader.read_from_readers(&mut readers)?; - if row_selection.is_some() { + let can_push_down = chunks + .iter() + .all(|(id, _)| !part.column_metas[id].has_dictionary); + let push_down = if can_push_down { row_selection } else { None }; + if push_down.is_some() || !can_push_down { let remain_block = - self.remain_reader - .deserialize(part, chunks, row_selection)?; + self.remain_reader.deserialize(part, chunks, push_down)?; // Combine two blocks. 
for col in remain_block.columns() { diff --git a/src/query/storages/parquet/src/parquet_part.rs b/src/query/storages/parquet/src/parquet_part.rs index 6ce834f05043..060f682c02a2 100644 --- a/src/query/storages/parquet/src/parquet_part.rs +++ b/src/query/storages/parquet/src/parquet_part.rs @@ -33,6 +33,9 @@ pub struct ColumnMeta { pub length: u64, pub compression: Compression, pub min_max: Option<(Scalar, Scalar)>, + + // if has dictionary, we can not push down predicate to deserialization. + pub has_dictionary: bool, } #[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)] diff --git a/src/query/storages/parquet/src/parquet_table/partition.rs b/src/query/storages/parquet/src/parquet_table/partition.rs index afd915aa75c1..7b73c7fb09b2 100644 --- a/src/query/storages/parquet/src/parquet_table/partition.rs +++ b/src/query/storages/parquet/src/parquet_table/partition.rs @@ -80,6 +80,14 @@ impl ParquetTable { .map(|f| f.as_expr(&BUILTIN_FUNCTIONS)) .collect::>() }); + let top_k = top_k.map(|top_k| { + let offset = projected_column_nodes + .column_nodes + .iter() + .position(|node| node.leaf_ids[0] == top_k.column_id as usize) + .unwrap(); + (top_k, offset) + }); let func_ctx = ctx.get_function_context()?; diff --git a/src/query/storages/parquet/src/pruning.rs b/src/query/storages/parquet/src/pruning.rs index 52ceec83d7e8..915d09157a09 100644 --- a/src/query/storages/parquet/src/pruning.rs +++ b/src/query/storages/parquet/src/pruning.rs @@ -65,8 +65,8 @@ pub struct PartitionPruner { pub column_nodes: ColumnNodes, /// Whether to skip pruning. pub skip_pruning: bool, - /// top k information from pushed down information. - pub top_k: Option, + /// top k information from pushed down information. The usize is the offset of top k column in `schema`. + pub top_k: Option<(TopK, usize)>, // TODO: use limit information for pruning // /// Limit of this query. If there is order by and filter, it will not be used (assign to `usize::MAX`). // pub limit: usize, @@ -149,7 +149,9 @@ impl PartitionPruner { .any(|c| c.metadata().statistics.is_none()) }); - let row_group_stats = if row_group_pruner.is_some() && !skip_pruning && !no_stats { + let row_group_stats = if no_stats { + None + } else if row_group_pruner.is_some() && !skip_pruning { let pruner = row_group_pruner.as_ref().unwrap(); // If collecting stats fails or `should_keep` is true, we still read the row group. // Otherwise, the row group will be pruned. @@ -207,16 +209,21 @@ impl PartitionPruner { let c = &rg.columns()[*index]; let (offset, length) = c.byte_range(); - let min_max = row_group_stats.as_ref().map(|stats| { - let stat = stats[rg_idx].get(&(*index as u32)).unwrap(); - (stat.min.clone(), stat.max.clone()) - }); + let min_max = top_k + .as_ref() + .filter(|(tk, _)| tk.column_id as usize == *index) + .zip(row_group_stats.as_ref()) + .map(|((_, offset), stats)| { + let stat = stats[rg_idx].get(&(*offset as u32)).unwrap(); + (stat.min.clone(), stat.max.clone()) + }); column_metas.insert(*index, ColumnMeta { offset, length, compression: c.compression(), min_max, + has_dictionary: c.dictionary_page_offset().is_some(), }); } @@ -233,7 +240,7 @@ impl PartitionPruner { // 3. Check if can conduct topk push down optimization. // Only all row groups have min/max stats can we use topk optimization. // If we can use topk optimization, we should use `PartitionsShuffleKind::Seq`. 
- let partition_kind = if let (Some(top_k), true) = (top_k, all_have_minmax) { + let partition_kind = if let (Some((top_k, _)), true) = (top_k, all_have_minmax) { partitions.sort_by(|a, b| { let (a_min, a_max) = a .column_metas diff --git a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_03_ontime.result b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_03_ontime.result index c286d7b9a3fb..fb40eaacdb2e 100644 --- a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_03_ontime.result +++ b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_03_ontime.result @@ -8,3 +8,23 @@ N176PQ N336PQ N901XJ N909XJ +N132EV +N132EV +N136EV +N137EV +N137EV +N937XJ +N937XJ +N930XJ +N929XJ +N929XJ +12 +12 +12 +12 +12 +12 +12 +12 +12 +12 diff --git a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_03_ontime.sh b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_03_ontime.sh index a9a47c63cd5f..ec2023e8dfa3 100755 --- a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_03_ontime.sh +++ b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_03_ontime.sh @@ -8,7 +8,14 @@ echo "create stage s1 FILE_FORMAT = (type = PARQUET);" | $MYSQL_CLIENT_CONNECT aws --endpoint-url http://127.0.0.1:9900 s3 cp s3://testbucket/admin/data/ontime_200.parquet s3://testbucket/admin/stage/internal/s1/ontime_200.parquet >/dev/null 2>&1 -echo "select count(*) from @s1" | $MYSQL_CLIENT_CONNECT +echo "select count(*) from @s1;" | $MYSQL_CLIENT_CONNECT echo "select tail_number from @s1 where dayofmonth=1;" | $MYSQL_CLIENT_CONNECT +echo "select tail_number from @s1 where dayofmonth > 15 order by tail_number limit 5;" | $MYSQL_CLIENT_CONNECT + +echo "select tail_number from @s1 where dayofmonth > 15 order by tail_number desc limit 5;" | $MYSQL_CLIENT_CONNECT + +echo "select month from @s1 where dayofmonth > 15 order by tail_number desc limit 5;" | $MYSQL_CLIENT_CONNECT + +echo "select month from @s1 where dayofmonth > 15 order by tail_number desc limit 5;" | $MYSQL_CLIENT_CONNECT
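
As a whole, after this series the pruner records the ORDER BY column's
(min, max) for each row-group part, orders the parts accordingly
(PartitionsShuffleKind::Seq), and the deserialize transform asks the
top-k heap whether the next part can still contribute
(TopKSorter::never_match) before reading it. A much-simplified,
standalone illustration of that check for an ascending
ORDER BY .. LIMIT k: the SimpleTopK type below is invented for the
example and only mirrors the never_match idea, not the real TopKSorter
in common_expression (which also handles descending order and bitmap
push-down).

    /// Toy top-k tracker for an ascending `ORDER BY col LIMIT k`:
    /// once it holds k values, any row group whose minimum is already
    /// larger than the current k-th smallest value can be skipped.
    struct SimpleTopK {
        limit: usize,
        data: Vec<i64>, // kept sorted ascending, at most `limit` values
    }

    impl SimpleTopK {
        fn new(limit: usize) -> Self {
            Self { limit, data: Vec::new() }
        }

        fn push(&mut self, v: i64) {
            self.data.push(v);
            self.data.sort_unstable();
            self.data.truncate(self.limit);
        }

        /// Mirror of the `never_match` idea: `min_max` stands for the
        /// (min, max) statistics of a whole row-group part.
        fn never_match(&self, min_max: (i64, i64)) -> bool {
            self.data.len() == self.limit && min_max.0 > *self.data.last().unwrap()
        }
    }

    fn main() {
        let mut topk = SimpleTopK::new(2);
        for v in [5, 1, 9] {
            topk.push(v);
        }
        // Current top-2 smallest values are [1, 5]; a part whose minimum
        // is 7 can never place a row into the top 2, so it is skipped.
        assert!(topk.never_match((7, 20)));
        assert!(!topk.never_match((3, 20)));
    }
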