# PySpark

In [36]:
import pyspark.sql
from pyspark.sql import *
from pyspark.sql.functions import *

In [37]:
spark = SparkSession.builder.getOrCreate() # On crée la session

# On récupère le csv et on définit les paramétres - InferSchema permet de récupérer les infos comme le typage
data = spark.read.csv("Data/covid_de.csv", header=True, inferSchema=True) 

data.printSchema() # On affiche le schéma

data.createOrReplaceTempView("covid") # On crée une vue temporaire en la nommant

table = spark.sql("select * from covid") # On interroge la vu crée precédemment
table.show() # Affichage du DataFrame

root
 |-- state: string (nullable = true)
 |-- county: string (nullable = true)
 |-- age_group: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- date: date (nullable = true)
 |-- cases: integer (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- recovered: integer (nullable = true)

+------------------+------------------+---------+------+----------+-----+------+---------+
|             state|            county|age_group|gender|      date|cases|deaths|recovered|
+------------------+------------------+---------+------+----------+-----+------+---------+
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     F|2020-03-27|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     F|2020-03-28|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     F|2020-04-03|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     M|2020-04-05|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04| 

## Le select

In [38]:
display_country_and_state = table.select('state', 'county')
display_country_and_state.show()

+------------------+------------------+
|             state|            county|
+------------------+------------------+
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|
+------------------+------------------+
only showing top 20 rows



## Le distinct

In [39]:
display_country_and_state_distinct = display_country_and_state.distinct()
display_country_and_state_distinct.show()

+--------------------+--------------------+
|               state|              county|
+--------------------+--------------------+
|              Bayern|           SK Passau|
|       Niedersachsen|            SK Emden|
|              Hessen|        SK Offenbach|
|  Baden-Wuerttemberg|             LK Calw|
|              Bayern|       LK Ostallgaeu|
|              Bayern|    SK Aschaffenburg|
|         Brandenburg|         LK Prignitz|
| Nordrhein-Westfalen| LK Rhein-Sieg-Kreis|
|              Bayern| LK Nuernberger Land|
| Nordrhein-Westfalen|        LK Heinsberg|
| Nordrhein-Westfalen|            LK Wesel|
|         Brandenburg|       LK Oder-Spree|
|Mecklenburg-Vorpo...|LK Ludwigslust-Pa...|
| Nordrhein-Westfalen|       LK Euskirchen|
|              Bayern|           LK Dachau|
|              Bayern|  LK Straubing-Bogen|
|              Bayern|       LK Altoetting|
|              Bayern|           SK Amberg|
|              Hessen|  LK Lahn-Dill-Kreis|
|              Hessen|        LK

### En passant directement par la table

In [40]:
display_country_and_state_distinct = table.select('state', 'county').distinct()
display_country_and_state_distinct.show()

+--------------------+--------------------+
|               state|              county|
+--------------------+--------------------+
|              Bayern|           SK Passau|
|       Niedersachsen|            SK Emden|
|              Hessen|        SK Offenbach|
|  Baden-Wuerttemberg|             LK Calw|
|              Bayern|       LK Ostallgaeu|
|              Bayern|    SK Aschaffenburg|
|         Brandenburg|         LK Prignitz|
| Nordrhein-Westfalen| LK Rhein-Sieg-Kreis|
|              Bayern| LK Nuernberger Land|
| Nordrhein-Westfalen|        LK Heinsberg|
| Nordrhein-Westfalen|            LK Wesel|
|         Brandenburg|       LK Oder-Spree|
|Mecklenburg-Vorpo...|LK Ludwigslust-Pa...|
| Nordrhein-Westfalen|       LK Euskirchen|
|              Bayern|           LK Dachau|
|              Bayern|  LK Straubing-Bogen|
|              Bayern|       LK Altoetting|
|              Bayern|           SK Amberg|
|              Hessen|  LK Lahn-Dill-Kreis|
|              Hessen|        LK

## Utilisation des filters

### Conditions avec StartWith et ==

In [62]:
# from pyspark.sql.functions import *

show_data_start_by_b = display_country_and_state_distinct.filter(col('state').startswith('B')) # Récupération des lignes correspondant aux states commencant par la lettre 'B'
#show_data_start_by_b = display_country_and_state_distinct.filter(col('state') == 'Bayern') 

show_data_start_by_b.show()

+------------------+--------------------+
|             state|              county|
+------------------+--------------------+
|            Bayern|           SK Passau|
|Baden-Wuerttemberg|             LK Calw|
|            Bayern|       LK Ostallgaeu|
|            Bayern|    SK Aschaffenburg|
|       Brandenburg|         LK Prignitz|
|            Bayern| LK Nuernberger Land|
|       Brandenburg|       LK Oder-Spree|
|            Bayern|           LK Dachau|
|            Bayern|  LK Straubing-Bogen|
|            Bayern|       LK Altoetting|
|            Bayern|           SK Amberg|
|            Berlin| SK Berlin Neukoelln|
|Baden-Wuerttemberg|        SK Stuttgart|
|Baden-Wuerttemberg|         LK Enzkreis|
|            Bayern|          LK Bamberg|
|            Bayern|      LK Lichtenfels|
|Baden-Wuerttemberg|LK Main-Tauber-Kreis|
|            Bayern|       SK Regensburg|
|            Bayern|          LK Ansbach|
|            Bayern|     LK Unterallgaeu|
+------------------+--------------

### Conditions en AND

In [80]:
women_recovered = table.filter((col('gender') == 'F') & (col('recovered') == 1)).count() # Récupération des femmes ET qui sont soigné
print(women_recovered)

25318


### Conditions en OR

In [None]:
state_Bayern_Hessen = table.filter((col('county') == 'Bayern') | (col('state') == 'Hessen')).count() # Récupération des county Bayern OU des states Hessen
print(state_Bayern_Hessen)

## Le trie

In [78]:
#trie = table.sort(col('age_group').asc()).show()

Trie= table.filter((col('county') == 'Bayern') |\
                   (col('state') == 'Hessen')). \
            sort(col('age_group').asc(), \
                 col('date').desc()).\
            show()


+------+--------------------+---------+------+----------+-----+------+---------+
| state|              county|age_group|gender|      date|cases|deaths|recovered|
+------+--------------------+---------+------+----------+-----+------+---------+
|Hessen|LK Rheingau-Taunu...|    00-04|     M|2020-07-22|    1|     0|        0|
|Hessen|    LK Wetteraukreis|    00-04|     M|2020-07-21|    1|     0|        0|
|Hessen|    LK Wetteraukreis|    00-04|     M|2020-07-19|    1|     0|        0|
|Hessen|LK Waldeck-Franke...|    00-04|     M|2020-07-18|    1|     0|        0|
|Hessen|        SK Wiesbaden|    00-04|     F|2020-07-18|    1|     0|        0|
|Hessen|        LK Offenbach|    00-04|     F|2020-07-17|    1|     0|        0|
|Hessen|SK Frankfurt am Main|    00-04|     F|2020-07-17|    1|     0|        0|
|Hessen|LK Main-Taunus-Kreis|    00-04|     F|2020-07-16|    1|     0|        0|
|Hessen|           SK Kassel|    00-04|     M|2020-07-16|    1|     0|        0|
|Hessen|        SK Wiesbaden

## La limite

In [79]:
limite = table.limit(5).show()

+------------------+------------------+---------+------+----------+-----+------+---------+
|             state|            county|age_group|gender|      date|cases|deaths|recovered|
+------------------+------------------+---------+------+----------+-----+------+---------+
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     F|2020-03-27|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     F|2020-03-28|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     F|2020-04-03|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     M|2020-04-05|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     M|2020-05-18|    1|     0|        1|
+------------------+------------------+---------+------+----------+-----+------+---------+



## Les  aggrégrations 

In [87]:
deaths_by_gender = table.select('gender', 'deaths')\
                        .groupby('gender')\
                        .agg(\
                             sum(col('deaths')).alias('sommeDeaths'))\
                        .sort(col('sommedeaths').desc())\
                        .show()

+------+-----------+
|gender|sommeDeaths|
+------+-----------+
|     M|       5030|
|     F|       4066|
|    NA|          5|
+------+-----------+



In [89]:
deaths_by_gender_sum_5k = table.select('gender', 'deaths')\
                        .groupby('gender')\
                        .agg(\
                             sum(col('deaths')).alias('sommeDeaths'))\
                        .filter(col('sommeDeaths') > 5000)\
                        .sort(col('sommedeaths').desc())\
                        .show()

+------+-----------+
|gender|sommeDeaths|
+------+-----------+
|     M|       5030|
+------+-----------+



## Ajouter une colonne

In [96]:
add_column = table.withColumn('add', lit(False)) # Lit permet de créer des constantes dans une nouvelle colonne

add_column.show()

+------------------+------------------+---------+------+----------+-----+------+---------+-----+
|             state|            county|age_group|gender|      date|cases|deaths|recovered|  add|
+------------------+------------------+---------+------+----------+-----+------+---------+-----+
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     F|2020-03-27|    1|     0|        1|false|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     F|2020-03-28|    1|     0|        1|false|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     F|2020-04-03|    1|     0|        1|false|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     M|2020-04-05|    1|     0|        1|false|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     M|2020-05-18|    1|     0|        1|false|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    05-14|     F|2020-03-17|    1|     0|        1|false|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    05-14|     F|2020-03-25|    1|     0|        1|false|
|Baden-Wuerttemberg|LK Alb-Don