From 0fecee8732f21e219fd00a948f92af5c8f4c7771 Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Tue, 24 Aug 2021 18:28:12 +0800
Subject: [PATCH 1/7] pushDownPredicate failed to prevent pushing filters down
 to the data source.

---
 .../execution/datasources/DataSourceStrategy.scala |  2 +-
 .../sql/execution/datasources/jdbc/JDBCRDD.scala   | 14 +++++---------
 2 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 11d23f482fc14..21740c17bd45a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -409,7 +409,7 @@ object DataSourceStrategy
         pushedFilters.toSet,
         handledFilters,
         None,
-        scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
+        scanBuilder(requestedColumns, candidatePredicates, handledFilters.toSeq),
         relation.relation,
         relation.catalogTable.map(_.identifier))
     filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index c575e95485cea..097b371ea0722 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -172,12 +172,12 @@ object JDBCRDD extends Logging {
    *
    * @param sc - Your SparkContext.
    * @param schema - The Catalyst schema of the underlying database table.
-   * @param requiredColumns - The names of the columns to SELECT.
+   * @param requiredColumns - The names of the columns or aggregate columns to SELECT.
    * @param filters - The filters to include in all WHERE clauses.
    * @param parts - An array of JDBCPartitions specifying partition ids and
    *                per-partition WHERE clauses.
   * @param options - JDBC options that contains url, table and other information.
-   * @param outputSchema - The schema of the columns to SELECT.
+   * @param outputSchema - The schema of the columns or aggregate columns to SELECT.
   * @param groupByColumns - The pushed down group by columns.
    *
    * @return An RDD representing "SELECT requiredColumns FROM fqTable".
@@ -213,8 +213,8 @@ object JDBCRDD extends Logging {
   }
 
 /**
- * An RDD representing a table in a database accessed via JDBC. Both the
- * driver code and the workers must be able to access the database; the driver
+ * An RDD representing a query is related to a table in a database accessed via JDBC.
+ * Both the driver code and the workers must be able to access the database; the driver
  * needs to fetch the schema while the workers need to fetch the data.
  */
 private[jdbc] class JDBCRDD(
@@ -237,11 +237,7 @@ private[jdbc] class JDBCRDD(
   /**
    * `columns`, but as a String suitable for injection into a SQL query.
    */
-  private val columnList: String = {
-    val sb = new StringBuilder()
-    columns.foreach(x => sb.append(",").append(x))
-    if (sb.isEmpty) "1" else sb.substring(1)
-  }
+  private val columnList: String = if (columns.isEmpty) "1" else columns.mkString(",")
 
   /**
    * `filters`, but as a WHERE clause suitable for injection into a SQL query.
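For reference, the columnList rewrite in the JDBCRDD hunk above is behavior-preserving; a minimal standalone sketch of that logic (the object name and column names below are made up for illustration, it is not Spark code) looks like this:

// Sketch of the simplified columnList logic: an empty projection falls back
// to the constant "1" so the generated "SELECT <columnList> FROM ..." text
// stays syntactically valid.
object ColumnListSketch {
  def columnList(columns: Seq[String]): String =
    if (columns.isEmpty) "1" else columns.mkString(",")

  def main(args: Array[String]): Unit = {
    println(columnList(Seq("NAME", "THEID"))) // prints: NAME,THEID
    println(columnList(Seq.empty))            // prints: 1
  }
}
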
From d34cdfd3670a3c3be1f9128fa79a3eb5a55fd73d Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Wed, 25 Aug 2021 11:02:23 +0800
Subject: [PATCH 2/7] Update code

---
 .../sql/execution/datasources/DataSourceStrategy.scala | 2 +-
 .../sql/execution/datasources/jdbc/JDBCRelation.scala  | 8 +++++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 21740c17bd45a..11d23f482fc14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -409,7 +409,7 @@ object DataSourceStrategy
         pushedFilters.toSet,
         handledFilters,
         None,
-        scanBuilder(requestedColumns, candidatePredicates, handledFilters.toSeq),
+        scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation,
         relation.catalogTable.map(_.identifier))
     filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 60d88b6690587..8098fa0b83a95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -278,12 +278,18 @@ private[sql] case class JDBCRelation(
   }
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // When pushDownPredicate is false, all Filters that need to be pushed down should be ignored
+    val pushedFilters = if (jdbcOptions.pushDownPredicate) {
+      filters
+    } else {
+      Array.empty[Filter]
+    }
     // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
     JDBCRDD.scanTable(
       sparkSession.sparkContext,
       schema,
       requiredColumns,
-      filters,
+      pushedFilters,
       parts,
       jdbcOptions).asInstanceOf[RDD[Row]]
   }

From 91c22843da85a7a40f0d558e880563374d95be51 Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Thu, 26 Aug 2021 14:44:30 +0800
Subject: [PATCH 3/7] Update code

---
 .../datasources/DataSourceStrategy.scala      |  2 +-
 .../datasources/jdbc/JDBCRelation.scala       |  8 +-------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala |  5 +++++
 .../spark/sql/sources/FilteredScanSuite.scala | 19 +++++++++----------
 4 files changed, 16 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 11d23f482fc14..21740c17bd45a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -409,7 +409,7 @@ object DataSourceStrategy
         pushedFilters.toSet,
         handledFilters,
         None,
-        scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
+        scanBuilder(requestedColumns, candidatePredicates, handledFilters.toSeq),
         relation.relation,
         relation.catalogTable.map(_.identifier))
     filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 8098fa0b83a95..60d88b6690587 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -278,18 +278,12 @@ private[sql] case class JDBCRelation(
   }
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    // When pushDownPredicate is false, all Filters that need to be pushed down should be ignored
-    val pushedFilters = if (jdbcOptions.pushDownPredicate) {
-      filters
-    } else {
-      Array.empty[Filter]
-    }
     // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
     JDBCRDD.scanTable(
       sparkSession.sparkContext,
       schema,
       requiredColumns,
-      pushedFilters,
+      filters,
       parts,
       jdbcOptions).asInstanceOf[RDD[Row]]
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 95a91616f80cc..4e10d7d260ecf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -315,6 +315,11 @@ class JDBCSuite extends QueryTest
     assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
     val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
     assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
+    val filterExec = node.child.asInstanceOf[org.apache.spark.sql.execution.FilterExec]
+    assert(filterExec.child.isInstanceOf[org.apache.spark.sql.execution.RowDataSourceScanExec])
+    val scanExec =
+      filterExec.child.asInstanceOf[org.apache.spark.sql.execution.RowDataSourceScanExec]
+    assert(scanExec.handledFilters.isEmpty)
     df
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 657ef5ca13bd9..2814921c6b86d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -21,7 +21,6 @@ import java.util.Locale
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -66,7 +65,9 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sparkSession: S
       }
     }
 
-    filters.filter(unhandled)
+    val unhandledFilters = filters.filter(unhandled)
+    FiltersUnhandled.list = unhandledFilters
+    unhandledFilters
   }
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
@@ -122,6 +123,11 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sparkSession: S
   }
 }
 
+// Used to check unhandled filters.
+object FiltersUnhandled {
+  var list: Seq[Filter] = Nil
+}
+
 // A hack for better error messages when filter pushdown fails.
 object FiltersPushed {
   var list: Seq[Filter] = Nil
@@ -321,14 +327,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSparkSession {
       }
       val rawCount = rawPlan.execute().count()
       assert(ColumnsRequired.set === requiredColumnNames)
-
-      val table = spark.table("oneToTenFiltered")
-      val relation = table.queryExecution.analyzed.collectFirst {
-        case LogicalRelation(r, _, _, _) => r
-      }.get
-
-      assert(
-        relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)
+      assert(FiltersUnhandled.list.toSet === expectedUnhandledFilters)
 
       if (rawCount != expectedCount) {
         fail(

From ecb3a47b224c0d4e7cc40c6f26f9a7f3d5b9aee3 Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Thu, 26 Aug 2021 15:24:31 +0800
Subject: [PATCH 4/7] Update code

---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 4e10d7d260ecf..460860827577c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -315,11 +315,19 @@ class JDBCSuite extends QueryTest
     assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
     val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
     assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
-    val filterExec = node.child.asInstanceOf[org.apache.spark.sql.execution.FilterExec]
-    assert(filterExec.child.isInstanceOf[org.apache.spark.sql.execution.RowDataSourceScanExec])
-    val scanExec =
-      filterExec.child.asInstanceOf[org.apache.spark.sql.execution.RowDataSourceScanExec]
-    assert(scanExec.handledFilters.isEmpty)
+    val relation = df.queryExecution.analyzed.collectFirst {
+      case LogicalRelation(r, _, _, _) => r
+    }.get
+    assert(relation.isInstanceOf[org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation])
+    val jdbcRelation =
+      relation.asInstanceOf[org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation]
+    if (jdbcRelation.jdbcOptions.pushDownPredicate == false) {
+      val filterExec = node.child.asInstanceOf[org.apache.spark.sql.execution.FilterExec]
+      assert(filterExec.child.isInstanceOf[org.apache.spark.sql.execution.RowDataSourceScanExec])
+      val scanExec =
+        filterExec.child.asInstanceOf[org.apache.spark.sql.execution.RowDataSourceScanExec]
+      assert(scanExec.handledFilters.isEmpty)
+    }
     df
   }
 

From 24d3c94cd88c41b41e446efd2332b0005155cac2 Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Thu, 26 Aug 2021 17:14:56 +0800
Subject: [PATCH 5/7] Update code

---
 .../datasources/DataSourceStrategy.scala      |  2 +-
 .../datasources/jdbc/JDBCRelation.scala       |  8 +++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 41 +++++++++++++------
 .../spark/sql/sources/FilteredScanSuite.scala | 19 +++++----
 4 files changed, 46 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 21740c17bd45a..11d23f482fc14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -409,7 +409,7 @@ object DataSourceStrategy
         pushedFilters.toSet,
         handledFilters,
         None,
-        scanBuilder(requestedColumns, candidatePredicates, handledFilters.toSeq),
+        scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation,
         relation.catalogTable.map(_.identifier))
     filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 60d88b6690587..8098fa0b83a95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -278,12 +278,18 @@ private[sql] case class JDBCRelation(
   }
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // When pushDownPredicate is false, all Filters that need to be pushed down should be ignored
+    val pushedFilters = if (jdbcOptions.pushDownPredicate) {
+      filters
+    } else {
+      Array.empty[Filter]
+    }
     // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
     JDBCRDD.scanTable(
       sparkSession.sparkContext,
       schema,
       requiredColumns,
-      filters,
+      pushedFilters,
      parts,
       jdbcOptions).asInstanceOf[RDD[Row]]
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 460860827577c..f758a768592c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -315,19 +315,6 @@ class JDBCSuite extends QueryTest
     assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
     val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
     assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
-    val relation = df.queryExecution.analyzed.collectFirst {
-      case LogicalRelation(r, _, _, _) => r
-    }.get
-    assert(relation.isInstanceOf[org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation])
-    val jdbcRelation =
-      relation.asInstanceOf[org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation]
-    if (jdbcRelation.jdbcOptions.pushDownPredicate == false) {
-      val filterExec = node.child.asInstanceOf[org.apache.spark.sql.execution.FilterExec]
-      assert(filterExec.child.isInstanceOf[org.apache.spark.sql.execution.RowDataSourceScanExec])
-      val scanExec =
-        filterExec.child.asInstanceOf[org.apache.spark.sql.execution.RowDataSourceScanExec]
-      assert(scanExec.handledFilters.isEmpty)
-    }
     df
   }
 
@@ -1736,6 +1723,34 @@ class JDBCSuite extends QueryTest
       Row("fred", 1) :: Nil)
   }
 
+  test(
+    "SPARK-36574: pushDownPredicate=false should prevent push down filters to JDBC data source") {
+    val table = "test.people"
+
+    val df = spark.read.format("jdbc").option("Url", urlWithUserAndPass).option("dbTable", table)
+    val df1 = df
+      .option("pushDownPredicate", false)
+      .load()
+      .filter("theid = 1")
+      .select("name", "theid")
+    val df2 = df
+      .load()
+      .select("name", "theid")
+
+    def getRowCount(df: DataFrame): Long = {
+      val queryExecution = df.queryExecution
+      val rawPlan = queryExecution.executedPlan.collect {
+        case p: DataSourceScanExec => p
+      } match {
+        case Seq(p) => p
+        case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
+      }
+      rawPlan.execute().count()
+    }
+
+    assert(getRowCount(df1) == getRowCount(df2))
+  }
+
   test("SPARK-26383 throw IllegalArgumentException if wrong kind of driver to the given url") {
     val e = intercept[IllegalArgumentException] {
       val opts = Map(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 2814921c6b86d..657ef5ca13bd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -21,6 +21,7 @@ import java.util.Locale
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -66,9 +65,7 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sparkSession: S
       }
     }
 
-    val unhandledFilters = filters.filter(unhandled)
-    FiltersUnhandled.list = unhandledFilters
-    unhandledFilters
+    filters.filter(unhandled)
   }
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
@@ -123,11 +122,6 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sparkSession: S
   }
 }
 
-// Used to check unhandled filters.
-object FiltersUnhandled {
-  var list: Seq[Filter] = Nil
-}
-
 // A hack for better error messages when filter pushdown fails.
 object FiltersPushed {
   var list: Seq[Filter] = Nil
@@ -327,7 +321,14 @@ class FilteredScanSuite extends DataSourceTest with SharedSparkSession {
       }
       val rawCount = rawPlan.execute().count()
       assert(ColumnsRequired.set === requiredColumnNames)
-      assert(FiltersUnhandled.list.toSet === expectedUnhandledFilters)
+
+      val table = spark.table("oneToTenFiltered")
+      val relation = table.queryExecution.analyzed.collectFirst {
+        case LogicalRelation(r, _, _, _) => r
+      }.get
+
+      assert(
+        relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)
 
       if (rawCount != expectedCount) {
         fail(

From 9c01364e1850cd6274226deb93abaa898bcbd01e Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Thu, 26 Aug 2021 17:28:14 +0800
Subject: [PATCH 6/7] Update code

---
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index f758a768592c7..aeb32b2d79cd3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1725,9 +1725,9 @@ class JDBCSuite extends QueryTest
   test(
     "SPARK-36574: pushDownPredicate=false should prevent push down filters to JDBC data source") {
-    val table = "test.people"
-
-    val df = spark.read.format("jdbc").option("Url", urlWithUserAndPass).option("dbTable", table)
+    val df = spark.read.format("jdbc")
+      .option("Url", urlWithUserAndPass)
+      .option("dbTable", "test.people")
     val df1 = df
       .option("pushDownPredicate", false)
       .load()
       .filter("theid = 1")
       .select("name", "theid")
     val df2 = df
       .load()
       .select("name", "theid")

From 4b4f78ababd83664340722ea47b24352dda284ca Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Fri, 27 Aug 2021 10:27:16 +0800
Subject: [PATCH 7/7] Update code

---
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index aeb32b2d79cd3..8842db2a2aca4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1734,6 +1734,11 @@ class JDBCSuite extends QueryTest
       .filter("theid = 1")
       .select("name", "theid")
     val df2 = df
+      .option("pushDownPredicate", true)
+      .load()
+      .filter("theid = 1")
+      .select("name", "theid")
+    val df3 = df
       .load()
       .select("name", "theid")
 
@@ -1748,7 +1753,8 @@ class JDBCSuite extends QueryTest
       rawPlan.execute().count()
     }
 
-    assert(getRowCount(df1) == getRowCount(df2))
+    assert(getRowCount(df1) == df3.count)
+    assert(getRowCount(df2) < df3.count)
   }
 
   test("SPARK-26383 throw IllegalArgumentException if wrong kind of driver to the given url") {
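As a usage illustration of the behavior the series exercises in the SPARK-36574 test above: the option only controls whether filters reach the remote WHERE clause, not the query result. The sketch below shows how application code would toggle it; the SparkSession setup, the H2 URL jdbc:h2:mem:testdb, the table TEST.PEOPLE, and the column THEID are placeholder assumptions (a matching JDBC driver and table must exist), and the explain() output described in the comments is indicative only.

import org.apache.spark.sql.SparkSession

object PushDownPredicateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("pushDownPredicate sketch")
      .master("local[*]")
      .getOrCreate()

    // def rather than val: DataFrameReader.option mutates the reader in place,
    // so each call below starts from a fresh reader with placeholder settings.
    def reader = spark.read.format("jdbc")
      .option("url", "jdbc:h2:mem:testdb")
      .option("dbtable", "TEST.PEOPLE")

    // With pushDownPredicate=false the predicate stays in Spark: the physical plan
    // keeps a FilterExec above the JDBC scan and the scan reports no pushed filters.
    val notPushed = reader.option("pushDownPredicate", "false")
      .load()
      .filter("THEID = 1")
    notPushed.explain()

    // Default behavior (pushDownPredicate=true): the same predicate is eligible for
    // pushdown into the generated SQL, so the data source itself returns fewer rows.
    val pushed = reader.option("pushDownPredicate", "true")
      .load()
      .filter("THEID = 1")
    pushed.explain()

    spark.stop()
  }
}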