
Commit

Implement Andrew's suggestion (import foo.config._).
A little bit noisier since package objects cannot be `private[foo]`,
apparently.
Marcelo Vanzin committed Dec 14, 2015
1 parent c858fa8 commit c1de25f
Showing 17 changed files with 305 additions and 301 deletions.
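
The "noisier" remark in the message refers to the Scala restriction it mentions: the package object itself cannot be `private[foo]`, so every entry inside it has to carry its own `private[spark]` modifier. A minimal sketch of that pattern, using a hypothetical `org.example` package and placeholder key name rather than the Spark sources:

package org.example

// The package object stays public; each member is restricted individually,
// mirroring the layout of the new package.scala shown below.
package object config {
  private[example] val SHUFFLE_SERVICE_ENABLED_KEY: String = "spark.shuffle.service.enabled"
}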
72 changes: 0 additions & 72 deletions core/src/main/scala/org/apache/spark/config/CoreConfigKeys.scala

This file was deleted.

79 changes: 79 additions & 0 deletions core/src/main/scala/org/apache/spark/config/package.scala
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.launcher.SparkLauncher

package object config {
  import ConfigEntry._

  private[spark] val DRIVER_CLASS_PATH = stringConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH)
    .optional

  private[spark] val DRIVER_JAVA_OPTIONS = stringConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS)
    .optional

  private[spark] val DRIVER_LIBRARY_PATH = stringConf(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH)
    .optional

  private[spark] val DRIVER_USER_CLASS_PATH_FIRST = booleanConf("spark.driver.userClassPathFirst",
    defaultValue = Some(false))

  private[spark] val EXECUTOR_CLASS_PATH = stringConf(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH)
    .optional

  private[spark] val EXECUTOR_JAVA_OPTIONS = stringConf(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS)
    .optional

  private[spark] val EXECUTOR_LIBRARY_PATH = stringConf(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH)
    .optional

  private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
    booleanConf("spark.executor.userClassPathFirst", defaultValue = Some(false))

  private[spark] val IS_PYTHON_APP = booleanConf("spark.yarn.isPython",
    defaultValue = Some(false),
    isPublic = false)

  private[spark] val CPUS_PER_TASK = intConf("spark.task.cpus",
    defaultValue = Some(1))

  private[spark] val DYN_ALLOCATION_MIN_EXECUTORS = intConf("spark.dynamicAllocation.minExecutors",
    defaultValue = Some(0))

  private[spark] val DYN_ALLOCATION_INITIAL_EXECUTORS =
    fallbackConf("spark.dynamicAllocation.initialExecutors",
      fallback = DYN_ALLOCATION_MIN_EXECUTORS)

  private[spark] val DYN_ALLOCATION_MAX_EXECUTORS = intConf("spark.dynamicAllocation.maxExecutors",
    defaultValue = Some(Int.MaxValue))

  private[spark] val SHUFFLE_SERVICE_ENABLED = booleanConf("spark.shuffle.service.enabled",
    defaultValue = Some(false))

  private[spark] val KEYTAB = stringConf("spark.yarn.keytab",
    doc = "Location of user's keytab.")
    .optional

  private[spark] val PRINCIPAL = stringConf("spark.yarn.principal",
    doc = "Name of the Kerberos principal.")
    .optional

  private[spark] val EXECUTOR_INSTANCES = intConf("spark.executor.instances").optional

}
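
Call sites elsewhere in this diff read these entries through `SparkConf.get`, as in `sparkConf.get(CREDENTIALS_FILE_PATH)` and `sparkConf.get(CPUS_PER_TASK)` below. A rough sketch of that usage (hypothetical object name; it assumes the `SparkConf.get(ConfigEntry)` overload the rest of the change relies on, and that `.optional` entries read back as an `Option`):

package org.apache.spark

import org.apache.spark.config._

// Sketch only. Because the entries are private[spark], a caller has to
// live inside the org.apache.spark package.
private[spark] object ConfigReadSketch {
  def example(conf: SparkConf): Unit = {
    val cpusPerTask: Int = conf.get(CPUS_PER_TASK)   // defaultValue = Some(1)
    val keytab: Option[String] = conf.get(KEYTAB)    // assumption: .optional entries yield an Option
    println(s"cpus per task = $cpusPerTask, keytab defined = ${keytab.isDefined}")
  }
}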
@@ -27,7 +27,8 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.config.CoreConfigKeys._
import org.apache.spark.config._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.util.ThreadUtils

/*
@@ -53,8 +54,6 @@ private[yarn] class AMDelegationTokenRenewer(
sparkConf: SparkConf,
hadoopConf: Configuration) extends Logging {

import YarnConfigKeys._

private var lastCredentialsFileSuffix = 0

private val delegationTokenRenewer =
@@ -32,9 +32,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.rpc._
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv,
SparkException, SparkUserAppException}
import org.apache.spark.config.CoreConfigKeys._
import org.apache.spark.config._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util._
@@ -47,8 +48,6 @@ private[spark] class ApplicationMaster(
client: YarnRMClient)
extends Logging {

import YarnConfigKeys._

// Load the properties file with the Spark configuration and set entries as system properties,
// so that user code run inside the AM also has access to them.
if (args.propertiesFile != null) {
8 changes: 3 additions & 5 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -55,9 +55,10 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
import org.apache.spark.config.CoreConfigKeys._
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
import org.apache.spark.config._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
import org.apache.spark.util.Utils

private[spark] class Client(
@@ -67,7 +68,6 @@ private[spark] class Client(
extends Logging {

import Client._
import YarnConfigKeys._

def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
@@ -1043,8 +1043,6 @@ private[spark] class Client(

object Client extends Logging {

import YarnConfigKeys._

def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
logWarning("WARNING: This client is deprecated and will be removed in a " +
@@ -20,17 +20,16 @@ package org.apache.spark.deploy.yarn
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.config.CoreConfigKeys._
import org.apache.spark.config._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.util.{IntParam, MemoryParam, Utils}

// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
private[spark] class ClientArguments(
args: Array[String],
sparkConf: SparkConf) {

import YarnConfigKeys._

var addJars: String = null
var files: String = null
var archives: String = null
@@ -24,6 +24,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.util.{ThreadUtils, Utils}

import scala.util.control.NonFatal
@@ -34,7 +35,7 @@ private[spark] class ExecutorDelegationTokenUpdater(

@volatile private var lastCredentialsFileSuffix = 0

private val credentialsFile = sparkConf.get(YarnConfigKeys.CREDENTIALS_FILE_PATH)
private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
private val freshHadoopConf =
SparkHadoopUtil.get.getConfBypassingFSCache(
hadoopConf, new Path(credentialsFile).toUri.getScheme)
@@ -38,7 +38,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.config.CoreConfigKeys._
import org.apache.spark.config._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils
@@ -56,8 +57,6 @@ private[yarn] class ExecutorRunnable(
securityMgr: SecurityManager)
extends Runnable with Logging {

import YarnConfigKeys._

var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.util.RackResolver

import org.apache.spark.SparkConf
import org.apache.spark.config.CoreConfigKeys
import org.apache.spark.config._

private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])

@@ -85,9 +85,6 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
val yarnConf: Configuration,
val resource: Resource) {

// Number of CPUs per task
private val CPUS_PER_TASK = sparkConf.get(CoreConfigKeys.CPUS_PER_TASK)

/**
* Calculate each container's node locality and rack locality
* @param numContainer number of containers to calculate
@@ -160,7 +157,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
*/
private def numExecutorsPending(numTasksPending: Int): Int = {
val coresPerExecutor = resource.getVirtualCores
(numTasksPending * CPUS_PER_TASK + coresPerExecutor - 1) / coresPerExecutor
(numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) / coresPerExecutor
}

/**
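
The arithmetic around the inlined `sparkConf.get(CPUS_PER_TASK)` is unchanged: `(numTasksPending * cpusPerTask + coresPerExecutor - 1) / coresPerExecutor` is the usual integer ceiling division. A small self-contained check with placeholder numbers:

object CeilDivCheck {
  // (a + b - 1) / b computes ceil(a / b) for positive Ints.
  def ceilDiv(a: Int, b: Int): Int = (a + b - 1) / b

  def main(args: Array[String]): Unit = {
    // e.g. 30 pending tasks * 2 CPUs per task = 60 cores needed;
    // with 8 vcores per executor that rounds up to 8 executors, not 7.
    assert(ceilDiv(30 * 2, 8) == 8)
  }
}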
@@ -35,6 +35,7 @@ import org.apache.log4j.{Level, Logger}

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -65,7 +66,6 @@ private[yarn] class YarnAllocator(
extends Logging {

import YarnAllocator._
import YarnConfigKeys._

// RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {