Description
Is your feature request related to a problem or challenge?
In various parts of the codebase, it's clear that a projection: Option<&Vec<usize>> value of None is intended to indicate essentially select * or no explicit subset of the table's columns.
One important place this shows up is in the projection parameter of the TableProvider::scan() function. However, the high-level interfaces for calling this seem to never pass None for that parameter.
Concrete use-case
Imagine I have a custom table provider, VersionedTableProvider, which abstracts over a version-enabled object store bucket to allow the user to write queries like SELECT * FROM t WHERE version = 'v1';.
A user then pushes a new table t with the following data. This is stored in a custom index as v1.
+----+----+----+
| id | a  | b  |
+----+----+----+
| 0  | 10 | 30 |
| 1  | 20 | 40 |
+----+----+----+
Then, they push another version, stored as v2:
+----+----+----+
| id | a  | c  |
+----+----+----+
| 0  | 10 | 50 |
| 1  | 20 | 60 |
+----+----+----+
Calling ::schema() on this VersionedTableProvider gives me a combined schema like ["id", "a", "b", "c", "version"].
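For illustration, here is a minimal sketch of how such a combined schema could be derived. It is not the provider's actual code: combined_schema is a hypothetical helper, and columns are modeled as plain strings rather than Arrow fields.

```rust
/// Hypothetical helper: merge the column lists of every version into one
/// table-level schema, keeping first-seen order, then append a synthetic
/// "version" column.
fn combined_schema(versions: &[Vec<&str>]) -> Vec<String> {
    let mut merged: Vec<String> = Vec::new();
    for cols in versions {
        for col in cols {
            // Only add a column the first time it is seen in any version.
            if !merged.iter().any(|c| c.as_str() == *col) {
                merged.push(col.to_string());
            }
        }
    }
    merged.push("version".to_string());
    merged
}
```

With the v1 and v2 tables above as input, this yields the five columns listed: id, a, b, c, version.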
If the user explicitly requests a column not in any version of the table, DataFusion will throw an error while building the logical plan:
SELECT f FROM t; -> FieldNotFound: no field f in t.
However, what about the following two cases (no explicit version filter implies latest)?
1. SELECT * FROM t;
2. SELECT a, b, c FROM t;
I have been trying to handle this in TableProvider::scan(), but they need to be handled differently.
Let's say the beginning of my custom scan() impl looks like this:
let versioned_table = match parse_version_from_filters(filters) {
    Some(version) => self.versioned_table(&version)?,
    None => self.latest_versioned_table(),
};
// This may differ from self.schema(); it's the schema of the actual versioned parquet file.
let file_schema = versioned_table.schema();
let projected_schema = project_schema(&file_schema, &file_projection)?;
...where project_schema handles a None projection correctly, i.e. it returns the full file schema.
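To make the intended None semantics concrete, here is a simplified stand-in for that projection step. It is a sketch only: project and ColumnList are hypothetical names, and a schema is modeled as a list of column names rather than an Arrow schema.

```rust
/// Simplified stand-in for a schema: an ordered list of column names.
type ColumnList = Vec<String>;

/// Hypothetical analogue of a schema projection helper:
/// `None` keeps every column, `Some(indices)` keeps only the listed
/// columns, in the order given.
fn project(schema: &ColumnList, projection: Option<&Vec<usize>>) -> Result<ColumnList, String> {
    match projection {
        // No explicit projection: the whole schema passes through unchanged.
        None => Ok(schema.clone()),
        // Explicit projection: pick columns by index, failing on a bad index.
        Some(indices) => indices
            .iter()
            .map(|&i| {
                schema.get(i).cloned().ok_or_else(|| {
                    format!(
                        "projection index {} out of bounds for {} columns",
                        i,
                        schema.len()
                    )
                })
            })
            .collect(),
    }
}
```

The point of the sketch is that None and Some(all indices) produce the same columns, yet only None tells the provider that the user did not name columns explicitly.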
What I'm expecting should happen
Case 1 should be a valid query. Since I have the projection and the filter expr list, I can confirm the version the user wants or default to latest. Then I should be able to simply project None onto the file_schema, and the user gets what they expect.
Case 2 should be an error. Since the user has explicitly requested a column b that is not in the requested version of the table (v2), I have enough information to return a very specific diagnostic: field b not found in t version v2.
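The two cases could be told apart roughly as follows, assuming scan() actually received None for SELECT *. This is a sketch of the behavior I'd like to implement, not of anything DataFusion does today: resolve_projection is a hypothetical helper, schemas are plain lists of names, and the synthetic version column is ignored for brevity.

```rust
/// Hypothetical helper: map a projection (indices into the combined table
/// schema) onto the columns of one concrete version of the table.
/// `None` means the user gave no explicit column list.
fn resolve_projection(
    table_schema: &[&str],
    file_schema: &[&str],
    projection: Option<&Vec<usize>>,
    table: &str,
    version: &str,
) -> Result<Vec<String>, String> {
    match projection {
        // Case 1: no explicit list, so return whatever this version contains.
        None => Ok(file_schema.iter().map(|c| c.to_string()).collect()),
        // Case 2: explicit list, so every requested column must exist
        // in this version, otherwise report exactly which one is missing.
        Some(indices) => indices
            .iter()
            .map(|&i| {
                let name = table_schema
                    .get(i)
                    .ok_or_else(|| format!("projection index {} out of bounds", i))?;
                if file_schema.contains(name) {
                    Ok(name.to_string())
                } else {
                    Err(format!(
                        "field {} not found in {} version {}",
                        name, table, version
                    ))
                }
            })
            .collect(),
    }
}
```

With the combined schema and the v2 file schema from above, a None projection succeeds with id, a, c, while the indices for a, b, c fail with the specific diagnostic for b.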
This should be trivial because SELECT * should seemingly, as documented in various places, flow down a None projection to scan().
What's actually happening
When using the dataframe API or ctx.sql, I seem to always get a Some projection. Earlier in the call chain, for example looking at the LogicalPlan right after sql_to_statement on SELECT * FROM t;, I see
Projection {
    expr: vec![... every col in the table schema],
    input: TableScan { ..., projection: None, ... },
    schema: ...,
}
Interestingly, the input TableScan logical plan correctly stores the None value of projection.
But for some reason that's not what gets passed to scan().
Describe the solution you'd like
SELECT * should map to a projection of None in the TableProvider::scan() so that, when there's a mismatch between a full table schema and a specific file schema, I can determine if the user explicitly requested an invalid column in an explicit projection list, or not.
Describe alternatives you've considered
No response
Additional context
No response