# Getting started with PySpark

## Apache Spark

Spark is software for processing large datasets. Work is split into units called tasks, and tasks can be distributed across multiple machines for parallel processing. Spark supports several languages for defining and submitting work, including Scala, Java, Python, R and SQL; this notebook uses Python through PySpark.

## RDD

Spark works with **Resilient Distributed Datasets** or **RDD** for short. 
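* Resilient because they are fault tolerant: Spark records the lineage (the chain of transformations) that produced each RDD, so lost partitions can be recomputed instead of failing the job. RDDs are also immutable once created and can be cached in memory for quick reuse instead of being re-read from mass storage, which makes Spark faster than MapReduce for many workloads
* Distributed because their partitions can be spread over multiple computers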
* Datasets because they hold data
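
The toy model below makes the fault-tolerance idea concrete. It is plain Python and is not how Spark is implemented internally; `ToyRDD` and its methods are hypothetical names chosen for illustration. Each derived dataset remembers its parent and the function applied to it. A partition that goes missing is therefore rebuilt on demand from that lineage, not restored from a copy.

```python
from typing import Callable, List, Optional


class ToyRDD:
    """Hypothetical toy model of an RDD: a list of partitions plus the lineage that built them.

    Source partitions are assumed to be re-readable from storage, so only
    derived partitions are ever "lost" in this sketch.
    """

    def __init__(self, partitions: Optional[List[list]] = None,
                 parent: Optional["ToyRDD"] = None,
                 fn: Optional[Callable[[list], list]] = None):
        self.parent = parent   # lineage: the dataset this one was derived from
        self.fn = fn           # lineage: how one parent partition becomes one of ours
        if partitions is not None:
            self.num_partitions = len(partitions)
            self._store = dict(enumerate(partitions))
        else:
            self.num_partitions = parent.num_partitions
            self._store = {}   # derived partitions start out uncomputed

    def map(self, f: Callable) -> "ToyRDD":
        # Record the transformation; nothing is computed here.
        return ToyRDD(parent=self, fn=lambda part: [f(x) for x in part])

    def partition(self, i: int) -> list:
        if i not in self._store:
            # Never computed, or lost: rebuild it from the parent using the lineage.
            self._store[i] = self.fn(self.parent.partition(i))
        return self._store[i]

    def lose(self, i: int) -> None:
        # Simulate a failure that wipes out one computed partition.
        self._store.pop(i, None)

    def collect(self) -> list:
        return [x for i in range(self.num_partitions) for x in self.partition(i)]


source = ToyRDD(partitions=[[0, 1, 2], [3, 4, 5]])
squares = source.map(lambda x: x * x)
print(squares.collect())  # [0, 1, 4, 9, 16, 25]
squares.lose(1)           # partition 1 is gone...
print(squares.collect())  # ...and is recomputed from the lineage: [0, 1, 4, 9, 16, 25]
```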

RDDs can be created using the **Spark Context** object `sc`.
* using an already defined collection, such as a list or `range`, with the function `parallelize`; the 2nd argument is the number of partitions
    * `rdd = sc.parallelize(range(1000), 8)`
* using a file, with the function `textFile`; the 2nd argument is the minimum number of partitions
    * `sc.textFile("file", 3)`
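
The partition count only controls how many pieces the data is cut into. A useful mental model is contiguous, nearly equal chunks: with n elements and k partitions, slice i runs from `i * n // k` to `(i + 1) * n // k`. Treat this as an assumption, because Spark's exact boundaries can differ. On a real RDD, `rdd.getNumPartitions()` returns the count and `rdd.glom().collect()` returns one list per partition, which shows the actual layout. The helper `slice_evenly` below is hypothetical.

```python
def slice_evenly(data, num_slices):
    """Hypothetical helper: split `data` into `num_slices` contiguous chunks.

    Chunk sizes differ by at most one; slice i covers i * n // k .. (i + 1) * n // k.
    """
    n = len(data)
    return [list(data[i * n // num_slices:(i + 1) * n // num_slices])
            for i in range(num_slices)]


chunks = slice_evenly(range(1000), 8)
print([len(c) for c in chunks])        # [125, 125, 125, 125, 125, 125, 125, 125]
print(chunks[0][:3], chunks[-1][-3:])  # [0, 1, 2] [997, 998, 999]
```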

Most of the time you will not need the SparkContext directly and can use the SparkSession instead. In short, the difference between the two is:

* SparkSession:
    * for working with high-level APIs such as DataFrame and SQL.
    * Ideal for most data processing tasks because of its simplicity and unified interface.

* SparkContext:
    * low-level RDD operations.
    * Necessary when you need more control over the underlying Spark execution or need to manipulate RDDs directly.

In [12]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName("heySpark").getOrCreate()
spark

In [14]:
print("Spark version: %s" % spark.version)
sc = spark.sparkContext

Spark version: 3.5.5


## Lazy Evaluation
Spark uses lazy evaluation. Transformations such as `map` and `filter`, and creating an RDD with `parallelize`, are only recorded. Nothing is computed until an action such as `count`, `collect` or `takeSample` needs the result.

* For example, `example = sc.parallelize(range(999), 8)` defines how the RDD `example` will be created, but it is not evaluated until it is needed somewhere else. The line `sample = example.takeSample(False, 4)` is such a place: it forces `example` to be evaluated so that samples can be drawn from it.
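
The same idea can be shown without Spark in a toy pipeline. The class `LazyPipeline` is hypothetical and not part of PySpark. The transformations `map` and `filter` only append a step, and the action `collect` is what finally pulls data through those steps. A counter records when the mapped function really runs.

```python
class LazyPipeline:
    """Hypothetical toy: transformations are recorded; only an action runs them."""

    def __init__(self, source, steps=()):
        self.source = source
        self.steps = list(steps)

    def map(self, f):
        # Transformation: return a new pipeline with one more recorded step.
        return LazyPipeline(self.source, self.steps + [("map", f)])

    def filter(self, pred):
        # Transformation: also just recorded.
        return LazyPipeline(self.source, self.steps + [("filter", pred)])

    def collect(self):
        # Action: only now is the source read and every recorded step applied.
        items = iter(self.source)
        for kind, f in self.steps:
            items = map(f, items) if kind == "map" else filter(f, items)
        return list(items)


calls = []


def square(x):
    calls.append(x)   # record every time the function really runs
    return x * x


pipeline = LazyPipeline(range(10)).map(square).filter(lambda x: x % 2 == 0)
print(len(calls))           # 0: building the pipeline did no work
print(pipeline.collect())   # [0, 4, 16, 36, 64]
print(len(calls))           # 10: the action triggered the computation
```

Calling `collect()` again would run `square` ten more times, because nothing is kept between actions. Without caching, a Spark RDD behaves the same way.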

In [None]:
rdd = sc.parallelize(range(999), 8) # input array + number of partitions of the new RDD

In [16]:
sample = rdd.takeSample(False, 4)  # 4 random elements, sampled without replacement
print(sample)

[227, 549, 267, 710]


## Jobs and stages

A job is started each time an action (such as `count`, `collect` or `takeSample`) is called on an RDD. Each job is divided into stages at shuffle boundaries. Shuffle output is kept on the executors' local disks. If a later job needs the same shuffle data, Spark can skip the stages that produced it rather than recomputing them; the Spark UI lists these as "skipped". The Jobs tab of the Spark UI (by default at http://localhost:4040) shows statistics for each job, such as when it was submitted and how long it took to finish.

Clicking on a job in the Spark UI shows a more detailed overview. This includes a DAG visualization, a directed acyclic graph of the operations that make up the job.
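
Stage boundaries fall where data has to be shuffled between partitions. The simplified model below makes that rule concrete; `split_into_stages` and the `WIDE` set are illustrative assumptions, not Spark APIs. Narrow operations such as `map` or `filter` are pipelined inside one stage. A wide operation such as `reduceByKey` writes shuffle output at the end of one stage, and the next stage reads it back.

```python
# Operations that need a shuffle (a simplified, non-exhaustive list).
WIDE = {"reduceByKey", "groupByKey", "sortByKey", "join", "repartition"}


def split_into_stages(ops):
    """Group a linear chain of RDD operations into stages, cutting at each shuffle."""
    stages, current = [], []
    for op in ops:
        if op in WIDE:
            current.append(op + " (shuffle write)")  # map side ends the current stage
            stages.append(current)
            current = [op + " (shuffle read)"]       # reduce side starts the next stage
        else:
            current.append(op)                       # narrow ops are pipelined in place
    stages.append(current)
    return stages


word_count = ["textFile", "flatMap", "filter", "map", "reduceByKey"]
for i, stage in enumerate(split_into_stages(word_count)):
    print(i, stage)
# 0 ['textFile', 'flatMap', 'filter', 'map', 'reduceByKey (shuffle write)']
# 1 ['reduceByKey (shuffle read)']
```

The two stages correspond to the two indentation levels in the `toDebugString()` output later in this notebook. There, the `+-` line marks the shuffle boundary.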


# Loading text data
The Complete Works of William Shakespeare (including Romeo & Juliet), provided by Project Gutenberg

In [None]:
# !mkdir -p data && wget --quiet -O data/guthenberg.txt https://raw.githubusercontent.com/rubigdata-dockerhub/hadoop-dockerfile/master/100.txt # Project Gutenberg data, saved to the path read below

In [None]:
lines = sc.textFile("data/guthenberg.txt")
print("Guthenberg text file contains:\n" + 
        "%d lines\n" % lines.count() + 
        "%d chars\n" % lines.map( lambda s: len(s) ).reduce(lambda a, b: a + b))

Guthenberg text file contains:
147838 lines
5545144 chars



In [69]:
print("Longest line: %s char" % lines.map( lambda s: len(s) ).reduce(lambda a, b: max(a,b)))

Longest line: 78 char


## Splitting the lines into words

In [None]:
# collect words
words = lines.flatMap(lambda s: s.split()).filter(lambda w: len(w) > 0).map(lambda w: (w, 1))

In [None]:
# reduce words collection to word count
wc = words.reduceByKey(lambda a, b: a + b)
print(wc.take(10))

[('Complete', 4), ('Works', 5), ('of', 16718), ('by', 3064), ('Shakespeare', 18), ('eBook', 8), ('for', 6014), ('use', 286), ('anyone', 6), ('United', 15)]
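
For a small input, the same pipeline can be reproduced with the standard library: split into words, emit `(word, 1)` pairs, then sum by key. This is handy for checking results on a sample before running on the cluster. `word_count` is a hypothetical helper, and the two-line sample is made up for illustration.

```python
from collections import defaultdict


def word_count(lines):
    """Local equivalent of lines.flatMap(split).map(lambda w: (w, 1)).reduceByKey(add)."""
    pairs = ((w, 1) for line in lines for w in line.split())  # flatMap + map
    counts = defaultdict(int)
    for word, one in pairs:                                   # reduceByKey with +
        counts[word] += one
    return dict(counts)


sample = ["to be or not to be", "that is the question"]
print(word_count(sample))
# {'to': 2, 'be': 2, 'or': 1, 'not': 1, 'that': 1, 'is': 1, 'the': 1, 'question': 1}
```

`collections.Counter` would do the same in one call. The explicit loop is kept here to mirror the `(word, 1)` pairs. Unlike this dict, the order of `wc.take(10)` depends on how Spark partitioned the data.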


In [None]:
print(wc.toDebugString().decode("utf-8"))

(2) PythonRDD[53] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[51] at mapPartitions at PythonRDD.scala:160 []
 |  ShuffledRDD[50] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[49] at reduceByKey at /tmp/ipykernel_36535/1104536454.py:1 []
    |  PythonRDD[48] at reduceByKey at /tmp/ipykernel_36535/1104536454.py:1 []
    |  data/guthenberg.txt MapPartitionsRDD[36] at textFile at NativeMethodAccessorImpl.java:0 []
    |  data/guthenberg.txt HadoopRDD[35] at textFile at NativeMethodAccessorImpl.java:0 []


# Count words
Which words did Shakespeare use most often?

In [None]:
top10word = wc.takeOrdered(10, key = lambda x: x[0])       # first 10 words in string sort order
top10count = wc.takeOrdered(10, key = lambda x: x[1])      # 10 least frequent words
top10countrev = wc.takeOrdered(10, key = lambda x: -x[1])  # 10 most frequent words
print(top10word)
print(top10count)
print(top10countrev)

[('"', 241), ('"\'Tis', 1), ('"A', 3), ('"Air,"', 1), ('"Alas,', 1), ('"Amen"', 2), ('"Amen"?', 1), ('"Amen,"', 1), ('"And', 1), ('"Aroint', 1)]
[('Author:', 1), ('[EBook', 1), ('1,', 1), ('2018', 1), ('START', 1), ('KINSMEN', 1), ('Feed’st', 1), ('fuel,', 1), ('cruel:', 1), ('buriest', 1)]
[('the', 25378), ('I', 20629), ('and', 19806), ('to', 16966), ('of', 16718), ('a', 13657), ('my', 11443), ('in', 10519), ('you', 9591), ('is', 8335)]
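
The three calls differ only in their key. `takeOrdered` returns the n smallest elements under the key, so each key gives a different list:

* `x[0]` compares words as strings, which is why tokens starting with `"` come first: `"` sorts before letters.
* `x[1]` gives the rarest words. Ties come back in no particular order.
* `-x[1]` gives the most frequent words.

Locally, `heapq.nsmallest` has the same semantics. The tiny word list below is made up for illustration.

```python
import heapq

counts = [("the", 50), ("Romeo", 5), ('"Amen"', 2), ("and", 40), ("buriest", 1)]

# Smallest by word: plain string comparison, so punctuation and capitals come first.
print(heapq.nsmallest(3, counts, key=lambda x: x[0]))   # [('"Amen"', 2), ('Romeo', 5), ('and', 40)]
# Smallest by count: the rarest words.
print(heapq.nsmallest(3, counts, key=lambda x: x[1]))   # [('buriest', 1), ('"Amen"', 2), ('Romeo', 5)]
# Smallest by negated count: the most frequent words.
print(heapq.nsmallest(3, counts, key=lambda x: -x[1]))  # [('the', 50), ('and', 40), ('Romeo', 5)]
```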


In [66]:
print(wc.filter(lambda x: x[0] == "Romeo").collect())
print(wc.filter(lambda x: x[0] == "Juliet").collect())


[('Romeo', 45)]
[('Juliet', 17)]


In [67]:
wc.cache()  # mark wc for in-memory storage; it is filled by the next action

PythonRDD[53] at RDD at PythonRDD.scala:53
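
`cache()` is itself lazy. It only marks `wc` to be kept in memory, and the partitions are stored the next time an action computes them. Later actions, such as the two filters below, then reuse those partitions instead of re-reading and re-splitting the text file. If memory runs short, Spark may drop cached partitions and recompute them from the lineage. The toy class below (`CachedDataset` is hypothetical, not a PySpark API) models just this behaviour by counting how often the expensive computation runs.

```python
class CachedDataset:
    """Hypothetical toy: every action recomputes unless cache() was called first."""

    def __init__(self, compute):
        self.compute = compute   # stands in for the whole lineage (read file, split, count, ...)
        self.cached = False
        self._data = None

    def cache(self):
        self.cached = True       # only sets a flag; nothing is computed yet
        return self

    def collect(self):
        if self._data is not None:
            return self._data    # served from memory
        data = self.compute()
        if self.cached:
            self._data = data    # the first action after cache() keeps the result
        return data


runs = []


def expensive():
    runs.append(1)               # count how often the lineage is evaluated
    return [1, 2, 3]


ds = CachedDataset(expensive)
ds.collect()
ds.collect()
print(len(runs))   # 2: without cache, every action recomputes
ds.cache()
print(len(runs))   # 2: cache() on its own does no work
ds.collect()
ds.collect()
print(len(runs))   # 3: computed once more, then served from memory
```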

In [68]:
print(wc.filter(lambda x: x[0] == "Macbeth").collect())
print(wc.filter(lambda x: x[0] == "Capulet").collect())

[('Macbeth', 30)]
[('Capulet', 9)]


# Alternative ways to achieve the same result

In [None]:
# method 1 of getting top 10 words
print(wc.map(lambda x: (x[1], x[0])).sortByKey(ascending=False).map(lambda x: (x[1], x[0])).take(10)) # 0.3s according to the Spark UI

[('the', 25378), ('I', 20629), ('and', 19806), ('to', 16966), ('of', 16718), ('a', 13657), ('my', 11443), ('in', 10519), ('you', 9591), ('is', 8335)]


In [None]:
# method 2 of getting top 10 words
print(wc.takeOrdered(10, key = lambda x: -x[1])) # 99ms according to the Spark UI

[('the', 25378), ('I', 20629), ('and', 19806), ('to', 16966), ('of', 16718), ('a', 13657), ('my', 11443), ('in', 10519), ('you', 9591), ('is', 8335)]


In [None]:
# method 3 of getting top 10 words
print(wc.top(10, key = lambda x: x[1])) # 95ms according to the Spark UI

[('the', 25378), ('I', 20629), ('and', 19806), ('to', 16966), ('of', 16718), ('a', 13657), ('my', 11443), ('in', 10519), ('you', 9591), ('is', 8335)]


In [None]:
# method 4 of getting top 10 words
print(wc.sortBy(lambda x: -x[1]).take(10)) # 0.3s according to the Spark UI

[('the', 25378), ('I', 20629), ('and', 19806), ('to', 16966), ('of', 16718), ('a', 13657), ('my', 11443), ('in', 10519), ('you', 9591), ('is', 8335)]
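
A likely reason for the timing gap is how much data each method has to sort:

* `sortBy` and `sortByKey` shuffle and sort the entire dataset.
* `top` and `takeOrdered` keep only the best n elements of each partition and merge those small lists on the driver.

The sketch below imitates the second strategy in plain Python. `top_n` is a hypothetical helper, and the partition contents are made up.

```python
import heapq
from functools import reduce


def top_n(partitions, n, key):
    """Top-n without a global sort: best n per partition, then merge the small lists."""
    per_partition = [heapq.nlargest(n, part, key=key) for part in partitions]
    return reduce(lambda a, b: heapq.nlargest(n, a + b, key=key), per_partition)


partitions = [
    [("the", 50), ("of", 30), ("Romeo", 5)],
    [("and", 40), ("I", 45), ("buriest", 1)],
]
print(top_n(partitions, 3, key=lambda x: x[1]))  # [('the', 50), ('I', 45), ('and', 40)]
```

Only n elements per partition ever leave the executors, so the work stays small no matter how many distinct words there are.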


## Save the counted words

The result is a directory of multiple part files, one per partition of the RDD, rather than a single file.

In [83]:
wc.saveAsTextFile("data/guthenberg-words")  # save the (word, count) pairs; fails if the directory already exists
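
`saveAsTextFile` writes a directory rather than a single file. The directory typically contains one `part-NNNNN` file per partition plus an empty `_SUCCESS` marker once the job completes. Each element is written as its string form on its own line. Spark also refuses to write into an existing directory, so re-running the cell needs the old output removed first (or a new path). The plain-Python sketch below imitates that layout in a temporary directory so the structure is easy to inspect; `save_partitions` is a hypothetical helper. The saved directory can be read back as one dataset with `sc.textFile("data/guthenberg-words")`.

```python
import os
import tempfile


def save_partitions(partitions, out_dir):
    """Hypothetical helper mimicking the directory layout of saveAsTextFile."""
    os.makedirs(out_dir)                        # like Spark, fail if it already exists
    for i, part in enumerate(partitions):
        with open(os.path.join(out_dir, "part-%05d" % i), "w") as f:
            for element in part:
                f.write(str(element) + "\n")    # each element as its str() on its own line
    with open(os.path.join(out_dir, "_SUCCESS"), "w"):
        pass                                    # empty completion marker


out = os.path.join(tempfile.mkdtemp(), "words")
save_partitions([[("the", 3)], [("and", 2), ("of", 1)]], out)
print(sorted(os.listdir(out)))   # ['_SUCCESS', 'part-00000', 'part-00001']
```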


## Cleaning up text data with regex

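Macbeth is actually mentioned many more times than we previously found. Our original way of splitting lines into words did not account for capitalization or for punctuation such as full stops and exclamation marks. As a result, variants like `Macbeth.` and `MACBETH.` were counted as separate words.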

In [114]:
wc = lines.flatMap(lambda s: s.split()).filter(lambda w: len(w) > 0).map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
wc.filter(lambda x: "macbeth" in x[0].lower()).collect()

[('Macbeth', 30),
 ('Macbeth.', 20),
 ('MACBETH.', 205),
 ('Macbeth,', 16),
 ('Macbeth.]', 1),
 ('Macbeth!', 7),
 ('MACBETH', 2),
 ('MACBETH,', 2),
 ("Macbeth's", 7),
 ('Macbeth;', 2)]

To fix this, we can lowercase each word and use a regex to strip any characters outside the English alphabet (a-z) from the start and end of the word.

In [103]:
import re
words = lines.flatMap(lambda l: l.split(" ")) \
                .map(lambda w: re.sub( r"(^[^a-z]+|[^a-z]+$)", "", w.lower())) \
                .filter(lambda w: len(w) > 0) \
                .map(lambda w: (w, 1)) \
                .reduceByKey(lambda a, b: a + b)
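
The regular expression only trims the ends of a token. After lowercasing, `^[^a-z]+` removes a leading run of non-letters and `[^a-z]+$` removes a trailing run, so punctuation inside a word survives. Applying the same `re.sub` call locally to some of the variants listed above shows the effect. `clean` is just a named version of the lambda in the cell above.

```python
import re


def clean(word):
    # Lowercase, then strip non-letters from the start and end of the token only.
    return re.sub(r"(^[^a-z]+|[^a-z]+$)", "", word.lower())


for token in ["Macbeth.", "MACBETH,", "Macbeth.]", "Macbeth!", "Macbeth's", '"Amen"?']:
    print(repr(token), "->", repr(clean(token)))
```

The possessive keeps its internal apostrophe (`macbeth's`) and so remains a separate key. A token made only of punctuation becomes an empty string, which the `len(w) > 0` filter then drops.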

In [104]:
mbcount = words.filter(lambda x: x[0] == "macbeth").collect()
print("Macbeth count: %d" % mbcount[0][1])

Macbeth count: 285
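
The new total can be checked against the variants found earlier with the un-cleaned word counts. The block below applies the same regex to each variant and sums the counts of those that become exactly `macbeth`. The result is 285; the 7 occurrences of `Macbeth's` are excluded because they become `macbeth's`.

```python
import re

# Counts copied from the earlier filter on the un-cleaned word counts.
variants = {
    "Macbeth": 30, "Macbeth.": 20, "MACBETH.": 205, "Macbeth,": 16,
    "Macbeth.]": 1, "Macbeth!": 7, "MACBETH": 2, "MACBETH,": 2,
    "Macbeth's": 7, "Macbeth;": 2,
}

cleaned_total = sum(n for word, n in variants.items()
                    if re.sub(r"(^[^a-z]+|[^a-z]+$)", "", word.lower()) == "macbeth")
print(cleaned_total)   # 285
```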