
Commit

Merge branch 'master' of git://git.apache.org/spark into dsl-missing-operator
sarutak committed Dec 2, 2014
2 parents 1b88e2e + b0a46d8 commit 8f366f8
Showing 18 changed files with 93 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -49,7 +49,7 @@ dependency-reduced-pom.xml
checkpoint
derby.log
dist/
spark-*-bin.tar.gz
spark-*-bin-*.tgz
unit-tests.log
/lib/
rat-results.txt
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -63,7 +63,7 @@ private[spark] class PipedRDD[T: ClassTag](

/**
* A FilenameFilter that accepts anything that isn't equal to the name passed in.
* @param name of file or directory to leave out
* @param filterName of file or directory to leave out
*/
class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter {
def accept(dir: File, name: String): Boolean = {
9 changes: 8 additions & 1 deletion core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -134,9 +134,16 @@ private[spark] object AkkaUtils extends Logging {
Duration.create(conf.getLong("spark.akka.lookupTimeout", 30), "seconds")
}

private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024

/** Returns the configured max frame size for Akka messages in bytes. */
def maxFrameSizeBytes(conf: SparkConf): Int = {
conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
val frameSizeInMB = conf.getInt("spark.akka.frameSize", 10)
if (frameSizeInMB > AKKA_MAX_FRAME_SIZE_IN_MB) {
throw new IllegalArgumentException("spark.akka.frameSize should not be greater than "
+ AKKA_MAX_FRAME_SIZE_IN_MB + "MB")
}
frameSizeInMB * 1024 * 1024
}

/** Space reserved for extra data in an Akka message besides serialized task or task result. */
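A note on the AkkaUtils hunk above: the new `AKKA_MAX_FRAME_SIZE_IN_MB` bound exists because `frameSizeInMB * 1024 * 1024` is evaluated in `Int` arithmetic, so any setting above 2047 MB would silently overflow instead of failing fast. A standalone sketch (not part of the patch) of the arithmetic:

```scala
// Why the cap is Int.MaxValue / 1024 / 1024: the byte conversion is Int arithmetic.
object FrameSizeBound extends App {
  val maxMb = Int.MaxValue / 1024 / 1024
  println(maxMb)                      // 2047
  println(maxMb * 1024 * 1024)        // 2146435072 -- still a valid Int
  println((maxMb + 1) * 1024 * 1024)  // -2147483648 -- 2048 MB overflows
}
```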
1 change: 0 additions & 1 deletion core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -270,7 +270,6 @@ object ShuffleSuite {

def mergeCombineException(x: Int, y: Int): Int = {
throw new SparkException("Exception for map-side combine.")
x + y
}

class NonJavaSerializableClass(val value: Int) extends Comparable[NonJavaSerializableClass] {
8 changes: 7 additions & 1 deletion docs/programming-guide.md
@@ -934,6 +934,12 @@ for details.
<td> Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
This always shuffles all data over the network. </td>
</tr>
<tr>
<td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) </td>
<td> Repartition the RDD according to the given partitioner and, within each resulting partition,
sort records by their keys. This is more efficient than calling <code>repartition</code> and then sorting within
each partition because it can push the sorting down into the shuffle machinery (see the sketch after this table). </td>
</tr>
</table>
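A brief usage sketch for the `repartitionAndSortWithinPartitions` row added above (illustrative only; it assumes an existing `SparkContext` named `sc`):

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._  // pair-RDD implicits

val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"), (1, "z")))

// Shuffle into two partitions and sort by key inside each partition in a single
// pass, rather than repartition(2) followed by a per-partition sort.
val repartitioned = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))

// Print each partition's (already key-sorted) contents.
repartitioned.glom().collect().foreach(part => println(part.mkString(", ")))
```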

### Actions
@@ -1177,7 +1183,7 @@ Accumulators are variables that are only "added" to through an associative operation and can
therefore be efficiently supported in parallel. They can be used to implement counters (as in
MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers
can add support for new types. If accumulators are created with a name, they will be
displayed in Spark's UI. This can can be useful for understanding the progress of
displayed in Spark's UI. This can be useful for understanding the progress of
running stages (NOTE: this is not yet supported in Python).

An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks
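Before the SQL guide changes, a minimal sketch of the accumulator usage the corrected paragraph above describes (assuming a live `SparkContext` named `sc`; the accumulator name is illustrative):

```scala
// Giving the accumulator a name makes it show up in the web UI, which is the
// behaviour the corrected sentence refers to.
val errorCount = sc.accumulator(0, "errorCount")

sc.parallelize(1 to 1000).foreach { i =>
  if (i % 100 == 0) errorCount += 1   // updated on the executors
}

println(errorCount.value)             // read back on the driver: 10
```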
50 changes: 41 additions & 9 deletions docs/sql-programming-guide.md
@@ -146,7 +146,7 @@ describes the various methods for loading data into a SchemaRDD.

Spark SQL supports two different methods for converting existing RDDs into SchemaRDDs. The first
method uses reflection to infer the schema of an RDD that contains specific types of objects. This
reflection based approach leads to more concise code and works well when you already know the schema
while writing your Spark application.

The second method for creating SchemaRDDs is through a programmatic interface that allows you to
@@ -566,7 +566,7 @@ for teenName in teenNames.collect():

### Configuration

Configuration of Parquet can be done using the `setConf` method on SQLContext or by running
`SET key=value` commands using SQL.
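To make the two routes concrete, a short hedged sketch (it assumes an existing `SQLContext` named `sqlContext`; the keys are taken from the table below):

```scala
// Programmatic route: set Parquet options directly on the SQLContext.
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")  // off by default; see the PARQUET-136 note below

// SQL route: the same kind of setting via a SET command.
sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")
```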

<table class="table">
@@ -575,8 +575,8 @@
<td><code>spark.sql.parquet.binaryAsString</code></td>
<td>false</td>
<td>
Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do
not differentiate between binary data and strings when writing out the Parquet schema. This
flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
</td>
</tr>
@@ -591,10 +591,20 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
<td><code>spark.sql.parquet.compression.codec</code></td>
<td>gzip</td>
<td>
Sets the compression codec used when writing Parquet files. Acceptable values include:
uncompressed, snappy, gzip, lzo.
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.filterPushdown</code></td>
<td>false</td>
<td>
Turn on Parquet filter pushdown optimization. This feature is turned off by default because of a known
bug in Parquet 1.6.0rc3 (<a href="https://issues.apache.org/jira/browse/PARQUET-136">PARQUET-136</a>).
However, if your table doesn't contain any nullable string or binary columns, it's still safe to turn
this feature on.
</td>
</tr>
<tr>
<td><code>spark.sql.hive.convertMetastoreParquet</code></td>
<td>true</td>
@@ -900,7 +910,6 @@ export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
--master <master-uri> \
...
```
{% endhighlight %}

or system properties:
@@ -911,7 +920,6 @@
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
...
```
{% endhighlight %}

Now you can use beeline to test the Thrift JDBC/ODBC server:
@@ -947,7 +955,7 @@ options.

## Migration Guide for Shark Users

### Scheduling
To set a [Fair Scheduler](job-scheduling.html#fair-scheduler-pools) pool for a JDBC client session,
users can set the `spark.sql.thriftserver.scheduler.pool` variable:

@@ -994,7 +1002,7 @@ Several caching related features are not supported yet:
## Compatibility with Apache Hive

Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs. Currently Spark
SQL is based on Hive 0.12.0.
SQL is based on Hive 0.12.0 and 0.13.1.

#### Deploying in Existing Hive Warehouses

@@ -1033,6 +1041,7 @@ Spark SQL supports the vast majority of Hive features, such as:
* Sampling
* Explain
* Partitioned tables
* View
* All Hive DDL Functions, including:
* `CREATE TABLE`
* `CREATE TABLE AS SELECT`
@@ -1048,6 +1057,7 @@ Spark SQL supports the vast majority of Hive features, such as:
* `STRING`
* `BINARY`
* `TIMESTAMP`
* `DATE`
* `ARRAY<>`
* `MAP<>`
* `STRUCT<>`
@@ -1148,6 +1158,7 @@ evaluated by the SQL execution engine. A full list of the functions supported c
* Datetime type
- `TimestampType`: Represents values comprising values of fields year, month, day,
hour, minute, and second.
- `DateType`: Represents values comprising values of fields year, month, day.
* Complex types
- `ArrayType(elementType, containsNull)`: Represents values comprising a sequence of
elements with the type of `elementType`. `containsNull` is used to indicate if
@@ -1255,6 +1266,13 @@ import org.apache.spark.sql._
TimestampType
</td>
</tr>
<tr>
<td> <b>DateType</b> </td>
<td> java.sql.Date </td>
<td>
DateType
</td>
</tr>
<tr>
<td> <b>ArrayType</b> </td>
<td> scala.collection.Seq </td>
@@ -1381,6 +1399,13 @@ please use factory methods provided in
DataType.TimestampType
</td>
</tr>
<tr>
<td> <b>DateType</b> </td>
<td> java.sql.Date </td>
<td>
DataType.DateType
</td>
</tr>
<tr>
<td> <b>ArrayType</b> </td>
<td> java.util.List </td>
@@ -1528,6 +1553,13 @@ from pyspark.sql import *
TimestampType()
</td>
</tr>
<tr>
<td> <b>DateType</b> </td>
<td> datetime.date </td>
<td>
DateType()
</td>
</tr>
<tr>
<td> <b>ArrayType</b> </td>
<td> list, tuple, or array </td>
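As a pointer for the `DateType` rows added to the tables above: on the Scala side the new type lives alongside the other `org.apache.spark.sql` data types and pairs with `java.sql.Date` values. A hedged sketch of a programmatically-built schema that uses it (field names are illustrative):

```scala
import org.apache.spark.sql._

// A schema with the newly documented DateType.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("birthday", DateType, nullable = true)))
```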
@@ -29,9 +29,10 @@ object HiveFromSpark {
val sc = new SparkContext(sparkConf)
val path = s"${System.getenv("SPARK_HOME")}/examples/src/main/resources/kv1.txt"

// A local hive context creates an instance of the Hive Metastore in process, storing
// the warehouse data in the current directory. This location can be overridden by
// specifying a second parameter to the constructor.
// A hive context adds support for finding tables in the MetaStore and writing queries
// using HiveQL. Users who do not have an existing Hive deployment can still create a
// HiveContext. When not configured by the hive-site.xml, the context automatically
// creates metastore_db and warehouse in the current directory.
val hiveContext = new HiveContext(sc)
import hiveContext._

4 changes: 2 additions & 2 deletions external/mqtt/pom.xml
@@ -43,8 +43,8 @@
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>mqtt-client</artifactId>
<version>0.4.0</version>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
3 changes: 3 additions & 0 deletions make-distribution.sh
@@ -201,6 +201,9 @@ if [ -e "$FWDIR"/CHANGES.txt ]; then
cp "$FWDIR/CHANGES.txt" "$DISTDIR"
fi

# Copy data files
cp -r "$FWDIR/data" "$DISTDIR"

# Copy other things
mkdir "$DISTDIR"/conf
cp "$FWDIR"/conf/*.template "$DISTDIR"/conf
@@ -97,10 +97,10 @@ class SqlLexical(val keywords: Seq[String]) extends StdLexical {

/** Generate all variations of upper and lower case of a given string */
def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
if (s == "") {
if (s.isEmpty) {
Stream(prefix)
} else {
allCaseVersions(s.tail, prefix + s.head.toLower) ++
allCaseVersions(s.tail, prefix + s.head.toLower) #:::
allCaseVersions(s.tail, prefix + s.head.toUpper)
}
}
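On the `SqlLexical` hunk above: the switch from `++` to `#:::` matters because `#:::` takes its right-hand operand by name, so the upper-case half of the recursion is only expanded when the lower-case half has been consumed, whereas with `++` both recursive calls are evaluated before the method returns. A standalone sketch that mirrors the patched method rather than calling into Spark:

```scala
// Generate all upper/lower-case variations of a keyword lazily.
def allCaseVersions(s: String, prefix: String = ""): Stream[String] =
  if (s.isEmpty) {
    Stream(prefix)
  } else {
    allCaseVersions(s.tail, prefix + s.head.toLower) #:::
      allCaseVersions(s.tail, prefix + s.head.toUpper)
  }

// Only the variations actually demanded are generated:
allCaseVersions("select").take(3).foreach(println)  // select, selecT, seleCt
```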
@@ -277,7 +277,8 @@ class SqlParser extends AbstractSparkSQLParser {
| SUM ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { case exp => SumDistinct(exp) }
| COUNT ~ "(" ~> "*" <~ ")" ^^ { case _ => Count(Literal(1)) }
| COUNT ~ "(" ~> expression <~ ")" ^^ { case exp => Count(exp) }
| COUNT ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { case exp => CountDistinct(exp :: Nil) }
| COUNT ~> "(" ~> DISTINCT ~> repsep(expression, ",") <~ ")" ^^
{ case exps => CountDistinct(exps) }
| APPROXIMATE ~ COUNT ~ "(" ~ DISTINCT ~> expression <~ ")" ^^
{ case exp => ApproxCountDistinct(exp) }
| APPROXIMATE ~> "(" ~> floatLit ~ ")" ~ COUNT ~ "(" ~ DISTINCT ~ expression <~ ")" ^^
@@ -340,18 +341,13 @@ class SqlParser extends AbstractSparkSQLParser {
| floatLit ^^ { f => Literal(f.toDouble) }
)

private val longMax = BigDecimal(s"${Long.MaxValue}")
private val longMin = BigDecimal(s"${Long.MinValue}")
private val intMax = BigDecimal(s"${Int.MaxValue}")
private val intMin = BigDecimal(s"${Int.MinValue}")

private def toNarrowestIntegerType(value: String) = {
val bigIntValue = BigDecimal(value)

bigIntValue match {
case v if v < longMin || v > longMax => v
case v if v < intMin || v > intMax => v.toLong
case v => v.toInt
case v if bigIntValue.isValidInt => v.toIntExact
case v if bigIntValue.isValidLong => v.toLongExact
case v => v
}
}

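A quick illustration of the reworked `toNarrowestIntegerType` above (a standalone sketch that mirrors its logic rather than calling the parser): numeric literals are narrowed to the smallest integral type that holds them exactly, falling back to the `BigDecimal` itself.

```scala
def narrow(value: String): Any = {
  val bigIntValue = BigDecimal(value)
  bigIntValue match {
    case v if v.isValidInt  => v.toIntExact   // fits in Int
    case v if v.isValidLong => v.toLongExact  // fits in Long but not Int
    case v                  => v              // keep as BigDecimal
  }
}

println(narrow("42"))                   // 42            (Int)
println(narrow("3000000000"))           // 3000000000    (Long, exceeds Int.MaxValue)
println(narrow("9223372036854775808"))  // BigDecimal    (exceeds Long.MaxValue)
```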
4 changes: 4 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -225,6 +225,8 @@ class SchemaRDD(
* {{{
* schemaRDD.limit(10)
* }}}
*
* @group Query
*/
def limit(limitNum: Int): SchemaRDD =
new SchemaRDD(sqlContext, Limit(Literal(limitNum), logicalPlan))
@@ -355,6 +357,8 @@ class SchemaRDD(
* Return the number of elements in the RDD. Unlike the base RDD implementation of count, this
* implementation leverages the query optimizer to compute the count on the SchemaRDD, which
* supports features such as filter pushdown.
*
* @group Query
*/
@Experimental
override def count(): Long = aggregate(Count(Literal(1))).collect().head.getLong(0)
@@ -39,8 +39,8 @@ import scala.collection.JavaConversions._

/**
* Allows creation of parquet based tables using the syntax
* `CREATE TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option required
* is `path`, which should be the location of a collection of, optionally partitioned,
* `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
* required is `path`, which should be the location of a collection of, optionally partitioned,
* parquet files.
*/
class DefaultSource extends RelationProvider {
@@ -49,7 +49,7 @@ class DefaultSource extends RelationProvider {
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val path =
parameters.getOrElse("path", sys.error("'path' must be specifed for parquet tables."))
parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))

ParquetRelation2(path)(sqlContext)
}
@@ -67,7 +67,7 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
protected lazy val ddl: Parser[LogicalPlan] = createTable

/**
* CREATE FOREIGN TEMPORARY TABLE avroTable
* CREATE TEMPORARY TABLE avroTable
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
*/
@@ -992,4 +992,11 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
"nulldata2 on nulldata1.value <=> nulldata2.value"),
(1 to 2).map(i => Seq(i)))
}

test("Multi-column COUNT(DISTINCT ...)") {
val data = TestData(1,"val_1") :: TestData(2,"val_2") :: Nil
val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
rdd.registerTempTable("distinctData")
checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), 2)
}
}
@@ -379,7 +379,7 @@ private[hive] object HiveQl {
protected def nameExpressions(exprs: Seq[Expression]): Seq[NamedExpression] = {
exprs.zipWithIndex.map {
case (ne: NamedExpression, _) => ne
case (e, i) => Alias(e, s"c_$i")()
case (e, i) => Alias(e, s"_c$i")()
}
}

@@ -38,7 +38,7 @@ import org.apache.spark.sql.hive._
* :: DeveloperApi ::
* The Hive table scan operator. Column and partition pruning are both handled.
*
* @param attributes Attributes to be fetched from the Hive table.
* @param requestedAttributes Attributes to be fetched from the Hive table.
* @param relation The Hive table to be scanned.
* @param partitionPruningPred An optional partition pruning predicate for partitioned table.
*/
@@ -279,7 +279,7 @@ private[hive] case class HiveGenericUdtf(
}

override protected def makeOutput() = {
// Use column names when given, otherwise c_1, c_2, ... c_n.
// Use column names when given, otherwise _c1, _c2, ... _cn.
if (aliasNames.size == outputDataTypes.size) {
aliasNames.zip(outputDataTypes).map {
case (attrName, attrDataType) =>
@@ -288,7 +288,7 @@
} else {
outputDataTypes.zipWithIndex.map {
case (attrDataType, i) =>
AttributeReference(s"c_$i", attrDataType, nullable = true)()
AttributeReference(s"_c$i", attrDataType, nullable = true)()
}
}
}

