
# PySpark

![Logo](https://github.com/pnavaro/big-data/blob/master/notebooks/images/apache_spark_logo.png?raw=1)

- [Apache Spark](https://spark.apache.org) 1.0 was released in 2014; the project itself began in 2009 at UC Berkeley's AMPLab and was open-sourced in 2010.
- It was originally developed by [Matei Zaharia](http://people.csail.mit.edu/matei) as a class project, and later a PhD dissertation, at the University of California, Berkeley.
- Spark is written in [Scala](https://www.scala-lang.org).
- All images come from [Databricks](https://databricks.com/product/getting-started-guide).

- Apache Spark is a fast and general-purpose cluster computing system. 
- It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.
- Spark can manage "big data" collections with a small set of high-level primitives like `map`, `filter`, `groupby`, and `join`.  With these common patterns we can often handle computations that are more complex than map, but are still structured.
- It also supports a rich set of higher-level tools including [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) for SQL and structured data processing, [MLlib](https://spark.apache.org/docs/latest/ml-guide.html) for machine learning, [GraphX](https://spark.apache.org/docs/latest/graphx-programming-guide.html) for graph processing, and Spark Streaming.

## Resilient distributed datasets

- The fundamental abstraction of Apache Spark is a read-only, parallel, distributed, fault-tolerant collection called a resilient distributed dataset (RDD).
- RDDs behave a bit like Python collections (e.g. lists).
- When working with Apache Spark we iteratively apply functions to every item of these collections in parallel to produce *new* RDDs.
- The data is distributed across nodes in a cluster of computers.
- Functions implemented in Spark can work in parallel across elements of the collection.
- The Spark framework allocates data and processing to different nodes, without any intervention from the programmer.
- RDDs are automatically rebuilt on machine failure.
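
As a rough plain-Python analogy (not Spark code), the sketch below splits a list into partitions and applies a function to each partition in a thread pool. The helpers `split_into_partitions` and `map_partitions` are hypothetical names invented for this illustration; real Spark distributes partitions across machines rather than threads.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Split a list into roughly equal, contiguous chunks (hypothetical helper)."""
    size, remainder = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `remainder` partitions receive one extra element
        end = start + size + (1 if i < remainder else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def map_partitions(func, partitions):
    """Apply func to every element, one worker task per partition."""
    with ThreadPoolExecutor() as pool:
        # pool.map returns results in the same order as the partitions
        return list(pool.map(lambda part: [func(x) for x in part], partitions))


partitions = split_into_partitions(list(range(8)), 3)
squared = map_partitions(lambda x: x ** 2, partitions)
print(partitions)  # [[0, 1, 2], [3, 4, 5], [6, 7]]
print(squared)     # [[0, 1, 4], [9, 16, 25], [36, 49]]
```

The result is a *new* list of partitions; the input is left untouched, which mirrors the read-only nature of RDDs.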

## Lifecycle of a Spark Program

1. Create some input RDDs from external data or parallelize a collection in your driver program.
2. Lazily transform them to define new RDDs using transformations like `filter()` or `map()`.
3. Ask Spark to `cache()` any intermediate RDDs that will need to be reused.
4. Launch actions such as `count()` and `collect()` to kick off a parallel computation, which is then optimized and executed by Spark.

## Operations on Distributed Data

- Two types of operations: **transformations** and **actions**
- Transformations are *lazy* (not computed immediately) 
- Transformations are executed when an action is run
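
To make the lazy/eager distinction concrete, here is a minimal plain-Python sketch. The `LazyCollection` class is a toy invented for this illustration, not part of PySpark: its `map` and `filter` only record what should be done, and `collect` (the "action") is the first call that actually touches the data.

```python
class LazyCollection:
    """Toy stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, source, steps=()):
        self._source = source        # the input data
        self._steps = tuple(steps)   # recorded (kind, function) pairs

    # Transformations: return a new object and compute nothing
    def map(self, func):
        return LazyCollection(self._source, self._steps + (("map", func),))

    def filter(self, predicate):
        return LazyCollection(self._source, self._steps + (("filter", predicate),))

    # Action: only now is the recorded pipeline executed
    def collect(self):
        items = iter(self._source)
        for kind, func in self._steps:
            items = map(func, items) if kind == "map" else filter(func, items)
        return list(items)


calls = []


def traced_square(x):
    calls.append(x)  # remember every element that is actually processed
    return x ** 2


lazy = LazyCollection(range(8)).map(traced_square).filter(lambda x: x % 2 == 0)
calls_before_action = len(calls)  # nothing has run yet
result = lazy.collect()           # the action triggers the computation
print(calls_before_action, result, len(calls))  # 0 [0, 4, 16, 36] 8
```

Each transformation returns a new object and leaves the original unchanged, so the same base collection can feed several different pipelines.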

## [Transformations](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) (lazy)

```
map() flatMap()
filter() 
mapPartitions() mapPartitionsWithIndex() 
sample()
union() intersection() distinct()
groupBy() groupByKey()
reduceByKey()
sortBy() sortByKey()
join()
cogroup()
cartesian()
pipe()
coalesce()
repartition()
partitionBy()
...
```

## [Actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)

```
reduce()
collect()
count()
first()
take()
takeSample()
saveToCassandra()  # provided by the Spark Cassandra Connector, not core Spark
takeOrdered()
saveAsTextFile()
saveAsSequenceFile()
saveAsObjectFile()
countByKey()
foreach()
```

## Python API

PySpark uses Py4J, which enables Python programs to dynamically access Java objects.

![PySpark Internals](https://github.com/pnavaro/big-data/blob/master/notebooks/images/YlI8AqEl.png?raw=1)

## The `SparkContext` class

- When working with Apache Spark we invoke methods on an instance of the `pyspark.SparkContext` class.

- In the PySpark shell and in many hosted notebooks, an instance is created automatically and assigned to the variable `sc`; in this notebook we create it ourselves.

- The `parallelize` method of `SparkContext` turns an ordinary Python collection into an RDD.
    - In practice we would more often create an RDD from a large file or an HBase table.

## First example

PySpark isn't on `sys.path` by default, but that doesn't mean it can't be used as a regular library. You can address this by either symlinking `pyspark` into your site-packages, or adding it to `sys.path` at runtime. [findspark](https://github.com/minrk/findspark) does the latter.

Below we create a Spark context `sc` backed by a tiny local Spark "cluster": the `local[*]` master runs one worker thread per available core, so it works just fine on a multicore machine.

In [6]:
import os, sys
sys.executable

'/usr/bin/python3'

In [7]:
!apt-get install openjdk-8-jdk-headless



In [8]:
!wget https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
!tar xf /content/spark-3.2.1-bin-hadoop2.7.tgz



In [9]:
!pip install -q findspark



In [10]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"

In [11]:
import findspark
findspark.init()
findspark.find()

'/content/spark-3.2.1-bin-hadoop2.7'

In [12]:
import pyspark

sc = pyspark.SparkContext(master="local[*]", appName="FirstExample")
sc.setLogLevel("ERROR")

In [13]:
print(sc)  # the entry point to Spark, loosely analogous to a pool of worker processes

<SparkContext master=local[*] appName=FirstExample>


## Create your first RDD

In [14]:
data = list(range(8))
rdd = sc.parallelize(data) # create collection
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

### Exercise

Create a file `sample.txt` filled with text from the lorem provider of the Faker package. Read it into an RDD with the `SparkContext.textFile` method.

In [18]:
# Read more about Faker's lorem provider here:
# https://faker.readthedocs.io/en/master/providers/faker.providers.lorem.html
!pip install Faker




In [59]:
from faker import Faker
fake = Faker()
Faker.seed(0)

with open("sample.txt","w") as f:
    f.write(fake.text(max_nb_chars=1000))
    
rdd_from_file = sc.textFile("sample.txt")

### Collect

Action / To Driver: Return all items in the RDD to the driver in a single list

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/DUO6ygB.png?raw=1)

Source: https://i.imgur.com/DUO6ygB.png

### Exercise

Collect the text you read earlier from the `sample.txt` file.

In [60]:
print(rdd_from_file.collect())


['American whole magazine truth stop whose. On traditional measure example sense peace. Would mouth relate own chair.', 'Together range line beyond. First policy daughter need kind miss.', 'Trouble behavior style report size personal partner. During foot that course nothing draw.', 'Language ball floor meet usually board necessary. Natural sport music white.', 'Onto knowledge other his offer face country. Almost wonder employee attorney. Theory type successful together. Raise study modern miss dog Democrat quickly.', 'Every manage political record word group food break. Picture suddenly drug rule bring determine some forward. Beyond chair recently and.', 'Own available buy country store build before. Already against which continue. Look road article quickly.', 'Per structure attorney author feeling job. Mean always beyond write. Employee toward like total now.', 'Small citizen class morning. Others kind company likely.']


### Map

Transformation / Narrow: Return a new RDD by applying a function to each element of this RDD

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/PxNJf0U.png?raw=1)

Source: http://i.imgur.com/PxNJf0U.png

In [61]:
rdd1 = sc.parallelize(list(range(8)))
rdd1.map(lambda x: x ** 2).collect() # Square each element

[0, 1, 4, 9, 16, 25, 36, 49]

### Filter

Transformation / Narrow: Return a new RDD containing only the elements that satisfy a predicate

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/GFyji4U.png?raw=1)

Source: http://i.imgur.com/GFyji4U.png

In [62]:
# Select only the even elements
rdd1.filter(lambda x: x % 2 == 0).collect()

[0, 2, 4, 6]

### FlatMap

Transformation / Narrow: Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/TsSUex8.png?raw=1)

In [63]:
rdd2 = sc.parallelize([1,2,3])
rdd2.flatMap(lambda x: (x, x*100, 42)).collect()

[1, 100, 42, 2, 200, 42, 3, 300, 42]
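
The difference between `map` and `flatMap` is easiest to see side by side. The following plain-Python sketch (an analogy using `itertools`, not Spark code; `expand` is a hypothetical helper) applies the same function both ways:

```python
from itertools import chain


def expand(x):
    """Return several outputs for one input."""
    return (x, x * 100, 42)


data = [1, 2, 3]
mapped = list(map(expand, data))                            # one output per input
flat_mapped = list(chain.from_iterable(map(expand, data)))  # outputs concatenated
print(mapped)       # [(1, 100, 42), (2, 200, 42), (3, 300, 42)]
print(flat_mapped)  # [1, 100, 42, 2, 200, 42, 3, 300, 42]
```

`map` keeps one result per element, so the tuples stay nested; the flattened version matches the `flatMap` output above.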

### Exercise

Use `flatMap` to clean the text from the `sample.txt` file: convert it to lower case, remove the dots and split it into words.



In [56]:
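# Hint: pass a helper function like this one to flatMap
def clean_row(line):
    output = line.lower()              # lower-case the line
    output = output.replace('.', ' ')  # turn dots into spaces
    output = output.split(' ')         # split on single spaces; consecutive spaces yield '' entries
    return output

rdd_from_file.flatMap(clean_row).collect()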


['american',
 'whole',
 'magazine',
 'truth',
 'stop',
 'whose',
 '',
 'on',
 'traditional',
 'measure',
 'example',
 'sense',
 'peace',
 '',
 'would',
 'mouth',
 'relate',
 'own',
 'chair',
 '',
 'together',
 'range',
 'line',
 'beyond',
 '',
 'first',
 'policy',
 'daughter',
 'need',
 'kind',
 'miss',
 '',
 'trouble',
 'behavior',
 'style',
 'report',
 'size',
 'personal',
 'partner',
 '',
 'during',
 'foot',
 'that',
 'course',
 'nothing',
 'draw',
 '',
 'language',
 'ball',
 'floor',
 'meet',
 'usually',
 'board',
 'necessary',
 '',
 'natural',
 'sport',
 'music',
 'white',
 '',
 'onto',
 'knowledge',
 'other',
 'his',
 'offer',
 'face',
 'country',
 '',
 'almost',
 'wonder',
 'employee',
 'attorney',
 '',
 'theory',
 'type',
 'successful',
 'together',
 '',
 'raise',
 'study',
 'modern',
 'miss',
 'dog',
 'democrat',
 'quickly',
 '',
 'every',
 'manage',
 'political',
 'record',
 'word',
 'group',
 'food',
 'break',
 '',
 'picture',
 'suddenly',
 'drug',
 'rule',
 'bring',
 'deter

### GroupBy

Transformation / Wide: Group the data in the original RDD. Create pairs where the key is the output of a user function, and the value is all items for which the function yields this key.

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/gdj0Ey8.png?raw=1)

In [64]:
rdd3 = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
rdd3 = rdd3.groupBy(lambda w: w[0])
[(k, list(v)) for (k, v) in rdd3.collect()]

[('J', ['John', 'James']), ('F', ['Fred']), ('A', ['Anna'])]

### GroupByKey

Transformation / Wide: Group the values for each key in the original RDD. Create a new pair where the original key corresponds to this collected group of values.

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/TlWRGr2.png?raw=1)

In [65]:
rdd4 = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])
rdd4 = rdd4.groupByKey()
[(k, list(v)) for (k, v) in rdd4.collect()]

[('B', [5, 4]), ('A', [3, 2, 1])]

### Join

Transformation / Wide: Return a new RDD containing all pairs of elements having the same key in the original RDDs

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/YXL42Nl.png?raw=1)

In [66]:
x = sc.parallelize([("a", 1), ("b", 2)])
y = sc.parallelize([("a", 3), ("a", 4), ("b", 5)])
x.join(y).collect()

[('b', (2, 5)), ('a', (1, 3)), ('a', (1, 4))]
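
The pairing rule behind `join` can be sketched in plain Python. This is an analogy rather than Spark's implementation, and `inner_join` is a hypothetical helper: every value on the left is paired with every value on the right that shares its key, and keys present on only one side are dropped.

```python
from collections import defaultdict


def inner_join(left, right):
    """Inner join of two lists of (key, value) pairs (hypothetical helper)."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Pair each left value with every right value that has the same key
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]


x = [("a", 1), ("b", 2)]
y = [("a", 3), ("a", 4), ("b", 5)]
joined = inner_join(x, y)
print(sorted(joined))  # [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5))]
```

The result is sorted here only to make it easy to compare; as the Spark output above shows, the order of a joined RDD is not guaranteed.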

### Distinct

Transformation / Wide: Return a new RDD containing distinct items from the original RDD (omitting all duplicates)

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/Vqgy2a4.png?raw=1)

In [67]:
rdd5 = sc.parallelize([1,2,3,3,4])
rdd5.distinct().collect()

[2, 4, 1, 3]

### KeyBy

Transformation / Narrow: Create a Pair RDD, forming one pair for each item in the original RDD. The pair’s key is calculated from the value via a user-supplied function.

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/nqYhDW5.png?raw=1)

In [68]:
rdd6 = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
rdd6.keyBy(lambda w: w[0]).collect()

[('J', 'John'), ('F', 'Fred'), ('A', 'Anna'), ('J', 'James')]
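
`keyBy`, `groupByKey` and `groupBy` are closely related: keying each item with a function and then grouping by key gives the same groups as `groupBy` with that function. The plain-Python sketch below illustrates this relationship; `key_by` and `group_by_key` are hypothetical helpers, not PySpark calls.

```python
from collections import defaultdict


def key_by(items, func):
    """Pair every item with the key computed from it."""
    return [(func(item), item) for item in items]


def group_by_key(pairs):
    """Collect all values that share a key, keeping first-seen key order."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


names = ['John', 'Fred', 'Anna', 'James']
pairs = key_by(names, lambda w: w[0])
grouped = group_by_key(pairs)
print(pairs)    # [('J', 'John'), ('F', 'Fred'), ('A', 'Anna'), ('J', 'James')]
print(grouped)  # {'J': ['John', 'James'], 'F': ['Fred'], 'A': ['Anna']}
```

The grouped result contains the same groups as the `groupBy` example earlier in this notebook.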

## Actions

### Map-Reduce operation 

Action / To Driver: Aggregate all the elements of the RDD by applying a user function pairwise to elements and partial results, and return a result to the driver

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/R72uzwX.png?raw=1)

In [69]:
from operator import add
rdd7 = sc.parallelize(list(range(8)))
rdd7.map(lambda x: x ** 2).reduce(add) # reduce is an action!

140
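
Because `reduce` combines partial results computed on separate partitions, the function passed to it should be commutative and associative. The plain-Python sketch below (an analogy, with a hypothetical `partitioned_reduce` helper) shows that addition gives the same answer however the data is split, while subtraction does not.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(func, partitions):
    """Reduce each non-empty partition, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


squares = [x ** 2 for x in range(8)]
two_parts = [squares[:4], squares[4:]]
four_parts = [squares[:2], squares[2:4], squares[4:6], squares[6:]]

sum_two = partitioned_reduce(add, two_parts)
sum_four = partitioned_reduce(add, four_parts)
diff_two = partitioned_reduce(sub, two_parts)
diff_four = partitioned_reduce(sub, four_parts)
print(sum_two, sum_four)    # 140 140
print(diff_two, diff_four)  # 80 26
```

With subtraction the result depends on the partitioning, which is why such a function is unsafe to use with a distributed `reduce`.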

### CountByKey

Action / To Driver: Return a map of keys and counts of their occurrences in the RDD

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/jvQTGv6.png?raw=1)

In [70]:
rdd = sc.parallelize([('J', 'James'), ('F','Fred'), 
                    ('A','Anna'), ('J','John')])

rdd.countByKey()

defaultdict(int, {'A': 1, 'F': 1, 'J': 2})

In [None]:
# Stop the local spark cluster
sc.stop()

### Exercise: Word-count in Apache Spark

- Create a new `SparkContext` if you stopped the previous one with `sc.stop()`.
- Write the sample text file.
- Create the RDD with the `SparkContext.textFile` method.
- Lower-case the text, remove the dots and split it into words using `rdd.flatMap`.
- Use `rdd.map` to create the key/value pairs `(word, 1)`.
- Use `rdd.reduceByKey` to count the occurrences of each word.
- Use `rdd.takeOrdered` to get the words sorted by frequency.

All documentation is available [here](https://spark.apache.org/docs/2.1.0/api/python/pyspark.html?highlight=textfile#pyspark.SparkContext) for textFile and [here](https://spark.apache.org/docs/2.1.0/api/python/pyspark.html?highlight=textfile#pyspark.RDD) for RDD. 

For a global overview, see the Transformations section of the [programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html).
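
Before writing the Spark version, it can help to see the same pipeline in plain Python. The sketch below is not the PySpark solution: it mirrors each step with standard-library code, and the two input lines are made up for the illustration. The comments name the RDD method each step corresponds to.

```python
import heapq
from collections import defaultdict

# Made-up sample text standing in for the lines of sample.txt
text_lines = [
    "Spark is fast. Spark is general.",
    "Python is fun.",
]


def clean_row(line):
    # Lower-case, turn dots into spaces, split on any run of whitespace
    return line.lower().replace(".", " ").split()


# flatMap: one flat list of words
words = [word for line in text_lines for word in clean_row(line)]
# map: (word, 1) pairs
pairs = [(word, 1) for word in words]
# reduceByKey: sum the counts for each word
counts = defaultdict(int)
for word, n in pairs:
    counts[word] += n
# takeOrdered: the 3 most frequent words, ties broken alphabetically
top3 = heapq.nsmallest(3, counts.items(), key=lambda kv: (-kv[1], kv[0]))
print(top3)  # [('is', 3), ('spark', 2), ('fast', 1)]
```

Calling `split()` without an argument avoids the empty strings that `split(' ')` produces when a dot is followed by a space.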
