Merge github.com:apache/spark
Conflicts:
	core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
andrewor14 committed Apr 4, 2014
2 parents 050419e + f1fa617 commit e2f4ff9
Showing 266 changed files with 7,368 additions and 1,626 deletions.
3 changes: 3 additions & 0 deletions bin/load-spark-env.sh
@@ -30,6 +30,9 @@ if [ -z "$SPARK_ENV_LOADED" ]; then
use_conf_dir=${SPARK_CONF_DIR:-"$parent_dir/conf"}

if [ -f "${use_conf_dir}/spark-env.sh" ]; then
# Promote all variable declarations to environment (exported) variables
set -a
. "${use_conf_dir}/spark-env.sh"
set +a
fi
fi
3 changes: 2 additions & 1 deletion bin/pyspark
@@ -55,7 +55,8 @@ if [ -n "$IPYTHON_OPTS" ]; then
IPYTHON=1
fi

if [[ "$IPYTHON" = "1" ]] ; then
# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
exec ipython $IPYTHON_OPTS
else
exec "$PYSPARK_PYTHON" "$@"
4 changes: 2 additions & 2 deletions bin/spark-shell
@@ -127,7 +127,7 @@ function set_spark_log_conf(){

function set_spark_master(){
if ! [[ "$1" =~ $ARG_FLAG_PATTERN ]]; then
MASTER="$1"
export MASTER="$1"
else
out_error "wrong format for $2"
fi
@@ -145,7 +145,7 @@ function resolve_spark_master(){
fi

if [ -z "$MASTER" ]; then
MASTER="$DEFAULT_MASTER"
export MASTER="$DEFAULT_MASTER"
fi

}
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -150,7 +150,7 @@
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
<version>3.2.6</version>
<!-- see also exclusion for lift-json; this is necessary since it depends on
scala-library and scalap 2.10.0, but we use 2.10.3, and only override
scala-library and scalap 2.10.0, but we use 2.10.4, and only override
scala-library -->
<exclusions>
<exclusion>
34 changes: 34 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -37,6 +37,7 @@ import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -369,6 +370,39 @@ class SparkContext(
minSplits).map(pair => pair._2.toString)
}

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file and the value is the content of each file.
*
* <p> For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do `val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred; large files are also allowable, but they may cause bad performance.
*/
def wholeTextFiles(path: String): RDD[(String, String)] = {
newAPIHadoopFile(
path,
classOf[WholeTextFileInputFormat],
classOf[String],
classOf[String])
}

/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
* necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
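The new wholeTextFiles API pairs each file's path with its full contents in a single record. A minimal usage sketch (the local master setting and the line-count logic are illustrative, not part of this commit; the path is the one from the doc comment above):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits in this Spark version

object WholeTextFilesExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("WholeTextFilesExample").setMaster("local"))

    // One record per file: (filePath, fileContent)
    val files = sc.wholeTextFiles("hdfs://a-hdfs-path")

    // Because each record carries the whole file, per-file processing needs no custom InputFormat.
    val lineCounts = files.mapValues(content => content.split("\n").length)
    lineCounts.collect().foreach { case (path, n) => println(s"$path: $n lines") }

    sc.stop()
  }
}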
@@ -154,6 +154,34 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*/
def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file and the value is the content of each file.
*
* <p> For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do `JavaPairRDD<String, String> rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred; large files are also allowable, but they may cause bad performance.
*/
def wholeTextFiles(path: String): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path))

/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.input

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit

/**
* A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
* reading whole text files. Each file is read as a key-value pair, where the key is the file path
* and the value is the entire content of the file.
*/

private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
override protected def isSplitable(context: JobContext, file: Path): Boolean = false

override def createRecordReader(
split: InputSplit,
context: TaskAttemptContext): RecordReader[String, String] = {

new CombineFileRecordReader[String, String](
split.asInstanceOf[CombineFileSplit],
context,
classOf[WholeTextFileRecordReader])
}
}
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.input

import com.google.common.io.{ByteStreams, Closeables}

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext

/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
* as a key-value pair, where the key is the file path and the value is the entire content of
* the file.
*/
private[spark] class WholeTextFileRecordReader(
split: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
extends RecordReader[String, String] {

private val path = split.getPath(index)
private val fs = path.getFileSystem(context.getConfiguration)

// True means the current file has already been processed and should be skipped.
private var processed = false

private val key = path.toString
private var value: String = null

override def initialize(split: InputSplit, context: TaskAttemptContext) = {}

override def close() = {}

override def getProgress = if (processed) 1.0f else 0.0f

override def getCurrentKey = key

override def getCurrentValue = value

override def nextKeyValue = {
if (!processed) {
val fileIn = fs.open(path)
val innerBuffer = ByteStreams.toByteArray(fileIn)

value = new Text(innerBuffer).toString
Closeables.close(fileIn, false)

processed = true
true
} else {
false
}
}
}
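The reader above follows the standard Hadoop mapreduce RecordReader contract: the caller invokes nextKeyValue() until it returns false, reading getCurrentKey/getCurrentValue after each successful call. Because a whole file is one record, the processed flag makes that loop yield exactly one pair. A rough sketch of such a driver loop (hypothetical helper; not Spark's actual read path):

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.mapreduce.RecordReader

// Drains any RecordReader[String, String], e.g. a WholeTextFileRecordReader
// that the framework has already initialized.
def readAll(reader: RecordReader[String, String]): Seq[(String, String)] = {
  val records = ArrayBuffer.empty[(String, String)]
  // For WholeTextFileRecordReader this loop body runs exactly once: nextKeyValue()
  // reads the whole file, flips `processed` to true, and returns false on the next call.
  while (reader.nextKeyValue()) {
    records += ((reader.getCurrentKey, reader.getCurrentValue))
  }
  reader.close()
  records.toSeq
}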
21 changes: 10 additions & 11 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -84,7 +84,7 @@ class DAGScheduler(
private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]

@@ -536,7 +536,7 @@ class DAGScheduler(
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
} else {
stageIdToActiveJob(jobId) = job
jobIdToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
listenerBus.post(
@@ -559,7 +559,7 @@ class DAGScheduler(
// Cancel all running jobs.
runningStages.map(_.jobId).foreach(handleJobCancellation)
activeJobs.clear() // These should already be empty by this point,
stageIdToActiveJob.clear() // but just in case we lost track of some jobs...
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...

case ExecutorAdded(execId, host) =>
handleExecutorAdded(execId, host)
@@ -569,7 +569,6 @@ class DAGScheduler(

case BeginEvent(task, taskInfo) =>
for (
job <- stageIdToActiveJob.get(task.stageId);
stage <- stageIdToStage.get(task.stageId);
stageInfo <- stageToInfos.get(stage)
) {
@@ -697,7 +696,7 @@ class DAGScheduler(
private def activeJobForStage(stage: Stage): Option[Int] = {
if (stageIdToJobIds.contains(stage.id)) {
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
jobsThatUseStage.find(stageIdToActiveJob.contains)
jobsThatUseStage.find(jobIdToActiveJob.contains)
} else {
None
}
@@ -750,8 +749,8 @@ class DAGScheduler(
}
}

val properties = if (stageIdToActiveJob.contains(jobId)) {
stageIdToActiveJob(stage.jobId).properties
val properties = if (jobIdToActiveJob.contains(jobId)) {
jobIdToActiveJob(stage.jobId).properties
} else {
// this stage will be assigned to "default" pool
null
@@ -827,7 +826,7 @@ class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
stageIdToActiveJob -= stage.jobId
jobIdToActiveJob -= stage.jobId
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
@@ -986,11 +985,11 @@ class DAGScheduler(
val independentStages = removeJobAndIndependentStages(jobId)
independentStages.foreach(taskScheduler.cancelTasks)
val error = new SparkException("Job %d cancelled".format(jobId))
val job = stageIdToActiveJob(jobId)
val job = jobIdToActiveJob(jobId)
job.listener.jobFailed(error)
jobIdToStageIds -= jobId
activeJobs -= job
stageIdToActiveJob -= jobId
jobIdToActiveJob -= jobId
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
}
}
@@ -1011,7 +1010,7 @@ class DAGScheduler(
val error = new SparkException("Job aborted: " + reason)
job.listener.jobFailed(error)
jobIdToStageIdsRemove(job.jobId)
stageIdToActiveJob -= resultStage.jobId
jobIdToActiveJob -= resultStage.jobId
activeJobs -= job
resultStageToJob -= resultStage
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -18,13 +18,14 @@
package org.apache.spark.ui

import java.net.{InetSocketAddress, URL}
import javax.servlet.DispatcherType
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}
import scala.xml.Node

import org.eclipse.jetty.server.{DispatcherType, Server}
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.servlet._
import org.eclipse.jetty.util.thread.QueuedThreadPool
@@ -81,8 +81,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
/** If stages is too large, remove and garbage collect old stages */
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = retainedStages / 10
stages.takeRight(toRemove).foreach( s => {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s =>
stageIdToTaskData.remove(s.stageId)
stageIdToTime.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
@@ -94,8 +94,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
stageIdToTasksFailed.remove(s.stageId)
stageIdToPool.remove(s.stageId)
if (stageIdToDescription.contains(s.stageId)) {stageIdToDescription.remove(s.stageId)}
})
stages.trimEnd(toRemove)
}
stages.trimStart(toRemove)
}
}

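This change fixes which stages get evicted when the retained-stage limit is exceeded: the old takeRight/trimEnd pair dropped the newest entries, while take/trimStart drops the oldest from the front of the buffer, and math.max(..., 1) guarantees progress even when retainedStages / 10 rounds to zero. A small standalone sketch of the new behavior (the limit and stage ids are made up):

import scala.collection.mutable.ListBuffer

val retainedStages = 3                  // illustrative; the real value comes from configuration
val stages = ListBuffer(1, 2, 3, 4)     // stage ids, oldest first

if (stages.size > retainedStages) {
  val toRemove = math.max(retainedStages / 10, 1)   // at least one, even for small limits
  // Evict the oldest entries (front of the buffer), cleaning up their per-stage state.
  stages.take(toRemove).foreach(id => println(s"evicting stage $id"))
  stages.trimStart(toRemove)
}
// stages is now ListBuffer(2, 3, 4): the oldest stage was dropped and the newest kept.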
@@ -85,7 +85,7 @@ private[ui] class BlockManagerListener(storageStatusListener: StorageStatusListe

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
val rddInfo = stageSubmitted.stageInfo.rddInfo
_rddInfoMap(rddInfo.id) = rddInfo
_rddInfoMap.getOrElseUpdate(rddInfo.id, rddInfo)
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
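Switching from an unconditional write to getOrElseUpdate means an entry that has already accumulated storage state is kept when a later stage referencing the same RDD is submitted; only genuinely new RDD ids get a fresh entry. A minimal sketch of that map semantics (the Info class is a hypothetical stand-in for RDDInfo):

import scala.collection.mutable

// Hypothetical stand-in for RDDInfo, just to show the map behavior.
case class Info(id: Int, var memSize: Long = 0L)

val infoMap = mutable.HashMap[Int, Info]()

infoMap.getOrElseUpdate(1, Info(1))   // first stage submission inserts a fresh entry
infoMap(1).memSize = 512L             // storage state accumulated while the job runs

// A later stage using the same RDD no longer resets it:
infoMap.getOrElseUpdate(1, Info(1))
assert(infoMap(1).memSize == 512L)    // the old `infoMap(rddInfo.id) = rddInfo` would have wiped this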