[SPARK-13626] [core] Avoid duplicate config deprecation warnings. #11510

Closed
wants to merge 4 commits
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/Logging.scala
@@ -43,7 +43,7 @@ private[spark] trait Logging {
// Method to get or create the logger for this object
protected def log: Logger = {
if (log_ == null) {
initializeIfNecessary()
initializeLogIfNecessary(false)
log_ = LoggerFactory.getLogger(logName)
}
log_
@@ -95,17 +95,17 @@ private[spark] trait Logging {
log.isTraceEnabled
}

private def initializeIfNecessary() {
protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
if (!Logging.initialized) {
Logging.initLock.synchronized {
if (!Logging.initialized) {
initializeLogging()
initializeLogging(isInterpreter)
}
}
}
}

private def initializeLogging() {
private def initializeLogging(isInterpreter: Boolean): Unit = {
// Don't use a logger in here, as this is itself occurring during initialization of a logger
// If Log4j 1.2 is being used, but is not initialized, load a default properties file
val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
@@ -127,11 +127,11 @@ private[spark] trait Logging {
}
}

if (Utils.isInInterpreter) {
if (isInterpreter) {
// Use the repl's main class to define the default log level when running the shell,
// overriding the root logger's config if they're different.
val rootLogger = LogManager.getRootLogger()
val replLogger = LogManager.getLogger("org.apache.spark.repl.Main")
val replLogger = LogManager.getLogger(logName)
val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
if (replLevel != rootLogger.getEffectiveLevel()) {
System.err.printf("Setting default log level to \"%s\".\n", replLevel)
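To make the Logging change concrete: instead of the removed reflective `Utils.isInInterpreter` probe, a REPL entry point now opts in explicitly by forcing log initialization with `isInterpreter = true` before anything else logs, which is exactly what the repl `Main` objects later in this patch do. A minimal sketch (not part of the diff; the object name is hypothetical):

```scala
// Minimal sketch, not part of this diff: how a REPL entry point uses the new hook.
package org.apache.spark.repl

import org.apache.spark.Logging

object SketchRepl extends Logging {
  // Eagerly initialize logging with REPL semantics instead of relying on the removed
  // Utils.isInInterpreter reflection check. With isInterpreter = true, the level of
  // this object's logger (resolved via logName) overrides the root logger's default.
  initializeLogIfNecessary(true)
}
```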
23 changes: 19 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
*
* @param loadDefaults whether to also load values from Java system properties
*/
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
class SparkConf private[spark] (loadDefaults: Boolean) extends Cloneable with Logging {

import SparkConf._

@@ -55,21 +55,32 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
private val settings = new ConcurrentHashMap[String, String]()

if (loadDefaults) {
loadFromSystemProperties(false)
}

private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
// Load any spark.* system properties
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value)
set(key, value, silent)
}
this
}

/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
set(key, value, false)
}

private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
if (key == null) {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value for " + key)
}
logDeprecationWarning(key)
if (!silent) {
logDeprecationWarning(key)
}
settings.put(key, value)
this
}
@@ -355,7 +366,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

/** Copy this object */
override def clone: SparkConf = {
new SparkConf(false).setAll(getAll)
val cloned = new SparkConf(false)
settings.entrySet().asScala.foreach { e =>
cloned.set(e.getKey(), e.getValue(), true)
}
cloned
}

/**
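The net effect of the SparkConf changes is that deprecation warnings are emitted only on the public `set(key, value)` path, while internal bulk copies (`clone`, and loading system properties for internal confs such as the `SparkHadoopUtil` one in the next hunk) go through a silent overload. A condensed sketch of that pattern, with the actual deprecated-key table stubbed out and `SketchConf` as a hypothetical stand-in:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Condensed sketch of the silent-set pattern above; the deprecation lookup is stubbed.
class SketchConf(loadDefaults: Boolean) extends Cloneable {
  private val settings = new ConcurrentHashMap[String, String]()

  if (loadDefaults) {
    loadFromSystemProperties(silent = false)
  }

  /** Public setter: warns about deprecated keys exactly once, when user code sets them. */
  def set(key: String, value: String): SketchConf = set(key, value, silent = false)

  private def set(key: String, value: String, silent: Boolean): SketchConf = {
    require(key != null && value != null, "null key or value")
    if (!silent) {
      logDeprecationWarning(key)
    }
    settings.put(key, value)
    this
  }

  private def loadFromSystemProperties(silent: Boolean): SketchConf = {
    for ((k, v) <- sys.props if k.startsWith("spark.")) set(k, v, silent)
    this
  }

  /** Internal copies go through the silent path, so no duplicate warnings are logged. */
  override def clone: SketchConf = {
    val cloned = new SketchConf(false)
    settings.entrySet().asScala.foreach(e => cloned.set(e.getKey, e.getValue, silent = true))
    cloned
  }

  private def logDeprecationWarning(key: String): Unit = {
    // Stub: the real SparkConf consults its table of deprecated keys and logs a warning.
  }
}
```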
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
private val sparkConf = new SparkConf()
private val sparkConf = new SparkConf(false).loadFromSystemProperties(true)
val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)

9 changes: 0 additions & 9 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1819,15 +1819,6 @@ private[spark] object Utils extends Logging {
}
}

lazy val isInInterpreter: Boolean = {
try {
val interpClass = classForName("org.apache.spark.repl.Main")
interpClass.getMethod("interp").invoke(null) != null
} catch {
case _: ClassNotFoundException => false
}
}

/**
* Return a well-formed URI for the file described by a user input string.
*
@@ -19,7 +19,12 @@ package org.apache.spark.repl

import scala.collection.mutable.Set

object Main {
import org.apache.spark.Logging

object Main extends Logging {

initializeLogIfNecessary(true)

private var _interp: SparkILoop = _

def interp = _interp
32 changes: 7 additions & 25 deletions repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.SQLContext

object Main extends Logging {

initializeLogIfNecessary(true)

val conf = new SparkConf()
val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
@@ -50,39 +52,27 @@ object Main extends Logging {
// Visible for testing
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
interp = _interp
val jars = conf.getOption("spark.jars")
.map(_.replace(",", File.pathSeparator))
.getOrElse("")
val interpArguments = List(
"-Yrepl-class-based",
"-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
"-classpath", getAddedJars.mkString(File.pathSeparator)
"-classpath", jars
) ++ args.toList

val settings = new GenericRunnerSettings(scalaOptionError)
settings.processArguments(interpArguments, true)

if (!hasErrors) {
if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
interp.process(settings) // Repl starts and goes in loop of R.E.P.L
Option(sparkContext).map(_.stop)
}
}

def getAddedJars: Array[String] = {
val envJars = sys.env.get("ADD_JARS")
if (envJars.isDefined) {
logWarning("ADD_JARS environment variable is deprecated, use --jar spark submit argument instead")
}
val propJars = sys.props.get("spark.jars").flatMap { p => if (p == "") None else Some(p) }
val jars = propJars.orElse(envJars).getOrElse("")
Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
}

def createSparkContext(): SparkContext = {
val execUri = System.getenv("SPARK_EXECUTOR_URI")
val jars = getAddedJars
val conf = new SparkConf()
.setMaster(getMaster)
.setJars(jars)
.setIfMissing("spark.app.name", "Spark shell")
conf.setIfMissing("spark.app.name", "Spark shell")
// SparkContext will detect this configuration and register it with the RpcEnv's
// file server, setting spark.repl.class.uri to the actual URI for executors to
// use. This is sort of ugly but since executors are started as part of SparkContext
@@ -115,12 +105,4 @@ object Main extends Logging {
sqlContext
}

private def getMaster: String = {
val master = {
val envMaster = sys.env.get("MASTER")
val propMaster = sys.props.get("spark.master")
propMaster.orElse(envMaster).getOrElse("local[*]")
}
master
}
}
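With `getMaster` and `getAddedJars` removed, the scala-2.11 REPL reads the master URL and extra jars from the single shared `Main.conf`, and tests set `spark.master` on that conf instead of a system property (see the ReplSuite hunk that follows). A hypothetical, condensed sketch of that wiring:

```scala
import java.io.File
import org.apache.spark.SparkConf

// Hypothetical, condensed sketch of the simplified REPL wiring: one shared conf,
// no ad-hoc MASTER / ADD_JARS environment lookups.
object SketchReplMain {
  // Deprecation warnings for spark.* system properties are logged here, once.
  val conf = new SparkConf()

  // Jars for the interpreter classpath come straight from spark.jars.
  def interpreterClasspath: String =
    conf.getOption("spark.jars").map(_.replace(",", File.pathSeparator)).getOrElse("")

  // The SparkContext conf is the same object, only filling in missing defaults.
  def contextConf: SparkConf = conf.setIfMissing("spark.app.name", "Spark shell")
}
```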
@@ -48,8 +48,8 @@ class ReplSuite extends SparkFunSuite {
val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)

System.setProperty("spark.master", master)
Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out)))
Main.conf.set("spark.master", master)
Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out)))

if (oldExecutorClasspath != null) {
System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath)
@@ -209,6 +209,7 @@ class HiveContext private[hive](
logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
val loader = new IsolatedClientLoader(
version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
sparkConf = sc.conf,
execJars = Seq(),
config = newTemporaryConfiguration(useInMemoryDerby = true),
isolationOn = false,
@@ -277,6 +278,7 @@ class HiveContext private[hive](
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
new IsolatedClientLoader(
version = metaVersion,
sparkConf = sc.conf,
execJars = jars.toSeq,
config = allConfig,
isolationOn = true,
@@ -289,6 +291,7 @@ class HiveContext private[hive](
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = hiveMetastoreVersion,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sc.conf,
config = allConfig,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
@@ -316,6 +319,7 @@ class HiveContext private[hive](
s"using ${jars.mkString(":")}")
new IsolatedClientLoader(
version = metaVersion,
sparkConf = sc.conf,
execJars = jars.toSeq,
config = allConfig,
isolationOn = true,
@@ -60,6 +60,7 @@ import org.apache.spark.util.{CircularBuffer, Utils}
*/
private[hive] class HiveClientImpl(
override val version: HiveVersion,
sparkConf: SparkConf,
config: Map[String, String],
initClassLoader: ClassLoader,
val clientLoader: IsolatedClientLoader)
@@ -90,7 +91,6 @@ private[hive] class HiveClientImpl(
// instance of SparkConf is needed for the original value of spark.yarn.keytab
// and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the
// keytab configuration for the link name in distributed cache
val sparkConf = new SparkConf
if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
val principalName = sparkConf.get("spark.yarn.principal")
val keytabFileName = sparkConf.get("spark.yarn.keytab")
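In the Hive client the same source of duplicate warnings is removed by injecting the session's `SparkConf` instead of building a fresh one (`new SparkConf` would re-scan system properties and log every deprecated key again). A minimal sketch of the injected-conf usage; `SketchHiveClient` is a hypothetical stand-in for the constructor change, which `IsolatedClientLoader` below threads through:

```scala
import org.apache.spark.SparkConf

// Minimal sketch: the client receives an existing conf rather than calling `new SparkConf`.
class SketchHiveClient(sparkConf: SparkConf) {
  // spark.yarn.principal / spark.yarn.keytab are read from the caller's already-populated
  // configuration, so there is no second pass over system properties (and no repeated warnings).
  def principalAndKeytab: Option[(String, String)] =
    for {
      principal <- sparkConf.getOption("spark.yarn.principal")
      keytab    <- sparkConf.getOption("spark.yarn.keytab")
    } yield (principal, keytab)
}

// Usage (hypothetical): new SketchHiveClient(sc.conf), mirroring how IsolatedClientLoader
// now passes sparkConf down to HiveClientImpl.
```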
@@ -27,7 +27,7 @@ import scala.util.Try

import org.apache.commons.io.{FileUtils, IOUtils}

import org.apache.spark.Logging
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveContext
@@ -41,6 +41,7 @@ private[hive] object IsolatedClientLoader extends Logging {
def forVersion(
hiveMetastoreVersion: String,
hadoopVersion: String,
sparkConf: SparkConf,
config: Map[String, String] = Map.empty,
ivyPath: Option[String] = None,
sharedPrefixes: Seq[String] = Seq.empty,
@@ -75,7 +76,8 @@
}

new IsolatedClientLoader(
version = hiveVersion(hiveMetastoreVersion),
hiveVersion(hiveMetastoreVersion),
sparkConf,
execJars = files,
config = config,
sharesHadoopClasses = sharesHadoopClasses,
@@ -146,6 +148,7 @@
*/
private[hive] class IsolatedClientLoader(
val version: HiveVersion,
val sparkConf: SparkConf,
val execJars: Seq[URL] = Seq.empty,
val config: Map[String, String] = Map.empty,
val isolationOn: Boolean = true,
@@ -235,7 +238,7 @@ private[hive] class IsolatedClientLoader(
/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = {
if (!isolationOn) {
return new HiveClientImpl(version, config, baseClassLoader, this)
return new HiveClientImpl(version, sparkConf, config, baseClassLoader, this)
}
// Pre-reflective instantiation setup.
logDebug("Initializing the logger to avoid disaster...")
@@ -246,7 +249,7 @@ private[hive] class IsolatedClientLoader(
classLoader
.loadClass(classOf[HiveClientImpl].getName)
.getConstructors.head
.newInstance(version, config, classLoader, this)
.newInstance(version, sparkConf, config, classLoader, this)
.asInstanceOf[HiveClient]
} catch {
case e: InvocationTargetException =>
@@ -19,11 +19,11 @@ package org.apache.spark.sql.hive

import org.apache.hadoop.util.VersionInfo

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
import org.apache.spark.util.Utils


/**
* Test suite for the [[HiveCatalog]].
*/
@@ -32,7 +32,8 @@ class HiveCatalogSuite extends CatalogTestCases {
private val client: HiveClient = {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion).createClient()
hadoopVersion = VersionInfo.getVersion,
sparkConf = new SparkConf()).createClient()
}

protected override val tableInputFormat: String =
@@ -21,7 +21,7 @@ import java.io.File

import org.apache.hadoop.util.VersionInfo

import org.apache.spark.{Logging, SparkFunSuite}
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.util.quietly
@@ -39,6 +39,8 @@ import org.apache.spark.util.Utils
@ExtendedHiveTest
class VersionsSuite extends SparkFunSuite with Logging {

private val sparkConf = new SparkConf()

// In order to speed up test execution during development or in Jenkins, you can specify the path
// of an existing Ivy cache:
private val ivyPath: Option[String] = {
@@ -59,6 +61,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
val badClient = IsolatedClientLoader.forVersion(
hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
val db = new CatalogDatabase("default", "desc", "loc", Map())
@@ -93,6 +96,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = "13",
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
}
@@ -112,6 +116,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = version,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
}