42 changes: 20 additions & 22 deletions datafusion/datasource/src/file_scan_config.rs
@@ -156,6 +156,11 @@ pub struct FileScanConfig {
     /// Schema information including the file schema, table partition columns,
     /// and the combined table schema.
     ///
+    /// The table schema (file schema + partition columns) is the schema exposed
+    /// upstream of [`FileScanConfig`] (e.g. in [`DataSourceExec`]).
+    ///
+    /// See [`TableSchema`] for more information.
+    ///
     /// [`DataSourceExec`]: crate::source::DataSourceExec
     pub table_schema: TableSchema,
     /// List of files to be processed, grouped into partitions
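Note on the schema split this doc comment describes: the table schema is the on-disk file schema with the partition columns appended, and that combined view is what operators above the scan see. A minimal sketch of the relationship using Arrow types directly; `combined_table_schema` is a hypothetical helper for illustration, not part of this PR:

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};

// Hypothetical helper: the "table schema" exposed upstream of the scan is
// the on-disk file schema with Hive-style partition columns appended.
fn combined_table_schema(file_schema: &SchemaRef, partition_cols: &[FieldRef]) -> SchemaRef {
    let mut fields: Vec<FieldRef> = file_schema.fields().iter().cloned().collect();
    fields.extend(partition_cols.iter().cloned());
    Arc::new(Schema::new(fields))
}

fn main() {
    let file_schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Utf8, true),
    ]));
    // A partition column encoded in the path, e.g. /date=2024-01-01/file.parquet
    let partition_cols = vec![Arc::new(Field::new("date", DataType::Utf8, false)) as FieldRef];
    let table_schema = combined_table_schema(&file_schema, &partition_cols);
    assert_eq!(table_schema.fields().len(), 3); // id, value, date
}
```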
@@ -244,23 +249,19 @@ pub struct FileScanConfig {
 #[derive(Clone)]
 pub struct FileScanConfigBuilder {
     object_store_url: ObjectStoreUrl,
-    /// Table schema before any projections or partition columns are applied.
+    /// Schema information including the file schema, table partition columns,
+    /// and the combined table schema.
     ///
-    /// This schema is used to read the files, but is **not** necessarily the
-    /// schema of the physical files. Rather this is the schema that the
+    /// This schema is used to read the files, but the file schema is **not** necessarily
+    /// the schema of the physical files. Rather this is the schema that the
     /// physical file schema will be mapped onto, and the schema that the
     /// [`DataSourceExec`] will return.
     ///
-    /// This is usually the same as the table schema as specified by the `TableProvider` minus any partition columns.
-    ///
-    /// This probably would be better named `table_schema`
-    ///
     /// [`DataSourceExec`]: crate::source::DataSourceExec
-    file_schema: SchemaRef,
+    table_schema: TableSchema,
     file_source: Arc<dyn FileSource>,
     limit: Option<usize>,
     projection: Option<Vec<usize>>,
-    table_partition_cols: Vec<FieldRef>,
     constraints: Option<Constraints>,
     file_groups: Vec<FileGroup>,
     statistics: Option<Statistics>,
@@ -285,7 +286,7 @@ impl FileScanConfigBuilder {
     ) -> Self {
         Self {
             object_store_url,
-            file_schema,
+            table_schema: TableSchema::from_file_schema(file_schema),
             file_source,
             file_groups: vec![],
             statistics: None,
@@ -294,7 +295,6 @@
             new_lines_in_values: None,
             limit: None,
             projection: None,
-            table_partition_cols: vec![],
             constraints: None,
             batch_size: None,
             expr_adapter_factory: None,
@@ -326,10 +326,13 @@ impl FileScanConfigBuilder {
 
     /// Set the partitioning columns
     pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
-        self.table_partition_cols = table_partition_cols
+        let table_partition_cols: Vec<FieldRef> = table_partition_cols
             .into_iter()
             .map(|f| Arc::new(f) as FieldRef)
             .collect();
+        self.table_schema = self
+            .table_schema
+            .with_table_partition_cols(table_partition_cols);
         self
     }
 
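The builder's public surface is unchanged here: callers still pass plain `Field`s, and the conversion to `FieldRef` plus the `TableSchema` update happen internally. A hedged usage sketch of the caller's view; the import paths, `ParquetSource::default()`, and `ObjectStoreUrl::local_filesystem()` are assumptions based on the surrounding code, not confirmed by this diff:

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_datasource_parquet::source::ParquetSource;

fn main() {
    let file_schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
    ]));
    // Caller-facing API is unchanged: plain `Field`s go in, and the
    // builder folds them into its TableSchema internally.
    let config = FileScanConfigBuilder::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        Arc::new(ParquetSource::default()),
    )
    .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
    .build();

    assert_eq!(config.table_schema.table_partition_cols().len(), 1);
}
```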
@@ -427,11 +430,10 @@ impl FileScanConfigBuilder {
     pub fn build(self) -> FileScanConfig {
         let Self {
             object_store_url,
-            file_schema,
+            table_schema,
             file_source,
             limit,
             projection,
-            table_partition_cols,
             constraints,
             file_groups,
             statistics,
@@ -443,19 +445,16 @@
         } = self;
 
         let constraints = constraints.unwrap_or_default();
-        let statistics =
-            statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));
+        let statistics = statistics
+            .unwrap_or_else(|| Statistics::new_unknown(table_schema.file_schema()));
 
         let file_source = file_source
             .with_statistics(statistics.clone())
-            .with_schema(Arc::clone(&file_schema));
+            .with_schema(Arc::clone(table_schema.file_schema()));
         let file_compression_type =
             file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
         let new_lines_in_values = new_lines_in_values.unwrap_or(false);
 
-        // Create TableSchema from file_schema and table_partition_cols
-        let table_schema = TableSchema::new(file_schema, table_partition_cols);
-
         FileScanConfig {
             object_store_url,
             table_schema,
@@ -477,7 +476,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
     fn from(config: FileScanConfig) -> Self {
         Self {
             object_store_url: config.object_store_url,
-            file_schema: Arc::clone(config.table_schema.file_schema()),
+            table_schema: config.table_schema,
             file_source: Arc::<dyn FileSource>::clone(&config.file_source),
             file_groups: config.file_groups,
             statistics: config.file_source.statistics().ok(),
@@ -486,7 +485,6 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
             new_lines_in_values: Some(config.new_lines_in_values),
             limit: config.limit,
             projection: config.projection,
-            table_partition_cols: config.table_schema.table_partition_cols().clone(),
             constraints: Some(config.constraints),
             batch_size: config.batch_size,
             expr_adapter_factory: config.expr_adapter_factory,
14 changes: 14 additions & 0 deletions datafusion/datasource/src/table_schema.rs
@@ -121,6 +121,20 @@ impl TableSchema {
         }
     }
 
+    /// Create a new TableSchema from a file schema with no partition columns.
+    pub fn from_file_schema(file_schema: SchemaRef) -> Self {
+        Self::new(file_schema, vec![])
+    }
+
+    /// Set the table partition columns and rebuild the table schema.
+    pub fn with_table_partition_cols(
+        mut self,
+        table_partition_cols: Vec<FieldRef>,
+    ) -> TableSchema {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
     /// Get the file schema (without partition columns).
     ///
     /// This is the schema of the actual data files on disk.
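The two new helpers are intended to compose, which is how the builder now uses them: `from_file_schema` in `FileScanConfigBuilder::new`, then `with_table_partition_cols` in the setter. A small sketch under that assumption; the `datafusion_datasource::table_schema` import path is inferred from the file location, not stated in this diff:

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
// Assumed import path; TableSchema lives in datafusion/datasource/src/table_schema.rs.
use datafusion_datasource::table_schema::TableSchema;

fn main() {
    let file_schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Utf8, true),
    ]));

    // Start with no partition columns, then attach one, mirroring how
    // FileScanConfigBuilder::new and with_table_partition_cols now compose.
    let ts = TableSchema::from_file_schema(file_schema)
        .with_table_partition_cols(vec![
            Arc::new(Field::new("date", DataType::Utf8, false)) as FieldRef,
        ]);

    assert_eq!(ts.table_partition_cols().len(), 1);
    assert_eq!(ts.file_schema().fields().len(), 2); // partition col not in file schema
}
```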