refactor: Move SchemaAdapter from parquet module to data source #10680

Merged · 2 commits · May 27, 2024
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs

```diff
@@ -30,9 +30,9 @@ use crate::arrow::array::{
 };
 use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::physical_plan::{
-    DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, ParquetExec,
-    SchemaAdapterFactory,
+use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig, ParquetExec};
+use crate::datasource::schema_adapter::{
+    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };
 use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
```
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/mod.rs

```diff
@@ -30,6 +30,7 @@ pub mod listing_table_factory;
 pub mod memory;
 pub mod physical_plan;
 pub mod provider;
+pub mod schema_adapter;
 mod statistics;
 pub mod stream;
 pub mod streaming;
```
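With the module declared at the `datasource` level, the adapter types resolve through a new path. A minimal sketch of the import migration for code that previously used the `physical_plan` re-exports, assuming the new module exposes the same `SchemaAdapter`, `SchemaAdapterFactory`, and `SchemaMapper` items (their visibility is not shown in this diff):

```rust
// Old path, removed by this PR:
// use datafusion::datasource::physical_plan::{
//     SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
// };

// New path, assuming the items are re-exported publicly from the new module:
use datafusion::datasource::schema_adapter::{
    SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};
```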
136 changes: 7 additions & 129 deletions datafusion/core/src/datasource/physical_plan/mod.rs

```diff
@@ -31,19 +31,7 @@ mod statistics;
 pub(crate) use self::csv::plan_to_csv;
 pub(crate) use self::json::plan_to_json;
 #[cfg(feature = "parquet")]
-pub use self::parquet::{
-    ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, SchemaAdapter,
-    SchemaAdapterFactory, SchemaMapper,
-};
-#[cfg(feature = "parquet")]
-use arrow::{
-    array::new_null_array,
-    compute::{can_cast_types, cast},
-    datatypes::Schema,
-    record_batch::{RecordBatch, RecordBatchOptions},
-};
-#[cfg(feature = "parquet")]
-use datafusion_common::plan_err;
+pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};

 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
@@ -247,119 +235,6 @@ where
     Ok(())
 }

-#[cfg(feature = "parquet")]
-#[derive(Clone, Debug, Default)]
-pub(crate) struct DefaultSchemaAdapterFactory {}
-
-#[cfg(feature = "parquet")]
-impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
-    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
-        Box::new(DefaultSchemaAdapter { table_schema })
-    }
-}
-
-#[cfg(feature = "parquet")]
-#[derive(Clone, Debug)]
-pub(crate) struct DefaultSchemaAdapter {
-    /// Schema for the table
-    table_schema: SchemaRef,
-}
-
-#[cfg(feature = "parquet")]
-impl SchemaAdapter for DefaultSchemaAdapter {
-    /// Map a column index in the table schema to a column index in a particular
-    /// file schema
-    ///
-    /// Panics if index is not in range for the table schema
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-        let field = self.table_schema.field(index);
-        Some(file_schema.fields.find(field.name())?.0)
-    }
-
-    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
-    ///
-    /// If the provided `file_schema` contains columns of a different type to the expected
-    /// `table_schema`, the method will attempt to cast the array data from the file schema
-    /// to the table schema where possible.
-    ///
-    /// Returns a [`SchemaMapping`] that can be applied to the output batch
-    /// along with an ordered list of columns to project from the file
-    fn map_schema(
-        &self,
-        file_schema: &Schema,
-    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let mut projection = Vec::with_capacity(file_schema.fields().len());
-        let mut field_mappings = vec![None; self.table_schema.fields().len()];
-
-        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
-            if let Some((table_idx, table_field)) =
-                self.table_schema.fields().find(file_field.name())
-            {
-                match can_cast_types(file_field.data_type(), table_field.data_type()) {
-                    true => {
-                        field_mappings[table_idx] = Some(projection.len());
-                        projection.push(file_idx);
-                    }
-                    false => {
-                        return plan_err!(
-                            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
-                            file_field.name(),
-                            file_field.data_type(),
-                            table_field.data_type()
-                        )
-                    }
-                }
-            }
-        }
-
-        Ok((
-            Arc::new(SchemaMapping {
-                table_schema: self.table_schema.clone(),
-                field_mappings,
-            }),
-            projection,
-        ))
-    }
-}
-
-/// The SchemaMapping struct holds a mapping from the file schema to the table schema
-/// and any necessary type conversions that need to be applied.
-#[cfg(feature = "parquet")]
-#[derive(Debug)]
-pub struct SchemaMapping {
-    /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
-    table_schema: SchemaRef,
-    /// Mapping from field index in `table_schema` to index in projected file_schema
-    field_mappings: Vec<Option<usize>>,
-}
-
-#[cfg(feature = "parquet")]
-impl SchemaMapper for SchemaMapping {
-    /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
-    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-        let batch_rows = batch.num_rows();
-        let batch_cols = batch.columns().to_vec();
-
-        let cols = self
-            .table_schema
-            .fields()
-            .iter()
-            .zip(&self.field_mappings)
-            .map(|(field, file_idx)| match file_idx {
-                Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
-                None => Ok(new_null_array(field.data_type(), batch_rows)),
-            })
-            .collect::<Result<Vec<_>, _>>()?;
-
-        // Necessary to handle empty batches
-        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-
-        let schema = self.table_schema.clone();
-        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
-        Ok(record_batch)
-    }
-}
-
 /// A single file or part of a file that should be read, along with its schema, statistics
 pub struct FileMeta {
     /// Path for the file (e.g. URL, filesystem path, etc)
@@ -621,11 +496,14 @@ mod tests {
     use arrow_array::cast::AsArray;
     use arrow_array::types::{Float32Type, Float64Type, UInt32Type};
     use arrow_array::{
-        BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, StringArray,
-        UInt64Array,
+        BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, RecordBatch,
+        StringArray, UInt64Array,
     };
-    use arrow_schema::Field;
+    use arrow_schema::{Field, Schema};

+    use crate::datasource::schema_adapter::{
+        DefaultSchemaAdapterFactory, SchemaAdapterFactory,
+    };
     use chrono::Utc;

     #[test]
```
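The relocated `DefaultSchemaAdapter`/`SchemaMapping` logic amounts to: for each table column, cast the matching file column when arrow permits it, fail when it does not, and null-fill columns absent from the file, preserving the row count so empty batches keep their length. Below is a self-contained sketch of that same technique using plain arrow-rs calls (the same `can_cast_types`, `cast`, `new_null_array`, and `RecordBatchOptions` imports removed above); it illustrates the approach and is not DataFusion's API — all names in it are hypothetical:

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef, Int32Array};
use arrow::compute::{can_cast_types, cast};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::{ArrowError, Result};
use arrow::record_batch::{RecordBatch, RecordBatchOptions};

fn main() -> Result<()> {
    // The table expects an Int64 "a" and a Utf8 "b"; the file only has an Int32 "a".
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let file_batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Same shape as DefaultSchemaAdapter::map_schema + SchemaMapping::map_batch:
    // cast columns present in the file, null-fill the ones that are missing,
    // and reject casts that arrow cannot perform.
    let file_schema = file_batch.schema();
    let cols = table_schema
        .fields()
        .iter()
        .map(|field| match file_schema.column_with_name(field.name()) {
            Some((idx, file_field)) => {
                if can_cast_types(file_field.data_type(), field.data_type()) {
                    cast(file_batch.column(idx), field.data_type())
                } else {
                    Err(ArrowError::CastError(format!(
                        "cannot cast file column {} to {:?}",
                        field.name(),
                        field.data_type()
                    )))
                }
            }
            None => Ok(new_null_array(field.data_type(), file_batch.num_rows())),
        })
        .collect::<Result<Vec<ArrayRef>>>()?;

    // RecordBatchOptions carries the row count so empty batches keep their length.
    let options = RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows()));
    let mapped = RecordBatch::try_new_with_options(table_schema, cols, &options)?;

    assert_eq!(mapped.num_columns(), 2); // "a" cast to Int64, "b" all nulls
    Ok(())
}
```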
9 changes: 5 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs

```diff
@@ -27,8 +27,8 @@ use crate::datasource::physical_plan::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
 use crate::datasource::physical_plan::{
-    parquet::page_filter::PagePruningPredicate, DefaultSchemaAdapterFactory, DisplayAs,
-    FileGroupPartitioner, FileMeta, FileScanConfig,
+    parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner,
+    FileMeta, FileScanConfig,
 };
 use crate::{
     config::{ConfigOptions, TableParquetOptions},
@@ -67,12 +67,13 @@ mod metrics;
 mod page_filter;
 mod row_filter;
 mod row_groups;
-mod schema_adapter;
 mod statistics;

 use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
+use crate::datasource::schema_adapter::{
+    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
+};
 pub use metrics::ParquetFileMetrics;
-pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
 pub use statistics::{RequestedStatistics, StatisticsConverter};

 /// Execution plan for scanning one or more Parquet partitions
```
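Putting the relocated pieces together, the intended flow is: the factory creates an adapter bound to the table schema, `map_schema` yields a file-column projection plus a `SchemaMapper`, and `map_batch` rewrites each projected batch. A hypothetical, crate-internal helper sketching that sequence, mirroring the `use crate::...` imports in this diff (whether these items are visible outside the crate is not shown here):

```rust
use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};

use crate::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
use crate::error::Result;

/// Hypothetical crate-internal helper: adapt one projected batch read from a
/// file so that it matches `table_schema`.
fn adapt_batch(
    table_schema: SchemaRef,
    file_schema: &Schema,
    file_batch: RecordBatch,
) -> Result<RecordBatch> {
    // Factory -> adapter bound to the table schema.
    let adapter = DefaultSchemaAdapterFactory::default().create(table_schema);

    // Adapter -> (mapper, projection). A real reader would fetch only the
    // `projection` columns from the file, in that order; `file_batch` is
    // assumed to already be projected that way.
    let (mapper, _projection) = adapter.map_schema(file_schema)?;

    // The mapper rewrites the projected batch to match the table schema.
    mapper.map_batch(file_batch)
}
```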

This file was deleted.
