Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/spark into viz2
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed May 4, 2015
2 parents 429e9e1 + 1ffa8cb commit 832443c
Show file tree
Hide file tree
Showing 15 changed files with 1,174 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ object SparkSubmit {
}

/** Provides utility functions to be used inside SparkSubmit. */
private[deploy] object SparkSubmitUtils {
private[spark] object SparkSubmitUtils {

// Exposed for testing
var printStream = SparkSubmit.printStream
Expand Down
4 changes: 0 additions & 4 deletions docs/building-spark.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ Because HDFS is not protocol-compatible across versions, if you want to read fro
<tr><th>Hadoop version</th><th>Profile required</th></tr>
</thead>
<tbody>
<tr><td>0.23.x</td><td>hadoop-0.23</td></tr>
<tr><td>1.x to 2.1.x</td><td>(none)</td></tr>
<tr><td>2.2.x</td><td>hadoop-2.2</td></tr>
<tr><td>2.3.x</td><td>hadoop-2.3</td></tr>
Expand All @@ -82,9 +81,6 @@ mvn -Dhadoop.version=1.2.1 -DskipTests clean package

# Cloudera CDH 4.2.0 with MapReduce v1
mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package

# Apache Hadoop 0.23.x
mvn -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package
{% endhighlight %}

You can enable the "yarn" profile and optionally set the "yarn.version" property if it is different from "hadoop.version". Spark only supports YARN versions 2.2.0 and later.
Expand Down
3 changes: 0 additions & 3 deletions docs/hadoop-third-party-distributions.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ the _exact_ Hadoop version you are running to avoid any compatibility errors.
<tr><th>Release</th><th>Version code</th></tr>
<tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-cdh4.X.X</td></tr>
<tr><td>CDH 4.X.X</td><td>2.0.0-mr1-cdh4.X.X</td></tr>
<tr><td>CDH 3u6</td><td>0.20.2-cdh3u6</td></tr>
<tr><td>CDH 3u5</td><td>0.20.2-cdh3u5</td></tr>
<tr><td>CDH 3u4</td><td>0.20.2-cdh3u4</td></tr>
</table>
</td>
<td>
Expand Down
2 changes: 1 addition & 1 deletion make-distribution.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ while (( "$#" )); do
--hadoop)
echo "Error: '--hadoop' is no longer supported:"
echo "Error: use Maven profiles and options -Dhadoop.version and -Dyarn.version instead."
echo "Error: Related profiles include hadoop-0.23, hdaoop-2.2, hadoop-2.3 and hadoop-2.4."
echo "Error: Related profiles include hadoop-2.2, hadoop-2.3 and hadoop-2.4."
exit_with_usage
;;
--with-yarn)
Expand Down
14 changes: 0 additions & 14 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1614,20 +1614,6 @@
http://hadoop.apache.org/docs/ra.b.c/hadoop-project-dist/hadoop-common/dependency-analysis.html
-->

<profile>
<id>hadoop-0.23</id>
<!-- SPARK-1121: Adds an explicit dependency on Avro to work around a Hadoop 0.23.X issue -->
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
</dependencies>
<properties>
<hadoop.version>0.23.10</hadoop.version>
</properties>
</profile>

<profile>
<id>hadoop-2.2</id>
<properties>
Expand Down
84 changes: 84 additions & 0 deletions python/pyspark/ml/tuning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import itertools

__all__ = ['ParamGridBuilder']


class ParamGridBuilder(object):
"""
Builder for a param grid used in grid search-based model selection.
>>> from classification import LogisticRegression
>>> lr = LogisticRegression()
>>> output = ParamGridBuilder().baseOn({lr.labelCol: 'l'}) \
.baseOn([lr.predictionCol, 'p']) \
.addGrid(lr.regParam, [1.0, 2.0, 3.0]) \
.addGrid(lr.maxIter, [1, 5]) \
.addGrid(lr.featuresCol, ['f']) \
.build()
>>> expected = [ \
{lr.regParam: 1.0, lr.featuresCol: 'f', lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'}, \
{lr.regParam: 2.0, lr.featuresCol: 'f', lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'}, \
{lr.regParam: 3.0, lr.featuresCol: 'f', lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'}, \
{lr.regParam: 1.0, lr.featuresCol: 'f', lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}, \
{lr.regParam: 2.0, lr.featuresCol: 'f', lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}, \
{lr.regParam: 3.0, lr.featuresCol: 'f', lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}]
>>> len(output) == len(expected)
True
>>> all([m in expected for m in output])
True
"""

def __init__(self):
self._param_grid = {}

def addGrid(self, param, values):
"""
Sets the given parameters in this grid to fixed values.
"""
self._param_grid[param] = values

return self

def baseOn(self, *args):
"""
Sets the given parameters in this grid to fixed values.
Accepts either a parameter dictionary or a list of (parameter, value) pairs.
"""
if isinstance(args[0], dict):
self.baseOn(*args[0].items())
else:
for (param, value) in args:
self.addGrid(param, [value])

return self

def build(self):
"""
Builds and returns all combinations of parameters specified
by the param grid.
"""
keys = self._param_grid.keys()
grid_values = self._param_grid.values()
return [dict(zip(keys, prod)) for prod in itertools.product(*grid_values)]


if __name__ == "__main__":
import doctest
doctest.testmod()
1 change: 1 addition & 0 deletions python/run-tests
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ function run_ml_tests() {
echo "Run ml tests ..."
run_test "pyspark/ml/feature.py"
run_test "pyspark/ml/classification.py"
run_test "pyspark/ml/tuning.py"
run_test "pyspark/ml/tests.py"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
*/
class NoSuchTableException extends Exception

class NoSuchDatabaseException extends Exception

/**
* An interface for looking up relations by name. Used by an [[Analyzer]].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,31 @@

package org.apache.spark.sql.catalyst

import java.io.{PrintWriter, ByteArrayOutputStream, FileInputStream, File}
import java.io._

import org.apache.spark.util.Utils

package object util {

/** Silences output to stderr or stdout for the duration of f */
def quietly[A](f: => A): A = {
val origErr = System.err
val origOut = System.out
try {
System.setErr(new PrintStream(new OutputStream {
def write(b: Int) = {}
}))
System.setOut(new PrintStream(new OutputStream {
def write(b: Int) = {}
}))

f
} finally {
System.setErr(origErr)
System.setOut(origOut)
}
}

def fileToString(file: File, encoding: String = "UTF-8"): String = {
val inStream = new FileInputStream(file)
val outStream = new ByteArrayOutputStream
Expand All @@ -42,10 +61,9 @@ package object util {
new String(outStream.toByteArray, encoding)
}

def resourceToString(
resource:String,
encoding: String = "UTF-8",
classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
def resourceToBytes(
resource: String,
classLoader: ClassLoader = Utils.getSparkClassLoader): Array[Byte] = {
val inStream = classLoader.getResourceAsStream(resource)
val outStream = new ByteArrayOutputStream
try {
Expand All @@ -61,7 +79,14 @@ package object util {
finally {
inStream.close()
}
new String(outStream.toByteArray, encoding)
outStream.toByteArray
}

def resourceToString(
resource:String,
encoding: String = "UTF-8",
classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
new String(resourceToBytes(resource, classLoader), encoding)
}

def stringToFile(file: File, str: String): File = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.client

import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}

case class HiveDatabase(
name: String,
location: String)

abstract class TableType { val name: String }
case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
case object IndexTable extends TableType { override val name = "INDEX_TABLE" }
case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" }
case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" }

case class HiveStorageDescriptor(
location: String,
inputFormat: String,
outputFormat: String,
serde: String)

case class HivePartition(
values: Seq[String],
storage: HiveStorageDescriptor)

case class HiveColumn(name: String, hiveType: String, comment: String)
case class HiveTable(
specifiedDatabase: Option[String],
name: String,
schema: Seq[HiveColumn],
partitionColumns: Seq[HiveColumn],
properties: Map[String, String],
serdeProperties: Map[String, String],
tableType: TableType,
location: Option[String] = None,
inputFormat: Option[String] = None,
outputFormat: Option[String] = None,
serde: Option[String] = None) {

@transient
private[client] var client: ClientInterface = _

private[client] def withClient(ci: ClientInterface): this.type = {
client = ci
this
}

def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))

def isPartitioned: Boolean = partitionColumns.nonEmpty

def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)

// Hive does not support backticks when passing names to the client.
def qualifiedName: String = s"$database.$name"
}

/**
* An externally visible interface to the Hive client. This interface is shared across both the
* internal and external classloaders for a given version of Hive and thus must expose only
* shared classes.
*/
trait ClientInterface {
/**
* Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
* result in one string.
*/
def runSqlHive(sql: String): Seq[String]

/** Returns the names of all tables in the given database. */
def listTables(dbName: String): Seq[String]

/** Returns the name of the active database. */
def currentDatabase: String

/** Returns the metadata for specified database, throwing an exception if it doesn't exist */
def getDatabase(name: String): HiveDatabase = {
getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
}

/** Returns the metadata for a given database, or None if it doesn't exist. */
def getDatabaseOption(name: String): Option[HiveDatabase]

/** Returns the specified table, or throws [[NoSuchTableException]]. */
def getTable(dbName: String, tableName: String): HiveTable = {
getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException)
}

/** Returns the metadata for the specified table or None if it doens't exist. */
def getTableOption(dbName: String, tableName: String): Option[HiveTable]

/** Creates a table with the given metadata. */
def createTable(table: HiveTable): Unit

/** Updates the given table with new metadata. */
def alterTable(table: HiveTable): Unit

/** Creates a new database with the given name. */
def createDatabase(database: HiveDatabase): Unit

/** Returns all partitions for the given table. */
def getAllPartitions(hTable: HiveTable): Seq[HivePartition]

/** Loads a static partition into an existing table. */
def loadPartition(
loadPath: String,
tableName: String,
partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean): Unit

/** Loads data into an existing table. */
def loadTable(
loadPath: String, // TODO URI
tableName: String,
replace: Boolean,
holdDDLTime: Boolean): Unit

/** Loads new dynamic partitions into an existing table. */
def loadDynamicPartitions(
loadPath: String,
tableName: String,
partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
replace: Boolean,
numDP: Int,
holdDDLTime: Boolean,
listBucketingEnabled: Boolean): Unit

/** Used for testing only. Removes all metadata from this instance of Hive. */
def reset(): Unit
}
Loading

0 comments on commit 832443c

Please sign in to comment.