Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blockwise IO in IPC FileReader (#5153) #5179

Merged
merged 4 commits into from
Dec 8, 2023
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
120 changes: 47 additions & 73 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow_array::*;
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::ArrayData;
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::{FieldNode, MetadataVersion, CONTINUATION_MARKER};
use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER};
use DataType::*;

/// Read a buffer based on offset and length
Expand Down Expand Up @@ -498,10 +498,34 @@ pub fn read_dictionary(
Ok(())
}

/// Read the data for a given block
fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am making an assumption here that blocks should be large enough to be an appropriate IO size

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps @pitrou might know if this is a bad idea?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that a block typically points to an entire record batch, yes, a block is certainly large enough.

reader.seek(SeekFrom::Start(block.offset() as u64))?;
let body_len = block.bodyLength().to_usize().unwrap();
let metadata_len = block.metaDataLength().to_usize().unwrap();
let total_len = body_len.checked_add(metadata_len).unwrap();

let mut buf = MutableBuffer::from_len_zeroed(total_len);
reader.read_exact(&mut buf)?;
Ok(buf.into())
}

/// Parse an encapsulated message
///
/// <https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format>
fn parse_message(buf: &[u8]) -> Result<Message, ArrowError> {
let buf = match buf[..4] == CONTINUATION_MARKER {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does highlight a somewhat peculiar quirk of the IPC file format, it doesn't actually care about the size prefixes. I guess this is just a historical artifact of the fact the IPC streams came first.

true => &buf[8..],
false => &buf[4..],
};
crate::root_as_message(buf)
.map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
}

/// Arrow File reader
pub struct FileReader<R: Read + Seek> {
/// Buffered file reader that supports reading and seeking
reader: BufReader<R>,
reader: R,

/// The schema that is read from the file header
schema: SchemaRef,
Expand Down Expand Up @@ -535,45 +559,34 @@ pub struct FileReader<R: Read + Seek> {
impl<R: Read + Seek> fmt::Debug for FileReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
f.debug_struct("FileReader<R>")
.field("reader", &"BufReader<..>")
.field("schema", &self.schema)
.field("blocks", &self.blocks)
.field("current_block", &self.current_block)
.field("total_blocks", &self.total_blocks)
.field("dictionaries_by_id", &self.dictionaries_by_id)
.field("metadata_version", &self.metadata_version)
.field("projection", &self.projection)
.finish()
.finish_non_exhaustive()
}
}

impl<R: Read + Seek> FileReader<R> {
/// Try to create a new file reader
///
/// Returns errors if the file does not meet the Arrow Format header and footer
/// requirements
pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
let mut reader = BufReader::new(reader);
// check if header and footer contain correct magic bytes
let mut magic_buffer: [u8; 6] = [0; 6];
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != super::ARROW_MAGIC {
return Err(ArrowError::ParseError(
"Arrow file does not contain correct header".to_string(),
));
}
reader.seek(SeekFrom::End(-6))?;
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != super::ARROW_MAGIC {
/// Returns errors if the file does not meet the Arrow Format footer requirements
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be sufficient to just check the footer, and this provides a more predictable IO pattern

pub fn try_new(mut reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
let mut buffer = [0; 10];
tustvold marked this conversation as resolved.
Show resolved Hide resolved
reader.seek(SeekFrom::End(-10))?;
reader.read_exact(&mut buffer)?;

if buffer[4..] != super::ARROW_MAGIC {
return Err(ArrowError::ParseError(
"Arrow file does not contain correct footer".to_string(),
));
}

// read footer length
let mut footer_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::End(-10))?;
reader.read_exact(&mut footer_size)?;
let footer_len = i32::from_le_bytes(footer_size);
let footer_len = i32::from_le_bytes(buffer[..4].try_into().unwrap());

// read footer
let mut footer_data = vec![0; footer_len as usize];
Expand Down Expand Up @@ -607,35 +620,14 @@ impl<R: Read + Seek> FileReader<R> {
let mut dictionaries_by_id = HashMap::new();
if let Some(dictionaries) = footer.dictionaries() {
for block in dictionaries {
// read length from end of offset
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(block.offset() as u64))?;
reader.read_exact(&mut message_size)?;
if message_size == CONTINUATION_MARKER {
reader.read_exact(&mut message_size)?;
}
let footer_len = i32::from_le_bytes(message_size);
let mut block_data = vec![0; footer_len as usize];

reader.read_exact(&mut block_data)?;

let message = crate::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
})?;
let buf = read_block(&mut reader, block)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reducing the number of copies of the code that does the read seems like a good improvement to me

It seems like previously the code did 2 IOs per block (at least) to read the header and then the data. With this PR it will only do 1 IO a block (given there is no continuation)

let message = parse_message(&buf)?;

match message.header_type() {
crate::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().unwrap();

// read the block that makes up the dictionary batch into a buffer
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
reader.seek(SeekFrom::Start(
block.offset() as u64 + block.metaDataLength() as u64,
))?;
reader.read_exact(&mut buf)?;

read_dictionary(
&buf.into(),
&buf.slice(block.metaDataLength() as _),
batch,
&schema,
&mut dictionaries_by_id,
Expand Down Expand Up @@ -702,27 +694,15 @@ impl<R: Read + Seek> FileReader<R> {
}

fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
let block = self.blocks[self.current_block];
let block = &self.blocks[self.current_block];
self.current_block += 1;

// read length
self.reader.seek(SeekFrom::Start(block.offset() as u64))?;
let mut meta_buf = [0; 4];
self.reader.read_exact(&mut meta_buf)?;
if meta_buf == CONTINUATION_MARKER {
// continuation marker encountered, read message next
self.reader.read_exact(&mut meta_buf)?;
}
let meta_len = i32::from_le_bytes(meta_buf);

let mut block_data = vec![0; meta_len as usize];
self.reader.read_exact(&mut block_data)?;
let message = crate::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
})?;
let buffer = read_block(&mut self.reader, block)?;
let message = parse_message(&buffer)?;

// some old test data's footer metadata is not set, so we account for that
if self.metadata_version != crate::MetadataVersion::V1
if self.metadata_version != MetadataVersion::V1
&& message.version() != self.metadata_version
{
return Err(ArrowError::IpcError(
Expand All @@ -739,14 +719,8 @@ impl<R: Read + Seek> FileReader<R> {
ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
})?;
// read the block that makes up the record batch into a buffer
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
self.reader.seek(SeekFrom::Start(
block.offset() as u64 + block.metaDataLength() as u64,
))?;
self.reader.read_exact(&mut buf)?;

read_record_batch(
&buf.into(),
&buffer.slice(block.metaDataLength() as _),
batch,
self.schema(),
&self.dictionaries_by_id,
Expand All @@ -766,14 +740,14 @@ impl<R: Read + Seek> FileReader<R> {
///
/// It is inadvisable to directly read from the underlying reader.
pub fn get_ref(&self) -> &R {
self.reader.get_ref()
&self.reader
}

/// Gets a mutable reference to the underlying reader.
///
/// It is inadvisable to directly read from the underlying reader.
pub fn get_mut(&mut self) -> &mut R {
self.reader.get_mut()
&mut self.reader
}
}

Expand Down