Skip to content

Commit

Permalink
refactor: Move SchemaAdapter from parquet module to data source (#10680)
Browse files Browse the repository at this point in the history
* refactor: Move SchemaAdapter from parquet module to data source

This is not a change in behavior except moving the public location of SchemaAdapter.  SchemaAdapter was exposed
in #10515 to allow callers to define their own implementation.  This PR then changes the location so that it could be used in other
data sources.

* fix comments surrounding tests to be accurate.
  • Loading branch information
HawaiianSpork committed May 27, 2024
1 parent 338c6e6 commit 7c08a6f
Show file tree
Hide file tree
Showing 8 changed files with 353 additions and 369 deletions.
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ use crate::arrow::array::{
};
use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::{
DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, ParquetExec,
SchemaAdapterFactory,
use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig, ParquetExec};
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use crate::error::Result;
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod listing_table_factory;
pub mod memory;
pub mod physical_plan;
pub mod provider;
pub mod schema_adapter;
mod statistics;
pub mod stream;
pub mod streaming;
Expand Down
136 changes: 7 additions & 129 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,7 @@ mod statistics;
pub(crate) use self::csv::plan_to_csv;
pub(crate) use self::json::plan_to_json;
#[cfg(feature = "parquet")]
pub use self::parquet::{
ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, SchemaAdapter,
SchemaAdapterFactory, SchemaMapper,
};
#[cfg(feature = "parquet")]
use arrow::{
array::new_null_array,
compute::{can_cast_types, cast},
datatypes::Schema,
record_batch::{RecordBatch, RecordBatchOptions},
};
#[cfg(feature = "parquet")]
use datafusion_common::plan_err;
pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};

pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
Expand Down Expand Up @@ -247,119 +235,6 @@ where
Ok(())
}

#[cfg(feature = "parquet")]
#[derive(Clone, Debug, Default)]
pub(crate) struct DefaultSchemaAdapterFactory {}

#[cfg(feature = "parquet")]
impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
Box::new(DefaultSchemaAdapter { table_schema })
}
}

#[cfg(feature = "parquet")]
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
/// Schema for the table
table_schema: SchemaRef,
}

#[cfg(feature = "parquet")]
impl SchemaAdapter for DefaultSchemaAdapter {
/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// Panics if index is not in range for the table schema
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
let field = self.table_schema.field(index);
Some(file_schema.fields.find(field.name())?.0)
}

/// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
///
/// If the provided `file_schema` contains columns of a different type to the expected
/// `table_schema`, the method will attempt to cast the array data from the file schema
/// to the table schema where possible.
///
/// Returns a [`SchemaMapping`] that can be applied to the output batch
/// along with an ordered list of columns to project from the file
fn map_schema(
&self,
file_schema: &Schema,
) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let mut projection = Vec::with_capacity(file_schema.fields().len());
let mut field_mappings = vec![None; self.table_schema.fields().len()];

for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
if let Some((table_idx, table_field)) =
self.table_schema.fields().find(file_field.name())
{
match can_cast_types(file_field.data_type(), table_field.data_type()) {
true => {
field_mappings[table_idx] = Some(projection.len());
projection.push(file_idx);
}
false => {
return plan_err!(
"Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
file_field.name(),
file_field.data_type(),
table_field.data_type()
)
}
}
}
}

Ok((
Arc::new(SchemaMapping {
table_schema: self.table_schema.clone(),
field_mappings,
}),
projection,
))
}
}

/// The SchemaMapping struct holds a mapping from the file schema to the table schema
/// and any necessary type conversions that need to be applied.
#[cfg(feature = "parquet")]
#[derive(Debug)]
pub struct SchemaMapping {
/// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
table_schema: SchemaRef,
/// Mapping from field index in `table_schema` to index in projected file_schema
field_mappings: Vec<Option<usize>>,
}

#[cfg(feature = "parquet")]
impl SchemaMapper for SchemaMapping {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
let batch_rows = batch.num_rows();
let batch_cols = batch.columns().to_vec();

let cols = self
.table_schema
.fields()
.iter()
.zip(&self.field_mappings)
.map(|(field, file_idx)| match file_idx {
Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
None => Ok(new_null_array(field.data_type(), batch_rows)),
})
.collect::<Result<Vec<_>, _>>()?;

// Necessary to handle empty batches
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

let schema = self.table_schema.clone();
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}
}

/// A single file or part of a file that should be read, along with its schema, statistics
pub struct FileMeta {
/// Path for the file (e.g. URL, filesystem path, etc)
Expand Down Expand Up @@ -621,11 +496,14 @@ mod tests {
use arrow_array::cast::AsArray;
use arrow_array::types::{Float32Type, Float64Type, UInt32Type};
use arrow_array::{
BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, StringArray,
UInt64Array,
BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, RecordBatch,
StringArray, UInt64Array,
};
use arrow_schema::Field;
use arrow_schema::{Field, Schema};

use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
use chrono::Utc;

#[test]
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::datasource::physical_plan::{
parquet::page_filter::PagePruningPredicate, DefaultSchemaAdapterFactory, DisplayAs,
FileGroupPartitioner, FileMeta, FileScanConfig,
parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner,
FileMeta, FileScanConfig,
};
use crate::{
config::{ConfigOptions, TableParquetOptions},
Expand Down Expand Up @@ -67,12 +67,13 @@ mod metrics;
mod page_filter;
mod row_filter;
mod row_groups;
mod schema_adapter;
mod statistics;

use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
pub use metrics::ParquetFileMetrics;
pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
pub use statistics::{RequestedStatistics, StatisticsConverter};

/// Execution plan for scanning one or more Parquet partitions
Expand Down

This file was deleted.

Loading

0 comments on commit 7c08a6f

Please sign in to comment.