Merge pull request #3 from apache/master
merge latest spark
pzzs committed Mar 10, 2015
2 parents cb1852d + 8767565 commit c87e8b6
Showing 80 changed files with 763 additions and 332 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
8 changes: 7 additions & 1 deletion core/pom.xml
@@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -319,6 +319,12 @@
<artifactId>selenium-java</artifactId>
<scope>test</scope>
</dependency>
<!-- Added for selenium: -->
<dependency>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
76 changes: 59 additions & 17 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,26 +19,27 @@ package org.apache.spark.api.python

import java.io._
import java.net._
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, UUID, Collections}

import org.apache.spark.input.PortableDataStream
import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap}

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.existentials

import com.google.common.base.Charsets.UTF_8

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}

import org.apache.spark._
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

import scala.util.control.NonFatal

private[spark] class PythonRDD(
@transient parent: RDD[_],
command: Array[Byte],
@@ -341,21 +342,33 @@ private[spark] object PythonRDD extends Logging {
/**
* Adapter for calling SparkContext#runJob from Python.
*
* This method will return an iterator of an array that contains all elements in the RDD
* This method will serve an iterator of an array that contains all elements in the RDD
* (effectively a collect()), but allows you to run on a certain subset of partitions,
* or to enable local execution.
*
* @return the port number of a local socket which serves the data collected from this job.
*/
def runJob(
sc: SparkContext,
rdd: JavaRDD[Array[Byte]],
partitions: JArrayList[Int],
allowLocal: Boolean): Iterator[Array[Byte]] = {
allowLocal: Boolean): Int = {
type ByteArray = Array[Byte]
type UnrolledPartition = Array[ByteArray]
val allPartitions: Array[UnrolledPartition] =
sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
flattenedPartition.iterator
serveIterator(flattenedPartition.iterator,
s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}")
}

/**
* A helper function to collect an RDD as an iterator, then serve it via socket.
*
* @return the port number of a local socket which serves the data collected from this job.
*/
def collectAndServe[T](rdd: RDD[T]): Int = {
serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}")
}

def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
Expand Down Expand Up @@ -575,15 +588,44 @@ private[spark] object PythonRDD extends Logging {
dataOut.write(bytes)
}

def writeToFile[T](items: java.util.Iterator[T], filename: String) {
import scala.collection.JavaConverters._
writeToFile(items.asScala, filename)
}
/**
* Create a socket server and a background thread to serve the data in `items`.
*
* The socket server accepts only one connection, and closes if no connection
* is made within 3 seconds.
*
* Once a connection comes in, it tries to serialize all the data in `items`
* and send them over this connection.
*
* The thread terminates after all the data are sent, or if any exception occurs.
*/
private def serveIterator[T](items: Iterator[T], threadName: String): Int = {
val serverSocket = new ServerSocket(0, 1)
serverSocket.setReuseAddress(true)
// Close the socket if no connection in 3 seconds
serverSocket.setSoTimeout(3000)

new Thread(threadName) {
setDaemon(true)
override def run() {
try {
val sock = serverSocket.accept()
val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
try {
writeIteratorToStream(items, out)
} finally {
out.close()
}
} catch {
case NonFatal(e) =>
logError(s"Error while sending iterator", e)
} finally {
serverSocket.close()
}
}
}.start()

def writeToFile[T](items: Iterator[T], filename: String) {
val file = new DataOutputStream(new FileOutputStream(filename))
writeIteratorToStream(items, file)
file.close()
serverSocket.getLocalPort
}

private def getMergedConf(confAsMap: java.util.HashMap[String, String],
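For orientation, here is a minimal sketch (not part of this commit) of what a consumer of the port returned by runJob / collectAndServe has to do: connect to it on localhost within the 3-second accept window and read elements until the server closes the stream. The framing assumed below, a 4-byte big-endian length followed by the payload, follows the writeIteratorToStream fragment shown above for Array[Byte] elements; the object and method names are illustrative only (in PySpark the actual reader lives on the Python side).

import java.io.{BufferedInputStream, DataInputStream, EOFException}
import java.net.Socket

object ServedIteratorClientSketch {
  // Read every element served on `port`. Each element is assumed to arrive as a
  // 4-byte big-endian length followed by that many bytes. The connection must be
  // made within 3 seconds of serveIterator starting, or its accept() times out.
  def readAll(port: Int): Seq[Array[Byte]] = {
    val sock = new Socket("localhost", port)
    val in = new DataInputStream(new BufferedInputStream(sock.getInputStream))
    val items = Seq.newBuilder[Array[Byte]]
    try {
      while (true) {
        val length = in.readInt()   // throws EOFException once the server closes the stream
        val buf = new Array[Byte](length)
        in.readFully(buf)
        items += buf
      }
    } catch {
      case _: EOFException =>       // all data received; server closed the socket
    } finally {
      sock.close()
    }
    items.result()
  }
}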
@@ -59,7 +59,7 @@ class LocalSparkCluster(
/* Start the Workers */
for (workerNum <- 1 to numWorkers) {
val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
memoryPerWorker, masters, null, Some(workerNum))
memoryPerWorker, masters, null, Some(workerNum), _conf)
workerActorSystems += workerSystem
}

@@ -96,7 +96,7 @@ private[spark] class Master(
val webUi = new MasterWebUI(this, webUiPort)

val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
}

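A minimal sketch (not part of this commit) of why the switch from System.getenv to conf.getenv in Master, and in Worker and WebUI below, matters: the environment lookup can now be faked from a test by subclassing SparkConf, which is exactly what the LogUrlsStandaloneSuite change further down does. FakeEnvConf and the hostnames are hypothetical, used only for illustration.

package org.apache.spark.deploy
// Placed in a spark package because SparkConf.getenv is not part of the public API.

import org.apache.spark.SparkConf

class FakeEnvConf(env: Map[String, String]) extends SparkConf(false) {
  // Serve values from the supplied map, falling back to the real environment.
  override def getenv(name: String): String =
    env.getOrElse(name, super.getenv(name))
}

object PublicAddressResolutionExample {
  def main(args: Array[String]): Unit = {
    val conf = new FakeEnvConf(Map("SPARK_PUBLIC_DNS" -> "public.example.com"))
    val host = "worker-1.internal"
    // Same resolution logic as masterPublicAddress above:
    val publicAddress = {
      val envVar = conf.getenv("SPARK_PUBLIC_DNS")
      if (envVar != null) envVar else host
    }
    println(publicAddress)  // prints public.example.com
  }
}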
@@ -44,6 +44,7 @@ private[spark] class ExecutorRunner(
val workerId: String,
val host: String,
val webUiPort: Int,
val publicAddress: String,
val sparkHome: File,
val executorDir: File,
val workerUrl: String,
@@ -140,7 +141,8 @@ private[spark] class ExecutorRunner(
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

// Add webUI log urls
val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
val baseUrl =
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

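Illustrative only (not part of this commit): with the two lines above, the log URLs exported to the executor environment are built from the worker's public address and its bound web UI port rather than the bind host. All values below are made up.

object LogUrlShapeExample {
  def main(args: Array[String]): Unit = {
    val publicAddress = "public.example.com"   // resolved from SPARK_PUBLIC_DNS, else the worker host
    val webUiPort = 8081                       // webUi.boundPort on the worker
    val appId = "app-20150310120000-0000"
    val execId = 0
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    println(s"${baseUrl}stderr")  // value of SPARK_LOG_URL_STDERR
    println(s"${baseUrl}stdout")  // value of SPARK_LOG_URL_STDOUT
  }
}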
@@ -121,7 +121,7 @@ private[spark] class Worker(
val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)

val publicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
}
var webUi: WorkerWebUI = null
@@ -362,7 +362,8 @@ private[spark] class Worker(
self,
workerId,
host,
webUiPort,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
akkaUrl,
@@ -538,10 +539,10 @@ private[spark] object Worker extends Logging {
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None): (ActorSystem, Int) = {
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val conf = new SparkConf
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val actorName = "Worker"
val securityMgr = new SecurityManager(conf)
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.local
import java.nio.ByteBuffer

import scala.concurrent.duration._
import scala.language.postfixOps

import akka.actor.{Actor, ActorRef, Props}

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -47,7 +47,7 @@ private[spark] abstract class WebUI(
protected val handlers = ArrayBuffer[ServletContextHandler]()
protected var serverInfo: Option[ServerInfo] = None
protected val localHostName = Utils.localHostName()
protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
private val className = Utils.getFormattedClassName(this)

def getBasePath: String = basePath
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {

def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
new File("sparkHome"), new File("workDir"), "akka://worker",
"publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
}

@@ -17,35 +17,69 @@

package org.apache.spark.deploy

import java.net.URL

import scala.collection.mutable
import scala.io.Source

import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.FunSuite

import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
import org.apache.spark.{SparkContext, LocalSparkContext}
import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}

class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
private val WAIT_TIMEOUT_MILLIS = 10000

before {
test("verify that correct log urls get propagated from workers") {
sc = new SparkContext("local-cluster[2,1,512]", "test")

val listener = new SaveExecutorInfo
sc.addSparkListener(listener)

// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
// Browse to each URL to check that it's valid
info.logUrlMap.foreach { case (logType, logUrl) =>
val html = Source.fromURL(logUrl).mkString
assert(html.contains(s"$logType log page"))
}
}
}

test("verify log urls get propagated from workers") {
test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
val SPARK_PUBLIC_DNS = "public_dns"
class MySparkConf extends SparkConf(false) {
override def getenv(name: String) = {
if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
else super.getenv(name)
}

override def clone: SparkConf = {
new MySparkConf().setAll(getAll)
}
}
val conf = new MySparkConf()
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)

val listener = new SaveExecutorInfo
sc.addSparkListener(listener)

val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.map(_.toString)
rdd2.setName("Target RDD")
rdd2.count()
// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
info.logUrlMap.values.foreach { logUrl =>
assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS)
}
}
}

@@ -33,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
"publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
ExecutorState.RUNNING)
val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
assert(builder.command().last === appId)
7 changes: 4 additions & 3 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -386,10 +386,11 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
}

test("fetch hcfs dir") {
val sourceDir = Utils.createTempDir()
val tempDir = Utils.createTempDir()
val sourceDir = new File(tempDir, "source-dir")
val innerSourceDir = Utils.createTempDir(root=sourceDir.getPath)
val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
val targetDir = new File(Utils.createTempDir(), "target-dir")
val targetDir = new File(tempDir, "target-dir")
Files.write("some text", sourceFile, UTF_8)

val path = new Path("file://" + sourceDir.getAbsolutePath)
@@ -413,7 +414,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(destInnerFile.isFile())

val filePath = new Path("file://" + sourceFile.getAbsolutePath)
val testFileDir = new File("test-filename")
val testFileDir = new File(tempDir, "test-filename")
val testFileName = "testFName"
val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
2 changes: 1 addition & 1 deletion docs/_layouts/global.html
@@ -71,7 +71,7 @@
<li><a href="programming-guide.html">Spark Programming Guide</a></li>
<li class="divider"></li>
<li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
<li><a href="sql-programming-guide.html">Spark SQL</a></li>
<li><a href="sql-programming-guide.html">DataFrames and SQL</a></li>
<li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
<li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
<li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
2 changes: 1 addition & 1 deletion docs/index.md
@@ -74,7 +74,7 @@ options for deployment:
in all supported languages (Scala, Java, Python)
* Modules built on Spark:
* [Spark Streaming](streaming-programming-guide.html): processing real-time data streams
* [Spark SQL](sql-programming-guide.html): support for structured data and relational queries
* [Spark SQL and DataFrames](sql-programming-guide.html): support for structured data and relational queries
* [MLlib](mllib-guide.html): built-in machine learning library
* [GraphX](graphx-programming-guide.html): Spark's new API for graph processing
* [Bagel (Pregel on Spark)](bagel-programming-guide.html): older, simple graph processing model