diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index d557a99274ea..4dfb6a4ec3d3 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -156,6 +156,11 @@ pub struct FileScanConfig {
     /// Schema information including the file schema, table partition columns,
     /// and the combined table schema.
     ///
+    /// The table schema (file schema + partition columns) is the schema exposed
+    /// upstream of [`FileScanConfig`] (e.g. in [`DataSourceExec`]).
+    ///
+    /// See [`TableSchema`] for more information.
+    ///
     /// [`DataSourceExec`]: crate::source::DataSourceExec
     pub table_schema: TableSchema,
     /// List of files to be processed, grouped into partitions
@@ -244,23 +249,19 @@
 #[derive(Clone)]
 pub struct FileScanConfigBuilder {
     object_store_url: ObjectStoreUrl,
-    /// Table schema before any projections or partition columns are applied.
+    /// Schema information including the file schema, table partition columns,
+    /// and the combined table schema.
     ///
-    /// This schema is used to read the files, but is **not** necessarily the
-    /// schema of the physical files. Rather this is the schema that the
+    /// This schema is used to read the files, but the file schema is **not** necessarily
+    /// the schema of the physical files. Rather this is the schema that the
     /// physical file schema will be mapped onto, and the schema that the
     /// [`DataSourceExec`] will return.
     ///
-    /// This is usually the same as the table schema as specified by the `TableProvider` minus any partition columns.
-    ///
-    /// This probably would be better named `table_schema`
-    ///
     /// [`DataSourceExec`]: crate::source::DataSourceExec
-    file_schema: SchemaRef,
+    table_schema: TableSchema,
     file_source: Arc<dyn FileSource>,
     limit: Option<usize>,
     projection: Option<Vec<usize>>,
-    table_partition_cols: Vec<FieldRef>,
     constraints: Option<Constraints>,
     file_groups: Vec<FileGroup>,
     statistics: Option<Statistics>,
@@ -285,7 +286,7 @@ impl FileScanConfigBuilder {
     ) -> Self {
         Self {
             object_store_url,
-            file_schema,
+            table_schema: TableSchema::from_file_schema(file_schema),
             file_source,
             file_groups: vec![],
             statistics: None,
@@ -294,7 +295,6 @@
             new_lines_in_values: None,
             limit: None,
             projection: None,
-            table_partition_cols: vec![],
             constraints: None,
             batch_size: None,
             expr_adapter_factory: None,
@@ -326,10 +326,13 @@
 
     /// Set the partitioning columns
     pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
-        self.table_partition_cols = table_partition_cols
+        let table_partition_cols: Vec<FieldRef> = table_partition_cols
             .into_iter()
             .map(|f| Arc::new(f) as FieldRef)
             .collect();
+        self.table_schema = self
+            .table_schema
+            .with_table_partition_cols(table_partition_cols);
         self
     }
 
@@ -427,11 +430,10 @@
     pub fn build(self) -> FileScanConfig {
         let Self {
             object_store_url,
-            file_schema,
+            table_schema,
             file_source,
             limit,
             projection,
-            table_partition_cols,
             constraints,
             file_groups,
             statistics,
@@ -443,19 +445,16 @@ impl FileScanConfigBuilder {
         } = self;
 
         let constraints = constraints.unwrap_or_default();
-        let statistics =
-            statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));
+        let statistics = statistics
+            .unwrap_or_else(|| Statistics::new_unknown(table_schema.file_schema()));
 
         let file_source = file_source
             .with_statistics(statistics.clone())
-            .with_schema(Arc::clone(&file_schema));
+            .with_schema(Arc::clone(table_schema.file_schema()));
         let file_compression_type =
             file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
         let new_lines_in_values = new_lines_in_values.unwrap_or(false);
 
-        // Create TableSchema from file_schema and table_partition_cols
-        let table_schema = TableSchema::new(file_schema, table_partition_cols);
-
         FileScanConfig {
             object_store_url,
             table_schema,
@@ -477,7 +476,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
     fn from(config: FileScanConfig) -> Self {
         Self {
             object_store_url: config.object_store_url,
-            file_schema: Arc::clone(config.table_schema.file_schema()),
+            table_schema: config.table_schema,
             file_source: Arc::<dyn FileSource>::clone(&config.file_source),
             file_groups: config.file_groups,
             statistics: config.file_source.statistics().ok(),
@@ -486,7 +485,6 @@
             new_lines_in_values: Some(config.new_lines_in_values),
             limit: config.limit,
             projection: config.projection,
-            table_partition_cols: config.table_schema.table_partition_cols().clone(),
             constraints: Some(config.constraints),
             batch_size: config.batch_size,
             expr_adapter_factory: config.expr_adapter_factory,
diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs
index 9413bd9ef20b..8e95585ce873 100644
--- a/datafusion/datasource/src/table_schema.rs
+++ b/datafusion/datasource/src/table_schema.rs
@@ -121,6 +121,20 @@ impl TableSchema {
         }
     }
 
+    /// Create a new TableSchema from a file schema with no partition columns.
+    pub fn from_file_schema(file_schema: SchemaRef) -> Self {
+        Self::new(file_schema, vec![])
+    }
+
+    /// Set the table partition columns and rebuild the table schema.
+    pub fn with_table_partition_cols(
+        self,
+        table_partition_cols: Vec<FieldRef>,
+    ) -> TableSchema {
+        // Rebuild via `new` so the cached combined schema stays in sync.
+        Self::new(self.file_schema, table_partition_cols)
+    }
+
     /// Get the file schema (without partition columns).
     ///
     /// This is the schema of the actual data files on disk.
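
Not part of the patch; a minimal sketch of how the new `TableSchema` constructors compose, assuming the `arrow` `Schema`/`Field` types, the `datafusion_datasource::table_schema` module path, and the existing `file_schema()`/`table_schema()` accessors. The column names are illustrative only:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_datasource::table_schema::TableSchema;

fn main() {
    // Schema of the data physically stored in the files.
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, true),
    ]));

    // Partition column derived from the directory layout
    // (e.g. `date=2024-01-01/`), not present in the files themselves.
    let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema))
        .with_table_partition_cols(vec![Arc::new(Field::new(
            "date",
            DataType::Utf8,
            false,
        ))]);

    // The combined table schema is the file schema plus the partition
    // columns; this is what DataSourceExec exposes upstream.
    assert_eq!(table_schema.file_schema().fields().len(), 2);
    assert_eq!(table_schema.table_schema().fields().len(), 3);
}
```

This mirrors what `FileScanConfigBuilder` does after the patch: `new()` starts from `TableSchema::from_file_schema(...)`, and `with_table_partition_cols(...)` swaps in a rebuilt `TableSchema`, so the combined schema cannot drift out of sync with the file schema and partition columns it is derived from.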