Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into rm-count
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jun 13, 2024
2 parents 384291a + b7d2aea commit bd3dd03
Show file tree
Hide file tree
Showing 14 changed files with 91 additions and 19 deletions.
73 changes: 73 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,13 +455,26 @@ async fn fetch_schema(
}

/// Read and parse the statistics of the Parquet file at location `path`
///
/// See [`statistics_from_parquet_meta`] for more details
async fn fetch_statistics(
store: &dyn ObjectStore,
table_schema: SchemaRef,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
) -> Result<Statistics> {
let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
statistics_from_parquet_meta(&metadata, table_schema).await
}

/// Convert statistics in [`ParquetMetaData`] into [`Statistics`]
///
/// The statistics are calculated for each column in the table schema
/// using the row group statistics in the parquet metadata.
pub async fn statistics_from_parquet_meta(
metadata: &ParquetMetaData,
table_schema: SchemaRef,
) -> Result<Statistics> {
let file_metadata = metadata.file_metadata();

let file_schema = parquet_to_arrow_schema(
Expand Down Expand Up @@ -1402,6 +1415,66 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_statistics_from_parquet_metadata() -> Result<()> {
// Data for column c1: ["Foo", null, "bar"]
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();

// Data for column c2: [1, 2, null]
let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();

// Use store_parquet to write each batch to its own file
// . batch1 written into first file and includes:
// - column c1 that has 3 rows with one null. Stats min and max of string column is missing for this test even the column has values
// . batch2 written into second file and includes:
// - column c2 that has 3 rows with one null. Stats min and max of int are avaialble and 1 and 2 respectively
let store = Arc::new(LocalFileSystem::new()) as _;
let (files, _file_names) = store_parquet(vec![batch1, batch2], false).await?;

let state = SessionContext::new().state();
let format = ParquetFormat::default();
let schema = format.infer_schema(&state, &store, &files).await.unwrap();

let null_i64 = ScalarValue::Int64(None);
let null_utf8 = ScalarValue::Utf8(None);

// Fetch statistics for first file
let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
//
assert_eq!(stats.num_rows, Precision::Exact(3));
// column c1
let c1_stats = &stats.column_statistics[0];
assert_eq!(c1_stats.null_count, Precision::Exact(1));
assert_eq!(c1_stats.max_value, Precision::Absent);
assert_eq!(c1_stats.min_value, Precision::Absent);
// column c2: missing from the file so the table treats all 3 rows as null
let c2_stats = &stats.column_statistics[1];
assert_eq!(c2_stats.null_count, Precision::Exact(3));
assert_eq!(c2_stats.max_value, Precision::Exact(null_i64.clone()));
assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));

// Fetch statistics for second file
let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[1], None).await?;
let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
assert_eq!(stats.num_rows, Precision::Exact(3));
// column c1: missing from the file so the table treats all 3 rows as null
let c1_stats = &stats.column_statistics[0];
assert_eq!(c1_stats.null_count, Precision::Exact(3));
assert_eq!(c1_stats.max_value, Precision::Exact(null_utf8.clone()));
assert_eq!(c1_stats.min_value, Precision::Exact(null_utf8.clone()));
// column c2
let c2_stats = &stats.column_statistics[1];
assert_eq!(c2_stats.null_count, Precision::Exact(1));
assert_eq!(c2_stats.max_value, Precision::Exact(2i64.into()));
assert_eq!(c2_stats.min_value, Precision::Exact(1i64.into()));

Ok(())
}

#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ mod test {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
RowGroupAccess::Selection(
// select / skip all 20 rows in row group 1
// specifies all 20 rows in row group 1
vec![
RowSelector::select(5),
RowSelector::skip(7),
Expand Down Expand Up @@ -463,7 +463,7 @@ mod test {
fn test_invalid_too_few() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
// select 12 rows, but row group 1 has 20
// specify only 12 rows in selection, but row group 1 has 20
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
),
Expand All @@ -484,7 +484,7 @@ mod test {
fn test_invalid_too_many() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
// select 22 rows, but row group 1 has only 20
// specify 22 rows in selection, but row group 1 has only 20
RowGroupAccess::Selection(
vec![
RowSelector::select(10),
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,8 @@ pub use writer::plan_to_parquet;
/// used to implement external indexes on top of parquet files and select only
/// portions of the files.
///
/// The `ParquetExec` will try and further reduce any provided
/// `ParquetAccessPlan` further based on the contents of `ParquetMetadata` and
/// other settings.
/// The `ParquetExec` will try and reduce any provided `ParquetAccessPlan`
/// further based on the contents of `ParquetMetadata` and other settings.
///
/// ## Example of providing a ParquetAccessPlan
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ fn create_initial_plan(

// check row group count matches the plan
return Ok(access_plan.clone());
} else {
debug!("ParquetExec Ignoring unknown extension specified for {file_name}");
}
}

Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ async fn test_semi_join_1k() {
.await
}

// The test is flaky
// https://github.com/apache/datafusion/issues/10886
#[ignore]
#[tokio::test]
async fn test_semi_join_1k_filtered() {
JoinFuzzTestCase::new(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ pub(crate) fn find_column_indexes_referenced_by_expr(
}
}
Expr::Literal(_) => {
indexes.push(std::usize::MAX);
indexes.push(usize::MAX);
}
_ => {}
}
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-expr/src/window/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ impl BuiltInWindowFunctionExpr for NthValue {

fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
let state = NthValueState {
range: Default::default(),
finalized_result: None,
kind: self.kind,
};
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-expr/src/window/window_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,6 @@ pub enum NthValueKind {

#[derive(Debug, Clone)]
pub struct NthValueState {
pub range: Range<usize>,
// In certain cases, we can finalize the result early. Consider this usage:
// ```
// FIRST_VALUE(increasing_col) OVER window AS my_first_value
Expand Down
10 changes: 5 additions & 5 deletions datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,6 @@ impl DataSinkExec {
&self.sort_order
}

/// Returns the metrics of the underlying [DataSink]
pub fn metrics(&self) -> Option<MetricsSet> {
self.sink.metrics()
}

fn create_schema(
input: &Arc<dyn ExecutionPlan>,
schema: SchemaRef,
Expand Down Expand Up @@ -289,6 +284,11 @@ impl ExecutionPlan for DataSinkExec {
stream,
)))
}

/// Returns the metrics of the underlying [DataSink]
fn metrics(&self) -> Option<MetricsSet> {
self.sink.metrics()
}
}

/// Create a output record batch with a count
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::{any::Any, usize, vec};
use std::{any::Any, vec};

use super::{
utils::{OnceAsync, OnceFut},
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-plan/src/joins/stream_join_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::usize;

use crate::joins::utils::{JoinFilter, JoinHashMapType};
use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::any::Any;
use std::fmt::{self, Debug};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{usize, vec};
use std::vec;

use crate::common::SharedMemoryReservation;
use crate::handle_state;
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-plan/src/joins/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! This file has test utils for hash joins

use std::sync::Arc;
use std::usize;

use crate::joins::utils::{JoinFilter, JoinOn};
use crate::joins::{
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::future::Future;
use std::ops::{IndexMut, Range};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::usize;

use crate::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder};
use crate::{ColumnStatistics, ExecutionPlan, Partitioning, Statistics};
Expand Down

0 comments on commit bd3dd03

Please sign in to comment.