From 15f0eb418e1027853a3706e3f44fd268ac37a3e8 Mon Sep 17 00:00:00 2001 From: Michael Giannakopoulos Date: Sun, 6 Jul 2014 11:11:13 -0400 Subject: [PATCH 1/6] Java examples included in 'mllib-linear-methods.md' file. --- docs/mllib-linear-methods.md | 170 +++++++++++++++++++++++++++++++++-- 1 file changed, 165 insertions(+), 5 deletions(-) diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md index 4dfbebbcd04b7..cf95dcd9abc37 100644 --- a/docs/mllib-linear-methods.md +++ b/docs/mllib-linear-methods.md @@ -151,10 +151,10 @@ L(\wv;\x,y) := \log(1+\exp( -y \wv^T \x)). Logistic regression algorithm outputs a logistic regression model, which makes predictions by applying the logistic function `\[ -\mathrm{logit}(z) = \frac{1}{1 + e^{-z}} +\mathrm{f}(z) = \frac{1}{1 + e^{-z}} \]` -$\wv^T \x$. -By default, if $\mathrm{logit}(\wv^T x) > 0.5$, the outcome is positive, or negative otherwise. +where $z = \wv^T \x$. +By default, if $\mathrm{f}(\wv^T x) > 0.5$, the outcome is positive, or negative otherwise. For the same reason mentioned above, quite often in practice, this default threshold is not a good choice. The threshold should be determined via model evaluation. @@ -242,7 +242,98 @@ Similarly, you can use replace `SVMWithSGD` by All of MLlib's methods use Java-friendly types, so you can import and call them there the same way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by -calling `.rdd()` on your `JavaRDD` object. +calling `.rdd()` on your `JavaRDD` object. A standalone application example +that is equivalent to the provided example in Scala is given bellow: + +{% highlight java %} +import org.apache.spark.api.java.*; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.SparkContext; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.mllib.classification.*; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; + +import java.util.Random; + +import scala.Product2; +import scala.Tuple2; + +/** + * Simple SVM example written in Java using Spark. + */ +public class SVMClassifier { + public static void main( String[] args ) { + SparkConf conf = new SparkConf().setAppName("SVM Classifier Example"); + SparkContext sc = new SparkContext(conf); + String path = "{SPARK_HOME}/mllib/data/sample_libsvm_data.txt"; + JavaRDD data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD(); + //Split initial RDD into two... [60% training data, 40% testing data]. + JavaRDD training = data.filter( + new Function() { + public final Random random = new Random(); + public Boolean call(LabeledPoint p) { + if (random.nextDouble() <= 0.6) + return true; + else + return false; + } + } + ); + training.cache(); + JavaRDD test = data.subtract(training); + + // Run training algorithm to build the model. + int numIterations = 100; + final SVMModel model = SVMWithSGD.train(JavaRDD.toRDD(training), numIterations); + + // Clear the default threshold. + model.clearThreshold(); + + // Compute raw scores on the test set. 
+ JavaRDD> scoreAndLabels = test.map( + new Function>() { + public final SVMModel m = model; + public Tuple2 call(LabeledPoint p) { + Double score = m.predict((Vector) p.productElement(1)); + return new Tuple2(score, (Double) p.productElement(0)); + } + } + ); + + // Get evaluation metrics. + BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels)); + double auROC = metrics.areaUnderROC(); + + System.out.println("Area under ROC = " + auROC); +} +{% endhighlight %} + +The `SVMWithSGD.train()` method by default performs L2 regularization with the +regularization parameter set to 1.0. If we want to configure this algorithm, we +can customize `SVMWithSGD` further by creating a new object directly and +calling setter methods. All other MLlib algorithms support customization in +this way as well. For example, the following code produces an L1 regularized +variant of SVMs with regularization parameter set to 0.1, and runs the training +algorithm for 200 iterations. + +{% highlight java %} +import org.apache.spark.mllib.optimization.L1Updater; + +SVMWithSGD svmAlg = new SVMWithSGD(); +svmAlg.optimizer().setNumIterations(200); +svmAlg.optimizer().setRegParam(0.1); +svmAlg.optimizer().setUpdater(new L1Updater()); +final SVMModel modelL1 = svmAlg.run(JavaRDD.toRDD(training)); +{% endhighlight %} + +In order to run the following standalone application using Spark framework make +sure that you follow the instructions provided at section [Standalone +Applications](quick-start.html) of the quick-start guide. What is more, you +should include to your **pom.xml** file both *spark-core_2.10* and +*spark-mllib_2.10* as dependencies.
@@ -338,7 +429,76 @@ and [`LassoWithSGD`](api/scala/index.html#org.apache.spark.mllib.regression.Lass All of MLlib's methods use Java-friendly types, so you can import and call them there the same way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by -calling `.rdd()` on your `JavaRDD` object. +calling `.rdd()` on your `JavaRDD` object. The corresponding Java example to +the Scala snippet provided, is presented bellow: + +{% highlight java %} +import org.apache.spark.api.java.*; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.regression.LinearRegressionWithSGD; +import org.apache.spark.mllib.regression.LinearRegressionModel; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.mllib.linalg.Vector; + +import scala.Product2; + +public class LinearRegression { + public static void main( String[] args ) { + SparkConf conf = new SparkConf().setAppName("Linear Regression Example"); + JavaSparkContext sc = new JavaSparkContext(conf); + + // Load and parse the data + String path = "{SPARK_HOME}/mllib/data/ridge-data/lpsa.data"; + JavaRDD data = sc.textFile(path); + JavaRDD parsedData = data.map( + new Function() { + public LabeledPoint call(String line) { + String[] parts = line.split(","); + String[] features = parts[1].split(" "); + double[] v = new double[features.length]; + for (int i = 0; i < features.length - 1; i++) + v[i] = Double.parseDouble(features[i]); + return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); + } + } + ); + + // Building the model + int numIterations = 100; + final LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations); + + // Evaluate model on training examples and compute training error + JavaRDD valuesAndPreds = parsedData.map( + new Function() { + public final LinearRegressionModel m = model; + public double[] call(LabeledPoint point) { + double prediction = m.predict((Vector) point.productElement(1)); + return new double[] {(Double) point.productElement(0), prediction}; + } + } + ); + JavaRDD MSERdd = valuesAndPreds.map( + new Function() { + public Object call(double[] lp) { + return Math.pow(lp[0] - lp[1], 2.0); + } + } + ); + JavaDoubleRDD MSEDoubleRdd = new JavaDoubleRDD(JavaRDD.toRDD(MSERdd)); + double MSE = MSEDoubleRdd.mean(); + System.out.println("training Mean Squared Error = " + MSE); + } +} +{% endhighlight %} + +In order to run the following standalone application using Spark framework make +sure that you follow the instructions provided at section [Standalone +Applications](quick-start.html) of the quick-start guide. What is more, you +should include to your **pom.xml** file both *spark-core_2.10* and +*spark-mllib_2.10* as dependencies.
From b1141b2e5be4f572f8407839aa494a7ccb2d47f0 Mon Sep 17 00:00:00 2001 From: Michael Giannakopoulos Date: Tue, 8 Jul 2014 20:22:25 -0400 Subject: [PATCH 2/6] Added Java examples for Clustering and Collaborative Filtering [mllib-clustering.md & mllib-collaborative-filtering.md]. --- docs/mllib-clustering.md | 50 ++++++++++++++++- docs/mllib-collaborative-filtering.md | 78 ++++++++++++++++++++++++++- 2 files changed, 126 insertions(+), 2 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 429cdf8d40cec..727c49dbd8281 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -69,7 +69,55 @@ println("Within Set Sum of Squared Errors = " + WSSSE) All of MLlib's methods use Java-friendly types, so you can import and call them there the same way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by -calling `.rdd()` on your `JavaRDD` object. +calling `.rdd()` on your `JavaRDD` object. A standalone application example +that is equivalent to the provided example in Scala is given bellow: + +{% highlight java %} +import org.apache.spark.api.java.*; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.mllib.clustering.KMeans; +import org.apache.spark.mllib.clustering.KMeansModel; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.mllib.linalg.Vector; + +public class Classifier { + public static void main( String[] args ) { + SparkConf conf = new SparkConf().setAppName("K-means Example"); + JavaSparkContext sc = new JavaSparkContext(conf); + + // Load and parse data + String path = "{SPARK_HOME}/data/kmeans_data.txt"; + JavaRDD data = sc.textFile(path); + JavaRDD parsedData = data.map( + new Function() { + public Vector call( String s ) { + String[] sarray = s.split(" "); + double[] values = new double[sarray.length]; + for (int i = 0; i < sarray.length; i++) + values[i] = Double.parseDouble(sarray[i]); + return Vectors.dense(values); + } + } + ); + + // Cluster the data into two classes using KMeans + int numClusters = 2; + int numIterations = 20; + KMeansModel clusters = KMeans.train(JavaRDD.toRDD(parsedData), numClusters, numIterations); + + // Evaluate clustering by computing Within Set Sum of Squared Errors + double WSSSE = clusters.computeCost(JavaRDD.toRDD(parsedData)); + System.out.println("Within Set Sum of Squared Errors = " + WSSSE); + } +} +{% endhighlight %} + +In order to run the following standalone application using Spark framework make +sure that you follow the instructions provided at section [Standalone +Applications](quick-start.html) of the quick-start guide. What is more, you +should include to your **pom.xml** file both *spark-core_2.10* and +*spark-mllib_2.10* as dependencies.
diff --git a/docs/mllib-collaborative-filtering.md b/docs/mllib-collaborative-filtering.md index d51002f015670..6de2e6ddc4276 100644 --- a/docs/mllib-collaborative-filtering.md +++ b/docs/mllib-collaborative-filtering.md @@ -99,7 +99,83 @@ val model = ALS.trainImplicit(ratings, rank, numIterations, alpha) All of MLlib's methods use Java-friendly types, so you can import and call them there the same way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by -calling `.rdd()` on your `JavaRDD` object. +calling `.rdd()` on your `JavaRDD` object. A standalone application example +that is equivalent to the provided example in Scala is given bellow: + +{% highlight java %} +import org.apache.spark.api.java.*; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.mllib.recommendation.ALS; +import org.apache.spark.mllib.recommendation.Rating; +import org.apache.spark.mllib.recommendation.MatrixFactorizationModel; + +import scala.Tuple2; +import scala.Product; + +public class CollaborativeFiltering { + public static void main( String[] args ) { + SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example"); + JavaSparkContext sc = new JavaSparkContext(conf); + + // Load and parse the data + String path = "{SPARK_HOME}/mllib/data/als/test.data"; + JavaRDD data = sc.textFile(path); + JavaRDD ratings = data.map( + new Function() { + public Rating call( String s ) { + String[] sarray = s.split(","); + return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), Double.parseDouble(sarray[2])); + } + } + ); + + // Build the recommendation model using ALS + int rank = 10; + int numIterations = 20; + MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01); + + // Evaluate the model on rating data + JavaRDD> userProducts = ratings.map( + new Function>() { + public Tuple2 call( Rating r ) { + return new Tuple2(r.productElement(0), r.productElement(1)); + } + } + ); + JavaPairRDD, Double> predictions = JavaPairRDD.fromJavaRDD(model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map( + new Function, Double>>() { + public Tuple2, Double> call( Rating r ){ + return new Tuple2, Double>(new Tuple2((Integer) r.productElement(0), (Integer) r.productElement(1)), (Double) r.productElement(2)); + } + } + )); + JavaPairRDD, Tuple2> ratesAndPreds = JavaPairRDD.fromJavaRDD(ratings.map( + new Function, Double>>() { + public Tuple2, Double> call( Rating r ){ + return new Tuple2, Double>(new Tuple2((Integer) r.productElement(0), (Integer) r.productElement(1)), (Double) r.productElement(2)); + } + } + )).join(predictions); + double MSE = JavaDoubleRDD.fromRDD(JavaRDD.toRDD(ratesAndPreds.map( + new Function, Tuple2>, Object>() { + public Object call( Tuple2, Tuple2> key_value ) { + Tuple2, Tuple2> val = (Tuple2, Tuple2>) key_value.productElement(1); + Double err = ((Double) val.productElement(0)) - ((Double) val.productElement(1)); + return err * err; + } + } + ))).mean(); + System.out.println("Mean Squared Error = " + MSE); + } +} +{% endhighlight %} + +In order to run the following standalone application using Spark framework make +sure that you follow the instructions provided at section [Standalone +Applications](quick-start.html) of the quick-start guide. 
What is more, you +should include to your **pom.xml** file both *spark-core_2.10* and +*spark-mllib_2.10* as dependencies.
From cc0a0896d8f43ef120a0a5748d715cf48ec85a48 Mon Sep 17 00:00:00 2001 From: Michael Giannakopoulos Date: Thu, 10 Jul 2014 20:47:26 -0400 Subject: [PATCH 3/6] Modyfied Java examples so as to comply with coding standards. --- docs/mllib-clustering.md | 61 ++++---- docs/mllib-collaborative-filtering.md | 125 ++++++++-------- docs/mllib-linear-methods.md | 202 +++++++++++++------------- 3 files changed, 196 insertions(+), 192 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 727c49dbd8281..44df3f3ab543a 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -81,43 +81,42 @@ import org.apache.spark.mllib.clustering.KMeansModel; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.linalg.Vector; -public class Classifier { - public static void main( String[] args ) { - SparkConf conf = new SparkConf().setAppName("K-means Example"); - JavaSparkContext sc = new JavaSparkContext(conf); - - // Load and parse data - String path = "{SPARK_HOME}/data/kmeans_data.txt"; - JavaRDD data = sc.textFile(path); - JavaRDD parsedData = data.map( - new Function() { - public Vector call( String s ) { - String[] sarray = s.split(" "); - double[] values = new double[sarray.length]; - for (int i = 0; i < sarray.length; i++) - values[i] = Double.parseDouble(sarray[i]); - return Vectors.dense(values); - } - } - ); - - // Cluster the data into two classes using KMeans - int numClusters = 2; - int numIterations = 20; - KMeansModel clusters = KMeans.train(JavaRDD.toRDD(parsedData), numClusters, numIterations); - - // Evaluate clustering by computing Within Set Sum of Squared Errors - double WSSSE = clusters.computeCost(JavaRDD.toRDD(parsedData)); - System.out.println("Within Set Sum of Squared Errors = " + WSSSE); - } +public class KMeansExample { + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("K-means Example"); + JavaSparkContext sc = new JavaSparkContext(conf); + + // Load and parse data + String path = "{SPARK_HOME}/data/kmeans_data.txt"; + JavaRDD data = sc.textFile(path); + JavaRDD parsedData = data.map( + new Function() { + public Vector call(String s) { + String[] sarray = s.split(" "); + double[] values = new double[sarray.length]; + for (int i = 0; i < sarray.length; i++) + values[i] = Double.parseDouble(sarray[i]); + return Vectors.dense(values); + } + } + ); + + // Cluster the data into two classes using KMeans + int numClusters = 2; + int numIterations = 20; + KMeansModel clusters = KMeans.train(JavaRDD.toRDD(parsedData), numClusters, numIterations); + + // Evaluate clustering by computing Within Set Sum of Squared Errors + double WSSSE = clusters.computeCost(JavaRDD.toRDD(parsedData)); + System.out.println("Within Set Sum of Squared Errors = " + WSSSE); + } } {% endhighlight %} In order to run the following standalone application using Spark framework make sure that you follow the instructions provided at section [Standalone Applications](quick-start.html) of the quick-start guide. What is more, you -should include to your **pom.xml** file both *spark-core_2.10* and -*spark-mllib_2.10* as dependencies. +should include to your build file *spark-mllib* as a dependency.
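+
+As a short illustrative follow-up (a sketch that reuses the `clusters` model of
+the example above rather than part of the original Scala example; the
+coordinates of the new point are illustrative and should match the
+dimensionality of the input data), the learned centers can be printed and a new
+point assigned to its closest cluster:
+
+{% highlight java %}
+// Print the learned cluster centers.
+for (Vector center : clusters.clusterCenters()) {
+  System.out.println("Cluster center: " + center);
+}
+
+// Assign a new point to one of the two clusters.
+int clusterIndex = clusters.predict(Vectors.dense(0.1, 0.1, 0.1));
+System.out.println("The point (0.1, 0.1, 0.1) belongs to cluster " + clusterIndex);
+{% endhighlight %}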
diff --git a/docs/mllib-collaborative-filtering.md b/docs/mllib-collaborative-filtering.md index 6de2e6ddc4276..378b547194c30 100644 --- a/docs/mllib-collaborative-filtering.md +++ b/docs/mllib-collaborative-filtering.md @@ -103,6 +103,9 @@ calling `.rdd()` on your `JavaRDD` object. A standalone application example that is equivalent to the provided example in Scala is given bellow: {% highlight java %} +import scala.Tuple2; +import scala.Product; + import org.apache.spark.api.java.*; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; @@ -110,72 +113,76 @@ import org.apache.spark.mllib.recommendation.ALS; import org.apache.spark.mllib.recommendation.Rating; import org.apache.spark.mllib.recommendation.MatrixFactorizationModel; -import scala.Tuple2; -import scala.Product; - public class CollaborativeFiltering { - public static void main( String[] args ) { - SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example"); - JavaSparkContext sc = new JavaSparkContext(conf); - - // Load and parse the data - String path = "{SPARK_HOME}/mllib/data/als/test.data"; - JavaRDD data = sc.textFile(path); - JavaRDD ratings = data.map( - new Function() { - public Rating call( String s ) { - String[] sarray = s.split(","); - return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), Double.parseDouble(sarray[2])); - } - } - ); - - // Build the recommendation model using ALS - int rank = 10; - int numIterations = 20; - MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01); - - // Evaluate the model on rating data - JavaRDD> userProducts = ratings.map( - new Function>() { - public Tuple2 call( Rating r ) { - return new Tuple2(r.productElement(0), r.productElement(1)); - } - } - ); - JavaPairRDD, Double> predictions = JavaPairRDD.fromJavaRDD(model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map( - new Function, Double>>() { - public Tuple2, Double> call( Rating r ){ - return new Tuple2, Double>(new Tuple2((Integer) r.productElement(0), (Integer) r.productElement(1)), (Double) r.productElement(2)); - } - } - )); - JavaPairRDD, Tuple2> ratesAndPreds = JavaPairRDD.fromJavaRDD(ratings.map( - new Function, Double>>() { - public Tuple2, Double> call( Rating r ){ - return new Tuple2, Double>(new Tuple2((Integer) r.productElement(0), (Integer) r.productElement(1)), (Double) r.productElement(2)); - } - } - )).join(predictions); - double MSE = JavaDoubleRDD.fromRDD(JavaRDD.toRDD(ratesAndPreds.map( - new Function, Tuple2>, Object>() { - public Object call( Tuple2, Tuple2> key_value ) { - Tuple2, Tuple2> val = (Tuple2, Tuple2>) key_value.productElement(1); - Double err = ((Double) val.productElement(0)) - ((Double) val.productElement(1)); - return err * err; - } - } - ))).mean(); - System.out.println("Mean Squared Error = " + MSE); - } + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example"); + JavaSparkContext sc = new JavaSparkContext(conf); + + // Load and parse the data + String path = "{SPARK_HOME}/mllib/data/als/test.data"; + JavaRDD data = sc.textFile(path); + JavaRDD ratings = data.map( + new Function() { + public Rating call(String s) { + String[] sarray = s.split(","); + return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), + Double.parseDouble(sarray[2])); + } + } + ); + + // Build the recommendation model using ALS + int rank = 10; + int numIterations = 20; + MatrixFactorizationModel model = 
ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01); + + // Evaluate the model on rating data + JavaRDD> userProducts = ratings.map( + new Function>() { + public Tuple2 call(Rating r) { + return new Tuple2(r.productElement(0), r.productElement(1)); + } + } + ); + JavaPairRDD, Double> predictions = JavaPairRDD.fromJavaRDD( + model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map( + new Function, Double>>() { + public Tuple2, Double> call(Rating r){ + return new Tuple2, Double>( + new Tuple2((Integer) r.productElement(0), + (Integer) r.productElement(1)), (Double) r.productElement(2)); + } + } + )); + JavaPairRDD, Tuple2> ratesAndPreds = + JavaPairRDD.fromJavaRDD(ratings.map( + new Function, Double>>() { + public Tuple2, Double> call(Rating r){ + return new Tuple2, Double>( + new Tuple2((Integer) r.productElement(0), + (Integer) r.productElement(1)), (Double) r.productElement(2)); + } + } + )).join(predictions); + double MSE = JavaDoubleRDD.fromRDD(JavaRDD.toRDD(ratesAndPreds.map( + new Function, Tuple2>, Object>() { + public Object call(Tuple2, Tuple2> key_value) { + Tuple2, Tuple2> val = + (Tuple2, Tuple2>) key_value.productElement(1); + Double err = ((Double) val.productElement(0)) - ((Double) val.productElement(1)); + return err * err; + } + } + ))).mean(); + System.out.println("Mean Squared Error = " + MSE); + } } {% endhighlight %} In order to run the following standalone application using Spark framework make sure that you follow the instructions provided at section [Standalone Applications](quick-start.html) of the quick-start guide. What is more, you -should include to your **pom.xml** file both *spark-core_2.10* and -*spark-mllib_2.10* as dependencies. +should include to your build file *spark-mllib* as a dependency.
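+
+The Scala example also shows `ALS.trainImplicit` for data sets where the ratings
+are implicit feedback rather than explicit scores. The same call is available
+from Java; the sketch below reuses the `ratings` RDD, `rank`, and
+`numIterations` from the example above, and the regularization parameter and
+confidence parameter `alpha` are illustrative values only:
+
+{% highlight java %}
+double alpha = 0.01;
+MatrixFactorizationModel implicitModel =
+  ALS.trainImplicit(JavaRDD.toRDD(ratings), rank, numIterations, 0.01, alpha);
+{% endhighlight %}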
diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md index cf95dcd9abc37..e4317d3ac5f46 100644 --- a/docs/mllib-linear-methods.md +++ b/docs/mllib-linear-methods.md @@ -246,6 +246,11 @@ calling `.rdd()` on your `JavaRDD` object. A standalone application example that is equivalent to the provided example in Scala is given bellow: {% highlight java %} +import java.util.Random; + +import scala.Product2; +import scala.Tuple2; + import org.apache.spark.api.java.*; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; @@ -256,58 +261,52 @@ import org.apache.spark.mllib.classification.*; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; -import java.util.Random; - -import scala.Product2; -import scala.Tuple2; - -/** - * Simple SVM example written in Java using Spark. - */ public class SVMClassifier { - public static void main( String[] args ) { - SparkConf conf = new SparkConf().setAppName("SVM Classifier Example"); - SparkContext sc = new SparkContext(conf); - String path = "{SPARK_HOME}/mllib/data/sample_libsvm_data.txt"; - JavaRDD data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD(); - //Split initial RDD into two... [60% training data, 40% testing data]. - JavaRDD training = data.filter( - new Function() { - public final Random random = new Random(); - public Boolean call(LabeledPoint p) { - if (random.nextDouble() <= 0.6) - return true; - else - return false; - } - } - ); - training.cache(); - JavaRDD test = data.subtract(training); - - // Run training algorithm to build the model. - int numIterations = 100; - final SVMModel model = SVMWithSGD.train(JavaRDD.toRDD(training), numIterations); - - // Clear the default threshold. - model.clearThreshold(); - - // Compute raw scores on the test set. - JavaRDD> scoreAndLabels = test.map( - new Function>() { - public final SVMModel m = model; - public Tuple2 call(LabeledPoint p) { - Double score = m.predict((Vector) p.productElement(1)); - return new Tuple2(score, (Double) p.productElement(0)); - } - } - ); - - // Get evaluation metrics. - BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels)); - double auROC = metrics.areaUnderROC(); - - System.out.println("Area under ROC = " + auROC); + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("SVM Classifier Example"); + SparkContext sc = new SparkContext(conf); + String path = "{SPARK_HOME}/mllib/data/sample_libsvm_data.txt"; + JavaRDD data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD(); + //Split initial RDD into two... [60% training data, 40% testing data]. + JavaRDD training = data.filter( + new Function() { + public final Random random = new Random(); + public Boolean call(LabeledPoint p) { + if (random.nextDouble() <= 0.6) + return true; + else + return false; + } + } + ); + training.cache(); + JavaRDD test = data.subtract(training); + + // Run training algorithm to build the model. + int numIterations = 100; + final SVMModel model = SVMWithSGD.train(JavaRDD.toRDD(training), numIterations); + + // Clear the default threshold. + model.clearThreshold(); + + // Compute raw scores on the test set. + JavaRDD> scoreAndLabels = test.map( + new Function>() { + public final SVMModel m = model; + public Tuple2 call(LabeledPoint p) { + Double score = m.predict((Vector) p.productElement(1)); + return new Tuple2(score, (Double) p.productElement(0)); + } + } + ); + + // Get evaluation metrics. 
+ BinaryClassificationMetrics metrics = + new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels)); + double auROC = metrics.areaUnderROC(); + + System.out.println("Area under ROC = " + auROC); + } } {% endhighlight %} @@ -332,8 +331,7 @@ final SVMModel modelL1 = svmAlg.run(JavaRDD.toRDD(training)); In order to run the following standalone application using Spark framework make sure that you follow the instructions provided at section [Standalone Applications](quick-start.html) of the quick-start guide. What is more, you -should include to your **pom.xml** file both *spark-core_2.10* and -*spark-mllib_2.10* as dependencies. +should include to your build file *spark-mllib* as a dependency.
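+
+Note that the example above calls `clearThreshold()` so that `predict()` returns
+raw scores for evaluation. As a small illustrative sketch (reusing the `model`
+and the `test` RDD from above, with the threshold value chosen only for
+illustration), a threshold selected through evaluation can be set back on the
+model so that `predict()` returns class labels again:
+
+{% highlight java %}
+model.setThreshold(0.0);
+double predictedLabel = model.predict(test.first().features());
+{% endhighlight %}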
@@ -433,6 +431,8 @@ calling `.rdd()` on your `JavaRDD` object. The corresponding Java example to the Scala snippet provided, is presented bellow: {% highlight java %} +import scala.Product2; + import org.apache.spark.api.java.*; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; @@ -443,62 +443,60 @@ import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.linalg.Vector; -import scala.Product2; - public class LinearRegression { - public static void main( String[] args ) { - SparkConf conf = new SparkConf().setAppName("Linear Regression Example"); - JavaSparkContext sc = new JavaSparkContext(conf); - - // Load and parse the data - String path = "{SPARK_HOME}/mllib/data/ridge-data/lpsa.data"; - JavaRDD data = sc.textFile(path); - JavaRDD parsedData = data.map( - new Function() { - public LabeledPoint call(String line) { - String[] parts = line.split(","); - String[] features = parts[1].split(" "); - double[] v = new double[features.length]; - for (int i = 0; i < features.length - 1; i++) - v[i] = Double.parseDouble(features[i]); - return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); - } - } - ); - - // Building the model - int numIterations = 100; - final LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations); - - // Evaluate model on training examples and compute training error - JavaRDD valuesAndPreds = parsedData.map( - new Function() { - public final LinearRegressionModel m = model; - public double[] call(LabeledPoint point) { - double prediction = m.predict((Vector) point.productElement(1)); - return new double[] {(Double) point.productElement(0), prediction}; - } - } - ); - JavaRDD MSERdd = valuesAndPreds.map( - new Function() { - public Object call(double[] lp) { - return Math.pow(lp[0] - lp[1], 2.0); - } - } - ); - JavaDoubleRDD MSEDoubleRdd = new JavaDoubleRDD(JavaRDD.toRDD(MSERdd)); - double MSE = MSEDoubleRdd.mean(); - System.out.println("training Mean Squared Error = " + MSE); - } + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("Linear Regression Example"); + JavaSparkContext sc = new JavaSparkContext(conf); + + // Load and parse the data + String path = "{SPARK_HOME}/mllib/data/ridge-data/lpsa.data"; + JavaRDD data = sc.textFile(path); + JavaRDD parsedData = data.map( + new Function() { + public LabeledPoint call(String line) { + String[] parts = line.split(","); + String[] features = parts[1].split(" "); + double[] v = new double[features.length]; + for (int i = 0; i < features.length - 1; i++) + v[i] = Double.parseDouble(features[i]); + return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); + } + } + ); + + // Building the model + int numIterations = 100; + final LinearRegressionModel model = + LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations); + + // Evaluate model on training examples and compute training error + JavaRDD valuesAndPreds = parsedData.map( + new Function() { + public final LinearRegressionModel m = model; + public double[] call(LabeledPoint point) { + double prediction = m.predict((Vector) point.productElement(1)); + return new double[] {prediction, (Double) point.productElement(0)}; + } + } + ); + JavaRDD MSERdd = valuesAndPreds.map( + new Function() { + public Object call(double[] lp) { + return Math.pow(lp[0] - lp[1], 2.0); + } + } + ); + JavaDoubleRDD MSEDoubleRdd = new 
JavaDoubleRDD(JavaRDD.toRDD(MSERdd)); + double MSE = MSEDoubleRdd.mean(); + System.out.println("training Mean Squared Error = " + MSE); + } } {% endhighlight %} In order to run the following standalone application using Spark framework make sure that you follow the instructions provided at section [Standalone Applications](quick-start.html) of the quick-start guide. What is more, you -should include to your **pom.xml** file both *spark-core_2.10* and -*spark-mllib_2.10* as dependencies. +should include to your build file *spark-mllib* as a dependency.
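+
+The fitted model itself can also be inspected. This short sketch (reusing the
+`model` trained in the example above) simply prints the learned weights and
+intercept:
+
+{% highlight java %}
+System.out.println("Weights: " + model.weights());
+System.out.println("Intercept: " + model.intercept());
+{% endhighlight %}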
From 38d92c7da34824000eb3d1465835a2985e92082c Mon Sep 17 00:00:00 2001 From: Michael Giannakopoulos Date: Sun, 13 Jul 2014 20:06:32 -0400 Subject: [PATCH 4/6] Adding PCA, SVD and LBFGS examples in Java. Performing minor updates in the already committed examples so as to eradicate the call of 'productElement' function whenever is possible. --- docs/mllib-clustering.md | 2 +- docs/mllib-collaborative-filtering.md | 14 ++-- docs/mllib-dimensionality-reduction.md | 94 +++++++++++++++++++++ docs/mllib-linear-methods.md | 20 ++--- docs/mllib-optimization.md | 109 ++++++++++++++++++++++++- 5 files changed, 218 insertions(+), 21 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 44df3f3ab543a..8078179f13cf8 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -113,7 +113,7 @@ public class KMeansExample { } {% endhighlight %} -In order to run the following standalone application using Spark framework make +In order to run the above standalone application using Spark framework make sure that you follow the instructions provided at section [Standalone Applications](quick-start.html) of the quick-start guide. What is more, you should include to your build file *spark-mllib* as a dependency. diff --git a/docs/mllib-collaborative-filtering.md b/docs/mllib-collaborative-filtering.md index 378b547194c30..46ae16ddcef89 100644 --- a/docs/mllib-collaborative-filtering.md +++ b/docs/mllib-collaborative-filtering.md @@ -103,8 +103,8 @@ calling `.rdd()` on your `JavaRDD` object. A standalone application example that is equivalent to the provided example in Scala is given bellow: {% highlight java %} -import scala.Tuple2; import scala.Product; +import scala.Tuple2; import org.apache.spark.api.java.*; import org.apache.spark.SparkConf; @@ -119,7 +119,7 @@ public class CollaborativeFiltering { JavaSparkContext sc = new JavaSparkContext(conf); // Load and parse the data - String path = "{SPARK_HOME}/mllib/data/als/test.data"; + String path = "/home/michael/workspace/spark/mllib/data/als/test.data"; JavaRDD data = sc.textFile(path); JavaRDD ratings = data.map( new Function() { @@ -140,7 +140,7 @@ public class CollaborativeFiltering { JavaRDD> userProducts = ratings.map( new Function>() { public Tuple2 call(Rating r) { - return new Tuple2(r.productElement(0), r.productElement(1)); + return new Tuple2(r.user(), r.product()); } } ); @@ -149,8 +149,7 @@ public class CollaborativeFiltering { new Function, Double>>() { public Tuple2, Double> call(Rating r){ return new Tuple2, Double>( - new Tuple2((Integer) r.productElement(0), - (Integer) r.productElement(1)), (Double) r.productElement(2)); + new Tuple2(r.user(), r.product()), r.rating()); } } )); @@ -159,8 +158,7 @@ public class CollaborativeFiltering { new Function, Double>>() { public Tuple2, Double> call(Rating r){ return new Tuple2, Double>( - new Tuple2((Integer) r.productElement(0), - (Integer) r.productElement(1)), (Double) r.productElement(2)); + new Tuple2(r.user(), r.product()), r.rating()); } } )).join(predictions); @@ -179,7 +177,7 @@ public class CollaborativeFiltering { } {% endhighlight %} -In order to run the following standalone application using Spark framework make +In order to run the above standalone application using Spark framework make sure that you follow the instructions provided at section [Standalone Applications](quick-start.html) of the quick-start guide. What is more, you should include to your build file *spark-mllib* as a dependency. 
diff --git a/docs/mllib-dimensionality-reduction.md b/docs/mllib-dimensionality-reduction.md index e3608075fbb13..a86a75127346c 100644 --- a/docs/mllib-dimensionality-reduction.md +++ b/docs/mllib-dimensionality-reduction.md @@ -57,10 +57,57 @@ val U: RowMatrix = svd.U // The U factor is a RowMatrix. val s: Vector = svd.s // The singular values are stored in a local dense vector. val V: Matrix = svd.V // The V factor is a local dense matrix. {% endhighlight %} + +Same code applies to `IndexedRowMatrix`. +The only difference that the `U` matrix becomes an `IndexedRowMatrix`.
+
+In order to run the following standalone application using Spark framework make +sure that you follow the instructions provided at section [Standalone +Applications](quick-start.html) of the quick-start guide. What is more, you +should include to your build file *spark-mllib* as a dependency. + +{% highlight java %} +import java.util.LinkedList; + +import org.apache.spark.api.java.*; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.rdd.RDD; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.mllib.linalg.Matrix; +import org.apache.spark.mllib.linalg.distributed.RowMatrix; +import org.apache.spark.mllib.linalg.SingularValueDecomposition; + +public class SVD { + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("SVD Example"); + SparkContext sc = new SparkContext(conf); + + double[][] array = ... + LinkedList rowsList = new LinkedList(); + for (int i = 0; i < array.length; i++) { + Vector currentRow = Vectors.dense(array[i]); + rowsList.add(currentRow); + } + JavaRDD rows = JavaSparkContext.fromSparkContext(sc).parallelize(rowsList); + + // Create a RowMatrix from JavaRDD. + RowMatrix mat = new RowMatrix(JavaRDD.toRDD(rows)); + + // Compute the top 4 singular values and corresponding singular vectors. + SingularValueDecomposition svd = mat.computeSVD(4, true, 1.0E-9d); + RowMatrix U = svd.U(); + Vector s = svd.s(); + Matrix V = svd.V(); + } +} +{% endhighlight %} Same code applies to `IndexedRowMatrix`. The only difference that the `U` matrix becomes an `IndexedRowMatrix`.
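+
+As an illustration, a Java sketch of the `IndexedRowMatrix` variant (assuming an
+existing `JavaRDD` of `IndexedRow` and the additional imports
+`org.apache.spark.mllib.linalg.distributed.IndexedRow` and
+`org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix`) looks like this:
+
+{% highlight java %}
+JavaRDD<IndexedRow> indexedRows = ... // each row paired with its long index
+IndexedRowMatrix indexedMat = new IndexedRowMatrix(JavaRDD.toRDD(indexedRows));
+
+// Compute the top 4 singular values and corresponding singular vectors.
+SingularValueDecomposition<IndexedRowMatrix, Matrix> indexedSvd =
+  indexedMat.computeSVD(4, true, 1.0E-9d);
+IndexedRowMatrix indexedU = indexedSvd.U();  // The U factor is an IndexedRowMatrix.
+Vector indexedS = indexedSvd.s();
+Matrix indexedV = indexedSvd.V();
+{% endhighlight %}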
+ ## Principal component analysis (PCA) @@ -91,4 +138,51 @@ val pc: Matrix = mat.computePrincipalComponents(10) // Principal components are val projected: RowMatrix = mat.multiply(pc) {% endhighlight %} + +
+ +The following code demonstrates how to compute principal components on a tall-and-skinny `RowMatrix` +and use them to project the vectors into a low-dimensional space. +The number of columns should be small, e.g, less than 1000. + +{% highlight java %} +import java.util.LinkedList; + +import org.apache.spark.api.java.*; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.rdd.RDD; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.mllib.linalg.Matrix; +import org.apache.spark.mllib.linalg.distributed.RowMatrix; + +public class PCA { + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("PCA Example"); + SparkContext sc = new SparkContext(conf); + + double[][] array = ... + LinkedList rowsList = new LinkedList(); + for (int i = 0; i < array.length; i++) { + Vector currentRow = Vectors.dense(array[i]); + rowsList.add(currentRow); + } + JavaRDD rows = JavaSparkContext.fromSparkContext(sc).parallelize(rowsList); + + // Create a RowMatrix from JavaRDD. + RowMatrix mat = new RowMatrix(JavaRDD.toRDD(rows)); + + // Compute the top 3 principal components. + Matrix pc = mat.computePrincipalComponents(3); + RowMatrix projected = mat.multiply(pc); + } +} +{% endhighlight %} + +In order to run the above standalone application using Spark framework make +sure that you follow the instructions provided at section [Standalone +Applications](quick-start.html) of the quick-start guide. What is more, you +should include to your build file *spark-mllib* as a dependency. +
diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md index e4317d3ac5f46..8226b8d3a2ef2 100644 --- a/docs/mllib-linear-methods.md +++ b/docs/mllib-linear-methods.md @@ -248,7 +248,6 @@ that is equivalent to the provided example in Scala is given bellow: {% highlight java %} import java.util.Random; -import scala.Product2; import scala.Tuple2; import org.apache.spark.api.java.*; @@ -267,10 +266,11 @@ public class SVMClassifier { SparkContext sc = new SparkContext(conf); String path = "{SPARK_HOME}/mllib/data/sample_libsvm_data.txt"; JavaRDD data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD(); - //Split initial RDD into two... [60% training data, 40% testing data]. + + // Split initial RDD into two... [60% training data, 40% testing data]. JavaRDD training = data.filter( new Function() { - public final Random random = new Random(); + public final Random random = new Random(11L); public Boolean call(LabeledPoint p) { if (random.nextDouble() <= 0.6) return true; @@ -294,8 +294,8 @@ public class SVMClassifier { new Function>() { public final SVMModel m = model; public Tuple2 call(LabeledPoint p) { - Double score = m.predict((Vector) p.productElement(1)); - return new Tuple2(score, (Double) p.productElement(0)); + Double score = m.predict(p.features()); + return new Tuple2(score, p.label()); } } ); @@ -328,7 +328,7 @@ svmAlg.optimizer().setUpdater(new L1Updater()); final SVMModel modelL1 = svmAlg.run(JavaRDD.toRDD(training)); {% endhighlight %} -In order to run the following standalone application using Spark framework make +In order to run the above standalone application using Spark framework make sure that you follow the instructions provided at section [Standalone Applications](quick-start.html) of the quick-start guide. What is more, you should include to your build file *spark-mllib* as a dependency. @@ -431,8 +431,6 @@ calling `.rdd()` on your `JavaRDD` object. The corresponding Java example to the Scala snippet provided, is presented bellow: {% highlight java %} -import scala.Product2; - import org.apache.spark.api.java.*; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; @@ -474,8 +472,8 @@ public class LinearRegression { new Function() { public final LinearRegressionModel m = model; public double[] call(LabeledPoint point) { - double prediction = m.predict((Vector) point.productElement(1)); - return new double[] {prediction, (Double) point.productElement(0)}; + double prediction = m.predict(point.features()); + return new double[] {prediction, point.label()}; } } ); @@ -493,7 +491,7 @@ public class LinearRegression { } {% endhighlight %} -In order to run the following standalone application using Spark framework make +In order to run the above standalone application using Spark framework make sure that you follow the instructions provided at section [Standalone Applications](quick-start.html) of the quick-start guide. What is more, you should include to your build file *spark-mllib* as a dependency. diff --git a/docs/mllib-optimization.md b/docs/mllib-optimization.md index ae9ede58e8e60..68204c2eb3d63 100644 --- a/docs/mllib-optimization.md +++ b/docs/mllib-optimization.md @@ -207,6 +207,10 @@ the loss computed for every iteration. Here is an example to train binary logistic regression with L2 regularization using L-BFGS optimizer. + +
+
+
{% highlight scala %} import org.apache.spark.SparkContext import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics @@ -263,7 +267,110 @@ println("Loss of each step in training process") loss.foreach(println) println("Area under ROC = " + auROC) {% endhighlight %} - +
+
+
+{% highlight java %} +import java.util.Random; +import java.util.Arrays; + +import scala.Product2; +import scala.Tuple2; + +import org.apache.spark.api.java.*; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.mllib.optimization.*; +import org.apache.spark.mllib.classification.LogisticRegressionModel; +import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; + +public class LBFGSExample { + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("L-BFGS Example"); + SparkContext sc = new SparkContext(conf); + String path = "{SPARK_HOME}/mllib/data/sample_libsvm_data.txt"; + JavaRDD data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD(); + int numFeatures = data.take(1).get(0).features().size(); + + // Split initial RDD into two... [60% training data, 40% testing data]. + JavaRDD trainingInit = data.filter( + new Function() { + public final Random random = new Random(11L); + public Boolean call(LabeledPoint p) { + if (random.nextDouble() <= 0.6) + return true; + else + return false; + } + } + ); + JavaRDD test = data.subtract(trainingInit); + + // Append 1 into the training data as intercept. + JavaRDD> training = data.map( + new Function>() { + public Tuple2 call(LabeledPoint p) { + return new Tuple2(p.label(), MLUtils.appendBias(p.features())); + } + }); + training.cache(); + + // Run training algorithm to build the model. + int numCorrections = 10; + double convergenceTol = 1e-4; + int maxNumIterations = 20; + double regParam = 0.1; + Vector initialWeightsWithIntercept = Vectors.dense(new double[numFeatures + 1]); + + Tuple2 result = LBFGS.runLBFGS( + JavaRDD.toRDD(training), + new LogisticGradient(), + new SquaredL2Updater(), + numCorrections, + convergenceTol, + maxNumIterations, + regParam, + initialWeightsWithIntercept); + Vector weightsWithIntercept = (Vector) result.productElement(0); + double[] loss = (double[]) result.productElement(1); + + final LogisticRegressionModel model = new LogisticRegressionModel( + Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)), + (weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]); + + // Clear the default threshold. + model.clearThreshold(); + + // Compute raw scores on the test set. + JavaRDD> scoreAndLabels = test.map( + new Function>() { + public final LogisticRegressionModel m = model; + public Tuple2 call(LabeledPoint p) { + Double score = m.predict(p.features()); + return new Tuple2(score, p.label()); + } + } + ); + + // Get evaluation metrics. + BinaryClassificationMetrics metrics = + new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels)); + double auROC = metrics.areaUnderROC(); + + System.out.println("Loss of each step in training process"); + for (double l : loss) + System.out.println(l); + System.out.println("Area under ROC = " + auROC); + } +} +{% endhighlight %} +
+
#### Developer's note Since the Hessian is constructed approximately from previous gradient evaluations, the objective function can not be changed during the optimization process. From 8ffe5abe47fbefe94d19e051a8884b2d9572d81c Mon Sep 17 00:00:00 2001 From: Michael Giannakopoulos Date: Tue, 15 Jul 2014 07:43:04 -0400 Subject: [PATCH 5/6] Update code so as to comply with code standards. --- docs/mllib-clustering.md | 10 ++-- docs/mllib-collaborative-filtering.md | 23 ++++---- docs/mllib-dimensionality-reduction.md | 26 ++++----- docs/mllib-linear-methods.md | 74 +++++++++++--------------- docs/mllib-optimization.md | 61 +++++++++------------ 5 files changed, 83 insertions(+), 111 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index a9bcaea846a46..561de48910132 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -74,12 +74,12 @@ that is equivalent to the provided example in Scala is given bellow: {% highlight java %} import org.apache.spark.api.java.*; -import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.clustering.KMeans; import org.apache.spark.mllib.clustering.KMeansModel; -import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.SparkConf; public class KMeansExample { public static void main(String[] args) { @@ -87,7 +87,7 @@ public class KMeansExample { JavaSparkContext sc = new JavaSparkContext(conf); // Load and parse data - String path = "{SPARK_HOME}/data/kmeans_data.txt"; + String path = "data/mllib/kmeans_data.txt"; JavaRDD data = sc.textFile(path); JavaRDD parsedData = data.map( new Function() { @@ -104,10 +104,10 @@ public class KMeansExample { // Cluster the data into two classes using KMeans int numClusters = 2; int numIterations = 20; - KMeansModel clusters = KMeans.train(JavaRDD.toRDD(parsedData), numClusters, numIterations); + KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations); // Evaluate clustering by computing Within Set Sum of Squared Errors - double WSSSE = clusters.computeCost(JavaRDD.toRDD(parsedData)); + double WSSSE = clusters.computeCost(parsedData.rdd()); System.out.println("Within Set Sum of Squared Errors = " + WSSSE); } } diff --git a/docs/mllib-collaborative-filtering.md b/docs/mllib-collaborative-filtering.md index b82c8fab008cc..0d28b5f7c89b3 100644 --- a/docs/mllib-collaborative-filtering.md +++ b/docs/mllib-collaborative-filtering.md @@ -103,15 +103,14 @@ calling `.rdd()` on your `JavaRDD` object. 
A standalone application example that is equivalent to the provided example in Scala is given bellow: {% highlight java %} -import scala.Product; import scala.Tuple2; import org.apache.spark.api.java.*; -import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.recommendation.ALS; -import org.apache.spark.mllib.recommendation.Rating; import org.apache.spark.mllib.recommendation.MatrixFactorizationModel; +import org.apache.spark.mllib.recommendation.Rating; +import org.apache.spark.SparkConf; public class CollaborativeFiltering { public static void main(String[] args) { @@ -119,7 +118,7 @@ public class CollaborativeFiltering { JavaSparkContext sc = new JavaSparkContext(conf); // Load and parse the data - String path = "/home/michael/workspace/spark/mllib/data/als/test.data"; + String path = "data/mllib/als/test.data"; JavaRDD data = sc.textFile(path); JavaRDD ratings = data.map( new Function() { @@ -153,7 +152,7 @@ public class CollaborativeFiltering { } } )); - JavaPairRDD, Tuple2> ratesAndPreds = + JavaRDD> ratesAndPreds = JavaPairRDD.fromJavaRDD(ratings.map( new Function, Double>>() { public Tuple2, Double> call(Rating r){ @@ -161,17 +160,15 @@ public class CollaborativeFiltering { new Tuple2(r.user(), r.product()), r.rating()); } } - )).join(predictions); - double MSE = JavaDoubleRDD.fromRDD(JavaRDD.toRDD(ratesAndPreds.map( - new Function, Tuple2>, Object>() { - public Object call(Tuple2, Tuple2> key_value) { - Tuple2, Tuple2> val = - (Tuple2, Tuple2>) key_value.productElement(1); - Double err = ((Double) val.productElement(0)) - ((Double) val.productElement(1)); + )).join(predictions).values(); + double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map( + new Function, Object>() { + public Object call(Tuple2 pair) { + Double err = pair._1() - pair._2(); return err * err; } } - ))).mean(); + ).rdd()).mean(); System.out.println("Mean Squared Error = " + MSE); } } diff --git a/docs/mllib-dimensionality-reduction.md b/docs/mllib-dimensionality-reduction.md index a86a75127346c..8e434998c15ea 100644 --- a/docs/mllib-dimensionality-reduction.md +++ b/docs/mllib-dimensionality-reduction.md @@ -71,14 +71,14 @@ should include to your build file *spark-mllib* as a dependency. import java.util.LinkedList; import org.apache.spark.api.java.*; -import org.apache.spark.SparkConf; -import org.apache.spark.SparkContext; -import org.apache.spark.rdd.RDD; -import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.mllib.linalg.Vectors; -import org.apache.spark.mllib.linalg.Matrix; import org.apache.spark.mllib.linalg.distributed.RowMatrix; +import org.apache.spark.mllib.linalg.Matrix; import org.apache.spark.mllib.linalg.SingularValueDecomposition; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.rdd.RDD; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; public class SVD { public static void main(String[] args) { @@ -94,7 +94,7 @@ public class SVD { JavaRDD rows = JavaSparkContext.fromSparkContext(sc).parallelize(rowsList); // Create a RowMatrix from JavaRDD. - RowMatrix mat = new RowMatrix(JavaRDD.toRDD(rows)); + RowMatrix mat = new RowMatrix(rows.rdd()); // Compute the top 4 singular values and corresponding singular vectors. SingularValueDecomposition svd = mat.computeSVD(4, true, 1.0E-9d); @@ -149,13 +149,13 @@ The number of columns should be small, e.g, less than 1000. 
import java.util.LinkedList; import org.apache.spark.api.java.*; -import org.apache.spark.SparkConf; -import org.apache.spark.SparkContext; -import org.apache.spark.rdd.RDD; +import org.apache.spark.mllib.linalg.distributed.RowMatrix; +import org.apache.spark.mllib.linalg.Matrix; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.linalg.Vectors; -import org.apache.spark.mllib.linalg.Matrix; -import org.apache.spark.mllib.linalg.distributed.RowMatrix; +import org.apache.spark.rdd.RDD; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; public class PCA { public static void main(String[] args) { @@ -171,7 +171,7 @@ public class PCA { JavaRDD rows = JavaSparkContext.fromSparkContext(sc).parallelize(rowsList); // Create a RowMatrix from JavaRDD. - RowMatrix mat = new RowMatrix(JavaRDD.toRDD(rows)); + RowMatrix mat = new RowMatrix(rows.rdd()); // Compute the top 3 principal components. Matrix pc = mat.computePrincipalComponents(3); diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md index 756bb37632181..254201147edc1 100644 --- a/docs/mllib-linear-methods.md +++ b/docs/mllib-linear-methods.md @@ -251,40 +251,30 @@ import java.util.Random; import scala.Tuple2; import org.apache.spark.api.java.*; -import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; -import org.apache.spark.SparkContext; -import org.apache.spark.mllib.regression.LabeledPoint; -import org.apache.spark.mllib.util.MLUtils; import org.apache.spark.mllib.classification.*; -import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; public class SVMClassifier { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("SVM Classifier Example"); SparkContext sc = new SparkContext(conf); - String path = "{SPARK_HOME}/mllib/data/sample_libsvm_data.txt"; + String path = "data/mllib/sample_libsvm_data.txt"; JavaRDD data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD(); // Split initial RDD into two... [60% training data, 40% testing data]. - JavaRDD training = data.filter( - new Function() { - public final Random random = new Random(11L); - public Boolean call(LabeledPoint p) { - if (random.nextDouble() <= 0.6) - return true; - else - return false; - } - } - ); + JavaRDD training = data.sample(false, 0.6, 11L); training.cache(); JavaRDD test = data.subtract(training); // Run training algorithm to build the model. int numIterations = 100; - final SVMModel model = SVMWithSGD.train(JavaRDD.toRDD(training), numIterations); + final SVMModel model = SVMWithSGD.train(training.rdd(), numIterations); // Clear the default threshold. model.clearThreshold(); @@ -292,9 +282,8 @@ public class SVMClassifier { // Compute raw scores on the test set. JavaRDD> scoreAndLabels = test.map( new Function>() { - public final SVMModel m = model; public Tuple2 call(LabeledPoint p) { - Double score = m.predict(p.features()); + Double score = model.predict(p.features()); return new Tuple2(score, p.label()); } } @@ -322,10 +311,11 @@ algorithm for 200 iterations. 
import org.apache.spark.mllib.optimization.L1Updater; SVMWithSGD svmAlg = new SVMWithSGD(); -svmAlg.optimizer().setNumIterations(200); -svmAlg.optimizer().setRegParam(0.1); -svmAlg.optimizer().setUpdater(new L1Updater()); -final SVMModel modelL1 = svmAlg.run(JavaRDD.toRDD(training)); +svmAlg.optimizer() + .setNumIterations(200) + .setRegParam(0.1) + .setUpdater(new L1Updater()); +final SVMModel modelL1 = svmAlg.run(training.rdd()); {% endhighlight %} In order to run the above standalone application using Spark framework make @@ -431,15 +421,16 @@ calling `.rdd()` on your `JavaRDD` object. The corresponding Java example to the Scala snippet provided, is presented bellow: {% highlight java %} +import scala.Tuple2; + import org.apache.spark.api.java.*; -import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; -import org.apache.spark.mllib.regression.LinearRegressionWithSGD; import org.apache.spark.mllib.regression.LinearRegressionModel; -import org.apache.spark.mllib.regression.LabeledPoint; -import org.apache.spark.mllib.linalg.Vectors; -import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.regression.LinearRegressionWithSGD; +import org.apache.spark.SparkConf; public class LinearRegression { public static void main(String[] args) { @@ -447,7 +438,7 @@ public class LinearRegression { JavaSparkContext sc = new JavaSparkContext(conf); // Load and parse the data - String path = "{SPARK_HOME}/mllib/data/ridge-data/lpsa.data"; + String path = "data/mllib/ridge-data/lpsa.data"; JavaRDD data = sc.textFile(path); JavaRDD parsedData = data.map( new Function() { @@ -468,24 +459,21 @@ public class LinearRegression { LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations); // Evaluate model on training examples and compute training error - JavaRDD valuesAndPreds = parsedData.map( - new Function() { - public final LinearRegressionModel m = model; - public double[] call(LabeledPoint point) { - double prediction = m.predict(point.features()); - return new double[] {prediction, point.label()}; + JavaRDD> valuesAndPreds = parsedData.map( + new Function>() { + public Tuple2 call(LabeledPoint point) { + double prediction = model.predict(point.features()); + return new Tuple2(prediction, point.label()); } } ); - JavaRDD MSERdd = valuesAndPreds.map( - new Function() { - public Object call(double[] lp) { - return Math.pow(lp[0] - lp[1], 2.0); + JavaRDD MSE = new JavaDoubleRDD(valuesAndPreds.map( + new Function, Object>() { + public Object call(Tuple2 pair) { + return Math.pow(pair._1() - pair._2(), 2.0); } } - ); - JavaDoubleRDD MSEDoubleRdd = new JavaDoubleRDD(JavaRDD.toRDD(MSERdd)); - double MSE = MSEDoubleRdd.mean(); + ).rdd()).mean(); System.out.println("training Mean Squared Error = " + MSE); } } diff --git a/docs/mllib-optimization.md b/docs/mllib-optimization.md index 8582f3a32385f..26ce5f3c501ff 100644 --- a/docs/mllib-optimization.md +++ b/docs/mllib-optimization.md @@ -271,44 +271,33 @@ println("Area under ROC = " + auROC)
{% highlight java %} -import java.util.Random; import java.util.Arrays; +import java.util.Random; -import scala.Product2; import scala.Tuple2; import org.apache.spark.api.java.*; -import org.apache.spark.SparkConf; -import org.apache.spark.SparkContext; import org.apache.spark.api.java.function.Function; -import org.apache.spark.mllib.regression.LabeledPoint; -import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.mllib.classification.LogisticRegressionModel; +import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.optimization.*; -import org.apache.spark.mllib.classification.LogisticRegressionModel; -import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; public class LBFGSExample { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("L-BFGS Example"); SparkContext sc = new SparkContext(conf); - String path = "{SPARK_HOME}/mllib/data/sample_libsvm_data.txt"; + String path = "data/mllib/sample_libsvm_data.txt"; JavaRDD data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD(); int numFeatures = data.take(1).get(0).features().size(); // Split initial RDD into two... [60% training data, 40% testing data]. - JavaRDD trainingInit = data.filter( - new Function() { - public final Random random = new Random(11L); - public Boolean call(LabeledPoint p) { - if (random.nextDouble() <= 0.6) - return true; - else - return false; - } - } - ); + JavaRDD trainingInit = data.sample(false, 0.6, 11L); JavaRDD test = data.subtract(trainingInit); // Append 1 into the training data as intercept. @@ -328,20 +317,20 @@ public class LBFGSExample { Vector initialWeightsWithIntercept = Vectors.dense(new double[numFeatures + 1]); Tuple2 result = LBFGS.runLBFGS( - JavaRDD.toRDD(training), - new LogisticGradient(), - new SquaredL2Updater(), - numCorrections, - convergenceTol, - maxNumIterations, - regParam, - initialWeightsWithIntercept); - Vector weightsWithIntercept = (Vector) result.productElement(0); - double[] loss = (double[]) result.productElement(1); + training.rdd(), + new LogisticGradient(), + new SquaredL2Updater(), + numCorrections, + convergenceTol, + maxNumIterations, + regParam, + initialWeightsWithIntercept); + Vector weightsWithIntercept = result._1(); + double[] loss = result._2(); final LogisticRegressionModel model = new LogisticRegressionModel( - Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)), - (weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]); + Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)), + (weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]); // Clear the default threshold. model.clearThreshold(); @@ -349,17 +338,15 @@ public class LBFGSExample { // Compute raw scores on the test set. JavaRDD> scoreAndLabels = test.map( new Function>() { - public final LogisticRegressionModel m = model; public Tuple2 call(LabeledPoint p) { - Double score = m.predict(p.features()); + Double score = model.predict(p.features()); return new Tuple2(score, p.label()); } - } - ); + }); // Get evaluation metrics. 
BinaryClassificationMetrics metrics = - new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels)); + new BinaryClassificationMetrics(scoreAndLabels.rdd()); double auROC = metrics.areaUnderROC(); System.out.println("Loss of each step in training process"); From 4912516b64dba9bcb48d6e0060b01eeac0b63cdb Mon Sep 17 00:00:00 2001 From: Michael Giannakopoulos Date: Sun, 27 Jul 2014 22:23:06 -0400 Subject: [PATCH 6/6] Added support for regularizer and intercept parameters for linear regression method. --- .../mllib/api/python/PythonMLLibAPI.scala | 59 +++++++++++++++++++ python/pyspark/mllib/regression.py | 17 ++++++ 2 files changed, 76 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 954621ee8b933..a4170dbd8a20a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -24,6 +24,7 @@ import org.apache.spark.api.java.{JavaSparkContext, JavaRDD} import org.apache.spark.mllib.classification._ import org.apache.spark.mllib.clustering._ import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} +import org.apache.spark.mllib.optimization._ import org.apache.spark.mllib.recommendation._ import org.apache.spark.mllib.regression._ import org.apache.spark.mllib.util.MLUtils @@ -261,6 +262,64 @@ class PythonMLLibAPI extends Serializable { initialWeightsBA) } + /** + * Java stub for Python mllib LinearRegressionWithSGD.train() function + * allowing users to define the regularizer and intercept parameters using L2 + * optimization. + */ + def trainLinearRegressionModelWithSGDL2Opt( + dataBytesJRDD: JavaRDD[Array[Byte]], + numIterations: Int, + stepSize: Double, + regParam: Double, + intercept: Boolean, + miniBatchFraction: Double, + initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { + val lrAlg = new LinearRegressionWithSGD() + lrAlg.setIntercept(intercept) + lrAlg.optimizer. + setNumIterations(numIterations). + setRegParam(regParam). + setStepSize(stepSize). + setUpdater(new SquaredL2Updater) + trainRegressionModel( + (data, initialWeights) => + lrAlg.run( + data, + initialWeights), + dataBytesJRDD, + initialWeightsBA) + } + + /** + * Java stub for Python mllib LinearRegressionWithSGD.train() function + * allowing users to define the regularizer and intercept parameters using L1 + * optimization. + */ + def trainLinearRegressionModelWithSGDL1Opt( + dataBytesJRDD: JavaRDD[Array[Byte]], + numIterations: Int, + stepSize: Double, + regParam: Double, + intercept: Boolean, + miniBatchFraction: Double, + initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { + val lrAlg = new LinearRegressionWithSGD() + lrAlg.setIntercept(intercept) + lrAlg.optimizer. + setNumIterations(numIterations). + setRegParam(regParam). + setStepSize(stepSize).
+ setUpdater(new L1Updater) + trainRegressionModel( + (data, initialWeights) => + lrAlg.run( + data, + initialWeights), + dataBytesJRDD, + initialWeightsBA) + } + /** * Java stub for Python mllib LassoWithSGD.train() */ diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index b84bc531dec8c..fa50407909c7a 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -120,6 +120,23 @@ def train(cls, data, iterations=100, step=1.0, d._jrdd, iterations, step, miniBatchFraction, i) return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights) + @classmethod + def trainL2Opt(cls, data, iterations=100, step=1.0, regParam=1.0, + intercept=False, miniBatchFraction=1.0, initialWeights=None): + """Train a linear regression model on the given data using SGD with L2 regularization.""" + sc = data.context + train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGDL2Opt( + d._jrdd, iterations, step, regParam, intercept, miniBatchFraction, i) + return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights) + + @classmethod + def trainL1Opt(cls, data, iterations=100, step=1.0, regParam=1.0, + intercept=False, miniBatchFraction=1.0, initialWeights=None): + """Train a linear regression model on the given data using SGD with L1 regularization.""" + sc = data.context + train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGDL1Opt( + d._jrdd, iterations, step, regParam, intercept, miniBatchFraction, i) + return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights) class LassoModel(LinearRegressionModelBase): """A linear regression model derived from a least-squares fit with an
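
As a usage sketch only, not part of the patch: once the `trainL2Opt` and `trainL1Opt` classmethods above are available in `pyspark.mllib.regression`, a Python driver could call them as shown below. The application name, the parsing helper, and the parameter values are illustrative assumptions; the data file and its "label,feature feature ..." layout follow the linear regression example earlier in this document.

{% highlight python %}
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

sc = SparkContext(appName="RegularizedLinearRegressionExample")

# Parse "label,f1 f2 f3 ..." lines into LabeledPoints (lpsa.data layout).
def parse_point(line):
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/ridge-data/lpsa.data").map(parse_point).cache()

# L2-regularized linear regression with an intercept term (illustrative parameters).
modelL2 = LinearRegressionWithSGD.trainL2Opt(data, iterations=200, step=1.0,
                                             regParam=0.1, intercept=True)

# L1-regularized variant via the second new entry point.
modelL1 = LinearRegressionWithSGD.trainL1Opt(data, iterations=200, step=1.0,
                                             regParam=0.1, intercept=True)

# Training mean squared error for the L2 model.
mse = data.map(lambda p: (p.label - modelL2.predict(p.features)) ** 2).mean()
print("Training Mean Squared Error = " + str(mse))
{% endhighlight %}

Whether the two variants stay as separate classmethods or are folded into a single `train()` signature with a regularizer argument is a design choice left to the patch; the sketch simply mirrors the API as written.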