Commit
Source files for presentation.
Dean Wampler committed Dec 5, 2014
1 parent ca1f59a commit f227672
Showing 4 changed files with 215 additions and 0 deletions.
88 changes: 88 additions & 0 deletions WhyScalaIsTakingOverTheBigDataWorld/MapReduce.java
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

// Inverted index ("line indexer") in classic Hadoop MapReduce (the old
// org.apache.hadoop.mapred API): for every word, collect the names of the
// files that contain it.
public class LineIndexer {

  public static void main(String[] args) {
    JobClient client = new JobClient();
    JobConf conf = new JobConf(LineIndexer.class);

    conf.setJobName("LineIndexer");
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(conf, new Path("input"));
    FileOutputFormat.setOutputPath(conf, new Path("output"));
    conf.setMapperClass(LineIndexMapper.class);
    conf.setReducerClass(LineIndexReducer.class);

    client.setConf(conf);

    try {
      JobClient.runJob(conf);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  // Mapper: emits (word, fileName) for every word in every input line.
  public static class LineIndexMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, Text> {

    private final static Text word = new Text();
    private final static Text location = new Text();

    public void map(LongWritable key, Text val,
        OutputCollector<Text, Text> output,
        Reporter reporter) throws IOException {

      FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
      String fileName = fileSplit.getPath().getName();
      location.set(fileName);

      String line = val.toString();
      StringTokenizer itr = new StringTokenizer(line.toLowerCase());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, location);
      }
    }
  }

  // Reducer: joins all the file names seen for a word into one
  // comma-separated list.
  public static class LineIndexReducer extends MapReduceBase
      implements Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, Text> output,
        Reporter reporter) throws IOException {

      boolean first = true;
      StringBuilder toReturn = new StringBuilder();
      while (values.hasNext()) {
        if (!first)
          toReturn.append(", ");
        first = false;
        toReturn.append(values.next().toString());
      }
      output.collect(key, new Text(toReturn.toString()));
    }
  }
}
22 changes: 22 additions & 0 deletions WhyScalaIsTakingOverTheBigDataWorld/Scalding.scala
import com.twitter.scalding._

// Inverted index in Scalding (fields-based API): emit a (word, id) pair for
// every word in every text, then group the ids by word.
class InvertedIndex(args: Args) extends Job(args) {

  val texts = Tsv("texts.tsv", ('id, 'text))

  val wordToIds = texts
    .flatMap(('id, 'text) -> ('word, 'id2)) {
      fields: (Long, String) =>
        val (id2, text) = fields
        // 'id2 avoids a field-name clash with the input field 'id.
        text.split("\\s+").map { word => (word, id2) }
    }

  val invertedIndex =
    wordToIds.groupBy('word) {
      _.toList[Long]('id2 -> 'ids)
    }

  invertedIndex.write(Tsv("output.tsv"))
}
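
For comparison, the same transformation can be sketched with ordinary Scala collections; a minimal, self-contained sketch (the object name and sample records below are made up for illustration):

object InvertedIndexSketch extends App {
  // Made-up sample records with the same (id, text) shape as texts.tsv.
  val texts: Seq[(Long, String)] = Seq(
    (1L, "hello world"),
    (2L, "hello scalding"))

  val invertedIndex: Map[String, Seq[Long]] =
    texts
      .flatMap { case (id, text) =>
        text.split("\\s+").map(word => (word, id))
      }
      .groupBy { case (word, _) => word }
      .map { case (word, pairs) => (word, pairs.map(_._2)) }

  invertedIndex.foreach(println)
  // Prints, in some order: (hello,List(1, 2)), (world,List(1)), (scalding,List(2))
}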
44 changes: 44 additions & 0 deletions WhyScalaIsTakingOverTheBigDataWorld/Spark.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

// Inverted index with word counts in Spark:
// word -> list of (path, count) pairs, one per document containing the word.
object InvertedIndex {
  def main(args: Array[String]) = {

    val sc = new SparkContext("local", "Inverted Index")

    // Output directory: first command-line argument, or "output" by default.
    val outpath = if (args.nonEmpty) args(0) else "output"

    sc.textFile("data/crawl")
      .map { line =>
        // Each input line is "path<TAB>document text".
        val array = line.split("\t", 2)
        (array(0), array(1))
      }
      .flatMap {
        case (path, text) =>
          text.split("""\W+""") map { word => (word, path) }
      }
      .map {
        case (w, p) => ((w, p), 1)
      }
      .reduceByKey {
        case (n1, n2) => n1 + n2   // count occurrences of each (word, path)
      }
      .map {
        case ((w, p), n) => (w, (p, n))
      }
      .groupBy {
        case (w, (p, n)) => w
      }
      .map {
        case (w, seq) =>
          val seq2 = seq map {
            case (_, (p, n)) => (p, n)
          }
          (w, seq2.mkString(", "))
      }
      .saveAsTextFile(outpath)

    sc.stop()
  }
}
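
The groupBy and final map at the end of the pipeline above can also be written with groupByKey and mapValues, which are arguably more idiomatic for pair RDDs. A minimal sketch of that variant; the object and method names are made up, and the input is assumed to be the ((word, path), count) RDD produced by the reduceByKey step:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object InvertedIndexAlternative {
  // Same result as the last three stages above:
  // word -> "(path1,n1), (path2,n2), ..."
  def format(counts: RDD[((String, String), Int)]): RDD[(String, String)] =
    counts
      .map { case ((w, p), n) => (w, (p, n)) } // re-key by word alone
      .groupByKey()                            // word -> all of its (path, count) pairs
      .mapValues(_.mkString(", "))
}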
61 changes: 61 additions & 0 deletions WhyScalaIsTakingOverTheBigDataWorld/SparkSQLStreaming.scala
// Adapted from Typesafe's Spark Workshop exercises.
// Copyright (c) 2014, Typesafe. All Rights Reserved.
//
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext

case class Flight(
  number: Int, carrier: String, origin: String, destination: String)

object Flight {
  // Parses records of the form "[number,carrier,origin,destination]".
  def parse(recordString: String): Option[Flight] = {
    val recordRE = """\[(\d+),(\w+),(\w+),(\w+)\]""".r
    recordString match {
      case recordRE(id, carrier, origin, dest) =>
        Some(Flight(id.toInt, carrier, origin, dest))
      case _ =>
        println("Invalid Flight record: " + recordString)
        None
    }
  }
}

object FrequentFlights {
  def main(args: Array[String]): Unit = {
    val master = args(0)
    val sourceServerPort = args(1)
    run(master, sourceServerPort)
  }

  def run(master: String, serverPort: String): Unit = {
    val sparkContext = new SparkContext(master, "FrequentFlights")
    // Capture 60-second batches of events.
    val streamingContext = new StreamingContext(sparkContext, Seconds(60))
    val sqlContext = new SQLContext(sparkContext)
    import sqlContext._

    val (server, port) = {
      val array = serverPort.split(":")
      (array(0), array(1).toInt)
    }
    val dstream = streamingContext.socketTextStream(server, port)
    val flights = for {
      line <- dstream
      flight <- Flight.parse(line)
    } yield flight

    // For each batch, find the twenty most frequent
    // (carrier, origin, destination) routes.
    flights.foreachRDD { (rdd, time) =>
      rdd.registerTempTable("flights")
      sql(s"""
        SELECT $time, carrier, origin, destination, COUNT(*) AS cnt
        FROM flights
        GROUP BY carrier, origin, destination
        ORDER BY cnt DESC
        LIMIT 20""").foreach(println)
    }
    streamingContext.start()
    streamingContext.awaitTermination(2 * 1000)
  }
}
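
To try FrequentFlights locally (for example, run it with arguments such as local[2] and localhost:9999), something must be listening on the source socket and emitting records in the "[number,carrier,origin,destination]" form that Flight.parse expects. A minimal, hypothetical test data server; the object name, default port, and sample records are made up:

import java.io.PrintWriter
import java.net.ServerSocket

object FlightRecordServer {
  def main(args: Array[String]): Unit = {
    val port = if (args.nonEmpty) args(0).toInt else 9999
    val server = new ServerSocket(port)
    println(s"Listening on port $port ...")
    // Accept one client (the streaming job) and write records forever.
    val socket = server.accept()
    val out = new PrintWriter(socket.getOutputStream, true)
    val samples = Seq("[1,AA,ORD,SFO]", "[2,UA,ORD,JFK]", "[3,AA,ORD,SFO]")
    while (true) {
      samples.foreach(s => out.println(s))
      Thread.sleep(1000) // one small burst of records per second
    }
  }
}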
