# **PySpark**: The Apache Spark Python API

## 1. Introduction

This notebook shows how to connect to the Spark cluster and run a small ETL (extract, transform, load) job.

## 2. The Spark Cluster

### 2.1. Connection

To connect to the Spark cluster we set:
+ **appName:** the application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** the Spark master URL, the same one the workers use;
+ **spark.executor.memory:** must be less than or equal to SPARK_WORKER_MEMORY in the docker compose config.
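The memory rule above can be checked before building the session. The following is a minimal sketch with hypothetical helpers (`parse_spark_memory` and `executor_fits_worker` are not Spark APIs); it assumes memory strings always carry a `k`, `m`, `g` or `t` suffix and uses `1g` as a stand-in for the worker's SPARK_WORKER_MEMORY.

```python
import re

# Hypothetical helper, not part of Spark: size of each unit in mebibytes.
_UNITS_IN_MIB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def parse_spark_memory(value: str) -> float:
    """Convert a Spark-style memory string such as "512m" or "1g" to MiB."""
    match = re.fullmatch(r"(\d+)([kmgt])b?", value.strip().lower())
    if match is None:
        raise ValueError(f"unrecognised memory string: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNITS_IN_MIB[unit]


def executor_fits_worker(executor_memory: str, worker_memory: str) -> bool:
    # An executor can only be launched on a worker that offers at least
    # as much memory as the executor requests.
    return parse_spark_memory(executor_memory) <= parse_spark_memory(worker_memory)


# "1g" is an assumed value for SPARK_WORKER_MEMORY.
print(executor_fits_worker("512m", "1g"))
print(executor_fits_worker("2g", "1g"))
```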

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

More settings can be added to the SparkSession object by chaining further calls to the **config** method. See the API docs [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).

## 3. The Data

### 3.1. Introduction
We use the Spark Python API to read, process and write the data. See the API docs [here](https://spark.apache.org/docs/latest/api/python/index.html).

### 3.2. Read

We read the UK's macroeconomic data ([source](https://www.kaggle.com/bank-of-england/a-millennium-of-macroeconomic-data)) from the cluster's simulated **Hadoop distributed file system (HDFS)** into a Spark dataframe.

In [2]:
data = spark.read.csv(path="data/uk-macroeconomic-data.csv", sep=",", header=True)

Next we display some dataframe metadata: the number of rows, the number of columns and the schema.

In [3]:
data.count()

841

In [4]:
len(data.columns)

77

In [None]:
data.printSchema()

### 3.3. Process

First, we select the relevant columns.

In [6]:
unemployment = data.select(["Description", "Population (GB+NI)", "Unemployment rate"])

In [7]:
unemployment.show(n=10)

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|      Units|              000s|                %|
|       1209|              null|             null|
|       1210|              null|             null|
|       1211|              null|             null|
|       1212|              null|             null|
|       1213|              null|             null|
|       1214|              null|             null|
|       1215|              null|             null|
|       1216|              null|             null|
|       1217|              null|             null|
+-----------+------------------+-----------------+
only showing top 10 rows



We successfully selected the required columns and displayed the first 10 rows, but there are two problems:
+ the first row does not contain data but the units of measurement;
+ many years have missing population and unemployment data.

Let's remove the units row first: we isolate it with a filter, then drop it from the dataframe with a left anti join.

In [8]:
cols_description = unemployment.filter(unemployment['Description'] == 'Units')

In [9]:
cols_description.show()

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|      Units|              000s|                %|
+-----------+------------------+-----------------+



In [10]:
unemployment = unemployment.join(other=cols_description, on=['Description'], how='left_anti')

In [11]:
unemployment.show(n=10)

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|       1209|              null|             null|
|       1210|              null|             null|
|       1211|              null|             null|
|       1212|              null|             null|
|       1213|              null|             null|
|       1214|              null|             null|
|       1215|              null|             null|
|       1216|              null|             null|
|       1217|              null|             null|
|       1218|              null|             null|
+-----------+------------------+-----------------+
only showing top 10 rows



Nice! Now, let's drop the rows with missing data and rename the columns.

In [12]:
unemployment = unemployment.dropna()

In [13]:
unemployment = unemployment.\
               withColumnRenamed("Description", 'year').\
               withColumnRenamed("Population (GB+NI)", "population").\
               withColumnRenamed("Unemployment rate", "unemployment_rate")

In [14]:
unemployment.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1855|     23241|             3.73|
|1856|     23466|             3.52|
|1857|     23689|             3.95|
|1858|     23914|             5.23|
|1859|     24138|             3.27|
|1860|     24360|             2.94|
|1861|     24585|             3.72|
|1862|     24862|             4.68|
|1863|     25142|             4.15|
|1864|     25425|             2.99|
+----+----------+-----------------+
only showing top 10 rows



### 3.4. Write

Finally, we write the data to our simulated HDFS.

In [15]:
unemployment.repartition(1).write.csv(path="data/uk-macroeconomic-unemployment-data.csv", sep=",", header=True, mode="overwrite")