Commit

Merge branch 'master' of github.com:apache/spark into concurrent-sql-executions
Andrew Or committed Sep 14, 2015
2 parents 984a92f + 8a634e9 commit fce3819
Showing 23 changed files with 137 additions and 59 deletions.
6 changes: 6 additions & 0 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -22,6 +22,7 @@ import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
object Bagel extends Logging {
val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK

@@ -270,18 +271,21 @@ object Bagel extends Logging {
}
}

@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Combiner[M, C] {
def createCombiner(msg: M): C
def mergeMsg(combiner: C, msg: M): C
def mergeCombiners(a: C, b: C): C
}

@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Aggregator[V, A] {
def createAggregator(vert: V): A
def mergeAggregators(a: A, b: A): A
}

/** Default combiner that simply appends messages together (i.e. performs no aggregation) */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
def createCombiner(msg: M): Array[M] =
Array(msg)
@@ -297,6 +301,7 @@ class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializab
* Subclasses may store state along with each vertex and must
* inherit from java.io.Serializable or scala.Serializable.
*/
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Vertex {
def active: Boolean
}
@@ -307,6 +312,7 @@ trait Vertex {
* Subclasses may contain a payload to deliver to the target vertex
* and must inherit from java.io.Serializable or scala.Serializable.
*/
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Message[K] {
def targetId: K
}
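The deprecation messages above point to GraphX as the replacement. As a rough migration illustration (not part of this commit), the vertex-program/message/combiner loop that Bagel users write maps onto GraphX's pregel operator; the sketch below is adapted from the GraphX programming guide's single-source shortest-paths example and assumes an existing SparkContext sc.

```scala
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// Build a small random graph with Double edge weights.
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42L
// Initialize every vertex except the source to infinity.
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),        // vertex program
  triplet => {                                           // send messages along edges
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b)                               // merge messages (the "combiner")
)
println(sssp.vertices.take(5).mkString("\n"))
```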
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -351,7 +351,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
protected[spark] val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = {
// Note: make a clone such that changes in the parent properties aren't reflected in
// those of the children threads, which has confusing semantics (SPARK-10564).
// those of the children threads, which has confusing semantics (SPARK-10563).
SerializationUtils.clone(parent).asInstanceOf[Properties]
}
override protected def initialValue(): Properties = new Properties()
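The corrected reference is SPARK-10563; the clone in childValue is what keeps a child thread's local properties independent of its parent's. A hypothetical illustration of the intended semantics (not part of this commit), assuming an existing SparkContext sc:

```scala
sc.setLocalProperty("spark.scheduler.pool", "parent-pool")

val child = new Thread {
  override def run(): Unit = {
    // The child thread starts from a clone of the parent's properties...
    assert(sc.getLocalProperty("spark.scheduler.pool") == "parent-pool")
    // ...and any change it makes stays confined to this thread.
    sc.setLocalProperty("spark.scheduler.pool", "child-pool")
  }
}
child.start()
child.join()

// The parent's view is unaffected by the child's update.
assert(sc.getLocalProperty("spark.scheduler.pool") == "parent-pool")
```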
44 changes: 38 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1018,6 +1018,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD.
*
* Note that we should make sure our tasks are idempotent when speculation is enabled, i.e. do
* not use an output committer that writes data directly.
* There is an example in https://issues.apache.org/jira/browse/SPARK-10063 that shows the bad
* result of using a direct output committer with speculation enabled.
*/
def saveAsHadoopFile(
path: String,
@@ -1030,10 +1035,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val hadoopConf = conf
hadoopConf.setOutputKeyClass(keyClass)
hadoopConf.setOutputValueClass(valueClass)
// Doesn't work in Scala 2.9 due to what may be a generics bug
// TODO: Should we uncomment this for Scala 2.10?
// conf.setOutputFormat(outputFormatClass)
hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
conf.setOutputFormat(outputFormatClass)
for (c <- codec) {
hadoopConf.setCompressMapOutput(true)
hadoopConf.set("mapred.output.compress", "true")
@@ -1047,6 +1049,19 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
}

// When speculation is on and the output committer class name contains "Direct", we should warn
// users that they may lose data if they are using a direct output committer.
val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
val warningMessage =
s"$outputCommitterClass may be an output committer that writes data directly to " +
"the final location. Because speculation is enabled, this output committer may " +
"cause data loss (see the case in SPARK-10063). If possible, please use a output " +
"committer that does not have this behavior (e.g. FileOutputCommitter)."
logWarning(warningMessage)
}

FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriter.createPathFromString(path, hadoopConf))
saveAsHadoopDataset(hadoopConf)
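A hedged usage sketch of what the new scaladoc recommends (not part of this commit): with speculation enabled, stick to a committer that stages output and only moves it into place on commit, such as the FileOutputCommitter that saveAsHadoopFile falls back to above. Assumes an existing SparkContext sc; the output path is arbitrary.

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat

// Speculation may re-run this task; FileOutputCommitter keeps re-runs from clobbering
// each other because only the committed attempt is moved to the final location.
val counts = sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }
counts.saveAsHadoopFile[TextOutputFormat[Text, IntWritable]]("/tmp/counts-output")
```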
@@ -1057,6 +1072,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Configuration object for that storage system. The Conf should set an OutputFormat and any
* output paths required (e.g. a table name to write to) in the same way as it would be
* configured for a Hadoop MapReduce job.
*
* Note that we should make sure our tasks are idempotent when speculation is enabled, i.e. do
* not use an output committer that writes data directly.
* There is an example in https://issues.apache.org/jira/browse/SPARK-10063 that shows the bad
* result of using a direct output committer with speculation enabled.
*/
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
@@ -1115,6 +1135,20 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)

// When speculation is on and the output committer class name contains "Direct", we should warn
// users that they may lose data if they are using a direct output committer.
val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
val outputCommitterClass = jobCommitter.getClass.getSimpleName
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
val warningMessage =
s"$outputCommitterClass may be an output committer that writes data directly to " +
"the final location. Because speculation is enabled, this output committer may " +
"cause data loss (see the case in SPARK-10063). If possible, please use a output " +
"committer that does not have this behavior (e.g. FileOutputCommitter)."
logWarning(warningMessage)
}

jobCommitter.setupJob(jobTaskContext)
self.context.runJob(self, writeShard)
jobCommitter.commitJob(jobTaskContext)
@@ -1129,7 +1163,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val wrappedConf = new SerializableConfiguration(hadoopConf)
val outputFormatInstance = hadoopConf.getOutputFormat
val keyClass = hadoopConf.getOutputKeyClass
val valueClass = hadoopConf.getOutputValueClass
@@ -1157,7 +1190,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.preSetup()

val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
val config = wrappedConf.value
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
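Conversely, a hypothetical configuration that would now trip the warning added above: speculation on, plus a committer whose class name contains "Direct". The committer class name below is made up purely to illustrate the string match.

```scala
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("direct-committer-demo").set("spark.speculation", "true"))

val jobConf = new JobConf(sc.hadoopConfiguration)
// Hypothetical class name; only the "Direct" substring matters for the warning.
jobConf.set("mapred.output.committer.class", "com.example.DirectOutputCommitter")
// Passing this JobConf to saveAsHadoopFile would log the
// "... may be an output committer that writes data directly ..." warning.
```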
10 changes: 1 addition & 9 deletions docs/bagel-programming-guide.md
@@ -4,7 +4,7 @@ displayTitle: Bagel Programming Guide
title: Bagel
---

**Bagel will soon be superseded by [GraphX](graphx-programming-guide.html); we recommend that new users try GraphX instead.**
**Bagel is deprecated, and superseded by [GraphX](graphx-programming-guide.html).**

Bagel is a Spark implementation of Google's [Pregel](http://portal.acm.org/citation.cfm?id=1807184) graph processing framework. Bagel currently supports basic graph computation, combiners, and aggregators.

@@ -157,11 +157,3 @@ trait Message[K] {
def targetId: K
}
{% endhighlight %}

# Where to Go from Here

Two example jobs, PageRank and shortest path, are included in `examples/src/main/scala/org/apache/spark/examples/bagel`. You can run them by passing the class name to the `bin/run-example` script included in Spark; e.g.:

./bin/run-example org.apache.spark.examples.bagel.WikipediaPageRank

Each example program prints usage help when run without any arguments.
1 change: 0 additions & 1 deletion docs/index.md
@@ -90,7 +90,6 @@ options for deployment:
* [Spark SQL and DataFrames](sql-programming-guide.html): support for structured data and relational queries
* [MLlib](mllib-guide.html): built-in machine learning library
* [GraphX](graphx-programming-guide.html): Spark's new API for graph processing
* [Bagel (Pregel on Spark)](bagel-programming-guide.html): older, simple graph processing model

**API Docs:**

2 changes: 1 addition & 1 deletion docs/sql-programming-guide.md
@@ -1687,7 +1687,7 @@ The following options can be used to configure the version of Hive that is used
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td><code>spark.sql.hive.metastore.version</code></td>
<td><code>0.13.1</code></td>
<td><code>1.2.1</code></td>
<td>
Version of the Hive metastore. Available
options are <code>0.12.0</code> through <code>1.2.1</code>.
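For reference, a hedged sketch of pinning the metastore version from application code (it can equally go in spark-defaults.conf); pairing it with the built-in Hive jars is an assumption about a typical setup, not something this commit prescribes.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("hive-metastore-version-demo")
  .set("spark.sql.hive.metastore.version", "1.2.1") // the new default documented above
  .set("spark.sql.hive.metastore.jars", "builtin")  // assumption: built-in jars match 1.2.1
val sc = new SparkContext(conf)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
```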
@@ -146,7 +146,7 @@ final class DecisionTreeClassificationModel private[ml] (
}

override def toString: String = {
s"DecisionTreeClassificationModel of depth $depth with $numNodes nodes"
s"DecisionTreeClassificationModel (uid=$uid) of depth $depth with $numNodes nodes"
}

/** (private[ml]) Convert to a model in the old API */
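The uid now shows up in the model's string form (the other classification and regression models below get the same treatment). A hypothetical check, not part of this commit, assuming an already prepared DataFrame training with "label" and "features" columns:

```scala
import org.apache.spark.ml.classification.DecisionTreeClassifier

val dt = new DecisionTreeClassifier()   // uid is auto-generated, e.g. "dtc_4bb2..."
val model = dt.fit(training)            // `training` is an assumed DataFrame(label, features)
println(model)
// e.g. DecisionTreeClassificationModel (uid=dtc_4bb2...) of depth 5 with 63 nodes
```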
@@ -200,7 +200,7 @@ final class GBTClassificationModel(
}

override def toString: String = {
s"GBTClassificationModel with $numTrees trees"
s"GBTClassificationModel (uid=$uid) with $numTrees trees"
}

/** (private[ml]) Convert to a model in the old API */
@@ -198,7 +198,7 @@ class NaiveBayesModel private[ml] (
}

override def toString: String = {
s"NaiveBayesModel with ${pi.size} classes"
s"NaiveBayesModel (uid=$uid) with ${pi.size} classes"
}

}
@@ -193,7 +193,7 @@ final class RandomForestClassificationModel private[ml] (
}

override def toString: String = {
s"RandomForestClassificationModel with $numTrees trees"
s"RandomForestClassificationModel (uid=$uid) with $numTrees trees"
}

/**
@@ -129,7 +129,7 @@ class RFormula(override val uid: String) extends Estimator[RFormulaModel] with R

override def copy(extra: ParamMap): RFormula = defaultCopy(extra)

override def toString: String = s"RFormula(${get(formula)})"
override def toString: String = s"RFormula(${get(formula)}) (uid=$uid)"
}

/**
Expand Down Expand Up @@ -171,7 +171,7 @@ class RFormulaModel private[feature](
override def copy(extra: ParamMap): RFormulaModel = copyValues(
new RFormulaModel(uid, resolvedFormula, pipelineModel))

override def toString: String = s"RFormulaModel(${resolvedFormula})"
override def toString: String = s"RFormulaModel(${resolvedFormula}) (uid=$uid)"

private def transformLabel(dataset: DataFrame): DataFrame = {
val labelName = resolvedFormula.label
@@ -27,7 +27,7 @@ import org.apache.spark.ml.Transformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, NumericType, StringType, StructType}
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.OpenHashMap

/**
@@ -229,8 +229,7 @@ class IndexToString private[ml] (
val outputColName = $(outputCol)
require(inputFields.forall(_.name != outputColName),
s"Output column $outputColName already exists.")
val attr = NominalAttribute.defaultAttr.withName($(outputCol))
val outputFields = inputFields :+ attr.toStructField()
val outputFields = inputFields :+ StructField($(outputCol), StringType)
StructType(outputFields)
}

@@ -118,7 +118,7 @@ final class DecisionTreeRegressionModel private[ml] (
}

override def toString: String = {
s"DecisionTreeRegressionModel of depth $depth with $numNodes nodes"
s"DecisionTreeRegressionModel (uid=$uid) of depth $depth with $numNodes nodes"
}

/** Convert to a model in the old API */
@@ -189,7 +189,7 @@ final class GBTRegressionModel(
}

override def toString: String = {
s"GBTRegressionModel with $numTrees trees"
s"GBTRegressionModel (uid=$uid) with $numTrees trees"
}

/** (private[ml]) Convert to a model in the old API */
@@ -155,7 +155,7 @@ final class RandomForestRegressionModel private[ml] (
}

override def toString: String = {
s"RandomForestRegressionModel with $numTrees trees"
s"RandomForestRegressionModel (uid=$uid) with $numTrees trees"
}

/**
@@ -132,7 +132,8 @@ private[python] class PythonMLLibAPI extends Serializable {
regParam: Double,
regType: String,
intercept: Boolean,
validateData: Boolean): JList[Object] = {
validateData: Boolean,
convergenceTol: Double): JList[Object] = {
val lrAlg = new LinearRegressionWithSGD()
lrAlg.setIntercept(intercept)
.setValidateData(validateData)
@@ -141,6 +142,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
.setConvergenceTol(convergenceTol)
lrAlg.optimizer.setUpdater(getUpdaterFromString(regType))
trainRegressionModel(
lrAlg,
@@ -159,7 +161,8 @@ private[python] class PythonMLLibAPI extends Serializable {
miniBatchFraction: Double,
initialWeights: Vector,
intercept: Boolean,
validateData: Boolean): JList[Object] = {
validateData: Boolean,
convergenceTol: Double): JList[Object] = {
val lassoAlg = new LassoWithSGD()
lassoAlg.setIntercept(intercept)
.setValidateData(validateData)
@@ -168,6 +171,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
.setConvergenceTol(convergenceTol)
trainRegressionModel(
lassoAlg,
data,
@@ -185,7 +189,8 @@ private[python] class PythonMLLibAPI extends Serializable {
miniBatchFraction: Double,
initialWeights: Vector,
intercept: Boolean,
validateData: Boolean): JList[Object] = {
validateData: Boolean,
convergenceTol: Double): JList[Object] = {
val ridgeAlg = new RidgeRegressionWithSGD()
ridgeAlg.setIntercept(intercept)
.setValidateData(validateData)
@@ -194,6 +199,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
.setConvergenceTol(convergenceTol)
trainRegressionModel(
ridgeAlg,
data,
@@ -212,7 +218,8 @@ private[python] class PythonMLLibAPI extends Serializable {
initialWeights: Vector,
regType: String,
intercept: Boolean,
validateData: Boolean): JList[Object] = {
validateData: Boolean,
convergenceTol: Double): JList[Object] = {
val SVMAlg = new SVMWithSGD()
SVMAlg.setIntercept(intercept)
.setValidateData(validateData)
@@ -221,6 +228,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
.setConvergenceTol(convergenceTol)
SVMAlg.optimizer.setUpdater(getUpdaterFromString(regType))
trainRegressionModel(
SVMAlg,
@@ -240,7 +248,8 @@ private[python] class PythonMLLibAPI extends Serializable {
regParam: Double,
regType: String,
intercept: Boolean,
validateData: Boolean): JList[Object] = {
validateData: Boolean,
convergenceTol: Double): JList[Object] = {
val LogRegAlg = new LogisticRegressionWithSGD()
LogRegAlg.setIntercept(intercept)
.setValidateData(validateData)
@@ -249,6 +258,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
.setConvergenceTol(convergenceTol)
LogRegAlg.optimizer.setUpdater(getUpdaterFromString(regType))
trainRegressionModel(
LogRegAlg,
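Each Python-facing trainer above now threads a convergenceTol parameter through to the underlying SGD optimizer. A hedged Scala sketch of the equivalent direct call, assuming an RDD[LabeledPoint] named points:

```scala
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

val lr = new LinearRegressionWithSGD()
lr.optimizer
  .setNumIterations(200)
  .setStepSize(0.1)
  .setMiniBatchFraction(1.0)
  .setConvergenceTol(1e-4)   // stop early once successive iterates barely change
val model = lr.run(points)   // `points` is an assumed RDD[LabeledPoint]
```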
@@ -17,6 +17,7 @@

package org.apache.spark.ml.feature

import org.apache.spark.sql.types.{StringType, StructType, StructField, DoubleType}
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.ml.attribute.{Attribute, NominalAttribute}
import org.apache.spark.ml.param.ParamsSuite
@@ -165,4 +166,11 @@ class StringIndexerSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(a === b)
}
}

test("IndexToString.transformSchema (SPARK-10573)") {
val idxToStr = new IndexToString().setInputCol("input").setOutputCol("output")
val inSchema = StructType(Seq(StructField("input", DoubleType)))
val outSchema = idxToStr.transformSchema(inSchema)
assert(outSchema("output").dataType === StringType)
}
}