Commit c1cf71e

Merge remote-tracking branch 'apache/master' into fileStrategy
marmbrus committed Mar 11, 2016
2 parents 65596df + 073bf9d commit c1cf71e
Showing 77 changed files with 1,209 additions and 891 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -656,7 +656,9 @@ private[spark] object SparkConf extends Logging {
"spark.memory.offHeap.enabled" -> Seq(
AlternateConfig("spark.unsafe.offHeap", "1.6")),
"spark.rpc.message.maxSize" -> Seq(
AlternateConfig("spark.akka.frameSize", "1.6"))
AlternateConfig("spark.akka.frameSize", "1.6")),
"spark.yarn.jars" -> Seq(
AlternateConfig("spark.yarn.jar", "2.0"))
)

/**
@@ -718,7 +720,7 @@ private[spark] object SparkConf extends Logging {
allAlternatives.get(key).foreach { case (newKey, cfg) =>
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"and may be removed in the future. Please use the new key '$newKey' instead.")
s"may be removed in the future. Please use the new key '$newKey' instead.")
return
}
if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) {
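
In this scheme, setting the old key still works but logs the warning above and resolves to the new key. A minimal, self-contained sketch of the lookup these entries drive (`AlternateConfig` is reduced here to the two fields the warning needs, and `warnIfDeprecated` is a hypothetical stand-in for the real `logDeprecationWarning`):

```scala
// Sketch of SparkConf's deprecated-key translation, under the
// simplifying assumptions noted above.
case class AlternateConfig(key: String, version: String)

object DeprecationSketch {
  val configsWithAlternatives: Map[String, Seq[AlternateConfig]] = Map(
    "spark.yarn.jars" -> Seq(AlternateConfig("spark.yarn.jar", "2.0")))

  // Reverse index: deprecated key -> (new key, metadata), mirroring
  // the allAlternatives map consulted in the warning code above.
  val allAlternatives: Map[String, (String, AlternateConfig)] =
    (for ((newKey, cfgs) <- configsWithAlternatives.toSeq; cfg <- cfgs)
      yield cfg.key -> ((newKey, cfg))).toMap

  def warnIfDeprecated(key: String): Unit = {
    allAlternatives.get(key).foreach { case (newKey, cfg) =>
      println(s"The configuration key '$key' has been deprecated as of " +
        s"Spark ${cfg.version} and may be removed in the future. " +
        s"Please use the new key '$newKey' instead.")
    }
  }
}

// DeprecationSketch.warnIfDeprecated("spark.yarn.jar") prints the warning;
// unknown keys pass through silently.
```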
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -626,7 +626,6 @@ object SparkSubmit {
val pathConfigs = Seq(
"spark.jars",
"spark.files",
"spark.yarn.jar",
"spark.yarn.dist.files",
"spark.yarn.dist.archives")
pathConfigs.foreach { config =>
47 changes: 35 additions & 12 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -133,6 +133,9 @@ private[spark] class BlockManager(
private val compressRdds = conf.getBoolean("spark.rdd.compress", false)
// Whether to compress shuffle output temporarily spilled to disk
private val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)
// Max number of failures before this block manager refreshes the block locations from the driver
private val maxFailuresBeforeLocationRefresh =
conf.getInt("spark.block.failures.beforeLocationRefresh", 5)

private val slaveEndpoint = rpcEnv.setupEndpoint(
"BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
@@ -568,26 +571,46 @@ private[spark] class BlockManager(
def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")
var runningFailureCount = 0
var totalFailureCount = 0
val locations = getLocations(blockId)
var numFetchFailures = 0
for (loc <- locations) {
val maxFetchFailures = locations.size
var locationIterator = locations.iterator
while (locationIterator.hasNext) {
val loc = locationIterator.next()
logDebug(s"Getting remote block $blockId from $loc")
val data = try {
blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
} catch {
case NonFatal(e) =>
numFetchFailures += 1
if (numFetchFailures == locations.size) {
// An exception is thrown while fetching this block from all locations
throw new BlockFetchException(s"Failed to fetch block from" +
s" ${locations.size} locations. Most recent failure cause:", e)
} else {
// This location failed, so we retry fetch from a different one by returning null here
logWarning(s"Failed to fetch remote block $blockId " +
s"from $loc (failed attempt $numFetchFailures)", e)
null
runningFailureCount += 1
totalFailureCount += 1

if (totalFailureCount >= maxFetchFailures) {
// Give up trying any more locations. Either we've tried all of the original locations,
// or we've refreshed the list of locations from the master, and have still
// hit failures after trying locations from the refreshed list.
throw new BlockFetchException(s"Failed to fetch block after" +
s" ${totalFailureCount} fetch failures. Most recent failure cause:", e)
}

logWarning(s"Failed to fetch remote block $blockId " +
s"from $loc (failed attempt $runningFailureCount)", e)

// If there is a large number of executors, then the locations list can contain a
// large number of stale entries, causing a large number of retries that may
// take a significant amount of time. To get rid of these stale entries,
// we refresh the block locations after a certain number of fetch failures.
if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
locationIterator = getLocations(blockId).iterator
logDebug(s"Refreshed locations from the driver " +
s"after ${runningFailureCount} fetch failures.")
runningFailureCount = 0
}

// This location failed, so we retry fetch from a different one by returning null here
null
}

if (data != null) {
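
The rewritten loop tracks two counters: `totalFailureCount` caps the whole fetch at the size of the original location list, while `runningFailureCount` is reset whenever the locations are refreshed from the driver. A standalone sketch of the same control flow, with hypothetical `fetch` and `refresh` functions standing in for `fetchBlockSync` and `getLocations`:

```scala
import scala.util.control.NonFatal

// Sketch of the retry-with-location-refresh pattern used in getRemoteBytes.
def fetchWithRefresh[Loc, T](
    initialLocations: Seq[Loc],
    maxFailuresBeforeLocationRefresh: Int,
    fetch: Loc => T,
    refresh: () => Seq[Loc]): Option[T] = {
  val maxFetchFailures = initialLocations.size
  var runningFailureCount = 0
  var totalFailureCount = 0
  var locationIterator = initialLocations.iterator
  while (locationIterator.hasNext) {
    val loc = locationIterator.next()
    try {
      return Some(fetch(loc))  // first location that succeeds wins
    } catch {
      case NonFatal(e) =>
        runningFailureCount += 1
        totalFailureCount += 1
        // Hard cap on total failures, playing the role of BlockFetchException.
        if (totalFailureCount >= maxFetchFailures) throw e
        // Refresh to drop stale entries; only the running count resets.
        if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
          locationIterator = refresh().iterator
          runningFailureCount = 0
        }
    }
  }
  None
}
```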
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -21,19 +21,23 @@ import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.language.implicitConversions
import scala.language.postfixOps

import org.mockito.{Matchers => mc}
import org.mockito.Mockito.{mock, when}
import org.mockito.Mockito.{mock, times, verify, when}
import org.scalatest._
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts._

import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.memory.StaticMemoryManager
import org.apache.spark.network.{BlockDataManager, BlockTransferService}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
@@ -66,9 +70,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
private def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER,
master: BlockManagerMaster = this.master): BlockManager = {
master: BlockManagerMaster = this.master,
transferService: Option[BlockTransferService] = Option.empty): BlockManager = {
val serializer = new KryoSerializer(conf)
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
val transfer = transferService
.getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1))
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
@@ -1287,6 +1293,78 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a3").isDefined, "a3 was not in store")
}

test("SPARK-13328: refresh block locations (fetch should fail after hitting a threshold)") {
val mockBlockTransferService =
new MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh", 5))
store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
store.putSingle("item", 999L, StorageLevel.MEMORY_ONLY, tellMaster = true)
intercept[BlockFetchException] {
store.getRemoteBytes("item")
}
}

test("SPARK-13328: refresh block locations (fetch should succeed after location refresh)") {
val maxFailuresBeforeLocationRefresh =
conf.getInt("spark.block.failures.beforeLocationRefresh", 5)
val mockBlockManagerMaster = mock(classOf[BlockManagerMaster])
val mockBlockTransferService =
new MockBlockTransferService(maxFailuresBeforeLocationRefresh)
// make sure we have more than maxFailuresBeforeLocationRefresh locations
// so that we have a chance to do location refresh
val blockManagerIds = (0 to maxFailuresBeforeLocationRefresh)
.map { i => BlockManagerId(s"id-$i", s"host-$i", i + 1) }
when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockManagerIds)
store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
transferService = Option(mockBlockTransferService))
val block = store.getRemoteBytes("item")
.asInstanceOf[Option[ByteBuffer]]
assert(block.isDefined)
verify(mockBlockManagerMaster, times(2)).getLocations("item")
}

class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
var numCalls = 0

override def init(blockDataManager: BlockDataManager): Unit = {}

override def fetchBlocks(
host: String,
port: Int,
execId: String,
blockIds: Array[String],
listener: BlockFetchingListener): Unit = {
listener.onBlockFetchSuccess("mockBlockId", new NioManagedBuffer(ByteBuffer.allocate(1)))
}

override def close(): Unit = {}

override def hostName: String = { "MockBlockTransferServiceHost" }

override def port: Int = { 63332 }

override def uploadBlock(
hostname: String,
port: Int, execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
level: StorageLevel): Future[Unit] = {
import scala.concurrent.ExecutionContext.Implicits.global
Future {}
}

override def fetchBlockSync(
host: String,
port: Int,
execId: String,
blockId: String): ManagedBuffer = {
numCalls += 1
if (numCalls <= maxFailures) {
throw new RuntimeException("Failing block fetch in the mock block transfer service")
}
super.fetchBlockSync(host, port, execId, blockId)
}
}
}

private object BlockManagerSuite {
23 changes: 10 additions & 13 deletions dev/mima
@@ -24,24 +24,21 @@ set -e
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
cd "$FWDIR"

echo -e "q\n" | build/sbt oldDeps/update
TOOLS_CLASSPATH="$(build/sbt "export tools/fullClasspath" | tail -n1)"

rm -f .generated-mima*

generate_mima_ignore() {
SPARK_JAVA_OPTS="-XX:MaxPermSize=1g -Xmx2g" \
./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore
java \
-XX:MaxPermSize=1g \
-Xmx2g \
-cp "$TOOLS_CLASSPATH:$1" \
org.apache.spark.tools.GenerateMIMAIgnore
}

# generate_mima_ignore is called twice, first with the latest built jars
# on the classpath and then again with the previous version's jars. This is
# needed because of a bug in GenerateMIMAIgnore: when old jars are ahead on the
# classpath, it does not process the new classes (which are in the assembly jar).
generate_mima_ignore

export SPARK_CLASSPATH="$(build/sbt "export oldDeps/fullClasspath" | tail -n1)"
echo "SPARK_CLASSPATH=$SPARK_CLASSPATH"

generate_mima_ignore
SPARK_PROFILES="-Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
generate_mima_ignore "$(build/sbt $SPARK_PROFILES "export assembly/fullClasspath" | tail -n1)"
generate_mima_ignore "$(build/sbt $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"

echo -e "q\n" | build/sbt mima-report-binary-issues | grep -v -e "info.*Resolving"
ret_val=$?
23 changes: 17 additions & 6 deletions dev/run-tests.py
@@ -336,7 +336,6 @@ def build_spark_sbt(hadoop_version):
# Enable all of the profiles for the build:
build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
sbt_goals = ["package",
"assembly/assembly",
"streaming-kafka-assembly/assembly",
"streaming-flume-assembly/assembly",
"streaming-mqtt-assembly/assembly",
@@ -350,6 +349,16 @@ def build_spark_sbt(hadoop_version):
exec_sbt(profiles_and_goals)


def build_spark_assembly_sbt(hadoop_version):
# Enable all of the profiles for the build:
build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
sbt_goals = ["assembly/assembly"]
profiles_and_goals = build_profiles + sbt_goals
print("[info] Building Spark assembly (w/Hive 1.2.1) using SBT with these arguments: ",
" ".join(profiles_and_goals))
exec_sbt(profiles_and_goals)


def build_apache_spark(build_tool, hadoop_version):
"""Will build Spark against Hive v1.2.1 given the passed in build tool (either `sbt` or
`maven`). Defaults to using `sbt`."""
@@ -561,11 +570,13 @@ def main():
# spark build
build_apache_spark(build_tool, hadoop_version)

# TODO Temporarily disable MiMA check for DF-to-DS migration prototyping
# # backwards compatibility checks
# if build_tool == "sbt":
# # Note: compatiblity tests only supported in sbt for now
# detect_binary_inop_with_mima()
# backwards compatibility checks
if build_tool == "sbt":
# Note: compatibility tests only supported in sbt for now
detect_binary_inop_with_mima()
# Since we did not build assembly/assembly before running dev/mima, we need to
# do it here because the tests still rely on it; see SPARK-13294 for details.
build_spark_assembly_sbt(hadoop_version)

# run the test suites
run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags)
32 changes: 32 additions & 0 deletions docs/ml-features.md
@@ -773,6 +773,38 @@ for more details on the API.
</div>
</div>


## MaxAbsScaler

`MaxAbsScaler` transforms a dataset of `Vector` rows, rescaling each feature to the range [-1, 1]
by dividing by the maximum absolute value of each feature. It does not shift/center the
data, and thus does not destroy any sparsity.

`MaxAbsScaler` computes summary statistics on a data set and produces a `MaxAbsScalerModel`. The
model can then transform each feature individually to the range [-1, 1].

The following example demonstrates how to load a dataset in libsvm format and then rescale each feature to [-1, 1].

<div class="codetabs">
<div data-lang="scala" markdown="1">

Refer to the [MaxAbsScaler Scala docs](api/scala/index.html#org.apache.spark.ml.feature.MaxAbsScaler)
and the [MaxAbsScalerModel Scala docs](api/scala/index.html#org.apache.spark.ml.feature.MaxAbsScalerModel)
for more details on the API.

{% include_example scala/org/apache/spark/examples/ml/MaxAbsScalerExample.scala %}
</div>

<div data-lang="java" markdown="1">

Refer to the [MaxAbsScaler Java docs](api/java/org/apache/spark/ml/feature/MaxAbsScaler.html)
and the [MaxAbsScalerModel Java docs](api/java/org/apache/spark/ml/feature/MaxAbsScalerModel.html)
for more details on the API.

{% include_example java/org/apache/spark/examples/ml/JavaMaxAbsScalerExample.java %}
</div>
</div>
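
For readers without the linked example files at hand, here is a minimal Scala sketch of the intended usage; it assumes a 1.6-era `sqlContext` and the sample libsvm file shipped in a Spark source checkout:

```scala
import org.apache.spark.ml.feature.MaxAbsScaler

// Load a libsvm-format dataset (path assumes a Spark source checkout).
val dataFrame = sqlContext.read.format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt")

val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute per-feature max absolute values, then rescale into [-1, 1].
val scalerModel = scaler.fit(dataFrame)
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
```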

## Bucketizer

`Bucketizer` transforms a column of continuous features to a column of feature buckets, where the buckets are specified by users. It takes a parameter:
6 changes: 6 additions & 0 deletions docs/mllib-clustering.md
@@ -399,6 +399,12 @@ Refer to the [`BisectingKMeans` Java docs](api/java/org/apache/spark/mllib/clust

{% include_example java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java %}
</div>

<div data-lang="python" markdown="1">
Refer to the [`BisectingKMeans` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.BisectingKMeans) and [`BisectingKMeansModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.BisectingKMeansModel) for more details on the API.

{% include_example python/mllib/bisecting_k_means_example.py %}
</div>
</div>
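
Alongside the new Python tab, a minimal Scala sketch of the same `mllib` API is included below for comparison; it assumes an existing `SparkContext` named `sc` and a whitespace-separated data file at a hypothetical path:

```scala
import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.Vectors

// Parse whitespace-separated feature vectors (hypothetical path).
val data = sc.textFile("data/mllib/kmeans_data.txt")
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))

// Cluster the data into four groups with bisecting k-means.
val bkm = new BisectingKMeans().setK(4)
val model = bkm.run(data)

println(s"Compute cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
  println(s"Cluster center $idx: $center")
}
```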

## Streaming k-means
25 changes: 18 additions & 7 deletions docs/running-on-yarn.md
@@ -272,14 +272,25 @@ If you need a reference to the proper location to put log files in the YARN so t
</td>
</tr>
<tr>
<td><code>spark.yarn.jar</code></td>
<td><code>spark.yarn.jars</code></td>
<td>(none)</td>
<td>
The location of the Spark jar file, in case overriding the default location is desired.
By default, Spark on YARN will use a Spark jar installed locally, but the Spark jar can also be
List of libraries containing Spark code to distribute to YARN containers.
By default, Spark on YARN will use Spark jars installed locally, but the Spark jars can also be
in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't
need to be distributed each time an application runs. To point to a jar on HDFS, for example,
set this configuration to <code>hdfs:///some/path</code>.
need to be distributed each time an application runs. To point to jars on HDFS, for example,
set this configuration to <code>hdfs:///some/path</code>. Globs are allowed.
</td>
</tr>
<tr>
<td><code>spark.yarn.archive</code></td>
<td>(none)</td>
<td>
An archive containing needed Spark jars for distribution to the YARN cache. If set, this
configuration replaces <code>spark.yarn.jars</code> and the archive is used in all the
application's containers. The archive should contain jar files in its root directory.
As with the previous option, the archive can also be hosted on HDFS to speed up file
distribution.
</td>
</tr>
<tr>
@@ -288,8 +299,8 @@
<td>
A comma-separated list of secure HDFS namenodes your Spark application is going to access. For
example, <code>spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032,
webhdfs://nn3.com:50070</code>. The Spark application must have access to the namenodes listed
and Kerberos must be properly configured to be able to access them (either in the same realm
webhdfs://nn3.com:50070</code>. The Spark application must have access to the namenodes listed
and Kerberos must be properly configured to be able to access them (either in the same realm
or in a trusted realm). Spark acquires security tokens for each of the namenodes so that
the Spark application can access those remote HDFS clusters.
</td>
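
The `spark.yarn.jars` and `spark.yarn.archive` options above control how Spark code reaches YARN containers, with the archive replacing the jar list when both are set. A short sketch of configuring them programmatically; both HDFS paths are hypothetical:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Glob of Spark jars pre-staged on HDFS (hypothetical path).
  .set("spark.yarn.jars", "hdfs:///spark/jars/*.jar")

// Alternatively, a single archive; when set, it takes precedence
// over spark.yarn.jars in all of the application's containers:
// conf.set("spark.yarn.archive", "hdfs:///spark/spark-libs.zip")
```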
examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java
@@ -26,7 +26,6 @@
import org.apache.spark.ml.regression.AFTSurvivalRegression;
import org.apache.spark.ml.regression.AFTSurvivalRegressionModel;
import org.apache.spark.mllib.linalg.*;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;