[SPARK-13626] [core] Avoid duplicate config deprecation warnings.
Three different things were needed to get rid of spurious warnings:
- silence deprecation warnings when cloning configuration (see the sketch after this list)
- change the way SparkHadoopUtil instantiates SparkConf to silence
  warnings
- avoid creating new SparkConf instances where it's not needed.
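
As a rough illustration of the cloning case, here is a sketch of the user-visible behavior; it is not part of the change, and "spark.kryoserializer.buffer.mb" is only assumed to still be listed among SparkConf's deprecated keys:

import org.apache.spark.SparkConf

object DeprecationWarningSketch {
  def main(args: Array[String]): Unit = {
    // The user-facing set() still logs a single deprecation warning here.
    val conf = new SparkConf().set("spark.kryoserializer.buffer.mb", "64")

    // Before this change, clone() went back through set() and re-logged the
    // warning; it now copies entries through the silent set(key, value, true).
    val copy = conf.clone
    assert(copy.get("spark.kryoserializer.buffer.mb") == "64")
  }
}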

On top of that, I changed the way that Logging.scala detects the repl;
now it uses a method that is overridden in the repl's Main class, and
the hack in Utils.scala is not needed anymore. This makes the 2.11 repl
behave like the 2.10 one and set the default log level to WARN, which
is a lot better. Previously, this wasn't working because the 2.11 repl
triggers log initialization earlier than the 2.10 one.
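
A simplified sketch of the new detection hook (names taken from the diffs below; everything else about the real Logging trait is omitted):

trait Logging {
  // Defaults to false; the repl's Main objects override this to true.
  protected def isInterpreter: Boolean = false

  // Name used to look up the repl logger ("org.apache.spark.repl.Main" in practice).
  protected def logName: String = this.getClass.getName.stripSuffix("$")
}

object ReplMainSketch extends Logging {
  override protected def isInterpreter: Boolean = true
}

Because the repl's Main extends Logging and overrides the hook, log initialization can key off isInterpreter and logName instead of reflectively probing for org.apache.spark.repl.Main, which is what the removed Utils.isInInterpreter did.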

I also removed and simplified some other code in the 2.11 repl's Main
to avoid replicating logic that already exists elsewhere in Spark.

Last but not least, fixed a compilation bug in a test for Scala 2.10.
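
Assuming the 2.10 failure was Option.contains, which only exists since Scala 2.11 (this matches the TaskResultGetterSuite hunk below), a minimal reproduction:

object OptionContainsRepro {
  def main(args: Array[String]): Unit = {
    val resSizeBefore: Option[Long] = Some(0L)

    // Does not compile on Scala 2.10 -- Option.contains was added in 2.11:
    // assert(resSizeBefore.contains(0L))

    // Compiles on both 2.10 and 2.11 (the suite itself uses ScalaTest's ===):
    assert(resSizeBefore == Some(0L))
  }
}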

Tested the 2.11 repl in local and yarn modes.
Marcelo Vanzin committed Mar 4, 2016
1 parent 511d492 commit c5338f6
Showing 12 changed files with 65 additions and 51 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/Logging.scala
@@ -49,6 +49,8 @@ private[spark] trait Logging {
log_
}

protected def isInterpreter: Boolean = false

// Log methods that take only a String
protected def logInfo(msg: => String) {
if (log.isInfoEnabled) log.info(msg)
@@ -127,11 +129,11 @@ private[spark] trait Logging {
}
}

if (Utils.isInInterpreter) {
if (isInterpreter) {
// Use the repl's main class to define the default log level when running the shell,
// overriding the root logger's config if they're different.
val rootLogger = LogManager.getRootLogger()
val replLogger = LogManager.getLogger("org.apache.spark.repl.Main")
val replLogger = LogManager.getLogger(logName)
val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
if (replLevel != rootLogger.getEffectiveLevel()) {
System.err.printf("Setting default log level to \"%s\".\n", replLevel)
23 changes: 19 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
*
* @param loadDefaults whether to also load values from Java system properties
*/
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
class SparkConf private[spark] (loadDefaults: Boolean) extends Cloneable with Logging {

import SparkConf._

@@ -55,21 +55,32 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
private val settings = new ConcurrentHashMap[String, String]()

if (loadDefaults) {
loadFromSystemProperties(false)
}

private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
// Load any spark.* system properties
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value)
set(key, value, silent)
}
this
}

/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
set(key, value, false)
}

private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
if (key == null) {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value for " + key)
}
logDeprecationWarning(key)
if (!silent) {
logDeprecationWarning(key)
}
settings.put(key, value)
this
}
@@ -355,7 +366,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

/** Copy this object */
override def clone: SparkConf = {
new SparkConf(false).setAll(getAll)
val cloned = new SparkConf(false)
settings.entrySet().asScala.foreach { e =>
cloned.set(e.getKey(), e.getValue(), true)
}
cloned
}

/**
SparkHadoopUtil.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
private val sparkConf = new SparkConf()
private val sparkConf = new SparkConf(false).loadFromSystemProperties(true)
val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)

9 changes: 0 additions & 9 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1819,15 +1819,6 @@ private[spark] object Utils extends Logging {
}
}

lazy val isInInterpreter: Boolean = {
try {
val interpClass = classForName("org.apache.spark.repl.Main")
interpClass.getMethod("interp").invoke(null) != null
} catch {
case _: ClassNotFoundException => false
}
}

/**
* Return a well-formed URI for the file described by a user input string.
*
TaskResultGetterSuite.scala
@@ -243,7 +243,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
val resAfter = captor.getValue
val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
assert(resSizeBefore.contains(0L))
assert(resSizeBefore === Some(0L))
assert(resSizeAfter.exists(_.toString.toLong > 0L))
}

10 changes: 9 additions & 1 deletion repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
@@ -19,7 +19,15 @@ package org.apache.spark.repl

import scala.collection.mutable.Set

object Main {
import org.apache.spark.Logging

object Main extends Logging {

// Force log initialization to pick up the repl-specific settings.
logTrace("Initializing Spark REPL...")

override protected def isInterpreter: Boolean = true

private var _interp: SparkILoop = _

def interp = _interp
35 changes: 10 additions & 25 deletions repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -27,6 +27,9 @@ import org.apache.spark.sql.SQLContext

object Main extends Logging {

// Force log initialization to pick up the repl-specific settings.
logTrace("Initializing Spark REPL...")

val conf = new SparkConf()
val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
@@ -38,6 +41,8 @@ object Main extends Logging {

private var hasErrors = false

override protected def isInterpreter: Boolean = true

private def scalaOptionError(msg: String): Unit = {
hasErrors = true
Console.err.println(msg)
@@ -50,39 +55,27 @@ object Main extends Logging {
// Visible for testing
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
interp = _interp
val jars = conf.getOption("spark.jars")
.map(_.replace(",", File.pathSeparator))
.getOrElse("")
val interpArguments = List(
"-Yrepl-class-based",
"-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
"-classpath", getAddedJars.mkString(File.pathSeparator)
"-classpath", jars
) ++ args.toList

val settings = new GenericRunnerSettings(scalaOptionError)
settings.processArguments(interpArguments, true)

if (!hasErrors) {
if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
interp.process(settings) // Repl starts and goes in loop of R.E.P.L
Option(sparkContext).map(_.stop)
}
}

def getAddedJars: Array[String] = {
val envJars = sys.env.get("ADD_JARS")
if (envJars.isDefined) {
logWarning("ADD_JARS environment variable is deprecated, use --jar spark submit argument instead")
}
val propJars = sys.props.get("spark.jars").flatMap { p => if (p == "") None else Some(p) }
val jars = propJars.orElse(envJars).getOrElse("")
Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
}

def createSparkContext(): SparkContext = {
val execUri = System.getenv("SPARK_EXECUTOR_URI")
val jars = getAddedJars
val conf = new SparkConf()
.setMaster(getMaster)
.setJars(jars)
.setIfMissing("spark.app.name", "Spark shell")
conf.setIfMissing("spark.app.name", "Spark shell")
// SparkContext will detect this configuration and register it with the RpcEnv's
// file server, setting spark.repl.class.uri to the actual URI for executors to
// use. This is sort of ugly but since executors are started as part of SparkContext
@@ -115,12 +108,4 @@ object Main extends Logging {
sqlContext
}

private def getMaster: String = {
val master = {
val envMaster = sys.env.get("MASTER")
val propMaster = sys.props.get("spark.master")
propMaster.orElse(envMaster).getOrElse("local[*]")
}
master
}
}
HiveContext.scala
@@ -208,6 +208,7 @@ class HiveContext private[hive](
logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
val loader = new IsolatedClientLoader(
version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
sparkConf = sc.conf,
execJars = Seq(),
config = newTemporaryConfiguration(useInMemoryDerby = true),
isolationOn = false,
@@ -276,6 +277,7 @@ class HiveContext private[hive](
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
new IsolatedClientLoader(
version = metaVersion,
sparkConf = sc.conf,
execJars = jars.toSeq,
config = allConfig,
isolationOn = true,
@@ -288,6 +290,7 @@ class HiveContext private[hive](
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = hiveMetastoreVersion,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sc.conf,
config = allConfig,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
@@ -315,6 +318,7 @@ class HiveContext private[hive](
s"using ${jars.mkString(":")}")
new IsolatedClientLoader(
version = metaVersion,
sparkConf = sc.conf,
execJars = jars.toSeq,
config = allConfig,
isolationOn = true,
HiveClientImpl.scala
@@ -60,6 +60,7 @@ import org.apache.spark.util.{CircularBuffer, Utils}
*/
private[hive] class HiveClientImpl(
override val version: HiveVersion,
sparkConf: SparkConf,
config: Map[String, String],
initClassLoader: ClassLoader,
val clientLoader: IsolatedClientLoader)
@@ -90,7 +91,6 @@ private[hive] class HiveClientImpl(
// instance of SparkConf is needed for the original value of spark.yarn.keytab
// and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the
// keytab configuration for the link name in distributed cache
val sparkConf = new SparkConf
if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
val principalName = sparkConf.get("spark.yarn.principal")
val keytabFileName = sparkConf.get("spark.yarn.keytab")
IsolatedClientLoader.scala
@@ -27,7 +27,7 @@ import scala.util.Try

import org.apache.commons.io.{FileUtils, IOUtils}

import org.apache.spark.Logging
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveContext
@@ -41,6 +41,7 @@ private[hive] object IsolatedClientLoader extends Logging {
def forVersion(
hiveMetastoreVersion: String,
hadoopVersion: String,
sparkConf: SparkConf,
config: Map[String, String] = Map.empty,
ivyPath: Option[String] = None,
sharedPrefixes: Seq[String] = Seq.empty,
@@ -75,7 +76,8 @@ private[hive] object IsolatedClientLoader extends Logging {
}

new IsolatedClientLoader(
version = hiveVersion(hiveMetastoreVersion),
hiveVersion(hiveMetastoreVersion),
sparkConf,
execJars = files,
config = config,
sharesHadoopClasses = sharesHadoopClasses,
@@ -146,6 +148,7 @@ private[hive] object IsolatedClientLoader extends Logging {
*/
private[hive] class IsolatedClientLoader(
val version: HiveVersion,
val sparkConf: SparkConf,
val execJars: Seq[URL] = Seq.empty,
val config: Map[String, String] = Map.empty,
val isolationOn: Boolean = true,
@@ -235,7 +238,7 @@ private[hive] class IsolatedClientLoader(
/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = {
if (!isolationOn) {
return new HiveClientImpl(version, config, baseClassLoader, this)
return new HiveClientImpl(version, sparkConf, config, baseClassLoader, this)
}
// Pre-reflective instantiation setup.
logDebug("Initializing the logger to avoid disaster...")
@@ -246,7 +249,7 @@ private[hive] class IsolatedClientLoader(
classLoader
.loadClass(classOf[HiveClientImpl].getName)
.getConstructors.head
.newInstance(version, config, classLoader, this)
.newInstance(version, sparkConf, config, classLoader, this)
.asInstanceOf[HiveClient]
} catch {
case e: InvocationTargetException =>
HiveCatalogSuite.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.hive

import org.apache.hadoop.util.VersionInfo

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
import org.apache.spark.util.Utils


/**
* Test suite for the [[HiveCatalog]].
*/
@@ -32,7 +32,8 @@ class HiveCatalogSuite extends CatalogTestCases {
private val client: HiveClient = {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion).createClient()
hadoopVersion = VersionInfo.getVersion,
sparkConf = new SparkConf()).createClient()
}

protected override val tableInputFormat: String =
VersionsSuite.scala
@@ -21,7 +21,7 @@ import java.io.File

import org.apache.hadoop.util.VersionInfo

import org.apache.spark.{Logging, SparkFunSuite}
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.util.quietly
@@ -39,6 +39,8 @@ import org.apache.spark.util.Utils
@ExtendedHiveTest
class VersionsSuite extends SparkFunSuite with Logging {

private val sparkConf = new SparkConf()

// In order to speed up test execution during development or in Jenkins, you can specify the path
// of an existing Ivy cache:
private val ivyPath: Option[String] = {
@@ -59,6 +61,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
val badClient = IsolatedClientLoader.forVersion(
hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
val db = new CatalogDatabase("default", "desc", "loc", Map())
@@ -93,6 +96,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = "13",
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
}
@@ -112,6 +116,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = version,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
}
