Merge pull request #15 from apache/master
update
YanTangZhai committed Dec 24, 2014
2 parents 718afeb + fd41eb9 commit e4c2c0a
Showing 455 changed files with 9,746 additions and 4,443 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -51,10 +51,11 @@ checkpoint
derby.log
dist/
dev/create-release/*txt
dev/create-release/*new
dev/create-release/*final
spark-*-bin-*.tgz
unit-tests.log
/lib/
ec2/lib/
rat-results.txt
scalastyle.txt
scalastyle-output.xml
1 change: 1 addition & 0 deletions .rat-excludes
@@ -64,3 +64,4 @@ dist/*
logs
.*scalastyle-output.xml
.*dependency-reduced-pom.xml
known_translations
3 changes: 2 additions & 1 deletion LICENSE
@@ -646,7 +646,8 @@ THE SOFTWARE.

========================================================================
For Scala Interpreter classes (all .scala files in repl/src/main/scala
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala):
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
and for SerializableMapWrapper in JavaUtils.scala:
========================================================================

Copyright (c) 2002-2013 EPFL
10 changes: 0 additions & 10 deletions assembly/pom.xml
@@ -169,16 +169,6 @@
</build>

<profiles>
<profile>
<id>yarn-alpha</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn-alpha_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>yarn</id>
<dependencies>
12 changes: 8 additions & 4 deletions bin/compute-classpath.sh
@@ -25,7 +25,11 @@ FWDIR="$(cd "`dirname "$0"`"/..; pwd)"

. "$FWDIR"/bin/load-spark-env.sh

CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH"
if [ -n "$SPARK_CLASSPATH" ]; then
CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH"
else
CLASSPATH="$SPARK_SUBMIT_CLASSPATH"
fi

# Build up classpath
if [ -n "$SPARK_CONF_DIR" ]; then
@@ -68,14 +72,14 @@ else
assembly_folder="$ASSEMBLY_DIR"
fi

num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)"
num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" ]; then
echo "Failed to find Spark assembly in $assembly_folder"
echo "You need to build Spark before running this program."
exit 1
fi
if [ "$num_jars" -gt "1" ]; then
jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar")
jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar$")
echo "Found multiple Spark assembly jars in $assembly_folder:"
echo "$jars_list"
echo "Please remove all but one jar."
@@ -108,7 +112,7 @@ else
datanucleus_dir="$FWDIR"/lib_managed/jars
fi

datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar")"
datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar$")"
datanucleus_jars="$(echo "$datanucleus_jars" | tr "\n" : | sed s/:$//g)"

if [ -n "$datanucleus_jars" ]; then
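
The conditional added above exists so an unset SPARK_CLASSPATH does not leave a stray leading ":" (an empty entry) in the final classpath. The same idea in a small Scala sketch (illustration only, not part of the commit; the paths are made up): keep only non-empty segments before joining.

import scala.sys.env

object ClasspathJoinDemo {
  def main(args: Array[String]): Unit = {
    // Mirrors the shell fix: an empty SPARK_CLASSPATH segment is dropped instead of
    // contributing an empty entry (and a leading ':') to the joined classpath.
    val sparkClasspath  = env.getOrElse("SPARK_CLASSPATH", "")
    val submitClasspath = "/opt/spark/conf:/opt/spark/lib/spark-assembly.jar" // example value
    val classpath = Seq(sparkClasspath, submitClasspath).filter(_.nonEmpty).mkString(":")
    println(classpath)
  }
}
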
7 changes: 7 additions & 0 deletions bin/spark-shell
@@ -45,6 +45,13 @@ source "$FWDIR"/bin/utils.sh
SUBMIT_USAGE_FUNCTION=usage
gatherSparkSubmitOpts "$@"

# SPARK-4161: scala does not assume use of the java classpath,
# so we need to add the "-Dscala.usejavacp=true" flag manually. We
# do this specifically for the Spark shell because the scala REPL
# has its own class loader, and any additional classpath specified
# through spark.driver.extraClassPath is not automatically propagated.
SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"

function main() {
if $cygwin; then
# Workaround for issue involving JLine and Cygwin
21 changes: 20 additions & 1 deletion bin/spark-shell2.cmd
@@ -19,4 +19,23 @@ rem

set SPARK_HOME=%~dp0..

cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd --class org.apache.spark.repl.Main %* spark-shell
echo "%*" | findstr " --help -h" >nul
if %ERRORLEVEL% equ 0 (
call :usage
exit /b 0
)

call %SPARK_HOME%\bin\windows-utils.cmd %*
if %ERRORLEVEL% equ 1 (
call :usage
exit /b 1
)

cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd --class org.apache.spark.repl.Main %SUBMISSION_OPTS% spark-shell %APPLICATION_OPTS%

exit /b 0

:usage
echo "Usage: .\bin\spark-shell.cmd [options]" >&2
%SPARK_HOME%\bin\spark-submit --help 2>&1 | findstr /V "Usage" 1>&2
exit /b 0
59 changes: 59 additions & 0 deletions bin/windows-utils.cmd
@@ -0,0 +1,59 @@
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem

rem Gather all spark-submit options into SUBMISSION_OPTS

set SUBMISSION_OPTS=
set APPLICATION_OPTS=

rem NOTE: If you add or remove spark-submit options,
rem modify NOT ONLY this script but also SparkSubmitArguments.scala

:OptsLoop
if "x%1"=="x" (
goto :OptsLoopEnd
)

SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--py-files\> \<--files\>"
SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>"
SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\>"

echo %1 | findstr %opts% >nul
if %ERRORLEVEL% equ 0 (
if "x%2"=="x" (
echo "%1" requires an argument. >&2
exit /b 1
)
set SUBMISSION_OPTS=%SUBMISSION_OPTS% %1 %2
shift
shift
goto :OptsLoop
)
echo %1 | findstr "\<--verbose\> \<-v\> \<--supervise\>" >nul
if %ERRORLEVEL% equ 0 (
set SUBMISSION_OPTS=%SUBMISSION_OPTS% %1
shift
goto :OptsLoop
)
set APPLICATION_OPTS=%APPLICATION_OPTS% %1
shift
goto :OptsLoop

:OptsLoopEnd
exit /b 0
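
The batch loop above splits the argument list into spark-submit options (SUBMISSION_OPTS) and everything else (APPLICATION_OPTS), with value-taking options consuming two tokens. A rough Scala rendering of the same partition (illustration only, not part of the commit; the option list mirrors the one in the script):

object SubmitOptsSplitter {
  // Options that take a value, copied from the script's opts list.
  private val valueOpts = Set("--master", "--deploy-mode", "--class", "--name", "--jars",
    "--py-files", "--files", "--conf", "--properties-file", "--driver-memory",
    "--driver-java-options", "--driver-library-path", "--driver-class-path",
    "--executor-memory", "--driver-cores", "--total-executor-cores", "--executor-cores",
    "--queue", "--num-executors", "--archives")
  // Flags that take no value.
  private val flagOpts = Set("--verbose", "-v", "--supervise")

  // Returns (submissionOpts, applicationOpts). The real script errors out when a
  // value option has no argument; this sketch just lets it fall through.
  def split(args: List[String],
            submission: List[String] = Nil,
            application: List[String] = Nil): (List[String], List[String]) = args match {
    case opt :: value :: rest if valueOpts(opt) =>
      split(rest, submission ++ List(opt, value), application)
    case opt :: rest if flagOpts(opt) =>
      split(rest, submission :+ opt, application)
    case other :: rest =>
      split(rest, submission, application :+ other)
    case Nil =>
      (submission, application)
  }
}

For example, split(List("--master", "local[2]", "--verbose", "app.jar", "arg1")) yields (List("--master", "local[2]", "--verbose"), List("app.jar", "arg1")).
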
4 changes: 2 additions & 2 deletions conf/metrics.properties.template
@@ -77,8 +77,8 @@
# sample false Whether to show entire set of samples for histograms ('false' or 'true')
#
# * Default path is /metrics/json for all instances except the master. The master has two paths:
# /metrics/aplications/json # App information
# /metrics/master/json # Master information
# /metrics/applications/json # App information
# /metrics/master/json # Master information

# org.apache.spark.metrics.sink.GraphiteSink
# Name: Default: Description:
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/spark/SparkJobInfo.java
@@ -17,13 +17,15 @@

package org.apache.spark;

import java.io.Serializable;

/**
* Exposes information about Spark Jobs.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
* which may break binary compatibility with outside implementations.
*/
public interface SparkJobInfo {
public interface SparkJobInfo extends Serializable {
int jobId();
int[] stageIds();
JobExecutionStatus status();
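
The change above (and the matching one to SparkStageInfo below) makes the status-API types extend java.io.Serializable, so callers can snapshot and ship job status. A minimal sketch of what that enables (not part of the commit), assuming spark-core 1.2+ on the classpath; DemoJobInfo is a made-up implementation used only to keep the round-trip self-contained:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.{JobExecutionStatus, SparkJobInfo}

// Made-up stand-in for the objects Spark's status API returns; it is serializable
// simply because SparkJobInfo now extends java.io.Serializable.
class DemoJobInfo extends SparkJobInfo {
  override def jobId(): Int = 42
  override def stageIds(): Array[Int] = Array(0, 1)
  override def status(): JobExecutionStatus = JobExecutionStatus.RUNNING
}

object SerializableJobInfoDemo {
  def main(args: Array[String]): Unit = {
    // Write a job-status snapshot out and read it back, as a monitoring tool might.
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(new DemoJobInfo)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val restored = in.readObject().asInstanceOf[SparkJobInfo]
    println(s"job ${restored.jobId()} is ${restored.status()}") // job 42 is RUNNING
  }
}
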
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/spark/SparkStageInfo.java
@@ -17,13 +17,15 @@

package org.apache.spark;

import java.io.Serializable;

/**
* Exposes information about Spark Stages.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
* which may break binary compatibility with outside implementations.
*/
public interface SparkStageInfo {
public interface SparkStageInfo extends Serializable {
int stageId();
int currentAttemptId();
long submissionTime();
12 changes: 11 additions & 1 deletion core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -169,8 +169,18 @@ span.additional-metric-title {
display: inline-block;
}

.version {
line-height: 30px;
vertical-align: bottom;
font-size: 12px;
padding: 0;
margin: 0;
font-weight: bold;
color: #777;
}

/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
.scheduler_delay, .gc_time, .deserialization_time, .serialization_time, .getting_result_time {
.scheduler_delay, .deserialization_time, .serialization_time, .getting_result_time {
display: none;
}
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -19,6 +19,7 @@ package org.apache.spark

import java.io.{ObjectInputStream, Serializable}
import java.util.concurrent.atomic.AtomicLong
import java.lang.ThreadLocal

import scala.collection.generic.Growable
import scala.collection.mutable.Map
@@ -278,10 +279,12 @@ object AccumulatorParam {

// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private object Accumulators {
private[spark] object Accumulators {
// TODO: Use soft references? => need to make readObject work properly then
val originals = Map[Long, Accumulable[_, _]]()
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}
var lastId: Long = 0

def newId(): Long = synchronized {
@@ -293,22 +296,21 @@ private object Accumulators {
if (original) {
originals(a.id) = a
} else {
val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
accums(a.id) = a
localAccums.get()(a.id) = a
}
}

// Clear the local (non-original) accumulators for the current thread
def clear() {
synchronized {
localAccums.remove(Thread.currentThread)
localAccums.get.clear
}
}

// Get the values of the local accumulators for the current thread (by ID)
def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]()
for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
for ((id, accum) <- localAccums.get) {
ret(id) = accum.localValue
}
return ret
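
The Accumulators change above replaces a shared Map keyed by Thread with a ThreadLocal whose initialValue creates a per-thread map. A standalone sketch of that pattern (illustration only, not Spark's code) showing why each thread sees, and clears, only its own entries:

import scala.collection.mutable

object ThreadLocalMapDemo {
  // Each thread lazily gets its own mutable map on first get(), so per-thread
  // reads, writes and clears need no cross-thread locking.
  private val localValues = new ThreadLocal[mutable.Map[Long, String]]() {
    override protected def initialValue() = mutable.Map[Long, String]()
  }

  def main(args: Array[String]): Unit = {
    val threads = (1 to 2).map { i =>
      new Thread(new Runnable {
        def run(): Unit = {
          localValues.get()(i.toLong) = s"written by thread $i"
          println(s"thread $i sees ${localValues.get().keys.toList}") // only its own key
          localValues.get().clear()                                   // like Accumulators.clear()
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}
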
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -34,15 +34,17 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {

private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
// When spilling is enabled sorting will happen externally, but not necessarily with an
// ExternalSorter.
private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)

@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
combineValuesByKey(iter, null)

def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
if (!externalSorting) {
if (!isSpillEnabled) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
@@ -71,9 +73,9 @@ case class Aggregator[K, V, C] (
combineCombinersByKey(iter, null)

def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext)
: Iterator[(K, C)] =
: Iterator[(K, C)] =
{
if (!externalSorting) {
if (!isSpillEnabled) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
val update = (hadValue: Boolean, oldValue: C) => {
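
The rename to isSpillEnabled ties the Aggregator's behaviour to the spark.shuffle.spill setting. A minimal sketch (not part of the commit) of reading that flag from a SparkConf, assuming spark-core on the classpath:

import org.apache.spark.SparkConf

object SpillFlagDemo {
  def main(args: Array[String]): Unit = {
    // spark.shuffle.spill defaults to true; when it is true, aggregation falls back
    // to external (on-disk) data structures once it no longer fits in memory.
    val conf = new SparkConf(false).set("spark.shuffle.spill", "false")
    val isSpillEnabled = conf.getBoolean("spark.shuffle.spill", true)
    println(s"spill enabled: $isSpillEnabled") // spill enabled: false
  }
}
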
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -60,6 +60,9 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
* @param keyOrdering key ordering for RDD's shuffles
* @param aggregator map/reduce-side aggregator for RDD's shuffle
* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
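
The new scaladoc above documents keyOrdering, aggregator and mapSideCombine on ShuffleDependency. A small local-mode sketch (not part of the commit) of an operation that exercises map-side combine: reduceByKey partially aggregates within each partition before its shuffle, which is what the mapSideCombine flag controls.

import org.apache.spark.{SparkConf, SparkContext}

object MapSideCombineDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("map-side-combine"))
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)), 2)

    // reduceByKey sets up its shuffle with an aggregator and map-side combine enabled,
    // so partial sums like ("a", 2) are produced before any data crosses the network.
    val counts = pairs.reduceByKey(_ + _).collect()
    println(counts.toList) // List((a,2), (b,1)) in some order

    sc.stop()
  }
}
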
42 changes: 42 additions & 0 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

/**
* A client that communicates with the cluster manager to request or kill executors.
*/
private[spark] trait ExecutorAllocationClient {

/**
* Request an additional number of executors from the cluster manager.
* Return whether the request is acknowledged by the cluster manager.
*/
def requestExecutors(numAdditionalExecutors: Int): Boolean

/**
* Request that the cluster manager kill the specified executors.
* Return whether the request is acknowledged by the cluster manager.
*/
def killExecutors(executorIds: Seq[String]): Boolean

/**
* Request that the cluster manager kill the specified executor.
* Return whether the request is acknowledged by the cluster manager.
*/
def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
}
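
The new trait above only declares the request/kill operations and gives killExecutor a default that delegates to the batch killExecutors. Because the real trait is private[spark], the sketch below (illustration only, not part of the commit) redeclares a look-alike to show the shape and a toy implementation:

// Look-alike of ExecutorAllocationClient; the delegating default method is the point.
trait AllocationClientLike {
  def requestExecutors(numAdditionalExecutors: Int): Boolean
  def killExecutors(executorIds: Seq[String]): Boolean
  def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
}

// Toy "cluster manager" client that just records which executors it still knows about.
class LoggingAllocationClient extends AllocationClientLike {
  private var live = Set("exec-1", "exec-2")

  override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
    println(s"requesting $numAdditionalExecutors additional executors")
    numAdditionalExecutors > 0
  }

  override def killExecutors(executorIds: Seq[String]): Boolean = {
    val known = executorIds.filter(live.contains)
    live = live -- known
    known.nonEmpty // acknowledge only if at least one id was known
  }
}

object AllocationClientDemo {
  def main(args: Array[String]): Unit = {
    val client = new LoggingAllocationClient
    client.requestExecutors(3)
    println(client.killExecutor("exec-2")) // true, routed through killExecutors
    println(client.killExecutor("exec-2")) // false, already removed
  }
}
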