From c63875b43ec7a81a27b999d75c3b6557bb2043e1 Mon Sep 17 00:00:00 2001 From: Eric Fredine Date: Tue, 25 Jun 2024 07:25:19 -0700 Subject: [PATCH 1/2] feat: Add support for Timestamp data types in data page statistics. --- .../physical_plan/parquet/statistics.rs | 30 +++++++++++++++++ .../core/tests/parquet/arrow_statistics.rs | 32 +++++++++---------- 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 17bfe72dbd65..4d1da87f14c8 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -713,6 +713,36 @@ macro_rules! get_data_page_statistics { )), Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::Timestamp(unit, timezone)) => { + let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(); + + Ok(match unit { + TimeUnit::Second => { + Arc::new(match timezone { + Some(tz) => TimestampSecondArray::from_iter(iter).with_timezone(tz.clone()), + None => TimestampSecondArray::from_iter(iter), + }) + } + TimeUnit::Millisecond => { + Arc::new(match timezone { + Some(tz) => TimestampMillisecondArray::from_iter(iter).with_timezone(tz.clone()), + None => TimestampMillisecondArray::from_iter(iter), + }) + } + TimeUnit::Microsecond => { + Arc::new(match timezone { + Some(tz) => TimestampMicrosecondArray::from_iter(iter).with_timezone(tz.clone()), + None => TimestampMicrosecondArray::from_iter(iter), + }) + } + TimeUnit::Nanosecond => { + Arc::new(match timezone { + Some(tz) => TimestampNanosecondArray::from_iter(iter).with_timezone(tz.clone()), + None => TimestampNanosecondArray::from_iter(iter), + }) + } + }) + }, _ => unimplemented!() } } diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index cd6985b311c3..596015d581e2 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -747,7 +747,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "nanos", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -776,7 +776,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "nanos_timezoned", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -798,7 +798,7 @@ async fn test_timestamp() { expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "micros", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -827,7 +827,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "micros_timezoned", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -849,7 +849,7 @@ async fn test_timestamp() { expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "millis", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -878,7 +878,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "millis_timezoned", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -900,7 +900,7 @@ async fn test_timestamp() { expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "seconds", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -929,7 +929,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "seconds_timezoned", - check: Check::RowGroup, + check: Check::Both, } .run(); } @@ -975,7 +975,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "nanos", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -1002,7 +1002,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "nanos_timezoned", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -1022,7 +1022,7 @@ async fn test_timestamp_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![1, 2, 1]), expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "micros", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -1049,7 +1049,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "micros_timezoned", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -1069,7 +1069,7 @@ async fn test_timestamp_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![1, 2, 1]), expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "millis", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -1096,7 +1096,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "millis_timezoned", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -1116,7 +1116,7 @@ async fn test_timestamp_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![1, 2, 1]), expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "seconds", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -1143,7 +1143,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "seconds_timezoned", - check: Check::RowGroup, + check: Check::Both, } .run(); } From d29737c2b587867912c71c29f6609f242dd5b3d9 Mon Sep 17 00:00:00 2001 From: Eric Fredine Date: Tue, 25 Jun 2024 18:04:10 -0700 Subject: [PATCH 2/2] Simplify array creation using with_timezone_opt. --- .../physical_plan/parquet/statistics.rs | 58 +++---------------- 1 file changed, 8 insertions(+), 50 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 4d1da87f14c8..44bacbdae147 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -354,32 +354,11 @@ macro_rules! get_statistics { ))), DataType::Timestamp(unit, timezone) =>{ let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()); - Ok(match unit { - TimeUnit::Second => { - Arc::new(match timezone { - Some(tz) => TimestampSecondArray::from_iter(iter).with_timezone(tz.clone()), - None => TimestampSecondArray::from_iter(iter), - }) - } - TimeUnit::Millisecond => { - Arc::new(match timezone { - Some(tz) => TimestampMillisecondArray::from_iter(iter).with_timezone(tz.clone()), - None => TimestampMillisecondArray::from_iter(iter), - }) - } - TimeUnit::Microsecond => { - Arc::new(match timezone { - Some(tz) => TimestampMicrosecondArray::from_iter(iter).with_timezone(tz.clone()), - None => TimestampMicrosecondArray::from_iter(iter), - }) - } - TimeUnit::Nanosecond => { - Arc::new(match timezone { - Some(tz) => TimestampNanosecondArray::from_iter(iter).with_timezone(tz.clone()), - None => TimestampNanosecondArray::from_iter(iter), - }) - } + TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), }) }, DataType::Time32(unit) => { @@ -715,32 +694,11 @@ macro_rules! get_data_page_statistics { Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::Timestamp(unit, timezone)) => { let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(); - Ok(match unit { - TimeUnit::Second => { - Arc::new(match timezone { - Some(tz) => TimestampSecondArray::from_iter(iter).with_timezone(tz.clone()), - None => TimestampSecondArray::from_iter(iter), - }) - } - TimeUnit::Millisecond => { - Arc::new(match timezone { - Some(tz) => TimestampMillisecondArray::from_iter(iter).with_timezone(tz.clone()), - None => TimestampMillisecondArray::from_iter(iter), - }) - } - TimeUnit::Microsecond => { - Arc::new(match timezone { - Some(tz) => TimestampMicrosecondArray::from_iter(iter).with_timezone(tz.clone()), - None => TimestampMicrosecondArray::from_iter(iter), - }) - } - TimeUnit::Nanosecond => { - Arc::new(match timezone { - Some(tz) => TimestampNanosecondArray::from_iter(iter).with_timezone(tz.clone()), - None => TimestampNanosecondArray::from_iter(iter), - }) - } + TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), + TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), }) }, _ => unimplemented!()