[SPARK-14647][SQL] Group SQLContext/HiveContext state into SharedState #12463

Status: Closed · wants to merge 1 commit
31 changes: 14 additions & 17 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -63,17 +63,14 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class SQLContext private[sql](
-    @transient val sparkContext: SparkContext,
-    @transient protected[sql] val cacheManager: CacheManager,
-    @transient private[sql] val listener: SQLListener,
-    val isRootContext: Boolean,
-    @transient private[sql] val externalCatalog: ExternalCatalog)
+    @transient protected[sql] val sharedState: SharedState,
+    val isRootContext: Boolean)
   extends Logging with Serializable {
 
   self =>
 
   def this(sc: SparkContext) = {
-    this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog)
+    this(new SharedState(sc), true)
   }
 
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
@@ -100,20 +97,20 @@ class SQLContext private[sql](
     }
   }
 
+  def sparkContext: SparkContext = sharedState.sparkContext
+
+  protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
+  protected[sql] def listener: SQLListener = sharedState.listener
+  protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
+
   /**
-   * Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
-   * registered functions, but sharing the same SparkContext, CacheManager, SQLListener and SQLTab.
+   * Returns a [[SQLContext]] as a new session, with separate SQL configurations, temporary
+   * tables, and registered functions, but sharing the same [[SparkContext]], cached data and
+   * other shared state.
    *
    * @since 1.6.0
    */
-  def newSession(): SQLContext = {
-    new SQLContext(
-      sparkContext = sparkContext,
-      cacheManager = cacheManager,
-      listener = listener,
-      isRootContext = false,
-      externalCatalog = externalCatalog)
-  }
+  def newSession(): SQLContext = new SQLContext(sharedState, isRootContext = false)
 
   /**
    * Per-session state, e.g. configuration, functions, temporary tables etc.
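To make the new wiring concrete, here is a minimal REPL-style sketch of the session model this file now exposes, assuming the patch is applied and a local SparkContext; the variable names are illustrative, not part of the change:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("shared-state-demo"))

val root = new SQLContext(sc)    // root context: constructs a fresh SharedState
val session = root.newSession()  // child session: reuses root's SharedState

// Shared across both: SparkContext, CacheManager, SQLListener, ExternalCatalog.
// Separate per session: SQLConf, temporary tables, registered functions.
assert(session.sparkContext eq root.sparkContext)

session.setConf("spark.sql.shuffle.partitions", "7")
assert(root.getConf("spark.sql.shuffle.partitions", "200") == "200")  // conf is per-session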
(changes to another sql/core file; file name not shown)
@@ -22,10 +22,8 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
 /**
(new file defining org.apache.spark.sql.internal.SharedState)
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.internal

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.ui.SQLListener


/**
* A class that holds all state shared across sessions in a given [[SQLContext]].
*/
private[sql] class SharedState(val sparkContext: SparkContext) {

/**
* Class for caching query results reused in future executions.
*/
val cacheManager: CacheManager = new CacheManager

/**
* A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
*/
val listener: SQLListener = SQLContext.createListenerAndUI(sparkContext)

/**
* A catalog that interacts with external systems.
*/
lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog

}
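One detail worth noting: externalCatalog is a lazy val while cacheManager and listener are strict, so a subclass can override the catalog without the in-memory default ever being constructed. A tiny self-contained sketch of that Scala pattern, with illustrative names:

class BaseState {
  // Were this a strict val, its initializer would still run eagerly in this
  // constructor even when a subclass overrides the member.
  lazy val catalog: String = { println("building InMemoryCatalog"); "in-memory" }
}

class HiveState extends BaseState {
  // Overriding a lazy val means the base initializer above is never forced.
  override lazy val catalog: String = { println("building HiveExternalCatalog"); "hive" }
}

object LazyOverrideDemo extends App {
  println(new HiveState().catalog)  // prints "building HiveExternalCatalog", then "hive"
}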
51 changes: 15 additions & 36 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -45,12 +45,10 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
-import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{AnalyzeTable, DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SharedState, SQLConf}
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -63,32 +61,14 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class HiveContext private[hive](
-    sc: SparkContext,
-    cacheManager: CacheManager,
-    listener: SQLListener,
-    @transient private[hive] val executionHive: HiveClientImpl,
-    @transient private[hive] val metadataHive: HiveClient,
-    isRootContext: Boolean,
-    @transient private[sql] val hiveCatalog: HiveExternalCatalog)
-  extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging {
-  self =>
+    @transient protected[hive] val hiveSharedState: HiveSharedState,
+    override val isRootContext: Boolean)
+  extends SQLContext(hiveSharedState, isRootContext) with Logging {
 
-  private def this(sc: SparkContext, execHive: HiveClientImpl, metaHive: HiveClient) {
-    this(
-      sc,
-      new CacheManager,
-      SQLContext.createListenerAndUI(sc),
-      execHive,
-      metaHive,
-      true,
-      new HiveExternalCatalog(metaHive))
-  }
+  self =>
 
   def this(sc: SparkContext) = {
-    this(
-      sc,
-      HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
-      HiveContext.newClientForMetadata(sc.conf, sc.hadoopConfiguration))
+    this(new HiveSharedState(sc), true)
   }
 
   def this(sc: JavaSparkContext) = this(sc.sc)
@@ -103,19 +83,16 @@ class HiveContext private[hive](
    * and Hive client (both of execution and metadata) with existing HiveContext.
    */
   override def newSession(): HiveContext = {
-    new HiveContext(
-      sc = sc,
-      cacheManager = cacheManager,
-      listener = listener,
-      executionHive = executionHive.newSession(),
-      metadataHive = metadataHive.newSession(),
-      isRootContext = false,
-      hiveCatalog = hiveCatalog)
+    new HiveContext(hiveSharedState, isRootContext = false)
   }
 
   @transient
   protected[sql] override lazy val sessionState = new HiveSessionState(self)
 
+  protected[hive] def hiveCatalog: HiveExternalCatalog = hiveSharedState.externalCatalog
+  protected[hive] def executionHive: HiveClientImpl = sessionState.executionHive
+  protected[hive] def metadataHive: HiveClient = sessionState.metadataHive
+
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -159,7 +136,7 @@
   protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)
 
   protected[hive] def hiveThriftServerSingleSession: Boolean =
-    sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean
+    sparkContext.conf.getBoolean("spark.sql.hive.thriftServer.singleSession", defaultValue = false)
 
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
@@ -527,7 +504,9 @@ private[hive] object HiveContext extends Logging {
    * The version of the Hive client that is used here must match the metastore that is configured
    * in the hive-site.xml file.
    */
-  private def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration): HiveClient = {
+  protected[hive] def newClientForMetadata(
+      conf: SparkConf,
+      hadoopConf: Configuration): HiveClient = {
     val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
     val configurations = hiveClientConfigurations(hiveConf)
     newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
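Taken together, these HiveContext changes mean newSession() just points a second context at the same HiveSharedState, while the Hive clients now come from per-session state. A hedged REPL-style sketch of the intended behavior, assuming a Hive-enabled build and an existing SparkContext named sc:

import org.apache.spark.sql.hive.HiveContext

val hive = new HiveContext(sc)  // root context: constructs one HiveSharedState
val other = hive.newSession()   // shares the external catalog, cache, and listener

// Each session's HiveSessionState calls newSession() on the shared execution
// and metastore clients, so session-local Hive state (current database,
// SET variables, temporary functions) is intended to stay isolated.
other.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
// `hive`, the root session, is not affected by the SET issued above.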
(changes to the file defining org.apache.spark.sql.hive.HiveSessionState)
@@ -18,10 +18,11 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.execution.{python, SparkPlanner}
+import org.apache.spark.sql.execution.SparkPlanner
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
 import org.apache.spark.sql.hive.execution.HiveSqlParser
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 
@@ -31,6 +32,16 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
  */
 private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
 
+  /**
+   * A Hive client used for execution.
+   */
+  val executionHive: HiveClientImpl = ctx.hiveSharedState.executionHive.newSession()
+
+  /**
+   * A Hive client used for interacting with the metastore.
+   */
+  val metadataHive: HiveClient = ctx.hiveSharedState.metadataHive.newSession()
+
   override lazy val conf: SQLConf = new SQLConf {
     override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
   }
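The shape here, an expensive resource owned by shared state with a cheap per-session handle derived from it at SessionState construction, is the same for both clients. An illustrative stand-alone sketch of the pattern, with made-up names rather than Spark API:

// Stand-in for HiveClient: costly to build, cheap to fork per session.
class Client(val sessionId: Int = 0) {
  private var children = 0
  def newSession(): Client = { children += 1; new Client(children) }
}

class Shared { val client = new Client() }  // built once, shared by every session

class Session(shared: Shared) {
  val clientSession: Client = shared.client.newSession()  // fresh handle per session
}

object SessionDemo extends App {
  val shared = new Shared
  val s1 = new Session(shared)
  val s2 = new Session(shared)
  assert(s1.clientSession.sessionId != s2.clientSession.sessionId)
}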
(new file defining org.apache.spark.sql.hive.HiveSharedState)
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
import org.apache.spark.sql.internal.SharedState


/**
* A class that holds all state shared across sessions in a given [[HiveContext]].
*/
private[hive] class HiveSharedState(override val sparkContext: SparkContext)
extends SharedState(sparkContext) {

// TODO: just share the IsolatedClientLoader instead of the client instances themselves

/**
* A Hive client used for execution.
*/
val executionHive: HiveClientImpl = {
HiveContext.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
}

/**
* A Hive client used to interact with the metastore.
*/
// This needs to be a lazy val here because TestHiveSharedState overrides it.
lazy val metadataHive: HiveClient = {
HiveContext.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
}

/**
* A catalog that interacts with the Hive metastore.
*/
override lazy val externalCatalog = new HiveExternalCatalog(metadataHive)

}
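The comment on metadataHive points at TestHiveSharedState; the lazy val is the hook that makes such an override safe, because the expensive newClientForMetadata call is never made when a subclass supplies its own client. A sketch of what an overriding subclass can look like (the class below is illustrative, not the actual TestHiveSharedState):

package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.client.HiveClient

// Hypothetical test-only shared state that injects a pre-built client.
private[hive] class StubbedSharedState(
    sc: SparkContext,
    stubClient: HiveClient)
  extends HiveSharedState(sc) {

  // Because the parent declares metadataHive as a lazy val, the
  // HiveContext.newClientForMetadata call in the parent never runs.
  override lazy val metadataHive: HiveClient = stubClient
}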