Merge remote-tracking branch 'upstream/master'
mubarak committed Sep 6, 2014
2 parents ceb43da + ba5bcad commit c461cf4
Showing 17 changed files with 177 additions and 61 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -4,8 +4,8 @@ Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, and Python, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and structured
data processing, MLLib for machine learning, GraphX for graph processing,
and Spark Streaming.
data processing, MLlib for machine learning, GraphX for graph processing,
and Spark Streaming for stream processing.

<http://spark.apache.org/>

2 changes: 2 additions & 0 deletions bin/pyspark
@@ -85,6 +85,8 @@ export PYSPARK_SUBMIT_ARGS

# For pyspark tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_PYTHON" -m doctest $1
else
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1064,11 +1064,10 @@ abstract class RDD[T: ClassTag](
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1
if (partsScanned > 0) {
// If we didn't find any rows after the first iteration, just try all partitions next.
// Otherwise, interpolate the number of partitions we need to try, but overestimate it
// by 50%.
// If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
// interpolate the number of partitions we need to try, but overestimate it by 50%.
if (buf.size == 0) {
numPartsToTry = totalParts - 1
numPartsToTry = partsScanned * 4
} else {
numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
}
33 changes: 25 additions & 8 deletions ec2/spark_ec2.py
@@ -102,9 +102,17 @@ def parse_args():
"(for debugging)")
parser.add_option(
"--ebs-vol-size", metavar="SIZE", type="int", default=0,
help="Attach a new EBS volume of size SIZE (in GB) to each node as " +
"/vol. The volumes will be deleted when the instances terminate. " +
"Only possible on EBS-backed AMIs.")
help="Size (in GB) of each EBS volume.")
parser.add_option(
"--ebs-vol-type", default="standard",
help="EBS volume type (e.g. 'gp2', 'standard').")
parser.add_option(
"--ebs-vol-num", type="int", default=1,
help="Number of EBS volumes to attach to each node as /vol[x]. " +
"The volumes will be deleted when the instances terminate. " +
"Only possible on EBS-backed AMIs. " +
"EBS volumes are only attached if --ebs-vol-size > 0." +
"Only support up to 8 EBS volumes.")
parser.add_option(
"--swap", metavar="SWAP", type="int", default=1024,
help="Swap space to set up per node, in MB (default: 1024)")
@@ -348,13 +356,16 @@ def launch_cluster(conn, opts, cluster_name):
print >> stderr, "Could not find AMI " + opts.ami
sys.exit(1)

# Create block device mapping so that we can add an EBS volume if asked to
# Create block device mapping so that we can add EBS volumes if asked to.
# The first drive is attached as /dev/sds, 2nd as /dev/sdt, ... /dev/sdz
block_map = BlockDeviceMapping()
if opts.ebs_vol_size > 0:
device = EBSBlockDeviceType()
device.size = opts.ebs_vol_size
device.delete_on_termination = True
block_map["/dev/sdv"] = device
for i in range(opts.ebs_vol_num):
device = EBSBlockDeviceType()
device.size = opts.ebs_vol_size
device.volume_type=opts.ebs_vol_type
device.delete_on_termination = True
block_map["/dev/sd" + chr(ord('s') + i)] = device

# AWS ignores the AMI-specified block device mapping for M3 (see SPARK-3342).
if opts.instance_type.startswith('m3.'):
@@ -828,6 +839,12 @@ def get_partition(total, num_partitions, current_partitions):

def real_main():
(opts, action, cluster_name) = parse_args()

# Input parameter validation
if opts.ebs_vol_num > 8:
print >> stderr, "ebs-vol-num cannot be greater than 8"
sys.exit(1)

try:
conn = ec2.connect_to_region(opts.region)
except Exception as e:
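The new --ebs-vol-num option works together with --ebs-vol-size and --ebs-vol-type: volumes are only attached when --ebs-vol-size > 0, and the devices are named sequentially starting at /dev/sds, so the letters s through z leave room for exactly eight volumes, which is what the new check in real_main() enforces. A minimal sketch of that naming and validation logic (illustrative only; the helper name is made up here, and the real script builds the mapping with boto's BlockDeviceMapping and EBSBlockDeviceType as shown above):

    def ebs_device_names(ebs_vol_num):
        # /dev/sds, /dev/sdt, ..., /dev/sdz -- only eight letters are
        # available, hence the cap enforced in real_main() above.
        if ebs_vol_num > 8:
            raise ValueError("ebs-vol-num cannot be greater than 8")
        return ["/dev/sd" + chr(ord('s') + i) for i in range(ebs_vol_num)]

    print(ebs_device_names(3))  # ['/dev/sds', '/dev/sdt', '/dev/sdu']

For example, launching with --ebs-vol-size 100 --ebs-vol-num 2 --ebs-vol-type gp2 (values shown purely as an illustration) would attach /dev/sds and /dev/sdt to each node as 100 GB gp2 volumes.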
8 changes: 4 additions & 4 deletions python/pyspark/rdd.py
@@ -1089,11 +1089,11 @@ def take(self, num):
# we actually cap it at totalParts in runJob.
numPartsToTry = 1
if partsScanned > 0:
# If we didn't find any rows after the first iteration, just
# try all partitions next. Otherwise, interpolate the number
# of partitions we need to try, but overestimate it by 50%.
# If we didn't find any rows after the previous iteration,
# quadruple and retry. Otherwise, interpolate the number of
# partitions we need to try, but overestimate it by 50%.
if len(items) == 0:
numPartsToTry = totalParts - 1
numPartsToTry = partsScanned * 4
else:
numPartsToTry = int(1.5 * num * partsScanned / len(items))

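The Scala change in core/src/main/scala/org/apache/spark/rdd/RDD.scala above and this Python change implement the same take() heuristic: if a pass over the partitions scanned so far produced no rows, the next pass quadruples the number of partitions scanned instead of immediately falling back to all remaining partitions; otherwise the next batch size is interpolated from the row density seen so far and padded by 50%. A minimal sketch of how the estimate evolves (illustrative only; the function name is made up, and the real code additionally caps the value at totalParts inside runJob):

    def next_parts_to_try(parts_scanned, rows_found, num):
        # num is the number of rows requested by take(num).
        if parts_scanned == 0:
            return 1
        if rows_found == 0:
            # No rows yet: quadruple rather than scanning everything at once.
            return parts_scanned * 4
        # Interpolate from the density seen so far, overestimating by 50%.
        return int(1.5 * num * parts_scanned / rows_found)

    # With no rows found, the per-pass batch grows 1, 4, 20, 100, ...
    # (partsScanned totals 1, 5, 25, 125, ...) instead of jumping straight
    # to totalParts - 1 after the first empty pass, as the old code did.
    # With take(100), 5 partitions scanned and 10 rows found, the next pass
    # would try int(1.5 * 100 * 5 / 10) = 75 partitions.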
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}

import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.{SecurityManager, SparkConf, Logging}


class ExecutorRunnable(
@@ -46,7 +46,8 @@ class ExecutorRunnable(
slaveId: String,
hostname: String,
executorMemory: Int,
executorCores: Int)
executorCores: Int,
securityMgr: SecurityManager)
extends Runnable with ExecutorRunnableUtil with Logging {

var rpc: YarnRPC = YarnRPC.create(conf)
@@ -86,6 +87,8 @@ class ExecutorRunnable(
logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)

ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))

// Send the start request to the ContainerManager
val startReq = Records.newRecord(classOf[StartContainerRequest])
.asInstanceOf[StartContainerRequest]
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.SparkConf
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.scheduler.SplitInfo

import org.apache.hadoop.conf.Configuration
@@ -41,21 +41,23 @@ private[yarn] class YarnAllocationHandler(
resourceManager: AMRMProtocol,
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
securityMgr: SecurityManager)
extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {

private val lastResponseId = new AtomicInteger()
private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()

override protected def allocateContainers(count: Int): YarnAllocateResponse = {
var resourceRequests: List[ResourceRequest] = null

// default.
if (count <= 0 || preferredHostToCount.isEmpty) {
logDebug("numExecutors: " + count + ", host preferences: " +
preferredHostToCount.isEmpty)
resourceRequests = List(createResourceRequest(
AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
logDebug("numExecutors: " + count)
if (count <= 0) {
resourceRequests = List()
} else if (preferredHostToCount.isEmpty) {
logDebug("host preferences is empty")
resourceRequests = List(createResourceRequest(
AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
} else {
// request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.util.Utils

@@ -45,15 +45,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
uiAddress: String,
uiHistoryAddress: String) = {
uiHistoryAddress: String,
securityMgr: SecurityManager) = {
this.rpc = YarnRPC.create(conf)
this.uiHistoryAddress = uiHistoryAddress

resourceManager = registerWithResourceManager(conf)
registerApplicationMaster(uiAddress)

new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
preferredNodeLocations)
preferredNodeLocations, securityMgr)
}

override def getAttemptId() = {
@@ -116,7 +116,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
val securityMgr = new SecurityManager(sparkConf)

if (isDriver) {
runDriver()
runDriver(securityMgr)
} else {
runExecutorLauncher(securityMgr)
}
@@ -157,7 +157,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
sparkContextRef.compareAndSet(sc, null)
}

private def registerAM(uiAddress: String) = {
private def registerAM(uiAddress: String, securityMgr: SecurityManager) = {
val sc = sparkContextRef.get()

val appId = client.getAttemptId().getApplicationId().toString()
@@ -170,13 +170,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
if (sc != null) sc.getConf else sparkConf,
if (sc != null) sc.preferredNodeLocationData else Map(),
uiAddress,
historyAddress)
historyAddress,
securityMgr)

allocator.allocateResources()
reporterThread = launchReporterThread()
}

private def runDriver(): Unit = {
private def runDriver(securityMgr: SecurityManager): Unit = {
addAmIpFilter()
val userThread = startUserClass()

@@ -188,7 +189,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
if (sc == null) {
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
} else {
registerAM(sc.ui.appUIHostPort)
registerAM(sc.ui.appUIHostPort, securityMgr)
try {
userThread.join()
} finally {
@@ -203,7 +204,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
conf = sparkConf, securityManager = securityMgr)._1
actor = waitForSparkDriver()
addAmIpFilter()
registerAM(sparkConf.get("spark.driver.appUIAddress", ""))
registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)

// In client mode the actor will stop the reporter thread.
reporterThread.join()
@@ -430,10 +430,8 @@ trait ClientBase extends Logging {

// send the acl settings into YARN to control who has access via YARN interfaces
val securityManager = new SecurityManager(sparkConf)
val acls = Map[ApplicationAccessType, String] (
ApplicationAccessType.VIEW_APP -> securityManager.getViewAcls,
ApplicationAccessType.MODIFY_APP -> securityManager.getModifyAcls)
amContainer.setApplicationACLs(acls)
amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))

amContainer
}
}
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse

import org.apache.spark.{Logging, SparkConf, SparkEnv}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

@@ -55,7 +55,8 @@ private[yarn] abstract class YarnAllocator(
conf: Configuration,
sparkConf: SparkConf,
args: ApplicationMasterArguments,
preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
securityMgr: SecurityManager)
extends Logging {

// These three are locked on allocatedHostToContainersMap. Complementary data structures
@@ -280,7 +281,8 @@ private[yarn] abstract class YarnAllocator(
executorId,
executorHostname,
executorMemory,
executorCores)
executorCores,
securityMgr)
new Thread(executorRunnable).start()
}
}
@@ -444,4 +446,4 @@ private[yarn] abstract class YarnAllocator(

}

}
}
@@ -22,7 +22,7 @@ import scala.collection.{Map, Set}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.records._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler.SplitInfo

/**
@@ -45,7 +45,8 @@ trait YarnRMClient {
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
uiAddress: String,
uiHistoryAddress: String): YarnAllocator
uiHistoryAddress: String,
securityMgr: SecurityManager): YarnAllocator

/**
* Shuts down the AM. Guaranteed to only be called once.
@@ -32,10 +32,11 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.StringInterner
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils

@@ -211,4 +212,12 @@ object YarnSparkHadoopUtil {
}
}

private[spark] def getApplicationAclsForYarn(securityMgr: SecurityManager):
Map[ApplicationAccessType, String] = {
Map[ApplicationAccessType, String] (
ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls
)
}

}