# Mini Projet : APACHE SPARK

### 1 - Creation de session

In [69]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

In [70]:
#https://spark.apache.org/docs/latest/sql-getting-started.html
spark = SparkSession \
        .builder \
        .appName("firstSpark") \
        .getOrCreate()

### 2 - Ingestion du CSV Donors

In [71]:
# df = spark.read.format('csv').options(header='true').load(".")
def load_dataframe(filename):
    df = spark.read.format('csv').options(header='true').load(filename)
    return df

In [72]:
#creating a dataframe
df_donors  = load_dataframe('Donors.csv')

### 3 - Affichage
Afficher les 20 premières lignes de dataset Donors (Utilisez la fonction show())

In [73]:
df_donors.show()

+--------------------+------------+--------------+----------------+---------+
|            Donor_ID|  Donor_City|   Donor_State|Donor_Is_Teacher|Donor_Zip|
+--------------------+------------+--------------+----------------+---------+
|00000ce845c00cbf0...|    Evanston|      Illinois|              No|      602|
|00002783bc5d10851...|  Appomattox|         other|              No|      245|
|00002d44003ed46b0...|      Winton|    California|             Yes|      953|
|00002eb25d60a09c3...|Indianapolis|       Indiana|              No|      462|
|0000300773fe015f8...|    Paterson|    New Jersey|              No|      075|
|00004c31ce07c2214...|        null|         other|              No|     null|
|00004e32a448b4832...|    Stamford|   Connecticut|              No|      069|
|00004fa20a986e60a...|   Green Bay|     Wisconsin|              No|      543|
|00005454366b6b914...|      Argyle|      New York|              No|      128|
|0000584b8cdaeaa6b...|  Valparaiso|       Indiana|              

Conversion en dataframe pandas (utilisez la fonction "toPandas()")

In [74]:
df_donors.limit(20).toPandas()

Unnamed: 0,Donor_ID,Donor_City,Donor_State,Donor_Is_Teacher,Donor_Zip
0,00000ce845c00cbf0686c992fc369df4,Evanston,Illinois,No,602.0
1,00002783bc5d108510f3f9666c8b1edd,Appomattox,other,No,245.0
2,00002d44003ed46b066607c5455a999a,Winton,California,Yes,953.0
3,00002eb25d60a09c318efbd0797bffb5,Indianapolis,Indiana,No,462.0
4,0000300773fe015f870914b42528541b,Paterson,New Jersey,No,75.0
5,00004c31ce07c22148ee37acd0f814b9,,other,No,
6,00004e32a448b4832e1b993500bf0731,Stamford,Connecticut,No,69.0
7,00004fa20a986e60a40262ba53d7edf1,Green Bay,Wisconsin,No,543.0
8,00005454366b6b914f9a8290f18f4aed,Argyle,New York,No,128.0
9,0000584b8cdaeaa6b3de82be509db839,Valparaiso,Indiana,No,463.0


Trouver le nombre nul dans chaque colonne

In [76]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

df_donors.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_donors.columns]).show()


+--------+----------+-----------+----------------+---------+
|Donor_ID|Donor_City|Donor_State|Donor_Is_Teacher|Donor_Zip|
+--------+----------+-----------+----------------+---------+
|       0|    105086|          0|               0|    88694|
+--------+----------+-----------+----------------+---------+



Imprimer le schéma de dataset (pour imprimer le schéma, on utilise la fonction "printSchema")

In [77]:
df_donors.printSchema()

root
 |-- Donor_ID: string (nullable = true)
 |-- Donor_City: string (nullable = true)
 |-- Donor_State: string (nullable = true)
 |-- Donor_Is_Teacher: string (nullable = true)
 |-- Donor_Zip: string (nullable = true)



### 4 - Filtrage

#### Laissez que les enregistrement dont Donor City commence par A

Vous pouvez utiliser la fonction "filter" 
 
Exemple :  "My_data.filter(My_data.name_colonne.like("A%"))"

Like("A%") : le caractère "%" est un caractère joker qui remplace tous les autres caractères. Ainsi, ce modèle permet de rechercher toutes les chaines de caractère qui commence par un "A".

In [78]:
df_donors.filter(df_donors.Donor_City.like("A%"))

DataFrame[Donor_ID: string, Donor_City: string, Donor_State: string, Donor_Is_Teacher: string, Donor_Zip: string]

#### Affichez les résultats

In [79]:
df_donors.filter(df_donors.Donor_City.like("A%")).show()

+--------------------+-------------+-------------+----------------+---------+
|            Donor_ID|   Donor_City|  Donor_State|Donor_Is_Teacher|Donor_Zip|
+--------------------+-------------+-------------+----------------+---------+
|00002783bc5d10851...|   Appomattox|        other|              No|      245|
|00005454366b6b914...|       Argyle|     New York|              No|      128|
|0001ef9f64a7e1038...|         Ames|         Iowa|              No|      500|
|00024e86676fc2c3b...|    Ann Arbor|     Michigan|              No|      481|
|0002a45d0b45a78e9...|    Ann Arbor|     Michigan|              No|      481|
|0002cb56c84b1cba7...|         Avon|  Connecticut|              No|      060|
|00050297e37eb7632...|       Austin|        Texas|              No|      787|
|0005327bfe18229b9...|        Acton|   California|              No|      935|
|00054f4b278af0c8d...|American Fork|         Utah|              No|      840|
|00055ed4f4745e71d...|  Albuquerque|   New Mexico|              

## 5 - Transformation

#### Construisez une nouvelle colonne `Address` en faisant une concaténation `Donor_City, Donor_State, Donor_Zip`

In [80]:
from pyspark.sql.functions import concat_ws

Vous pouvez utiliser la fonction "withColumn" et "concat_ws" 

In [81]:
 df_donors.withColumn("Address",concat_ws(",",col("Donor_City"),col("Donor_State"),col("Donor_Zip")))
 

DataFrame[Donor_ID: string, Donor_City: string, Donor_State: string, Donor_Is_Teacher: string, Donor_Zip: string, Address: string]

Afficher les résultats

In [82]:
df_donors.withColumn("Address",concat_ws(",",col("Donor_City"),col("Donor_State"),col("Donor_Zip"))).show()

+--------------------+------------+--------------+----------------+---------+--------------------+
|            Donor_ID|  Donor_City|   Donor_State|Donor_Is_Teacher|Donor_Zip|             Address|
+--------------------+------------+--------------+----------------+---------+--------------------+
|00000ce845c00cbf0...|    Evanston|      Illinois|              No|      602|Evanston,Illinois...|
|00002783bc5d10851...|  Appomattox|         other|              No|      245|Appomattox,other,245|
|00002d44003ed46b0...|      Winton|    California|             Yes|      953|Winton,California...|
|00002eb25d60a09c3...|Indianapolis|       Indiana|              No|      462|Indianapolis,Indi...|
|0000300773fe015f8...|    Paterson|    New Jersey|              No|      075|Paterson,New Jers...|
|00004c31ce07c2214...|        null|         other|              No|     null|               other|
|00004e32a448b4832...|    Stamford|   Connecticut|              No|      069|Stamford,Connecti...|
|00004fa20

## 6 - Moteur SQL

#### Persister le dataset de départ comme une Temporary View
Vous pouvez utliser la fcontion createOrReplaceTempView

In [83]:
df_donors.createOrReplaceTempView("df_donors")

#### Comptez le nombre de professeurs ayant participé à la donation

Vous pouvez utiliser la fonction count() et le langage SQL

In [85]:
filtereddf_donors = spark.sql(""" select * from df_donors where Donor_Is_Teacher = 'Yes' """)
filtereddf_donors.count()







104650

utiliser juste 10% du dataset c'est très grand complet pour des jointures pour votre petite machine... avec la method `sample`

In [86]:
df_donors_e = df_donors.sample(fraction=0.1, seed=3)

#### Afficher que les id des donateurs qui habite à California

Vous pouvez utiliser le langage SQL qu'on vu dans le TP 5 suivant select col_x from donors where col_y ="California

In [87]:
Carlifornia_residents = spark.sql(""" select Donor_ID from df_donors where  Donor_State = 'California' """)
Carlifornia_residents.show()

+--------------------+
|            Donor_ID|
+--------------------+
|00002d44003ed46b0...|
|00010615b56ff057f...|
|000177bef7ed7b7d1...|
|00017bad30d4b5d99...|
|000181f354f1cba54...|
|0001abd0c3f256bcd...|
|0001bde8e87c867f3...|
|0002128c613edd04b...|
|00031a7e84cb620d9...|
|00032349e9b32f61f...|
|0003748b6011978a1...|
|00038313e9d8b0d93...|
|00040a049ad9f348f...|
|00041fc8b829a8135...|
|0004265c44e425d71...|
|0004298ea9ff1bf0d...|
|00047ac546738c937...|
|00049338d2a420cd9...|
|0004be01ccfd90c20...|
|0004ceb1d06fd98f0...|
+--------------------+
only showing top 20 rows



#### Ingestion des données et publication en temporary view du fichier Donations.CSV

In [88]:
df_donations = load_dataframe('Donations.csv')

In [89]:
df_donations.createOrReplaceTempView("df_donations")

#### Afficher le DF

In [90]:
df_donations.count()
df_donations.show()

+--------------------+--------------------+--------------------+-----------------------------------+---------------+-------------------+----------------------+
|          Project ID|         Donation_ID|            Donor_ID|Donation_Included_Optional_Donation|Donation_Amount|Donor_Cart Sequence|Donation_Received_Date|
+--------------------+--------------------+--------------------+-----------------------------------+---------------+-------------------+----------------------+
|000009891526c0ade...|68872912085866622...|1f4b5b6e68445c6c4...|                                 No|         178.37|                 11|   2016-08-23 13:15:57|
|000009891526c0ade...|dcf1071da3aa3561f...|4aaab6d244bf35996...|                                Yes|           25.0|                  2|   2016-06-06 20:05:23|
|000009891526c0ade...|18a234b9d1e538c43...|0b0765dc9c759adc4...|                                Yes|           20.0|                  3|   2016-06-06 14:08:46|
|000009891526c0ade...|38d2744bf9138b0b5.

#### Calculer le montant minimum, le montant maximum, le montant moyen en arrondissant à l'unité après la virgule de la colonne Donation_Amount
pour l'ensemble Donations 

Utiliser les alias "maxMontant", "minMontant", "avgMontant". et la colonne "Donation_Amount"

Pour rappel en SQL, un alias ressemble à ça : "as maxMontant".

In [91]:
spark.sql("""
select Donation_Amount, max(Donation_Amount) as maxMontant, min(Donation_Amount) as minMontant, round(avg(Donation_Amount)) as avgMontant
from df_donations GROUP BY Donation_Amount
""").show()

+---------------+----------+----------+----------+
|Donation_Amount|maxMontant|minMontant|avgMontant|
+---------------+----------+----------+----------+
|           0.02|      0.02|      0.02|       0.0|
|           0.07|      0.07|      0.07|       0.0|
|           0.11|      0.11|      0.11|       0.0|
|           0.14|      0.14|      0.14|       0.0|
|           0.15|      0.15|      0.15|       0.0|
|           0.16|      0.16|      0.16|       0.0|
|            0.2|       0.2|       0.2|       0.0|
|           0.21|      0.21|      0.21|       0.0|
|           0.22|      0.22|      0.22|       0.0|
|           0.31|      0.31|      0.31|       0.0|
|           0.32|      0.32|      0.32|       0.0|
|           0.33|      0.33|      0.33|       0.0|
|           0.35|      0.35|      0.35|       0.0|
|           0.38|      0.38|      0.38|       0.0|
|           0.39|      0.39|      0.39|       0.0|
|           0.42|      0.42|      0.42|       0.0|
|           0.45|      0.45|   

utiliser juste 10% du dataset c'est très grand complet pour des jointures... avec la method `sample`

In [92]:
df_donations_e = df_donations.sample(fraction=0.1, seed=3)

#### Faites une jointure Entre le data set des donneurs Donors, et le dataset des Donations Donations

Indication : utilisez "inner join" de langage spark.sql

In [94]:
spark.sql(""" select * from df_donors inner join df_donations on 
df_donors.Donor_ID = df_donations.Donor_ID """ )


DataFrame[Donor_ID: string, Donor_City: string, Donor_State: string, Donor_Is_Teacher: string, Donor_Zip: string, Project ID: string, Donation_ID: string, Donor_ID: string, Donation_Included_Optional_Donation: string, Donation_Amount: string, Donor_Cart Sequence: string, Donation_Received_Date: string]

#### Calculez la somme de l'argent donnée par Les Professeurs (Donor Is Teacher=Yes) et les non professeurs utilisant seulement SQL

Indication : ('select sum(dt.col4) as amountProf from donations dt inner join donors dr on dt.col2 = dr.col0 and dr.col3 = "Yes" ')

In [95]:
spark.sql("""select sum(df_donations.Donation_Amount)
 as amountProf from df_donations inner join df_donors on df_donations.Donor_ID
 = df_donors.Donor_ID and df_donors.Donor_Is_Teacher
 = "Yes" """).show()

+-----------------+
|       amountProf|
+-----------------+
|6765183.590000001|
+-----------------+

