{% highlight scala %}
+import org.apache.spark.mllib.linalg.Matrix
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.mllib.linalg.SingularValueDecomposition
+
val mat: RowMatrix = ...
// Compute the top 20 singular values and corresponding singular vectors.
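+// A hedged sketch of the call (assumes `mat` has been populated):
+val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(20, computeU = true)
+val U: RowMatrix = svd.U  // left singular vectors, stored as a distributed RowMatrix
+val s = svd.s             // singular values in a local dense vector, in descending order
+val V: Matrix = svd.V     // right singular vectors, a local dense matrix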
@@ -74,6 +78,9 @@ and use them to project the vectors into a low-dimensional space.
The number of columns should be small, e.g., less than 1000.
{% highlight scala %}
+import org.apache.spark.mllib.linalg.Matrix
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+
val mat: RowMatrix = ...
// Compute the top 10 principal components.
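+// A hedged sketch (assumes `mat` has been populated); each column of `pc` is a principal component.
+val pc: Matrix = mat.computePrincipalComponents(10)
+// Project the rows of `mat` into the 10-dimensional principal subspace.
+val projected: RowMatrix = mat.multiply(pc)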
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index c49f857d07557..842ca5c8c6d8a 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -94,7 +94,7 @@ import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
double[] array = ... // a double array
-Vector vector = Vectors.dense(array) // a dense vector
+Vector vector = Vectors.dense(array); // a dense vector
{% endhighlight %}
[`Vectors`](api/mllib/index.html#org.apache.spark.mllib.linalg.Vectors$) provides factory methods to
diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md
index ebb555f974bf7..40b7a7f80708c 100644
--- a/docs/mllib-linear-methods.md
+++ b/docs/mllib-linear-methods.md
@@ -63,7 +63,7 @@ methods MLlib supports:
hinge loss | $\max \{0, 1-y \wv^T \x \}, \quad y \in \{-1, +1\}$ |
- $\begin{cases}-y \cdot \x & \text{if $y \wv^T \x <1$}, \\ 0 &
+ | $\begin{cases}-y \cdot \x & \text{if $y \wv^T \x <1$}, \\ 0 &
\text{otherwise}.\end{cases}$ |
@@ -225,10 +225,11 @@ algorithm for 200 iterations.
import org.apache.spark.mllib.optimization.L1Updater
val svmAlg = new SVMWithSGD()
-svmAlg.optimizer.setNumIterations(200)
- .setRegParam(0.1)
- .setUpdater(new L1Updater)
-val modelL1 = svmAlg.run(parsedData)
+svmAlg.optimizer.
+ setNumIterations(200).
+ setRegParam(0.1).
+ setUpdater(new L1Updater)
+val modelL1 = svmAlg.run(training)
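+// A hedged follow-up (assumes a `test: RDD[LabeledPoint]` held out alongside `training`):
+// score the held-out points with the L1-regularized model.
+val scoreAndLabels = test.map { point =>
+  (modelL1.predict(point.features), point.label)
+}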
{% endhighlight %}
Similarly, you can replace `SVMWithSGD` by
@@ -322,7 +323,7 @@ val valuesAndPreds = parsedData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
-val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.reduce(_ + _) / valuesAndPreds.count
+val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()
println("training Mean Squared Error = " + MSE)
{% endhighlight %}
diff --git a/docs/mllib-naive-bayes.md b/docs/mllib-naive-bayes.md
index 6160fe5b2fe8c..c47508b7daa2d 100644
--- a/docs/mllib-naive-bayes.md
+++ b/docs/mllib-naive-bayes.md
@@ -7,13 +7,13 @@ Naive Bayes is a simple multiclass classification algorithm with the assumption
between every pair of features. Naive Bayes can be trained very efficiently. Within a single pass over
the training data, it computes the conditional probability distribution of each feature given a label,
and then it applies Bayes' theorem to compute the conditional probability distribution of a label
-given an observation and use it for prediction. For more details, please visit the wikipedia page
+given an observation and uses it for prediction. For more details, please visit the Wikipedia page
[Naive Bayes classifier](http://en.wikipedia.org/wiki/Naive_Bayes_classifier).
In MLlib, we implemented multinomial naive Bayes, which is typically used for document
classification. Within that context, each observation is a document and each feature represents a term,
-whose value is the frequency of the term. For its formulation, please visit the wikipedia page
-[Multinomial naive Bayes](http://en.wikipedia.org/wiki/Naive_Bayes_classifier#Multinomial_naive_Bayes)
+whose value is the frequency of the term. For its formulation, please visit the Wikipedia page
+[Multinomial Naive Bayes](http://en.wikipedia.org/wiki/Naive_Bayes_classifier#Multinomial_naive_Bayes)
or the section
[Naive Bayes text classification](http://nlp.stanford.edu/IR-book/html/htmledition/naive-bayes-text-classification-1.html)
from the book Introduction to Information
@@ -36,9 +36,18 @@ can be used for evaluation and prediction.
{% highlight scala %}
import org.apache.spark.mllib.classification.NaiveBayes
-
-val training: RDD[LabeledPoint] = ... // training set
-val test: RDD[LabeledPoint] = ... // test set
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
+
+val data = sc.textFile("mllib/data/sample_naive_bayes_data.txt")
+val parsedData = data.map { line =>
+ val parts = line.split(',')
+ LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
+}
+// Split data into training (60%) and test (40%).
+val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
+val training = splits(0)
+val test = splits(1)
val model = NaiveBayes.train(training, lambda = 1.0)
val prediction = model.predict(test.map(_.features))
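+// A hedged sketch of computing test accuracy on the held-out split above:
+val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
+val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()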
@@ -58,29 +67,36 @@ optionally smoothing parameter `lambda` as input, and output a
can be used for evaluation and prediction.
{% highlight java %}
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.NaiveBayes;
+import org.apache.spark.mllib.classification.NaiveBayesModel;
+import org.apache.spark.mllib.regression.LabeledPoint;
+import scala.Tuple2;
JavaRDD<LabeledPoint> training = ... // training set
JavaRDD<LabeledPoint> test = ... // test set
-NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);
+final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);
-JavaRDD<Double> prediction = model.predict(test.map(new Function<LabeledPoint, Vector>() {
-    public Vector call(LabeledPoint p) {
-      return p.features();
+JavaRDD<Double> prediction =
+  test.map(new Function<LabeledPoint, Double>() {
+    @Override public Double call(LabeledPoint p) {
+      return model.predict(p.features());
}
- })
+ });
JavaPairRDD<Double, Double> predictionAndLabel =
  prediction.zip(test.map(new Function<LabeledPoint, Double>() {
- public Double call(LabeledPoint p) {
+ @Override public Double call(LabeledPoint p) {
return p.label();
}
- })
+ }));
double accuracy = 1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
-    public Boolean call(Tuple2<Double, Double> pl) {
+    @Override public Boolean call(Tuple2<Double, Double> pl) {
return pl._1() == pl._2();
}
- }).count() / test.count()
+ }).count() / test.count();
{% endhighlight %}
@@ -93,7 +109,7 @@ smoothing parameter `lambda` as input, and output a
[NaiveBayesModel](api/pyspark/pyspark.mllib.classification.NaiveBayesModel-class.html), which can be
used for evaluation and prediction.
-
+
{% highlight python %}
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 98c456228af9f..6813963bb080c 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -60,12 +60,9 @@ By default, PySpark requires `python` to be available on the system `PATH` and u
All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and automatically imported.
-Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
-The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`.
+Standalone PySpark applications should be run using the `bin/spark-submit` script, which automatically
+configures the Java and Python environment for running Spark.
-# Running PySpark on YARN
-
-To run PySpark against a YARN cluster, simply set the MASTER environment variable to "yarn-client".
# Interactive Use
@@ -103,7 +100,7 @@ $ MASTER=local[4] ./bin/pyspark
## IPython
-It is also possible to launch PySpark in [IPython](http://ipython.org), the
+It is also possible to launch the PySpark shell in [IPython](http://ipython.org), the
enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To
use IPython, set the `IPYTHON` variable to `1` when running `bin/pyspark`:
@@ -123,18 +120,17 @@ IPython also works on a cluster or on multiple cores if you set the `MASTER` env
# Standalone Programs
-PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/pyspark`.
+PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/spark-submit`.
The Quick Start guide includes a [complete example](quick-start.html#a-standalone-app-in-python) of a standalone Python application.
-Code dependencies can be deployed by listing them in the `pyFiles` option in the SparkContext constructor:
+Code dependencies can be deployed by passing .zip or .egg files in the `--py-files` option of `spark-submit`:
-{% highlight python %}
-from pyspark import SparkContext
-sc = SparkContext("local", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])
+{% highlight bash %}
+./bin/spark-submit --py-files lib1.zip,lib2.zip my_script.py
{% endhighlight %}
Files listed here will be added to the `PYTHONPATH` and shipped to remote worker machines.
-Code dependencies can be added to an existing SparkContext using its `addPyFile()` method.
+Code dependencies can also be added to an existing SparkContext at runtime using its `addPyFile()` method.
You can set [configuration properties](configuration.html#spark-properties) by passing a
[SparkConf](api/python/pyspark.conf.SparkConf-class.html) object to SparkContext:
@@ -142,12 +138,16 @@ You can set [configuration properties](configuration.html#spark-properties) by p
{% highlight python %}
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
- .setMaster("local")
.setAppName("My app")
.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
{% endhighlight %}
+`spark-submit` supports launching Python applications on standalone, Mesos or YARN clusters, through
+its `--master` argument. However, it currently requires the Python driver program to run on the local
+machine, not the cluster (i.e. the `--deploy-mode` parameter cannot be `cluster`).
+
+
# API Docs
[API documentation](api/python/index.html) for PySpark is available as Epydoc.
@@ -161,9 +161,9 @@ some example applications.
# Where to Go from Here
-PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/spark/tree/master/python/examples).
+PySpark also includes several sample programs in the [`examples/src/main/python` folder](https://github.com/apache/spark/tree/master/examples/src/main/python).
You can run them by passing the files to `spark-submit`; e.g.:
- ./bin/pyspark python/examples/wordcount.py
+ ./bin/spark-submit examples/src/main/python/wordcount.py
Each program prints usage help when run without arguments.
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index b8d89cf00ffbd..f25e9cca88524 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -48,12 +48,12 @@ how to access a cluster. To create a `SparkContext` you first need to build a `S
that contains information about your application.
{% highlight scala %}
-val conf = new SparkConf().setAppName(<appName>).setMaster(<master>)
+val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
{% endhighlight %}
-The `<master>` parameter is a string specifying a [Spark, Mesos or YARN cluster URL](#master-urls)
-to connect to, or a special "local" string to run in local mode, as described below. `<appName>` is
+The `master` parameter is a string specifying a [Spark, Mesos or YARN cluster URL](#master-urls)
+to connect to, or a special "local" string to run in local mode, as described below. `appName` is
a name for your application, which will be shown in the cluster web UI. It's also possible to set
these variables [using a configuration file](cluster-overview.html#loading-configurations-from-a-file)
which avoids hard-coding the master name in your application.
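+
+For example, a minimal local setup (illustrative values):
+
+{% highlight scala %}
+val conf = new SparkConf().setAppName("My App").setMaster("local[4]")
+val sc = new SparkContext(conf)
+{% endhighlight %}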
@@ -81,9 +81,8 @@ The master URL passed to Spark can be in one of the following formats:
Master URL | Meaning |
local | Run Spark locally with one worker thread (i.e. no parallelism at all). |
- local[K] | Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
+ local[K] | Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). |
local[*] | Run Spark locally with as many worker threads as logical cores on your machine. |
-
spark://HOST:PORT | Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default. |
@@ -146,7 +145,7 @@ RDDs support two types of operations: *transformations*, which create a new data
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently -- for example, we can realize that a dataset created through `map` will be used in a `reduce` and return only the result of the `reduce` to the driver, rather than the larger mapped dataset.
-By default, each transformed RDD is recomputed each time you run an action on it. However, you may also *persist* an RDD in memory using the `persist` (or `cache`) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting datasets on disk, or replicated across the cluster. The next section in this document describes these options.
+By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also *persist* an RDD in memory using the `persist` (or `cache`) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting datasets on disk, or replicated across the cluster. The next section in this document describes these options.
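+
+As a minimal sketch of this behavior (assuming `sc` is an existing SparkContext and `data.txt` exists):
+
+{% highlight scala %}
+val lines = sc.textFile("data.txt")  // lazy: nothing is read yet
+val lengths = lines.map(_.length)    // still lazy
+lengths.persist()                    // mark for in-memory caching once computed
+val total = lengths.reduce(_ + _)    // first action: triggers the read, the map, and the caching
+{% endhighlight %}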
The following tables list the transformations and actions currently supported (see also the [RDD API doc](api/scala/index.html#org.apache.spark.rdd.RDD) for details):
@@ -280,8 +279,8 @@ it is computed in an action, it will be kept in memory on the nodes. The cache i
if any partition of an RDD is lost, it will automatically be recomputed using the transformations
that originally created it.
-In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to
-persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space),
+In addition, each persisted RDD can be stored using a different *storage level*, allowing you, for example,
+to persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space),
or replicate it across nodes, or store the data in off-heap memory in [Tachyon](http://tachyon-project.org/).
These levels are chosen by passing a
[`org.apache.spark.storage.StorageLevel`](api/scala/index.html#org.apache.spark.storage.StorageLevel)
@@ -331,6 +330,8 @@ available storage levels is:
+Spark sometimes automatically persists intermediate state from RDD operations, even without users calling persist() or cache(). In particular, if a shuffle happens when computing an RDD, Spark will keep the outputs from the map side of the shuffle on disk to avoid re-computing the entire dependency graph if an RDD is re-used. We still recommend users call persist() if they plan to re-use an RDD iteratively.
+
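+A minimal sketch of calling persist() with an explicit level (assuming `rdd` is any RDD):
+
+{% highlight scala %}
+import org.apache.spark.storage.StorageLevel
+rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)  // keep serialized in memory, spill to disk
+{% endhighlight %}
+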
### Which Storage Level to Choose?
Spark's storage levels are meant to provide different trade-offs between memory usage and CPU
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 0c743c9d6010d..8a785450adfde 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -416,3 +416,4 @@ results = hiveCtx.hql("FROM src SELECT key, value").collect()
{% endhighlight %}
+
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index e8b718b303560..939599aa6855b 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -129,7 +129,7 @@ ssc.awaitTermination() // Wait for the computation to terminate
{% endhighlight %}
The complete code can be found in the Spark Streaming example
-[NetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala).
+[NetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala).
@@ -215,7 +215,7 @@ jssc.awaitTermination(); // Wait for the computation to terminate
{% endhighlight %}
The complete code can be found in the Spark Streaming example
-[JavaNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java).
+[JavaNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java).
@@ -234,12 +234,12 @@ Then, in a different terminal, you can start the example by using
{% highlight bash %}
-$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
+$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount local[2] localhost 9999
{% endhighlight %}
{% highlight bash %}
-$ ./bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999
+$ ./bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount local[2] localhost 9999
{% endhighlight %}
@@ -268,7 +268,7 @@ hello world
{% highlight bash %}
# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount
-$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
+$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount local[2] localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
@@ -609,7 +609,7 @@ JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFu
The update function will be called for each word, with `newValues` having a sequence of 1's (from
the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
Scala code, take a look at the example
-[StatefulNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala).
+[StatefulNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala).
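+
+A sketch of such an update function (counts assumed to be `Int`s):
+
+{% highlight scala %}
+def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
+  // Add the new 1's to the previous running count, starting from 0 for a new key.
+  Some(newValues.sum + runningCount.getOrElse(0))
+}
+{% endhighlight %}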
Transform Operation
@@ -1135,7 +1135,7 @@ If the `checkpointDirectory` exists, then the context will be recreated from the
If the directory does not exist (i.e., running for the first time),
then the function `functionToCreateContext` will be called to create a new
context and set up the DStreams. See the Scala example
-[RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala).
+[RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
This example appends the word counts of network data into a file.
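+
+A minimal sketch of this recovery pattern (names are illustrative):
+
+{% highlight scala %}
+def functionToCreateContext(): StreamingContext = {
+  val ssc = new StreamingContext(...)   // new context
+  // ... set up the DStreams ...
+  ssc.checkpoint(checkpointDirectory)   // set the checkpoint directory
+  ssc
+}
+
+// Recreate the context from checkpoint data, or build a fresh one via the function.
+val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
+{% endhighlight %}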
You can also explicitly create a `StreamingContext` from the checkpoint data and start the
@@ -1174,7 +1174,7 @@ If the `checkpointDirectory` exists, then the context will be recreated from the
If the directory does not exist (i.e., running for the first time),
then the function `contextFactory` will be called to create a new
context and set up the DStreams. See the Scala example
-[JavaRecoverableWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples/JavaRecoverableWordCount.scala)
+[JavaRecoverableWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/JavaRecoverableWordCount.scala)
(note that this example is missing in the 0.9 release, so you can test it using the master branch).
This example appends the word counts of network data into a file.
@@ -1374,7 +1374,6 @@ package and renamed for better clarity.
[ZeroMQUtils](api/java/org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and
[MQTTUtils](api/java/org/apache/spark/streaming/mqtt/MQTTUtils.html)
-* More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples)
- and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/streaming/examples)
-* [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) and
-[video](http://youtu.be/g171ndOHgJ0) describing Spark Streaming.
+* More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming)
+ and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming)
+* [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) and [video](http://youtu.be/g171ndOHgJ0) describing Spark Streaming.
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
similarity index 98%
rename from examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
rename to examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
index c516199d61c72..4533c4c5f241a 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.mllib.examples;
+package org.apache.spark.examples.mllib;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
similarity index 98%
rename from examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
rename to examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
index 7461609ab9e8f..0cfb8e69ed28f 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.mllib.examples;
+package org.apache.spark.examples.mllib;
import java.util.regex.Pattern;
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
similarity index 98%
rename from examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
rename to examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
index e3ab87cc722f3..f6e48b498727b 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.mllib.examples;
+package org.apache.spark.examples.mllib;
import java.util.regex.Pattern;
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
similarity index 98%
rename from examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
rename to examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index e36c7800be23e..7f558f3ee713a 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples;
+package org.apache.spark.examples.streaming;
import com.google.common.collect.Lists;
@@ -48,7 +48,7 @@
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./run org.apache.spark.streaming.examples.JavaCustomReceiver local[2] localhost 9999`
+ * `$ ./run org.apache.spark.examples.streaming.JavaCustomReceiver local[2] localhost 9999`
*/
public class JavaCustomReceiver extends Receiver<String> {
@@ -149,5 +149,3 @@ private void receive() {
}
}
}
-
-
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
similarity index 96%
rename from examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
rename to examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
index c59f7538f8a82..a5ece68cef870 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples;
+package org.apache.spark.examples.streaming;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
similarity index 96%
rename from examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
rename to examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
index 8da9bcd05ac38..da51eb189a649 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples;
+package org.apache.spark.examples.streaming;
import java.util.Map;
import java.util.HashMap;
@@ -26,6 +26,7 @@
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -44,7 +45,7 @@
* is the number of threads the kafka consumer should use
*
* Example:
- * `./bin/run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02,
+ * `./bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount local[2] zoo01,zoo02,
* zoo03 my-consumer-group topic1,topic2 1`
*/
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
similarity index 95%
rename from examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
rename to examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index 098c329ff6808..ac84991d87b8b 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples;
+package org.apache.spark.examples.streaming;
import com.google.common.collect.Lists;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
@@ -23,6 +23,7 @@
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -39,7 +40,7 @@
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./run org.apache.spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
+ * `$ ./run org.apache.spark.examples.streaming.JavaNetworkWordCount local[2] localhost 9999`
*/
public final class JavaNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
similarity index 96%
rename from examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
rename to examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
index 88ad341641e0a..819311968fac5 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples;
+package org.apache.spark.examples.streaming;
import com.google.common.collect.Lists;
import scala.Tuple2;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
diff --git a/python/examples/als.py b/examples/src/main/python/als.py
similarity index 100%
rename from python/examples/als.py
rename to examples/src/main/python/als.py
diff --git a/python/examples/kmeans.py b/examples/src/main/python/kmeans.py
similarity index 98%
rename from python/examples/kmeans.py
rename to examples/src/main/python/kmeans.py
index d8387b0b183e6..e3596488faf9e 100755
--- a/python/examples/kmeans.py
+++ b/examples/src/main/python/kmeans.py
@@ -18,7 +18,7 @@
"""
The K-means algorithm written from scratch against PySpark. In practice,
one may prefer to use the KMeans algorithm in MLlib, as shown in
-python/examples/mllib/kmeans.py.
+examples/src/main/python/mllib/kmeans.py.
This example requires NumPy (http://www.numpy.org/).
"""
diff --git a/python/examples/logistic_regression.py b/examples/src/main/python/logistic_regression.py
similarity index 97%
rename from python/examples/logistic_regression.py
rename to examples/src/main/python/logistic_regression.py
index 28d52e6a40b45..fe5373cf799b1 100755
--- a/python/examples/logistic_regression.py
+++ b/examples/src/main/python/logistic_regression.py
@@ -20,7 +20,7 @@
to act on batches of input data using efficient matrix operations.
In practice, one may prefer to use the LogisticRegression algorithm in
-MLlib, as shown in python/examples/mllib/logistic_regression.py.
+MLlib, as shown in examples/src/main/python/mllib/logistic_regression.py.
"""
from collections import namedtuple
diff --git a/python/examples/mllib/kmeans.py b/examples/src/main/python/mllib/kmeans.py
similarity index 100%
rename from python/examples/mllib/kmeans.py
rename to examples/src/main/python/mllib/kmeans.py
diff --git a/python/examples/mllib/logistic_regression.py b/examples/src/main/python/mllib/logistic_regression.py
similarity index 100%
rename from python/examples/mllib/logistic_regression.py
rename to examples/src/main/python/mllib/logistic_regression.py
diff --git a/python/examples/pagerank.py b/examples/src/main/python/pagerank.py
similarity index 100%
rename from python/examples/pagerank.py
rename to examples/src/main/python/pagerank.py
diff --git a/python/examples/pi.py b/examples/src/main/python/pi.py
similarity index 100%
rename from python/examples/pi.py
rename to examples/src/main/python/pi.py
diff --git a/python/examples/sort.py b/examples/src/main/python/sort.py
similarity index 100%
rename from python/examples/sort.py
rename to examples/src/main/python/sort.py
diff --git a/python/examples/transitive_closure.py b/examples/src/main/python/transitive_closure.py
similarity index 100%
rename from python/examples/transitive_closure.py
rename to examples/src/main/python/transitive_closure.py
diff --git a/python/examples/wordcount.py b/examples/src/main/python/wordcount.py
similarity index 100%
rename from python/examples/wordcount.py
rename to examples/src/main/python/wordcount.py
diff --git a/examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
similarity index 98%
rename from examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala
rename to examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
index 8210ad977f066..ff9254b044c24 100644
--- a/examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.examples
+package org.apache.spark.examples.sql
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
diff --git a/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
similarity index 98%
rename from examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala
rename to examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index 62329bde84481..66ce93a26ef42 100644
--- a/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.hive.examples
+package org.apache.spark.examples.sql.hive
import org.apache.spark.SparkContext
import org.apache.spark.sql._
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
similarity index 95%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
index c845dd8904c90..84cf43df0f96c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import scala.collection.mutable.LinkedList
import scala.reflect.ClassTag
@@ -78,7 +78,7 @@ class FeederActor extends Actor {
* goes and subscribe to a typical publisher/feeder actor and receives
* data.
*
- * @see [[org.apache.spark.streaming.examples.FeederActor]]
+ * @see [[org.apache.spark.examples.streaming.FeederActor]]
*/
class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
extends Actor with ActorHelper {
@@ -131,9 +131,9 @@ object FeederActor {
* and describe the AkkaSystem that Spark Sample feeder is running on.
*
* To run this example locally, you may run Feeder Actor as
- * `$ ./bin/run-example org.apache.spark.streaming.examples.FeederActor 127.0.1.1 9999`
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999`
* and then run the example
- * `./bin/run-example org.apache.spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
+ * `./bin/run-example org.apache.spark.examples.streaming.ActorWordCount local[2] 127.0.1.1 9999`
*/
object ActorWordCount {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
similarity index 97%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
index eebffd824983f..e317e2d36ae43 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import java.io.{InputStreamReader, BufferedReader, InputStream}
import java.net.Socket
@@ -37,7 +37,7 @@ import org.apache.spark.streaming.receiver.Receiver
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./run org.apache.spark.streaming.examples.CustomReceiver local[2] localhost 9999`
+ * `$ ./run org.apache.spark.examples.streaming.CustomReceiver local[2] localhost 9999`
*/
object CustomReceiver {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
similarity index 98%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
index 26b6024534124..5b2a1035fc779 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
similarity index 95%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
index 7f86fc792aacf..b440956ba3137 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
@@ -27,7 +27,7 @@ import org.apache.spark.streaming.StreamingContext._
* is the directory that Spark Streaming will use to find and read new text files.
*
* To run this on your local machine on directory `localdir`, run this example
- * `$ ./bin/run-example org.apache.spark.streaming.examples.HdfsWordCount local[2] localdir`
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.HdfsWordCount local[2] localdir`
* Then create a text file in `localdir` and the words in the file will get counted.
*/
object HdfsWordCount {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
similarity index 93%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
index 2aa4f1474a59e..c3aae5af05b1c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import java.util.Properties
@@ -24,7 +24,6 @@ import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
-import org.apache.spark.streaming.util.RawTextHelper._
// scalastyle:off
/**
@@ -37,7 +36,7 @@ import org.apache.spark.streaming.util.RawTextHelper._
* is the number of threads the kafka consumer should use
*
* Example:
- * `./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
+ * `./bin/run-example org.apache.spark.examples.streaming.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
*/
// scalastyle:on
object KafkaWordCount {
@@ -59,7 +58,7 @@ object KafkaWordCount {
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
- .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
+ .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
similarity index 95%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
index 62aef0fb47107..47bf1e5a06439 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
@@ -79,9 +79,9 @@ object MQTTPublisher {
* and describe where Mqtt publisher is running.
*
* To run this example locally, you may run publisher as
- * `$ ./bin/run-example org.apache.spark.streaming.examples.MQTTPublisher tcp://localhost:1883 foo`
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
* and run the example as
- * `$ ./bin/run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo`
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.MQTTWordCount local[2] tcp://localhost:1883 foo`
*/
// scalastyle:on
object MQTTWordCount {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
similarity index 95%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
index 272ab11212451..acfe9a4da3596 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
@@ -32,7 +32,7 @@ import org.apache.spark.storage.StorageLevel
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount local[2] localhost 9999`
*/
// scalastyle:on
object NetworkWordCount {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
similarity index 97%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
index ff2a205ec1c15..f92f72f2de876 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import scala.collection.mutable.SynchronizedQueue
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala
similarity index 92%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala
index d915c0c39b334..1b0319a046433 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala
@@ -15,11 +15,10 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
-import org.apache.spark.streaming.util.RawTextHelper
import org.apache.spark.util.IntParam
/**
@@ -52,9 +51,6 @@ object RawNetworkGrep {
val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
- // Warm up the JVMs on master and slave for JIT compilation to kick in
- RawTextHelper.warmUp(ssc.sparkContext)
-
val rawStreams = (1 to numStreams).map(_ =>
ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
val union = ssc.union(rawStreams)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
similarity index 96%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 4aacbb1991418..b0bc31cc66ab5 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
@@ -44,7 +44,7 @@ import java.nio.charset.Charset
*
* and run the example as
*
- * `$ ./run-example org.apache.spark.streaming.examples.RecoverableNetworkWordCount \
+ * `$ ./run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
* local[2] localhost 9999 ~/checkpoint/ ~/out`
*
* If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
@@ -56,7 +56,7 @@ import java.nio.charset.Charset
*
* `$ ./spark-class org.apache.spark.deploy.Client -s launch \
* \
- * org.apache.spark.streaming.examples.RecoverableNetworkWordCount \
+ * org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
* localhost 9999 ~/checkpoint ~/out`
*
* would typically be
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
similarity index 96%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
index ef94c9298dd93..8001d56c98d86 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
@@ -31,7 +31,7 @@ import org.apache.spark.streaming.StreamingContext._
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./bin/run-example org.apache.spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount local[2] localhost 9999`
*/
// scalastyle:on
object StatefulNetworkWordCount {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala
similarity index 97%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala
index 99f1502046f53..8396e65d0d588 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import org.apache.spark.Logging
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
similarity index 99%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
index c38905e8f3663..b12617d881787 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import com.twitter.algebird._
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
similarity index 98%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
index c067046f9020e..22f232c72545c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import com.twitter.algebird.HyperLogLogMonoid
import com.twitter.algebird.HyperLogLog._
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
similarity index 98%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
index 2597c8178862a..5b58e94600a16 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
similarity index 95%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
index 109ff855b5228..de46e5f5b10b6 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples
+package org.apache.spark.examples.streaming
import akka.actor.ActorSystem
import akka.actor.actorRef2Scala
@@ -68,9 +68,9 @@ object SimpleZeroMQPublisher {
* and describe where zeroMq publisher is running.
*
* To run this example locally, you may run publisher as
- * `$ ./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
* and run the example as
- * `$ ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
*/
// scalastyle:on
object ZeroMQWordCount {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
similarity index 95%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
index 251f65fe4df9c..97e0cb92078dc 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples.clickstream
+package org.apache.spark.examples.streaming.clickstream
import java.net.ServerSocket
import java.io.PrintWriter
@@ -40,8 +40,8 @@ object PageView extends Serializable {
/** Generates streaming events to simulate page views on a website.
*
* This should be used in tandem with PageViewStream.scala. Example:
- * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewGenerator 44444 10
- * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10
+ * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444
*
* When running this, you may want to set the root logging level to ERROR in
* conf/log4j.properties to reduce the verbosity of the output.
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
similarity index 94%
rename from examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
rename to examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
index 673013f7cf948..d30ceffbe29cb 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
@@ -15,19 +15,19 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.examples.clickstream
+package org.apache.spark.examples.streaming.clickstream
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.examples.StreamingExamples
+import org.apache.spark.examples.streaming.StreamingExamples
// scalastyle:off
/** Analyses a streaming dataset of web page views. This class demonstrates several types of
* operators available in Spark streaming.
*
 * This should be used in tandem with PageViewGenerator.scala. Example:
- * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewGenerator 44444 10
- * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10
+ * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444
*/
// scalastyle:on
object PageViewStream {
diff --git a/make-distribution.sh b/make-distribution.sh
index 8a63133bc45d4..ebcd8c74fc5a6 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -39,18 +39,11 @@
# 5) ./bin/spark-shell --master spark://my-master-ip:7077
#
+set -o pipefail
# Figure out where the Spark framework is installed
FWDIR="$(cd `dirname $0`; pwd)"
DISTDIR="$FWDIR/dist"
-set -o pipefail
-VERSION=$(mvn help:evaluate -Dexpression=project.version 2>/dev/null | grep -v "INFO" | tail -n 1)
-if [ $? != 0 ]; then
- echo -e "You need Maven installed to build Spark."
- echo -e "Download Maven from https://maven.apache.org/"
- exit -1;
-fi
-
if [ -z "$JAVA_HOME" ]; then
echo "Error: JAVA_HOME is not set, cannot proceed."
exit -1
@@ -59,10 +52,17 @@ fi
JAVA_CMD="$JAVA_HOME"/bin/java
JAVA_VERSION=$("$JAVA_CMD" -version 2>&1)
if ! [[ "$JAVA_VERSION" =~ "1.6" ]]; then
- echo "Error: JAVA_HOME must point to a JDK 6 installation (see SPARK-1703)."
+ echo "***NOTE***: JAVA_HOME is not set to a JDK 6 installation. The resulting"
+ echo " distribution will not support Java 6. See SPARK-1703."
echo "Output from 'java -version' was:"
echo "$JAVA_VERSION"
- exit -1
+fi
+
+VERSION=$(mvn help:evaluate -Dexpression=project.version 2>/dev/null | grep -v "INFO" | tail -n 1)
+if [ $? != 0 ]; then
+ echo -e "You need Maven installed to build Spark."
+ echo -e "Download Maven from https://maven.apache.org/"
+ exit -1;
fi
# Initialize defaults
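For context, the reordered check above now only warns (instead of aborting) when JAVA_HOME does not point at a JDK 6, and the Maven requirement is checked afterwards. A minimal Python sketch of the same version probe, assuming `java` is on the PATH (the message text is illustrative, not part of the patch):

{% highlight python %}
# Sketch of the JDK-6 probe performed above; `java -version` writes to
# stderr, so merge it into stdout before inspecting the string.
import subprocess

java_version = subprocess.Popen(["java", "-version"],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT).communicate()[0]
if "1.6" not in java_version:
    print "***NOTE***: not a JDK 6; the distribution will not support Java 6."
{% endhighlight %}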
diff --git a/mllib/data/sample_naive_bayes_data.txt b/mllib/data/sample_naive_bayes_data.txt
index f874adbaf4665..981da382d6ac8 100644
--- a/mllib/data/sample_naive_bayes_data.txt
+++ b/mllib/data/sample_naive_bayes_data.txt
@@ -1,6 +1,6 @@
-0, 1 0 0
-0, 2 0 0
-1, 0 1 0
-1, 0 2 0
-2, 0 0 1
-2, 0 0 2
+0,1 0 0
+0,2 0 0
+1,0 1 0
+1,0 2 0
+2,0 0 1
+2,0 0 2
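The whitespace fix above matters because the `label,features` format is split on a bare comma before the feature string is split on spaces. A hedged Python sketch of that convention (the parser below is illustrative, not MLlib's actual loader):

{% highlight python %}
# Illustrative parser for the "label,f1 f2 f3" lines above. With the old
# "0, 1 0 0" form, the space after the comma yields an empty first token
# and float("") raises ValueError.
def parse_point(line):
    label, features = line.split(",")
    return float(label), [float(x) for x in features.split(" ")]

print parse_point("0,1 0 0")  # (0.0, [1.0, 0.0, 0.0])
{% endhighlight %}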
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 1ad05d9e46dd6..7f9746ec4acc0 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -356,7 +356,8 @@ object SparkBuild extends Build {
"com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm),
"org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
"com.clearspring.analytics" % "stream" % "2.5.1" excludeAll(excludeFastutil),
- "org.spark-project" % "pyrolite" % "2.0.1"
+ "org.spark-project" % "pyrolite" % "2.0.1",
+ "net.sf.py4j" % "py4j" % "0.8.1"
),
libraryDependencies ++= maybeAvro
)
@@ -569,7 +570,6 @@ object SparkBuild extends Build {
)
def assemblyProjSettings = sharedSettings ++ Seq(
- libraryDependencies += "net.sf.py4j" % "py4j" % "0.8.1",
name := "spark-assembly",
assembleDeps in Compile <<= (packageProjects.map(packageBin in Compile in _) ++ Seq(packageDependency in Compile)).dependOn,
jarName in assembly <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + ".jar" },
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c74dc5fd4f854..c7dc85ea03544 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -158,6 +158,12 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
for path in (pyFiles or []):
self.addPyFile(path)
+ # Deploy code dependencies set by spark-submit; these will already have been added
+ # with SparkContext.addFile, so we just need to add them
+ for path in self._conf.get("spark.submit.pyFiles", "").split(","):
+ if path != "":
+ self._python_includes.append(os.path.basename(path))
+
# Create a temporary directory inside spark.local.dir:
local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
self._temp_dir = \
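The block added above relies on spark-submit having already shipped the files with SparkContext.addFile, so only their base names are recorded for the interpreter's import path. A minimal sketch of that bookkeeping, assuming a comma-separated `spark.submit.pyFiles` value (the paths are illustrative):

{% highlight python %}
import os

# Sketch: reduce "path1,path2" (or the empty default) to the base names
# that get appended to self._python_includes above.
py_files = "/tmp/spark/mylib.zip,/tmp/spark/helper.py"
includes = [os.path.basename(p) for p in py_files.split(",") if p != ""]
print includes  # ['mylib.zip', 'helper.py']
{% endhighlight %}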
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 032d960e40998..3d0936fdca911 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -27,39 +27,43 @@
def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]
- set_env_vars_for_yarn()
-
- # Launch the Py4j gateway using Spark's run command so that we pick up the
- # proper classpath and settings from spark-env.sh
- on_windows = platform.system() == "Windows"
- script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
- command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
- "--die-on-broken-pipe", "0"]
- if not on_windows:
- # Don't send ctrl-c / SIGINT to the Java gateway:
- def preexec_func():
- signal.signal(signal.SIGINT, signal.SIG_IGN)
- proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+ gateway_port = -1
+ if "PYSPARK_GATEWAY_PORT" in os.environ:
+ gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
else:
- # preexec_fn not supported on Windows
- proc = Popen(command, stdout=PIPE, stdin=PIPE)
- # Determine which ephemeral port the server started on:
- port = int(proc.stdout.readline())
- # Create a thread to echo output from the GatewayServer, which is required
- # for Java log output to show up:
- class EchoOutputThread(Thread):
- def __init__(self, stream):
- Thread.__init__(self)
- self.daemon = True
- self.stream = stream
+ # Launch the Py4j gateway using Spark's run command so that we pick up the
+ # proper classpath and settings from spark-env.sh
+ on_windows = platform.system() == "Windows"
+ script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
+ command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
+ "--die-on-broken-pipe", "0"]
+ if not on_windows:
+ # Don't send ctrl-c / SIGINT to the Java gateway:
+ def preexec_func():
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+ else:
+ # preexec_fn not supported on Windows
+ proc = Popen(command, stdout=PIPE, stdin=PIPE)
+ # Determine which ephemeral port the server started on:
+ gateway_port = int(proc.stdout.readline())
+ # Create a thread to echo output from the GatewayServer, which is required
+ # for Java log output to show up:
+ class EchoOutputThread(Thread):
+ def __init__(self, stream):
+ Thread.__init__(self)
+ self.daemon = True
+ self.stream = stream
+
+ def run(self):
+ while True:
+ line = self.stream.readline()
+ sys.stderr.write(line)
+ EchoOutputThread(proc.stdout).start()
- def run(self):
- while True:
- line = self.stream.readline()
- sys.stderr.write(line)
- EchoOutputThread(proc.stdout).start()
# Connect to the gateway
- gateway = JavaGateway(GatewayClient(port=port), auto_convert=False)
+ gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False)
+
# Import the classes used by PySpark
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
@@ -70,28 +74,5 @@ def run(self):
java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
java_import(gateway.jvm, "scala.Tuple2")
- return gateway
-def set_env_vars_for_yarn():
- # Add the spark jar, which includes the pyspark files, to the python path
- env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", ""))
- if "PYTHONPATH" in env_map:
- env_map["PYTHONPATH"] += ":spark.jar"
- else:
- env_map["PYTHONPATH"] = "spark.jar"
-
- os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items())
-
-def parse_env(env_str):
- # Turns a comma-separated of env settings into a dict that maps env vars to
- # their values.
- env = {}
- for var_str in env_str.split(","):
- parts = var_str.split("=")
- if len(parts) == 2:
- env[parts[0]] = parts[1]
- elif len(var_str) > 0:
- print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str
- sys.exit(1)
-
- return env
+ return gateway
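After this restructuring, a parent JVM (for example, one started by spark-submit) can export the port its GatewayServer listens on and have PySpark attach to it instead of spawning a second JVM. A hedged usage sketch (the port value is illustrative):

{% highlight python %}
import os
from pyspark.java_gateway import launch_gateway

# If PYSPARK_GATEWAY_PORT is set, launch_gateway() connects to the existing
# gateway instead of starting bin/spark-class itself.
os.environ["PYSPARK_GATEWAY_PORT"] = "25333"  # port exported by the parent JVM
gateway = launch_gateway()
conf = gateway.jvm.org.apache.spark.SparkConf()
{% endhighlight %}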
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8cf9d9cf1bd66..64f2eeb12b4fc 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -22,11 +22,14 @@
from fileinput import input
from glob import glob
import os
+import re
import shutil
+import subprocess
import sys
-from tempfile import NamedTemporaryFile
+import tempfile
import time
import unittest
+import zipfile
from pyspark.context import SparkContext
from pyspark.files import SparkFiles
@@ -55,7 +58,7 @@ class TestCheckpoint(PySparkTestCase):
def setUp(self):
PySparkTestCase.setUp(self)
- self.checkpointDir = NamedTemporaryFile(delete=False)
+ self.checkpointDir = tempfile.NamedTemporaryFile(delete=False)
os.unlink(self.checkpointDir.name)
self.sc.setCheckpointDir(self.checkpointDir.name)
@@ -148,7 +151,7 @@ def test_save_as_textfile_with_unicode(self):
# Regression test for SPARK-970
x = u"\u00A1Hola, mundo!"
data = self.sc.parallelize([x])
- tempFile = NamedTemporaryFile(delete=True)
+ tempFile = tempfile.NamedTemporaryFile(delete=True)
tempFile.close()
data.saveAsTextFile(tempFile.name)
raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*")))
@@ -172,7 +175,7 @@ def test_cartesian_on_textfile(self):
def test_deleting_input_files(self):
# Regression test for SPARK-1025
- tempFile = NamedTemporaryFile(delete=False)
+ tempFile = tempfile.NamedTemporaryFile(delete=False)
tempFile.write("Hello World!")
tempFile.close()
data = self.sc.textFile(tempFile.name)
@@ -236,5 +239,125 @@ def test_termination_sigterm(self):
from signal import SIGTERM
self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
+
+class TestSparkSubmit(unittest.TestCase):
+ def setUp(self):
+ self.programDir = tempfile.mkdtemp()
+ self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit")
+
+ def tearDown(self):
+ shutil.rmtree(self.programDir)
+
+ def createTempFile(self, name, content):
+ """
+        Create a temp file with the given name and content, and return its path.
+        Strips leading spaces up to and including the first '|' on each line.
+ """
+ pattern = re.compile(r'^ *\|', re.MULTILINE)
+ content = re.sub(pattern, '', content.strip())
+ path = os.path.join(self.programDir, name)
+ with open(path, "w") as f:
+ f.write(content)
+ return path
+
+ def createFileInZip(self, name, content):
+ """
+        Create a zip archive containing a file with the given name and content,
+        and return the archive's path. Strips the '|' margin from each line.
+ """
+ pattern = re.compile(r'^ *\|', re.MULTILINE)
+ content = re.sub(pattern, '', content.strip())
+ path = os.path.join(self.programDir, name + ".zip")
+ with zipfile.ZipFile(path, 'w') as zip:
+ zip.writestr(name, content)
+ return path
+
+ def test_single_script(self):
+ """Submit and test a single script file"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect()
+ """)
+ proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 4, 6]", out)
+
+ def test_script_with_local_functions(self):
+ """Submit and test a single script file calling a global function"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |
+ |def foo(x):
+ | return x * 3
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(foo).collect()
+ """)
+ proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[3, 6, 9]", out)
+
+ def test_module_dependency(self):
+ """Submit and test a script with a dependency on another module"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |from mylib import myfunc
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ """)
+ zip = self.createFileInZip("mylib.py", """
+ |def myfunc(x):
+ | return x + 1
+ """)
+ proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script],
+ stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 3, 4]", out)
+
+ def test_module_dependency_on_cluster(self):
+ """Submit and test a script with a dependency on another module on a cluster"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |from mylib import myfunc
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ """)
+ zip = self.createFileInZip("mylib.py", """
+ |def myfunc(x):
+ | return x + 1
+ """)
+ proc = subprocess.Popen(
+ [self.sparkSubmit, "--py-files", zip, "--master", "local-cluster[1,1,512]", script],
+ stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 3, 4]", out)
+
+ def test_single_script_on_cluster(self):
+ """Submit and test a single script on a cluster"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkContext
+ |
+ |def foo(x):
+ | return x * 2
+ |
+ |sc = SparkContext()
+ |print sc.parallelize([1, 2, 3]).map(foo).collect()
+ """)
+ proc = subprocess.Popen(
+ [self.sparkSubmit, "--master", "local-cluster[1,1,512]", script],
+ stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("[2, 4, 6]", out)
+
+
if __name__ == "__main__":
unittest.main()
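The '|' margin used by the test helpers above mirrors Scala's stripMargin: indentation before the pipe keeps the embedded scripts readable inside the test source and is removed before the file is written. A standalone sketch of the same regex:

{% highlight python %}
import re

# Drop leading spaces up to and including the '|' on each line, exactly as
# createTempFile and createFileInZip do above.
pattern = re.compile(r'^ *\|', re.MULTILINE)
script = """
    |from pyspark import SparkContext
    |sc = SparkContext()
""".strip()
print re.sub(pattern, '', script)
{% endhighlight %}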
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index e33f4f9803054..566d96e16ed83 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -45,8 +45,7 @@ class ReplSuite extends FunSuite {
}
val interp = new SparkILoop(in, new PrintWriter(out), master)
org.apache.spark.repl.Main.interp = interp
- val separator = System.getProperty("path.separator")
- interp.process(Array("-classpath", paths.mkString(separator)))
+ interp.process(Array("-classpath", paths.mkString(File.pathSeparator)))
org.apache.spark.repl.Main.interp = null
if (interp.sparkContext != null) {
interp.sparkContext.stop()
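The same portability concern exists on the Python side: hard-coding ':' breaks on Windows, where the separator is ';'. The standard library exposes the counterpart of Java's File.pathSeparator:

{% highlight python %}
import os

# os.pathsep is the platform's path-list separator (':' on Unix, ';' on
# Windows), the Python analogue of java.io.File.pathSeparator.
print os.pathsep.join(["spark-core.jar", "spark-repl.jar"])
{% endhighlight %}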
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index bbf57ef9275c0..a73d6f3bf0661 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConversions.mapAsScalaMap
private[streaming]
object RawTextHelper {
- /**
+ /**
* Splits lines and counts the words.
*/
def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
@@ -114,4 +114,3 @@ object RawTextHelper {
def max(v1: Long, v2: Long) = math.max(v1, v2)
}
-