Commit

Merge remote-tracking branch 'upstream/master'

gweidner committed May 9, 2015
2 parents cafd104 + 84bf931 commit 1ac10e5
Showing 38 changed files with 532 additions and 103 deletions.
@@ -48,7 +48,9 @@ class LocalSparkCluster(
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")

// Disable REST server on Master in this mode unless otherwise specified
val _conf = conf.clone().setIfMissing("spark.master.rest.enabled", "false")
val _conf = conf.clone()
.setIfMissing("spark.master.rest.enabled", "false")
.set("spark.shuffle.service.enabled", "false")

/* Start the Master */
val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
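
As an aside, a minimal sketch of the SparkConf pattern used above (illustrative values, not part of the change): setIfMissing respects a value the user has already supplied, while set unconditionally overrides it, so an explicit REST setting survives but the external shuffle service is always disabled in this mode.

// Sketch only: behavior of clone()/setIfMissing()/set() on SparkConf.
import org.apache.spark.SparkConf

val userConf = new SparkConf()
  .set("spark.master.rest.enabled", "true")            // explicitly set by the user

val _conf = userConf.clone()
  .setIfMissing("spark.master.rest.enabled", "false")  // no-op: the key is already set
  .set("spark.shuffle.service.enabled", "false")       // always forced off

assert(_conf.get("spark.master.rest.enabled") == "true")
assert(_conf.get("spark.shuffle.service.enabled") == "false")
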
@@ -59,12 +59,22 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager
}
transportContext = new TransportContext(transportConf, rpcHandler)
clientFactory = transportContext.createClientFactory(clientBootstrap.toList)
server = transportContext.createServer(conf.getInt("spark.blockManager.port", 0),
serverBootstrap.toList)
server = createServer(serverBootstrap.toList)
appId = conf.getAppId
logInfo("Server created on " + server.getPort)
}

/** Creates and binds the TransportServer, possibly trying multiple ports. */
private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
def startService(port: Int): (TransportServer, Int) = {
val server = transportContext.createServer(port, bootstraps)
(server, server.getPort)
}

val portToTry = conf.getInt("spark.blockManager.port", 0)
Utils.startServiceOnPort(portToTry, startService, conf, getClass.getName)._1
}

override def fetchBlocks(
host: String,
port: Int,
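
The port-retry behavior that used to live inside the transport layer (see the removed bindRightPort method in TransportServer.java further down) is now delegated to Utils.startServiceOnPort through the startService callback above. A rough, self-contained sketch of that retry pattern (the name, signature, and retry count here are illustrative, not Spark's actual utility):

// Illustrative retry loop, not Spark's implementation. Port 0 means "let the
// OS pick a free port" and is never incremented; otherwise successive ports
// are tried, wrapping around while skipping privileged ports (< 1024).
def startServiceOnPortSketch[T](
    startPort: Int,
    startService: Int => (T, Int),
    maxRetries: Int = 16): (T, Int) = {
  for (offset <- 0 to maxRetries) {
    val tryPort =
      if (startPort == 0) 0
      else ((startPort + offset - 1024) % (65536 - 1024)) + 1024
    try {
      return startService(tryPort)
    } catch {
      case _: Exception if offset < maxRetries => // bind failed, try the next port
    }
  }
  throw new IllegalStateException(s"Failed to start service on port $startPort")
}
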
30 changes: 20 additions & 10 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -717,7 +717,8 @@ abstract class RDD[T: ClassTag](
def mapPartitionsWithContext[U: ClassTag](
f: (TaskContext, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter)
val cleanF = sc.clean(f)
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => cleanF(context, iter)
new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}

@@ -741,9 +742,11 @@ abstract class RDD[T: ClassTag](
def mapWith[A, U: ClassTag]
(constructA: Int => A, preservesPartitioning: Boolean = false)
(f: (T, A) => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
val cleanA = sc.clean(constructA)
mapPartitionsWithIndex((index, iter) => {
val a = constructA(index)
iter.map(t => f(t, a))
val a = cleanA(index)
iter.map(t => cleanF(t, a))
}, preservesPartitioning)
}

@@ -756,9 +759,11 @@ abstract class RDD[T: ClassTag](
def flatMapWith[A, U: ClassTag]
(constructA: Int => A, preservesPartitioning: Boolean = false)
(f: (T, A) => Seq[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
val cleanA = sc.clean(constructA)
mapPartitionsWithIndex((index, iter) => {
val a = constructA(index)
iter.flatMap(t => f(t, a))
val a = cleanA(index)
iter.flatMap(t => cleanF(t, a))
}, preservesPartitioning)
}

@@ -769,9 +774,11 @@ abstract class RDD[T: ClassTag](
*/
@deprecated("use mapPartitionsWithIndex and foreach", "1.0.0")
def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = withScope {
val cleanF = sc.clean(f)
val cleanA = sc.clean(constructA)
mapPartitionsWithIndex { (index, iter) =>
val a = constructA(index)
iter.map(t => {f(t, a); t})
val a = cleanA(index)
iter.map(t => {cleanF(t, a); t})
}
}

@@ -782,9 +789,11 @@ abstract class RDD[T: ClassTag](
*/
@deprecated("use mapPartitionsWithIndex and filter", "1.0.0")
def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope {
val cleanP = sc.clean(p)
val cleanA = sc.clean(constructA)
mapPartitionsWithIndex((index, iter) => {
val a = constructA(index)
iter.filter(t => p(t, a))
val a = cleanA(index)
iter.filter(t => cleanP(t, a))
}, preservesPartitioning = true)
}

@@ -901,7 +910,8 @@ abstract class RDD[T: ClassTag](
* Return an RDD that contains all matching values by applying `f`.
*/
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
filter(f.isDefinedAt).map(f)
val cleanF = sc.clean(f)
filter(cleanF.isDefinedAt).map(cleanF)
}

/**
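
Each of the RDD.scala edits above applies the same fix: the user-supplied function is passed through sc.clean(...) on the driver, and only the cleaned copy is referenced inside the closure that ships to executors; before this change the wrapper closures were cleaned but the user functions captured inside them were not. Since the *With operators are deprecated, their doc comments point at mapPartitionsWithIndex; a sketch of that replacement using only public API (the RDD contents and seed offset are assumptions, mirroring the new test code below):

// Sketch: the recommended replacement for the deprecated mapWith, built on
// mapPartitionsWithIndex. One Random is constructed per partition from its
// index, and the shipped closure only captures serializable state.
import java.util.Random
import org.apache.spark.rdd.RDD

def perturb(rdd: RDD[Double]): RDD[Double] =
  rdd.mapPartitionsWithIndex { (index, iter) =>
    val rng = new Random(index + 42)        // per-partition generator
    iter.map(x => x + rng.nextGaussian())   // rng is local, so nothing extra is captured
  }
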
20 changes: 14 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -102,25 +102,33 @@ private[spark] object RDDOperationScope {
/**
* Execute the given body such that all RDDs created in this body will have the same scope.
*
* If nesting is allowed, this concatenates the previous scope with the new one in a way that
* signifies the hierarchy. Otherwise, if nesting is not allowed, then any children calls to
* this method executed in the body will have no effect.
* If nesting is allowed, any subsequent calls to this method in the given body will instantiate
* child scopes that are nested within our scope. Otherwise, these calls will have no effect.
*
* Additionally, the caller of this method may optionally ignore the configurations and scopes
* set by the higher level caller. In this case, this method will ignore the parent caller's
* intention to disallow nesting, and the new scope instantiated will not have a parent. This
* is useful for scoping physical operations in Spark SQL, for instance.
*
* Note: Return statements are NOT allowed in body.
*/
private[spark] def withScope[T](
sc: SparkContext,
name: String,
allowNesting: Boolean)(body: => T): T = {
allowNesting: Boolean,
ignoreParent: Boolean = false)(body: => T): T = {
// Save the old scope to restore it later
val scopeKey = SparkContext.RDD_SCOPE_KEY
val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
val oldScopeJson = sc.getLocalProperty(scopeKey)
val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
val oldNoOverride = sc.getLocalProperty(noOverrideKey)
try {
// Set the scope only if the higher level caller allows us to do so
if (sc.getLocalProperty(noOverrideKey) == null) {
if (ignoreParent) {
// Ignore all parent settings and scopes and start afresh with our own root scope
sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
} else if (sc.getLocalProperty(noOverrideKey) == null) {
// Otherwise, set the scope only if the higher level caller allows us to do so
sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
}
// Optionally disallow the child body to override our scope
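
RDDOperationScope.withScope is private[spark], so the following is only an illustration of the semantics described in the updated doc comment, not public-API usage; sc is assumed to be an existing SparkContext and the scope names are made up.

// Illustration only: how allowNesting and the new ignoreParent flag shape the
// scope hierarchy recorded for RDDs created inside each body.
RDDOperationScope.withScope(sc, "outer", allowNesting = true) {
  // RDDs created here are tagged with scope "outer".
  RDDOperationScope.withScope(sc, "inner", allowNesting = true) {
    // Nesting allowed by the caller: scope "inner" with parent "outer".
  }
  RDDOperationScope.withScope(sc, "sqlPlan", allowNesting = false, ignoreParent = true) {
    // ignoreParent = true: starts afresh with a root scope "sqlPlan" that has
    // no parent, regardless of any no-override flag set by the enclosing call.
  }
}
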
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.netty

import org.apache.spark.network.BlockDataManager
import org.apache.spark.{SecurityManager, SparkConf}
import org.mockito.Mockito.mock
import org.scalatest._

class NettyBlockTransferServiceSuite extends FunSuite with BeforeAndAfterEach with ShouldMatchers {
private var service0: NettyBlockTransferService = _
private var service1: NettyBlockTransferService = _

override def afterEach() {
if (service0 != null) {
service0.close()
service0 = null
}

if (service1 != null) {
service1.close()
service1 = null
}
}

test("can bind to a random port") {
service0 = createService(port = 0)
service0.port should not be 0
}

test("can bind to two random ports") {
service0 = createService(port = 0)
service1 = createService(port = 0)
service0.port should not be service1.port
}

test("can bind to a specific port") {
val port = 17634
service0 = createService(port)
service0.port should be >= port
service0.port should be <= (port + 10) // avoid testing equality in case of simultaneous tests
}

test("can bind to a specific port twice and the second increments") {
val port = 17634
service0 = createService(port)
service1 = createService(port)
service0.port should be >= port
service0.port should be <= (port + 10)
service1.port should be (service0.port + 1)
}

private def createService(port: Int): NettyBlockTransferService = {
val conf = new SparkConf()
.set("spark.app.id", s"test-${getClass.getName}")
.set("spark.blockManager.port", port.toString)
val securityManager = new SecurityManager(conf)
val blockDataManager = mock(classOf[BlockDataManager])
val service = new NettyBlockTransferService(conf, securityManager, numCores = 1)
service.init(blockDataManager)
service
}
}
@@ -18,6 +18,7 @@
package org.apache.spark.util

import java.io.NotSerializableException
import java.util.Random

import org.scalatest.FunSuite

@@ -92,6 +93,11 @@ class ClosureCleanerSuite extends FunSuite {
expectCorrectException { TestUserClosuresActuallyCleaned.testKeyBy(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitions(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithContext(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapWith(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testForEachWith(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testMapWith(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions2(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions3(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions4(rdd) }
@@ -260,6 +266,21 @@ private object TestUserClosuresActuallyCleaned {
def testMapPartitionsWithIndex(rdd: RDD[Int]): Unit = {
rdd.mapPartitionsWithIndex { (_, it) => return; it }.count()
}
def testFlatMapWith(rdd: RDD[Int]): Unit = {
rdd.flatMapWith ((index: Int) => new Random(index + 42)){ (_, it) => return; Seq() }.count()
}
def testMapWith(rdd: RDD[Int]): Unit = {
rdd.mapWith ((index: Int) => new Random(index + 42)){ (_, it) => return; 0 }.count()
}
def testFilterWith(rdd: RDD[Int]): Unit = {
rdd.filterWith ((index: Int) => new Random(index + 42)){ (_, it) => return; true }.count()
}
def testForEachWith(rdd: RDD[Int]): Unit = {
rdd.foreachWith ((index: Int) => new Random(index + 42)){ (_, it) => return }
}
def testMapPartitionsWithContext(rdd: RDD[Int]): Unit = {
rdd.mapPartitionsWithContext { (_, it) => return; it }.count()
}
def testZipPartitions2(rdd: RDD[Int]): Unit = {
rdd.zipPartitions(rdd) { case (it1, it2) => return; it1 }.count()
}
@@ -49,7 +49,7 @@ import org.apache.spark.util.random.XORShiftRandom
* Common params for ALS.
*/
private[recommendation] trait ALSParams extends Params with HasMaxIter with HasRegParam
with HasPredictionCol with HasCheckpointInterval {
with HasPredictionCol with HasCheckpointInterval with HasSeed {

/**
* Param for rank of the matrix factorization (>= 1).
@@ -147,7 +147,7 @@ private[recommendation] trait ALSParams extends Params with HasMaxIter with HasRegParam

setDefault(rank -> 10, maxIter -> 10, regParam -> 0.1, numUserBlocks -> 10, numItemBlocks -> 10,
implicitPrefs -> false, alpha -> 1.0, userCol -> "user", itemCol -> "item",
ratingCol -> "rating", nonnegative -> false, checkpointInterval -> 10)
ratingCol -> "rating", nonnegative -> false, checkpointInterval -> 10, seed -> 0L)

/**
* Validates and transforms the input schema.
@@ -278,6 +278,9 @@ class ALS extends Estimator[ALSModel] with ALSParams {
/** @group setParam */
def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value)

/** @group setParam */
def setSeed(value: Long): this.type = set(seed, value)

/**
* Sets both numUserBlocks and numItemBlocks to the specific value.
* @group setParam
@@ -290,15 +293,16 @@ class ALS extends Estimator[ALSModel] with ALSParams {

override def fit(dataset: DataFrame): ALSModel = {
val ratings = dataset
.select(col($(userCol)), col($(itemCol)), col($(ratingCol)).cast(FloatType))
.select(col($(userCol)).cast(IntegerType), col($(itemCol)).cast(IntegerType),
col($(ratingCol)).cast(FloatType))
.map { row =>
Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
}
val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
maxIter = $(maxIter), regParam = $(regParam), implicitPrefs = $(implicitPrefs),
alpha = $(alpha), nonnegative = $(nonnegative),
checkpointInterval = $(checkpointInterval))
checkpointInterval = $(checkpointInterval), seed = $(seed))
copyValues(new ALSModel(this, $(rank), userFactors, itemFactors))
}

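
With HasSeed mixed into ALSParams, the factorization is reproducible from the Pipelines API. A minimal usage sketch (the DataFrame and its name are assumptions; the column names and other defaults match the setDefault call above):

// Sketch: configuring the ml.recommendation.ALS estimator with the new seed param.
import org.apache.spark.ml.recommendation.ALS

val als = new ALS()
  .setUserCol("user")          // cast to IntegerType internally, per the fit() change above
  .setItemCol("item")
  .setRatingCol("rating")
  .setRank(10)
  .setMaxIter(10)
  .setRegParam(0.1)
  .setSeed(42L)                // added by this change; the default is 0L
// val model = als.fit(ratingsDF)   // ratingsDF: DataFrame with user, item, rating columns
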
@@ -31,6 +31,7 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import org.apache.spark.network.util.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -65,7 +66,12 @@ public TransportServer(
this.appRpcHandler = appRpcHandler;
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

init(portToBind);
try {
init(portToBind);
} catch (RuntimeException e) {
JavaUtils.closeQuietly(this);
throw e;
}
}

public int getPort() {
@@ -114,7 +120,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
}
});

bindRightPort(portToBind);
channelFuture = bootstrap.bind(new InetSocketAddress(portToBind));
channelFuture.syncUninterruptibly();

port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port :" + port);
@@ -135,38 +142,4 @@ public void close() {
}
bootstrap = null;
}

/**
* Attempt to bind to the specified port up to a fixed number of retries.
* If all attempts fail after the max number of retries, exit.
*/
private void bindRightPort(int portToBind) {
int maxPortRetries = conf.portMaxRetries();

for (int i = 0; i <= maxPortRetries; i++) {
int tryPort = -1;
if (0 == portToBind) {
// Do not increment port if tryPort is 0, which is treated as a special port
tryPort = 0;
} else {
// If the new port wraps around, do not try a privilege port
tryPort = ((portToBind + i - 1024) % (65536 - 1024)) + 1024;
}
try {
channelFuture = bootstrap.bind(new InetSocketAddress(tryPort));
channelFuture.syncUninterruptibly();
return;
} catch (Exception e) {
logger.warn("Netty service could not bind on port " + tryPort +
". Attempting the next port.");
if (i >= maxPortRetries) {
logger.error(e.getMessage() + ": Netty server failed after "
+ maxPortRetries + " retries.");

// If it can't find a right port, it should exit directly.
System.exit(-1);
}
}
}
}
}
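
The constructor change above follows a common guard pattern: if init fails partway through, whatever was already allocated is released before the exception is rethrown, so a failed bind does not leak Netty event-loop threads. A small sketch of the pattern in Scala (the class and members are illustrative, not the Java class above):

// Sketch of the guard: close partially-initialized resources on construction
// failure, then rethrow the original exception.
class GuardedServer(portToBind: Int) extends AutoCloseable {
  try {
    init(portToBind)
  } catch {
    case e: RuntimeException =>
      close()        // quietly release whatever init managed to allocate
      throw e
  }

  private def init(port: Int): Unit = {
    // allocate event loops and bind the channel; may throw if the bind fails
  }

  override def close(): Unit = {
    // idempotent cleanup of the channel and event-loop groups
  }
}
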
2 changes: 2 additions & 0 deletions python/pyspark/ml/param/_shared_params_code_gen.py
@@ -97,6 +97,8 @@ def get$Name(self):
("inputCol", "input column name", None),
("inputCols", "input column names", None),
("outputCol", "output column name", None),
("numFeatures", "number of features", None),
("checkpointInterval", "checkpoint interval (>= 1)", None),
("seed", "random seed", None),
("tol", "the convergence tolerance for iterative algorithms", None),
("stepSize", "Step size to be used for each iteration of optimization.", None)]