diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 23e2398e0f8..4cc7294f675 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -248,6 +248,7 @@ impl ParquetRecordBatchReader { #[cfg(test)] mod tests { + use bytes::Bytes; use std::cmp::min; use std::convert::TryFrom; use std::fs::File; @@ -285,7 +286,6 @@ mod tests { use crate::file::writer::SerializedFileWriter; use crate::schema::parser::parse_message_type; use crate::schema::types::{Type, TypePtr}; - use crate::util::cursor::SliceableCursor; use crate::util::test_common::RandGen; #[test] @@ -1162,7 +1162,7 @@ mod tests { 114, 111, 119, 0, 130, 0, 0, 0, 80, 65, 82, 49, ]; - let file = SliceableCursor::new(data); + let file = Bytes::from(data); let file_reader = SerializedFileReader::new(file).unwrap(); let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index 530dfe2ad09..ceeddfef5d1 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -689,6 +689,7 @@ fn get_fsb_array_slice( mod tests { use super::*; + use bytes::Bytes; use std::fs::File; use std::sync::Arc; @@ -750,7 +751,7 @@ mod tests { writer.close().unwrap(); } - let cursor = crate::file::serialized_reader::SliceableCursor::new(buffer); + let cursor = Bytes::from(buffer); let reader = SerializedFileReader::new(cursor).unwrap(); let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader)); let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap(); diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index db8a23d8ebc..76461358681 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -160,11 +160,11 @@ fn parse_column_orders( #[cfg(test)] mod tests { use super::*; + use bytes::Bytes; use crate::basic::SortOrder; use crate::basic::Type; use crate::schema::types::Type as SchemaType; 
- use crate::util::cursor::SliceableCursor; use parquet_format::TypeDefinedOrder; #[test] @@ -180,7 +180,7 @@ mod tests { #[test] fn test_parse_metadata_corrupt_footer() { - let data = SliceableCursor::new(Arc::new(vec![1, 2, 3, 4, 5, 6, 7, 8])); + let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]); let reader_result = parse_metadata(&data); assert!(reader_result.is_err()); assert_eq!( @@ -191,8 +191,7 @@ mod tests { #[test] fn test_parse_metadata_invalid_length() { - let test_file = - SliceableCursor::new(Arc::new(vec![0, 0, 0, 255, b'P', b'A', b'R', b'1'])); + let test_file = Bytes::from(vec![0, 0, 0, 255, b'P', b'A', b'R', b'1']); let reader_result = parse_metadata(&test_file); assert!(reader_result.is_err()); assert_eq!( @@ -205,8 +204,7 @@ mod tests { #[test] fn test_parse_metadata_invalid_start() { - let test_file = - SliceableCursor::new(Arc::new(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1'])); + let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']); let reader_result = parse_metadata(&test_file); assert!(reader_result.is_err()); assert_eq!( diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 1dd374ef85c..22f6f4a7369 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -18,6 +18,7 @@ //! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader //! 
Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM) +use bytes::{Buf, Bytes}; use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc}; use parquet_format::{PageHeader, PageType}; @@ -36,6 +37,7 @@ use crate::util::{io::TryClone, memory::ByteBufferPtr}; // export `SliceableCursor` and `FileSource` publically so clients can // re-use the logic in their own ParquetFileWriter wrappers +#[allow(deprecated)] pub use crate::util::{cursor::SliceableCursor, io::FileSource}; // ---------------------------------------------------------------------- @@ -61,12 +63,35 @@ impl ChunkReader for File { } } +impl Length for Bytes { + fn len(&self) -> u64 { + self.len() as u64 + } +} + +impl TryClone for Bytes { + fn try_clone(&self) -> std::io::Result<Self> { + Ok(self.clone()) + } +} + +impl ChunkReader for Bytes { + type T = bytes::buf::Reader<Bytes>; + + fn get_read(&self, start: u64, length: usize) -> Result<Self::T> { + let start = start as usize; + Ok(self.slice(start..start + length).reader()) + } +} + +#[allow(deprecated)] impl Length for SliceableCursor { fn len(&self) -> u64 { SliceableCursor::len(self) } } +#[allow(deprecated)] impl ChunkReader for SliceableCursor { type T = SliceableCursor; @@ -521,7 +546,7 @@ mod tests { get_test_file("alltypes_plain.parquet") .read_to_end(&mut buf) .unwrap(); - let cursor = SliceableCursor::new(buf); + let cursor = Bytes::from(buf); let read_from_cursor = SerializedFileReader::new(cursor).unwrap(); let test_file = get_test_file("alltypes_plain.parquet"); diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 646550dcb6b..3108baddefa 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -541,6 +541,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> { mod tests { use super::*; + use bytes::Bytes; use std::{fs::File, io::Cursor}; use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type}; @@ -1054,7 +1055,7 @@ mod tests { } fn
test_bytes_roundtrip(data: Vec<Vec<i32>>) { - let mut cursor = Cursor::new(vec![]); + let mut buffer = vec![]; let schema = Arc::new( types::Type::group_type_builder("schema") @@ -1072,7 +1073,7 @@ mod tests { { let props = Arc::new(WriterProperties::builder().build()); let mut writer = - SerializedFileWriter::new(&mut cursor, schema, props).unwrap(); + SerializedFileWriter::new(&mut buffer, schema, props).unwrap(); for subset in &data { let mut row_group_writer = writer.next_row_group().unwrap(); @@ -1089,9 +1090,7 @@ mod tests { writer.close().unwrap(); } - let buffer = cursor.into_inner(); - - let reading_cursor = crate::file::serialized_reader::SliceableCursor::new(buffer); + let reading_cursor = Bytes::from(buffer); let reader = SerializedFileReader::new(reading_cursor).unwrap(); assert_eq!(reader.num_row_groups(), data.len()); diff --git a/parquet/src/util/cursor.rs b/parquet/src/util/cursor.rs index ff7067fcbca..706724dbf52 100644 --- a/parquet/src/util/cursor.rs +++ b/parquet/src/util/cursor.rs @@ -26,6 +26,7 @@ use std::{cmp, fmt}; /// because the lack of Generic Associated Type implies that you would require complex lifetime propagation when /// returning such a cursor.
#[allow(clippy::rc_buffer)] +#[deprecated = "use bytes::Bytes instead"] pub struct SliceableCursor { inner: Arc<Vec<u8>>, start: u64, @@ -33,6 +34,7 @@ pub struct SliceableCursor { pos: u64, } +#[allow(deprecated)] impl fmt::Debug for SliceableCursor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("SliceableCursor") @@ -44,6 +46,7 @@ } } +#[allow(deprecated)] impl SliceableCursor { pub fn new(content: impl Into<Arc<Vec<u8>>>) -> Self { let inner = content.into(); @@ -90,6 +93,7 @@ impl SliceableCursor { } /// Implementation inspired by std::io::Cursor +#[allow(deprecated)] impl Read for SliceableCursor { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { let n = Read::read(&mut self.remaining_slice(), buf)?; @@ -98,6 +102,7 @@ } } +#[allow(deprecated)] impl Seek for SliceableCursor { fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { let new_pos = match pos { @@ -204,12 +209,14 @@ mod tests { use super::*; /// Create a SliceableCursor of all u8 values in ascending order + #[allow(deprecated)] fn get_u8_range() -> SliceableCursor { let data: Vec<u8> = (0u8..=255).collect(); SliceableCursor::new(data) } /// Reads all the bytes in the slice and checks that it matches the u8 range from start to end_included + #[allow(deprecated)] fn check_read_all(mut cursor: SliceableCursor, start: u8, end_included: u8) { let mut target = vec![]; let cursor_res = cursor.read_to_end(&mut target);