Update loadwholecsv and basicavgfromfiles
holdenk committed May 14, 2014
1 parent 7f4fec4 commit ce8a86b
Showing 2 changed files with 53 additions and 3 deletions.
BasicLoadWholeCsv.java
@@ -0,0 +1,49 @@
/**
 * Illustrates loading whole CSV files with wholeTextFiles and
 * filtering the parsed records by key
 */
package com.oreilly.learningsparkexamples.java;

import java.io.StringReader;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import scala.Tuple2;

import au.com.bytecode.opencsv.CSVReader;

import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

public class BasicLoadWholeCsv {

  public static class ParseLine implements FlatMapFunction<Tuple2<String, String>, String[]> {
    public Iterable<String[]> call(Tuple2<String, String> file) throws Exception {
      // file._2() holds an entire file's contents, so CSV records that
      // span multiple lines are parsed together rather than line by line.
      CSVReader reader = new CSVReader(new StringReader(file._2()));
      return reader.readAll();
    }
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 4) {
      throw new Exception("Usage: BasicLoadWholeCsv sparkMaster csvInputFile csvOutputFile key");
    }
    String master = args[0];
    String csvInput = args[1];
    String outputFile = args[2];
    final String key = args[3];

    JavaSparkContext sc = new JavaSparkContext(
      master, "basicloadwholecsv", System.getenv("SPARK_HOME"), System.getenv("JARS"));
    // wholeTextFiles returns a pair RDD of (fileName, fileContents).
    JavaPairRDD<String, String> csvData = sc.wholeTextFiles(csvInput);
    JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());
    JavaRDD<String[]> result =
      keyedRDD.filter(new Function<String[], Boolean>() {
        public Boolean call(String[] input) { return input[0].equals(key); }});

    // Join each record's fields back into a CSV line before saving;
    // saving the raw String[] would only write array hashcodes.
    result.map(new Function<String[], String>() {
        public String call(String[] input) { return StringUtils.join(input, ","); }
      }).saveAsTextFile(outputFile);
  }
}
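Since the repository pairs Java and Scala versions of each example, a minimal Scala sketch of the same whole-file CSV load and key filter is shown below. The object name, the comma-joined output format, and the argument handling are illustrative assumptions, not part of this commit.

import java.io.StringReader
import scala.collection.JavaConverters._

import au.com.bytecode.opencsv.CSVReader
import org.apache.spark.SparkContext

// Hypothetical Scala counterpart to BasicLoadWholeCsv (not in this commit).
object BasicLoadWholeCsvSketch {
  def main(args: Array[String]) {
    val Array(master, csvInput, outputFile, key) = args
    val sc = new SparkContext(master, "basicloadwholecsv", System.getenv("SPARK_HOME"))
    // wholeTextFiles returns (fileName, fileContents) pairs, so CSV records
    // that span multiple lines are parsed within one file's contents.
    val records = sc.wholeTextFiles(csvInput).flatMap { case (_, contents) =>
      new CSVReader(new StringReader(contents)).readAll().asScala
    }
    records.filter(_(0) == key)
      .map(_.mkString(","))
      .saveAsTextFile(outputFile)
  }
}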
BasicAvgFromFiles.scala
@@ -8,18 +8,19 @@ import org.apache.spark.SparkContext._

 object BasicAvgFromFiles {
   def main(args: Array[String]) {
-    if (args.length < 2) {
-      println("Usage: [sparkmaster] [inputdirectory]")
+    if (args.length < 3) {
+      println("Usage: [sparkmaster] [inputdirectory] [outputdirectory]")
       exit(1)
     }
     val master = args(0)
     val inputFile = args(1)
+    val outputFile = args(2)
     val sc = new SparkContext(master, "BasicAvgFromFiles", System.getenv("SPARK_HOME"))
     val input = sc.wholeTextFiles(inputFile)
     val result = input.mapValues{y =>
       val nums = y.split(" ").map(_.toDouble)
       nums.sum / nums.size.toDouble
     }
-    println(result.collect())
+    result.saveAsTextFile(outputFile)
   }
 }
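The second change replaces the driver-side println (which would print an opaque array reference rather than the values) with saveAsTextFile, writing one (fileName, average) pair per input file to the output directory. A plain-Scala walkthrough of the same mapValues arithmetic, with made-up file names and numbers:

// Stand-ins for the (fileName, fileContents) pairs wholeTextFiles returns.
object AvgFromFilesCheck {
  def main(args: Array[String]) {
    val files = Seq(
      ("file:/tmp/nums/a.txt", "1 2 3"),  // hypothetical input file
      ("file:/tmp/nums/b.txt", "10 20"))  // hypothetical input file
    val result = files.map { case (name, contents) =>
      val nums = contents.split(" ").map(_.toDouble)
      (name, nums.sum / nums.size.toDouble)
    }
    result.foreach(println)
    // Prints:
    // (file:/tmp/nums/a.txt,2.0)
    // (file:/tmp/nums/b.txt,15.0)
  }
}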
