Skip to content

Commit

Permalink
Implement ChunkReader for Bytes
Browse files Browse the repository at this point in the history
Deprecate SliceableCursor
  • Loading branch information
tustvold committed Jun 2, 2022
1 parent c1a91dc commit 6f8b935
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 15 deletions.
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ impl ParquetRecordBatchReader {

#[cfg(test)]
mod tests {
use bytes::Bytes;
use std::cmp::min;
use std::convert::TryFrom;
use std::fs::File;
Expand Down Expand Up @@ -285,7 +286,6 @@ mod tests {
use crate::file::writer::SerializedFileWriter;
use crate::schema::parser::parse_message_type;
use crate::schema::types::{Type, TypePtr};
use crate::util::cursor::SliceableCursor;
use crate::util::test_common::RandGen;

#[test]
Expand Down Expand Up @@ -1162,7 +1162,7 @@ mod tests {
114, 111, 119, 0, 130, 0, 0, 0, 80, 65, 82, 49,
];

let file = SliceableCursor::new(data);
let file = Bytes::from(data);
let file_reader = SerializedFileReader::new(file).unwrap();
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));

Expand Down
3 changes: 2 additions & 1 deletion parquet/src/arrow/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ fn get_fsb_array_slice(
mod tests {
use super::*;

use bytes::Bytes;
use std::fs::File;
use std::sync::Arc;

Expand Down Expand Up @@ -750,7 +751,7 @@ mod tests {
writer.close().unwrap();
}

let cursor = crate::file::serialized_reader::SliceableCursor::new(buffer);
let cursor = Bytes::from(buffer);
let reader = SerializedFileReader::new(cursor).unwrap();
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
Expand Down
10 changes: 4 additions & 6 deletions parquet/src/file/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ fn parse_column_orders(
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;

use crate::basic::SortOrder;
use crate::basic::Type;
use crate::schema::types::Type as SchemaType;
use crate::util::cursor::SliceableCursor;
use parquet_format::TypeDefinedOrder;

#[test]
Expand All @@ -180,7 +180,7 @@ mod tests {

#[test]
fn test_parse_metadata_corrupt_footer() {
let data = SliceableCursor::new(Arc::new(vec![1, 2, 3, 4, 5, 6, 7, 8]));
let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
let reader_result = parse_metadata(&data);
assert!(reader_result.is_err());
assert_eq!(
Expand All @@ -191,8 +191,7 @@ mod tests {

#[test]
fn test_parse_metadata_invalid_length() {
let test_file =
SliceableCursor::new(Arc::new(vec![0, 0, 0, 255, b'P', b'A', b'R', b'1']));
let test_file = Bytes::from(vec![0, 0, 0, 255, b'P', b'A', b'R', b'1']);
let reader_result = parse_metadata(&test_file);
assert!(reader_result.is_err());
assert_eq!(
Expand All @@ -205,8 +204,7 @@ mod tests {

#[test]
fn test_parse_metadata_invalid_start() {
let test_file =
SliceableCursor::new(Arc::new(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']));
let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
let reader_result = parse_metadata(&test_file);
assert!(reader_result.is_err());
assert_eq!(
Expand Down
27 changes: 26 additions & 1 deletion parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)

use bytes::{Buf, Bytes};
use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};

use parquet_format::{PageHeader, PageType};
Expand All @@ -36,6 +37,7 @@ use crate::util::{io::TryClone, memory::ByteBufferPtr};

// export `SliceableCursor` and `FileSource` publicly so clients can
// re-use the logic in their own ParquetFileWriter wrappers
#[allow(deprecated)]
pub use crate::util::{cursor::SliceableCursor, io::FileSource};

// ----------------------------------------------------------------------
Expand All @@ -61,12 +63,35 @@ impl ChunkReader for File {
}
}

impl Length for Bytes {
fn len(&self) -> u64 {
self.len() as u64
}
}

impl TryClone for Bytes {
fn try_clone(&self) -> std::io::Result<Self> {
Ok(self.clone())
}
}

/// Allows an in-memory [`Bytes`] buffer to be used as a [`ChunkReader`].
impl ChunkReader for Bytes {
    /// Reader over a sub-slice of the buffer.
    type T = bytes::buf::Reader<Bytes>;

    /// Returns a reader over the `length` bytes that begin at byte offset `start`.
    ///
    /// # Panics
    ///
    /// The range is handed to `Bytes::slice` unchecked, which panics when the range
    /// reaches past the end of the buffer.
    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
        let begin = start as usize;
        let end = begin + length;
        let chunk = self.slice(begin..end);
        Ok(chunk.reader())
    }
}

// `SliceableCursor` is marked deprecated in favour of `bytes::Bytes`; the lint is silenced here
// so this trait implementation keeps compiling for callers that still use the type.
#[allow(deprecated)]
impl Length for SliceableCursor {
/// Returns the length reported by `SliceableCursor::len`.
fn len(&self) -> u64 {
// Fully-qualified call rather than `self.len()`.
// NOTE(review): presumably this resolves to the inherent `len` defined in `util::cursor`,
// not this trait method — confirm before changing the call form.
SliceableCursor::len(self)
}
}

#[allow(deprecated)]
impl ChunkReader for SliceableCursor {
type T = SliceableCursor;

Expand Down Expand Up @@ -521,7 +546,7 @@ mod tests {
get_test_file("alltypes_plain.parquet")
.read_to_end(&mut buf)
.unwrap();
let cursor = SliceableCursor::new(buf);
let cursor = Bytes::from(buf);
let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

let test_file = get_test_file("alltypes_plain.parquet");
Expand Down
9 changes: 4 additions & 5 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
mod tests {
use super::*;

use bytes::Bytes;
use std::{fs::File, io::Cursor};

use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
Expand Down Expand Up @@ -1054,7 +1055,7 @@ mod tests {
}

fn test_bytes_roundtrip(data: Vec<Vec<i32>>) {
let mut cursor = Cursor::new(vec![]);
let mut buffer = vec![];

let schema = Arc::new(
types::Type::group_type_builder("schema")
Expand All @@ -1072,7 +1073,7 @@ mod tests {
{
let props = Arc::new(WriterProperties::builder().build());
let mut writer =
SerializedFileWriter::new(&mut cursor, schema, props).unwrap();
SerializedFileWriter::new(&mut buffer, schema, props).unwrap();

for subset in &data {
let mut row_group_writer = writer.next_row_group().unwrap();
Expand All @@ -1089,9 +1090,7 @@ mod tests {
writer.close().unwrap();
}

let buffer = cursor.into_inner();

let reading_cursor = crate::file::serialized_reader::SliceableCursor::new(buffer);
let reading_cursor = Bytes::from(buffer);
let reader = SerializedFileReader::new(reading_cursor).unwrap();

assert_eq!(reader.num_row_groups(), data.len());
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/util/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ use std::{cmp, fmt};
/// because the lack of Generic Associated Type implies that you would require complex lifetime propagation when
/// returning such a cursor.
// NOTE(review): `clippy::rc_buffer` is silenced for the `Arc<Vec<u8>>` field below; presumably
// the type is kept so `new` can accept an existing `Arc<Vec<u8>>` — confirm before changing it.
#[allow(clippy::rc_buffer)]
#[deprecated = "use bytes::Bytes instead"]
pub struct SliceableCursor {
/// Shared byte buffer built from the content passed to `new`.
inner: Arc<Vec<u8>>,
/// NOTE(review): presumably the offset into `inner` at which this cursor's view begins — confirm.
start: u64,
/// NOTE(review): presumably the number of bytes visible through this cursor — confirm.
length: usize,
/// NOTE(review): presumably the current read/seek position — confirm against the `Read`/`Seek` impls.
pos: u64,
}

#[allow(deprecated)]
impl fmt::Debug for SliceableCursor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SliceableCursor")
Expand All @@ -44,6 +46,7 @@ impl fmt::Debug for SliceableCursor {
}
}

#[allow(deprecated)]
impl SliceableCursor {
pub fn new(content: impl Into<Arc<Vec<u8>>>) -> Self {
let inner = content.into();
Expand Down Expand Up @@ -90,6 +93,7 @@ impl SliceableCursor {
}

/// Implementation inspired by std::io::Cursor
#[allow(deprecated)]
impl Read for SliceableCursor {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let n = Read::read(&mut self.remaining_slice(), buf)?;
Expand All @@ -98,6 +102,7 @@ impl Read for SliceableCursor {
}
}

#[allow(deprecated)]
impl Seek for SliceableCursor {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
let new_pos = match pos {
Expand Down Expand Up @@ -204,12 +209,14 @@ mod tests {
use super::*;

/// Build a SliceableCursor holding every u8 value, 0 through 255, in ascending order
#[allow(deprecated)]
fn get_u8_range() -> SliceableCursor {
    SliceableCursor::new((0u8..=255).collect::<Vec<u8>>())
}

/// Reads all the bytes in the slice and checks that it matches the u8 range from start to end_included
#[allow(deprecated)]
fn check_read_all(mut cursor: SliceableCursor, start: u8, end_included: u8) {
let mut target = vec![];
let cursor_res = cursor.read_to_end(&mut target);
Expand Down

0 comments on commit 6f8b935

Please sign in to comment.