diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
new file mode 100644
index 0000000000000..cb04cc8dca428
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import scala.collection.immutable
+
+private[spark] object CatalystConf {
+  val CASE_SENSITIVE = "spark.sql.caseSensitive"
+}
+
+private[spark] trait CatalystConf {
+  def setConf(key: String, value: String): Unit
+  def getConf(key: String): String
+  def getConf(key: String, defaultValue: String): String
+  def getAllConfs: immutable.Map[String, String]
+}
+
+/**
+ * A trivial conf that is empty. Used for testing when all relations are already
+ * filled in and the analyzer needs only to resolve attribute references.
+ */
+object EmptyConf extends CatalystConf {
+  def setConf(key: String, value: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  def getConf(key: String): String = {
+    throw new UnsupportedOperationException
+  }
+
+  def getConf(key: String, defaultValue: String): String = {
+    throw new UnsupportedOperationException
+  }
+
+  def getAllConfs: immutable.Map[String, String] = {
+    throw new UnsupportedOperationException
+  }
+}
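A note on the design: the trait stays deliberately minimal, so any Catalyst component can accept a CatalystConf and consult the shared CASE_SENSITIVE key. A minimal sketch of a consumer under that assumption (the CaseSensitivityAware class below is hypothetical, not part of this patch):

import org.apache.spark.sql.catalyst.CatalystConf

// Hypothetical consumer of the new trait: read the shared key with a default,
// the same pattern the Analyzer and Catalog adopt later in this diff.
class CaseSensitivityAware(conf: CatalystConf) {
  def isCaseSensitive: Boolean =
    conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean

  // Normalize an identifier the way a case-insensitive catalog would.
  def normalize(identifier: String): String =
    if (isCaseSensitive) identifier else identifier.toLowerCase
}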
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fc37b8cde0806..1ae74fd96e8dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -19,10 +19,11 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.util.collection.OpenHashSet
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.test.SimpleConf
 import org.apache.spark.sql.types._
 
 /**
@@ -30,7 +31,7 @@ import org.apache.spark.sql.types._
 * when all relations are already filled in and the analyzer needs only to resolve attribute
 * references.
 */
-object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true)
+object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleConf)
 
 /**
 * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -39,11 +40,17 @@ object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true
 */
 class Analyzer(catalog: Catalog,
                registry: FunctionRegistry,
-               caseSensitive: Boolean,
+               conf: CatalystConf,
                maxIterations: Int = 100)
   extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {
 
-  val resolver = if (caseSensitive) caseSensitiveResolution else caseInsensitiveResolution
+  def resolver: Resolver = {
+    if (conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean) {
+      caseSensitiveResolution
+    } else {
+      caseInsensitiveResolution
+    }
+  }
 
   val fixedPoint = FixedPoint(maxIterations)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 9e6e2912e0622..29262e8d7e056 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.EmptyConf
 
 /**
 * Thrown by a catalog when a table cannot be found. The analyzer will rethrow the exception
@@ -32,7 +34,7 @@ class NoSuchTableException extends Exception
 */
 trait Catalog {
 
-  def caseSensitive: Boolean
+  val conf: CatalystConf
 
   def tableExists(tableIdentifier: Seq[String]): Boolean
@@ -55,7 +57,7 @@ trait Catalog {
   def unregisterAllTables(): Unit
 
   protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
-    if (!caseSensitive) {
+    if (!conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean) {
       tableIdentifier.map(_.toLowerCase)
     } else {
       tableIdentifier
@@ -76,7 +78,7 @@ trait Catalog {
   }
 }
 
-class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
+class SimpleCatalog(val conf: CatalystConf) extends Catalog {
   val tables = new mutable.HashMap[String, LogicalPlan]()
 
   override def registerTable(
@@ -162,7 +164,7 @@ trait OverrideCatalog extends Catalog {
   }
 
   abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    val dbName = if (!caseSensitive) {
+    val dbName = if (!conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean) {
       if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
     } else {
       databaseName
@@ -205,7 +207,7 @@ trait OverrideCatalog extends Catalog {
 */
 object EmptyCatalog extends Catalog {
 
-  val caseSensitive: Boolean = true
+  override val conf: CatalystConf = EmptyConf
 
   def tableExists(tableIdentifier: Seq[String]): Boolean = {
     throw new UnsupportedOperationException
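The upshot of threading the conf through Catalog is that a single catalog instance can change resolution behavior at runtime instead of fixing it at construction. A usage sketch against the classes in this diff (SimpleConf is added in the next file; the registerTable/lookupRelation signatures are assumed from the surrounding hunks):

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.analysis.SimpleCatalog
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.test.SimpleConf
import org.apache.spark.sql.types.IntegerType

object CaseResolutionDemo extends App {
  val conf = new SimpleConf
  conf.setConf(CatalystConf.CASE_SENSITIVE, "false")

  val catalog = new SimpleCatalog(conf)
  catalog.registerTable(Seq("people"), LocalRelation(AttributeReference("id", IntegerType)()))

  // With the flag off, processTableIdentifier lower-cases the lookup key,
  // so a mixed-case reference still finds the registered plan.
  val plan = catalog.lookupRelation(Seq("PEOPLE"))
}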
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/test/SimpleConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/test/SimpleConf.scala
new file mode 100644
index 0000000000000..1f6675f89190d
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/test/SimpleConf.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.test
+
+import org.apache.spark.sql.catalyst.CatalystConf
+
+import scala.collection.immutable
+import scala.collection.mutable
+
+/** A CatalystConf that can be used for local testing. */
+class SimpleConf extends CatalystConf {
+  val map = mutable.Map[String, String]()
+
+  def setConf(key: String, value: String): Unit = {
+    map.put(key, value)
+  }
+  def getConf(key: String): String = {
+    map(key)
+  }
+  def getConf(key: String, defaultValue: String): String = {
+    map.getOrElse(key, defaultValue)
+  }
+  def getAllConfs: immutable.Map[String, String] = {
+    map.toMap
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index aec7847356cd4..98ec29f762c75 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -23,17 +23,22 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types._
-
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.test.SimpleConf
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 
 class AnalysisSuite extends FunSuite with BeforeAndAfter {
-  val caseSensitiveCatalog = new SimpleCatalog(true)
-  val caseInsensitiveCatalog = new SimpleCatalog(false)
+  val caseSensitiveConf = new SimpleConf()
+  caseSensitiveConf.setConf(CatalystConf.CASE_SENSITIVE, "true")
+  val caseInsensitiveConf = new SimpleConf()
+  caseInsensitiveConf.setConf(CatalystConf.CASE_SENSITIVE, "false")
+  val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
+  val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
   val caseSensitiveAnalyze =
-    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true)
+    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf)
   val caseInsensitiveAnalyze =
-    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false)
+    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf)
 
   val testRelation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
   val testRelation2 = LocalRelation(
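For coverage of the resolver itself (not just the catalog), a test in the same style could be added to AnalysisSuite. A sketch only, assuming RuleExecutor's apply and that an unresolved attribute surfaces as an AnalysisException under the case-sensitive analyzer in this version:

  test("mixed-case attribute resolution follows the conf") {
    val plan = testRelation.select(UnresolvedAttribute("A"))
    // "A" only matches attribute "a" when the conf turned case sensitivity off.
    intercept[AnalysisException] { caseSensitiveAnalyze(plan) }
    assert(caseInsensitiveAnalyze(plan).resolved)
  }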
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index bc2ec754d5865..3dc7f004271a9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{Project, LocalRelation}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.test.SimpleConf
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
-  val catalog = new SimpleCatalog(false)
-  val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = false)
+  val conf = new SimpleConf
+  val catalog = new SimpleCatalog(conf)
+  val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
 
   val relation = LocalRelation(
     AttributeReference("i", IntegerType)(),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 39f6c2f4bc8b4..087b17f53ae80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.CatalystConf
+
 import scala.collection.immutable
 import scala.collection.JavaConversions._
 
@@ -69,7 +71,8 @@ private[spark] object SQLConf {
 *
 * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
 */
-private[sql] class SQLConf extends Serializable {
+
+private[sql] class SQLConf extends Serializable with CatalystConf {
   import SQLConf._
 
   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -220,4 +223,3 @@ private[sql] class SQLConf extends Serializable {
     settings.clear()
   }
 }
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 4bdaa023914b8..41bb5be5a27eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -104,14 +104,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
 
   @transient
-  protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
+  protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)
 
   @transient
   protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry(true)
 
   @transient
   protected[sql] lazy val analyzer: Analyzer =
-    new Analyzer(catalog, functionRegistry, caseSensitive = true) {
+    new Analyzer(catalog, functionRegistry, conf) {
       override val extendedResolutionRules =
         ExtractPythonUdfs ::
         sources.PreWriteCheck(catalog) ::
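Because SQLConf now mixes in CatalystConf and SQLContext hands its own conf to both the catalog and the analyzer, case sensitivity becomes a runtime switch rather than a constructor constant. A short usage sketch (assumes an existing SparkContext named sc):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.CatalystConf

val sqlContext = new SQLContext(sc)
// Later queries resolve tables and columns case-insensitively, because the
// analyzer re-reads the flag through its conf on each resolver call.
sqlContext.setConf(CatalystConf.CASE_SENSITIVE, "false")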
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 097bf0dd23c89..250e55cc3ef91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,10 +21,10 @@ import org.apache.spark.sql.test.TestSQLContext
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.types._
-
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.test.TestSQLContext.{udf => _, _}
 
@@ -1049,4 +1049,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
     rdd.toDF().registerTempTable("distinctData")
     checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), Row(2))
   }
+
+  test("SPARK-4699 case sensitivity SQL query") {
+    setConf(CatalystConf.CASE_SENSITIVE, "false")
+    val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
+    val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
+    rdd.toDF().registerTempTable("testTable1")
+    checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
+    setConf(CatalystConf.CASE_SENSITIVE, "true")
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
index 0ec6881d7afe6..a023788c8ab75 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
@@ -18,22 +18,13 @@
 package org.apache.spark.sql.sources
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.test.TestSQLContext
 import org.scalatest.BeforeAndAfter
 
 abstract class DataSourceTest extends QueryTest with BeforeAndAfter {
-  // Case sensitivity is not configurable yet, but we want to test some edge cases.
-  // TODO: Remove when it is configurable
-  implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext) {
-    @transient
-    override protected[sql] lazy val analyzer: Analyzer =
-      new Analyzer(catalog, functionRegistry, caseSensitive = false) {
-        override val extendedResolutionRules =
-          PreWriteCheck(catalog) ::
-          PreInsertCastAndRename ::
-          Nil
-      }
-  }
-}
+  // Case sensitivity is configurable now, so run these tests against a
+  // case-insensitive context to cover some edge cases.
+  implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext)
+  caseInsensisitiveContext.setConf(CatalystConf.CASE_SENSITIVE, "false")
+}
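The Hive changes that follow flip the default: HiveContext sets the flag to "false" during construction to match Hive's own identifier semantics. The observable behavior, sketched against Spark's Hive test fixtures (assumes a SparkContext sc and the usual src test table):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// No setConf needed: identifiers resolve case-insensitively by default,
// so "Src" and "KEY" reach the same table and column as "src" and "key".
hiveContext.sql("SELECT KEY FROM Src LIMIT 1")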
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 2e205e67c0fdd..eae4008bcd0a6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, QueryExecutionException, SetCommand}
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
 import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.types._
 
 /**
@@ -53,6 +54,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
   }
 
+  /* Default to case-insensitive resolution to match Hive's behavior. */
+  conf.setConf(CatalystConf.CASE_SENSITIVE, "false")
+
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -249,7 +253,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
   /* A catalyst metadata catalog that points to the Hive Metastore. */
   @transient
-  override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog
+  override protected[sql] lazy val catalog =
+    new HiveMetastoreCatalog(this, conf) with OverrideCatalog
 
   // Note that HiveUDFs will be overridden by functions registered in this context.
   @transient
@@ -261,7 +266,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   /* An analyzer that uses the Hive metastore. */
   @transient
   override protected[sql] lazy val analyzer =
-    new Analyzer(catalog, functionRegistry, caseSensitive = false) {
+    new Analyzer(catalog, functionRegistry, conf) {
       override val extendedResolutionRules =
         catalog.ParquetConversions ::
         catalog.CreateTables ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f7ad2efc9544e..d9951d0deb0ca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -41,12 +41,14 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec}
 import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, DDLParser, LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.util.Utils
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
 
-private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
+private[hive] class HiveMetastoreCatalog(hive: HiveContext, val conf: CatalystConf)
+  extends Catalog with Logging {
   import org.apache.spark.sql.hive.HiveMetastoreTypes._
 
   /** Connection to hive metastore. Usages should lock on `this`. */
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f2bc73bf3bdf9..86c8b2dc2006a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -337,6 +337,12 @@ class SQLQuerySuite extends QueryTest {
     }
   }
 
+  test("SPARK-4699 HiveContext should be case insensitive by default") {
+    checkAnswer(
+      sql("SELECT KEY FROM Src ORDER BY value"),
+      sql("SELECT key FROM src ORDER BY value").collect().toSeq)
+  }
+
   test("SPARK-5284 Insert into Hive throws NPE when a inner complex type field has a null value") {
     val schema = StructType(
       StructField("s",