From 09e2135d6e5abef21328941d73031109e6d4d4b6 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Sun, 27 Dec 2015 11:58:12 -0800
Subject: [PATCH] [SPARK-12506][SQL]push down WHERE clause arithmetic operator to JDBC layer

---
 .../datasources/DataSourceStrategy.scala      | 77 ++++++++++++++++++-
 1 file changed, 76 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 3741a9cb32fd4..32875eb087a21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -484,6 +486,74 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     }
   }
 
+  /**
+   * Renders a sum of plain columns, such as `C1 + C2 + C3`, as the matching SQL text.
+   *
+   * @return the SQL text, or `None` when an operand is neither an [[Attribute]] nor a nested
+   *         `Add`, so that an unsupported operand is never rendered as the text "None".
+   */
+  private[this] def getAddString(predicate: Expression): Option[String] = predicate match {
+    case a: Attribute => Some(a.name)
+    case expressions.Add(left, right) =>
+      for {
+        leftString <- getAddString(left)
+        rightString <- getAddString(right)
+      } yield leftString + " + " + rightString
+    case _ => None
+  }
+
+  /**
+   * Translates a comparison between a sum of columns and a literal, e.g. `C1 + C2 > 5`; a
+   * literal on the left is mirrored. The rendered sum is passed in the filter's attribute slot,
+   * which presumably only the JDBC source can interpret -- NOTE(review): confirm.
+   */
+  private[this] def translateAddFilter(predicate: Expression): Option[Filter] = {
+    // Yields a filter only when every operand of `sum` could be rendered.
+    def sumFilter(sum: Add, value: Any)(build: (String, Any) => Filter): Option[Filter] =
+      getAddString(sum).map(build(_, value))
+    predicate match {
+      case expressions.EqualTo(sum: Add, Literal(v, t)) =>
+        sumFilter(sum, convertToScala(v, t))(sources.EqualTo(_, _))
+      case expressions.EqualTo(Literal(v, t), sum: Add) =>
+        sumFilter(sum, convertToScala(v, t))(sources.EqualTo(_, _))
+
+      case expressions.GreaterThan(sum: Add, Literal(v, t)) =>
+        sumFilter(sum, convertToScala(v, t))(sources.GreaterThan(_, _))
+      case expressions.GreaterThan(Literal(v, t), sum: Add) =>
+        sumFilter(sum, convertToScala(v, t))(sources.LessThan(_, _))
+
+      case expressions.LessThan(sum: Add, Literal(v, t)) =>
+        sumFilter(sum, convertToScala(v, t))(sources.LessThan(_, _))
+      case expressions.LessThan(Literal(v, t), sum: Add) =>
+        sumFilter(sum, convertToScala(v, t))(sources.GreaterThan(_, _))
+
+      case expressions.GreaterThanOrEqual(sum: Add, Literal(v, t)) =>
+        sumFilter(sum, convertToScala(v, t))(sources.GreaterThanOrEqual(_, _))
+      case expressions.GreaterThanOrEqual(Literal(v, t), sum: Add) =>
+        sumFilter(sum, convertToScala(v, t))(sources.LessThanOrEqual(_, _))
+
+      case expressions.LessThanOrEqual(sum: Add, Literal(v, t)) =>
+        sumFilter(sum, convertToScala(v, t))(sources.LessThanOrEqual(_, _))
+      case expressions.LessThanOrEqual(Literal(v, t), sum: Add) =>
+        sumFilter(sum, convertToScala(v, t))(sources.GreaterThanOrEqual(_, _))
+
+      case _ => None
+    }
+  }
+
+  /**
+   * Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
+   * If `isJdbcRelation` is true, a comparison between a sum of columns and a literal is
+   * translated as well; of the arithmetic operators only `+` is handled.
+   *
+   * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
+   */
+  protected[sql] def translateFilter(
+      isJdbcRelation: Boolean,
+      predicate: Expression): Option[Filter] = {
+    translateFilter(predicate).orElse(if (isJdbcRelation) translateAddFilter(predicate) else None)
+  }
+
   /**
    * Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s
    * and can be handled by `relation`.
@@ -500,11 +570,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // For conciseness, all Catalyst filter expressions of type `expressions.Expression` below are
     // called `predicate`s, while all data source filters of type `sources.Filter` are simply called
     // `filter`s.
+    // The flag lets translateFilter also try arithmetic predicates when the relation is JDBC.
+    val jdbcRelation = relation match {
+      case _: JDBCRelation => true
+      case _ => false
+    }
 
     val translated: Seq[(Expression, Filter)] =
       for {
         predicate <- predicates
-        filter <- translateFilter(predicate)
+        filter <- translateFilter(jdbcRelation, predicate)
       } yield predicate -> filter
 
     // A map from original Catalyst expressions to corresponding translated data source filters.