Skip to content

Commit

Permalink
Do not assume dictionaries exist in footer (#1631)
Browse files Browse the repository at this point in the history
* do not assume the footer contains dictionaries, fixes issue #1335

* fix cargo fmt and clippy errors
  • Loading branch information
pcjentsch committed May 3, 2022
1 parent 98a77fe commit 8f24c45
Showing 1 changed file with 42 additions and 32 deletions.
74 changes: 42 additions & 32 deletions arrow/src/ipc/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,43 +651,53 @@ impl<R: Read + Seek> FileReader<R> {

// Create an array of optional dictionary value arrays, one per field.
let mut dictionaries_by_field = vec![None; schema.all_fields().len()];
for block in footer.dictionaries().unwrap() {
// read length from end of offset
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(block.offset() as u64))?;
reader.read_exact(&mut message_size)?;
if message_size == CONTINUATION_MARKER {
if let Some(dictionaries) = footer.dictionaries() {
for block in dictionaries {
// read length from end of offset
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(block.offset() as u64))?;
reader.read_exact(&mut message_size)?;
}
let footer_len = i32::from_le_bytes(message_size);
let mut block_data = vec![0; footer_len as usize];

reader.read_exact(&mut block_data)?;

let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as message: {:?}", err))
})?;
if message_size == CONTINUATION_MARKER {
reader.read_exact(&mut message_size)?;
}
let footer_len = i32::from_le_bytes(message_size);
let mut block_data = vec![0; footer_len as usize];

match message.header_type() {
ipc::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().unwrap();
reader.read_exact(&mut block_data)?;

// read the block that makes up the dictionary batch into a buffer
let mut buf = vec![0; block.bodyLength() as usize];
reader.seek(SeekFrom::Start(
block.offset() as u64 + block.metaDataLength() as u64,
))?;
reader.read_exact(&mut buf)?;
let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::IoError(format!(
"Unable to get root as message: {:?}",
err
))
})?;

read_dictionary(&buf, batch, &schema, &mut dictionaries_by_field)?;
}
t => {
return Err(ArrowError::IoError(format!(
"Expecting DictionaryBatch in dictionary blocks, found {:?}.",
t
)));
match message.header_type() {
ipc::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().unwrap();

// read the block that makes up the dictionary batch into a buffer
let mut buf = vec![0; block.bodyLength() as usize];
reader.seek(SeekFrom::Start(
block.offset() as u64 + block.metaDataLength() as u64,
))?;
reader.read_exact(&mut buf)?;

read_dictionary(
&buf,
batch,
&schema,
&mut dictionaries_by_field,
)?;
}
t => {
return Err(ArrowError::IoError(format!(
"Expecting DictionaryBatch in dictionary blocks, found {:?}.",
t
)));
}
}
};
}
}
let projection = match projection {
Some(projection_indices) => {
Expand Down

0 comments on commit 8f24c45

Please sign in to comment.