# Introduction to PySpark
Part of this lecture is based on the material by [Dr. Gregory Watson](https://nyu-cds.github.io/python-itertools/).

Some relevant references:
- Benjamin Bengfort, [Getting Started with Spark (in Python)](https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python)
- [A Hands-on Introduction to MapReduce in Python](https://zettadatanet.wordpress.com/2015/04/04/a-hands-on-introduction-to-mapreduce-in-python)
- Lucas Allen, [Spark Dataframes and MLlib](http://www.techpoweredmath.com/spark-dataframes-mllib-tutorial/)
- [Hadoop with Spark](https://blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/)

For this class you will need to have Spark and PySpark installed. Installation instructions are available for:
- [MacOS](https://medium.freecodecamp.org/installing-scala-and-apache-spark-on-mac-os-837ae57d283f)
- [Linux](https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f)
- [Windows](http://changhsinlee.com/install-pyspark-windows-jupyter/)

To use Python 3, follow [these instructions](https://gist.github.com/flopezlasanta/d94a6df86f0dd12ca753) and set:
```shell
export PYSPARK_PYTHON=python3
```

## Hadoop vs Spark
Hadoop and Spark are both projects from [Apache](http://apache.org). 
- The __Hadoop__ software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines. The library itself is designed to detect and handle failures at the application layer, thereby delivering a highly available service on top of a cluster of computers, each of which may be prone to failures.
- __Spark__ differs from Hadoop in several aspects, but mainly in that it processes data in RAM using a concept known as a Resilient Distributed Dataset (RDD). Spark is structured around Spark Core, the engine that drives the scheduling, optimizations, and RDD abstraction.

RDDs are essentially a programming abstraction that represents a read-only collection of objects that are partitioned across machines. RDDs are fault tolerant and are accessed via parallel operations.

<img src="interactive_operations_on_mapreduce.jpg" alt="Drawing" style="width: 300px;"/>
<img src="interactive_operations_on_spark_rdd.jpg" alt="Drawing" style="width: 400px;"/>

Because RDDs can be cached in memory, Spark is extremely effective at iterative applications, where the data is being reused throughout the course of an algorithm. Most machine learning and optimization algorithms are iterative, making Spark an extremely effective tool for data science. 

<img src="logistic-regression-hadoopXspark.png" alt="Drawing" style="width: 200px;"/>

The Spark library contains a lot of the application elements that have found their way into most Big Data applications including support for SQL-like querying, machine learning and graph algorithms, and even support for live streaming data.

<img src="sparkstack.png" alt="Drawing" style="width: 200px;"/>


---
## The concept of MapReduce
MapReduce is a framework for processing large data sets in a distributed fashion. The core idea behind MapReduce is mapping your data set into a collection of (key, value) pairs, and then reducing over all pairs with the same key.

The overall concept is simple, but is actually quite expressive when you consider that:
- Almost all data can be mapped into (key, value) pairs somehow, 
- Keys and values may be of any type: strings, integers, dummy types, and, of course, (key, value) pairs themselves
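
To make the (key, value) idea concrete, here is a minimal sketch in plain Python (no Spark required). The sample records and the names `readings`, `pairs` and `max_by_key` are invented for illustration. The key is itself a tuple, and the reduction keeps the maximum value seen for each key.

```python
# Hypothetical sample data: (city, year, temperature) records.
readings = [
    ("Oslo", 2017, 21.5),
    ("Lima", 2017, 27.0),
    ("Oslo", 2017, 24.1),
    ("Oslo", 2018, 19.8),
    ("Lima", 2017, 25.3),
]

# Map: turn each record into a (key, value) pair; the key is a (city, year) tuple.
pairs = [((city, year), temp) for city, year, temp in readings]

# Reduce: combine all values that share a key, here by keeping the maximum.
max_by_key = {}
for key, value in pairs:
    max_by_key[key] = max(value, max_by_key.get(key, value))

print(max_by_key)
```

Any other combining function (a sum, a count, a minimum) could take the place of `max` without changing the structure.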

Counting the number of occurrences of words in a text is considered the “Hello world!” equivalent of MapReduce. A classical way to write such a program is presented in the Python script below (you will need [pg2701.txt](https://nyu-cds.github.io/python-bigdata/files/pg2701.txt)).

In [6]:
#####
# count words simpler version
#####

import re

# remove any non-words and split lines into separate words
# finally, convert all words to lowercase
# (a line with no words yields a single empty string)
def splitter(line):
    line = re.sub(r'^\W+|\W+$', '', line)
    return map(str.lower, re.split(r'\W+', line))

sums = {}
try:
    # the with statement closes the file even if an error occurs
    with open('pg2701.txt', 'r') as in_file:
        for line in in_file:
            for word in splitter(line):
                sums[word] = sums.get(word, 0) + 1

except IOError:
    print("error performing file operation")
else:
    # the key with the largest count
    M = max(sums, key=sums.get)
    print("max: %s = %d" % (M, sums[M]))

max: the = 14620


The issue with the code above is that performance degrades as the dictionary grows. As shown in the diagram below, the number of words processed per second drops when the dictionary reaches the size of the processor data cache (if the cache has several layers of different speeds, the processing speed decreases each time the dictionary reaches the size of a layer).

A larger decrease in processing speed occurs when the dictionary reaches the size of the computer’s RAM.

Eventually, if the dictionary continues to grow, it will exceed the capacity of the swap space and an exception will be raised.
<img src="performance.png" alt="Drawing" style="width: 400px;"/>

### Using MapReduce approach
The main advantage of the MapReduce approach is that it does not require a central data structure, so the memory issues that occur with the simplistic approach are avoided.

MapReduce consists of 3 steps:

- A mapping step that produces intermediate results and associates them with an output key;
- A shuffling step that groups intermediate results associated with the same output key; and
- A reducing step that processes groups of intermediate results with the same output key.
<img src="mapreducewc.png" alt="Drawing" style="width: 550px;"/>
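
The three steps can be sketched in plain Python as follows. This is an in-memory illustration of the data flow only, on made-up input; the function names `map_step`, `shuffle_step` and `reduce_step` are hypothetical, and the file-based versions in the following subsections are the ones that avoid holding everything in memory.

```python
from collections import defaultdict

def map_step(lines):
    # Emit one (word, 1) pair per word.
    for line in lines:
        for word in line.lower().split():
            yield (word, 1)

def shuffle_step(pairs):
    # Group the intermediate values by their output key.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_step(groups):
    # Collapse each group of values into a single result per key.
    return {key: sum(values) for key, values in groups.items()}

lines = ["the whale the sea", "the ship"]  # made-up input
counts = reduce_step(shuffle_step(map_step(lines)))
print(counts)
```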


#### Mapping
The idea is to apply a function to each element of a list and collect the results, essentially what Python's built-in _map_ function does. However, collecting every mapped word of the example above in an in-memory list would be no better than the original version, since the whole mapped data set would then sit in memory.
<img src="mapping.png" alt="Drawing" style="width: 200px;"/>
Instead, we must perform the map operation using a temporary file (that we will use later), as follows:

In [11]:
import re

# remove any non-words and split lines into separate words
# finally, convert all words to lowercase
def splitter(line):
    line = re.sub(r'^\W+|\W+$', '', line)
    return map(str.lower, re.split(r'\W+', line))

input_file = 'pg2701.txt'
map_file = 'pg2701.txt.map'

# Implement our mapping function
try:
    with open(input_file, 'r') as in_file, open(map_file, 'w') as out_file:
        for line in in_file:
            for word in splitter(line):
                out_file.write(word + "\t1\n") # Separate key and value with 'tab'

except IOError:
    print("error performing file operation")

Notice that the file is not read into memory as a whole. The code reads one line at a time, maps the words in that line, then reads the next line, and so on. The resulting file contains lines that look like this:
```shell
moby    1
dick    1
or  1
the 1
whale   1
:
:
```
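
The same line-at-a-time behaviour can be expressed with a generator. The sketch below is one possible formulation, not the lecture's own code: `map_words` is a hypothetical helper, and `io.StringIO` objects stand in for the input and output files so the example runs without `pg2701.txt`.

```python
import io
import re

def splitter(line):
    line = re.sub(r'^\W+|\W+$', '', line)
    return map(str.lower, re.split(r'\W+', line))

def map_words(lines):
    # Generator: yields one "word<TAB>1" record at a time, so only the
    # current line is held in memory.
    for line in lines:
        for word in splitter(line):
            yield word + "\t1\n"

# io.StringIO stands in for the input and output files in this sketch.
source = io.StringIO("Moby Dick;\nor, The Whale\n")
sink = io.StringIO()
sink.writelines(map_words(source))
print(sink.getvalue())
```

Because `map_words` is lazy, nothing is read from `source` until `writelines` starts pulling records from it.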

#### Shuffling
The shuffling step consists of grouping all the intermediate values that have the same key. In the word count example, we want to sort the intermediate key/value pairs by their keys.
<img src="shuffling.png" alt="Drawing" style="width: 120px;"/>

The code below works on the intermediate _pg2701.txt.map_ file created by the previous code, to avoid memory issues when dealing with larger data sets.

In [15]:
map_file = 'pg2701.txt.map'
sorted_map_file = 'pg2701.txt.map.sorted'

def build_index(filename):
    # record (key, offset, length) for every line, then sort by key
    index = []
    with open(filename) as f:
        while True:
            offset = f.tell()
            line = f.readline()
            if not line:
                break
            length = len(line)
            col = line.split('\t')[0].strip()
            index.append((col, offset, length))
    index.sort()
    return index

try:
    index = build_index(map_file)
    with open(map_file, 'r') as in_file, open(sorted_map_file, 'w') as out_file:
        # copy the lines to the output file in key order
        for col, offset, length in index:
            in_file.seek(offset)
            out_file.write(in_file.read(length))
except IOError:
    print("error performing file operation")

One might argue that the __build_index()__ function loads the whole _pg2701.txt.map_ file into memory, which is similar to the simpler version above. However, the __shuffling__ step can be split into many independent parallel tasks.
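
One way to split the shuffle into independent tasks is to route each key to a partition and sort every partition separately. The sketch below assumes a hash-based partitioning scheme. The helpers `partition_of` and `shuffle` are hypothetical, and the partitions are sorted one after another here, whereas in a real system each could be handled by a different worker.

```python
import zlib

def partition_of(key, num_partitions):
    # crc32 gives the same result on every run, unlike the built-in hash()
    # for strings, which is randomized per process.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def shuffle(pairs, num_partitions):
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partition_of(key, num_partitions)].append((key, value))
    # Each partition can now be sorted on its own, independently of the others.
    return [sorted(part) for part in partitions]

pairs = [("the", 1), ("whale", 1), ("the", 1), ("sea", 1), ("whale", 1)]
for i, part in enumerate(shuffle(pairs, 3)):
    print(i, part)
```

Since all pairs with the same key land in the same partition, each partition can later be reduced without looking at any other.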

#### Reducing
The reduce step just counts the number of values with the same key. Now that the different values are ordered by keys (i.e., the different words are listed in alphabetic order), it becomes easy to count the number of times each key occurs.
<img src="reducing.png" alt="Drawing" style="width: 200px;"/>

In [16]:
previous = None
total = 0            # running count for the current key
M = [None, 0]        # [most frequent key, its count]

def checkmax(key, count):
    # M is modified in place, so no global declaration is needed
    if M[1] < count:
        M[1] = count
        M[0] = key

try:
    with open(sorted_map_file, 'r') as in_file:
        for line in in_file:
            key, value = line.split('\t')

            if key != previous:
                if previous is not None:
                    checkmax(previous, total)
                previous = key
                total = 0

            total += int(value)

    # do not forget the last key in the file
    if previous is not None:
        checkmax(previous, total)
except IOError:
    print("error performing file operation")

print("max: %s = %d" % (M[0], M[1]))

max: the = 14620
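
The reduce step over sorted data can also be written with `itertools.groupby` from the standard library. This is an alternative sketch rather than the lecture's implementation: `reduce_sorted` is a hypothetical helper, and an `io.StringIO` object with made-up records stands in for the sorted map file.

```python
import io
from itertools import groupby

def reduce_sorted(lines):
    # Parse "key<TAB>value" records lazily.
    records = (line.rstrip("\n").split("\t") for line in lines)
    # groupby only merges *adjacent* equal keys, which is why the input
    # must already be sorted by key.
    for key, group in groupby(records, key=lambda rec: rec[0]):
        yield key, sum(int(value) for _, value in group)

# io.StringIO stands in for the sorted map file in this sketch.
sorted_records = io.StringIO("sea\t1\nthe\t1\nthe\t1\nwhale\t1\n")
totals = list(reduce_sorted(sorted_records))
print(totals)
print(max(totals, key=lambda kv: kv[1]))
```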


---
## Spark Overview
A Spark program typically follows a simple paradigm:
- __A driver is the main program__.
- One or more __workers run code sent by the driver__ on their partitions of the RDD, which is distributed across the cluster.
- __Results are sent back to the driver__ for aggregation or compilation.
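
The driver/worker pattern can be imitated on a single machine. The sketch below is only an analogy and does not use Spark: threads from `concurrent.futures` play the role of workers, and `count_words` and `run_driver` are hypothetical names.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def count_words(partition):
    # Code run by a "worker" on its own partition of the data.
    return Counter(word for line in partition for word in line.split())

def run_driver(lines, num_workers=2):
    # The "driver" splits the data into one partition per worker...
    partitions = [lines[i::num_workers] for i in range(num_workers)]
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_counts = list(pool.map(count_words, partitions))
    # ...and aggregates the partial results sent back by the workers.
    total = Counter()
    for partial in partial_counts:
        total += partial
    return total

lines = ["the whale", "the sea", "the ship and the whale"]  # made-up input
print(run_driver(lines).most_common(1))
```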

Remember that an RDD is a Resilient Distributed Dataset, which is essentially a distributed collection of items.

When Spark runs a closure (function) on a worker, any variables used in the closure are copied to that node, but are maintained within the local scope of that closure.

Spark provides __two types of shared variables__ that can be interacted with by all workers in a restricted fashion:

- __Broadcast variables__ are distributed to all workers, but are read-only. These variables can be used as lookup tables or stopword lists.
- __Accumulators__ are variables that workers can “add” to using associative and commutative operations and are typically used as counters.
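
The copy semantics of closures, and the reason shared variables are needed, can be simulated in plain Python. The sketch below is not Spark code: `run_on_worker` and `count_non_stopwords` are hypothetical, and `copy.deepcopy` stands in for shipping variables to another node. In PySpark itself, the corresponding entry points are `SparkContext.broadcast` and `SparkContext.accumulator`.

```python
import copy

def run_on_worker(func, captured, partition):
    # Simulate shipping a closure: the worker gets its own copy of the
    # captured variables, so changes never reach the driver's originals.
    local_vars = copy.deepcopy(captured)
    return func(local_vars, partition)

def count_non_stopwords(local_vars, partition):
    kept = [w for w in partition if w not in local_vars["stopwords"]]
    local_vars["seen"] += len(partition)  # only changes the worker's copy
    return len(kept)                      # partial result sent to the driver

driver_vars = {"stopwords": {"the", "of"}, "seen": 0}
partitions = [["the", "whale"], ["of", "the", "sea"]]

# Accumulator-style aggregation: the driver adds up what each worker returns.
kept_total = sum(run_on_worker(count_non_stopwords, driver_vars, p)
                 for p in partitions)

print(kept_total)           # number of words that are not stopwords
print(driver_vars["seen"])  # the driver's counter was never updated
```

The stopword set behaves like a broadcast variable (read-only on the workers), while the summed return values behave like an accumulator.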

### Spark Execution
Spark applications are run as independent sets of processes, coordinated by a SparkContext in the driver program. The context will connect to some cluster manager which allocates system resources.

Each worker in the cluster is managed by an executor, which is in turn managed by the SparkContext. The executor manages computation as well as storage and caching on each machine.

<img src="cluster.png" alt="Drawing" style="width: 400px;"/>

What is important to note is that:
- Application code is sent from the driver to the executors, which specify the context and the various tasks to be run.
- The executors communicate back and forth with the driver for data sharing or for interaction.
- Drivers are key participants in Spark jobs, and therefore, they should be on the same network as the cluster.

### MapReduce with Spark
To start using Spark, we have to create an RDD. The SparkContext provides a number of methods to do this. We will use the textFile method, which reads a file and creates an RDD of strings, one for each line in the file.

In [1]:
from pyspark import SparkContext

sc = SparkContext("local", "wordcount")

text = sc.textFile('pg2701.txt')

print(text.getNumPartitions())

# Display the first 10 entries in the RDD:
print(text.take(10))

2


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 175, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

The job fails here because the worker processes run Python 2.7 while the driver runs Python 3.6. Setting `PYSPARK_PYTHON=python3` before starting the driver, as described at the top of this lecture, should make both use the same interpreter.


We use the same splitter function we used previously to split lines correctly. The flatMap method applies the function to all elements of the RDD and flattens the results into a single list of words.

In [1]:
from pyspark import SparkContext
import re

# remove any non-words and split lines into separate words
# finally, convert all words to lowercase
def splitter(line):
    line = re.sub(r'^\W+|\W+$', '', line)
    return map(str.lower, re.split(r'\W+', line))

sc = SparkContext("local", "wordcount")

# creating an RDD from pg2701.txt
text = sc.textFile('pg2701.txt')

# creating an RDD of words from the RDD of lines
words = text.flatMap(splitter)
print(words.take(10))
print(10*'---')

# mapping each word to a (word, 1) tuple
words_mapped = words.map(lambda x: (x,1))
print(words_mapped.take(10))
print(10*'---')

# reduceByKey() expects that the input RDD contains tuples of the form (<key>,<value>). 
# It creates a new RDD containing a tuple for each unique value of <key> in the input, 
# where the value in the second position of the tuple is created by applying the 
# supplied lambda function to the <value>s with the matching <key> in the input RDD
counts = words_mapped.reduceByKey(lambda x,y:x+y)
print(counts.take(10))
print(10*'---')

# swapping the first and second elements of each tuple
counts_swap = counts.map(lambda x:(x[1],x[0]))
print(counts_swap.take(10))
print(10*'---')

# sorting by the count, which is now the key
print(counts_swap.sortByKey(ascending=False).take(10))

['the', 'project', 'gutenberg', 'ebook', 'of', 'moby', 'dick', 'or', 'the', 'whale']
------------------------------
[('the', 1), ('project', 1), ('gutenberg', 1), ('ebook', 1), ('of', 1), ('moby', 1), ('dick', 1), ('or', 1), ('the', 1), ('whale', 1)]
------------------------------
[('the', 14620), ('project', 91), ('gutenberg', 94), ('ebook', 10), ('of', 6732), ('moby', 89), ('dick', 89), ('or', 797), ('whale', 1233), ('by', 1226)]
------------------------------
[(14620, 'the'), (91, 'project'), (94, 'gutenberg'), (10, 'ebook'), (6732, 'of'), (89, 'moby'), (89, 'dick'), (797, 'or'), (1233, 'whale'), (1226, 'by')]
------------------------------
[(14620, 'the'), (6732, 'of'), (6502, 'and'), (4799, 'a'), (4706, 'to'), (4231, 'in'), (3235, ''), (3100, 'that'), (2536, 'it'), (2530, 'his')]
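
For readers without a working Spark installation, the difference between `map` and `flatMap`, and the effect of `reduceByKey`, can be mimicked in plain Python. The sketch below only imitates the behaviour on a tiny made-up list. `reduce_by_key` is a hypothetical helper, and nothing here is distributed.

```python
from itertools import chain

lines = ["the whale", "the sea"]  # made-up input

# map keeps one result per input element: a list of lists here.
mapped = list(map(str.split, lines))

# flatMap additionally flattens those results into one sequence.
flat_mapped = list(chain.from_iterable(map(str.split, lines)))

def reduce_by_key(pairs, func):
    # Combine the values of each key pairwise with func.
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

counts = reduce_by_key(((w, 1) for w in flat_mapped), lambda x, y: x + y)

# Swap to (count, word) and sort in descending order, mirroring the last two steps.
ranked = sorted(((n, w) for w, n in counts.items()), reverse=True)

print(mapped)
print(flat_mapped)
print(ranked)
```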


In [2]:
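# `!export` only affects a temporary subshell; %env sets the variable in the kernel
# (run this before creating the SparkContext)
%env PYSPARK_PYTHON=python3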