# Big Data: Spark 101

## Getting acquainted with Spark and Spark Notebook

Never used a Notebook?
You can find useful advice in the
[Jupyter docs](http://jupyter-notebook.readthedocs.org/en/latest/examples/Notebook/rstversions/Notebook%20Basics.html).

_I recommend taking a look at the keyboard shortcuts further down that page. Press "H" (in command mode) for help._

## First steps with Spark and Scala

### My First RDD

RDDs can be initiated from in-memory collections or from files in the (distributed or local) file system.

Let's first initialize an RDD from a collection. The second parameter is optional and instructs the platform to split the data into 8 partitions.

```scala
val rdd = sc.parallelize(0 to 999, 8)
```

```
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:48
```
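To make the partition count concrete, here is a plain-Scala sketch (no Spark involved) that cuts the same range into 8 contiguous slices. The even-split rule `start = i * n / k`, `end = (i + 1) * n / k` is an assumption meant to mimic how `parallelize` slices a range; Spark's own implementation may differ in details.

```scala
// Plain Scala, not Spark: cut 0 to 999 into 8 contiguous slices, assuming
// the even-split rule start = i * n / k, end = (i + 1) * n / k.
val data = 0 to 999
val numSlices = 8

val slices: IndexedSeq[Seq[Int]] = (0 until numSlices).map { i =>
  val start = (i.toLong * data.length / numSlices).toInt
  val end   = ((i + 1).toLong * data.length / numSlices).toInt
  data.slice(start, end)  // one contiguous "partition"
}

slices.zipWithIndex.foreach { case (s, i) =>
  println(s"partition $i: ${s.head} to ${s.last} (${s.size} elements)")
}
```

With 1000 elements and 8 slices, every slice gets 125 consecutive numbers.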


Remember that evaluation is lazy: it is triggered only by actions, not by transformations. So far, nothing has been computed.

_Check:_ the [stages](http://localhost:4040/stages/) page of the Spark UI is still empty.
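The same deferred evaluation can be mimicked locally with a Scala collection view. This is only an analogy (a view is not an RDD): the mapping function does not run when `map` is called, only when part of the result is forced. The counter `evaluated` is a hypothetical helper added to make that visible.

```scala
// Analogy only, not Spark: a collection view defers work like an RDD transformation.
var evaluated = 0  // how many times the mapping function has actually run

val squares = (0 to 999).view.map { x => evaluated += 1; x * x }
println(s"after map: evaluated = $evaluated")

// Forcing part of the view plays the role of an action such as takeSample.
val firstFour = squares.take(4).toList
println(s"after take(4).toList: evaluated = $evaluated")
```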

```scala
val sample = rdd.takeSample(false, 4)
```

```
sample: Array[Int] = Array(843, 130, 284, 199)
```


Only now has evaluation taken place: see the [stages](http://localhost:4040/stages/) in the Spark UI.
Click on the links!
_Why are there two jobs?_

### Data

[Assignment 2A](http://rubigdata.github.io/course/assignments/A2a-spark-101.html) explains how to get the complete works of Shakespeare into your Spark Notebook container.

Use a shell escape to check whether the Gutenberg data was loaded correctly into the Docker container running the Spark Notebook.

```
:sh ls /data
```

```
import sys.process._
res3: String =
"100.txt.utf-8
"
```

### Counting words

We will use the Shakespeare data for the classic Big Data "Hello World" exercise, counting words.
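Before running it on Spark, the whole pipeline can be previewed on a tiny in-memory sample with plain Scala collections. This is a local sketch, not Spark code: the sample lines are hypothetical, and `groupBy` followed by a per-key sum stands in for the shuffle that `reduceByKey` performs on a cluster.

```scala
// Local sketch of word count on an in-memory Seq of lines (hypothetical sample text),
// using plain Scala collections in place of an RDD.
val sampleLines = Seq(
  "To be, or not to be: that is the question:",
  "",
  "to be or not"
)

val counts: Map[String, Int] = sampleLines
  .flatMap(line => line.split(" "))   // one element per token
  .filter(_ != "")                    // drop empty tokens
  .map(word => (word, 1))             // pair each token with a count of 1
  .groupBy(_._1)                      // group pairs by word (reduceByKey does this via a shuffle)
  .map { case (word, pairs) => (word, pairs.map(_._2).sum) }  // add up the 1s per word

counts.toSeq.sortBy(-_._2).foreach(println)
```

Note that `"To"`, `"to"` and `"be,"`, `"be:"`, `"be"` are counted as different words, because tokens are compared exactly as they appear.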

```scala
val lines = sc.textFile("/data/100.txt.utf-8")
```

```
lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at textFile at <console>:51
```


Can you predict what the following cell will do?
Do you recognize the MapReduce pattern in its second and third lines?

```scala
println("Lines:\t" + lines.count + "\n" +
        "Chars:\t" + lines.map(s => s.length).
                          reduce((a, b) => a + b))
```

```
Lines:	124787
Chars:	5340312
```


The `map` operator applies its argument, a lambda function, to every item in the RDD.
`reduce` is also defined using a lambda function, which combines two values into one.
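The same map-then-reduce shape can be tried on a local Scala collection. The two sample lines are hypothetical; `map` turns each line into its length and `reduce` adds the lengths pairwise. On an RDD, Spark's documentation requires the function passed to `reduce` to be commutative and associative, because partial results from different partitions may be combined in any order; addition satisfies both.

```scala
// Local analogue of the line and character counts, on a small hypothetical Seq.
val text = Seq("When shall we three meet again", "In thunder, lightning, or in rain?")

val lineCount = text.size
val charCount = text
  .map(s => s.length)          // map: each line becomes its length
  .reduce((a, b) => a + b)     // reduce: lengths are combined pairwise into one total

println(s"Lines:\t$lineCount\nChars:\t$charCount")
```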

_Note:_ if you have never taken a functional programming course, look at [this answer on Stack Overflow](http://stackoverflow.com/a/16509/2127435).

Now try to understand the following example in detail.
Why do we use `flatMap` and not `map`?

It is worth copying the cell and inspecting the output at intermediate steps (use `take()`, not `collect()`; _do you remember why?_).
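A quick way to see the difference is to compare both operators on a local list of two lines. This uses plain Scala collections (the sample lines are hypothetical), whose `map` and `flatMap` behave analogously to the RDD versions: `map` yields exactly one output per input, while `flatMap` flattens the per-line arrays into a single sequence of tokens.

```scala
// Plain Scala collections, not Spark: compare map and flatMap on two lines.
val twoLines = List("O Romeo, Romeo!", "wherefore art thou Romeo?")

val mapped: List[Array[String]] = twoLines.map(line => line.split(" "))
val flat: List[String]          = twoLines.flatMap(line => line.split(" "))

println(mapped.map(_.mkString("[", ", ", "]")))  // one array per line
println(flat)                                     // one element per token
```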

```scala
val words = lines.flatMap(line => line.split(" "))
              .filter(_ != "")
              .map(word => (word,1))
```

```
words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[34] at map at <console>:60
```


```scala
val wc = words.reduceByKey(_ + _)
```

```
wc: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[16] at reduceByKey at <console>:53
```


```scala
wc.take(10)
```

```
res5: Array[(String, Int)] = Array((hack'd.,1), (durst,,1), (Ah!,3), (bone,7), (Worthy;,1), (vailing,1), (bombast,1), (person-,1), (LAFEU],1), (fiction.,1))
```


Take a look at how the platform processes this query:

```scala
wc.toDebugString
```

```
res6: String =
(2) ShuffledRDD[16] at reduceByKey at <console>:53 []
 +-(2) MapPartitionsRDD[15] at map at <console>:54 []
    |  MapPartitionsRDD[14] at filter at <console>:53 []
    |  MapPartitionsRDD[13] at flatMap at <console>:52 []
    |  MapPartitionsRDD[11] at textFile at <console>:51 []
    |  /data/100.txt.utf-8 HadoopRDD[10] at textFile at <console>:51 []
```


Inspect the Spark UI to see the computations in the cluster: [see stages](http://localhost:4040/stages/) and [see tasks](http://localhost:4040/stages/stage/?id=7&attempt=0).

### To count or not to count
OK, we can count words; let us find out which words Shakespeare used most often!

```scala
val top10 = wc.takeOrdered(10)
```

```
top10: Array[(String, Int)] = Array((",241), ("'Tis,1), ("A,4), ("AS-IS".,1), ("Air,",1), ("Alas,,1), ("Amen",2), ("Amen"?,1), ("Amen,",1), ("And,1))
```


OK, not quite what we wanted!
Can you see what's wrong?

Let's fix the result ordering as follows.

```scala
val top10 = wc.takeOrdered(10)(Ordering[Int].reverse.on(x => x._2))
```

```
top10: Array[(String, Int)] = Array((the,23407), (I,19540), (and,18358), (to,15682), (of,15649), (a,12586), (my,10825), (in,9633), (you,9129), (is,7874))
```
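`takeOrdered` takes an implicit `Ordering`. For `(String, Int)` pairs, Scala's default tuple ordering compares the first component (the word) lexicographically, which explains the first result. The local sketch below, on a few hypothetical pairs taken from the outputs above, contrasts that default with `Ordering[Int].reverse.on(...)`, which compares the counts in descending order.

```scala
// Plain Scala sketch: default tuple ordering versus ordering by count, descending.
val pairs = Seq(("the", 23407), ("\"A", 4), ("Romeo", 47), ("I", 19540))

// Default Ordering[(String, Int)]: compare the String first, character by character.
val byWord = pairs.sorted.take(2)

// Ordering[Int].reverse.on(_._2): compare only the counts, largest first.
val byCountDesc = pairs.sorted(Ordering[Int].reverse.on[(String, Int)](_._2)).take(2)

println(byWord)
println(byCountDesc)
```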


You can render the collected results however you like, using the client programming language.

```scala
// foreach runs println for its side effect only, without building an Array[Unit]
top10.foreach { case (w, c) => println("Word '%s' occurs %d times".format(w, c)) }
```

```
Word 'the' occurs 23407 times
Word 'I' occurs 19540 times
Word 'and' occurs 18358 times
Word 'to' occurs 15682 times
Word 'of' occurs 15649 times
Word 'a' occurs 12586 times
Word 'my' occurs 10825 times
Word 'in' occurs 9633 times
Word 'you' occurs 9129 times
Word 'is' occurs 7874 times
```
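An alternative to `String.format` is Scala's `f` string interpolator, which checks the format specifiers against the argument types at compile time. The sketch below applies it to two hypothetical pairs; since the result is an ordinary `Array[String]`, printing is left to a separate `foreach`.

```scala
// Plain Scala: render (word, count) pairs with the f-interpolator
// instead of String.format; the sample pairs are hypothetical.
val top = Array(("the", 23407), ("I", 19540))

val rendered = top.map { case (w, c) => f"Word '$w' occurs $c%d times" }
rendered.foreach(println)
```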


We can also zoom in on the frequencies of specific words, which may be more interesting than those of stopwords!

```scala
wc.filter(_._1 == "Romeo").collect
```

```
res19: Array[(String, Int)] = Array((Romeo,47))
```

```scala
wc.filter(_._1 == "Julia").collect
```

```
res20: Array[(String, Int)] = Array((Julia,12))
```


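Calling `cache()` asks Spark to keep `wc` in memory once the next action has computed it, so that later queries on `wc` can reuse it rather than recompute it from its lineage.

```scala
wc.cache()
```

```
res21: wc.type = ShuffledRDD[16] at reduceByKey at <console>:53
```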


```scala
wc.filter(_._1 == "Macbeth").collect
```

```
res22: Array[(String, Int)] = Array((Macbeth,30))
```

```scala
wc.filter(_._1 == "Capulet").collect
```

```
res23: Array[(String, Int)] = Array((Capulet,5))
```


The next cells save the `words` RDD, i.e. the `(word, 1)` pairs before reduction, to the filesystem.

We use a simple shell command to look into the directory that has been created.
(Alternatively, you can browse the filesystem after running `docker exec -it HASH bash` on the machine running the notebook container, where `HASH` is the container's ID.)
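The output of the `:sh` cells echoes `import sys.process._`, which suggests (an assumption, not something this notebook documents) that the shell escape is built on Scala's `scala.sys.process` API. The same API can be used directly from Scala code: `!!` returns the command's standard output as a `String`, and `!` returns its exit code. Passing a `Seq` of arguments avoids shell parsing; `echo` and `ls` are assumed to be available on the `PATH`.

```scala
import scala.sys.process._

// Run an external command and capture its standard output as a String
// (!! throws an exception if the command exits with a non-zero code).
val out: String = Seq("echo", "hello from the shell").!!

// Run a command and keep only its exit code; its output goes to stdout.
val exitCode: Int = Seq("ls", "/").!

println(out.trim)
println(s"ls exited with code $exitCode")
```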

```scala
words.saveAsTextFile("wc")
```

```
:sh ls wc
```

```
import sys.process._
res13: String =
"_SUCCESS
part-00000
part-00001
"
```

_Q: Explain why there are multiple result files._

```scala
// Relies on sys.process._, already imported by the :sh cells above.
val result = "head -20 wc/part-00001".!!
```

```
result: String =
"(on,1)
(a,1)
(giant.,1)
(Bring,1)
(up,1)
(the,1)
(brown,1)
(bills.,1)
(O,,1)
(well,1)
(flown,,1)
(bird!,1)
(i',1)
(th',1)
(clout,,1)
(i',1)
(th',1)
(clout!,1)
(Hewgh!,1)
(Give,1)
"
```


Remove the output directory to save headaches when you rerun the notebook later: `saveAsTextFile` fails if the target directory already exists.

```
:sh rm -rf wc
```

```
import sys.process._
res14: String = ""
```

### How to count?

```scala
val words = lines.flatMap(line => line.split(" "))
              .map(w => w.toLowerCase().replaceAll("(^[^a-z]+|[^a-z]+$)", ""))
              .filter(_ != "")
              .map(w => (w,1))
              .reduceByKey( _ + _ )
```

```
words: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[43] at reduceByKey at <console>:62
```
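To see what the second line of that cell does to individual tokens, the same `toLowerCase` and `replaceAll` calls can be applied to a few sample tokens in plain Scala. The helper name `normalize` and the sample tokens are hypothetical. The regular expression removes a run of non-lowercase-letter characters at the start or at the end of a token, so internal apostrophes survive and tokens without any letter become empty (and are then dropped by the `filter`).

```scala
// Apply the cell's normalization step to a few sample tokens (plain Scala, no Spark).
def normalize(w: String): String =
  w.toLowerCase().replaceAll("(^[^a-z]+|[^a-z]+$)", "")  // strip non-letters at both ends

val samples = Seq("Macbeth", "MACBETH.", "[Macbeth]", "'Tis", "o'er", "--", "1603")
val normalized = samples.map(w => w -> normalize(w))
normalized.foreach { case (before, after) => println(s"$before -> '$after'") }
```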


```scala
words.filter(_._1 == "macbeth").collect
  .foreach { case (w, c) => println("%s occurs %d times".format(w, c)) }
```

```
macbeth occurs 284 times
```


_Q: why are the counts different?_

**See also**

* http://spark.apache.org/examples.html
* http://spark.apache.org/docs/latest/programming-guide.html

Additional information on Scala, just in case. Some background may help you get things done, but do not get carried away: the course is about big data processing, not functional programming!

* Main [Scala site](http://scala-lang.org/), [tutorial](http://docs.scala-lang.org/tutorials/scala-for-java-programmers.html) and [API documentation](http://www.scala-lang.org/api/current/index.html)

Spark Notebook has many advanced uses, which are briefly discussed in the online documentation (a _work in progress_ that is not always up to date). I found [details.md](https://github.com/andypetrella/spark-notebook/blob/master/details.md) worth a quick scan.

**PySpark alternative**

Students feeling adventurous may opt to do the coursework in Python instead of Scala, using `pyspark` on an alternative Docker image, `prabeeshk/pyspark-notebook`.

After pulling that image, you can start a PySpark container using `docker run -d -t -p 8888:8888 -p 4038:4040 prabeeshk/pyspark-notebook`. Notice that this maps the container's port 4040 to port 4038 on the host (Docker's `-p` takes `host:container`), so the Spark UI opens on [localhost:4038](http://localhost:4038) instead; this avoids a conflict with the Spark Notebook container, which already uses port 4040.