Merged
122 changes: 54 additions & 68 deletions datafusion/core/src/datasource/mod.rs
@@ -55,30 +55,35 @@ mod tests {
use crate::prelude::SessionContext;
use ::object_store::{path::Path, ObjectMeta};
use arrow::{
array::{Int32Array, StringArray},
datatypes::{DataType, Field, Schema, SchemaRef},
array::Int32Array,
datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_common::{record_batch, test_util::batches_to_sort_string};
use datafusion_common::{
record_batch,
test_util::batches_to_sort_string,
tree_node::{Transformed, TransformedResult, TreeNode},
Result, ScalarValue,
};
use datafusion_datasource::{
file::FileSource,
file_scan_config::FileScanConfigBuilder,
schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
SchemaMapper,
},
source::DataSourceExec,
schema_adapter::DefaultSchemaAdapterFactory, source::DataSourceExec,
PartitionedFile,
};
use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr_adapter::{
PhysicalExprAdapter, PhysicalExprAdapterFactory,
};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::collect;
use std::{fs, sync::Arc};
use tempfile::TempDir;

#[tokio::test]
async fn can_override_schema_adapter() {
// Test shows that SchemaAdapter can add a column that doesn't existing in the
// record batches returned from parquet. This can be useful for schema evolution
async fn can_override_physical_expr_adapter() {
// Test shows that PhysicalExprAdapter can add a column that doesn't exist in the
// record batches returned from parquet. This can be useful for schema evolution
// where older files may not have all columns.

use datafusion_execution::object_store::ObjectStoreUrl;
@@ -124,12 +129,11 @@ mod tests {
let f2 = Field::new("extra_column", DataType::Utf8, true);

let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
let source = ParquetSource::new(Arc::clone(&schema))
.with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}))
.unwrap();
let source = Arc::new(ParquetSource::new(Arc::clone(&schema)));
let base_conf =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
.with_file(partitioned_file)
.with_expr_adapter(Some(Arc::new(TestPhysicalExprAdapterFactory)))
.build();

let parquet_exec = DataSourceExec::from_data_source(base_conf);
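[Editorial note] The two hunks above replace batch-level adaptation (a SchemaAdapter attached to the ParquetSource) with expression-level adaptation (a PhysicalExprAdapterFactory attached to the scan config). Condensed into one helper, the new wiring looks like this; `build_scan` and its parameters are illustrative names, not code from this PR:

```rust
// Sketch of the new wiring, assuming only the imports shown in the diff above.
fn build_scan(
    schema: SchemaRef,
    file: PartitionedFile,
    factory: Arc<dyn PhysicalExprAdapterFactory>,
) -> Arc<DataSourceExec> {
    // The Parquet source no longer carries a schema adapter factory ...
    let source = Arc::new(ParquetSource::new(Arc::clone(&schema)));
    // ... the expression adapter factory hangs off the file scan config instead.
    let config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
            .with_file(file)
            .with_expr_adapter(Some(factory))
            .build();
    DataSourceExec::from_data_source(config)
}
```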
@@ -200,72 +204,54 @@ mod tests {
}

#[derive(Debug)]
struct TestSchemaAdapterFactory;
struct TestPhysicalExprAdapterFactory;

impl SchemaAdapterFactory for TestSchemaAdapterFactory {
impl PhysicalExprAdapterFactory for TestPhysicalExprAdapterFactory {
fn create(
&self,
projected_table_schema: SchemaRef,
_table_schema: SchemaRef,
) -> Box<dyn SchemaAdapter> {
Box::new(TestSchemaAdapter {
table_schema: projected_table_schema,
logical_file_schema: SchemaRef,
physical_file_schema: SchemaRef,
) -> Arc<dyn PhysicalExprAdapter> {
Arc::new(TestPhysicalExprAdapter {
logical_file_schema,
physical_file_schema,
})
}
}

struct TestSchemaAdapter {
/// Schema for the table
table_schema: SchemaRef,
#[derive(Debug)]
struct TestPhysicalExprAdapter {
logical_file_schema: SchemaRef,
physical_file_schema: SchemaRef,
}

impl SchemaAdapter for TestSchemaAdapter {
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
let field = self.table_schema.field(index);
Some(file_schema.fields.find(field.name())?.0)
}

fn map_schema(
&self,
file_schema: &Schema,
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let mut projection = Vec::with_capacity(file_schema.fields().len());

for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
if self.table_schema.fields().find(file_field.name()).is_some() {
projection.push(file_idx);
impl PhysicalExprAdapter for TestPhysicalExprAdapter {
fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
expr.transform(|e| {
if let Some(column) = e.as_any().downcast_ref::<Column>() {
// If column is "extra_column" and missing from physical schema, inject "foo"
if column.name() == "extra_column"
&& self.physical_file_schema.index_of("extra_column").is_err()
{
return Ok(Transformed::yes(Arc::new(Literal::new(
ScalarValue::Utf8(Some("foo".to_string())),
))
as Arc<dyn PhysicalExpr>));
}
}
}

Ok((Arc::new(TestSchemaMapping {}), projection))
}
}

#[derive(Debug)]
struct TestSchemaMapping {}

impl SchemaMapper for TestSchemaMapping {
fn map_batch(
&self,
batch: RecordBatch,
) -> datafusion_common::Result<RecordBatch> {
let f1 = Field::new("id", DataType::Int32, true);
let f2 = Field::new("extra_column", DataType::Utf8, true);

let schema = Arc::new(Schema::new(vec![f1, f2]));

let extra_column = Arc::new(StringArray::from(vec!["foo"]));
let mut new_columns = batch.columns().to_vec();
new_columns.push(extra_column);

Ok(RecordBatch::try_new(schema, new_columns).unwrap())
Ok(Transformed::no(e))
})
.data()
}

fn map_column_statistics(
fn with_partition_values(
&self,
_file_col_statistics: &[datafusion_common::ColumnStatistics],
) -> datafusion_common::Result<Vec<datafusion_common::ColumnStatistics>> {
unimplemented!()
_partition_values: Vec<(FieldRef, ScalarValue)>,
) -> Arc<dyn PhysicalExprAdapter> {
Arc::new(TestPhysicalExprAdapter {
logical_file_schema: self.logical_file_schema.clone(),
physical_file_schema: self.physical_file_schema.clone(),
})
}
}
}
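[Editorial note] The test adapter above hard-codes one column name and one literal. The same trait shapes support a more general schema-evolution policy. Below is a hedged sketch that fills any column missing from the file with a NULL of its declared type; `NullFillingAdapter` is an illustrative name, and the only APIs assumed beyond the imports in this diff are arrow's `Schema::field_with_name` and `ScalarValue::try_from(&DataType)`, which yields a typed NULL:

```rust
// Sketch: like TestPhysicalExprAdapter, but any column present in the logical
// (table) schema and absent from the physical (file) schema becomes a typed
// NULL literal instead of the fixed string "foo".
#[derive(Debug)]
struct NullFillingAdapter {
    logical_file_schema: SchemaRef,
    physical_file_schema: SchemaRef,
}

impl PhysicalExprAdapter for NullFillingAdapter {
    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
        expr.transform(|e| {
            if let Some(column) = e.as_any().downcast_ref::<Column>() {
                if self.physical_file_schema.index_of(column.name()).is_err() {
                    if let Ok(field) =
                        self.logical_file_schema.field_with_name(column.name())
                    {
                        // NULL of the column's declared (logical) type.
                        let null_value = ScalarValue::try_from(field.data_type())?;
                        return Ok(Transformed::yes(Arc::new(Literal::new(
                            null_value,
                        ))
                            as Arc<dyn PhysicalExpr>));
                    }
                }
            }
            Ok(Transformed::no(e))
        })
        .data()
    }

    fn with_partition_values(
        &self,
        _partition_values: Vec<(FieldRef, ScalarValue)>,
    ) -> Arc<dyn PhysicalExprAdapter> {
        // Partition values are ignored here, mirroring the test adapter above.
        Arc::new(NullFillingAdapter {
            logical_file_schema: self.logical_file_schema.clone(),
            physical_file_schema: self.physical_file_schema.clone(),
        })
    }
}
```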
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1257,7 +1257,7 @@ mod tests {
("c3", c3.clone()),
]);

// batch2: c3(int8), c2(int64), c1(string), c4(string)
// batch2: c3(date64), c2(int64), c1(string)
let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]);

let table_schema = Schema::new(vec![
@@ -1272,7 +1272,7 @@ mod tests {
.round_trip_to_batches(vec![batch1, batch2])
.await;
assert_contains!(read.unwrap_err().to_string(),
"Cannot cast file schema field c3 of type Date64 to table schema field of type Int8");
"Cannot cast column 'c3' from 'Date64' (physical data type) to 'Int8' (logical data type)");
Review comment (Member):

This looks the opposite to me - Int should be the physical type and Date is the logical one

Reply (Contributor, author):

I checked and I think this is right.

From above:

let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]);

And:

let c4: ArrayRef = Arc::new(Date64Array::from(vec![
    Some(86400000),
    None,
    Some(259200000),
]));

But in the schema:

Field::new("c3", DataType::Int8, true)

So in the physical data, c3 has the data type Date64, while its logical type (from the table schema) is Int8.
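
[Editorial note] A compact restatement of the thread: the error message is written from the file's perspective, so the physical type is what the file actually contains and the logical type is what the table schema declares. The field definitions below are copied from the test; the function name is illustrative and the snippet only labels which side of the cast each type sits on:

```rust
use arrow::datatypes::{DataType, Field};

fn sides_of_the_cast() -> (Field, Field) {
    // What batch2 physically wrote for c3 (the file side):
    let physical_c3 = Field::new("c3", DataType::Date64, true);
    // What the table schema logically declares for c3:
    let logical_c3 = Field::new("c3", DataType::Int8, true);
    // Hence the new error text reads physical -> logical:
    // "Cannot cast column 'c3' from 'Date64' (physical data type)
    //  to 'Int8' (logical data type)"
    (physical_c3, logical_c3)
}
```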

}

#[tokio::test]