diff --git a/parquet-testing b/parquet-testing index 7175a471339..aafd3fc9df4 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 7175a471339704c7645af0fe66c68305e2e6759c +Subproject commit aafd3fc9df431c2625a514fb46626e5614f1d199 diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index 7ec29de0173..ad8fe16ad8b 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -55,8 +55,10 @@ use crate::schema::types::{ pub struct ParquetMetaData { file_metadata: FileMetaData, row_groups: Vec, - page_indexes: Option>, - offset_indexes: Option>>, + /// Page index for all pages in each column chunk + page_indexes: Option>>, + /// Offset index for all pages in each column chunk + offset_indexes: Option>>>, } impl ParquetMetaData { @@ -74,8 +76,8 @@ impl ParquetMetaData { pub fn new_with_page_index( file_metadata: FileMetaData, row_groups: Vec, - page_indexes: Option>, - offset_indexes: Option>>, + page_indexes: Option>>, + offset_indexes: Option>>>, ) -> Self { ParquetMetaData { file_metadata, @@ -107,12 +109,12 @@ impl ParquetMetaData { } /// Returns page indexes in this file. - pub fn page_indexes(&self) -> Option<&Vec> { + pub fn page_indexes(&self) -> Option<&Vec>> { self.page_indexes.as_ref() } /// Returns offset indexes in this file. 
- pub fn offset_indexes(&self) -> Option<&Vec>> { + pub fn offset_indexes(&self) -> Option<&Vec>>> { self.offset_indexes.as_ref() } } diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs index e97826c63b4..5c0a7df84e7 100644 --- a/parquet/src/file/page_index/index.rs +++ b/parquet/src/file/page_index/index.rs @@ -56,6 +56,10 @@ pub enum Index { DOUBLE(NativeIndex), BYTE_ARRAY(ByteArrayIndex), FIXED_LEN_BYTE_ARRAY(ByteArrayIndex), + /// Sometimes reading page index from parquet file + /// will only return pageLocations without min_max index, + /// `None` represents this lack of index information + None, } /// An index of a column of [`Type`] physical representation diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs index 8414480903f..6165021399f 100644 --- a/parquet/src/file/page_index/index_reader.rs +++ b/parquet/src/file/page_index/index_reader.rs @@ -101,13 +101,7 @@ fn get_index_offset_and_lengths( .iter() .map(|x| x.column_index_length()) .map(|maybe_length| { - let index_length = maybe_length.ok_or_else(|| { - ParquetError::General( - "The column_index_length must exist if offset_index_offset exists" - .to_string(), - ) - })?; - + let index_length = maybe_length.unwrap_or(0); Ok(index_length.try_into().unwrap()) }) .collect::, ParquetError>>()?; @@ -143,6 +137,9 @@ fn deserialize_column_index( data: &[u8], column_type: Type, ) -> Result { + if data.is_empty() { + return Ok(Index::None); + } let mut d = Cursor::new(data); let mut prot = TCompactInputProtocol::new(&mut d); diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 6ff73e041e8..c9f6c1f7564 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -248,13 +248,17 @@ impl SerializedFileReader { } if options.enable_page_index { - //Todo for now test data `data_index_bloom_encoding_stats.parquet` only have one rowgroup - //support multi 
after create multi-RG test data. - let cols = metadata.row_group(0); - let columns_indexes = - index_reader::read_columns_indexes(&chunk_reader, cols.columns())?; - let pages_locations = - index_reader::read_pages_locations(&chunk_reader, cols.columns())?; + let mut columns_indexes = vec![]; + let mut offset_indexes = vec![]; + + for rg in &filtered_row_groups { + let column_index = + index_reader::read_columns_indexes(&chunk_reader, rg.columns())?; + let offset_index = + index_reader::read_pages_locations(&chunk_reader, rg.columns())?; + columns_indexes.push(column_index); + offset_indexes.push(offset_index); + } Ok(Self { chunk_reader: Arc::new(chunk_reader), @@ -262,7 +266,7 @@ impl SerializedFileReader { metadata.file_metadata().clone(), filtered_row_groups, Some(columns_indexes), - Some(pages_locations), + Some(offset_indexes), ), }) } else { @@ -561,9 +565,11 @@ impl PageReader for SerializedPageReader { mod tests { use super::*; use crate::basic::{self, ColumnOrder}; - use crate::file::page_index::index::Index; + use crate::data_type::private::ParquetValueType; + use crate::file::page_index::index::{ByteArrayIndex, Index, NativeIndex}; use crate::record::RowAccessor; use crate::schema::parser::parse_message_type; + use crate::util::bit_util::from_le_slice; use crate::util::test_common::{get_test_file, get_test_path}; use parquet_format::BoundaryOrder; use std::sync::Arc; @@ -1077,7 +1083,7 @@ mod tests { // only one row group assert_eq!(page_indexes.len(), 1); - let index = if let Index::BYTE_ARRAY(index) = page_indexes.get(0).unwrap() { + let index = if let Index::BYTE_ARRAY(index) = &page_indexes[0][0] { index } else { unreachable!() @@ -1089,7 +1095,7 @@ mod tests { //only one page group assert_eq!(index_in_pages.len(), 1); - let page0 = index_in_pages.get(0).unwrap(); + let page0 = &index_in_pages[0]; let min = page0.min.as_ref().unwrap(); let max = page0.max.as_ref().unwrap(); assert_eq!("Hello", std::str::from_utf8(min.as_slice()).unwrap()); @@ 
-1098,11 +1104,224 @@ mod tests {
         let offset_indexes = metadata.offset_indexes().unwrap();
         // only one row group
         assert_eq!(offset_indexes.len(), 1);
-        let offset_index = offset_indexes.get(0).unwrap();
-        let page_offset = offset_index.get(0).unwrap();
+        let offset_index = &offset_indexes[0];
+        let page_offset = &offset_index[0][0];
 
         assert_eq!(4, page_offset.offset);
         assert_eq!(152, page_offset.compressed_page_size);
         assert_eq!(0, page_offset.first_row_index);
     }
+
+    /// Reads `alltypes_tiny_pages_plain.parquet` with the page index enabled and
+    /// checks the column index variant and page count of each of its 13 columns.
+    #[test]
+    fn test_page_index_reader_all_type() {
+        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
+        let builder = ReadOptionsBuilder::new();
+        //enable read page index
+        let options = builder.with_page_index().build();
+        let reader_result = SerializedFileReader::new_with_options(test_file, options);
+        let reader = reader_result.unwrap();
+
+        // Test contents in Parquet metadata
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let page_indexes = metadata.page_indexes().unwrap();
+        let row_group_offset_indexes = &metadata.offset_indexes().unwrap()[0];
+
+        // only one row group
+        assert_eq!(page_indexes.len(), 1);
+        let row_group_metadata = metadata.row_group(0);
+
+        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
+        if let Index::INT32(index) = &page_indexes[0][0] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 0),
+                BoundaryOrder::Unordered,
+            );
+            assert_eq!(row_group_offset_indexes[0].len(), 325);
+        } else {
+            unreachable!()
+        };
+        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
+        if let Index::BOOLEAN(index) = &page_indexes[0][1] {
+            assert_eq!(index.indexes.len(), 82);
+            assert_eq!(row_group_offset_indexes[1].len(), 82);
+        } else {
+            unreachable!()
+        };
+        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
+        if let Index::INT32(index) = &page_indexes[0][2] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 2),
+                BoundaryOrder::Ascending,
+            );
+            assert_eq!(row_group_offset_indexes[2].len(), 325);
+        } else {
+            unreachable!()
+        };
+        //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
+        if let Index::INT32(index) = &page_indexes[0][3] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 3),
+                BoundaryOrder::Ascending,
+            );
+            assert_eq!(row_group_offset_indexes[3].len(), 325);
+        } else {
+            unreachable!()
+        };
+        //col5->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
+        if let Index::INT32(index) = &page_indexes[0][4] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 4),
+                BoundaryOrder::Ascending,
+            );
+            assert_eq!(row_group_offset_indexes[4].len(), 325);
+        } else {
+            unreachable!()
+        };
+        //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
+        if let Index::INT64(index) = &page_indexes[0][5] {
+            check_native_page_index(
+                index,
+                528,
+                get_row_group_min_max_bytes(row_group_metadata, 5),
+                BoundaryOrder::Unordered,
+            );
+            assert_eq!(row_group_offset_indexes[5].len(), 528);
+        } else {
+            unreachable!()
+        };
+        //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
+        if let Index::FLOAT(index) = &page_indexes[0][6] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 6),
+                BoundaryOrder::Ascending,
+            );
+            assert_eq!(row_group_offset_indexes[6].len(), 325);
+        } else {
+            unreachable!()
+        };
+        //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
+        if let Index::DOUBLE(index) = &page_indexes[0][7] {
+            check_native_page_index(
+                index,
+                528,
+                get_row_group_min_max_bytes(row_group_metadata, 7),
+                BoundaryOrder::Unordered,
+            );
+            assert_eq!(row_group_offset_indexes[7].len(), 528);
+        } else {
+            unreachable!()
+        };
+        //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
+        if let Index::BYTE_ARRAY(index) = &page_indexes[0][8] {
+            check_bytes_page_index(
+                index,
+                974,
+                get_row_group_min_max_bytes(row_group_metadata, 8),
+                BoundaryOrder::Unordered,
+            );
+            assert_eq!(row_group_offset_indexes[8].len(), 974);
+        } else {
+            unreachable!()
+        };
+        //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
+        if let Index::BYTE_ARRAY(index) = &page_indexes[0][9] {
+            check_bytes_page_index(
+                index,
+                352,
+                get_row_group_min_max_bytes(row_group_metadata, 9),
+                BoundaryOrder::Ascending,
+            );
+            assert_eq!(row_group_offset_indexes[9].len(), 352);
+        } else {
+            unreachable!()
+        };
+        //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
+        //Notice: min/max values for each page of this column do not exist.
+        if let Index::None = &page_indexes[0][10] {
+            assert_eq!(row_group_offset_indexes[10].len(), 974);
+        } else {
+            unreachable!()
+        };
+        //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
+        if let Index::INT32(index) = &page_indexes[0][11] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 11),
+                BoundaryOrder::Ascending,
+            );
+            assert_eq!(row_group_offset_indexes[11].len(), 325);
+        } else {
+            unreachable!()
+        };
+        //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
+        if let Index::INT32(index) = &page_indexes[0][12] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 12),
+                BoundaryOrder::Unordered,
+            );
+            assert_eq!(row_group_offset_indexes[12].len(), 325);
+        } else {
+            unreachable!()
+        };
+    }
+
+    /// Asserts that `row_group_index` holds `page_size` pages in `boundary_order`
+    /// and that every page's min/max lies within the row group bounds `min_max`
+    /// (little-endian encoded `(min, max)` bytes).
+    fn check_native_page_index<T: ParquetValueType>(
+        row_group_index: &NativeIndex<T>,
+        page_size: usize,
+        min_max: (&[u8], &[u8]),
+        boundary_order: BoundaryOrder,
+    ) {
+        assert_eq!(row_group_index.indexes.len(), page_size);
+        assert_eq!(row_group_index.boundary_order, boundary_order);
+        assert!(row_group_index.indexes.iter().all(|x| {
+            x.min.as_ref().unwrap() >= &from_le_slice::<T>(min_max.0)
+                && x.max.as_ref().unwrap() <= &from_le_slice::<T>(min_max.1)
+        }));
+    }
+
+    /// Byte-array counterpart of the native check: asserts the page count and
+    /// boundary order, and that every page's min/max bytes lie within `min_max`.
+    fn check_bytes_page_index(
+        row_group_index: &ByteArrayIndex,
+        page_size: usize,
+        min_max: (&[u8], &[u8]),
+        boundary_order: BoundaryOrder,
+    ) {
+        assert_eq!(row_group_index.indexes.len(), page_size);
+        assert_eq!(row_group_index.boundary_order, boundary_order);
+        assert!(row_group_index.indexes.iter().all(|x| {
+            x.min.as_ref().unwrap().as_slice() >= min_max.0
+                && x.max.as_ref().unwrap().as_slice() <= min_max.1
+        }));
+    }
+
+    /// Returns the `(min, max)` statistics bytes of column `col_num` in row group
+    /// `r`; panics if the column chunk carries no statistics.
+    fn get_row_group_min_max_bytes(
+        r: &RowGroupMetaData,
+        col_num: usize,
+    ) -> (&[u8], &[u8]) {
+        let statistics = r.column(col_num).statistics().unwrap();
+        (statistics.min_bytes(), statistics.max_bytes())
+    }
 }