1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

165 changes: 4 additions & 161 deletions datafusion/core/src/datasource/mod.rs
@@ -51,101 +51,14 @@ pub use datafusion_physical_expr::create_ordering;

#[cfg(all(test, feature = "parquet"))]
mod tests {

use crate::prelude::SessionContext;
use ::object_store::{path::Path, ObjectMeta};
use arrow::{
array::{Int32Array, StringArray},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
datatypes::{DataType, Field, Schema},
};
use datafusion_common::{record_batch, test_util::batches_to_sort_string};
use datafusion_common::record_batch;
use datafusion_datasource::{
file::FileSource,
file_scan_config::FileScanConfigBuilder,
schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
SchemaMapper,
},
source::DataSourceExec,
PartitionedFile,
schema_adapter::DefaultSchemaAdapterFactory,
};
use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_physical_plan::collect;
use std::{fs, sync::Arc};
use tempfile::TempDir;

#[tokio::test]
async fn can_override_schema_adapter() {
// Test shows that SchemaAdapter can add a column that doesn't exist in the
// record batches returned from parquet. This can be useful for schema evolution
// where older files may not have all columns.

use datafusion_execution::object_store::ObjectStoreUrl;
let tmp_dir = TempDir::new().unwrap();
let table_dir = tmp_dir.path().join("parquet_test");
fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
let f1 = Field::new("id", DataType::Int32, true);

let file_schema = Arc::new(Schema::new(vec![f1.clone()]));
let filename = "part.parquet".to_string();
let path = table_dir.as_path().join(filename.clone());
let file = fs::File::create(path.clone()).unwrap();
let mut writer =
parquet::arrow::ArrowWriter::try_new(file, file_schema.clone(), None)
.unwrap();

let ids = Arc::new(Int32Array::from(vec![1i32]));
let rec_batch = RecordBatch::try_new(file_schema.clone(), vec![ids]).unwrap();

writer.write(&rec_batch).unwrap();
writer.close().unwrap();

let location = Path::parse(path.to_str().unwrap()).unwrap();
let metadata = fs::metadata(path.as_path()).expect("Local file metadata");
let meta = ObjectMeta {
location,
last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
size: metadata.len(),
e_tag: None,
version: None,
};

let partitioned_file = PartitionedFile {
object_meta: meta,
partition_values: vec![],
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
};

let f1 = Field::new("id", DataType::Int32, true);
let f2 = Field::new("extra_column", DataType::Utf8, true);

let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
let source = ParquetSource::new(Arc::clone(&schema))
.with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}))
.unwrap();
let base_conf =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
.with_file(partitioned_file)
.build();

let parquet_exec = DataSourceExec::from_data_source(base_conf);

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let read = collect(parquet_exec, task_ctx).await.unwrap();

insta::assert_snapshot!(batches_to_sort_string(&read),@r###"
+----+--------------+
| id | extra_column |
+----+--------------+
| 1 | foo |
+----+--------------+
"###);
}
use std::sync::Arc;

#[test]
fn default_schema_adapter() {
@@ -198,74 +111,4 @@ mod tests {
let err = mapper.map_batch(file_batch).unwrap_err().to_string();
assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}");
}

#[derive(Debug)]
struct TestSchemaAdapterFactory;

impl SchemaAdapterFactory for TestSchemaAdapterFactory {
fn create(
&self,
projected_table_schema: SchemaRef,
_table_schema: SchemaRef,
) -> Box<dyn SchemaAdapter> {
Box::new(TestSchemaAdapter {
table_schema: projected_table_schema,
})
}
}

struct TestSchemaAdapter {
/// Schema for the table
table_schema: SchemaRef,
}

impl SchemaAdapter for TestSchemaAdapter {
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
let field = self.table_schema.field(index);
Some(file_schema.fields.find(field.name())?.0)
}

fn map_schema(
&self,
file_schema: &Schema,
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let mut projection = Vec::with_capacity(file_schema.fields().len());

for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
if self.table_schema.fields().find(file_field.name()).is_some() {
projection.push(file_idx);
}
}

Ok((Arc::new(TestSchemaMapping {}), projection))
}
}

#[derive(Debug)]
struct TestSchemaMapping {}

impl SchemaMapper for TestSchemaMapping {
fn map_batch(
&self,
batch: RecordBatch,
) -> datafusion_common::Result<RecordBatch> {
let f1 = Field::new("id", DataType::Int32, true);
let f2 = Field::new("extra_column", DataType::Utf8, true);

let schema = Arc::new(Schema::new(vec![f1, f2]));

let extra_column = Arc::new(StringArray::from(vec!["foo"]));
let mut new_columns = batch.columns().to_vec();
new_columns.push(extra_column);

Ok(RecordBatch::try_new(schema, new_columns).unwrap())
}

fn map_column_statistics(
&self,
_file_col_statistics: &[datafusion_common::ColumnStatistics],
) -> datafusion_common::Result<Vec<datafusion_common::ColumnStatistics>> {
unimplemented!()
}
}
}
1 change: 1 addition & 0 deletions datafusion/datasource-parquet/Cargo.toml
@@ -39,6 +39,7 @@ datafusion-common-runtime = { workspace = true }
datafusion-datasource = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-adapter = { workspace = true }