# Introduction to Spark and Python

Let's learn how to use Spark with Python by using the pyspark library! Make sure to view the video lecture explaining Spark and RDDs before continuing on with this code.

This notebook will serve as reference code for the Big Data section of the course involving Amazon Web Services. The video will provide fuller explanations for what the code is doing.

## Creating a SparkContext

First we need to create a SparkContext. We will import this from pyspark:

In [1]:
def _build_spark_session(
    app_name, driver_cores, driver_mem, max_executors, executor_cores,
    executor_mem, queue
):
    """Build Spark session."""
    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.master", "yarn")
        .config("spark.submit.deployMode", "client")
        .config("spark.driver.cores", driver_cores)
        .config("spark.driver.memory", driver_mem)
        .config("spark.executor.cores", executor_cores)
        .config("spark.executor.memory", executor_mem)
        .config("spark.shuffle.service.enabled", True)
        .config("spark.dynamicAllocation.enabled", True)
        .config("spark.dynamicAllocation.minExecutors", 0)
        .config("spark.dynamicAllocation.maxExecutors", max_executors)
        .config("spark.executor.memoryOverhead", 2048)
        .config("spark.driver.memoryOverhead", 1024)
        .config("spark.yarn.queue", queue)
        # .config("spark.sql.session.timeZone", "UTC")
        .config("spark.driver.extraClassPath", "/soft/ora1210/db/jdbc/lib/ojdbc6.jar")
        .config("spark.executor.extraClassPath", "/soft/ora1210/db/jdbc/lib/ojdbc6.jar")
        .getOrCreate()
    )


In [None]:
!pip install pyspark

In [2]:
from pyspark.sql import SparkSession


In [3]:
spark_session = SparkSession.builder\
        .appName("app_name") \
        .getOrCreate()

In [4]:
spark_context = spark_session.sparkContext

Now create the SparkContext,A SparkContext represents the connection to a Spark cluster, and can be used to create an RDD and broadcast variables on that cluster.

*Note! You can only have one SparkContext at a time the way we are running things here.*

## Basic Operations

We're going to start with a 'hello world' example, which is just reading a text file. First let's create a text file.
___

Let's write an example text file to read, we'll use some special jupyter notebook commands for this, but feel free to use any .txt file:

In [5]:
%%writefile example.txt
first line
second line
third line
fourth line

Overwriting example.txt


### Creating the RDD

Now we can take in the textfile using the **textFile** method off of the SparkContext we created. This method will read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

In [6]:
textFile = spark_context.textFile('example.txt')

Spark’s primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs. 

### Actions

We have just created an RDD using the textFile method and can perform operations on this object, such as counting the rows.

RDDs have actions, which return values, and transformations, which return pointers to new RDDs. Let’s start with a few actions:

In [15]:
textFile.count()

4

In [8]:
textFile.first()

'first line'

### Transformations

Now we can use transformations, for example the filter transformation will return a new RDD with a subset of items in the file. Let's create a sample transformation using the filter() method. This method (just like Python's own filter function) will only return elements that satisfy the condition. Let's try looking for lines that contain the word 'second'. In which case, there should only be one line that has that.

In [9]:
secfind = textFile.filter(lambda line: 'second' in line)

In [10]:
# RDD
secfind

PythonRDD[4] at RDD at PythonRDD.scala:53

In [11]:
# Perform action on transformation
secfind.collect()

['second line']

In [12]:
# Perform action on transformation
secfind.count()

1

In [13]:
from pyspark.sql import functions as f

In [23]:
from pyspark.sql import Row
import datetime

l = [(datetime.date(2018,1,3), 'Ankit',25, 'F'),
     (datetime.date(2018,2,3), 'Jalfaizy',22, 'M'),
     (datetime.date(2018,1,5), 'saurabh',20, 'F'),
     (datetime.date(2018,1,12), 'Bala',26, 'F'),
     (datetime.date(2018,7,9), 'Jules',19, 'M') ,
     (datetime.date(2018,3,18), 'Arild',43, 'M'),
     (datetime.date(2018,1,5), 'sarah',20, 'F'),
     (datetime.date(2018,8,12), 'Boly',33, 'M'),
     (datetime.date(2018,4,6), 'Anita',35, 'F'),
     (datetime.date(2018,12,6), 'Jules',22, 'M'),
     (datetime.date(2018,7,24), 'Soul',20, 'M'),
     (datetime.date(2018,6,17), 'Gral',54, 'F'),
     (datetime.date(2018,9,7), 'Apoh',18, 'M'),
     (datetime.date(2018,10,4), 'Dony',32, 'M'),
     (datetime.date(2018,2,5), 'Tanoh',31, 'M'),
     (datetime.date(2018,11,12), 'Issouf',27, 'M'),
     (datetime.date(2018,10,3), 'Bilé',29, 'F'),
     (datetime.date(2018,5,3), 'Gagnon',20, 'M'),
     (datetime.date(2018,3,5), 'Papiss',28, 'F'),
     (datetime.date(2018,2,12), 'Kravitz',34, 'F'),
     (datetime.date(2018,5,9), 'Mouli',35, 'F'),
     (datetime.date(2018,8,3), 'Jacques',27, 'M'),
     (datetime.date(2018,12,5), 'soum',22, 'M'),
     (datetime.date(2018,4,12), 'MBra',36, 'F')]

rdd = spark_session.sparkContext.parallelize(l)
people = rdd.map(lambda x: Row(date=x[0], name=x[1], age=int(x[2]), sexe=x[3]))
schemaPeople = spark_session.createDataFrame(people)


In [25]:
schemaPeople.groupby("sexe").agg(f.count("*").alias('nb_sexe')).toPandas()

Unnamed: 0,sexe,nb_sexe
0,F,11
1,M,13


In [36]:
schemaPeople.filter(schemaPeople["age"] > 30).toPandas()

Unnamed: 0,date,name,age,sexe
0,2018-03-18,Arild,43,M
1,2018-08-12,Boly,33,M
2,2018-04-06,Anita,35,F
3,2018-06-17,Gral,54,F
4,2018-10-04,Dony,32,M
5,2018-02-05,Tanoh,31,M
6,2018-02-12,Kravitz,34,F
7,2018-05-09,Mouli,35,F
8,2018-04-12,MBra,36,F


In [37]:
schemaPeople.filter(schemaPeople["age"] > 30).count()

9

In [40]:
# Le nombre de personne qui sont nées après le 01-03-2018 et qui ont un âge inférieur à 40 ans et qui sont fille
date_limit = "2018-03-01"

# Filtrer les personnes selon les critères
schemaPeople.filter(
    (schemaPeople["date"] > date_limit) &  # Nées après le 01-03-2018
    (schemaPeople["age"] < 40) &          # Âge inférieur à 40 ans
    (schemaPeople["sexe"] == "F")         # Sexe féminin
).toPandas()

Unnamed: 0,date,name,age,sexe
0,2018-04-06,Anita,35,F
1,2018-10-03,Bilé,29,F
2,2018-03-05,Papiss,28,F
3,2018-05-09,Mouli,35,F
4,2018-04-12,MBra,36,F


In [42]:
# Détecer les filles qui ont moins de 25 ans 

# Filtrer uniquement les filles
filtered_girls_df = schemaPeople.filter(schemaPeople["sexe"] == "F")

# Ajouter une colonne "tag" pour différencier jeunes et vieilles filles
tagged_girls_df = filtered_girls_df.withColumn(
    "Generation",
    f.when(filtered_girls_df["age"] < 25, "jeune").otherwise("vieille")
)

# Afficher les résultats
tagged_girls_df.show()

+----------+-------+---+----+----------+
|      date|   name|age|sexe|Generation|
+----------+-------+---+----+----------+
|2018-01-03|  Ankit| 25|   F|   vieille|
|2018-01-05|saurabh| 20|   F|     jeune|
|2018-01-12|   Bala| 26|   F|   vieille|
|2018-01-05|  sarah| 20|   F|     jeune|
|2018-04-06|  Anita| 35|   F|   vieille|
|2018-06-17|   Gral| 54|   F|   vieille|
|2018-10-03|   Bilé| 29|   F|   vieille|
|2018-03-05| Papiss| 28|   F|   vieille|
|2018-02-12|Kravitz| 34|   F|   vieille|
|2018-05-09|  Mouli| 35|   F|   vieille|
|2018-04-12|   MBra| 36|   F|   vieille|
+----------+-------+---+----+----------+



In [46]:
a=schemaPeople.withColumn('Status',f.when((f.col('age') < 25) & (f.col('sexe')=='F'),'Jeune Fille').otherwise("Autre"))
a.show()

+----------+--------+---+----+-----------+
|      date|    name|age|sexe|     Status|
+----------+--------+---+----+-----------+
|2018-01-03|   Ankit| 25|   F|      Autre|
|2018-02-03|Jalfaizy| 22|   M|      Autre|
|2018-01-05| saurabh| 20|   F|Jeune Fille|
|2018-01-12|    Bala| 26|   F|      Autre|
|2018-07-09|   Jules| 19|   M|      Autre|
|2018-03-18|   Arild| 43|   M|      Autre|
|2018-01-05|   sarah| 20|   F|Jeune Fille|
|2018-08-12|    Boly| 33|   M|      Autre|
|2018-04-06|   Anita| 35|   F|      Autre|
|2018-12-06|   Jules| 22|   M|      Autre|
|2018-07-24|    Soul| 20|   M|      Autre|
|2018-06-17|    Gral| 54|   F|      Autre|
|2018-09-07|    Apoh| 18|   M|      Autre|
|2018-10-04|    Dony| 32|   M|      Autre|
|2018-02-05|   Tanoh| 31|   M|      Autre|
|2018-11-12|  Issouf| 27|   M|      Autre|
|2018-10-03|    Bilé| 29|   F|      Autre|
|2018-05-03|  Gagnon| 20|   M|      Autre|
|2018-03-05|  Papiss| 28|   F|      Autre|
|2018-02-12| Kravitz| 34|   F|      Autre|
+----------

In [53]:
a = a.withColumnRenamed('Status','Categorie')
a.show()

+----------+--------+---+----+-----------+
|      date|    name|age|sexe|  Categorie|
+----------+--------+---+----+-----------+
|2018-01-03|   Ankit| 25|   F|      Autre|
|2018-02-03|Jalfaizy| 22|   M|      Autre|
|2018-01-05| saurabh| 20|   F|Jeune Fille|
|2018-01-12|    Bala| 26|   F|      Autre|
|2018-07-09|   Jules| 19|   M|      Autre|
|2018-03-18|   Arild| 43|   M|      Autre|
|2018-01-05|   sarah| 20|   F|Jeune Fille|
|2018-08-12|    Boly| 33|   M|      Autre|
|2018-04-06|   Anita| 35|   F|      Autre|
|2018-12-06|   Jules| 22|   M|      Autre|
|2018-07-24|    Soul| 20|   M|      Autre|
|2018-06-17|    Gral| 54|   F|      Autre|
|2018-09-07|    Apoh| 18|   M|      Autre|
|2018-10-04|    Dony| 32|   M|      Autre|
|2018-02-05|   Tanoh| 31|   M|      Autre|
|2018-11-12|  Issouf| 27|   M|      Autre|
|2018-10-03|    Bilé| 29|   F|      Autre|
|2018-05-03|  Gagnon| 20|   M|      Autre|
|2018-03-05|  Papiss| 28|   F|      Autre|
|2018-02-12| Kravitz| 34|   F|      Autre|
+----------

In [26]:
schemaPeople.printSchema()
schemaPeople.show()

root
 |-- date: date (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- sexe: string (nullable = true)

+----------+--------+---+----+
|      date|    name|age|sexe|
+----------+--------+---+----+
|2018-01-03|   Ankit| 25|   F|
|2018-02-03|Jalfaizy| 22|   M|
|2018-01-05| saurabh| 20|   F|
|2018-01-12|    Bala| 26|   F|
|2018-07-09|   Jules| 19|   M|
|2018-03-18|   Arild| 43|   M|
|2018-01-05|   sarah| 20|   F|
|2018-08-12|    Boly| 33|   M|
|2018-04-06|   Anita| 35|   F|
|2018-12-06|   Jules| 22|   M|
|2018-07-24|    Soul| 20|   M|
|2018-06-17|    Gral| 54|   F|
|2018-09-07|    Apoh| 18|   M|
|2018-10-04|    Dony| 32|   M|
|2018-02-05|   Tanoh| 31|   M|
|2018-11-12|  Issouf| 27|   M|
|2018-10-03|    Bilé| 29|   F|
|2018-05-03|  Gagnon| 20|   M|
|2018-03-05|  Papiss| 28|   F|
|2018-02-12| Kravitz| 34|   F|
+----------+--------+---+----+
only showing top 20 rows



In [55]:
import os

In [56]:
output_path = os.path.join(os.getcwd(), "tagged_girls.csv")
a.coalesce(1).write.option("header", "true").csv(output_path)

In [54]:
import datetime
import os

# Date actuelle
current_date = datetime.date.today()

# Chemin du fichier
filepath = "data_repository"
filename = os.path.join(
    filepath,
    str(current_date.year),
    str(current_date.month).zfill(2),
    str(current_date.day).zfill(2)
)

# Écriture du DataFrame en format parquet
schemaPeople.coalesce(2).write.mode("overwrite").parquet(filename)


In [60]:
filename

'data_repository/2024/12/05'

In [169]:
spark = SparkSession.builder.appName("Skip Bad Lines").getOrCreate()

# Chemin du fichier CSV
file_path = "/home/jovyan/code/PBAL_Parc compteur_202409.csv"

# Lire le fichier CSV en ignorant les lignes mal formatées
df = spark.read.csv(
    file_path,
    header=True,      # Considère la première ligne comme un en-tête
    inferSchema=True, # Infère automatiquement les types des colonnes
    sep=";",
    mode="PERMISSIVE" # Tolère les erreurs de format
)

# Afficher les premières lignes
#df.show(truncate=False)  # Affiche toutes les colonnes
df.head()

Row(PB='PB ALBOU', CONTRAT_RATTACHEMENT='AQUAWASH DT - 42257', COMMUNE='BOIGNY SUR BIONNE', CODE_CYCLE_SERVICE='0DYL            ', LIBELLE_CYCLE_SERVICE='BOIGNY SUR BIONNE SEMESTRIEL (Lot 1)', CODE_TOURNEE='0DYL            ', LIBELLE_TOURNEE_SERVICE='BOIGNY SUR BIONNE SEMESTRIEL (Lot 1)', TOURNEE_OPALE=1.0, SEQ_TOURNEE=9700, ID_COMPTEUR=8361561125, NUMERO_BADGE=1581412316, NUMERO_SERIE='H24VA169824', FABRICANT='Sappel - Diehl Metering - MID', MODELE='ALTAIR V4 composite T50', DIAMETRE=15, ANNEE_FABRICATION=2024, SOLUTION_COMPACTE='Oui', SOLUTION_DEPORTEE='Oui', PASSERELLE_SITE_ISOLE='Oui', DATE_RECEPTION='04/09/2024', DATE_DEPOSE=None, MATRICULE_EQUIPEMENT='11A5241011931007', ACCESSIBLE='OUI', CODE_EMPLACEMENT='AJ  ', LIBELLE_EMPLACEMENT='Jardin - Accessible', DETAILS_EMPLACEMENTS='JARDIN CHIENS', ID_PDS=6447593, DATE_MISE_EN_SERVICE_PDS='01/01/2009', ETAT_PDS='En service', ETAT_SOURCE_PDS='Ouvert', LOGEMENT_VACANT='NON      ', GEN_DIV='Normal       ', RACCORDE_RACORDABLE='Raccordable 

In [137]:
df.printSchema()

root
 |-- PB: string (nullable = true)
 |-- CONTRAT_RATTACHEMENT: string (nullable = true)
 |-- COMMUNE: string (nullable = true)
 |-- CODE_CYCLE_SERVICE: string (nullable = true)
 |-- LIBELLE_CYCLE_SERVICE: string (nullable = true)
 |-- CODE_TOURNEE: string (nullable = true)
 |-- LIBELLE_TOURNEE_SERVICE: string (nullable = true)
 |-- TOURNEE_OPALE: double (nullable = true)
 |-- SEQ_TOURNEE: integer (nullable = true)
 |-- ID_COMPTEUR: long (nullable = true)
 |-- NUMERO_BADGE: integer (nullable = true)
 |-- NUMERO_SERIE: string (nullable = true)
 |-- FABRICANT: string (nullable = true)
 |-- MODELE: string (nullable = true)
 |-- DIAMETRE: integer (nullable = true)
 |-- ANNEE_FABRICATION: integer (nullable = true)
 |-- SOLUTION_COMPACTE: string (nullable = true)
 |-- SOLUTION_DEPORTEE: string (nullable = true)
 |-- PASSERELLE_SITE_ISOLE: string (nullable = true)
 |-- DATE_RECEPTION: string (nullable = true)
 |-- DATE_DEPOSE: string (nullable = true)
 |-- MATRICULE_EQUIPEMENT: string (null

In [92]:
df.printSchema() 

root
 |-- PB: string (nullable = true)
 |-- CONTRAT_RATTACHEMENT: string (nullable = true)
 |-- COMMUNE: string (nullable = true)
 |-- CODE_CYCLE_SERVICE: string (nullable = true)
 |-- LIBELLE_CYCLE_SERVICE: string (nullable = true)
 |-- CODE_TOURNEE: string (nullable = true)
 |-- LIBELLE_TOURNEE_SERVICE: string (nullable = true)
 |-- TOURNEE_OPALE: double (nullable = true)
 |-- SEQ_TOURNEE: integer (nullable = true)
 |-- ID_COMPTEUR: long (nullable = true)
 |-- NUMERO_BADGE: integer (nullable = true)
 |-- NUMERO_SERIE: string (nullable = true)
 |-- FABRICANT: string (nullable = true)
 |-- MODELE: string (nullable = true)
 |-- DIAMETRE: integer (nullable = true)
 |-- ANNEE_FABRICATION: integer (nullable = true)
 |-- SOLUTION_COMPACTE: string (nullable = true)
 |-- SOLUTION_DEPORTEE: string (nullable = true)
 |-- PASSERELLE_SITE_ISOLE: string (nullable = true)
 |-- DATE_RECEPTION: date (nullable = true)
 |-- DATE_DEPOSE: date (nullable = true)
 |-- MATRICULE_EQUIPEMENT: string (nullable

In [94]:
df.head()

Row(PB='PB ALBOU', CONTRAT_RATTACHEMENT='AQUAWASH DT - 42257', COMMUNE='BOIGNY SUR BIONNE', CODE_CYCLE_SERVICE='0DYL            ', LIBELLE_CYCLE_SERVICE='BOIGNY SUR BIONNE SEMESTRIEL (Lot 1)', CODE_TOURNEE='0DYL            ', LIBELLE_TOURNEE_SERVICE='BOIGNY SUR BIONNE SEMESTRIEL (Lot 1)', TOURNEE_OPALE=1.0, SEQ_TOURNEE=9700, ID_COMPTEUR=8361561125, NUMERO_BADGE=1581412316, NUMERO_SERIE='H24VA169824', FABRICANT='Sappel - Diehl Metering - MID', MODELE='ALTAIR V4 composite T50', DIAMETRE=15, ANNEE_FABRICATION=2024, SOLUTION_COMPACTE='Oui', SOLUTION_DEPORTEE='Oui', PASSERELLE_SITE_ISOLE='Oui', DATE_RECEPTION=datetime.date(2024, 9, 4), DATE_DEPOSE=None, MATRICULE_EQUIPEMENT='11A5241011931007', ACCESSIBLE='OUI', CODE_EMPLACEMENT='AJ  ', LIBELLE_EMPLACEMENT='Jardin - Accessible', DETAILS_EMPLACEMENTS='JARDIN CHIENS', ID_PDS=6447593, DATE_MISE_EN_SERVICE_PDS=datetime.date(2009, 1, 1), ETAT_PDS='En service', ETAT_SOURCE_PDS='Ouvert', LOGEMENT_VACANT='NON      ', GEN_DIV='Normal       ', RACCORD

In [None]:
# Afficher les communes uniques 
df.select('COMMUNE').distinct().collect()

[Row(COMMUNE='MARDIE'),
 Row(COMMUNE='BOU'),
 Row(COMMUNE='CHECY'),
 Row(COMMUNE='CHANTEAU'),
 Row(COMMUNE='BOIGNY SUR BIONNE'),
 Row(COMMUNE='COMBLEUX'),
 Row(COMMUNE='MARIGNY LES USAGES'),
 Row(COMMUNE='OLIVET'),
 Row(COMMUNE='ORLEANS'),
 Row(COMMUNE='ORMES'),
 Row(COMMUNE='ST JEAN DE LA RUELLE'),
 Row(COMMUNE='SAINT JEAN LE BLANC'),
 Row(COMMUNE='ST JEAN LE BLANC'),
 Row(COMMUNE='ST PRYVE ST MESMIN'),
 Row(COMMUNE='ST JEAN DE BRAYE'),
 Row(COMMUNE='SARAN'),
 Row(COMMUNE='ST CYR EN VAL')]

In [110]:
from pyspark.sql.functions import to_date, col

# Liste des colonnes qui contiennent des dates sous forme de string
date_columns = [
    "DATE_RECEPTION", "DATE_DEPOSE", "DATE_MISE_EN_SERVICE_PDS", 
    "DATE_RACCORDABILITE", "DATE_DERNIERE_RELEVE", "DT_FIN_CONTRAT_CADRE"
]

# Appliquer la conversion pour chaque colonne de type string en date
for column in date_columns:
    df = df.withColumn(column, to_date(col(column), "dd/MM/yyyy"))  # Ajustez le format si nécessaire

# Vérifier les changements
df.printSchema()

root
 |-- PB: string (nullable = true)
 |-- CONTRAT_RATTACHEMENT: string (nullable = true)
 |-- COMMUNE: string (nullable = true)
 |-- CODE_CYCLE_SERVICE: string (nullable = true)
 |-- LIBELLE_CYCLE_SERVICE: string (nullable = true)
 |-- CODE_TOURNEE: string (nullable = true)
 |-- LIBELLE_TOURNEE_SERVICE: string (nullable = true)
 |-- TOURNEE_OPALE: string (nullable = true)
 |-- SEQ_TOURNEE: string (nullable = true)
 |-- ID_COMPTEUR: string (nullable = true)
 |-- NUMERO_BADGE: string (nullable = true)
 |-- NUMERO_SERIE: string (nullable = true)
 |-- FABRICANT: string (nullable = true)
 |-- MODELE: string (nullable = true)
 |-- DIAMETRE: string (nullable = true)
 |-- ANNEE_FABRICATION: string (nullable = true)
 |-- SOLUTION_COMPACTE: string (nullable = true)
 |-- SOLUTION_DEPORTEE: string (nullable = true)
 |-- PASSERELLE_SITE_ISOLE: string (nullable = true)
 |-- DATE_RECEPTION: date (nullable = true)
 |-- DATE_DEPOSE: date (nullable = true)
 |-- MATRICULE_EQUIPEMENT: string (nullable =

In [163]:
import pyspark.sql.functions as f

date_columns = [
    "DATE_RECEPTION", "DATE_DEPOSE", "DATE_MISE_EN_SERVICE_PDS", 
    "DATE_RACCORDABILITE", "DATE_DERNIERE_RELEVE", "DT_FIN_CONTRAT_CADRE"
]

# 1ere fonction qui clean les donnees en les listants 

def clean_dataset(df,date_columns ):
    df= df.select([f.trim(f.col(c)).alias(c) for c in df.columns])
    df=df.select([f.to_date(f.col(column),"dd/MM/yyyy").alias(column) if column in date_columns else f.col(column) for column in df.columns ])
    df=df.withColumn('COMMUNE',f.when(df["COMMUNE"].isin(["SARAN", "ORLEANS"]), 'None').otherwise(df["COMMUNE"]))
    
    return df


# 2e fonction sans lister les colonnes DATES
def clean_dataset2(df):
    df= df.select([f.trim(f.col(c)).alias(c) for c in df.columns])
    df = df.select(*[f.to_date(f.col(column), "dd/MM/yyyy").alias(column) if column.startswith('DATE') or column.startswith('DT') else f.col(column) for column in df.columns])

    df=df.withColumn('COMMUNE',f.when(df["COMMUNE"].isin(["SARAN", "ORLEANS"]), 'None').otherwise(df["COMMUNE"]))
    
    return df


In [134]:
df.select('MODE_DE_RELEVE').distinct().collect()

[Row(MODE_DE_RELEVE='A pied'),
 Row(MODE_DE_RELEVE='Télé-Relevé'),
 Row(MODE_DE_RELEVE='Radio-Relevé'),
 Row(MODE_DE_RELEVE=None)]

In [180]:
df=clean_dataset(df,date_columns)
df.printSchema()

root
 |-- PB: string (nullable = true)
 |-- CONTRAT_RATTACHEMENT: string (nullable = true)
 |-- COMMUNE: string (nullable = true)
 |-- CODE_CYCLE_SERVICE: string (nullable = true)
 |-- LIBELLE_CYCLE_SERVICE: string (nullable = true)
 |-- CODE_TOURNEE: string (nullable = true)
 |-- LIBELLE_TOURNEE_SERVICE: string (nullable = true)
 |-- TOURNEE_OPALE: string (nullable = true)
 |-- SEQ_TOURNEE: string (nullable = true)
 |-- ID_COMPTEUR: string (nullable = true)
 |-- NUMERO_BADGE: string (nullable = true)
 |-- NUMERO_SERIE: string (nullable = true)
 |-- FABRICANT: string (nullable = true)
 |-- MODELE: string (nullable = true)
 |-- DIAMETRE: string (nullable = true)
 |-- ANNEE_FABRICATION: string (nullable = true)
 |-- SOLUTION_COMPACTE: string (nullable = true)
 |-- SOLUTION_DEPORTEE: string (nullable = true)
 |-- PASSERELLE_SITE_ISOLE: string (nullable = true)
 |-- DATE_RECEPTION: date (nullable = true)
 |-- DATE_DEPOSE: date (nullable = true)
 |-- MATRICULE_EQUIPEMENT: string (nullable =

In [181]:
len(df.columns)

103

In [182]:
df.select('COMMUNE').distinct().collect()

[Row(COMMUNE='MARDIE'),
 Row(COMMUNE='BOU'),
 Row(COMMUNE='CHECY'),
 Row(COMMUNE='CHANTEAU'),
 Row(COMMUNE='BOIGNY SUR BIONNE'),
 Row(COMMUNE='COMBLEUX'),
 Row(COMMUNE='MARIGNY LES USAGES'),
 Row(COMMUNE='OLIVET'),
 Row(COMMUNE='None'),
 Row(COMMUNE='ORMES'),
 Row(COMMUNE='ST JEAN DE LA RUELLE'),
 Row(COMMUNE='SAINT JEAN LE BLANC'),
 Row(COMMUNE='ST JEAN LE BLANC'),
 Row(COMMUNE='ST PRYVE ST MESMIN'),
 Row(COMMUNE='ST JEAN DE BRAYE'),
 Row(COMMUNE='ST CYR EN VAL')]

In [183]:
df.groupBy("COMMUNE", "MODE_DE_RELEVE").agg(f.count("*").alias("COUNT"))


DataFrame[COMMUNE: string, MODE_DE_RELEVE: string, COUNT: bigint]

In [184]:
# Une fonction qui permet de grouper par une colonne et utilise une colonne pivot pour fournir le nombre d'occurences

def group_and_pivot(df, group_col, pivot_col):
    """
    Regroupe les données par une colonne et effectue un pivot sur une autre colonne.
    
    Parameters:
    df (DataFrame): Le DataFrame Spark à traiter.
    group_col (str): La colonne utilisée pour le regroupement.
    pivot_col (str): La colonne utilisée pour le pivot.
    
    Returns:
    pd.DataFrame: Un DataFrame Pandas contenant les résultats agrégés.
    """
    result_df = df.groupBy(group_col).pivot(pivot_col).count()
    return result_df.toPandas()

In [185]:
group_and_pivot(df,"COMMUNE","MODE_DE_RELEVE")

Unnamed: 0,COMMUNE,null,A pied,Radio-Relevé,Télé-Relevé
0,MARDIE,,314,2.0,1126.0
1,,,621,18.0,23893.0
2,ST JEAN DE LA RUELLE,,28,46.0,
3,ORMES,,1917,73.0,18.0
4,BOU,,87,,457.0
5,MARIGNY LES USAGES,,782,1.0,11.0
6,OLIVET,,186,1.0,8058.0
7,SAINT JEAN LE BLANC,,2,,1.0
8,ST JEAN LE BLANC,,73,9.0,3278.0
9,CHECY,3.0,647,1.0,3537.0


In [186]:
# Une fonction qui retire toutes les colonnes qui n'ont que des valeurs Null
def remove_null_columns(df):
    """
    Supprime les colonnes contenant uniquement des valeurs nulles dans un DataFrame PySpark.

    :param df: Le DataFrame PySpark à nettoyer.
    :return: Le DataFrame sans les colonnes avec uniquement des valeurs nulles.
    """
    non_null_columns = [c for c in df.columns if df.filter(df[c].isNotNull()).count() > 0]
    return df.select(*non_null_columns)

In [187]:
# Une fonction qui retourne entièrement les colonnes nulles 
def colonnes_nulles(df):
    
    null_columns = [
        col_name
        for col_name in df.columns
        if df.filter(f.col(col_name).isNotNull()).count() == 0
    ]
    return null_columns

In [188]:
# Colonnes entièrement nulle
b=colonnes_nulles(df)

In [189]:
len(b)
b

['FORAGE',
 'ANC_A_FACTURER',
 'BASSINS_VERSANTS',
 'NOMBRE_UL',
 'CODE_REM4',
 'MO_RES',
 'NAT_REJ',
 'VOLUME_FORAGE_SITE',
 'CODE_REF_AGENCE_EAU',
 'L_REF_AGENCE_EAU',
 'NB_LOG',
 'NB_MIG',
 'NB_PRO',
 'NB_TOU',
 'NB_UG1',
 'NB_UG2',
 'NB_UG3',
 'NB_UG4',
 'NB_UG5']

In [190]:
df=remove_null_columns(df)

In [191]:
len(df.columns)

84

In [194]:
df.select('DATE_DEPOSE').distinct().collect()

[Row(DATE_DEPOSE=datetime.date(2024, 7, 8)),
 Row(DATE_DEPOSE=datetime.date(2023, 11, 23)),
 Row(DATE_DEPOSE=datetime.date(2024, 5, 14)),
 Row(DATE_DEPOSE=None)]

In [195]:
df.show(6)

+--------+--------------------+-----------------+------------------+---------------------+------------+-----------------------+-------------+-----------+-----------+------------+------------+--------------------+--------------------+--------+-----------------+-----------------+-----------------+---------------------+--------------+-----------+--------------------+----------+----------------+--------------------+--------------------+--------+------------------------+----------+---------------+---------------+-------+--------------------+-------------------+------+----------+-------------------+------------+-------------------+--------------+------------------+--------+-------------+-----------------+---------------+----------------+----------------+-------------------+----------+-------------+-----------------+-----------------+------------------+-----+--------------------+------------------+---------------------+---------+---------+---------+--------------------+---+---------+---------

#### Transformer les variables, DATE_RECEPTION, DATE_MISE_EN_SERVICE_PDS, DATE_DERNIERE_RELEVE en timestamp

In [196]:

def convert_to_timestamp(df, columns, date_format="dd/MM/yyyy"):
    """
    Convertir une liste de colonnes en type timestamp dans un DataFrame Spark.
    
    :param df: DataFrame Spark
    :param columns: Liste des colonnes à convertir
    :param date_format: Format des dates (par défaut "dd/MM/yyyy")
    :return: DataFrame avec les colonnes converties
    """
    return df.select(
        *[f.to_timestamp(df[col], date_format).alias(col) if col in columns else df[col] for col in df.columns]
    )

In [199]:
# Transformer les variables, DATE_RECEPTION, DATE_MISE_EN_SERVICE_PDS, DATE_DERNIERE_RELEVE en timestamp
date_columns = ["DATE_RECEPTION", "DATE_MISE_EN_SERVICE_PDS", "DATE_DERNIERE_RELEVE"]
df = convert_to_timestamp(df, date_columns)

#### Transformer les variables diametre et INDEX_DERNIERE_RELEVE en integer et la variable CMJ en double


In [200]:
# Transformer les variables diametre et INDEX_DERNIERE_RELEVE en integer et la variable CMJ en double

def convertion_typecol(df, int_columns, double_columns):
    """
    Convertir des colonnes en types spécifiques (int ou double).
    
    :param df: DataFrame Spark
    :param int_columns: Liste des colonnes à convertir en integer
    :param double_columns: Liste des colonnes à convertir en double
    :return: DataFrame avec les colonnes converties
    """
    # Appliquer les transformations pour chaque type
    df = df.select(
        *[
            f.col(c).cast("int").alias(c) if c in int_columns 
            else f.col(c).cast("double").alias(c) if c in double_columns 
            else df[c] 
            for c in df.columns
        ]
    )
    return df

In [201]:
# Colonnes à transformer
int_columns = ["DIAMETRE", "INDEX_DERNIERE_RELEVE"]
double_columns = ["CMJ"]

# Utilisation de la fonction
df = convertion_typecol(df, int_columns, double_columns)


#### Quel est le nombre de compteur *Tele-relevé* et le nombre de compteur *relevé à pied*?


In [224]:
# Quel est le nombre de compteur telereve et le nombre de compteur relevé à pied?

# Le nombre de compteurs telereleve

nb_comptreur_telerev = df.filter(f.col("MODE_DE_RELEVE") == "Télé-Relevé").count()

# Le nombre de compteur relevé à pied 

nb_compteur_apied = df.filter(f.col("MODE_DE_RELEVE") == "A pied").count()

# Affichage des nombres :

print(f"Nombre de compteurs télérelevé : {nb_comptreur_telerev}")
print(f"Nombre de compteurs relevé à pied : {nb_compteur_apied}")

Nombre de compteurs télérelevé : 44072
Nombre de compteurs relevé à pied : 6021


In [225]:
# Filtrer les PDS mis en service en 2023
# Quels est le nombre de PDS mis en service en 2023 ? Quels sont les PDS télé relevé parmi ceux la?

pds_2023 = df.filter(f.year(f.col("DATE_MISE_EN_SERVICE_PDS")) == 2023)

# Nombre total de PDS mis en service en 2023
nb_pds_2023 = pds_2023.count()

# Nombre de PDS télérelevés en 2023
nb_pds_telereleve_2023 = pds_2023.filter(f.col("MODE_DE_RELEVE") == "Télé-Relevé").count()

# Affichage des résultats
print(f"Nombre de PDS mis en service en 2023 : {nb_pds_2023}")
print(f"Nombre de PDS télérelevés en 2023 : {nb_pds_telereleve_2023}")

Nombre de PDS mis en service en 2023 : 1169
Nombre de PDS télérelevés en 2023 : 897


In [210]:
# Quel est le nombre de PDS telereleve à la commune de BOIGNY SUR BIONNE? Quels sont sont avec un Diametre > à 40cm?


# Filtrer les PDS à la commune de Boigny sur Bionne et télérelevés
pds_boigny_telereleve = df.filter((f.col("COMMUNE") == "BOIGNY SUR BIONNE") & 
                                  (f.col("MODE_DE_RELEVE") == "Télé-Relevé"))

# Nombre total de PDS télérelevés à Boigny sur Bionne
nb_pds_boigny_telereleve = pds_boigny_telereleve.count()

# Filtrer les PDS télérelevés avec un diamètre > 40 cm
pds_boigny_telereleve_diametre_40 = pds_boigny_telereleve.filter(col("DIAMETRE") > 40)

# Nombre de PDS télérelevés avec un diamètre > 40 cm
nb_pds_boigny_telereleve_diametre_40 = pds_boigny_telereleve_diametre_40.count()

# Affichage des résultats
print(f"Nombre de PDS télérelevés à Boigny sur Bionne : {nb_pds_boigny_telereleve}")
print(f"Nombre de PDS télérelevés à Boigny sur Bionne avec un diamètre > 40 cm : {nb_pds_boigny_telereleve_diametre_40}")

Nombre de PDS télérelevés à Boigny sur Bionne : 502
Nombre de PDS télérelevés à Boigny sur Bionne avec un diamètre > 40 cm : 2


In [211]:
# Compter le nombre de contrats distincts
nb_contrats = df.select("NATURE_CONTRAT_RATTACHEMENT").distinct().count()

# Nombre de communes distinctes par contrat
communes_par_contrat = df.groupBy("NATURE_CONTRAT_RATTACHEMENT").agg(
    f.countDistinct("COMMUNE").alias("nombre_communes")
)

# Affichage des résultats
print(f"Nombre de contrats distincts : {nb_contrats}")
communes_par_contrat.show()

Nombre de contrats distincts : 1
+---------------------------+---------------+
|NATURE_CONTRAT_RATTACHEMENT|nombre_communes|
+---------------------------+---------------+
|                        DSP|             16|
+---------------------------+---------------+



In [212]:
# Filtrer les compteurs avec un diamètre >= 100 cm
df_filtre = df.filter(df["DIAMETRE"] >= 100)

# Compter le nombre de compteurs par commune
commune_diametre_100 = df_filtre.groupBy("COMMUNE").agg(
    f.count("*").alias("nombre_compteurs")
)

# Trier par le nombre de compteurs et afficher la commune avec le plus grand nombre
commune_max_diametre_100 = commune_diametre_100.orderBy(f.col("nombre_compteurs").desc()).limit(1)

# Afficher le résultat
commune_max_diametre_100.show()

+-------+----------------+
|COMMUNE|nombre_compteurs|
+-------+----------------+
|   None|             125|
+-------+----------------+



In [217]:
commune_diametre_100.show()

+------------------+----------------+
|           COMMUNE|nombre_compteurs|
+------------------+----------------+
|            MARDIE|               2|
|             CHECY|               9|
| BOIGNY SUR BIONNE|               3|
|          COMBLEUX|               2|
|MARIGNY LES USAGES|               2|
|            OLIVET|              21|
|              None|             125|
|             ORMES|              21|
|  ST JEAN LE BLANC|               7|
|ST PRYVE ST MESMIN|               6|
|     ST CYR EN VAL|              12|
+------------------+----------------+



Notice how the transformations won't display an output and won't be run until an action is called. In the next lecture: Advanced Spark and Python we will begin to see many more examples of this transformation and action relationship!

# Great Job!

In [230]:
import os
import logging
import hashlib
from pyspark.sql import functions as F
import pyspark.sql.types as T
from pyspark.sql import dataframe as pydf
import pandas as pd

In [231]:
def _md5_hash(x, secret_key):
    """Return the MD5 hash of the value.

    Parameters
    ----------
    x : str
        The value to anonymize.

    Returns
    -------
    str
        The MD5 hash of the value

    """
    return hashlib.md5(x.encode("utf-8") + secret_key.encode("utf-8")).hexdigest()


In [232]:
_md5_hash("0784808907","svs")

'f55b7a8749cf57e7853c5a97b223d2d4'