[SPARK-36574][SQL] pushDownPredicate=false should prevent push down filters to JDBC data source #33822

Closed
@@ -172,12 +172,12 @@ object JDBCRDD extends Logging {
    *
    * @param sc - Your SparkContext.
    * @param schema - The Catalyst schema of the underlying database table.
-   * @param requiredColumns - The names of the columns to SELECT.
+   * @param requiredColumns - The names of the columns or aggregate columns to SELECT.
    * @param filters - The filters to include in all WHERE clauses.
    * @param parts - An array of JDBCPartitions specifying partition ids and
    *                per-partition WHERE clauses.
    * @param options - JDBC options that contains url, table and other information.
-   * @param outputSchema - The schema of the columns to SELECT.
+   * @param outputSchema - The schema of the columns or aggregate columns to SELECT.
    * @param groupByColumns - The pushed down group by columns.
    *
    * @return An RDD representing "SELECT requiredColumns FROM fqTable".
@@ -213,8 +213,8 @@ object JDBCRDD extends Logging {
 }

 /**
- * An RDD representing a table in a database accessed via JDBC. Both the
- * driver code and the workers must be able to access the database; the driver
+ * An RDD representing a query is related to a table in a database accessed via JDBC.
+ * Both the driver code and the workers must be able to access the database; the driver
  * needs to fetch the schema while the workers need to fetch the data.
  */
 private[jdbc] class JDBCRDD(
@@ -237,11 +237,7 @@ private[jdbc] class JDBCRDD(
   /**
    * `columns`, but as a String suitable for injection into a SQL query.
    */
-  private val columnList: String = {
-    val sb = new StringBuilder()
-    columns.foreach(x => sb.append(",").append(x))
-    if (sb.isEmpty) "1" else sb.substring(1)
-  }
+  private val columnList: String = if (columns.isEmpty) "1" else columns.mkString(",")

   /**
    * `filters`, but as a WHERE clause suitable for injection into a SQL query.
@@ -278,12 +278,18 @@ private[sql] case class JDBCRelation(
   }

   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // When pushDownPredicate is false, all Filters that need to be pushed down should be ignored
+    val pushedFilters = if (jdbcOptions.pushDownPredicate) {
+      filters
+    } else {
+      Array.empty[Filter]
+    }
Contributor:

If pushDownPredicate is false, unhandledFilters is set to all of the filters here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala#L276. It seems to me that these unhandled filters shouldn't be pushed down to JDBC at all.
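For context, a simplified sketch of the unhandledFilters logic referenced above (a fragment inside JDBCRelation, not the exact source; it assumes the method checks pushDownPredicate before deciding which filters the dialect can compile):

```scala
// Sketch only: when pushDownPredicate is false, every filter is reported as
// unhandled, so Spark re-evaluates all of them after the JDBC scan.
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
  if (jdbcOptions.pushDownPredicate) {
    // Keep only the filters the dialect cannot compile into a WHERE clause.
    filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty)
  } else {
    filters
  }
}
```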

Contributor:

Yea, it's a bug since day 1: #21875

I think we should update the test added at that time and check the real pushed filters in the JDBC source.
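A rough sketch of what such a check might look like (the table name, URL, and plan-string assertion are assumptions for illustration, not the actual test code):

```scala
// Hypothetical test fragment: with pushdown disabled, the JDBC scan node
// should report no pushed filters, and the Filter should stay in Spark's plan.
val url = "jdbc:h2:mem:testdb0;user=testUser;password=testPass"  // assumed test database
val df = spark.read
  .option("pushDownPredicate", "false")
  .jdbc(url, "TEST.PEOPLE", new java.util.Properties())
  .filter("THEID > 1")

// The explain output of a JDBC scan lists its pushed filters; with pushdown
// disabled we expect that list to be empty.
assert(df.queryExecution.executedPlan.toString.contains("PushedFilters: []"))
```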

     // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
     JDBCRDD.scanTable(
       sparkSession.sparkContext,
       schema,
       requiredColumns,
-      filters,
+      pushedFilters,
       parts,
       jdbcOptions).asInstanceOf[RDD[Row]]
   }