Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
Browse files Browse the repository at this point in the history
  • Loading branch information
yhuai committed Jul 28, 2014
2 parents 1d93395 + 81fcdd2 commit 692c0b9
Show file tree
Hide file tree
Showing 203 changed files with 6,353 additions and 1,042 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,4 @@ metastore_db/
metastore/
warehouse/
TempStatsStore/
sql/hive-thriftserver/test_warehouses
10 changes: 10 additions & 0 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hive-thriftserver</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-ganglia-lgpl</id>
<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion bagel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel_2.10</artifactId>
<properties>
<sbt.project.name>bagel</sbt.project.name>
<sbt.project.name>bagel</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Bagel</name>
Expand Down
45 changes: 45 additions & 0 deletions bin/beeline
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ `command -v java` ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi

# Compute classpath using external script
classpath_output=$($FWDIR/bin/compute-classpath.sh)
if [[ "$?" != "0" ]]; then
echo "$classpath_output"
exit 1
else
CLASSPATH=$classpath_output
fi

CLASS="org.apache.hive.beeline.BeeLine"
exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@"
1 change: 1 addition & 0 deletions bin/compute-classpath.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes"
fi

Expand Down
4 changes: 2 additions & 2 deletions bin/spark-shell
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ function main(){
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
$FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
$FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
fi
}

Expand Down
2 changes: 1 addition & 1 deletion bin/spark-shell.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ rem

set SPARK_HOME=%~dp0..

cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell --class org.apache.spark.repl.Main %*
36 changes: 36 additions & 0 deletions bin/spark-sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# Shell script for starting the Spark SQL CLI

# Enter posix mode for bash
set -o posix

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/spark-sql [options]"
$FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit 0
fi

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<properties>
<sbt.project.name>core</sbt.project.name>
<sbt.project.name>core</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Core</name>
Expand Down
5 changes: 1 addition & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ case class Aggregator[K, V, C] (
combiners.iterator
} else {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val pair = iter.next()
combiners.insert(pair._1, pair._2)
}
combiners.insertAll(iter)
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
Expand Down
72 changes: 51 additions & 21 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.spark

import scala.collection.mutable.{ArrayBuffer, HashSet}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._

Expand All @@ -30,7 +30,7 @@ import org.apache.spark.storage._
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

/** Keys of RDD partitions that are being computed/loaded. */
private val loading = new HashSet[RDDBlockId]()
private val loading = new mutable.HashSet[RDDBlockId]

/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
Expand Down Expand Up @@ -118,36 +118,66 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}

/**
* Cache the values of a partition, keeping track of any updates in the storage statuses
* of other blocks along the way.
* Cache the values of a partition, keeping track of any updates in the storage statuses of
* other blocks along the way.
*
* The effective storage level refers to the level that actually specifies BlockManager put
* behavior, not the level originally specified by the user. This is mainly for forcing a
* MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
* while preserving the the original semantics of the RDD as specified by the application.
*/
private def putInBlockManager[T](
key: BlockId,
values: Iterator[T],
storageLevel: StorageLevel,
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {

if (!storageLevel.useMemory) {
/* This RDD is not to be cached in memory, so we can just pass the computed values
* as an iterator directly to the BlockManager, rather than first fully unrolling
* it in memory. The latter option potentially uses much more memory and risks OOM
* exceptions that can be avoided. */
updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
level: StorageLevel,
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

val putLevel = effectiveStorageLevel.getOrElse(level)
if (!putLevel.useMemory) {
/*
* This RDD is not to be cached in memory, so we can just pass the computed values as an
* iterator directly to the BlockManager rather than first fully unrolling it in memory.
*/
updatedBlocks ++=
blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
blockManager.get(key) match {
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
}
} else {
/* This RDD is to be cached in memory. In this case we cannot pass the computed values
/*
* This RDD is to be cached in memory. In this case we cannot pass the computed values
* to the BlockManager as an iterator and expect to read it back later. This is because
* we may end up dropping a partition from memory store before getting it back, e.g.
* when the entirety of the RDD does not fit in memory. */
val elements = new ArrayBuffer[Any]
elements ++= values
updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
* we may end up dropping a partition from memory store before getting it back.
*
* In addition, we must be careful to not unroll the entire partition in memory at once.
* Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
* single partition. Instead, we unroll the values cautiously, potentially aborting and
* dropping the partition to disk if applicable.
*/
blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
case Left(arr) =>
// We have successfully unrolled the entire partition, so cache it in memory
updatedBlocks ++=
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
logWarning(s"Not enough space to cache partition $key in memory! " +
s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
useOffHeap = false, deserialized = false, putLevel.replication)
putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
} else {
returnValues
}
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/Dependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.SortOrder.SortOrder
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle

Expand Down Expand Up @@ -62,7 +63,8 @@ class ShuffleDependency[K, V, C](
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
val mapSideCombine: Boolean = false,
val sortOrder: Option[SortOrder] = None)
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

val shuffleId: Int = rdd.context.newShuffleId()
Expand Down
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
private[spark] def getCallSite(): CallSite = {
Option(getLocalProperty("externalCallSite")) match {
case Some(callSite) => CallSite(callSite, long = "")
case Some(callSite) => CallSite(callSite, longForm = "")
case None => Utils.getCallSite
}
}
Expand All @@ -1059,11 +1059,12 @@ class SparkContext(config: SparkConf) extends Logging {
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.short)
logInfo("Starting job: " + callSite.shortForm)
val start = System.nanoTime
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
logInfo(
"Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
}

Expand Down Expand Up @@ -1144,11 +1145,12 @@ class SparkContext(config: SparkConf) extends Logging {
evaluator: ApproximateEvaluator[U, R],
timeout: Long): PartialResult[R] = {
val callSite = getCallSite
logInfo("Starting job: " + callSite.short)
logInfo("Starting job: " + callSite.shortForm)
val start = System.nanoTime
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
localProperties.get)
logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
logInfo(
"Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class SparkEnv (
val metricsSystem: MetricsSystem,
val conf: SparkConf) extends Logging {

// A mapping of thread ID to amount of memory used for shuffle in bytes
// A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark._
import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{EmptyRDD, RDD}

/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
Expand Down Expand Up @@ -112,6 +112,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork

def startTime: java.lang.Long = sc.startTime

/** The version of Spark on which this application is running. */
def version: String = sc.version

/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: java.lang.Integer = sc.defaultParallelism

Expand All @@ -132,6 +135,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
}

/** Get an RDD that has no partitions or elements. */
def emptyRDD[T]: JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
JavaRDD.fromRDD(new EmptyRDD[T](sc))
}


/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T]): JavaRDD[T] =
parallelize(list, sc.defaultParallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

private[spark] class PythonRDD[T: ClassTag](
parent: RDD[T],
private[spark] class PythonRDD(
parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
Expand All @@ -57,7 +57,10 @@ private[spark] class PythonRDD[T: ClassTag](
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val startTime = System.currentTimeMillis
val env = SparkEnv.get
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
val localdir = env.blockManager.diskBlockManager.localDirs.map(
f => f.getPath()).mkString(",")
val worker: Socket = env.createPythonWorker(pythonExec,
envVars.toMap + ("SPARK_LOCAL_DIR" -> localdir))

// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)
Expand Down
Loading

0 comments on commit 692c0b9

Please sign in to comment.