From bc72868f5e731b20eb2abeacec77203f019f21cd Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 8 May 2015 01:10:59 -0700 Subject: [PATCH 01/10] Added SQLContext.getOrCreate --- .../org/apache/spark/sql/SQLContext.scala | 53 ++++++++++++- .../apache/spark/sql/SQLContextSuite.scala | 78 +++++++++++++++++++ 2 files changed, 130 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 7eabb93c1e3d6..11de008f9fe25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import java.beans.Introspector import java.util.Properties +import java.util.concurrent.atomic.AtomicReference import scala.collection.JavaConversions._ import scala.collection.immutable @@ -45,7 +46,7 @@ import org.apache.spark.sql.json._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -import org.apache.spark.{Partition, SparkContext} +import org.apache.spark.{SparkConf, Partition, SparkContext} /** * Currently we support the default dialect named "sql", associated with the class @@ -1272,6 +1273,56 @@ class SQLContext(@transient val sparkContext: SparkContext) } } + SQLContext.setLastInstantiatedContext(self) +} + +/** + * This SQLContext object contains utility functions to create a singleton SQLContext instance, + * or to get the last created SQLContext instance. + */ +object SQLContext { + + private val INSTANTIATION_LOCK = new Object() + + /** + * Reference to the last created SQLContext. + */ + @transient private val lastInstantiatedContext = new AtomicReference[SQLContext]() + + /** + * Get the singleton SQLContext if it exists or create a new one using the given configuration. + * This function can be used to create a singleton SQLContext object that can be shared across + * the JVM. + */ + def getOrCreate(config: SparkConf): SQLContext = { + getOrCreate(SparkContext.getOrCreate(config)) + } + + /** + * Get the singleton SQLContext if it exists or create a new one using the given configuration. + * This function can be used to create a singleton SQLContext object that can be shared across + * the JVM. + */ + def getOrCreate(sparkContext: SparkContext): SQLContext = { + INSTANTIATION_LOCK.synchronized { + if (lastInstantiatedContext.get() == null) { + new SQLContext(sparkContext) + } + } + lastInstantiatedContext.get() + } + + private[sql] def clearLastInstantiatedContext(): Unit = { + INSTANTIATION_LOCK.synchronized { + lastInstantiatedContext.set(null) + } + } + + private def setLastInstantiatedContext(sqlContext: SQLContext): Unit = { + INSTANTIATION_LOCK.synchronized { + lastInstantiatedContext.set(sqlContext) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala new file mode 100644 index 0000000000000..3c321d1550e39 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -0,0 +1,78 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. 
+* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import org.scalatest.{BeforeAndAfter, FunSuite} + +import org.apache.spark.{SparkConf, SparkContext} + +class SQLContextSuite extends FunSuite with BeforeAndAfter { + private val sparkConf = new SparkConf() + .setMaster("local") + .setAppName("SQLContextSuite") + .clone.set("newContext", "true") + + private var sparkContext: SparkContext = null + private var sqlContext: SQLContext = null + + before { + SQLContext.clearLastInstantiatedContext() + SparkContext.clearActiveContext() + } + + after { + if (sqlContext != null) { + sparkContext = sqlContext.sparkContext + sqlContext = null + } + if (sparkContext != null) { + sparkContext.stop() + sparkContext = null + } + } + + test("getOrCreate instantiates SQLContext from SparkConf") { + sqlContext = SQLContext.getOrCreate(sparkConf) + assert(sqlContext != null, "SQLContext not created") + assert(sqlContext.sparkContext != null, "SparkContext not created") + assert(sqlContext.sparkContext.conf.getBoolean("newContext", false), + "Provided conf not used to create SparkContext") + assert(SQLContext.getOrCreate(sparkConf).eq(sqlContext), + "Created SQLContext not saved as singleton") + } + + test("getOrCreate instantiates SQLContext from SparkContext") { + sparkContext = new SparkContext(sparkConf) + sqlContext = SQLContext.getOrCreate(sparkConf) + assert(sqlContext != null, "SQLContext not created") + assert(sqlContext.sparkContext != null, "SparkContext not passed") + assert(SQLContext.getOrCreate(sparkConf) != null, + "Instantiated SQLContext was not saved, null returned") + assert(SQLContext.getOrCreate(sparkConf).eq(sqlContext), + "Different SQLContext was returned") + } + + test("getOrCreate gets last explicitly instantiated SQLContext") { + sparkContext = new SparkContext + sqlContext = new SQLContext(sparkContext) + assert(SQLContext.getOrCreate(sparkConf) != null, + "Explicitly instantiated SQLContext was not saved, null returned") + assert(SQLContext.getOrCreate(sparkConf).eq(sqlContext), + "Different SQLContext was returned") + } +} From f82ae817f4a6c872405445cf9523a681040ed4fa Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 8 May 2015 02:58:42 -0700 Subject: [PATCH 02/10] Fixed test --- .../src/test/scala/org/apache/spark/sql/SQLContextSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala index 3c321d1550e39..458aa3bb5b934 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -68,7 +68,7 @@ class SQLContextSuite extends FunSuite with BeforeAndAfter { } test("getOrCreate gets last explicitly instantiated SQLContext") { - sparkContext = new SparkContext + sparkContext = new SparkContext(sparkConf) sqlContext = new SQLContext(sparkContext) 
assert(SQLContext.getOrCreate(sparkConf) != null, "Explicitly instantiated SQLContext was not saved, null returned") From 83bc9505213987b8c84f09a1605ec072cd5fd680 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 15 May 2015 20:15:44 -0700 Subject: [PATCH 03/10] Updated tests --- .../org/apache/spark/sql/SQLContext.scala | 17 ++--- .../apache/spark/sql/SQLContextSuite.scala | 67 ++++++------------- 2 files changed, 26 insertions(+), 58 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 11de008f9fe25..c0c7c44f08c1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -1289,15 +1289,6 @@ object SQLContext { */ @transient private val lastInstantiatedContext = new AtomicReference[SQLContext]() - /** - * Get the singleton SQLContext if it exists or create a new one using the given configuration. - * This function can be used to create a singleton SQLContext object that can be shared across - * the JVM. - */ - def getOrCreate(config: SparkConf): SQLContext = { - getOrCreate(SparkContext.getOrCreate(config)) - } - /** * Get the singleton SQLContext if it exists or create a new one using the given configuration. * This function can be used to create a singleton SQLContext object that can be shared across @@ -1312,13 +1303,19 @@ object SQLContext { lastInstantiatedContext.get() } + private[sql] def getLastInstantiatedContext(): Option[SQLContext] = { + INSTANTIATION_LOCK.synchronized { + Option(lastInstantiatedContext.get()) + } + } + private[sql] def clearLastInstantiatedContext(): Unit = { INSTANTIATION_LOCK.synchronized { lastInstantiatedContext.set(null) } } - private def setLastInstantiatedContext(sqlContext: SQLContext): Unit = { + private[sql] def setLastInstantiatedContext(sqlContext: SQLContext): Unit = { INSTANTIATION_LOCK.synchronized { lastInstantiatedContext.set(sqlContext) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala index 458aa3bb5b934..f186bc1c18123 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -17,62 +17,33 @@ package org.apache.spark.sql -import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.{BeforeAndAfterAll, FunSuite} -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.test.TestSQLContext -class SQLContextSuite extends FunSuite with BeforeAndAfter { - private val sparkConf = new SparkConf() - .setMaster("local") - .setAppName("SQLContextSuite") - .clone.set("newContext", "true") +class SQLContextSuite extends FunSuite with BeforeAndAfterAll { - private var sparkContext: SparkContext = null - private var sqlContext: SQLContext = null + private val testSqlContext = TestSQLContext + private val testSparkContext = TestSQLContext.sparkContext - before { - SQLContext.clearLastInstantiatedContext() - SparkContext.clearActiveContext() - } - - after { - if (sqlContext != null) { - sparkContext = sqlContext.sparkContext - sqlContext = null - } - if (sparkContext != null) { - sparkContext.stop() - sparkContext = null - } + override def afterAll(): Unit = { + SQLContext.setLastInstantiatedContext(testSqlContext) } - test("getOrCreate instantiates SQLContext from SparkConf") { - sqlContext = SQLContext.getOrCreate(sparkConf) - 
assert(sqlContext != null, "SQLContext not created") - assert(sqlContext.sparkContext != null, "SparkContext not created") - assert(sqlContext.sparkContext.conf.getBoolean("newContext", false), - "Provided conf not used to create SparkContext") - assert(SQLContext.getOrCreate(sparkConf).eq(sqlContext), - "Created SQLContext not saved as singleton") - } - - test("getOrCreate instantiates SQLContext from SparkContext") { - sparkContext = new SparkContext(sparkConf) - sqlContext = SQLContext.getOrCreate(sparkConf) - assert(sqlContext != null, "SQLContext not created") - assert(sqlContext.sparkContext != null, "SparkContext not passed") - assert(SQLContext.getOrCreate(sparkConf) != null, - "Instantiated SQLContext was not saved, null returned") - assert(SQLContext.getOrCreate(sparkConf).eq(sqlContext), - "Different SQLContext was returned") + test("getOrCreate instantiates SQLContext") { + SQLContext.clearLastInstantiatedContext() + val sqlContext = SQLContext.getOrCreate(testSparkContext) + assert(sqlContext != null, "SQLContext.getOrCreate returned null") + assert(SQLContext.getOrCreate(testSparkContext).eq(sqlContext), + "SQLContext created by SQLContext.getOrCreate not returned by SQLContext.getOrCreate") } test("getOrCreate gets last explicitly instantiated SQLContext") { - sparkContext = new SparkContext(sparkConf) - sqlContext = new SQLContext(sparkContext) - assert(SQLContext.getOrCreate(sparkConf) != null, - "Explicitly instantiated SQLContext was not saved, null returned") - assert(SQLContext.getOrCreate(sparkConf).eq(sqlContext), - "Different SQLContext was returned") + SQLContext.clearLastInstantiatedContext() + val sqlContext = new SQLContext(testSparkContext) + assert(SQLContext.getOrCreate(testSparkContext) != null, + "SQLContext.getOrCreate after explicitly created SQLContext returned null") + assert(SQLContext.getOrCreate(testSparkContext).eq(sqlContext), + "SQLContext.getOrCreate after explicitly created SQLContext did not return the context") } } From d3ea8e421104a6543ab38f1bed586adba23e35aa Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 15 May 2015 23:54:59 -0700 Subject: [PATCH 04/10] Added HiveContext --- .../apache/spark/sql/hive/HiveContext.scala | 41 +++++++++++++++ .../spark/sql/hive/HiveContextSuite.scala | 51 +++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 1d8d0b5c322ad..d836a1f2177d7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive import java.io.{BufferedReader, InputStreamReader, PrintStream} import java.sql.Timestamp +import java.util.concurrent.atomic.AtomicReference import org.apache.hadoop.hive.ql.parse.VariableSubstitution import org.apache.spark.sql.catalyst.Dialect @@ -483,4 +484,44 @@ private object HiveContext { case (decimal, DecimalType()) => decimal.toString case (other, tpe) if primitiveTypes contains tpe => other.toString } + + + private val INSTANTIATION_LOCK = new Object() + + /** + * Reference to the last created SQLContext. + */ + @transient private val lastInstantiatedContext = new AtomicReference[HiveContext]() + + /** + * Get the singleton SQLContext if it exists or create a new one using the given configuration. 
+ * This function can be used to create a singleton SQLContext object that can be shared across + * the JVM. + */ + def getOrCreate(sparkContext: SparkContext): HiveContext = { + INSTANTIATION_LOCK.synchronized { + if (lastInstantiatedContext.get() == null) { + new SQLContext(sparkContext) + } + } + lastInstantiatedContext.get() + } + + private[hive] def getLastInstantiatedContext(): Option[HiveContext] = { + INSTANTIATION_LOCK.synchronized { + Option(lastInstantiatedContext.get()) + } + } + + private[hive] def clearLastInstantiatedContext(): Unit = { + INSTANTIATION_LOCK.synchronized { + lastInstantiatedContext.set(null) + } + } + + private[hive] def setLastInstantiatedContext(hiveContext: HiveContext): Unit = { + INSTANTIATION_LOCK.synchronized { + lastInstantiatedContext.set(hiveContext) + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala new file mode 100644 index 0000000000000..9d21fbb9aab77 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala @@ -0,0 +1,51 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.spark.sql.hive + + +import org.scalatest.{BeforeAndAfterAll, FunSuite} + +import org.apache.spark.sql.hive.test.TestHive + + +class HiveContextSuite extends FunSuite with BeforeAndAfterAll { + + private val testHiveContext = TestHive + private val testSparkContext = TestHive.sparkContext + + override def afterAll(): Unit = { + HiveContext.setLastInstantiatedContext(testHiveContext) + } + + test("getOrCreate instantiates HiveContext") { + HiveContext.clearLastInstantiatedContext() + val hiveContext = HiveContext.getOrCreate(testSparkContext) + assert(hiveContext != null, "HiveContext.getOrCreate returned null") + assert(HiveContext.getOrCreate(testSparkContext).eq(hiveContext), + "HiveContext created by SQLContext.getOrCreate not returned by SQLContext.getOrCreate") + } + + test("getOrCreate gets last explicitly instantiated HiveContext") { + HiveContext.clearLastInstantiatedContext() + val hiveContext = new HiveContext(testSparkContext) + assert(HiveContext.getOrCreate(testSparkContext) != null, + "HiveContext.getOrCreate after explicitly created SQLContext returned null") + assert(HiveContext.getOrCreate(testSparkContext).eq(hiveContext), + "HiveContext.getOrCreate after explicitly created SQLContext did not return the context") + } +} From b4e9721c01942e46dae5f2d49b91cf119cfc57f4 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sat, 16 May 2015 00:17:29 -0700 Subject: [PATCH 05/10] Remove unnecessary import --- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 9f3f3ed9802a7..bb69342c594cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelat import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -import org.apache.spark.{SparkConf, Partition, SparkContext} +import org.apache.spark.{Partition, SparkContext} /** * The entry point for working with structured data (rows and columns) in Spark. 
Allows the From dec559478ae6cd079833969f9ac03d4037e21870 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sat, 16 May 2015 02:18:57 -0700 Subject: [PATCH 06/10] Fixed bug --- .../scala/org/apache/spark/sql/hive/HiveContext.scala | 2 ++ .../org/apache/spark/sql/hive/HiveContextSuite.scala | 11 ++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index b68d3f9dc3604..39215259931e1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -467,6 +467,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { case _ => super.simpleString } } + + HiveContext.setLastInstantiatedContext(self) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala index 9d21fbb9aab77..033321f8af21e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import org.scalatest.{BeforeAndAfterAll, FunSuite} +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.test.TestHive @@ -37,15 +38,19 @@ class HiveContextSuite extends FunSuite with BeforeAndAfterAll { val hiveContext = HiveContext.getOrCreate(testSparkContext) assert(hiveContext != null, "HiveContext.getOrCreate returned null") assert(HiveContext.getOrCreate(testSparkContext).eq(hiveContext), - "HiveContext created by SQLContext.getOrCreate not returned by SQLContext.getOrCreate") + "HiveContext created by HiveContext.getOrCreate not returned by HiveContext.getOrCreate") + assert(SQLContext.getOrCreate(testSparkContext).eq(hiveContext), + "HiveContext created by HiveContext.getOrCreate not returned by SQLContext.getOrCreate") } test("getOrCreate gets last explicitly instantiated HiveContext") { HiveContext.clearLastInstantiatedContext() val hiveContext = new HiveContext(testSparkContext) assert(HiveContext.getOrCreate(testSparkContext) != null, - "HiveContext.getOrCreate after explicitly created SQLContext returned null") + "HiveContext.getOrCreate after explicitly created HiveContext returned null") assert(HiveContext.getOrCreate(testSparkContext).eq(hiveContext), - "HiveContext.getOrCreate after explicitly created SQLContext did not return the context") + "HiveContext.getOrCreate after explicitly created HiveContext did not return the context") + assert(SQLContext.getOrCreate(testSparkContext).eq(hiveContext), + "SQLContext.getOrCreate after explicitly created HiveContext did not return the context") } } From bf8cf50b9263d6f8f2bf5cfb8fb655816a1e5fec Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sat, 16 May 2015 02:25:06 -0700 Subject: [PATCH 07/10] Fix more bug --- .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 39215259931e1..a27285a0415f5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -548,7 +548,7 @@ private[hive] object HiveContext { def getOrCreate(sparkContext: 
SparkContext): HiveContext = { INSTANTIATION_LOCK.synchronized { if (lastInstantiatedContext.get() == null) { - new SQLContext(sparkContext) + new HiveContext(sparkContext) } } lastInstantiatedContext.get() From 48adb14f04eb4c33977684ec2e0d178f5080a3b5 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 20 May 2015 18:19:26 -0700 Subject: [PATCH 08/10] Removed HiveContext.getOrCreate --- .../apache/spark/sql/hive/HiveContext.scala | 76 ++++++++----------- .../spark/sql/hive/HiveContextSuite.scala | 56 -------------- 2 files changed, 30 insertions(+), 102 deletions(-) delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index a27285a0415f5..2733ebdb95bca 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive import java.io.{BufferedReader, File, InputStreamReader, PrintStream} import java.sql.Timestamp -import java.util.concurrent.atomic.AtomicReference import java.util.{ArrayList => JArrayList} import org.apache.hadoop.hive.ql.parse.VariableSubstitution @@ -123,6 +122,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { protected[hive] def hiveMetastoreJars: String = getConf(HIVE_METASTORE_JARS, "builtin") + /** + * A comma separated list of class prefixes that should be loaded using the classloader that + * is shared between Spark SQL and a specific version of Hive. An example of classes that should + * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need + * to be shared are those that interact with classes that are already shared. For example, + * custom appenders that are used by log4j. + */ + protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] = + getConf("spark.sql.hive.metastore.sharedPrefixes", jdbcPrefixes) + .split(",").filterNot(_ == "") + + private def jdbcPrefixes = Seq( + "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc").mkString(",") + + /** + * A comma separated list of class prefixes that should explicitly be reloaded for each version + * of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a + * prefix that typically would be shared (i.e. org.apache.spark.*) + */ + protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] = + getConf("spark.sql.hive.metastore.barrierPrefixes", "") + .split(",").filterNot(_ == "") + @transient protected[sql] lazy val substitutor = new VariableSubstitution() @@ -180,12 +202,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { version = metaVersion, execJars = jars.toSeq, config = allConfig, - isolationOn = true) + isolationOn = true, + barrierPrefixes = hiveMetastoreBarrierPrefixes, + sharedPrefixes = hiveMetastoreSharedPrefixes) } else if (hiveMetastoreJars == "maven") { // TODO: Support for loading the jars from an already downloaded location. logInfo( s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.") - IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig ) + IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig) } else { // Convert to files and expand any directories. 
val jars = @@ -211,7 +235,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { version = metaVersion, execJars = jars.toSeq, config = allConfig, - isolationOn = true) + isolationOn = true, + barrierPrefixes = hiveMetastoreBarrierPrefixes, + sharedPrefixes = hiveMetastoreSharedPrefixes) } isolatedLoader.client } @@ -467,8 +493,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { case _ => super.simpleString } } - - HiveContext.setLastInstantiatedContext(self) } @@ -531,44 +555,4 @@ private[hive] object HiveContext { case (decimal, DecimalType()) => decimal.toString case (other, tpe) if primitiveTypes contains tpe => other.toString } - - - private val INSTANTIATION_LOCK = new Object() - - /** - * Reference to the last created SQLContext. - */ - @transient private val lastInstantiatedContext = new AtomicReference[HiveContext]() - - /** - * Get the singleton SQLContext if it exists or create a new one using the given configuration. - * This function can be used to create a singleton SQLContext object that can be shared across - * the JVM. - */ - def getOrCreate(sparkContext: SparkContext): HiveContext = { - INSTANTIATION_LOCK.synchronized { - if (lastInstantiatedContext.get() == null) { - new HiveContext(sparkContext) - } - } - lastInstantiatedContext.get() - } - - private[hive] def getLastInstantiatedContext(): Option[HiveContext] = { - INSTANTIATION_LOCK.synchronized { - Option(lastInstantiatedContext.get()) - } - } - - private[hive] def clearLastInstantiatedContext(): Unit = { - INSTANTIATION_LOCK.synchronized { - lastInstantiatedContext.set(null) - } - } - - private[hive] def setLastInstantiatedContext(hiveContext: HiveContext): Unit = { - INSTANTIATION_LOCK.synchronized { - lastInstantiatedContext.set(hiveContext) - } - } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala deleted file mode 100644 index 033321f8af21e..0000000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/
-
-package org.apache.spark.sql.hive
-
-
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
-
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.test.TestHive
-
-
-class HiveContextSuite extends FunSuite with BeforeAndAfterAll {
-
-  private val testHiveContext = TestHive
-  private val testSparkContext = TestHive.sparkContext
-
-  override def afterAll(): Unit = {
-    HiveContext.setLastInstantiatedContext(testHiveContext)
-  }
-
-  test("getOrCreate instantiates HiveContext") {
-    HiveContext.clearLastInstantiatedContext()
-    val hiveContext = HiveContext.getOrCreate(testSparkContext)
-    assert(hiveContext != null, "HiveContext.getOrCreate returned null")
-    assert(HiveContext.getOrCreate(testSparkContext).eq(hiveContext),
-      "HiveContext created by HiveContext.getOrCreate not returned by HiveContext.getOrCreate")
-    assert(SQLContext.getOrCreate(testSparkContext).eq(hiveContext),
-      "HiveContext created by HiveContext.getOrCreate not returned by SQLContext.getOrCreate")
-  }
-
-  test("getOrCreate gets last explicitly instantiated HiveContext") {
-    HiveContext.clearLastInstantiatedContext()
-    val hiveContext = new HiveContext(testSparkContext)
-    assert(HiveContext.getOrCreate(testSparkContext) != null,
-      "HiveContext.getOrCreate after explicitly created HiveContext returned null")
-    assert(HiveContext.getOrCreate(testSparkContext).eq(hiveContext),
-      "HiveContext.getOrCreate after explicitly created HiveContext did not return the context")
-    assert(SQLContext.getOrCreate(testSparkContext).eq(hiveContext),
-      "SQLContext.getOrCreate after explicitly created HiveContext did not return the context")
-  }
-}

From 79fe069e876a5dcfc148a4079efd63f67b781c4c Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 20 May 2015 19:33:38 -0700
Subject: [PATCH 09/10] Added comments.

---
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index c9eccc499a847..5dbada3585f6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1275,6 +1275,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
 ////////////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////////
 
+
+  // Register a successfully instantiated context to the singleton. This should be at the end of
+  // the class definition so that the singleton is updated only if there is no exception in the
+  // construction of the instance.
   SQLContext.setLastInstantiatedContext(self)
 }
 

From 25f4da916e4d8218eda37b72fda6f909cfd5d30f Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Thu, 21 May 2015 11:48:56 -0700
Subject: [PATCH 10/10] Addressed comments.
--- .../src/main/scala/org/apache/spark/sql/SQLContext.scala | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 5dbada3585f6a..1ea596dddff02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -1296,7 +1296,7 @@ object SQLContext { @transient private val lastInstantiatedContext = new AtomicReference[SQLContext]() /** - * Get the singleton SQLContext if it exists or create a new one using the given configuration. + * Get the singleton SQLContext if it exists or create a new one using the given SparkContext. * This function can be used to create a singleton SQLContext object that can be shared across * the JVM. */ @@ -1309,12 +1309,6 @@ object SQLContext { lastInstantiatedContext.get() } - private[sql] def getLastInstantiatedContext(): Option[SQLContext] = { - INSTANTIATION_LOCK.synchronized { - Option(lastInstantiatedContext.get()) - } - } - private[sql] def clearLastInstantiatedContext(): Unit = { INSTANTIATION_LOCK.synchronized { lastInstantiatedContext.set(null)
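}

A minimal usage sketch of the API this series settles on (an editorial illustration, not part of any patch above): once SQLContext.getOrCreate(SparkContext) exists, independent pieces of code in the same JVM can share one SQLContext without threading the instance through call sites. The master URL, app name, and object name below are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object GetOrCreateSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("getOrCreate-sketch"))

        // First call: no singleton exists yet, so a new SQLContext is created
        // and registered under INSTANTIATION_LOCK.
        val sqlContext = SQLContext.getOrCreate(sc)

        // Later calls return the same instance, so temporary tables and cached
        // data registered on it are visible to every caller in the JVM.
        assert(SQLContext.getOrCreate(sc) eq sqlContext)

        // Explicitly constructing a context replaces the singleton, via the
        // setLastInstantiatedContext(self) call at the end of the class body.
        val explicit = new SQLContext(sc)
        assert(SQLContext.getOrCreate(sc) eq explicit)

        sc.stop()
      }
    }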