Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 96 additions & 8 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use bytes::Bytes;
use futures::{FutureExt, TryFutureExt, future::BoxFuture};
use futures::{
FutureExt, TryFutureExt,
future::{BoxFuture, try_join_all},
};
use object_store::ObjectStoreExt;
use object_store::{GetOptions, GetRange};
use object_store::{ObjectStore, path::Path};
Expand Down Expand Up @@ -55,6 +58,7 @@ use tokio::runtime::Handle;
pub struct ParquetObjectReader {
store: Arc<dyn ObjectStore>,
path: Path,
version: Option<String>,
file_size: Option<u64>,
metadata_size_hint: Option<usize>,
preload_column_index: bool,
Expand All @@ -68,6 +72,7 @@ impl ParquetObjectReader {
Self {
store,
path,
version: None,
file_size: None,
metadata_size_hint: None,
preload_column_index: false,
Expand Down Expand Up @@ -101,6 +106,14 @@ impl ParquetObjectReader {
}
}

/// Request a specific object version from the underlying [`ObjectStore`].
pub fn with_version(mut self, version: impl Into<String>) -> Self {
    self.version = Some(version.into());
    self
}

/// Whether to load the Column Index as part of [`Self::get_metadata`]
///
/// Note: This setting may be overridden by [`ArrowReaderOptions`] `page_index_policy`.
Expand Down Expand Up @@ -166,14 +179,17 @@ impl ParquetObjectReader {
None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
}
}

/// Assemble the [`GetOptions`] for a request: the optional byte `range`
/// plus this reader's pinned object version, if one was configured.
fn get_opts(&self, range: Option<GetRange>) -> GetOptions {
    let pinned_version = self.version.clone();
    let options = GetOptions::new().with_range(range);
    options.with_version(pinned_version)
}
}

impl MetadataSuffixFetch for &mut ParquetObjectReader {
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
let options = GetOptions {
range: Some(GetRange::Suffix(suffix as u64)),
..Default::default()
};
let options = self.get_opts(Some(GetRange::Suffix(suffix as u64)));
self.spawn(|store, path| {
async move {
let resp = store.get_opts(path, options).await?;
Expand All @@ -186,14 +202,42 @@ impl MetadataSuffixFetch for &mut ParquetObjectReader {

impl AsyncFileReader for ParquetObjectReader {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
self.spawn(|store, path| store.get_range(path, range).boxed())
if self.version.is_some() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I am missing something but the version is not passed to the options... So how does it make it into the request?

I also think it would be easier to read this code if you use the same async closure and just changed the ptions

let mut options = self.get_opts(Some(GetRange::from(range)));
if let Some(version) = self.version.as_ref() {
  options = options.woth_version(version);
}
self.spawn(|store, path|  store.get_opts(path, options).boxed())

let options = self.get_opts(Some(GetRange::from(range)));
self.spawn(|store, path| {
async move {
let resp = store.get_opts(path, options).await?;
Ok::<_, ParquetError>(resp.bytes().await?)
}
.boxed()
Comment on lines +207 to +212
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the async closure here? Can we simplify this to something like this

                store.get_opts(path, options).await.boxed()

})
} else {
self.spawn(|store, path| store.get_range(path, range).boxed())
}
}

fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
where
    Self: Send,
{
    if self.version.is_none() {
        // No pinned version: a single `get_ranges` call covers every range.
        return self
            .spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed());
    }
    // With a pinned version, each range becomes its own versioned `get_opts`
    // request; `try_join_all` runs them concurrently and short-circuits on
    // the first error.
    let per_range_options: Vec<_> = ranges
        .into_iter()
        .map(|range| self.get_opts(Some(GetRange::from(range))))
        .collect();
    self.spawn(|store, path| {
        async move {
            let fetches = per_range_options.into_iter().map(|options| async move {
                let response = store.get_opts(path, options).await?;
                Ok::<_, ParquetError>(response.bytes().await?)
            });
            try_join_all(fetches).await
        }
        .boxed()
    })
}

// This method doesn't directly call `self.spawn` because all of the IO that is done down the
Expand Down Expand Up @@ -257,9 +301,12 @@ mod tests {

use futures::TryStreamExt;

use crate::arrow::ParquetRecordBatchStreamBuilder;
use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use crate::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
use crate::errors::ParquetError;
use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow::util::test_util::parquet_test_data;
use futures::FutureExt;
use object_store::local::LocalFileSystem;
Expand All @@ -278,6 +325,32 @@ mod tests {
(meta, Arc::new(store) as Arc<dyn ObjectStore>)
}

/// Write a tiny single-column Parquet file into a fresh temp directory and
/// return the directory guard (keeps the files alive), the object metadata,
/// and a [`LocalFileSystem`] store rooted at that directory.
async fn get_generated_meta_store() -> (tempfile::TempDir, ObjectMeta, Arc<dyn ObjectStore>) {
    let temp_dir = tempfile::tempdir().unwrap();
    let file_path = temp_dir.path().join("generated.parquet");

    // One nullable Int32 column with three rows (a null in the middle).
    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int32,
        true,
    )]));
    let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![values]).unwrap();

    let mut writer =
        ArrowWriter::try_new(std::fs::File::create(&file_path).unwrap(), schema, None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let store = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
    let meta = store.head(&Path::from("generated.parquet")).await.unwrap();

    (temp_dir, meta, Arc::new(store) as Arc<dyn ObjectStore>)
}

async fn get_meta_store_with_page_index() -> (ObjectMeta, Arc<dyn ObjectStore>) {
let res = parquet_test_data();
let store = LocalFileSystem::new_with_prefix(res).unwrap();
Expand Down Expand Up @@ -319,6 +392,21 @@ mod tests {
assert_eq!(batches[0].num_rows(), 8);
}

#[tokio::test]
async fn test_simple_with_version() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do these very the version is passed through? Do they fail if you revert the code changes?

let (_dir, meta, store) = get_generated_meta_store().await;

let object_reader = ParquetObjectReader::new(store, meta.location).with_version("v1");

let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
.await
.unwrap();
let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 3);
}

#[tokio::test]
async fn test_not_found() {
let (mut meta, store) = get_meta_store().await;
Expand Down
Loading