<a href="https://colab.research.google.com/github/jalorenzo/SparkNotebookColab/blob/master/BDF_06_Persistence_and_Partitioning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#00 - Configuration of Apache Spark on Collaboratory


###Installing Java, Spark, and Findspark


---


This code installs Apache Spark 3.0.1, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget  http://apache.osuosl.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz  
!tar xf spark-3.3.1-bin-hadoop3.tgz  
!rm spark-3.3.1-bin-hadoop3.tgz    
!pip install -q findspark

--2021-11-26 10:50:12--  http://apache.osuosl.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
Resolving apache.osuosl.org (apache.osuosl.org)... 64.50.233.100, 64.50.236.52, 140.211.166.134, ...
Connecting to apache.osuosl.org (apache.osuosl.org)|64.50.233.100|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 300965906 (287M) [application/x-gzip]
Saving to: ‘spark-3.2.0-bin-hadoop3.2.tgz’


2021-11-26 10:51:40 (3.27 MB/s) - ‘spark-3.2.0-bin-hadoop3.2.tgz’ saved [300965906/300965906]



### Set Environment Variables
Set the locations where Spark and Java are installed.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark/"
os.environ["DRIVE_DATA"] = "/content/gdrive/My Drive/Enseignement/2022-2023/ING3/HPDA/BigDataFrameworks/data/"

!rm /content/spark
!ln -s /content/spark-3.3.1-bin-hadoop3 /content/spark
!export SPARK_HOME=/content/spark
!export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
!echo $SPARK_HOME
!env |grep  "DRIVE_DATA"

rm: cannot remove '/content/spark': No such file or directory
/content/spark/
DRIVE_DATA=/content/gdrive/My Drive/Enseignement/2021-2022/ING3/BDA/BigDataFrameworks/data/


### Start a SparkSession
This will start a local Spark session.

In [1]:
!python -V

#import findspark
#findspark.init()

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Example: shows the PySpark version
print("PySpark version {0}".format(sc.version))

# Example: parallelise an array and show the 2 first elements
sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)

Python 3.9.2


/usr/local/lib/python3.9/dist-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/26 02:51:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/26 02:51:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
PySpark version 3.3.1


                                                                                

[2, 3]

In [7]:
from pyspark.sql import SparkSession
# We create a SparkSession object (or we retrieve it if it is already created)
spark = SparkSession \
.builder \
.appName("My application") \
.config("spark.some.config.option", "some-value") \
.master("local[4]") \
.getOrCreate()
# We get the SparkContext
sc = spark.sparkContext

In [3]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/gdrive')

ModuleNotFoundError: No module named 'google'


---


# 06 - Persistence and Partitioning

We will show here two important aspects of Apache Spark

- `Persistence`: how to store DataFrames and RDDs in a way so that they do not need to be recalculated
- `Partitioning`:  how to specify and change the partitions of a DataFrame or RDD

## Persistence

Issues when reusing an RDD several times:

-   Spark recalculates the RDD as well as its dependencies every time an action is executed
-   Very costly (particularly in iterative problems)

Solution

-   Keep the RDD in memory and/or disk
-   Use `cache()` or `persist()` methods

### Persistence levels (as defined in [`pyspark.StorageLevel`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html#pyspark.StorageLevel) and [`org.apache.spark.storage.StorageLevel`](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.storage.StorageLevel))
 Level                | Space used  | CPU time     | Memory/Disk   | Comments
 :------------------: | :------:    | :-----:      | :-------------: | ------------------
 MEMORY_ONLY          |   High      |   Low        |     Memory   | Stores the RDD in the JVM as a non-serialised Java object. If the RDD does not fit in memory, some partitions will not be cached in memory and will be recalculated on the fly every time they are required. Default level in Java and Scala.
 MEMORY_ONLY_SER      |   Low       |   High       |     Memory   | Stores the RDD as a serialised Java object (a *byte array* per partition). Default level in Python, using [`pickle`](http://docs.python.org/2/library/pickle.html).
 MEMORY_AND_DISK      |   High      |   Medium     |     Both     | Stores the RDD in the JVM as a non-serialised Java object. If the RDD does not fit in memory, the partitions that do not fit will be spilled to disk and read from there every time they are required.
 MEMORY_AND_DISK_SER  |   Low       |   High       |     Both     | Similar to MEMORY_AND_DISK but using serialised objects.
 DISK_ONLY            |   Low       |   High       |     Disk     | Stores the RDD partitions only on disk.
 OFF_HEAP             |   Low       |   High       |   Memory     | Stores the serialised RDD using *off-heap* memory (outside the JVM's heap) which can reduce the overhead of the garbage collector.
   


    
### Persistence levels

-   In Scala and Java, the default level is MEMORY\_ONLY

-   In Python, data are always serialised (by default as *pickled* objects)

    - MEMORY_ONLY and MEMORY_AND_DISK levels are equivalent to MEMORY_ONLY_SER and MEMORY_AND_DISK_SER
    - When creating the SparkContext it is possible to request a serialisation [`marshal`](https://docs.python.org/2/library/marshal.html#module-marshal) 
    
```python
sc = SparkContext(master="local", appName="My app", serializer=pyspark.MarshalSerializer())
```
    
### Fault tolerance

-   If a node with stored data fails, the RDD is recomputed 

    -   Adding `_2` to the persistence level, 2 copies of the RDD are stored
        
### Cache management

-   LRU algorithm to manage the cache memory

    -   For *only memory* levels, the old RDDs are deleted and recalculated
    -   For *memory and disk* levels, partitions that do not fit in memory are spilled to disk


### Persistence with DataFrames

In [8]:
import os
os.environ["DRIVE_DATA"] = "./data/"

dfFlightsData = (spark
    .read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(os.environ["DRIVE_DATA"] + "2015-summary.csv"))
print("Cached: {0}".format(dfFlightsData.is_cached))
print("Level without persistence: {0}".format(dfFlightsData.storageLevel))

Cached: False
Level without persistence: Serialized 1x Replicated


In [9]:
dfFlightsData.cache()
print("Cached: {0}".format(dfFlightsData.is_cached))
print("Persistence level by default: {0}".format(dfFlightsData.storageLevel))

Cached: True
Persistence level by default: Disk Memory Deserialized 1x Replicated


In [10]:
# To chanche the persistence level, we need first to remove it from cache
dfFlightsData.unpersist()

from pyspark import StorageLevel
dfFlightsData.persist(StorageLevel.MEMORY_ONLY_2)
print("Cached: {0}".format(dfFlightsData.is_cached))
print("New persistence level: {0}".format(dfFlightsData.storageLevel))

Cached: True
New persistence level: Memory Serialized 2x Replicated


In [11]:
# Persistence is not inherited in transformations
dfData2 = dfFlightsData.select("DEST_COUNTRY_NAME")
dfData2.show()
print("Cached: {0}".format(dfData2.is_cached))


22/12/26 02:52:17 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/12/26 02:52:17 WARN BlockManager: Block rdd_26_0 replicated to only 0 peer(s) instead of 1 peers
+--------------------+
|   DEST_COUNTRY_NAME|
+--------------------+
|       United States|
|       United States|
|       United States|
|               Egypt|
|       United States|
|       United States|
|       United States|
|          Costa Rica|
|             Senegal|
|             Moldova|
|       United States|
|       United States|
|              Guyana|
|               Malta|
|            Anguilla|
|             Bolivia|
|       United States|
|             Algeria|
|Turks and Caicos ...|
|       United States|
+--------------------+
only showing top 20 rows

Cached: False


### Persistence with RDDs

In [12]:
rdd = sc.parallelize(range(1000), 10)
print("Cached: {0}".format(rdd.is_cached))
print("Level without persistence: {0}".format(rdd.getStorageLevel()))

Cached: False
Level without persistence: Serialized 1x Replicated


In [13]:
rdd.cache()
print("Cached: {0}".format(rdd.is_cached))
print("Default persistence level: {0}".format(rdd.getStorageLevel()))

Cached: True
Default persistence level: Memory Serialized 1x Replicated


In [14]:
# Take rdd out of the cache memory
rdd.unpersist() 
print("Cached: {0}".format(rdd.is_cached))
print("Default persistence level: {0}".format(rdd.getStorageLevel()))

from pyspark import StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
print("Cached: {0}".format(rdd.is_cached))
print("New persistence level: {0}".format(rdd.getStorageLevel()))

Cached: False
Default persistence level: Serialized 1x Replicated
Cached: True
New persistence level: Disk Memory Serialized 2x Replicated


In [15]:
# Persistence is not inherited in transformations
rdd2 = rdd.map(lambda x: x*x)
print("Cached: {0}".format(rdd2.is_cached))

Cached: False


### Checkpointing with RDDs
RDDs can be checkpointed, forcing them to be stored on disk.

- I is a *lazy* operation: data are not stored on disk until an Action is dispatched
- Future references to those RDDs will load them from disk instead of recomputing them


In [16]:
!mkdir -p "$DRIVE_DATA"/CP
!ls "$DRIVE_DATA"
#just in case...
#rm -rf "$DRIVE_DATA"/CP/*

2015-summary.csv       cite75_99.txt.tar.bz2   italianPosts.csv.bz2
CP		       country_codes.txt       myscript.py
apat63_99.txt	       dfSE-partition.parquet  people.txt
apat63_99.txt.tar.bz2  dfSE.json	       quijote.txt
books		       dfSE.parquet	       sw.txt
by-day		       dfSE2.parquet	       syslog
cite75_99.txt	       italianPosts.csv


In [17]:
rdd = sc.parallelize(range(100000))
spark.sparkContext.setCheckpointDir(os.environ["DRIVE_DATA"] + "CP")
rdd.checkpoint()

In [18]:
!ls -lR "$DRIVE_DATA"CP/

./data/CP/:
total 0
drwxr-xr-x 1 root root 4096 Dec 26 02:52 ad9c2208-0254-48f8-90ba-5427572bcaa2

./data/CP/ad9c2208-0254-48f8-90ba-5427572bcaa2:
total 0


In [19]:
rdd.count()
!ls -lR "$DRIVE_DATA"CP/

                                                                                

./data/CP/:
total 0
drwxr-xr-x 1 root root 4096 Dec 26 02:52 ad9c2208-0254-48f8-90ba-5427572bcaa2

./data/CP/ad9c2208-0254-48f8-90ba-5427572bcaa2:
total 0
drwxr-xr-x 1 root root 4096 Dec 26 02:52 rdd-34

./data/CP/ad9c2208-0254-48f8-90ba-5427572bcaa2/rdd-34:
total 384
-rw-r--r-- 1 root root 37646 Dec 26 02:52 part-00000
-rw-r--r-- 1 root root 37902 Dec 26 02:52 part-00001
-rw-r--r-- 1 root root 37902 Dec 26 02:52 part-00002
-rw-r--r-- 1 root root 37902 Dec 26 02:52 part-00003
-rw-r--r-- 1 root root 37902 Dec 26 02:52 part-00004
-rw-r--r-- 1 root root 56830 Dec 26 02:52 part-00005
-rw-r--r-- 1 root root 62902 Dec 26 02:52 part-00006
-rw-r--r-- 1 root root 62902 Dec 26 02:52 part-00007


In [20]:
!rm -rf "$DRIVE_DATA"CP/
!ls -lR "$DRIVE_DATA"CP/

ls: cannot access './data/CP/': No such file or directory


## Partitioning

The number of partitions is a function of the cluster size or the number of blocks of the HDFS file

-   It can be adjusted when creating or operating on an RDD
    
    - RDDs offer a greater control on their partitioning 

-   For DataFrames it is possible to modify it once created.

-   The parallelism of RDDs derived from other ones depends on their parent's. 

-   Useful properties:
    -    `spark.default.parallelism` For RDDs, numbre of partitions by default returned by default by transformations like parallelize, join and reduceByKey
        - Fixed value for a SparkContext
        - The property `sc.defaultParallelism` indicates its value
    -    `spark.sql.shuffle.partitions` For DataFrames, number of partitions to use when using data in *wide* transformations
        - It can be modified using `spark.conf.set`

- Useful functions:
    -   `rdd.getNumPartitions()` returns the number of partitions of the RDD
    -   `rdd.glom()` returns a new RDD joining the elements on each partition into a list

    - `repartition(n)` returns a new DataFrame or RDD with exactly `n` partitions
    - `coalesce(n)` optimised version of `repartition`, allows avoiding data movement
        - But only if you are decreasing the number of partitions.
    - `partitionBy(n,[partitionFunc])` Partitioning by key, using a partitioning function (by default, a hash of the key)
        - Only for key/value RDDs
        - Ensures that pairs with the same key go to the same partition



### Partitions and RDDs

In [21]:
print("Number of partitions by default for RDDs: {0}"
       .format(sc.defaultParallelism))
rdd = sc.parallelize([1, 2, 3, 4, 2, 4, 1], 2)
pairs = rdd.map(lambda x: (x, x*x))

print("RDD pairs = {0}".format(pairs.collect()))
print("Pairs partitioning: {0}".format(pairs.glom().collect()))
print("Number of pair partitions = {0}".format(pairs.getNumPartitions()))

Number of partitions by default for RDDs: 8
RDD pairs = [(1, 1), (2, 4), (3, 9), (4, 16), (2, 4), (4, 16), (1, 1)]




Pairs partitioning: [[(1, 1), (2, 4), (3, 9)], [(4, 16), (2, 4), (4, 16), (1, 1)]]
Number of pair partitions = 2


                                                                                

In [22]:
# Reduction keeping the number of partitions
from operator import add
print("Reduction keeping partitions: {0}".format(
        pairs.reduceByKey(add).glom().collect()))

Reduction keeping partitions: [[(2, 8), (4, 32)], [(1, 2), (3, 9)]]


In [23]:
# Reduction modifying the number of partitions
print("Reduction with 3 partitions: {0}".format(
       pairs.reduceByKey(add, 3).glom().collect()))

Reduction with 3 partitions: [[(3, 9)], [(1, 2), (4, 32)], [(2, 8)]]


In [24]:
# Repartitions example
pairs4 = pairs.repartition(4)
print("pairs4 with {0} partitions: {1}".format(
        pairs4.getNumPartitions(),
        pairs4.glom().collect()))

pairs4 with 4 partitions: [[], [(4, 16), (2, 4), (4, 16), (1, 1)], [], [(1, 1), (2, 4), (3, 9)]]


                                                                                

In [25]:
# Coalesce example
pairs2 = pairs4.coalesce(2)
print("pairs2 with {0} partitions: {1}".format(
        pairs2.getNumPartitions(),
        pairs2.glom().collect()))

pairs2 with 2 partitions: [[(4, 16), (2, 4), (4, 16), (1, 1)], [(1, 1), (2, 4), (3, 9)]]


In [26]:
# Partitioning by key
pairs_key = pairs2.partitionBy(4)
print("Partitions by key ({0} partitions): {1}".format(
        pairs_key.getNumPartitions(),
        pairs_key.glom().collect())) 

Partitions by key (4 partitions): [[(4, 16), (4, 16)], [(1, 1), (1, 1)], [(2, 4), (2, 4)], [(3, 9)]]


In [27]:
# Using a partitioning function
def partitionEvenOdd(key):
    if key%2:
        return 0  # Odd keys go to partition 0
    else:
        return 1  # Even keys go to partition 1
        
pairs_evenodd = pairs2.partitionBy(2, partitionEvenOdd)
print("Partition by key ({0} partitions): {1}".format(
        pairs_evenodd.getNumPartitions(),
        pairs_evenodd.glom().collect()))

Partition by key (2 partitions): [[(1, 1), (1, 1), (3, 9)], [(4, 16), (2, 4), (4, 16), (2, 4)]]


### Partitions and DataFrames

In [28]:
# Convert the RDD to a DataFrame
dfPairs = pairs.toDF()
dfPairs.show()

+---+---+
| _1| _2|
+---+---+
|  1|  1|
|  2|  4|
|  3|  9|
|  4| 16|
|  2|  4|
|  4| 16|
|  1|  1|
+---+---+



In [29]:
# The DataFrame inherits the number of partitions from the RDD
print("Number of partitions of the DataFrame: {0}"
      .format(dfPairs.rdd.getNumPartitions()))

Number of partitions of the DataFrame: 2


In [30]:
# A narrow transformation keeps the number of partitions
print("Number of partitions after a narrow transformation: {0}"
      .format(dfPairs.replace(1, 2).rdd.getNumPartitions()))

Number of partitions after a narrow transformation: 2


In [37]:
dfPairs.replace(1, 2).show()

+---+---+
| _1| _2|
+---+---+
|  2|  2|
|  2|  4|
|  3|  9|
|  4| 16|
|  2|  4|
|  4| 16|
|  2|  2|
+---+---+



In [38]:
# A wide transformation does not keep the number of partitions
print("Number of partitions after a wide transformation: {0}"
      .format(dfPairs.sort("_1").rdd.getNumPartitions()))

Number of partitions after a wide transformation: 1


In [39]:
# It is possibe to specify the number of partitions to use in the wide transformation
spark.conf.set("spark.sql.shuffle.partitions", 2)
print("Number of partitions after a wide transformation: {0}"
      .format(dfPairs.sort("_1").rdd.getNumPartitions()))

Number of partitions after a wide transformation: 1


## Working at partition level

A `map`  operation is applied to each element of the RDD (or a `foreach` for each row of the DataFrame)

-  It may imply redundant operations (f.ex. opening a connection to a DB)

-  It may not be very efficient 

`map` and `foreach` can be called once per partition:

-   Methods `mapPartitions()`, `mapPartitionsWithIndex()` and `foreachPartition()`



In [42]:
nums = sc.parallelize([1,2,3,4,5,6,7,8,9], 4)
print(nums.glom().collect())

def addAndCount(iterator):
    addCount = [0,0]
    for i in iterator:
        addCount[0] += i
        addCount[1] += 1
    return addCount

# Call the addAndCount function once per partition
# The iterator includes the values of the partition
print(nums.mapPartitions(addAndCount).glom().collect())

[[1, 2], [3, 4], [5, 6], [7, 8, 9]]
[[3, 2], [7, 2], [11, 2], [24, 3]]


In [46]:
def addAndCountIndex(index, iterator):
    return "Partition "+str(index), addAndCount(iterator)

# index is the number of partition
print(nums.mapPartitionsWithIndex(addAndCountIndex).glom().collect())
print(nums.mapPartitionsWithIndex(addAndCountIndex).glom().collect())

[['Partition 0', [3, 2]], ['Partition 1', [7, 2]], ['Partition 2', [11, 2]], ['Partition 3', [24, 3]]]


In [34]:
import os
import tempfile

def f(iterator):
    tempfich, tempname = tempfile.mkstemp(dir=tempdir,text=True)
    for x in iterator: 
        print(list(iterator)[0]) 
        os.write(tempfich, (str(x)+'\t').encode())
    os.close(tempfich)

tempdir = "/tmp/foreachPartition"

if not os.path.exists(tempdir):
    os.mkdir(tempdir)
    # For each partition of the RDD, a temporary file is created.
    # The values of the partition are written to that file.
    nums.foreachPartition(f)

86

2
4


In [35]:
!ls -al /tmp/foreachPartition

total 28
drwxr-xr-x 2 root root 4096 Nov 21 10:19 .
drwxrwxrwt 1 root root 4096 Nov 21 10:19 ..
-rw------- 1 root root    2 Nov 21 10:19 tmp9b70_vb2
-rw------- 1 root root    2 Nov 21 10:19 tmpel0v4a21
-rw------- 1 root root    2 Nov 21 10:19 tmppdcf9rh9
-rw------- 1 root root    2 Nov 21 10:19 tmpqb489prs


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54624)
Traceback (most recent call last):
  File "/usr/lib/python3.9/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.9/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.9/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.9/socketserver.py", line 720, in __init__
    self.handle()
  File "/usr/local/lib/python3.9/dist-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/usr/local/lib/python3.9/dist-packages/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/usr/local/lib/python3.9/dist-packages/pyspark/accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/lo