SPARK-1565, update examples to be used with spark-submit script.
Commit for initial feedback. Basically, I am curious whether we should prompt the user to provide args, especially when they are mandatory. And can we skip them when they are not?

Also, a few other things did not work, for example:
`bin/spark-submit examples/target/scala-2.10/spark-examples-1.0.0-SNAPSHOT-hadoop1.0.4.jar --class org.apache.spark.examples.SparkALS --arg 100 500 10 5 2`

Not all the args get passed through properly; maybe I have messed something up. I will try to sort it out.
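
For context, every example below receives the same core change: drop the explicit master URL / SPARK_HOME constructor and build the context from a `SparkConf` that only sets the application name, so spark-submit can supply the master, the jar, and the rest of the configuration. Below is a minimal sketch of that pattern, assuming only the `SparkConf`/`JavaSparkContext` API visible in the diff; the class name `JavaSubmitSketch` is hypothetical and not part of this commit.

```java
package org.apache.spark.examples;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical example, not part of this commit: illustrates the pattern
// the diff applies to each example program.
public final class JavaSubmitSketch {
  public static void main(String[] args) {
    // Before this commit: the master URL arrived as args[0] and the context was
    // built with new JavaSparkContext(args[0], "Name",
    //     System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(...)).
    // After: only the app name is set here; spark-submit injects everything else.
    SparkConf sparkConf = new SparkConf().setAppName("JavaSubmitSketch");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    // Application arguments now start at args[0], which is why every example
    // in the diff shifts its argument indices down by one.
    JavaRDD<String> appArgs = sc.parallelize(Arrays.asList(args));
    System.out.println("Received " + appArgs.count() + " application arguments");

    sc.stop();
  }
}
```

For what it's worth, with the conventions the released spark-submit script settled on, options such as `--class` go before the application jar and application arguments are passed positionally after it, e.g. `bin/spark-submit --class org.apache.spark.examples.SparkALS examples/target/scala-2.10/spark-examples-1.0.0-SNAPSHOT-hadoop1.0.4.jar 100 500 10 5 2`.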

Author: Prashant Sharma <prashant.s@imaginea.com>

Closes #552 from ScrapCodes/SPARK-1565/update-examples and squashes the following commits:

669dd23 [Prashant Sharma] Review comments
2727e70 [Prashant Sharma] SPARK-1565, update examples to be used with spark-submit script.
ScrapCodes authored and pwendell committed May 8, 2014
1 parent 19c8fb0 commit 44dd57f
Showing 56 changed files with 405 additions and 480 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -49,6 +49,7 @@ unit-tests.log
/lib/
rat-results.txt
scalastyle.txt
conf/*.conf

# For Hive
metastore_db/
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -74,10 +74,10 @@ class SparkContext(config: SparkConf) extends Logging {
* be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* from a list of input files or InputFormats for the application.
*/
@DeveloperApi
def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
this(config)
this.preferredNodeLocationData = preferredNodeLocationData
@DeveloperApi
def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
this(config)
this.preferredNodeLocationData = preferredNodeLocationData
}

/**
13 changes: 7 additions & 6 deletions examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -17,6 +17,7 @@

package org.apache.spark.examples;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -103,16 +104,16 @@ public static void printWeights(double[] a) {

public static void main(String[] args) {

if (args.length < 3) {
System.err.println("Usage: JavaHdfsLR <master> <file> <iters>");
if (args.length < 2) {
System.err.println("Usage: JavaHdfsLR <file> <iters>");
System.exit(1);
}

JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaHdfsLR.class));
JavaRDD<String> lines = sc.textFile(args[1]);
SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
int ITERATIONS = Integer.parseInt(args[2]);
int ITERATIONS = Integer.parseInt(args[1]);

// Initialize w to a random value
double[] w = new double[D];
examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -20,6 +20,7 @@
import com.google.common.collect.Lists;
import scala.Tuple2;
import scala.Tuple3;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -34,6 +35,8 @@

/**
* Executes a roll up-style query against Apache logs.
*
* Usage: JavaLogQuery [logFile]
*/
public final class JavaLogQuery {

@@ -97,15 +100,11 @@ public static Stats extractStats(String line) {
}

public static void main(String[] args) {
if (args.length == 0) {
System.err.println("Usage: JavaLogQuery <master> [logFile]");
System.exit(1);
}

JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLogQuery.class));
SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs);

JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
@Override
examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -18,9 +18,12 @@
package org.apache.spark.examples;



import scala.Tuple2;

import com.google.common.collect.Iterables;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -54,20 +57,20 @@ public Double call(Double a, Double b) {
}

public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.err.println("Usage: JavaPageRank <master> <file> <number_of_iterations>");
if (args.length < 2) {
System.err.println("Usage: JavaPageRank <file> <number_of_iterations>");
System.exit(1);
}

JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaPageRank.class));
SparkConf sparkConf = new SparkConf().setAppName("JavaPageRank");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);

// Loads in input file. It should be in format of:
// URL neighbor URL
// URL neighbor URL
// URL neighbor URL
// ...
JavaRDD<String> lines = ctx.textFile(args[1], 1);
JavaRDD<String> lines = ctx.textFile(args[0], 1);

// Loads all URLs from input file and initialize their neighbors.
JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
@@ -87,7 +90,7 @@ public Double call(Iterable<String> rs) {
});

// Calculates and updates URL ranks continuously using PageRank algorithm.
for (int current = 0; current < Integer.parseInt(args[2]); current++) {
for (int current = 0; current < Integer.parseInt(args[1]); current++) {
// Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
.flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -17,6 +17,7 @@

package org.apache.spark.examples;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -25,19 +26,18 @@
import java.util.ArrayList;
import java.util.List;

/** Computes an approximation to pi */
/**
* Computes an approximation to pi
* Usage: JavaSparkPi [slices]
*/
public final class JavaSparkPi {


public static void main(String[] args) throws Exception {
if (args.length == 0) {
System.err.println("Usage: JavaSparkPi <master> [slices]");
System.exit(1);
}

JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaSparkPi",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkPi.class));
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2;
int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++) {
24 changes: 11 additions & 13 deletions examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -17,19 +17,22 @@

package org.apache.spark.examples;

import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

/**
* Transitive closure on a graph, implemented in Java.
* Usage: JavaTC [slices]
*/
public final class JavaTC {

@@ -61,14 +64,9 @@ public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> t
}

public static void main(String[] args) {
if (args.length == 0) {
System.err.println("Usage: JavaTC <host> [<slices>]");
System.exit(1);
}

JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaTC.class));
Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
Integer slices = (args.length > 0) ? Integer.parseInt(args[0]): 2;
JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();

// Linear transitive closure: each round grows paths by one edge,
examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -18,6 +18,7 @@
package org.apache.spark.examples;

import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -33,14 +34,15 @@ public final class JavaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaWordCount <master> <file>");

if (args.length < 1) {
System.err.println("Usage: JavaWordCount <file>");
System.exit(1);
}

JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));
JavaRDD<String> lines = ctx.textFile(args[1], 1);
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], 1);

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
22 changes: 11 additions & 11 deletions examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
@@ -17,6 +17,7 @@

package org.apache.spark.examples.mllib;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -57,23 +58,22 @@ public String call(Tuple2<Object, double[]> element) {

public static void main(String[] args) {

if (args.length != 5 && args.length != 6) {
if (args.length < 4) {
System.err.println(
"Usage: JavaALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
"Usage: JavaALS <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
System.exit(1);
}

int rank = Integer.parseInt(args[2]);
int iterations = Integer.parseInt(args[3]);
String outputDir = args[4];
SparkConf sparkConf = new SparkConf().setAppName("JavaALS");
int rank = Integer.parseInt(args[1]);
int iterations = Integer.parseInt(args[2]);
String outputDir = args[3];
int blocks = -1;
if (args.length == 6) {
blocks = Integer.parseInt(args[5]);
if (args.length == 5) {
blocks = Integer.parseInt(args[4]);
}

JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = sc.textFile(args[0]);

JavaRDD<Rating> ratings = lines.map(new ParseRating());

examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
@@ -19,6 +19,7 @@

import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -48,24 +49,21 @@ public Vector call(String line) {
}

public static void main(String[] args) {

if (args.length < 4) {
if (args.length < 3) {
System.err.println(
"Usage: JavaKMeans <master> <input_file> <k> <max_iterations> [<runs>]");
"Usage: JavaKMeans <input_file> <k> <max_iterations> [<runs>]");
System.exit(1);
}

String inputFile = args[1];
int k = Integer.parseInt(args[2]);
int iterations = Integer.parseInt(args[3]);
String inputFile = args[0];
int k = Integer.parseInt(args[1]);
int iterations = Integer.parseInt(args[2]);
int runs = 1;

if (args.length >= 5) {
runs = Integer.parseInt(args[4]);
if (args.length >= 4) {
runs = Integer.parseInt(args[3]);
}

JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
SparkConf sparkConf = new SparkConf().setAppName("JavaKMeans");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = sc.textFile(inputFile);

JavaRDD<Vector> points = lines.map(new ParsePoint());
examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
@@ -19,6 +19,7 @@

import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -51,17 +52,16 @@ public LabeledPoint call(String line) {
}

public static void main(String[] args) {
if (args.length != 4) {
System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
if (args.length != 3) {
System.err.println("Usage: JavaLR <input_dir> <step_size> <niters>");
System.exit(1);
}

JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLR.class));
JavaRDD<String> lines = sc.textFile(args[1]);
SparkConf sparkConf = new SparkConf().setAppName("JavaLR");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
double stepSize = Double.parseDouble(args[2]);
int iterations = Integer.parseInt(args[3]);
double stepSize = Double.parseDouble(args[1]);
int iterations = Integer.parseInt(args[2]);

// Another way to configure LogisticRegression
//
@@ -73,7 +73,7 @@ public static void main(String[] args) {
// LogisticRegressionModel model = lr.train(points.rdd());

LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
iterations, stepSize);
iterations, stepSize);

System.out.print("Final w: " + model.weights());

examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -20,6 +20,7 @@
import java.io.Serializable;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -51,8 +52,8 @@ public void setAge(int age) {
}

public static void main(String[] args) throws Exception {
JavaSparkContext ctx = new JavaSparkContext("local", "JavaSparkSQL",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkSQL.class));
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

// Load a text file and convert each line to a Java Bean.