# Introduction to Spark RDD

## Setup 
`pip install pyspark`

In this notebook, you will be introduced to the Apache Spark library for big data processing. There is now a Python package, `pyspark`, which will load Spark for you. The variable `sc` is a Spark context that lets you interact with the Spark runtime. Check that it is correctly initialised:

In [1]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
print(sc)
print("Ready to go!")

<SparkContext master=local[*] appName=pyspark-shell>
Ready to go!


### Learning activity: Create RDD with `parallelize`
Transform the list `words` into an RDD. The count should return `3`.

In [2]:
words = ["We", "love", "Coding"]
words_rdd = sc.parallelize(words)
words_rdd.count()

3
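`parallelize()` splits the list into partitions, and an action such as `count()` is evaluated on each partition before the partial results are combined. The plain-Python sketch below models that idea. `split_into_partitions` and `partitioned_count` are hypothetical helpers, and the contiguous even split is a simplification, not Spark's exact slicing algorithm. In PySpark you can inspect the real split with `sc.parallelize(words, 2).glom().collect()`.

```python
def split_into_partitions(items, num_slices):
    """Hypothetical helper: cut items into num_slices contiguous, roughly equal chunks."""
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


def partitioned_count(partitions):
    """Model of count(): count each partition independently, then sum the partial counts."""
    partial_counts = [len(p) for p in partitions]
    return sum(partial_counts)


words = ["We", "love", "Coding"]
partitions = split_into_partitions(words, 2)
print(partitions)                     # [['We'], ['love', 'Coding']]
print(partitioned_count(partitions))  # 3
```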

### Learning activity: Create RDDs

To analyse large datasets using Spark you will load them into Resilient Distributed Datasets (RDDs). There are a number of ways in which you can create RDDs. Use the `parallelize()` function to create one from a Python collection, and use the `textFile()` function to create an RDD from the file `data/war-and-peace.txt`. 

In [3]:
data = sc.textFile("D:/DataScienceTraining/ADS06/02-spark_rdd/data/war-and-peace.txt")


In [4]:
data

D:/DataScienceTraining/ADS06/02-spark_rdd/data/war-and-peace.txt MapPartitionsRDD[3] at textFile at <unknown>:0
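Displaying `data` prints a description of an RDD (`MapPartitionsRDD`) rather than any text: `textFile()` and transformations such as `map()` and `filter()` are lazy and only record what should be done. Nothing is read until an action such as `count()` or `take()` runs. The toy class below, `LazyDataset`, is a hypothetical plain-Python model of that behaviour, not Spark's implementation.

```python
class LazyDataset:
    """Toy model of an RDD: transformations are recorded, actions execute them."""

    def __init__(self, source, steps=()):
        self.source = source        # zero-argument function producing the raw elements
        self.steps = tuple(steps)   # recorded (kind, function) pairs, applied in order

    def map(self, f):
        # Return a new dataset with one more recorded step; nothing runs yet
        return LazyDataset(self.source, self.steps + (("map", f),))

    def filter(self, pred):
        return LazyDataset(self.source, self.steps + (("filter", pred),))

    def collect(self):
        # Action: only now is the source read and every recorded step applied
        result = []
        for item in self.source():
            keep = True
            for kind, f in self.steps:
                if kind == "map":
                    item = f(item)
                elif not f(item):
                    keep = False
                    break
            if keep:
                result.append(item)
        return result

    def count(self):
        return len(self.collect())


reads = []

def read_lines():
    reads.append("read")  # record each time the "file" is actually read
    return ["WAR AND PEACE", "", "by Leo Tolstoy"]

lines = LazyDataset(read_lines).filter(lambda s: s != "").map(str.lower)
print(reads)          # [] because building the pipeline read nothing
print(lines.count())  # 2
print(reads)          # ['read']
```

Like an uncached RDD, every action in this model re-reads the source; in Spark, `cache()` keeps the computed partitions in memory so later actions can reuse them.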

### Learning activity: Basic RDD manipulation

Print the number of lines in War and Peace using the method `count()`

In [5]:
data.count()


54223

Print the first 15 lines using the method `take()`.

In [6]:
for line in data.take(15):
    print(line)


                                      1869
                                 WAR AND PEACE
                                 by Leo Tolstoy
BK1
                                 BOOK ONE: 1805
BK1|CH1
  CHAPTER I

  "Well, Prince, so Genoa and Lucca are now just family estates of the
Buonapartes. But I warn you, if you don't tell me that this means war,
if you still try to defend the infamies and horrors perpetrated by
that Antichrist- I really believe he is Antichrist- I will have
nothing more to do with you and you are no longer my friend, no longer
my 'faithful slave,' as you call yourself! But how do you do? I see
I have frightened you- sit down and tell me all the news."


### Learning activity: `filter()` and `map()` and `distinct()`

Let's apply some transformations to RDDs. The following helper function is useful for extracting the words from a line.

In [10]:
# A helper function to compute the list of words in a line of text
import re
def get_words(line):
    # \w+ matches runs of letters, digits and underscores; the raw string keeps the backslash intact
    return re.findall(r'\w+', line)

print(get_words("This, is a test!"))

['This', 'is', 'a', 'test']


Use `filter()` to count the number of lines which mention `war` and the number of lines which mention `peace`.

In [None]:
# How often is war mentioned?
warcount = data.filter(lambda line: "war" in get_words(line)).count()
warcount


In [11]:
# How often is peace mentioned?
peacecount = data.filter(lambda line: "peace" in get_words(line)).count()
peacecount


104
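Both filters test for the exact lowercase tokens `war` and `peace`, so lines that contain only `War` or `WAR` (such as the title line) are not counted. If a case-insensitive count is wanted, a predicate along these lines could be passed to `filter()` instead; `mentions` is a hypothetical helper, and the counts it produces will be at least as high as the ones above.

```python
import re

def get_words(line):
    return re.findall(r"\w+", line)

def mentions(line, word):
    """Hypothetical helper: True if `word` appears as a whole word in `line`, ignoring case."""
    target = word.lower()
    return any(w.lower() == target for w in get_words(line))

# In the notebook this would be used as, e.g.:
#   data.filter(lambda line: mentions(line, "war")).count()
print(mentions("WAR AND PEACE", "war"))      # True
print(mentions("I warn you", "war"))         # False: 'warn' is a different word
print("war" in get_words("WAR AND PEACE"))   # False: the original test is case-sensitive
```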

Use `map()` to convert each line in the RDD to uppercase, and print the first 15 converted lines.

In [12]:
# Convert each line in the RDD to uppercase
for line in data.map(lambda line: line.upper()).take(15):
    print(line)


                                      1869
                                 WAR AND PEACE
                                 BY LEO TOLSTOY
BK1
                                 BOOK ONE: 1805
BK1|CH1
  CHAPTER I

  "WELL, PRINCE, SO GENOA AND LUCCA ARE NOW JUST FAMILY ESTATES OF THE
BUONAPARTES. BUT I WARN YOU, IF YOU DON'T TELL ME THAT THIS MEANS WAR,
IF YOU STILL TRY TO DEFEND THE INFAMIES AND HORRORS PERPETRATED BY
THAT ANTICHRIST- I REALLY BELIEVE HE IS ANTICHRIST- I WILL HAVE
NOTHING MORE TO DO WITH YOU AND YOU ARE NO LONGER MY FRIEND, NO LONGER
MY 'FAITHFUL SLAVE,' AS YOU CALL YOURSELF! BUT HOW DO YOU DO? I SEE
I HAVE FRIGHTENED YOU- SIT DOWN AND TELL ME ALL THE NEWS."


Use `flatMap()` to create an RDD of the words in War and Peace and count the number of words.

In [13]:
# Split each line into words using get_words()
words = data.flatMap(lambda line: get_words(line))
words.count()


573322
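`flatMap()` differs from `map()` in that the function may return any iterable, and its elements are spliced into the output instead of being kept as one nested element. The same distinction in plain Python, where `map_model` and `flat_map_model` are hypothetical stand-ins for the RDD methods:

```python
import re
from itertools import chain

def get_words(line):
    return re.findall(r"\w+", line)

lines = ["WAR AND PEACE", "", "by Leo Tolstoy"]

def map_model(f, items):
    # One output element per input element: here, a list of word lists
    return [f(x) for x in items]

def flat_map_model(f, items):
    # Each returned iterable is flattened into the output
    return list(chain.from_iterable(f(x) for x in items))

print(map_model(get_words, lines))
# [['WAR', 'AND', 'PEACE'], [], ['by', 'Leo', 'Tolstoy']]
print(flat_map_model(get_words, lines))
# ['WAR', 'AND', 'PEACE', 'by', 'Leo', 'Tolstoy']
```

Because any iterable is flattened, returning a pair such as `(x, 1)` from `flatMap()` produces two separate elements, `x` and `1`; use `map()` when each input should yield exactly one pair.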

In [15]:
words.distinct()

PythonRDD[18] at RDD at PythonRDD.scala:48

Finally, use `distinct()` to count the number of different words in the RDD.

In [16]:
# Count the number of distinct words
words.distinct().count()


19206

### Learning activity: Set-like transformations

Use the function `union()` to create an RDD of lines that mention either war or peace, and count them.

In [17]:
warLines = data.filter(lambda line: "war" in get_words(line))
peaceLines = data.filter(lambda line: "peace" in get_words(line))
warOrPeaceLines = warLines.union(peaceLines)
warOrPeaceLines.count()


369

Use the function `intersection()` to create an RDD of lines that mention both war and peace, and count them.

In [18]:
warAndPeaceLines = warLines.intersection(peaceLines)
warAndPeaceLines.count()


7
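`union()` simply concatenates the two RDDs, so a line that mentions both words appears twice and the union count (369) is the sum of the two filter counts. `intersection()`, by contrast, never returns duplicates. To count each line of the union once, apply `distinct()` to it; note that `distinct()` and `intersection()` also collapse identical lines that happen to occur more than once in the book. The made-up lists below model these semantics with plain Python lists and sets:

```python
war_lines = ["war is coming", "war and peace", "after the war"]
peace_lines = ["war and peace", "peace at last"]

# Model of union(): plain concatenation, duplicates are kept
union = war_lines + peace_lines
# Model of distinct(): duplicates removed (Spark does not guarantee any order)
distinct_union = set(union)
# Model of intersection(): elements present in both inputs, deduplicated
intersection = set(war_lines) & set(peace_lines)

print(len(union))           # 5
print(len(distinct_union))  # 4
print(len(intersection))    # 1
```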

Find all the lines that mention both war and peace without using `intersection()`

In [20]:
data.filter(lambda line: "war" in get_words(line) and "peace" in get_words(line)).take(5)


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 37, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "d:\ProgramData\Miniconda3\envs\ads05\lib\site-packages\pyspark\rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "d:\ProgramData\Miniconda3\envs\ads05\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "d:\ProgramData\Miniconda3\envs\ads05\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "d:\ProgramData\Miniconda3\envs\ads05\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
RuntimeError: generator raised StopIteration

The filter itself is correct; the error is raised inside `take()`. In older PySpark releases, the generator `takeUpToNumLeft` lets `StopIteration` escape when a partition has fewer matching lines than requested, and Python 3.7 turns that into `RuntimeError: generator raised StopIteration` (PEP 479). Upgrading PySpark (2.3.2 or later), running on Python 3.6, or calling `collect()` instead of `take(5)` (only a few lines match) avoids the error.
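The filter in the cell above calls `get_words()` twice per line. A predicate that tokenises each line once and checks both words with a set could look like the sketch below; `mentions_all` is a hypothetical helper, and in the notebook it would be used as `data.filter(lambda line: mentions_all(line, {"war", "peace"}))`.

```python
import re

def get_words(line):
    return re.findall(r"\w+", line)

def mentions_all(line, required):
    """Hypothetical helper: True if every word in `required` occurs in `line` (case-sensitive)."""
    return set(required) <= set(get_words(line))

sample = ["between war and peace", "a time of war", "peace, not war!", "warfare and peacetime"]
print([line for line in sample if mentions_all(line, {"war", "peace"})])
# ['between war and peace', 'peace, not war!']
```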


### Learning activity: `reduce()`

You have already seen two actions: `take(n)`, which returns the first `n` elements of the RDD, and `count()`, which returns the number of elements in the RDD. A third, `collect()`, returns all elements of the RDD to the driver, so it should only be used when the result is small.

The action `reduce()` takes a function that combines two elements into one and applies it repeatedly until a single value remains. Use it to find the longest word in War and Peace.

In [21]:
data.flatMap(lambda line: get_words(line)).reduce(lambda w1, w2: w2 if len(w2) > len(w1) else w1)


'characteristically'
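`reduce()` first combines the elements within each partition and then combines the partial results, so the function should be associative and commutative; otherwise the answer can depend on how the data is partitioned. "Keep the longer word" satisfies this except for ties, where which of two equally long words is returned depends on evaluation order. A plain-Python model of the two-level evaluation, using hypothetical partitions:

```python
from functools import reduce

def longer(w1, w2):
    # Keep the first word unless the second is strictly longer
    return w2 if len(w2) > len(w1) else w1

partitions = [["Well", "Prince", "Genoa"], ["characteristically", "war"], ["peace"]]

# Step 1: reduce inside each (non-empty) partition, done in parallel on the executors
partials = [reduce(longer, p) for p in partitions if p]
# Step 2: combine the per-partition results, done on the driver
result = reduce(longer, partials)

print(partials)  # ['Prince', 'characteristically', 'peace']
print(result)    # characteristically
```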

### Bonus activity: Finding proper nouns

The Python method `str.istitle()` returns `True` if a string is titlecased; for a single word, this means its first letter is uppercase and the remaining letters are lowercase. Use it to:
* Find the set of distinct words in War and Peace which are titlecased
* Find the set of distinct words in War and Peace which are not titlecased

The Python method `str.lower()` returns a copy of the string with all characters in lowercase. Use it, along with the RDDs you have just created, to find the set of words in War and Peace that only ever appear titlecased.

In [22]:
words = data.flatMap(lambda line: get_words(line)).distinct()
titled_words = words.filter(lambda word: word.istitle())
print(titled_words.count())

untitled_words = words.filter(lambda word: not word.istitle())
print(untitled_words.count())

lower_titles = titled_words.map(lambda word: word.lower())
nouns = lower_titles.subtract(untitled_words)
nouns.take(50)


3068
16138


['leo',
 'lucca',
 'july',
 'buonaparte',
 'funke',
 'mary',
 'bolkonskaya',
 'kutuzov',
 'enghien',
 'andrew',
 'boris',
 'michael',
 'conde',
 'jacobin',
 'jaffa',
 'englishman',
 'rostovs',
 'rostova',
 'vasilevich',
 'mimi',
 'ilynichna',
 'orlov',
 'straits',
 'sparrow',
 'dmitrievna',
 'madeira',
 'suvorovs',
 'russians',
 'egyptian',
 'mikhelson',
 'pomerania',
 'stralsund',
 'achilles',
 'frederick',
 'ferdinand',
 'poland',
 'prussians',
 'theresa',
 'denisov',
 'bondarenko',
 'rook',
 'bogdanich',
 'bilibin',
 'franz',
 'auersperg',
 'formio',
 'amelie',
 'bohemian',
 'buxhowden',
 'hark']
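The same logic can be checked on a small, made-up word set with Python sets: `distinct()` corresponds to building a set, `filter()` to a set comprehension and `subtract()` to set difference. As the output above shows (`sparrow`, `hark`, `straits`), the result only approximates proper nouns: any ordinary word that happens to appear exclusively at the start of sentences is included as well.

```python
words = {"Prince", "prince", "Moscow", "The", "the", "Hark", "WAR"}

titled = {w for w in words if w.istitle()}          # Prince, Moscow, The, Hark
untitled = {w for w in words if not w.istitle()}    # prince, the, WAR
only_titled = {w.lower() for w in titled} - untitled

print(sorted(only_titled))  # ['hark', 'moscow']
```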

# Key/Value pairs in Spark

### Learning activity: WordCount in Spark

Use the functions `flatMap()`, `map()` and `reduceByKey()` to count the number of occurrences of each word in War and Peace, and print ten of the word counts.

In [27]:
words = data.flatMap(lambda line: get_words(line))
word_pairs = words.map(lambda word: (word, 1))
word_counts = word_pairs.reduceByKey(lambda c1, c2: c1 + c2)
word_counts.take(10)


[('PEACE', 1),
 ('Leo', 1),
 ('BOOK', 15),
 ('ONE', 2),
 ('1805', 24),
 ('Prince', 1578),
 ('Genoa', 3),
 ('Lucca', 2),
 ('are', 1222),
 ('now', 1158)]
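`reduceByKey()` merges the values for each key inside every partition before any data is shuffled, then merges those partial results per key. A plain-Python model of the two stages follows; `reduce_by_key` is a hypothetical helper, not part of PySpark.

```python
def reduce_by_key(partitions, f):
    """Hypothetical model of reduceByKey: combine per partition, then across partitions."""
    # Stage 1 (map side): combine the values for each key within each partition
    partials = []
    for part in partitions:
        combined = {}
        for key, value in part:
            combined[key] = f(combined[key], value) if key in combined else value
        partials.append(combined)
    # Stage 2 (after the shuffle): combine the partial results for each key
    result = {}
    for combined in partials:
        for key, value in combined.items():
            result[key] = f(result[key], value) if key in result else value
    return result

partitions = [
    [("war", 1), ("and", 1), ("war", 1)],
    [("peace", 1), ("war", 1)],
]
counts = reduce_by_key(partitions, lambda c1, c2: c1 + c2)
print(counts)  # {'war': 3, 'and': 1, 'peace': 1}
```

Only one partial value per key per partition has to cross the network, which is why `reduceByKey()` is usually preferred over `groupByKey()` for aggregations like this.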

In [26]:
word_pairs.take(10)

[('1869', 1),
 ('WAR', 1),
 ('AND', 1),
 ('PEACE', 1),
 ('by', 1),
 ('Leo', 1),
 ('Tolstoy', 1),
 ('BK1', 1),
 ('BOOK', 1),
 ('ONE', 1)]

### Learning activity: using `groupByKey()`

Reimplement the above word count using `groupByKey()` instead of `reduceByKey()`

In [28]:
word_pairs = words.map(lambda word: (word, 1))
word_pairs.groupByKey().map(lambda pair: (pair[0], len(pair[1]))).take(5)


[('PEACE', 1), ('Leo', 1), ('BOOK', 15), ('ONE', 2), ('1805', 24)]
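`groupByKey()` instead sends every `(word, 1)` pair to the partition that owns its key and builds the full collection of values there; `len()` then gives the count. Modelled in plain Python (the `group_by_key` helper is hypothetical), both approaches give the same counts; the difference lies in how much data is shuffled.

```python
def group_by_key(pairs):
    """Hypothetical model of groupByKey: collect every value for each key into a list."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    return groups

pairs = [("war", 1), ("and", 1), ("war", 1), ("peace", 1), ("war", 1)]
grouped = group_by_key(pairs)
print(grouped)  # {'war': [1, 1, 1], 'and': [1], 'peace': [1]}

# Counting via group sizes, as in the groupByKey() cell above
counts = {key: len(values) for key, values in grouped.items()}
print(counts)   # {'war': 3, 'and': 1, 'peace': 1}

# In PySpark the same count can be written with mapValues, e.g.
#   words.map(lambda w: (w, 1)).groupByKey().mapValues(len)
```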

### Learning activity: computing the average of each key

The pair RDD `word_line_pairs` defined below has one element for each non-empty line in War and Peace, with the line's first word as key and the line itself as value. Use it to compute, for each starting word, the average length of the lines that begin with it.

In [None]:
word_line_pairs = data.filter(lambda line: len(get_words(line)) > 0).map(lambda line: (get_words(line)[0], line))       

In [84]:
# Inspect the first non-empty line under a new name, so word_line_pairs still refers to the pair RDD
first_line = data.filter(lambda line: len(get_words(line)) > 0).first()

In [125]:
worda = [[('a',1),('b',1)],[('a',10),('b',1)],3,4,5]
wordrdd = sc.parallelize(worda,1)



In [126]:
wordrdd.getNumPartitions()

1

In [127]:
wordrdd.count()

5

In [128]:
wordrdd.first()

[('a', 1), ('b', 1)]

In [129]:
wordrdd.take(1)

[[('a', 1), ('b', 1)]]

In [130]:
wordrdd.take(2)

[[('a', 1), ('b', 1)], [('a', 10), ('b', 1)]]

In [51]:
data.collect()[0:3][0][0]

' '

In [None]:
# Write your answer here
word_line_length = word_line_pairs.map(lambda wl: (wl[0], (len(wl[1]), 1)))  \
                                  .reduceByKey(lambda leftpair, rightpair: (leftpair[0] + rightpair[0], leftpair[1] + rightpair[1]))  \
                                  .map(lambda wordpair: (wordpair[0], wordpair[1][0] / wordpair[1][1]))
word_line_length.take(5) 
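The answer above carries a `(total_length, line_count)` pair per key rather than an average, because averages cannot be merged directly: the mean of two partial means is wrong whenever the parts have different sizes. A plain-Python check with made-up line lengths split across two hypothetical partitions:

```python
def merge(left, right):
    # Combine two (sum, count) pairs; associative and commutative, so safe for reduceByKey
    return (left[0] + right[0], left[1] + right[1])

# Made-up (first_word, line_length) pairs in two hypothetical partitions
part_a = [("Prince", 10), ("Prince", 20)]
part_b = [("Prince", 60)]

def summarize(part):
    # Fold each line length into a running (sum, count) pair
    total = (0, 0)
    for _, length in part:
        total = merge(total, (length, 1))
    return total

sum_a, sum_b = summarize(part_a), summarize(part_b)   # (30, 2) and (60, 1)
total, count = merge(sum_a, sum_b)                    # (90, 3)
print(total / count)                                  # 30.0, the true mean

# Averaging the partial averages instead gives the wrong answer
print((sum_a[0] / sum_a[1] + sum_b[0] / sum_b[1]) / 2)  # 37.5
```

In the PySpark answer, the final step could equivalently be written as `.mapValues(lambda p: p[0] / p[1])`.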


In [172]:
a = [('a',1),('b',2),('c',3),('d',4)]
a = [['abcdefghijk','akdkd222kdkdkd','sdkfsdjfsdjfs'],['abc121fghijk','akdkdkdkdkd','sdkfsdjfsdjfs']]
ardd = sc.parallelize(a,1)

In [173]:
ardd.map(lambda w: w[1]).collect()

['akdkd222kdkdkd', 'akdkdkdkdkd']

In [163]:
abc.collect()

['abcd', 'abc1']

In [195]:
# a1 = [('a',1),('b',2),('c',3),('a',4)]
# a1 = [['a',1],('b',2), {'c':3}),('a',4)]

a1 = [{'a':1},{'b':2}, {'c':3},{'a':4}]
a1rdd = sc.parallelize(a1,1)

In [243]:
list(a1[0].items())

[('a', 1)]

In [196]:
a1rdd.collect()

[{'a': 1}, {'b': 2}, {'c': 3}, {'a': 4}]

In [250]:
a2rdd = a1rdd.map( lambda x: x.items())

In [257]:
a2rdd = a1rdd.flatMap( lambda x: list(x.items()))

In [259]:
a2rdd.collect()

[('a', 1), ('b', 2), ('c', 3), ('a', 4)]

In [258]:
a2rdd.reduceByKey(lambda x,y: x+y).collect()

[('a', 5), ('b', 2), ('c', 3)]

In [265]:
a2rdd.groupByKey().map(lambda pair : ( pair[0], list(pair[1]))).collectAsMap()['a']

[1, 4]

In [276]:
data.map(lambda x : x.split(' ')).filter(lambda y : len(y[0]) > 0).flatMap(lambda x: (x,1)).collect()

[['BK1'],
 1,
 ['BK1|CH1'],
 1,
 ['Buonapartes.',
  'But',
  'I',
  'warn',
  'you,',
  'if',
  'you',
  "don't",
  'tell',
  'me',
  'that',
  'this',
  'means',
  'war,'],
 1,
 ['if',
  'you',
  'still',
  'try',
  'to',
  'defend',
  'the',
  'infamies',
  'and',
  'horrors',
  'perpetrated',
  'by'],
 1,
 ['that',
  'Antichrist-',
  'I',
  'really',
  'believe',
  'he',
  'is',
  'Antichrist-',
  'I',
  'will',
  'have'],
 1,
 ['nothing',
  'more',
  'to',
  'do',
  'with',
  'you',
  'and',
  'you',
  'are',
  'no',
  'longer',
  'my',
  'friend,',
  'no',
  'longer'],
 1,
 ['my',
  "'faithful",
  "slave,'",
  'as',
  'you',
  'call',
  'yourself!',
  'But',
  'how',
  'do',
  'you',
  'do?',
  'I',
  'see'],
 1,
 ['I',
  'have',
  'frightened',
  'you-',
  'sit',
  'down',
  'and',
  'tell',
  'me',
  'all',
  'the',
  'news."'],
 1,
 ['Pavlovna',
  'Scherer,',
  'maid',
  'of',
  'honor',
  'and',
  'favorite',
  'of',
  'the',
  'Empress',
  'Marya'],
 1,
 ['Fedorovna.',
  'Wit