
Commit

[SPARK-13626][CORE] Avoid duplicate config deprecation warnings.
Three different things were needed to get rid of spurious warnings:
- silence deprecation warnings when cloning configuration
- change the way SparkHadoopUtil instantiates SparkConf to silence
  warnings
- avoid creating new SparkConf instances where they're not needed (see the sketch below).
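
To make these points concrete, here is a minimal, self-contained sketch of the
"silent set" pattern the change introduces. MiniConf and the deprecated key are
invented for illustration; this is not the actual SparkConf code.

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Only user-initiated sets log deprecation warnings; internal copies stay quiet.
class MiniConf(loadDefaults: Boolean) extends Cloneable {
  private val settings = new ConcurrentHashMap[String, String]()
  private val deprecatedKeys = Set("spark.some.deprecated.key") // hypothetical key

  if (loadDefaults) loadFromSystemProperties(silent = false)

  def set(key: String, value: String): MiniConf = set(key, value, silent = false)

  private def set(key: String, value: String, silent: Boolean): MiniConf = {
    require(key != null && value != null, "null key or value")
    if (!silent && deprecatedKeys.contains(key)) {
      System.err.println(s"Warning: configuration key $key is deprecated")
    }
    settings.put(key, value)
    this
  }

  // private[spark] in the real code, so trusted callers can load quietly.
  def loadFromSystemProperties(silent: Boolean): MiniConf = {
    for ((k, v) <- sys.props.toMap if k.startsWith("spark.")) set(k, v, silent)
    this
  }

  // Cloning copies values verbatim instead of re-triggering warnings that were
  // already logged when the values were first set.
  override def clone: MiniConf = {
    val cloned = new MiniConf(loadDefaults = false)
    settings.asScala.foreach { case (k, v) => cloned.set(k, v, silent = true) }
    cloned
  }
}

With this shape, an internal consumer such as SparkHadoopUtil can build its copy
with new SparkConf(false).loadFromSystemProperties(true) (as the diff below does),
so a deprecated key is warned about once, on the user-facing path, rather than
every time the configuration is copied.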

On top of that, I changed the way Logging.scala detects the repl: it now relies
on a method in Logging that the repl's Main object calls early to flag
interpreter mode, so the hack in Utils.scala is no longer needed. This makes the
2.11 repl behave like the 2.10 one and set the default log level to WARN, which
is a lot better. Previously this didn't work because the 2.11 repl triggers log
initialization earlier than the 2.10 one does.
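
For reference, here is a stripped-down sketch of the new detection mechanism;
the names below are invented, and only the initializeLogIfNecessary(isInterpreter)
shape mirrors the real change.

// The shared logging trait exposes a one-time init hook with an interpreter flag.
trait FakeLogging {
  protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
    if (!FakeLogging.initialized) {
      FakeLogging.initLock.synchronized {
        if (!FakeLogging.initialized) {
          // Spark configures log4j here; this sketch only records the decision.
          val defaultLevel = if (isInterpreter) "WARN" else "INFO"
          System.err.println(s"Setting default log level to $defaultLevel")
          FakeLogging.initialized = true
        }
      }
    }
  }
}

object FakeLogging {
  @volatile var initialized = false
  val initLock = new Object()
}

// The shell's Main mixes the trait in and initializes eagerly, flagging
// interpreter mode, so the WARN default wins even though the 2.11 repl touches
// logging earlier than the 2.10 one does.
object FakeReplMain extends FakeLogging {
  initializeLogIfNecessary(true)

  def main(args: Array[String]): Unit = {
    // ... start the interactive loop ...
  }
}

Calling the hook with true before anything else runs in Main is what replaces the
reflective Utils.isInInterpreter check that is deleted below.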

I also removed and simplified some other code in the 2.11 repl's Main
to avoid replicating logic that already exists elsewhere in Spark.

Tested the 2.11 repl in local and yarn modes.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #11510 from vanzin/SPARK-13626.
Marcelo Vanzin committed Mar 14, 2016
1 parent 38529d8 commit 8301fad
Showing 12 changed files with 62 additions and 56 deletions.
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/Logging.scala
@@ -43,7 +43,7 @@ private[spark] trait Logging {
// Method to get or create the logger for this object
protected def log: Logger = {
if (log_ == null) {
initializeIfNecessary()
initializeLogIfNecessary(false)
log_ = LoggerFactory.getLogger(logName)
}
log_
@@ -95,17 +95,17 @@ private[spark] trait Logging {
log.isTraceEnabled
}

private def initializeIfNecessary() {
protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
if (!Logging.initialized) {
Logging.initLock.synchronized {
if (!Logging.initialized) {
initializeLogging()
initializeLogging(isInterpreter)
}
}
}
}

private def initializeLogging() {
private def initializeLogging(isInterpreter: Boolean): Unit = {
// Don't use a logger in here, as this is itself occurring during initialization of a logger
// If Log4j 1.2 is being used, but is not initialized, load a default properties file
val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
@@ -127,11 +127,11 @@ private[spark] trait Logging {
}
}

if (Utils.isInInterpreter) {
if (isInterpreter) {
// Use the repl's main class to define the default log level when running the shell,
// overriding the root logger's config if they're different.
val rootLogger = LogManager.getRootLogger()
val replLogger = LogManager.getLogger("org.apache.spark.repl.Main")
val replLogger = LogManager.getLogger(logName)
val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
if (replLevel != rootLogger.getEffectiveLevel()) {
System.err.printf("Setting default log level to \"%s\".\n", replLevel)
23 changes: 19 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
*
* @param loadDefaults whether to also load values from Java system properties
*/
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
class SparkConf private[spark] (loadDefaults: Boolean) extends Cloneable with Logging {

import SparkConf._

@@ -57,21 +57,32 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
private val settings = new ConcurrentHashMap[String, String]()

if (loadDefaults) {
loadFromSystemProperties(false)
}

private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
// Load any spark.* system properties
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value)
set(key, value, silent)
}
this
}

/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
set(key, value, false)
}

private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
if (key == null) {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value for " + key)
}
logDeprecationWarning(key)
if (!silent) {
logDeprecationWarning(key)
}
settings.put(key, value)
this
}
@@ -395,7 +406,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

/** Copy this object */
override def clone: SparkConf = {
new SparkConf(false).setAll(getAll)
val cloned = new SparkConf(false)
settings.entrySet().asScala.foreach { e =>
cloned.set(e.getKey(), e.getValue(), true)
}
cloned
}

/**
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
private val sparkConf = new SparkConf()
private val sparkConf = new SparkConf(false).loadFromSystemProperties(true)
val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)

9 changes: 0 additions & 9 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1820,15 +1820,6 @@ private[spark] object Utils extends Logging {
}
}

lazy val isInInterpreter: Boolean = {
try {
val interpClass = classForName("org.apache.spark.repl.Main")
interpClass.getMethod("interp").invoke(null) != null
} catch {
case _: ClassNotFoundException => false
}
}

/**
* Return a well-formed URI for the file described by a user input string.
*
repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
@@ -19,7 +19,12 @@ package org.apache.spark.repl

import scala.collection.mutable.Set

object Main {
import org.apache.spark.Logging

object Main extends Logging {

initializeLogIfNecessary(true)

private var _interp: SparkILoop = _

def interp = _interp
32 changes: 7 additions & 25 deletions repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.SQLContext

object Main extends Logging {

initializeLogIfNecessary(true)

val conf = new SparkConf()
val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
@@ -50,39 +52,27 @@ object Main extends Logging {
// Visible for testing
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
interp = _interp
val jars = conf.getOption("spark.jars")
.map(_.replace(",", File.pathSeparator))
.getOrElse("")
val interpArguments = List(
"-Yrepl-class-based",
"-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
"-classpath", getAddedJars.mkString(File.pathSeparator)
"-classpath", jars
) ++ args.toList

val settings = new GenericRunnerSettings(scalaOptionError)
settings.processArguments(interpArguments, true)

if (!hasErrors) {
if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
interp.process(settings) // Repl starts and goes in loop of R.E.P.L
Option(sparkContext).map(_.stop)
}
}

def getAddedJars: Array[String] = {
val envJars = sys.env.get("ADD_JARS")
if (envJars.isDefined) {
logWarning("ADD_JARS environment variable is deprecated, use --jar spark submit argument instead")
}
val propJars = sys.props.get("spark.jars").flatMap { p => if (p == "") None else Some(p) }
val jars = propJars.orElse(envJars).getOrElse("")
Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
}

def createSparkContext(): SparkContext = {
val execUri = System.getenv("SPARK_EXECUTOR_URI")
val jars = getAddedJars
val conf = new SparkConf()
.setMaster(getMaster)
.setJars(jars)
.setIfMissing("spark.app.name", "Spark shell")
conf.setIfMissing("spark.app.name", "Spark shell")
// SparkContext will detect this configuration and register it with the RpcEnv's
// file server, setting spark.repl.class.uri to the actual URI for executors to
// use. This is sort of ugly but since executors are started as part of SparkContext
@@ -115,12 +105,4 @@ object Main extends Logging {
sqlContext
}

private def getMaster: String = {
val master = {
val envMaster = sys.env.get("MASTER")
val propMaster = sys.props.get("spark.master")
propMaster.orElse(envMaster).getOrElse("local[*]")
}
master
}
}
repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -48,8 +48,8 @@ class ReplSuite extends SparkFunSuite {
val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)

System.setProperty("spark.master", master)
Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out)))
Main.conf.set("spark.master", master)
Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out)))

if (oldExecutorClasspath != null) {
System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath)
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -210,6 +210,7 @@ class HiveContext private[hive](
logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
val loader = new IsolatedClientLoader(
version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
sparkConf = sc.conf,
execJars = Seq(),
config = newTemporaryConfiguration(useInMemoryDerby = true),
isolationOn = false,
@@ -278,6 +279,7 @@ class HiveContext private[hive](
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
new IsolatedClientLoader(
version = metaVersion,
sparkConf = sc.conf,
execJars = jars.toSeq,
config = allConfig,
isolationOn = true,
@@ -290,6 +292,7 @@ class HiveContext private[hive](
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = hiveMetastoreVersion,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sc.conf,
config = allConfig,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
@@ -317,6 +320,7 @@ class HiveContext private[hive](
s"using ${jars.mkString(":")}")
new IsolatedClientLoader(
version = metaVersion,
sparkConf = sc.conf,
execJars = jars.toSeq,
config = allConfig,
isolationOn = true,
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -60,6 +60,7 @@ import org.apache.spark.util.{CircularBuffer, Utils}
*/
private[hive] class HiveClientImpl(
override val version: HiveVersion,
sparkConf: SparkConf,
config: Map[String, String],
initClassLoader: ClassLoader,
val clientLoader: IsolatedClientLoader)
@@ -90,7 +91,6 @@ private[hive] class HiveClientImpl(
// instance of SparkConf is needed for the original value of spark.yarn.keytab
// and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the
// keytab configuration for the link name in distributed cache
val sparkConf = new SparkConf
if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
val principalName = sparkConf.get("spark.yarn.principal")
val keytabFileName = sparkConf.get("spark.yarn.keytab")
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -27,7 +27,7 @@ import scala.util.Try

import org.apache.commons.io.{FileUtils, IOUtils}

import org.apache.spark.Logging
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveContext
@@ -41,6 +41,7 @@ private[hive] object IsolatedClientLoader extends Logging {
def forVersion(
hiveMetastoreVersion: String,
hadoopVersion: String,
sparkConf: SparkConf,
config: Map[String, String] = Map.empty,
ivyPath: Option[String] = None,
sharedPrefixes: Seq[String] = Seq.empty,
@@ -75,7 +76,8 @@ private[hive] object IsolatedClientLoader extends Logging {
}

new IsolatedClientLoader(
version = hiveVersion(hiveMetastoreVersion),
hiveVersion(hiveMetastoreVersion),
sparkConf,
execJars = files,
config = config,
sharesHadoopClasses = sharesHadoopClasses,
@@ -146,6 +148,7 @@ private[hive] object IsolatedClientLoader extends Logging {
*/
private[hive] class IsolatedClientLoader(
val version: HiveVersion,
val sparkConf: SparkConf,
val execJars: Seq[URL] = Seq.empty,
val config: Map[String, String] = Map.empty,
val isolationOn: Boolean = true,
@@ -235,7 +238,7 @@ private[hive] class IsolatedClientLoader(
/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = {
if (!isolationOn) {
return new HiveClientImpl(version, config, baseClassLoader, this)
return new HiveClientImpl(version, sparkConf, config, baseClassLoader, this)
}
// Pre-reflective instantiation setup.
logDebug("Initializing the logger to avoid disaster...")
@@ -246,7 +249,7 @@ private[hive] class IsolatedClientLoader(
classLoader
.loadClass(classOf[HiveClientImpl].getName)
.getConstructors.head
.newInstance(version, config, classLoader, this)
.newInstance(version, sparkConf, config, classLoader, this)
.asInstanceOf[HiveClient]
} catch {
case e: InvocationTargetException =>
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.hive

import org.apache.hadoop.util.VersionInfo

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
import org.apache.spark.util.Utils


/**
* Test suite for the [[HiveCatalog]].
*/
@@ -32,7 +32,8 @@ class HiveCatalogSuite extends CatalogTestCases {
private val client: HiveClient = {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion).createClient()
hadoopVersion = VersionInfo.getVersion,
sparkConf = new SparkConf()).createClient()
}

protected override val tableInputFormat: String =
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -21,7 +21,7 @@ import java.io.File

import org.apache.hadoop.util.VersionInfo

import org.apache.spark.{Logging, SparkFunSuite}
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.util.quietly
@@ -39,6 +39,8 @@ import org.apache.spark.util.Utils
@ExtendedHiveTest
class VersionsSuite extends SparkFunSuite with Logging {

private val sparkConf = new SparkConf()

// In order to speed up test execution during development or in Jenkins, you can specify the path
// of an existing Ivy cache:
private val ivyPath: Option[String] = {
@@ -59,6 +61,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
val badClient = IsolatedClientLoader.forVersion(
hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
val db = new CatalogDatabase("default", "desc", "loc", Map())
@@ -93,6 +96,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = "13",
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
}
@@ -112,6 +116,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = version,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
}
