
[EPIC] Efficiently and correctly extract parquet statistics into ArrayRefs #10453

Closed
18 of 23 tasks
alamb opened this issue May 10, 2024 · 10 comments
Labels
enhancement New feature or request

Comments


alamb commented May 10, 2024

Is your feature request related to a problem or challenge?

There are at least three places where parquet statistics are extracted into ArrayRefs today:

  1. ParquetExec (pruning Row Groups): https://github.com/apache/datafusion/blob/465c89f7f16d48b030d4a384733567b91dab88fa/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs#L18-L17

Not only are there three copies of the code, but they are also subtly different (e.g. #8295) and have varying degrees of testing.

Describe the solution you'd like

I would like one API with the following properties:

  1. Extracts statistics from one or more parquet files as ArrayRefs suitable to pass to PruningPredicate
  2. Does so correctly (applies the appropriate schema coercion / conversion rules)
  3. Does so quickly and efficiently (e.g. does not do this once per row group) and is suitable for 1000s of parquet files

Describe alternatives you've considered

Some ideas from apache/arrow-rs#4328

Subtasks

Follow on projects:

Here is a proposed API:

/// statistics extracted from `Statistics` as Arrow `ArrayRef`s
///
/// # Note:
/// If the corresponding `Statistics` is not present, or has no information for
/// a column, a NULL is present in the corresponding array entry
pub struct ArrowStatistics {
  /// min values
  min: ArrayRef,
  /// max values
  max: ArrayRef,
  /// Row counts (UInt64Array)
  row_count: ArrayRef,
  /// Null Counts (UInt64Array)
  null_count: ArrayRef,
}

// (TODO accessors for min/max/row_count/null_count)

/// Extract `ArrowStatistics` from the parquet [`Statistics`]
pub fn parquet_stats_to_arrow(
    arrow_datatype: &DataType,
    statistics: impl IntoIterator<Item = Option<&Statistics>>
) -> Result<ArrowStatistics> {
  todo!()
}
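
For illustration, here is a minimal sketch of how such a function might be driven from a single file's footer metadata, using the parquet crate's `ParquetMetaData` accessors. The helper name `stats_for_column` is hypothetical and not part of the proposal:

use arrow::datatypes::DataType;
use parquet::file::metadata::ParquetMetaData;

// Hypothetical helper: extract statistics for one column (identified by its
// parquet column index) across all row groups of a single file.
fn stats_for_column(
    metadata: &ParquetMetaData,
    parquet_column_index: usize,
    arrow_datatype: &DataType,
) -> Result<ArrowStatistics> {
    // One Option<&Statistics> per row group; row groups without statistics
    // yield None and become NULL entries in the returned arrays.
    let stats = metadata
        .row_groups()
        .iter()
        .map(|rg| rg.column(parquet_column_index).statistics());
    parquet_stats_to_arrow(arrow_datatype, stats)
}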

Maybe it would make sense to have something more builder style:

struct ParquetStatisticsExtractor {
...
}

// create an extractor that can extract data from parquet files 
let extractor = ParquetStatisticsExtractor::new(arrow_schema, parquet_schema);

// get parquet statistics (one for each row group) somehow:
let parquet_stats: Vec<&Statistics> = ...;

// extract min/max values for column "a" and "b";
let col_a_stats = extractor.extract("a", parquet_stats.iter());
let col_b_stats = extractor.extract("b", parquet_stats.iter());

(This is similar to the existing API parquet::arrow::parquet_to_arrow_schema)

Note: `Statistics` above refers to the parquet crate's statistics type (`parquet::file::statistics::Statistics`).

There is a version of this code here in DataFusion that could perhaps be adapted:

pub(crate) fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
    data_type: &DataType,
    iterator: I,
) -> Result<ArrayRef> {
    let scalars = iterator
        .map(|x| x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type))));
    collect_scalars(data_type, scalars)
}

Testing

I suggest we add a new module to the existing parquet test in https://github.com/apache/datafusion/blob/main/datafusion/core/tests/parquet_exec.rs

The tests should look like:

let record_batch = make_batch_with_relevant_datatype();
// write batch/batches to file
// open file / extract stats from metadata
// compare stats

I can help write these tests
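
For example, a first test for Int64 min/max statistics might look roughly like the following sketch. It assumes the proposed parquet_stats_to_arrow API above; make_int64_batch is a hypothetical helper and the final assertions are elided:

use arrow::datatypes::DataType;
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;

#[test]
fn extract_int64_min_max() {
    // make a batch with the relevant data type
    let batch = make_int64_batch(); // hypothetical helper returning a RecordBatch

    // write the batch to an in-memory parquet file
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // open the file and extract stats from the footer metadata
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf)).unwrap();
    let stats = builder
        .metadata()
        .row_groups()
        .iter()
        .map(|rg| rg.column(0).statistics());
    let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, stats).unwrap();

    // compare the extracted min/max arrays against the values that were
    // written (assertions elided)
}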

I personally suggest:

  1. Make a PR with the basic API and a single basic type (like Int/UInt or String) and figure out the test pattern (I can definitely help here)
  2. Then we can fill out support for the rest of the types in a follow-on PR

cc @tustvold in case you have other ideas

Additional context

This code would likely eventually be good to have in the parquet crate -- see apache/arrow-rs#4328. However, I think initially we should do it in DataFusion to iterate faster and figure out the API before moving it up there.

There are a bunch of related improvements that I think become much simpler with this feature:

  1. Consolidate statistics aggregation #8229

alamb commented May 14, 2024

In terms of sequencing this feature, here is what I would recommend:

First PR

Purpose: sketch out the API and test framework

  1. Create a test framework for this
  2. Create the basic API and extract min/max values for Int64 columns

Second PR (draft)

Purpose: demonstrate that the API can be used in DataFusion and ensure test coverage is adequate
Update one of the uses of parquet statistics (like ListingTable) to use the new API (@alamb would like to do this if I have time)

Third+Fourth+... PRs

Add support for the remaining datatypes, along with tests
This part can be parallelized into multiple PRs

@NGA-TRAN

I have started working on the first PR


alamb commented May 17, 2024

After working through an actual example in #10549 I have a new API proposal: NGA-TRAN#118

Here is what the API looks like

/// What type of statistics should be extracted?
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum RequestedStatistics {
    /// Minimum Value
    Min,
    /// Maximum Value
    Max,
    /// Null Count, returned as a [`UInt64Array`]
    NullCount,
}

/// Extracts Parquet statistics as Arrow arrays
///
/// This is used to convert Parquet statistics to Arrow arrays, with proper type
/// conversions. This information can be used for pruning parquet files or row
/// groups based on the statistics embedded in parquet files
///
/// # Schemas
///
/// The schema of the parquet file and the arrow schema are used to convert the
/// underlying statistics value (stored as a parquet value) into the
/// corresponding Arrow value. For example, Decimals are stored as binary in
/// parquet files.
///
/// The parquet_schema and arrow_schema do not have to be identical (for
/// example, the columns may be in different orders and one or the other schemas
/// may have additional columns). The function [`parquet_column`] is used to
/// match the column in the parquet file to the column in the arrow schema.
///
/// # Multiple parquet files
///
/// This API is designed to support efficiently extracting statistics from
/// multiple parquet files (hence why the parquet schema is passed in as an
/// argument). This is useful when building an index for a directory of parquet
/// files.
///
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
    /// The name of the column to extract statistics for
    column_name: &'a str,
    /// The type of statistics to extract
    statistics_type: RequestedStatistics,
    /// The arrow schema of the query
    arrow_schema: &'a Schema,
    /// The field (with data type) of the column in the arrow schema
    arrow_field: &'a Field,
}

impl<'a> StatisticsConverter<'a> {
    /// Returns a [`UInt64Array`] with counts for each row group
    ///
    /// The returned array has no nulls, and has one value for each row group.
    /// Each value is the number of rows in the row group.
    pub fn row_counts(metadata: &ParquetMetaData) -> Result<UInt64Array> {
...
    }

    /// Create a new statistics converter
    pub fn try_new(
        column_name: &'a str,
        statistics_type: RequestedStatistics,
        arrow_schema: &'a Schema,
    ) -> Result<Self> {
...
    }

    /// extract the statistics from a parquet file, given the parquet file's metadata
    ///
    /// The returned array contains 1 value for each row group in the parquet
    /// file in order
    ///
    /// Each value is either
    /// * the requested statistics type for the column
    /// * a null value, if the statistics can not be extracted
    ///
    /// Note that a null value does NOT mean the min or max value was actually
    /// `null`; it means the requested statistic is unknown
    ///
    /// Reasons for not being able to extract the statistics include:
    /// * the column is not present in the parquet file
    /// * statistics for the column are not present in the row group
    /// * the stored statistic value can not be converted to the requested type
    pub fn extract(&self, metadata: &ParquetMetaData) -> Result<ArrayRef> {
...
    }
}
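
For reference, a rough usage sketch of this proposed API (it assumes an arrow_schema and a single file's ParquetMetaData, here called metadata, are already in scope):

// extract per-row-group minimums of column "a", e.g. for pruning
let converter = StatisticsConverter::try_new(
    "a",
    RequestedStatistics::Min,
    &arrow_schema,
)?;
// one entry per row group, NULL where the statistic is unknown
let min_values = converter.extract(&metadata)?;
// row counts, one entry per row group
let row_counts = StatisticsConverter::row_counts(&metadata)?;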

I am envisioning this API could also easily support:

Extract from multiple files in one go

impl<'a> StatisticsConverter<'a> {
    ...
    /// Extract metadata from multiple parquet files into a single arrow array,
    /// one element per row group per file
    fn extract_multi(&self, metadata: impl IntoIterator<Item = &ParquetMetaData>) -> Result<ArrayRef> {
        ...
    }
}
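
A sketch of how that might be driven when indexing a directory of files (opening each footer via ParquetRecordBatchReaderBuilder is just one way to obtain the metadata; parquet_files and converter are assumed to already be in scope):

// Hypothetical: build a per-row-group statistics index across many files
let mut all_metadata: Vec<ParquetMetaData> = Vec::new();
for path in &parquet_files {
    let file = std::fs::File::open(path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    all_metadata.push(builder.metadata().as_ref().clone());
}
// one array element per row group per file, in order
let mins = converter.extract_multi(all_metadata.iter()).unwrap();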

Extract information from the page index as well

impl<'a> StatisticsConverter<'a> {
    ...
    /// Extract metadata from page indexes across all row groups. The returned
    /// array has one element per page across all row groups
    fn extract_page(&self, metadata: impl IntoIterator<Item = &ParquetMetaData>) -> Result<ArrayRef> {
        ...
    }
}

@NGA-TRAN

@alamb I have created 2 more bug tickets but I cannot edit the description to add them to the subtasks. Can you help with that?

  1. Incorrect statistics read for unsigned integer columns in parquet #10604
  2. Incorrect statistics read for binary columns in parquet  #10605


alamb commented May 21, 2024

@alamb I have created 2 more bug tickets but I cannot edit the description to add them to the subtasks. Can you help with that?

Done

@NGA-TRAN

@alamb Another bug: #10609

@xinlifoobar

@alamb just a hint that #10605 is also closed.


alamb commented Jun 5, 2024

FYI I have a proposed API change in #10806


alamb commented Jun 11, 2024

Given how far we have come with this ticket, I plan to close it and organize the remaining tasks into follow-on tickets / epics


alamb commented Jun 14, 2024

This issue is done enough -- I am consolidating the remaining todo items under #10922
