Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 135 additions & 21 deletions arrow-avro/src/reader/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::compression::CompressionCodec;
use crate::reader::Decoder;
use crate::reader::block::{BlockDecoder, BlockDecoderState};
use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use arrow_schema::{ArrowError, SchemaRef};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
Expand Down Expand Up @@ -173,6 +173,13 @@ impl<R> AsyncAvroFileReader<R> {
}
}

/// Returns the Arrow schema for batches produced by this reader.
///
/// The schema is determined by the writer schema in the file and the reader schema provided to the builder.
/// Calling this does not consume any data: the schema is available as soon as the reader is built.
pub fn schema(&self) -> SchemaRef {
    // Delegate to the record decoder, which exposes the schema used for decoding.
    self.decoder.schema()
}

/// Calculate the byte range needed to complete the current block.
/// Only valid when block_decoder is in Data or Sync state.
/// Returns the range to fetch, or an error if EOF would be reached.
Expand Down Expand Up @@ -534,7 +541,9 @@ impl<R: AsyncFileReader + Unpin + 'static> Stream for AsyncAvroFileReader<R> {
#[cfg(all(test, feature = "object_store"))]
mod tests {
use super::*;
use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY};
use crate::schema::{
AVRO_NAME_METADATA_KEY, AVRO_NAMESPACE_METADATA_KEY, AvroSchema, SCHEMA_METADATA_KEY,
};
use arrow_array::cast::AsArray;
use arrow_array::types::{Int32Type, Int64Type};
use arrow_array::*;
Expand Down Expand Up @@ -758,39 +767,63 @@ mod tests {
vec![Field::new("f1_3_1", DataType::Float64, false)].into(),
),
false,
),
)
.with_metadata(HashMap::from([
(AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns3".to_owned()),
(AVRO_NAME_METADATA_KEY.to_owned(), "record3".to_owned()),
])),
]
.into(),
),
false,
),
)
.with_metadata(HashMap::from([
(AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns2".to_owned()),
(AVRO_NAME_METADATA_KEY.to_owned(), "record2".to_owned()),
])),
Field::new(
"f2",
DataType::List(Arc::new(Field::new(
"item",
DataType::Struct(
vec![
Field::new("f2_1", DataType::Boolean, false),
Field::new("f2_2", DataType::Float32, false),
]
.into(),
),
false,
))),
DataType::List(Arc::new(
Field::new(
"item",
DataType::Struct(
vec![
Field::new("f2_1", DataType::Boolean, false),
Field::new("f2_2", DataType::Float32, false),
]
.into(),
),
false,
)
.with_metadata(HashMap::from([
(AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns4".to_owned()),
(AVRO_NAME_METADATA_KEY.to_owned(), "record4".to_owned()),
])),
)),
false,
),
Field::new(
"f3",
DataType::Struct(vec![Field::new("f3_1", DataType::Utf8, false)].into()),
true,
),
)
.with_metadata(HashMap::from([
(AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns5".to_owned()),
(AVRO_NAME_METADATA_KEY.to_owned(), "record5".to_owned()),
])),
Field::new(
"f4",
DataType::List(Arc::new(Field::new(
"item",
DataType::Struct(vec![Field::new("f4_1", DataType::Int64, false)].into()),
true,
))),
DataType::List(Arc::new(
Field::new(
"item",
DataType::Struct(vec![Field::new("f4_1", DataType::Int64, false)].into()),
true,
)
.with_metadata(HashMap::from([
(AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns6".to_owned()),
(AVRO_NAME_METADATA_KEY.to_owned(), "record6".to_owned()),
])),
)),
false,
),
])
Expand Down Expand Up @@ -1538,6 +1571,87 @@ mod tests {
assert!(err.to_string().contains("Duplicate projection index"));
}

#[tokio::test]
async fn test_arrow_schema_from_reader_no_reader_schema() {
// Use a very small header size hint to force multiple fetches
let file = arrow_test_data("avro/alltypes_plain.avro");
let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
let location = Path::from_filesystem_path(&file).unwrap();
let file_size = store.head(&location).await.unwrap().size;

let file_reader = AvroObjectReader::new(store, location);
let expected_schema = get_alltypes_schema()
.as_ref()
.clone()
.with_metadata(Default::default());

let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
.try_build()
.await
.unwrap();

assert_eq!(reader.schema().as_ref(), &expected_schema);

let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
let batch = &batches[0];

assert_eq!(batch.schema().as_ref(), &expected_schema);
}

#[tokio::test]
async fn test_arrow_schema_from_reader_with_reader_schema() {
// Use a very small header size hint to force multiple fetches
let file = arrow_test_data("avro/alltypes_plain.avro");
let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
let location = Path::from_filesystem_path(&file).unwrap();
let file_size = store.head(&location).await.unwrap().size;

let file_reader = AvroObjectReader::new(store, location);
let schema = get_alltypes_schema();
let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
let expected_schema = schema.as_ref().clone().with_metadata(Default::default());

let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
.with_reader_schema(reader_schema)
.try_build()
.await
.unwrap();

assert_eq!(reader.schema().as_ref(), &expected_schema);

let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
let batch = &batches[0];

assert_eq!(batch.schema().as_ref(), &expected_schema);
}

#[tokio::test]
async fn test_arrow_schema_from_reader_nested_records() {
// Use a very small header size hint to force multiple fetches
let file = arrow_test_data("avro/nested_records.avro");
let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
let location = Path::from_filesystem_path(&file).unwrap();
let file_size = store.head(&location).await.unwrap().size;

let file_reader = AvroObjectReader::new(store, location);
let expected_schema = get_nested_records_schema()
.as_ref()
.clone()
.with_metadata(Default::default());

let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
.try_build()
.await
.unwrap();

assert_eq!(reader.schema().as_ref(), &expected_schema);

let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
let batch = &batches[0];

assert_eq!(batch.schema().as_ref(), &expected_schema);
}

#[tokio::test]
async fn test_with_header_size_hint_small() {
// Use a very small header size hint to force multiple fetches
Expand Down
Loading