[SPARK-14647][SQL] Group SQLContext/HiveContext state into SharedState
## What changes were proposed in this pull request?

This patch adds a SharedState that groups state shared across multiple SQLContexts. This is analogous to the SessionState added in SPARK-13526 that groups session-specific state. This cleanup makes the constructors of the contexts simpler and ultimately allows us to remove HiveContext in the near future.
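
In rough terms, the state is now layered as follows (an illustrative sketch of the intent, assuming a live SparkContext named `sc`; the real definitions are in the diff below):

```scala
// SharedState  (one per root context): cache manager, SQL listener/UI, external catalog.
// SessionState (one per session):      SQLConf, function registry, temporary tables.
val rootContext = new SQLContext(sc)    // constructs the single SharedState
val session = rootContext.newSession()  // fresh SessionState, same SharedState
```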

## How was this patch tested?
Existing tests.

Closes #12405

Author: Andrew Or <andrew@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #12447 from yhuai/sharedState.
Andrew Or authored and yhuai committed Apr 16, 2016
1 parent 3f49afe commit 5cefecc
Showing 8 changed files with 175 additions and 122 deletions.
31 changes: 14 additions & 17 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -63,17 +63,14 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class SQLContext private[sql](
-    @transient val sparkContext: SparkContext,
-    @transient protected[sql] val cacheManager: CacheManager,
-    @transient private[sql] val listener: SQLListener,
-    val isRootContext: Boolean,
-    @transient private[sql] val externalCatalog: ExternalCatalog)
+    @transient protected[sql] val sharedState: SharedState,
+    val isRootContext: Boolean)
   extends Logging with Serializable {
 
   self =>
 
   def this(sc: SparkContext) = {
-    this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog)
+    this(new SharedState(sc), true)
   }
 
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
@@ -100,20 +97,20 @@ class SQLContext private[sql](
     }
   }
 
+  def sparkContext: SparkContext = sharedState.sparkContext
+
+  protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
+  protected[sql] def listener: SQLListener = sharedState.listener
+  protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
+
   /**
-   * Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
-   * registered functions, but sharing the same SparkContext, CacheManager, SQLListener and SQLTab.
+   * Returns a [[SQLContext]] as new session, with separated SQL configurations, temporary
+   * tables, registered functions, but sharing the same [[SparkContext]], cached data and
+   * other things.
    *
    * @since 1.6.0
    */
-  def newSession(): SQLContext = {
-    new SQLContext(
-      sparkContext = sparkContext,
-      cacheManager = cacheManager,
-      listener = listener,
-      isRootContext = false,
-      externalCatalog = externalCatalog)
-  }
+  def newSession(): SQLContext = new SQLContext(sharedState, isRootContext = false)
 
   /**
    * Per-session state, e.g. configuration, functions, temporary tables etc.
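
As a quick illustration of the behavior the new doc comment promises (this snippet is not part of the patch, and assumes a live `sc`), SQL configurations stay per-session while everything in SharedState is reused:

```scala
val a = new SQLContext(sc)
val b = a.newSession()

a.setConf("spark.sql.shuffle.partitions", "10")
// b is unaffected: SQLConf lives in the per-session SessionState,
// so b still reports the built-in default.
assert(b.getConf("spark.sql.shuffle.partitions") == "200")
```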
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -22,10 +22,8 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
 /**
47 changes: 47 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -0,0 +1,47 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.internal

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.ui.SQLListener


/**
 * A class that holds all state shared across sessions in a given [[SQLContext]].
 */
private[sql] class SharedState(val sparkContext: SparkContext) {

  /**
   * Class for caching query results reused in future executions.
   */
  val cacheManager: CacheManager = new CacheManager

  /**
   * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
   */
  val listener: SQLListener = SQLContext.createListenerAndUI(sparkContext)

  /**
   * A catalog that interacts with external systems.
   */
  lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog

}
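
A small usage sketch of the class above (illustrative only, assuming a live `sc`): every session created from the root context sees the same shared objects.

```scala
val root = new SQLContext(sc)   // the root context instantiates SharedState
val child = root.newSession()   // reuses it via the private[sql] constructor

// The shared pieces are identical across sessions:
assert(child.sparkContext eq root.sparkContext)
// cacheManager, listener and externalCatalog are likewise shared,
// though as protected[sql] members they are not visible to user code.
```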
51 changes: 15 additions & 36 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -49,12 +49,10 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
-import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SharedState, SQLConf}
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -67,32 +65,14 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class HiveContext private[hive](
-    sc: SparkContext,
-    cacheManager: CacheManager,
-    listener: SQLListener,
-    @transient private[hive] val executionHive: HiveClientImpl,
-    @transient private[hive] val metadataHive: HiveClient,
-    isRootContext: Boolean,
-    @transient private[sql] val hiveCatalog: HiveExternalCatalog)
-  extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging {
-  self =>
+    @transient protected[hive] val hiveSharedState: HiveSharedState,
+    override val isRootContext: Boolean)
+  extends SQLContext(hiveSharedState, isRootContext) with Logging {
 
-  private def this(sc: SparkContext, execHive: HiveClientImpl, metaHive: HiveClient) {
-    this(
-      sc,
-      new CacheManager,
-      SQLContext.createListenerAndUI(sc),
-      execHive,
-      metaHive,
-      true,
-      new HiveExternalCatalog(metaHive))
-  }
+  self =>
 
   def this(sc: SparkContext) = {
-    this(
-      sc,
-      HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
-      HiveContext.newClientForMetadata(sc.conf, sc.hadoopConfiguration))
+    this(new HiveSharedState(sc), true)
   }
 
   def this(sc: JavaSparkContext) = this(sc.sc)
@@ -107,19 +87,16 @@ class HiveContext private[hive](
    * and Hive client (both of execution and metadata) with existing HiveContext.
    */
   override def newSession(): HiveContext = {
-    new HiveContext(
-      sc = sc,
-      cacheManager = cacheManager,
-      listener = listener,
-      executionHive = executionHive.newSession(),
-      metadataHive = metadataHive.newSession(),
-      isRootContext = false,
-      hiveCatalog = hiveCatalog)
+    new HiveContext(hiveSharedState, isRootContext = false)
   }
 
   @transient
   protected[sql] override lazy val sessionState = new HiveSessionState(self)
 
+  protected[hive] def hiveCatalog: HiveExternalCatalog = hiveSharedState.externalCatalog
+  protected[hive] def executionHive: HiveClientImpl = sessionState.executionHive
+  protected[hive] def metadataHive: HiveClient = sessionState.metadataHive
+
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -163,7 +140,7 @@
   protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)
 
   protected[hive] def hiveThriftServerSingleSession: Boolean =
-    sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean
+    sparkContext.conf.getBoolean("spark.sql.hive.thriftServer.singleSession", defaultValue = false)
 
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
@@ -601,7 +578,9 @@ private[hive] object HiveContext extends Logging {
    * The version of the Hive client that is used here must match the metastore that is configured
    * in the hive-site.xml file.
    */
-  private def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration): HiveClient = {
+  protected[hive] def newClientForMetadata(
+      conf: SparkConf,
+      hadoopConf: Configuration): HiveClient = {
     val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
     val configurations = hiveClientConfigurations(hiveConf)
     newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -18,10 +18,11 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.execution.{python, SparkPlanner}
+import org.apache.spark.sql.execution.SparkPlanner
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
 import org.apache.spark.sql.hive.execution.HiveSqlParser
 import org.apache.spark.sql.internal.{SessionState, SQLConf}

@@ -31,6 +32,16 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
  */
 private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
 
+  /**
+   * A Hive client used for execution.
+   */
+  val executionHive: HiveClientImpl = ctx.hiveSharedState.executionHive.newSession()
+
+  /**
+   * A Hive client used for interacting with the metastore.
+   */
+  val metadataHive: HiveClient = ctx.hiveSharedState.metadataHive.newSession()
+
   override lazy val conf: SQLConf = new SQLConf {
     override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
   }
53 changes: 53 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -0,0 +1,53 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
import org.apache.spark.sql.internal.SharedState


/**
 * A class that holds all state shared across sessions in a given [[HiveContext]].
 */
private[hive] class HiveSharedState(override val sparkContext: SparkContext)
  extends SharedState(sparkContext) {

  // TODO: just share the IsolatedClientLoader instead of the client instances themselves

  /**
   * A Hive client used for execution.
   */
  val executionHive: HiveClientImpl = {
    HiveContext.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
  }

  /**
   * A Hive client used to interact with the metastore.
   */
  // This needs to be a lazy val at here because TestHiveSharedState is overriding it.
  lazy val metadataHive: HiveClient = {
    HiveContext.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
  }

  /**
   * A catalog that interacts with the Hive metastore.
   */
  override lazy val externalCatalog = new HiveExternalCatalog(metadataHive)

}
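
Putting HiveSharedState and HiveSessionState together, the intended client lifecycle is roughly the following (a sketch, not code from the patch): the shared state creates the execution and metastore clients once per SparkContext, and each session derives isolated client sessions from them.

```scala
val hive = new HiveContext(sc)   // builds one HiveSharedState (clients + catalog)
val s2 = hive.newSession()       // reuses the same HiveSharedState under the hood

// Each HiveSessionState called executionHive.newSession()/metadataHive.newSession(),
// so Hive-side settings and registered functions stay isolated per session,
// while both sessions reach the same metastore through the shared clients.
```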
