# PySpark

![Logo](https://github.com/pnavaro/big-data/blob/master/notebooks/images/apache_spark_logo.png?raw=1)

- [Apache Spark](https://spark.apache.org) was first released in 2014.
- It was originally developed by [Matei Zaharia](http://people.csail.mit.edu/matei) as a class project, and later a PhD dissertation, at the University of California, Berkeley.
- Spark is written in [Scala](https://www.scala-lang.org).
- All images come from [Databricks](https://databricks.com/product/getting-started-guide).

- Apache Spark is a fast and general-purpose cluster computing system.
- It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.
- Spark can manage "big data" collections with a small set of high-level primitives like `map`, `filter`, `groupBy`, and `join`. With these common patterns we can often express computations that are more complex than a single `map` but are still structured.
- It also supports a rich set of higher-level tools including [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) for SQL and structured data processing, [MLlib](https://spark.apache.org/docs/latest/ml-guide.html) for machine learning, [GraphX](https://spark.apache.org/docs/latest/graphx-programming-guide.html) for graph processing, and Spark Streaming.

## Resilient distributed datasets

- The fundamental abstraction of Apache Spark is a read-only, parallel, distributed, fault-tolerant collection called a resilient distributed dataset (RDD).
- RDDs behave a bit like Python collections (e.g. lists).
- When working with Apache Spark we iteratively apply functions to every item of these collections in parallel to produce *new* RDDs.
- The data is distributed across nodes in a cluster of computers.
- Functions implemented in Spark can work in parallel across elements of the collection.
- The Spark framework allocates data and processing to different nodes, without any intervention from the programmer.
- RDDs are automatically rebuilt after a machine failure: lost partitions are recomputed from the lineage of transformations that produced them.

## Lifecycle of a Spark Program

1. Create some input RDDs from external data, or parallelize a collection in your driver program.
2. Lazily transform them to define new RDDs using transformations like `filter()` or `map()`.
3. Ask Spark to `cache()` any intermediate RDDs that will need to be reused.
4. Launch actions such as `count()` and `collect()` to kick off a parallel computation, which is then optimized and executed by Spark.

## Operations on Distributed Data

- Two types of operations: **transformations** and **actions**
- Transformations are *lazy* (not computed immediately)
- Transformations are executed when an action is run
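The following toy example makes laziness concrete. It is a single-machine sketch in plain Python with no Spark involved. The class `LazyPipeline` is hypothetical. Its `map` and `filter` methods only record steps, and nothing runs until an action such as `collect` or `count` is called. Real RDDs also track partitions and lineage, which this sketch ignores.

```python
class LazyPipeline:
    """Toy stand-in for an RDD (hypothetical): transformations are only recorded."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)  # recorded ("map" | "filter", function) pairs

    # Transformations: return a new pipeline and compute nothing.
    def map(self, f):
        return LazyPipeline(self._data, self._steps + (("map", f),))

    def filter(self, predicate):
        return LazyPipeline(self._data, self._steps + (("filter", predicate),))

    # Actions: replay the recorded steps over the data.
    def collect(self):
        items = iter(self._data)
        for kind, f in self._steps:
            # builtin map/filter bind f immediately, so each step keeps its own function
            items = map(f, items) if kind == "map" else filter(f, items)
        return list(items)

    def count(self):
        return len(self.collect())


calls = []

def square(x):
    calls.append(x)  # record when the function actually runs
    return x * x

pipeline = LazyPipeline(range(8)).map(square).filter(lambda x: x % 2 == 0)
print(calls)               # [] : defining the pipeline ran nothing
print(pipeline.collect())  # [0, 4, 16, 36]
print(len(calls))          # 8
```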

## [Transformations](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) (lazy)

```
map() flatMap()
filter()
mapPartitions() mapPartitionsWithIndex()
sample()
union() intersection() distinct()
groupBy() groupByKey()
reduceByKey()
sortBy() sortByKey()
join()
cogroup()
cartesian()
pipe()
coalesce()
repartition()
partitionBy()
...
```

## [Actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)

```
reduce()
collect()
count()
first()
take()
takeSample()
saveToCassandra()
takeOrdered()
saveAsTextFile()
saveAsSequenceFile()
saveAsObjectFile()
countByKey()
foreach()
```

## Python API

PySpark uses Py4J, a library that enables Python programs to dynamically access Java objects in the JVM where Spark runs.

![PySpark Internals](https://github.com/pnavaro/big-data/blob/master/notebooks/images/YlI8AqEl.png?raw=1)

## The `SparkContext` class

- When working with Apache Spark we invoke methods on an object which is an instance of the `pyspark.SparkContext` class.

- In the interactive `pyspark` shell, an instance of this class is created automatically and assigned to the variable `sc`; in a notebook we create it ourselves.

- The `parallelize` method in `SparkContext` can be used to turn any ordinary Python collection into an RDD;
    - normally we would create an RDD from a large file or an HBase table.

## First example

PySpark isn't on `sys.path` by default, but that doesn't mean it can't be used as a regular library. You can address this either by symlinking pyspark into your site-packages or by adding pyspark to `sys.path` at runtime; [findspark](https://github.com/minrk/findspark) does the latter.

We create a SparkContext `sc` backed by a tiny local Spark cluster. With `master="local[*]"`, Spark runs one worker thread per CPU core, so this works fine on any multicore machine.

In [1]:
import os, sys
sys.executable

'/Library/Frameworks/Python.framework/Versions/3.13/bin/python3'

In [97]:
#os.environ["SPARK_HOME"] = "/opt/spark-3.0.1-bin-hadoop2.7"
os.environ["PYSPARK_PYTHON"] = sys.executable

In [2]:
import pyspark

conf = pyspark.SparkConf().setMaster("local[*]").setAppName("FirstExample")
sc = pyspark.SparkContext.getOrCreate(conf)  # reuse the running context if there is one
sc.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/18 18:12:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/18 18:12:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/11/18 18:12:44 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/11/18 18:12:44 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/11/18 18:12:44 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
25/11/18 18:12:44 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.


In [None]:
#sc.stop()

In [99]:
print(sc)  # the context plays a role similar to a multiprocessing Pool executor

<SparkContext master=local[*] appName=FirstExample>


## Create your first RDD

In [100]:
data = list(range(8))
data

[0, 1, 2, 3, 4, 5, 6, 7]

In [101]:

rdd = sc.parallelize(data)  # distribute the Python list as an RDD
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7]

### Exercise

Create a file `./Datasets/WhoAreWe.txt` with the text below. Read and load it into an RDD with the `SparkContext.textFile` method.

The African Institute for Mathematical Sciences (AIMS) is a pan-African network of Centres of Excellence for postgraduate training in mathematical sciences, research, and public engagement in Science, Technology, Engineering, and Mathematics. Founded in 2003 in South Africa by acclaimed physicist Prof Neil Turok and later replicated in Senegal, Ghana, Cameroon and Rwanda, AIMS is leading Africa’s socio-economic transformation through:

Innovative scientific training (the development of human capital);

Technological advances and cutting-edge scientific discoveries; and

Public engagement for the continent’s scientific emergence.

Africa’s youth are at the heart of the AIMS innovation and transformation ecosystem which consists of a set of academic and non-academic programs expertly tailored to provide AIMS learners with a unique postgraduate training experience on the continent.

AIMS offers a Master’s in mathematical sciences, including a co-operative option with a direct link to industry, the African Master’s in Machine Intelligence (AMMI), as well as research programs, with over 100 researchers conducting studies across the network. In addition to the AIMS Industry Initiative and a gender-responsive Teacher Training Program currently implemented in Cameroon and Rwanda, AIMS equally created two critical initiatives: Quantum Leap Africa, a think tank looking into the coming quantum revolution and the Next Einstein Forum to propel Africa on to the global scientific stage. 

Now read `WhoAreWe.txt` from the `Datasets/` folder using `textFile`.

In [102]:
# Each line of the file becomes one element of the RDD
rdd_file = sc.textFile("Datasets/WhoAreWe.txt")

### Collect

Action / To Driver: Return all items in the RDD to the driver in a single list

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/DUO6ygB.png?raw=1)

Source: https://i.imgur.com/DUO6ygB.png
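Conceptually, `collect()` gathers every partition and concatenates the partitions into one list on the driver. The plain-Python sketch below imitates this with a hypothetical list of in-memory partitions. It also shows `take(n)`, which only needs the first `n` items. On a real cluster, `collect()` must fit the whole RDD in the driver's memory, so `take(n)` is the safer way to peek at a large RDD.

```python
from itertools import chain, islice

# Hypothetical in-memory partitions standing in for data spread across executors
partitions = [[0, 1], [2, 3], [4, 5], [6, 7]]

def collect(parts):
    """Concatenate every partition, in partition order, into one driver-side list."""
    return list(chain.from_iterable(parts))

def take(parts, n):
    """Return only the first n items, reading partitions lazily."""
    return list(islice(chain.from_iterable(parts), n))

print(collect(partitions))  # [0, 1, 2, 3, 4, 5, 6, 7]
print(take(partitions, 3))  # [0, 1, 2]
```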

### Exercise

Collect the text you read before from the `WhoAreWe.txt` file.

In [103]:
rdd_file.collect()

['The African Institute for Mathematical Sciences (AIMS) is a pan-African network of Centres of Excellence for postgraduate training in mathematical sciences, research, and public engagement in Science, Technology, Engineering, and Mathematics. Founded in 2003 in South Africa by acclaimed physicist Prof Neil Turok and later replicated in Senegal, Ghana, Cameroon and Rwanda, AIMS is leading Africa’s socio-economic transformation through:',
 '',
 'Innovative scientific training (the development of human capital);',
 '',
 'Technological advances and cutting-edge scientific discoveries; and',
 '',
 'Public engagement for the continent’s scientific emergence.',
 '',
 'Africa’s youth are at the heart of the AIMS innovation and transformation ecosystem which consists of a set of academic and non-academic programs expertly tailored to provide AIMS learners with a unique postgraduate training experience on the continent.',
 '',
 'AIMS offers a Master’s in mathematical sciences, including a co-o

### Map

Transformation / Narrow: Return a new RDD by applying a function to each element of this RDD

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/PxNJf0U.png?raw=1)

Source: http://i.imgur.com/PxNJf0U.png

In [104]:
rdd = sc.parallelize(list(range(8)))
rdd.map(lambda x: x ** 2).collect() # Square each element


[0, 1, 4, 9, 16, 25, 36, 49]

In [105]:
def calculate(x):
    return x ** 2
calculate(4)

16

In [106]:
f = lambda x: x**2
print(f(4))

16


In [107]:
rdd = sc.parallelize(list(range(8)))
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7]

In [108]:
a = list(range(21))
a

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

In [109]:
list(map(lambda l: l**2, a))

[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361,
 400]

In [110]:
rdd.getNumPartitions()

4
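`getNumPartitions()` reports how many slices the data was split into. As a rough sketch, `parallelize` can be thought of as cutting the list into contiguous chunks of near-equal size. This is an assumption about the layout, not PySpark's exact code path. The hypothetical helper `slice_evenly` below does that in plain Python. On the real RDD, `rdd.glom().collect()` returns one list per partition and shows the actual layout.

```python
def slice_evenly(data, num_slices):
    """Split a list into num_slices contiguous chunks of near-equal size."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]

print(slice_evenly(list(range(8)), 4))   # [[0, 1], [2, 3], [4, 5], [6, 7]]
print(slice_evenly(list(range(10)), 4))  # [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
```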

### Exercise

Replace the lambda function with a function that pauses for one second (`time.sleep(1)`), and check whether the `map` operation runs in parallel.

In [73]:
#f = lambda x: 
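One way to approach the exercise is to compare two timings: a slow function applied to one item at a time, and the same function spread over several workers. The sketch below does this in plain Python with a thread pool rather than Spark. It uses a hypothetical `slow_square` with a shorter 0.1 s pause so it runs quickly. With Spark, the same idea is to pass a function that calls `time.sleep(1)` to `rdd.map` and time the `collect()`. If the 4 partitions are processed in parallel, the 8 items should take closer to 2 seconds than to 8.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    time.sleep(0.1)  # simulate an expensive computation
    return x ** 2

data = list(range(8))

start = time.perf_counter()
sequential = list(map(slow_square, data))  # one item after another
t_seq = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:  # 4 workers, like 4 partitions
    parallel = list(pool.map(slow_square, data))
t_par = time.perf_counter() - start

print(sequential == parallel)  # True
print(f"sequential: {t_seq:.2f}s, 4 workers: {t_par:.2f}s")
```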

### Filter

Transformation / Narrow: Return a new RDD containing only the elements that satisfy a predicate

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/GFyji4U.png?raw=1)
Source: http://i.imgur.com/GFyji4U.png

In [111]:
# Select only the even elements
rdd.filter(lambda x: x % 2 == 0).collect()

[0, 2, 4, 6]

In [112]:
list(filter(lambda a: a%2 == 0, a))

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

In [113]:
list(filter(lambda a: a%5 == 0, a))

[0, 5, 10, 15, 20]

In [114]:
list(filter(lambda a: a%2 != 0, a))

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]

### FlatMap

Transformation / Narrow: Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/TsSUex8.png?raw=1)

In [115]:
rdd = sc.parallelize([1,2,3])
rdd.flatMap(lambda x: (x, x*100, 20)).collect()

[1, 100, 20, 2, 200, 20, 3, 300, 20]
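In plain Python, `flatMap` corresponds to mapping and then flattening one level. The hypothetical helper `flat_map` below reproduces the RDD result above with `itertools.chain.from_iterable`. A plain `map` keeps the tuples nested.

```python
from itertools import chain

def flat_map(f, items):
    """Apply f to each item, then flatten the returned iterables into one list."""
    return list(chain.from_iterable(f(x) for x in items))

print(flat_map(lambda x: (x, x * 100, 20), [1, 2, 3]))
# [1, 100, 20, 2, 200, 20, 3, 300, 20]
print(list(map(lambda x: (x, x * 100, 20), [1, 2, 3])))
# [(1, 100, 20), (2, 200, 20), (3, 300, 20)]
```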

In [116]:
text = 'I am learning big data analysis with python'
len(text)

43

In [117]:
split_text = text.split(" ")

In [118]:
len(text.split(" "))

8

In [119]:
remove_dot = rdd_file.flatMap(lambda x: x.replace(".", " ").split(" "))
remove_dot.collect()

['The',
 'African',
 'Institute',
 'for',
 'Mathematical',
 'Sciences',
 '(AIMS)',
 'is',
 'a',
 'pan-African',
 'network',
 'of',
 'Centres',
 'of',
 'Excellence',
 'for',
 'postgraduate',
 'training',
 'in',
 'mathematical',
 'sciences,',
 'research,',
 'and',
 'public',
 'engagement',
 'in',
 'Science,',
 'Technology,',
 'Engineering,',
 'and',
 'Mathematics',
 '',
 'Founded',
 'in',
 '2003',
 'in',
 'South',
 'Africa',
 'by',
 'acclaimed',
 'physicist',
 'Prof',
 'Neil',
 'Turok',
 'and',
 'later',
 'replicated',
 'in',
 'Senegal,',
 'Ghana,',
 'Cameroon',
 'and',
 'Rwanda,',
 'AIMS',
 'is',
 'leading',
 'Africa’s',
 'socio-economic',
 'transformation',
 'through:',
 '',
 'Innovative',
 'scientific',
 'training',
 '(the',
 'development',
 'of',
 'human',
 'capital);',
 '',
 'Technological',
 'advances',
 'and',
 'cutting-edge',
 'scientific',
 'discoveries;',
 'and',
 '',
 'Public',
 'engagement',
 'for',
 'the',
 'continent’s',
 'scientific',
 'emergence',
 '',
 '',
 'Africa’s',
 

In [83]:
#split_file = rdd_file.flatMap(lambda x : x.split())
#split_file


In [84]:
#length = remove_dot.map(lambda x: len(x))
#length

In [85]:
!cat Datasets/WhoAreWe.txt

The African Institute for Mathematical Sciences (AIMS) is a pan-African network of Centres of Excellence for postgraduate training in mathematical sciences, research, and public engagement in Science, Technology, Engineering, and Mathematics. Founded in 2003 in South Africa by acclaimed physicist Prof Neil Turok and later replicated in Senegal, Ghana, Cameroon and Rwanda, AIMS is leading Africa’s socio-economic transformation through:

Innovative scientific training (the development of human capital);

Technological advances and cutting-edge scientific discoveries; and

Public engagement for the continent’s scientific emergence.

Africa’s youth are at the heart of the AIMS innovation and transformation ecosystem which consists of a set of academic and non-academic programs expertly tailored to provide AIMS learners with a unique postgraduate training experience on the continent.

AIMS offers a Master’s in mathematical sciences, including a co-operative option with a direct link to indu

In [None]:
rdd = sc.parallelize(['John', 'Blanche', 'Bob', 'Ben', 'Kenny'])

### Exercise

Use `flatMap` to clean the text from the `WhoAreWe.txt` file: convert it to lower case, remove dots, and split it into words.
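Here is a possible starting point, written as a plain Python function so it can be tested without Spark. The hypothetical `clean_words` lower-cases a line, replaces dots with spaces, and splits on runs of whitespace. Calling `split()` without an argument also drops the empty strings that `split(" ")` produced in the output above. The same function could then be passed to `rdd_file.flatMap(clean_words)`.

```python
def clean_words(line):
    """Lower-case a line, turn dots into spaces, and split on any whitespace."""
    return line.lower().replace(".", " ").split()

lines = [
    "Public engagement for the continent’s scientific emergence.",
    "",
    "Technological advances and cutting-edge scientific discoveries; and",
]
# Flatten the per-line word lists, as flatMap would
words = [word for line in lines for word in clean_words(line)]
print(words)
```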

### GroupBy

Transformation / Wide: Group the data in the original RDD. Create pairs where the key is the output of a user function, and the value is all items for which the function yields this key.

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/gdj0Ey8.png?raw=1)

In [86]:
rdd = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
rdd = rdd.groupBy(lambda w: w[0])
[(k, list(v)) for (k, v) in rdd.collect()]

[('J', ['John', 'James']), ('F', ['Fred']), ('A', ['Anna'])]
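In PySpark, the grouped values come back as iterable objects rather than lists, which is why the cell above wraps them in `list(v)`. The hypothetical helper `group_by` below is a plain-Python equivalent built on `collections.defaultdict`. The order of the groups in a real Spark result is not guaranteed.

```python
from collections import defaultdict

def group_by(f, items):
    """Return (key, [items]) pairs where key = f(item)."""
    groups = defaultdict(list)
    for item in items:
        groups[f(item)].append(item)
    return list(groups.items())

print(group_by(lambda w: w[0], ['John', 'Fred', 'Anna', 'James']))
# [('J', ['John', 'James']), ('F', ['Fred']), ('A', ['Anna'])]
```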

### GroupByKey

Transformation / Wide: Group the values for each key in the original RDD. Create a new pair where the original key corresponds to this collected group of values.

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/TlWRGr2.png?raw=1)

In [87]:
rdd = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])
rdd = rdd.groupByKey()
[(j[0], list(j[1])) for j in rdd.collect()]

[('B', [5, 4]), ('A', [3, 2, 1])]
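When the grouped values are only going to be aggregated (for example summed), `reduceByKey` is usually preferred to `groupByKey`. It combines values inside each partition before the shuffle, so less data moves between nodes. The plain-Python sketch below imitates that two-step idea. The helper `reduce_by_key` and the partition layout are hypothetical. On the RDD above, the Spark call would be `rdd.reduceByKey(add)`.

```python
from operator import add

def reduce_by_key(partitions, func):
    """Combine values per key inside each partition, then merge the partial results."""
    # Step 1: map-side combine, one small dict per partition
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)
    # Step 2: "shuffle" and merge the per-partition results by key
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

partitions = [[('B', 5), ('B', 4)], [('A', 3), ('A', 2), ('A', 1)]]
print(reduce_by_key(partitions, add))  # {'B': 9, 'A': 6}
```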

### Join

Transformation / Wide: Return a new RDD containing all pairs of elements having the same key in the original RDDs

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/YXL42Nl.png?raw=1)

In [88]:
x = sc.parallelize([("a", 1), ("b", 2)])
y = sc.parallelize([("a", 3), ("a", 4), ("b", 5)])
x.join(y).collect()

[('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5))]
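The `join` above is an inner join: every left value is paired with every right value that has the same key, and keys present on only one side are dropped. RDDs also provide `leftOuterJoin`, `rightOuterJoin` and `fullOuterJoin`. The hypothetical helper `inner_join` below shows the same pairing logic in plain Python.

```python
from collections import defaultdict

def inner_join(left, right):
    """Pair each (k, v) on the left with every (k, w) on the right: (k, (v, w))."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]

x = [("a", 1), ("b", 2)]
y = [("a", 3), ("a", 4), ("b", 5)]
print(inner_join(x, y))  # [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5))]
```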

### Distinct

Transformation / Wide: Return a new RDD containing distinct items from the original RDD (omitting all duplicates)

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/Vqgy2a4.png?raw=1)

In [89]:
rdd = sc.parallelize([1,2,3,3,4])
rdd.distinct().collect()

[4, 1, 2, 3]
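`distinct` is a wide transformation because equal items may start in different partitions and must be brought together. A common way to do that is hash partitioning. The hypothetical `hash_distinct` below sends each item to bucket `hash(item) % num_partitions` and deduplicates within each bucket. In CPython, small integers hash to themselves. With 4 buckets, this toy therefore reproduces the order `[4, 1, 2, 3]` shown above. That is consistent with how Spark may have ordered the result, but it is not a proof.

```python
def hash_distinct(items, num_partitions):
    """Deduplicate by sending each item to partition hash(item) % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for item in items:
        bucket = partitions[hash(item) % num_partitions]
        if item not in bucket:  # equal items always land in the same bucket
            bucket.append(item)
    # Read the buckets back in partition order, as collect() would
    return [item for bucket in partitions for item in bucket]

print(hash_distinct([1, 2, 3, 3, 4], 4))  # [4, 1, 2, 3]
```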

### KeyBy

Transformation / Narrow: Create a Pair RDD, forming one pair for each item in the original RDD. The pair’s key is calculated from the value via a user-supplied function.

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/nqYhDW5.png?raw=1)

In [90]:
rdd = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
rdd.keyBy(lambda w: w[0]).collect()

[('J', 'John'), ('F', 'Fred'), ('A', 'Anna'), ('J', 'James')]

## Actions

### Map-Reduce operation

Action / To Driver: Aggregate all the elements of the RDD by applying a user function pairwise to elements and partial results, and return a result to the driver

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/R72uzwX.png?raw=1)

In [91]:
from operator import add
rdd = sc.parallelize(list(range(8)))
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7]

In [92]:
rdd.map(lambda x: x ** 2).reduce(add) # reduce is an action!

140
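`reduce` first combines the elements within each partition, then combines the partial results. For this reason the function passed to it should be commutative and associative, as `add` is. The plain-Python sketch below reproduces the 140 above with `functools.reduce` and a hypothetical partition layout of `range(8)`.

```python
from functools import reduce
from operator import add

partitions = [[0, 1], [2, 3], [4, 5], [6, 7]]  # hypothetical layout of range(8)

squared = [[x ** 2 for x in part] for part in partitions]  # the map step
partial = [reduce(add, part) for part in squared]           # one result per partition
total = reduce(add, partial)                                # merged on the driver
print(partial, total)  # [1, 13, 41, 85] 140
```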

### Max, Min, Sum, Mean, Variance, Stdev

Action / To Driver: Compute the respective function (maximum value, minimum value, sum, mean, variance, or standard deviation) from a numeric RDD

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/HUCtib1.png?raw=1)
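No cell above demonstrates these actions. On a numeric RDD they are methods: `rdd.max()`, `rdd.min()`, `rdd.sum()`, `rdd.mean()`, `rdd.variance()` and `rdd.stdev()`, plus `rdd.stats()` for all of them at once. `variance()` and `stdev()` compute the population versions, dividing by N. `sampleVariance()` and `sampleStdev()` divide by N-1 instead. The plain-Python equivalents below use the population functions of the `statistics` module, so for `range(8)` they should match those RDD methods.

```python
import statistics

data = list(range(8))
print(max(data), min(data), sum(data))  # 7 0 28
print(statistics.mean(data))            # 3.5
print(statistics.pvariance(data))       # 5.25  (population variance, like rdd.variance())
print(statistics.pstdev(data))          # ≈ 2.2913 (population stdev, like rdd.stdev())
```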

### CountByKey

Action / To Driver: Return a map of keys and counts of their occurrences in the RDD

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/jvQTGv6.png?raw=1)

In [93]:
rdd = sc.parallelize([('J', 'James'), ('F','Fred'),
                    ('A','Anna'), ('J','John')])

rdd.countByKey()

defaultdict(int, {'J': 2, 'F': 1, 'A': 1})
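`countByKey` returns a plain dictionary to the driver, so it is best suited to a small number of distinct keys. In plain Python, the same count is a `collections.Counter` over the keys.

```python
from collections import Counter

pairs = [('J', 'James'), ('F', 'Fred'), ('A', 'Anna'), ('J', 'John')]
counts = Counter(key for key, _ in pairs)  # count occurrences of each key
print(dict(counts))  # {'J': 2, 'F': 1, 'A': 1}
```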

In [94]:
# Stop the local spark cluster
sc.stop()

### Exercise: Word-count in Apache Spark

- Write the sample text file