
ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names #8839

Closed

Conversation

andygrove (Member) commented Dec 5, 2020:

This PR builds on #8840 and integrates DFSchema with the DataFusion query planning, optimization, and execution code.

Unfortunately, this was a pretty large refactor, and I don't really see a way to break it down into smaller PRs.

There should be no functional changes in this PR. Fields are looked up using `field_with_unqualified_name`, and I will file a separate PR to add support for referencing qualified field names.

Note that I had to update `PhysicalExpr.evaluate()` to pass in the input schema, since we can no longer rely on the schema from the Arrow `RecordBatch` (it loses the qualifiers). The other methods on `PhysicalExpr` already required the input schema, so this is at least consistent: we now always use the schema from the plan.

The rest of the changes update the query planning, optimization, and execution code to use `DFSchema` instead of `Schema`.

Design Document: https://docs.google.com/document/d/1BFo7ruJayCulAHLa9-noaHXbgcaAH_4LuOJFGJnDHkc/edit#heading=h.3japu7255aut
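
For readers new to the design, the core idea is sketched below. This is a hedged illustration based on the description above and the design doc; the exact struct and method shapes are assumptions, not the merged API.

use arrow::datatypes::Field;

/// An Arrow field plus an optional relation qualifier, e.g. ("t1", "a").
#[derive(Debug, Clone)]
pub struct DFField {
    qualifier: Option<String>,
    field: Field,
}

/// A schema of possibly-qualified fields: everything Arrow's `Schema`
/// carries, plus the qualifiers that `RecordBatch` schemas lose.
#[derive(Debug, Clone)]
pub struct DFSchema {
    fields: Vec<DFField>,
}

impl DFSchema {
    /// The lookup this PR uses throughout: match on the bare field name,
    /// ignoring any qualifier.
    pub fn field_with_unqualified_name(&self, name: &str) -> Option<&DFField> {
        self.fields.iter().find(|f| f.field.name() == name)
    }
}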


@andygrove andygrove marked this pull request as ready for review December 5, 2020 20:33
@andygrove andygrove requested review from alamb and jorgecarleitao and removed request for alamb December 5, 2020 20:33
@andygrove andygrove changed the title ARROW-10732: [Rust] [DataFusion] Add SQL support for table/relation aliases and compound identifiers [WIP] ARROW-10732: [Rust] [DataFusion] Implement DFSchema as a step towards supporting qualified column names Dec 5, 2020
andygrove (Member Author) commented:

@alamb @jorgecarleitao @Dandandan fyi

@andygrove andygrove changed the title ARROW-10732: [Rust] [DataFusion] Implement DFSchema as a step towards supporting qualified column names ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names Dec 5, 2020
Dandandan (Contributor) commented:

This is great! I didn't see anything strange; the code looks clean, and it sounds like this could be integrated and tested further.

jorgecarleitao (Member) commented Dec 6, 2020:

Hey @andygrove . Thanks a lot for this!

I would benefit from understanding the use case for `DFSchema` in the physical plan. Note that this is primarily for my own understanding, as I am only familiar with qualified names in SQL as a way to disambiguate columns in expressions concerning more than one table -- not in the representation of a statement at the physical plan. Maybe you could give an example of where `arrow::Schema` is not sufficient at the physical level?

My current understanding is that, without qualifiers, we can't write things like `(table1.a + 1) >= (table2.b - 1)`.

What I am trying to understand is when we need such an expression at the physical level. Typically, these plans require some form of join and are mapped to `filter(join(a, b))`, in which case I do not see how a qualifier is used: before the join there are two input nodes that are joined on a key (i.e. always an equality relationship between columns); after the join, there is a single node, and thus qualifiers are not needed.

One use case I do see is when the join is itself over an expression, e.g. `JOIN ON (table1.a + 1) == (table2.b - 1)`. However, in this case, at the physical level, this can always be mapped to `join(projection())`. I.e., it seems to me that this is more a convenience for building a logical statement than a necessity for executing one.

If the goal is to add the qualifier to the column name after the join, to disambiguate table1.a from table2.a, wouldn't it be easier to do that in the logical plan alone?

andygrove (Member Author) commented:

Hi @jorgecarleitao, did you get a chance to read the design document? There is a link to it from the JIRA.

jorgecarleitao (Member) commented:

> Hi @jorgecarleitao, did you get a chance to read the design document? There is a link to it from the JIRA.

Yeah, I missed that one and the whole discussion on the issue (https://docs.google.com/document/d/1BFo7ruJayCulAHLa9-noaHXbgcaAH_4LuOJFGJnDHkc/edit#heading=h.su3u27lcpr3l), sorry about that.

andygrove (Member Author) commented:

You may have a point about only needing this at the logical level. I am not sure, but I will take a look at this tomorrow.

impl DFSchema {
    /// Creates an empty `DFSchema`
    pub fn empty() -> Self {
        Self { fields: vec![] }
Review comment (Contributor):

Would it make sense to make this a hashset? Or convert to a vec in the last step?


    /// Find the index of the column with the given name
    pub fn index_of(&self, name: &str) -> Result<usize> {
        for i in 0..self.fields.len() {
Dandandan (Contributor) commented Dec 6, 2020:

This could use `Iterator::position`.
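
For reference, a position-based version might look like this (a sketch against the `DFSchema` sketch above; the `DataFusionError::Plan` error is borrowed from a later suggestion in this thread):

/// Find the index of the column with the given unqualified name using
/// `Iterator::position` instead of a manual index loop.
pub fn index_of(&self, name: &str) -> Result<usize> {
    self.fields
        .iter()
        .position(|f| f.field.name() == name)
        .ok_or_else(|| DataFusionError::Plan(format!("No field named '{}'", name)))
}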

alamb (Contributor) left a comment:

Thanks @andygrove -- this is looking really nice. I agree with @jorgecarleitao that, unless there is some use case we have overlooked, the `DFSchema` notion should probably live only in the `LogicalPlan`, and physical plans should still use only the Arrow `Schema`.

That is a standard division I have seen in other optimizers/planners -- at some point the distinction between relations and where the input came from is no longer relevant, and the code is just focused on sending columns of data around.

@@ -214,7 +212,7 @@ impl ExecutionContext {
             has_header: options.has_header,
             delimiter: Some(options.delimiter),
             projection: None,
-            projected_schema: csv.schema(),
+            projected_schema: Arc::new(DFSchema::from(&csv.schema())),
Review comment (Contributor):

We could make this code look better if we implemented `impl Into<DFSchemaRef> for SchemaRef` -- then we could write something like `projected_schema: csv.schema().into()`.

Doing so in some follow-on PR would be totally fine.
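
For illustration, the conversion could be shaped roughly as follows. This is only a sketch: Rust's orphan rules forbid a direct `impl From<SchemaRef> for Arc<DFSchema>` (neither `Arc`-wrapped type is local), so the follow-up PR may need a different spelling, such as a `From<&Schema>` impl plus a small helper:

use std::sync::Arc;
use arrow::datatypes::{Schema, SchemaRef};

impl From<&Schema> for DFSchema {
    fn from(schema: &Schema) -> Self {
        DFSchema {
            // No relation is known at this point, so fields are unqualified.
            fields: schema
                .fields()
                .iter()
                .map(|f| DFField { qualifier: None, field: f.clone() })
                .collect(),
        }
    }
}

/// Hypothetical helper returning the Arc-wrapped form used by the planner.
fn to_df_schema_ref(schema: &SchemaRef) -> Arc<DFSchema> {
    Arc::new(DFSchema::from(schema.as_ref()))
}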

@@ -408,7 +406,7 @@ impl ExecutionContext {
         let path = Path::new(&path).join(&filename);
         let file = fs::File::create(path)?;
         let mut writer =
-            ArrowWriter::try_new(file.try_clone().unwrap(), plan.schema(), None)?;
+            ArrowWriter::try_new(file.try_clone().unwrap(), plan.schema().to_arrow_schema(), None)?;
Review comment (Contributor):

We could likewise implement `impl Into<Schema> for DFSchema` and call `into()` rather than `to_arrow_schema()`. This is again just a stylistic thing.
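
The reverse direction, again as a hedged sketch on top of the structs above (this one is coherent as written, since the local `DFSchema` appears in the trait parameter):

impl From<&DFSchema> for Schema {
    fn from(df_schema: &DFSchema) -> Self {
        // Drop the qualifiers and keep the underlying Arrow fields.
        Schema::new(df_schema.fields.iter().map(|f| f.field.clone()).collect())
    }
}

With something like this in place, call sites could write `into()` wherever a plain Arrow `Schema` is expected.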

andygrove (Member Author) replied:

I've broken this out into a separate PR #8857

andygrove added a commit that referenced this pull request Dec 6, 2020
This PR implements a DataFusion schema that wraps the Arrow schema and adds support for qualified names.

There is a follow-up PR #8839 to integrate this with DataFusion.

Design doc: https://docs.google.com/document/d/1BFo7ruJayCulAHLa9-noaHXbgcaAH_4LuOJFGJnDHkc/edit#heading=h.3japu7255aut

Closes #8840 from andygrove/dfschema

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>
@github-actions github-actions bot added the needs-rebase A PR that needs to be rebased by the author label Dec 6, 2020
andygrove (Member Author) commented:

@jorgecarleitao @alamb I've been looking at the question of whether the physical plan should use `DFSchema`. Here is the current (in master) implementation of the physical expression for `Column`:

impl PhysicalExpr for Column {
    /// Get the data type of this expression, given the schema of the input
    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
        Ok(input_schema
            .field_with_name(&self.name)?
            .data_type()
            .clone())
    }

    /// Decide whether this expression is nullable, given the schema of the input
    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
        Ok(input_schema.field_with_name(&self.name)?.is_nullable())
    }

    /// Evaluate the expression
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        Ok(ColumnarValue::Array(
            batch.column(batch.schema().index_of(&self.name)?).clone(),
        ))
    }
}

As you can see, `data_type` and `nullable` use the schema from the plan, whereas the `evaluate` method uses the schema from the record batch, which is a little inconsistent. They should probably all use the same schema.

The bigger issue, though, is that this expression looks up columns by name, so how do we support qualified names here? I see the following choices:

  1. Have `ExecutionPlan.schema()` use `DFSchema`, as I have done in this PR
  2. Use qualified names in the Arrow schema field names, e.g. "t1.foo"
  3. Change the `Column` physical expression to refer to columns by index rather than name (sketched below)

Maybe there are other options that I am not seeing?
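
To make option 3 concrete: the column expression could carry a resolved index instead of a name, roughly like this (a sketch with assumed DataFusion types such as `ColumnarValue`; this is not code from this PR):

use arrow::record_batch::RecordBatch;

/// Option 3: a column physical expression resolved to a position at
/// planning time, so execution never looks anything up by name.
#[derive(Debug)]
pub struct ColumnByIndex {
    index: usize,
}

impl ColumnByIndex {
    fn evaluate(&self, batch: &RecordBatch) -> ColumnarValue {
        // Qualified vs. unqualified naming no longer matters here: the
        // planner has already resolved any qualifier to this index.
        ColumnarValue::Array(batch.column(self.index).clone())
    }
}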

jorgecarleitao (Member) commented:

Thanks a lot for looking at this. All excellent points. I now see that this is tricky :)

Thinking about what you wrote: if we plan the logical schema as t1.a, t2.a, wouldn't the column names become a, a on the `RecordBatch`? I.e., there will be a discrepancy between the schema provided by `df.schema()` and the `RecordBatch::schema()` returned by `collect()`, no?

I think this will happen even if we pass `DFSchema` to the physical plan (1.) or use indexes (3.), as any map from qualified to unqualified names is lossy (it drops the qualifier), so the qualifier is never recoverable from the `RecordBatch`'s schema.

This IMO leaves us with 2., which is what I would try: change the physical planner to alias/rewrite column names with the qualifier when the physical plan is created. This will cause the resulting `RecordBatch`'s schema to have columns named t1.a and t2.a, thereby guaranteeing the invariant that the output schema of the physical execution matches the schema of the logical plan.

I.e., the invariant that SELECT t1.a, t2.a, c ... yields a schema whose columns are named ["t1.a", "t2.a", "c"] is preserved.

Note that we already do this when performing coercion: we preserve the logical schema name by injecting cast ops during physical (not logical) planning, so that if the user wrote SELECT sqrt(f32) ..., the resulting name in `RecordBatch::schema()` is sqrt(f32), even if the physical operation performed was sqrt(CAST(f32 AS Float64)).

andygrove (Member Author) commented:

Thanks @jorgecarleitao, I think that makes a lot of sense. Unfortunately, I am running into some issues implementing this: the physical planner calls into the logical planner to create names, and it is getting hard to mix and match these schemas.

I am going to have to take a step back and break this down into smaller steps I think.

alamb (Contributor) commented Dec 7, 2020:

> As you can see, `data_type` and `nullable` use the schema from the plan, whereas the `evaluate` method uses the schema from the record batch, which is a little inconsistent. They should probably all use the same schema.

I agree -- I recommend using the schema from the plan for consistency.

> This IMO leaves us with 2., which is what I would try: change the physical planner to alias/rewrite column names with the qualifier when the physical plan is created. This will cause the resulting `RecordBatch`'s schema to have columns named t1.a and t2.a, thereby guaranteeing the invariant that the output schema of the physical execution matches the schema of the logical plan.

I agree with @jorgecarleitao's recommendation -- when moving from the logical to the physical plan, I would always use the fully qualified name of the field, which avoids ambiguity. If we don't like t1.foo being sprinkled around in plans that only have one table, or where the column names aren't ambiguous, we could implement a (logical plan) optimizer pass to remove unneeded qualifiers.

andygrove (Member Author) commented:

Thanks for the feedback. I will try and get this rebased today.

andygrove (Member Author) commented Dec 7, 2020:

@alamb @jorgecarleitao @Dandandan This is ready for re-review.

To recap:

  • At execution time we now always* use the DataFusion schema from the plan rather than the Arrow schema from the record batch
  • When converting the DataFusion schema to an Arrow schema for use in record batches, we use fully qualified field names (see the sketch below)

(*) There may still be one or two places where we are using the batch schema, but I think it will be easier to find those in the follow-up PRs where we add support for referencing columns by qualified names
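
A sketch of that conversion, building on the `DFSchema` sketch earlier in this thread (the real method is `to_arrow_schema()`; this body is an assumption about its behavior, not the PR's code):

use arrow::datatypes::{Field, Schema};

impl DFSchema {
    /// Flatten into an Arrow schema, writing each qualified field as
    /// "qualifier.name" so the `RecordBatch` schema matches the plan schema.
    pub fn to_arrow_schema(&self) -> Schema {
        Schema::new(
            self.fields
                .iter()
                .map(|f| {
                    let name = match &f.qualifier {
                        Some(q) => format!("{}.{}", q, f.field.name()),
                        None => f.field.name().clone(),
                    };
                    Field::new(&name, f.field.data_type().clone(), f.field.is_nullable())
                })
                .collect(),
        )
    }
}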

            return Ok(i);
        }
        if let Some(i) = self.fields.iter().position(|f| f.name() == name) {
            Ok(i)
Review comment (Contributor):

Small style thing, but I guess we could do it roughly like this instead?

self.fields
    .iter()
    .position(|f| f.name() == name)
    .ok_or_else(|| DataFusionError::Plan(format!("No field named '{}'", name)))

andygrove (Member Author) replied:

Thanks. Fixed.

@github-actions github-actions bot removed the needs-rebase A PR that needs to be rebased by the author label Dec 7, 2020
jorgecarleitao (Member) commented:

I went carefully through this. As I understand this PR, the reason we pass `DFSchema` into the `ExecutionPlan` is that we need to pass it to `PhysicalExpr.evaluate`, so that we can use `field_with_unqualified_name` on the `ColumnExpr`. 95% of the changes in the PR derive from this change.

IMO this introduces complexity in the physical execution that makes it more difficult to understand and use.

IMO the signature `evaluate(&RecordBatch, &DFSchema)` indicates a design issue, as the `RecordBatch` has all the information required for evaluation by a `PhysicalExpr`.

IMO we may be able to avoid this complexity by using `field_with_unqualified_name` in the physical planner, to create a `Schema` that is passed to the `ExecutionPlan` with the fields re-written, and creating `ColumnExpr` using the qualified names.

Specifically, the suggestion is to have the physical planner convert `DFSchema` -> `Schema` by writing `DFField` (qual, name) to `Field` "qual.name", and, respectively, to pass "qual.name" to `ColumnExpr`. IMO this would let us keep all physical planning as it is in master, and IMO would make it easier to understand the physical execution and how the logical plan is converted to it.

alamb (Contributor) left a comment:

Something doesn't feel right with this PR -- specifically, `DFSchema` is leaking into physical plan execution.

I think if we can find a way to avoid introducing `DFSchema` into `ExecutionPlan`, we are going to be in much better shape.

}

#[test]
fn test_display_qualified_schema() -> Result<()> {
Review comment (Contributor):

👍

@@ -62,7 +62,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// downcast to a specific implementation.
     fn as_any(&self) -> &dyn Any;
     /// Get the schema for this execution plan
-    fn schema(&self) -> SchemaRef;
+    fn schema(&self) -> DFSchemaRef;
Review comment (Contributor):

When I was saying "physical plan doesn't use `DFSchema`", I guess I was imagining that `ExecutionPlan::schema()` continued to return `SchemaRef` -- there may be some reason that `ExecutionPlan` needs to return a `DFSchema`, but I think the design would be cleaner if we avoided this.

alamb (Contributor) commented Dec 7, 2020:

Ah and now I see, like so often, @jorgecarleitao has beat me to the comment and has more thorough comments as well 👍

andygrove (Member Author) commented Dec 7, 2020:

Thanks for the continued reviews... I think I misunderstood some of the earlier feedback. Also, I did run into a design issue when trying to leave the execution path using `SchemaRef`. I will see if I can find time this evening to explain the issue.

andygrove (Member Author) commented:

@jorgecarleitao @alamb I now see where I got carried away with this 😄 .. this PR now updates 16 files instead of 41 and does not change the physical plans.

alamb (Contributor) left a comment:

I think it is looking good 🎉

 pub fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
-    Ok(Field::new(
+    Ok(DFField::new(
+        None, //TODO qualifier
Review comment (Contributor):

Might be worth a ticket to track this work -- it could be a good initial project for a new contributor.

andygrove (Member Author) replied:

oops, I actually forgot about that TODO.. thanks

jorgecarleitao (Member) left a comment:

Sorry for the misunderstanding. Thanks for the patience and great work here! LGTM!

@alamb alamb closed this in 09c442a Dec 8, 2020
GeorgeAp pushed the commits for apache#8840 and apache#8839 to sirensolutions/arrow, referencing this pull request, on Jun 7, 2021.