Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into newCodeGen
Browse files Browse the repository at this point in the history
Conflicts:
	project/SparkBuild.scala
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
  • Loading branch information
marmbrus committed Jul 29, 2014
2 parents 41a40c9 + dc96536 commit 4bdc42c
Show file tree
Hide file tree
Showing 111 changed files with 4,769 additions and 486 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ metastore_db/
metastore/
warehouse/
TempStatsStore/
sql/hive-thriftserver/test_warehouses
10 changes: 10 additions & 0 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hive-thriftserver</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-ganglia-lgpl</id>
<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion bagel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel_2.10</artifactId>
<properties>
<sbt.project.name>bagel</sbt.project.name>
<sbt.project.name>bagel</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Bagel</name>
Expand Down
45 changes: 45 additions & 0 deletions bin/beeline
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ `command -v java` ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi

# Compute classpath using external script
classpath_output=$($FWDIR/bin/compute-classpath.sh)
if [[ "$?" != "0" ]]; then
echo "$classpath_output"
exit 1
else
CLASSPATH=$classpath_output
fi

CLASS="org.apache.hive.beeline.BeeLine"
exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@"
1 change: 1 addition & 0 deletions bin/compute-classpath.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes"
fi

Expand Down
4 changes: 2 additions & 2 deletions bin/spark-shell
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ function main(){
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
$FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
$FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
fi
}

Expand Down
2 changes: 1 addition & 1 deletion bin/spark-shell.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ rem

set SPARK_HOME=%~dp0..

cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell --class org.apache.spark.repl.Main %*
36 changes: 36 additions & 0 deletions bin/spark-sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# Shell script for starting the Spark SQL CLI

# Enter posix mode for bash
set -o posix

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/spark-sql [options]"
$FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit 0
fi

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<properties>
<sbt.project.name>core</sbt.project.name>
<sbt.project.name>core</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Core</name>
Expand Down
4 changes: 1 addition & 3 deletions core/src/main/scala/org/apache/spark/Dependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.SortOrder.SortOrder
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle

Expand Down Expand Up @@ -63,8 +62,7 @@ class ShuffleDependency[K, V, C](
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false,
val sortOrder: Option[SortOrder] = None)
val mapSideCombine: Boolean = false)
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

val shuffleId: Int = rdd.context.newShuffleId()
Expand Down
69 changes: 68 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.api.java

import java.util.{Comparator, List => JList}
import java.util.{Comparator, List => JList, Map => JMap}
import java.lang.{Iterable => JIterable}

import scala.collection.JavaConversions._
Expand Down Expand Up @@ -129,6 +129,73 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sample(withReplacement, fraction, seed))

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
*
* If `exact` is set to false, create the sample via simple random sampling, with one pass
* over the RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
* the RDD to create a sample size that's exactly equal to the sum of
* math.ceil(numItems * samplingRate) over all key values.
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
exact: Boolean,
seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, exact, seed))

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
*
* If `exact` is set to false, create the sample via simple random sampling, with one pass
* over the RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
* the RDD to create a sample size that's exactly equal to the sum of
* math.ceil(numItems * samplingRate) over all key values.
*
* Use Utils.random.nextLong as the default seed for the random number generator
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
exact: Boolean): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, exact, Utils.random.nextLong)

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
*
* Produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values with one pass over the RDD via
* simple random sampling.
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
seed: Long): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, false, seed)

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
*
* Produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values with one pass over the RDD via
* simple random sampling.
*
* Use Utils.random.nextLong as the default seed for the random number generator
*/
def sampleByKey(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, false, Utils.random.nextLong)

/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,11 @@ private[spark] object PythonRDD extends Logging {
def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
pyRDD.rdd.mapPartitions { iter =>
val unpickle = new Unpickler
// TODO: Figure out why flatMap is necessay for pyspark
iter.flatMap { row =>
unpickle.loads(row) match {
// in case of objects are pickled in batch mode
case objs: java.util.ArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
// Incase the partition doesn't have a collection
// not in batch mode
case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
}
}
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ object SparkSubmit {
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER

// A special jar name that indicates the class being run is inside of Spark itself, and therefore
// no user jar is needed.
private val SPARK_INTERNAL = "spark-internal"

// Special primary resource names that represent shells rather than application jars.
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
Expand Down Expand Up @@ -257,7 +261,9 @@ object SparkSubmit {
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (clusterManager == YARN && deployMode == CLUSTER) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
childArgs += ("--jar", args.primaryResource)
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
Expand Down Expand Up @@ -332,7 +338,7 @@ object SparkSubmit {
* Return whether the given primary resource represents a user jar.
*/
private def isUserJar(primaryResource: String): Boolean = {
!isShell(primaryResource) && !isPython(primaryResource)
!isShell(primaryResource) && !isPython(primaryResource) && !isInternal(primaryResource)
}

/**
Expand All @@ -349,6 +355,10 @@ object SparkSubmit {
primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
}

private[spark] def isInternal(primaryResource: String): Boolean = {
primaryResource == SPARK_INTERNAL
}

/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {

/** Fill in values by parsing user options. */
private def parseOpts(opts: Seq[String]): Unit = {
// Delineates parsing of Spark options from parsing of user options.
var inSparkOpts = true

// Delineates parsing of Spark options from parsing of user options.
parse(opts)

def parse(opts: Seq[String]): Unit = opts match {
Expand Down Expand Up @@ -318,7 +319,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
SparkSubmit.printErrorAndExit(errMessage)
case v =>
primaryResource =
if (!SparkSubmit.isShell(v)) {
if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
Utils.resolveURI(v).toString
} else {
v
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,6 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V, P](self, part)
.setKeyOrdering(ordering)
.setSortOrder(if (ascending) SortOrder.ASCENDING else SortOrder.DESCENDING)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
}

private[spark] object SortOrder extends Enumeration {
type SortOrder = Value
val ASCENDING, DESCENDING = Value
}
Loading

0 comments on commit 4bdc42c

Please sign in to comment.