[SPARK-4699] [SQL] Make caseSensitive configurable in spark sql analyzer
based on #3558
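
In short, analyzer case sensitivity now follows a new conf key, spark.sql.caseSensitive. A minimal sketch of the user-visible behavior (assuming an existing SQLContext named sqlContext; the table and column names are illustrative):

import sqlContext.implicits._

// SQLContext stays case sensitive by default; opt out explicitly.
sqlContext.setConf("spark.sql.caseSensitive", "false")
Seq((1, "val_1"), (2, "val_2")).toDF("key", "value").registerTempTable("testTable1")
// Identifier case no longer needs to match the registered names:
sqlContext.sql("SELECT VALUE FROM TESTTABLE1 WHERE KEY = 1").collect() // Row("val_1")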

Author: Jacky Li <jacky.likun@huawei.com>
Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>

Closes #5806 from scwf/case and squashes the following commits:

cd51712 [wangfei] fix compile
d4b724f [wangfei] address michael's comment
af512c7 [wangfei] fix conflicts
4ef1be7 [wangfei] fix conflicts
269cf21 [scwf] fix conflicts
b73df6c [scwf] style issue
9e11752 [scwf] improve SimpleCatalystConf
b35529e [scwf] minor style
a3f7659 [scwf] remove unused imports
2a56515 [scwf] fix conflicts
6db4bf5 [scwf] also fix for HiveContext
7fc4a98 [scwf] fix test case
d5a9933 [wangfei] fix style
eee75ba [wangfei] fix EmptyConf
6ef31cf [wangfei] revert pom changes
5d7c456 [wangfei] set CASE_SENSITIVE false in TestHive
966e719 [wangfei] set CASE_SENSITIVE false in hivecontext
fd30e25 [wangfei] added override
69b3b70 [wangfei] fix AnalysisSuite
5472b08 [wangfei] fix compile issue
56034ca [wangfei] fix conflicts and improve for catalystconf
664d1e9 [Jacky Li] Merge branch 'master' of https://github.com/apache/spark into case
12eca9a [Jacky Li] solve conflict with master
39e369c [Jacky Li] fix conflict after DataFrame PR
dee56e9 [Jacky Li] fix test case failure
05b09a3 [Jacky Li] fix conflict base on the latest master branch
73c16b1 [Jacky Li] fix bug in sql/hive
9bf4cc7 [Jacky Li] fix bug in catalyst
005c56d [Jacky Li] make SQLContext caseSensitivity configurable
6332e0f [Jacky Li] fix bug
fcbf0d9 [Jacky Li] fix scalastyle check
e7bca31 [Jacky Li] make caseSensitive configurable in Analyzer and Catalog
91b1b96 [Jacky Li] make caseSensitive configurable in Analyzer
f57f15c [Jacky Li] add testcase
578d167 [Jacky Li] make caseSensitive configurable
jackylk authored and marmbrus committed May 8, 2015
1 parent 90527f5 commit 6dad76e
Showing 15 changed files with 127 additions and 70 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst

private[spark] trait CatalystConf {
def caseSensitiveAnalysis: Boolean
}

/**
* A trivial conf that is empty. Used for testing when all
* relations are already filled in and the analyzer needs only to resolve attribute references.
*/
object EmptyConf extends CatalystConf {
override def caseSensitiveAnalysis: Boolean = {
throw new UnsupportedOperationException
}
}

/** A CatalystConf that can be used for local testing. */
case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf
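
A quick sketch of how the two implementations above behave (illustrative usage, not part of the diff; assumes code that can see the private[spark] trait):

import org.apache.spark.sql.catalyst.{EmptyConf, SimpleCatalystConf}

val conf = SimpleCatalystConf(caseSensitiveAnalysis = true)
conf.caseSensitiveAnalysis // true
// EmptyConf is a sentinel for code paths that must never consult the conf;
// EmptyConf.caseSensitiveAnalysis would throw UnsupportedOperationException.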
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -19,19 +19,21 @@ package org.apache.spark.sql.catalyst.analysis

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.util.collection.OpenHashSet
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{SimpleCatalystConf, CatalystConf}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.OpenHashSet

/**
* A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
* when all relations are already filled in and the analyzer needs only to resolve attribute
* references.
*/
object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true)
object SimpleAnalyzer
extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(true))

/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -41,11 +43,17 @@ object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true
class Analyzer(
catalog: Catalog,
registry: FunctionRegistry,
caseSensitive: Boolean,
conf: CatalystConf,
maxIterations: Int = 100)
extends RuleExecutor[LogicalPlan] with HiveTypeCoercion with CheckAnalysis {

val resolver = if (caseSensitive) caseSensitiveResolution else caseInsensitiveResolution
def resolver: Resolver = {
if (conf.caseSensitiveAnalysis) {
caseSensitiveResolution
} else {
caseInsensitiveResolution
}
}

val fixedPoint = FixedPoint(maxIterations)

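For reference, the two resolution strategies selected above are plain string comparators; roughly (a sketch of the definitions in the catalyst analysis package object):

type Resolver = (String, String) => Boolean
val caseSensitiveResolution: Resolver = (a, b) => a == b
val caseInsensitiveResolution: Resolver = (a, b) => a.equalsIgnoreCase(b)
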
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis

import scala.collection.mutable

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.EmptyConf
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}

/**
@@ -34,7 +36,7 @@ class NoSuchDatabaseException extends Exception
*/
trait Catalog {

def caseSensitive: Boolean
val conf: CatalystConf

def tableExists(tableIdentifier: Seq[String]): Boolean

@@ -57,10 +59,10 @@ trait Catalog {
def unregisterAllTables(): Unit

protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
if (!caseSensitive) {
tableIdentifier.map(_.toLowerCase)
} else {
if (conf.caseSensitiveAnalysis) {
tableIdentifier
} else {
tableIdentifier.map(_.toLowerCase)
}
}

@@ -78,7 +80,7 @@ }
}
}

class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
class SimpleCatalog(val conf: CatalystConf) extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]()

override def registerTable(
@@ -164,10 +166,10 @@ trait OverrideCatalog extends Catalog {
}

abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
val dbName = if (!caseSensitive) {
if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
} else {
val dbName = if (conf.caseSensitiveAnalysis) {
databaseName
} else {
if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
}

val temporaryTables = overrides.filter {
@@ -207,7 +209,7 @@ trait OverrideCatalog extends Catalog {
*/
object EmptyCatalog extends Catalog {

override val caseSensitive: Boolean = true
override val conf: CatalystConf = EmptyConf

override def tableExists(tableIdentifier: Seq[String]): Boolean = {
throw new UnsupportedOperationException
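Concretely, processTableIdentifier above lowercases every part of a table identifier unless the conf is case sensitive, so registration and lookup agree on a canonical form. A small sketch with assumed inputs:

import org.apache.spark.sql.catalyst.SimpleCatalystConf

val conf = SimpleCatalystConf(caseSensitiveAnalysis = false)
val tableIdentifier = Seq("MyDb", "MyTable")
val processed =
  if (conf.caseSensitiveAnalysis) tableIdentifier
  else tableIdentifier.map(_.toLowerCase)
// processed == Seq("mydb", "mytable"): "MyTable" and "MYTABLE" hit the same entry
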
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -23,24 +23,26 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

class AnalysisSuite extends FunSuite with BeforeAndAfter {
val caseSensitiveCatalog = new SimpleCatalog(true)
val caseInsensitiveCatalog = new SimpleCatalog(false)
val caseSensitiveConf = new SimpleCatalystConf(true)
val caseInsensitiveConf = new SimpleCatalystConf(false)

val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)

val caseSensitiveAnalyzer =
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true) {
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) {
override val extendedResolutionRules = EliminateSubQueries :: Nil
}
val caseInsensitiveAnalyzer =
new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false) {
new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf) {
override val extendedResolutionRules = EliminateSubQueries :: Nil
}


def caseSensitiveAnalyze(plan: LogicalPlan): Unit =
caseSensitiveAnalyzer.checkAnalysis(caseSensitiveAnalyzer.execute(plan))

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -17,14 +17,17 @@

package org.apache.spark.sql.catalyst.analysis

import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Union, Project, LocalRelation}
import org.apache.spark.sql.types._
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.sql.catalyst.SimpleCatalystConf

class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
val catalog = new SimpleCatalog(false)
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = false)
val conf = new SimpleCatalystConf(true)
val catalog = new SimpleCatalog(conf)
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)

val relation = LocalRelation(
AttributeReference("i", IntegerType)(),
13 changes: 11 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -17,10 +17,12 @@

package org.apache.spark.sql

import java.util.Properties

import scala.collection.immutable
import scala.collection.JavaConversions._

import java.util.Properties
import org.apache.spark.sql.catalyst.CatalystConf

private[spark] object SQLConf {
val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
@@ -32,6 +34,7 @@ private[spark] object SQLConf {
val CODEGEN_ENABLED = "spark.sql.codegen"
val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
val DIALECT = "spark.sql.dialect"
val CASE_SENSITIVE = "spark.sql.caseSensitive"

val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp"
@@ -89,7 +92,8 @@ *
*
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
private[sql] class SQLConf extends Serializable {

private[sql] class SQLConf extends Serializable with CatalystConf {
import SQLConf._

/** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -158,6 +162,11 @@ private[sql] class SQLConf extends Serializable {
*/
private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean

/**
* Whether the analyzer should be case sensitive. True by default.
*/
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, "true").toBoolean

/**
* When set to true, Spark SQL will use managed memory for certain operations. This option only
* takes effect if codegen is enabled.
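Because SQLConf now mixes in CatalystConf, the object backing a SQLContext can be handed directly to the Catalog and Analyzer. A sketch of the wiring (assumes code inside the org.apache.spark.sql package, since SQLConf is private[sql]):

import org.apache.spark.sql.catalyst.analysis.{Analyzer, SimpleCatalog, SimpleFunctionRegistry}

val conf = new SQLConf
conf.setConf(SQLConf.CASE_SENSITIVE, "false")
// One conf instance now drives both table lookup and attribute resolution:
val analyzer = new Analyzer(new SimpleCatalog(conf), new SimpleFunctionRegistry(true), conf)
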
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -148,15 +148,15 @@ class SQLContext(@transient val sparkContext: SparkContext)

// TODO how to handle the temp table per user session?
@transient
protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)

// TODO how to handle the temp function per user session?
@transient
protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry(true)

@transient
protected[sql] lazy val analyzer: Analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = true) {
new Analyzer(catalog, functionRegistry, conf) {
override val extendedResolutionRules =
ExtractPythonUdfs ::
sources.PreInsertCastAndRename ::
10 changes: 10 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.execution.GeneratedAggregate
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext.{udf => _, _}
@@ -1277,6 +1278,15 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), Row(2))
}

test("SPARK-4699 case sensitivity SQL query") {
setConf(SQLConf.CASE_SENSITIVE, "false")
val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
rdd.toDF().registerTempTable("testTable1")
checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
setConf(SQLConf.CASE_SENSITIVE, "true")
}

test("SPARK-6145: ORDER BY test for nested fields") {
jsonRDD(sparkContext.makeRDD("""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil))
.registerTempTable("nestedOrder")
sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
@@ -18,25 +18,13 @@
package org.apache.spark.sql.sources

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.test.TestSQLContext
import org.scalatest.BeforeAndAfter

abstract class DataSourceTest extends QueryTest with BeforeAndAfter {
// Case sensitivity is not configurable yet, but we want to test some edge cases.
// TODO: Remove when it is configurable
implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext) {
@transient
override protected[sql] lazy val analyzer: Analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = false) {
override val extendedResolutionRules =
PreInsertCastAndRename ::
Nil
// We want to test some edge cases.
implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext)

override val extendedCheckRules = Seq(
sources.PreWriteCheck(catalog)
)
}
}
caseInsensisitiveContext.setConf(SQLConf.CASE_SENSITIVE, "false")
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, Query
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -329,7 +330,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/* An analyzer that uses the Hive metastore. */
@transient
override protected[sql] lazy val analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = false) {
new Analyzer(catalog, functionRegistry, conf) {
override val extendedResolutionRules =
catalog.ParquetConversions ::
catalog.CreateTables ::
@@ -350,6 +351,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
protected[hive] class SQLSession extends super.SQLSession {
protected[sql] override lazy val conf: SQLConf = new SQLConf {
override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
override def caseSensitiveAnalysis: Boolean =
getConf(SQLConf.CASE_SENSITIVE, "false").toBoolean
}

/**
Expand Down