
solve conflict with master
jackylk committed Feb 21, 2015
2 parents 5b0a42c + 39e369c commit 12eca9a
Showing 13 changed files with 163 additions and 38 deletions.
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst

import scala.collection.immutable

private[spark] object CatalystConf {
val CASE_SENSITIVE = "spark.sql.caseSensitive"
}

private[spark] trait CatalystConf {
def setConf(key: String, value: String) : Unit
def getConf(key: String) : String
def getConf(key: String, defaultValue: String) : String
def getAllConfs: immutable.Map[String, String]
}

/**
* A trivial conf that throws an exception for every operation. Used for testing when a
* CatalystConf is required but its settings are never actually read.
*/
object EmptyConf extends CatalystConf {
def setConf(key: String, value: String) : Unit = {
throw new UnsupportedOperationException
}

def getConf(key: String) : String = {
throw new UnsupportedOperationException
}

def getConf(key: String, defaultValue: String) : String = {
throw new UnsupportedOperationException
}

def getAllConfs: immutable.Map[String, String] = {
throw new UnsupportedOperationException
}
}
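
For orientation, spark.sql.caseSensitive is the user-facing knob this commit introduces; a minimal usage sketch, assuming an already-constructed SQLContext named sqlContext (the table and query are taken from the SPARK-4699 test further down):

import org.apache.spark.sql.catalyst.CatalystConf

// Analysis is case-sensitive by default; HiveContext flips the flag to "false" at startup.
sqlContext.setConf(CatalystConf.CASE_SENSITIVE, "false")
// With the flag off, upper-cased identifiers still resolve against testTable1's key/value columns.
sqlContext.sql("SELECT VALUE FROM TESTTABLE1 WHERE KEY = 1")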
@@ -19,18 +19,19 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.util.collection.OpenHashSet
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.test.SimpleConf
import org.apache.spark.sql.types._

/**
* A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
* when all relations are already filled in and the analyser needs only to resolve attribute
* references.
*/
object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true)
object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleConf)

/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -39,11 +40,17 @@ object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true
*/
class Analyzer(catalog: Catalog,
registry: FunctionRegistry,
caseSensitive: Boolean,
conf: CatalystConf,
maxIterations: Int = 100)
extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

val resolver = if (caseSensitive) caseSensitiveResolution else caseInsensitiveResolution
def resolver: Resolver = {
if (conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean) {
caseSensitiveResolution
} else {
caseInsensitiveResolution
}
}

val fixedPoint = FixedPoint(maxIterations)
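
The two resolution functions selected by the new resolver method above are plain string comparators; a sketch of their assumed shape (the actual definitions live in the catalyst analysis package object):

// Assumed shape of the resolvers referenced by Analyzer.resolver.
type Resolver = (String, String) => Boolean
val caseSensitiveResolution: Resolver = (a: String, b: String) => a == b
val caseInsensitiveResolution: Resolver = (a: String, b: String) => a.equalsIgnoreCase(b)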

@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.EmptyConf

/**
* Thrown by a catalog when a table cannot be found. The analyzer will rethrow the exception
@@ -32,7 +34,7 @@ class NoSuchTableException extends Exception
*/
trait Catalog {

def caseSensitive: Boolean
val conf: CatalystConf

def tableExists(tableIdentifier: Seq[String]): Boolean

@@ -55,7 +57,7 @@ trait Catalog {
def unregisterAllTables(): Unit

protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
if (!caseSensitive) {
if (!conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean) {
tableIdentifier.map(_.toLowerCase)
} else {
tableIdentifier
@@ -76,7 +78,7 @@
}
}

class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
class SimpleCatalog(val conf: CatalystConf) extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]()

override def registerTable(
@@ -162,7 +164,7 @@ trait OverrideCatalog {
}

abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
val dbName = if (!caseSensitive) {
val dbName = if (!conf.getConf(CatalystConf.CASE_SENSITIVE).toBoolean) {
if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
} else {
databaseName
@@ -205,7 +207,7 @@ trait OverrideCatalog {
*/
object EmptyCatalog extends Catalog {

val caseSensitive: Boolean = true
override val conf: CatalystConf = EmptyConf

def tableExists(tableIdentifier: Seq[String]): Boolean = {
throw new UnsupportedOperationException
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.test

import org.apache.spark.sql.catalyst.CatalystConf

import scala.collection.immutable
import scala.collection.mutable

/** A CatalystConf that can be used for local testing. */
class SimpleConf extends CatalystConf {
val map = mutable.Map[String, String]()

def setConf(key: String, value: String) : Unit = {
map.put(key, value)
}
def getConf(key: String) : String = {
map.get(key).get
}
def getConf(key: String, defaultValue: String) : String = {
map.getOrElse(key, defaultValue)
}
def getAllConfs: immutable.Map[String, String] = {
map.toMap
}
}
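
A short sketch of how SimpleConf is wired into analysis, mirroring the test suites changed below:

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}
import org.apache.spark.sql.catalyst.test.SimpleConf

val conf = new SimpleConf
conf.setConf(CatalystConf.CASE_SENSITIVE, "false")  // unset, the analyzer defaults to case-sensitive
val catalog = new SimpleCatalog(conf)                // catalog lowercases table identifiers
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)  // resolver becomes case-insensitive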
@@ -23,17 +23,22 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.test.SimpleConf
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

class AnalysisSuite extends FunSuite with BeforeAndAfter {
val caseSensitiveCatalog = new SimpleCatalog(true)
val caseInsensitiveCatalog = new SimpleCatalog(false)
val caseSensitiveConf = new SimpleConf()
caseSensitiveConf.setConf(CatalystConf.CASE_SENSITIVE, "true")
val caseInsensitiveConf = new SimpleConf()
caseInsensitiveConf.setConf(CatalystConf.CASE_SENSITIVE, "false")
val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
val caseSensitiveAnalyze =
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true)
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf)
val caseInsensitiveAnalyze =
new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false)
new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf)

val testRelation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
val testRelation2 = LocalRelation(
@@ -20,11 +20,13 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Project, LocalRelation}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.test.SimpleConf
import org.scalatest.{BeforeAndAfter, FunSuite}

class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
val catalog = new SimpleCatalog(false)
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = false)
val conf = new SimpleConf
val catalog = new SimpleCatalog(conf)
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)

val relation = LocalRelation(
AttributeReference("i", IntegerType)(),
6 changes: 4 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.CatalystConf

import scala.collection.immutable
import scala.collection.JavaConversions._

@@ -69,7 +71,8 @@ private[spark] object SQLConf {
*
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
private[sql] class SQLConf extends Serializable {

private[sql] class SQLConf extends Serializable with CatalystConf {
import SQLConf._

/** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -220,4 +223,3 @@ private[sql] class SQLConf extends Serializable {
settings.clear()
}
}
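
Only the class declaration of SQLConf appears in this hunk; as a rough sketch of how its existing synchronized settings map can satisfy the CatalystConf contract (illustrative class name and method bodies, not the actual SQLConf source):

import java.util.Collections
import scala.collection.JavaConversions._
import scala.collection.immutable
import org.apache.spark.sql.catalyst.CatalystConf

// Hypothetical CatalystConf backed by a synchronized java.util.Map, as SQLConf's settings field is.
class MapBackedConf extends CatalystConf {
  private val settings = Collections.synchronizedMap(new java.util.HashMap[String, String]())
  def setConf(key: String, value: String): Unit = { settings.put(key, value) }
  def getConf(key: String): String =
    Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key))
  def getConf(key: String, defaultValue: String): String =
    Option(settings.get(key)).getOrElse(defaultValue)
  def getAllConfs: immutable.Map[String, String] = settings.toMap
}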

4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -104,14 +104,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
def getAllConfs: immutable.Map[String, String] = conf.getAllConfs

@transient
protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)

@transient
protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry(true)

@transient
protected[sql] lazy val analyzer: Analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = true) {
new Analyzer(catalog, functionRegistry, conf) {
override val extendedResolutionRules =
ExtractPythonUdfs ::
sources.PreWriteCheck(catalog) ::
11 changes: 10 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,10 +21,10 @@ import org.apache.spark.sql.test.TestSQLContext
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types._

import org.apache.spark.sql.TestData._
import org.apache.spark.sql.test.TestSQLContext.{udf => _, _}

@@ -1049,4 +1049,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
rdd.toDF().registerTempTable("distinctData")
checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), Row(2))
}

test("SPARK-4699 case sensitivity SQL query") {
setConf(CatalystConf.CASE_SENSITIVE, "false")
val data = TestData(1,"val_1") :: TestData(2,"val_2") :: Nil
val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
rdd.toDF().registerTempTable("testTable1")
checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
setConf(CatalystConf.CASE_SENSITIVE, "true")
}
}
@@ -18,22 +18,13 @@
package org.apache.spark.sql.sources

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.test.TestSQLContext
import org.scalatest.BeforeAndAfter

abstract class DataSourceTest extends QueryTest with BeforeAndAfter {
// Case sensitivity is not configurable yet, but we want to test some edge cases.
// TODO: Remove when it is configurable
implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext) {
@transient
override protected[sql] lazy val analyzer: Analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = false) {
override val extendedResolutionRules =
PreWriteCheck(catalog) ::
PreInsertCastAndRename ::
Nil
}
}
}
// We want to test some edge cases.
implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext)

caseInsensisitiveContext.setConf(CatalystConf.CASE_SENSITIVE, "false")
}
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, QueryExecutionException, SetCommand}
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.types._

/**
@@ -53,6 +54,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
}

/* By default it should be case insensitive to match Hive */
conf.setConf(CatalystConf.CASE_SENSITIVE, "false")

/**
* When true, enables an experimental feature where metastore tables that use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -249,7 +253,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

/* A catalyst metadata catalog that points to the Hive Metastore. */
@transient
override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog
override protected[sql] lazy val catalog =
new HiveMetastoreCatalog(this, conf) with OverrideCatalog

// Note that HiveUDFs will be overridden by functions registered in this context.
@transient
@@ -261,7 +266,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/* An analyzer that uses the Hive metastore. */
@transient
override protected[sql] lazy val analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = false) {
new Analyzer(catalog, functionRegistry, conf) {
override val extendedResolutionRules =
catalog.ParquetConversions ::
catalog.CreateTables ::
@@ -41,12 +41,14 @@ import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec}
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, DDLParser, LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.util.Utils

/* Implicit conversions */
import scala.collection.JavaConversions._

private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
private[hive] class HiveMetastoreCatalog(hive: HiveContext, val conf: CatalystConf)
extends Catalog with Logging {
import org.apache.spark.sql.hive.HiveMetastoreTypes._

/** Connection to hive metastore. Usages should lock on `this`. */
@@ -337,6 +337,12 @@ class SQLQuerySuite extends QueryTest {
}
}

test("SPARK-4699 HiveContext should be case insensitive by default") {
checkAnswer(
sql("SELECT KEY FROM Src ORDER BY value"),
sql("SELECT key FROM src ORDER BY value").collect().toSeq)
}

test("SPARK-5284 Insert into Hive throws NPE when a inner complex type field has a null value") {
val schema = StructType(
StructField("s",