Add RecordReader trait and proc macro to implement it for a struct #4773

Merged 16 commits on Oct 30, 2023
2 changes: 2 additions & 0 deletions parquet/src/record/mod.rs
@@ -19,12 +19,14 @@

mod api;
pub mod reader;
mod record_reader;
mod record_writer;
mod triplet;

pub use self::{
api::{
Field, List, ListAccessor, Map, MapAccessor, Row, RowAccessor, RowColumnIter, RowFormatter,
},
record_reader::RecordReader,
record_writer::RecordWriter,
};
30 changes: 30 additions & 0 deletions parquet/src/record/record_reader.rs
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use super::super::errors::ParquetError;
use super::super::file::reader::RowGroupReader;

/// `read_from_row_group` reads up to `num_records` records from
/// `row_group_reader` into `self`.
/// The type parameter `T` is used to work around the Rust orphan rule
/// when implementing on types such as `Vec<T>`.
pub trait RecordReader<T> {
fn read_from_row_group(
&mut self,
row_group_reader: &mut dyn RowGroupReader,
num_records: usize,
) -> Result<(), ParquetError>;
}
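
The orphan-rule remark in the doc comment can be illustrated with a self-contained sketch. The module `upstream` below is a hypothetical stand-in for the `parquet` crate, and `Point`, `read_points`, and the simplified `read_from` signature are assumptions made for illustration, not the real API. In a downstream crate, `impl RecordReader for Vec<Point>` would be rejected because neither the trait nor `Vec` is local, whereas `impl RecordReader<Point> for Vec<Point>` is accepted because a local type appears as a trait type parameter. A single file cannot reproduce the rejection itself, since a module in the same crate counts as local.

```rust
// Hypothetical stand-in for the upstream crate; not the real parquet API.
mod upstream {
    /// Simplified analogue of `RecordReader<T>`.
    pub trait RecordReader<T> {
        fn read_from(&mut self, source: &[i64], num_records: usize) -> Result<(), String>;
    }
}

use upstream::RecordReader;

// A type that is local to the downstream crate.
#[derive(Debug, Default, PartialEq)]
struct Point {
    x: i64,
}

// `Vec` is foreign, but the local `Point` in the trait's type parameter
// is what would make this impl legal from a downstream crate.
impl RecordReader<Point> for Vec<Point> {
    fn read_from(&mut self, source: &[i64], num_records: usize) -> Result<(), String> {
        if source.len() < num_records {
            return Err(format!(
                "requested {} records, only {} available",
                num_records,
                source.len()
            ));
        }
        // Append one `Point` per requested value.
        self.extend(source[..num_records].iter().map(|&x| Point { x }));
        Ok(())
    }
}

// Convenience wrapper (hypothetical) showing the call site.
fn read_points(source: &[i64], n: usize) -> Result<Vec<Point>, String> {
    let mut out: Vec<Point> = Vec::new();
    out.read_from(source, n)?;
    Ok(out)
}
```

The call site looks the same as with a non-generic trait: the compiler infers `T` from the single matching impl, so callers never have to name the type parameter.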
4 changes: 4 additions & 0 deletions parquet/src/record/record_writer.rs
@@ -20,6 +20,10 @@ use crate::schema::types::TypePtr;
use super::super::errors::ParquetError;
use super::super::file::writer::SerializedRowGroupWriter;

/// `write_to_row_group` writes from `self` into `row_group_writer`
/// `schema` builds the schema used by `row_group_writer`
/// The type parameter `T` is used to work around the rust orphan rule
/// when implementing on types such as `&[T]`.
pub trait RecordWriter<T> {
fn write_to_row_group<W: std::io::Write + Send>(
&self,
51 changes: 45 additions & 6 deletions parquet_derive/README.md
@@ -19,9 +19,9 @@

# Parquet Derive

A crate for deriving `RecordWriter` for arbitrary, _simple_ structs. This does not generate writers for arbitrarily nested
structures. It only works for primitives and a few generic structures and
various levels of reference. Please see features checklist for what is currently
A crate for deriving `RecordWriter` and `RecordReader` for arbitrary, _simple_ structs. This does not
generate readers or writers for arbitrarily nested structures. It only works for primitives and a few
generic structures and various levels of reference. Please see features checklist for what is currently
supported.

Derive also has some support for the chrono time library. You must enable the `chrono` feature to get this support.
@@ -77,16 +77,55 @@ writer.close_row_group(row_group).unwrap();
writer.close().unwrap();
```

Example usage of deriving a `RecordReader` for your struct:

```rust
use parquet::file::{serialized_reader::SerializedFileReader, reader::FileReader};
use parquet::record::RecordReader; // the trait must be in scope to call `read_from_row_group`
use parquet_derive::ParquetRecordReader;

#[derive(ParquetRecordReader)]
struct ACompleteRecord {
pub a_bool: bool,
pub a_string: String,
pub i16: i16,
pub i32: i32,
pub u64: u64,
pub isize: isize,
pub float: f32,
pub double: f64,
pub now: chrono::NaiveDateTime,
pub byte_vec: Vec<u8>,
}

// Initialize your parquet file reader; `file` is assumed to be an already opened `std::fs::File`
let reader = SerializedFileReader::new(file).unwrap();
let mut row_group = reader.get_row_group(0).unwrap();

// create your records vector to read into
let mut chunks: Vec<ACompleteRecord> = Vec::new();

// The derived `RecordReader` takes over here
chunks.read_from_row_group(&mut *row_group, 1).unwrap();
```

## Features

- [x] Support writing `String`, `&str`, `bool`, `i32`, `f32`, `f64`, `Vec<u8>`
- [ ] Support writing dictionaries
- [x] Support writing logical types like timestamp
- [x] Derive definition_levels for `Option`
- [ ] Derive definition levels for nested structures
- [x] Derive definition_levels for `Option` for writing
- [ ] Derive definition levels for nested structures for writing
- [ ] Derive writing tuple struct
- [ ] Derive writing `tuple` container types

- [x] Support reading `String`, `&str`, `bool`, `i32`, `f32`, `f64`, `Vec<u8>`
- [ ] Support reading/writing dictionaries
- [x] Support reading/writing logical types like timestamp
- [ ] Handle definition_levels for `Option` for reading
- [ ] Handle definition levels for nested structures for reading
- [ ] Derive reading/writing tuple struct
- [ ] Derive reading/writing `tuple` container types

## Requirements

- Same as `parquet-rs`
@@ -103,4 +142,4 @@ To compile and view in the browser, run `cargo doc --no-deps --open`.

## License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0.
88 changes: 87 additions & 1 deletion parquet_derive/src/lib.rs
@@ -44,7 +44,7 @@ mod parquet_field;
/// use parquet::file::writer::SerializedFileWriter;
///
/// use std::sync::Arc;
//
///
/// #[derive(ParquetRecordWriter)]
/// struct ACompleteRecord<'a> {
/// pub a_bool: bool,
@@ -137,3 +137,89 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
}
}).into()
}

/// Derive flat, simple `RecordReader` implementations. Works by parsing
/// a struct tagged with `#[derive(ParquetRecordReader)]` and emitting
/// the correct reading code for each field of the struct. Column readers
/// are generated in the order the fields are defined.
///
/// It is up to the programmer to keep the order of the struct
/// fields lined up with the schema.
///
/// Example:
///
/// ```ignore
/// use parquet::file::{serialized_reader::SerializedFileReader, reader::FileReader};
/// use parquet::record::RecordReader;
/// use parquet_derive::{ParquetRecordReader};
///
/// #[derive(ParquetRecordReader)]
/// struct ACompleteRecord {
/// pub a_bool: bool,
/// pub a_string: String,
/// }
///
/// pub fn read_some_records() -> Vec<ACompleteRecord> {
/// let mut samples: Vec<ACompleteRecord> = Vec::new();
///
/// let reader = SerializedFileReader::new(file).unwrap();
/// let mut row_group = reader.get_row_group(0).unwrap();
/// samples.read_from_row_group(&mut *row_group, 1).unwrap();
/// samples
/// }
/// ```
///
#[proc_macro_derive(ParquetRecordReader)]
pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
let input: DeriveInput = parse_macro_input!(input as DeriveInput);
let fields = match input.data {
Data::Struct(DataStruct { fields, .. }) => fields,
Data::Enum(_) => unimplemented!("Enum currently is not supported"),
Data::Union(_) => unimplemented!("Union currently is not supported"),
};

let field_infos: Vec<_> = fields.iter().map(parquet_field::Field::from).collect();
let field_names: Vec<_> = fields.iter().map(|f| f.ident.clone()).collect();
let reader_snippets: Vec<proc_macro2::TokenStream> =
field_infos.iter().map(|x| x.reader_snippet()).collect();
let i: Vec<_> = (0..reader_snippets.len()).collect();

let derived_for = input.ident;
let generics = input.generics;

(quote! {

impl #generics ::parquet::record::RecordReader<#derived_for #generics> for Vec<#derived_for #generics> {
fn read_from_row_group(
&mut self,
row_group_reader: &mut dyn ::parquet::file::reader::RowGroupReader,
num_records: usize,
) -> Result<(), ::parquet::errors::ParquetError> {
use ::parquet::column::reader::ColumnReader;

let mut row_group_reader = row_group_reader;

for _ in 0..num_records {
self.push(#derived_for {
#(
#field_names: Default::default()
),*
})
}

let records = self; // Used by all the reader snippets to be more clear

#(
{
if let Ok(mut column_reader) = row_group_reader.get_column_reader(#i) {
#reader_snippets
} else {
return Err(::parquet::errors::ParquetError::General("Failed to get next column".into()))
}
}
);*

Ok(())
}
}
}).into()
}
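
To make the shape of the generated code easier to follow, here is a hand-written sketch of the same two-phase pattern: first push `num_records` default-initialized records, then fill one field per column, looking columns up by position in struct-field order. `MockRowGroup`, `MockColumn`, `Sample`, and `read_samples` are hypothetical stand-ins invented for this sketch. The real macro works against `RowGroupReader` and `ColumnReader` and emits per-field code from `reader_snippet()`, whose output is not shown in this diff, so details such as writing only into the newly pushed records are assumptions here.

```rust
// Hypothetical stand-ins for a row group and its columns; not the parquet API.
#[derive(Debug, Clone)]
enum MockColumn {
    Bool(Vec<bool>),
    Int64(Vec<i64>),
}

struct MockRowGroup {
    columns: Vec<MockColumn>,
}

impl MockRowGroup {
    // Mirrors the idea of fetching a column reader by positional index.
    fn get_column(&self, i: usize) -> Result<&MockColumn, String> {
        self.columns
            .get(i)
            .ok_or_else(|| "Failed to get next column".to_string())
    }
}

#[derive(Debug, Default, PartialEq)]
struct Sample {
    flag: bool,
    count: i64,
}

fn read_samples(
    records: &mut Vec<Sample>,
    row_group: &MockRowGroup,
    num_records: usize,
) -> Result<(), String> {
    // Phase 1: append default-initialized records.
    let start = records.len();
    for _ in 0..num_records {
        records.push(Sample::default());
    }

    // Phase 2: column 0 fills the first field, column 1 the second.
    match row_group.get_column(0)? {
        MockColumn::Bool(vals) => {
            for (rec, v) in records[start..].iter_mut().zip(vals) {
                rec.flag = *v;
            }
        }
        _ => return Err("column 0 is not a bool column".to_string()),
    }
    match row_group.get_column(1)? {
        MockColumn::Int64(vals) => {
            for (rec, v) in records[start..].iter_mut().zip(vals) {
                rec.count = *v;
            }
        }
        _ => return Err("column 1 is not an int64 column".to_string()),
    }
    Ok(())
}
```

Because columns are looked up by index rather than by name, a schema whose columns are ordered differently from the struct fields would send each field to the wrong column. In this sketch that surfaces as a type-mismatch error, which is the kind of failure the doc comment's warning about keeping field order aligned with the schema is meant to prevent.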