Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARK-3462 push down filters and projections into Unions #2345

Closed
wants to merge 3 commits into from
Closed

SPARK-3462 push down filters and projections into Unions #2345

wants to merge 3 commits into from

Conversation

koeninger
Copy link
Contributor

No description provided.

@SparkQA
Copy link

SparkQA commented Sep 10, 2014

Can one of the admins verify this patch?

@marmbrus
Copy link
Contributor

ok to test

@SparkQA
Copy link

SparkQA commented Sep 10, 2014

QA tests have started for PR 2345 at commit ef47b3b.

  • This patch merges cleanly.

@marmbrus
Copy link
Contributor

Hey @koeninger, thanks for implementing this optimization!

Overall this looks pretty good. A few minor suggestions:

  • I'm not sure that we want to check the names and qualifiers to find the corresponding Attributes. It is possible that either side could actually have tables that are aliased differently and thus the Attributes would have different qualifiers. Instead, I think that it safe to assume that the analysis phase has checked name and type matching and just find the corresponding Attribute by ordering. I'm thinking something like this:
/**
 * Pushes Project and Filter operations to either side of a Union.
 */
object UnionPushdown extends Rule[LogicalPlan] {

  /**
   * Rewrites an expression so that it can be pushed to the right side of a Union operator.
   * This method relies on the fact that the output attributes of a union are always equal to the
   * left child's output.
   */
  def pushToRight[A <: Expression](e: A, union: Union): A = {
    assert(union.left.output.size == union.right.output.size)

    // Maps Attributes from the left side to the corresponding Attribute on the right side.
    val rewrites = AttributeMap(union.left.output.zip(union.right.output))
    val result = e transform {
      case a: Attribute => rewrites(a)
    }

    // We must promise the compiler that we did not discard the names in the case of project
    // expressions.  This is safe since the only transformation is from Attribute => Attribute.
    result.asInstanceOf[A]
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Push down filter into union
    case Filter(condition, u @ Union(left, right)) =>
      Union(
        Filter(condition, left),
        Filter(pushToRight(condition, u), right))

    // Push down projection into union
    case Project(projectList, u @ Union(left, right)) =>
      Union(
        Project(projectList, left),
        Project(projectList.map(pushToRight(_, u)), right))
  }
}
  • Would also be great to add some tests to FilterPushdownSuite and maybe create a similar ColumnPruningSuite.

@SparkQA
Copy link

SparkQA commented Sep 10, 2014

QA tests have finished for PR 2345 at commit ef47b3b.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Copy link
Contributor Author

@marbrus I see what you mean. Updated to basically what you suggested, aside from building the map once. Let me know, once it's finalized I can try to test one more time on live data.

@SparkQA
Copy link

SparkQA commented Sep 11, 2014

QA tests have started for PR 2345 at commit 0788691.

  • This patch merges cleanly.

* This method relies on the fact that the output attributes of a union are always equal
* to the left child's output.
*/
def pushToRight[A <: Expression](e: A, union: Union, rewrites: AttributeMap[Attribute]): A = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: union is not used.

@SparkQA
Copy link

SparkQA commented Sep 11, 2014

QA tests have started for PR 2345 at commit 0788691.

  • This patch merges cleanly.

@marmbrus
Copy link
Contributor

LGTM to me once the tests pass.

@SparkQA
Copy link

SparkQA commented Sep 11, 2014

QA tests have started for PR 2345 at commit 5c8d24d.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Sep 11, 2014

QA tests have finished for PR 2345 at commit 0788691.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CreateTableAsSelect(
    • case class CreateTableAsSelect(

@SparkQA
Copy link

SparkQA commented Sep 11, 2014

QA tests have finished for PR 2345 at commit 0788691.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CreateTableAsSelect(
    • case class CreateTableAsSelect(

@marmbrus
Copy link
Contributor

Thanks! I've merged to master.

@asfgit asfgit closed this in f858f46 Sep 12, 2014
@SparkQA
Copy link

SparkQA commented Sep 12, 2014

QA tests have finished for PR 2345 at commit 5c8d24d.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CreateTableAsSelect(
    • case class CreateTableAsSelect(

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants