# TEPEDELEN Léo DELLARICA Steven BARBIN Kevin

# Projet EISI1 Capgemini MSPRTPRE813_1

Ce notebook Python est le rapport détaillé et l'ETL de notre projet. La suite du rapport se situe après l'ETL, nous y détaillons le fonctionnement du reste du système.

Il a été réalisé à l'aide de Docker afin d'en faciliter le déploiement pour tester et collaborer.

Vous trouverez tout ce qu'il vous faut pour essayer le programme sur votre machine dans le fichier README de ce dépôt GitHub.

# Extraire les données dans un DataFrame pySpark

In [1]:
import pandas as pd

In [2]:
from pyspark.sql import functions as F

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create the Spark session used by the whole notebook.
# Dynamic allocation is enabled (between 1 and 50 executors); the driver and
# each executor are configured with 8g of memory, and each executor with 2 cores.
spark = SparkSession.builder.appName("example") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "1") \
    .config("spark.dynamicAllocation.maxExecutors", "50") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "2") \
    .config("spark.memory.fraction", "0.8") \
    .getOrCreate()

# Turn on the Arrow setting used when converting between pandas and Spark DataFrames.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [4]:
# Download the delinquency parquet file from Dropbox into a pandas DataFrame.
delinquance_brut = pd.read_parquet("https://www.dropbox.com/scl/fi/gnuphrytsuw0d1rx2ix0i/donnee-comm-data.gouv-parquet-2023-geographie2024-produit-le2024-07-05.parquet?rlkey=hm2t98tm3vyzks7q0ehaoh94e&st=4gttfgqe&dl=1", engine='pyarrow')

In [5]:
# Discard, in place, the columns that the rest of the pipeline does not use.
delinquance_brut.drop(
    columns=[
        'millPOP',
        'millLOG',
        'classe',
        'unité.de.compte',
        'valeur.publiée',
        'complementinfoval',
        'complementinfotaux',
        'POP',
        'LOG',
    ],
    inplace=True,
)

In [6]:
# Rename the source columns to the names used in the rest of the notebook.
delinquance_brut.rename(columns={'CODGEO_2024' : 'codecommune', 'faits':'crimes_delits'}, inplace=True)

In [7]:
# Remove, in place, the rows that have no value in 'annee'.
delinquance_brut.dropna(subset=['annee'], inplace=True)

In [8]:
# Convert the commune code to an integer.
# Stripping every non-digit character (the previous approach) silently turned a
# code such as '2A004' into 2004, i.e. into the code of a different commune
# ('02004'), so its counts were merged into the wrong commune by the later
# groupby. Codes that are not purely numeric are dropped instead, which is the
# same policy this notebook applies to the election data (to_numeric + dropna).
delinquance_brut['codecommune'] = pd.to_numeric(delinquance_brut['codecommune'], errors='coerce')
delinquance_brut.dropna(subset=['codecommune'], inplace=True)
delinquance_brut['codecommune'] = delinquance_brut['codecommune'].astype('int64')

In [9]:
# Turn the two-digit year (possibly stored as a float such as 16.0) into a
# four-digit integer year.
delinquance_brut['annee'] = delinquance_brut['annee'].astype(str)
# Remove only a trailing ".0". The pattern is anchored and regex=True is passed
# explicitly: the default of `regex` differs between pandas versions, and an
# unescaped '.0' interpreted as a regex also matches e.g. the "20" of "20.0",
# which would empty the value.
delinquance_brut['annee'] = delinquance_brut['annee'].str.replace(r'\.0$', '', regex=True)
delinquance_brut['annee'] = '20' + delinquance_brut['annee']
# Drop any remaining non-digit character before the integer conversion.
delinquance_brut['annee'] = delinquance_brut['annee'].str.replace('[^0-9]', '', regex=True).astype('int64')

In [10]:
# Sum crimes_delits for each (codecommune, annee) pair; the result has one row per pair.
delinquance_brut = delinquance_brut.groupby(['codecommune','annee'])['crimes_delits'].sum().reset_index()

In [11]:
# Store the summed counts as 64-bit integers.
delinquance_brut['crimes_delits'] = delinquance_brut['crimes_delits'].astype('int64')

In [12]:
# Convert the pandas DataFrame to a Spark DataFrame (the same name is rebound).
delinquance_brut = spark.createDataFrame(delinquance_brut)

In [13]:
# Print the schema of the Spark DataFrame as a sanity check.
delinquance_brut.printSchema()

root
 |-- codecommune: long (nullable = true)
 |-- annee: long (nullable = true)
 |-- crimes_delits: long (nullable = true)



In [14]:
# candidats_results.parquet: download the election results file from Dropbox into a pandas DataFrame.
df_election = pd.read_parquet('https://www.dropbox.com/scl/fi/04jpbz2gqcax0y8ubetz8/candidats_results.parquet?rlkey=cvs3816ot12wxg41ss02q4aag&st=0r4ilpa8&dl=1')

In [15]:
def fillNuance(df, col, value, nuance):
    """Write *nuance* into the "Nuance" column of every row where *col* equals *value*.

    The DataFrame *df* is modified in place; nothing is returned.
    """
    lignes_concernees = df[col].eq(value)
    df.loc[lignes_concernees, ["Nuance"]] = nuance

In [16]:
# Lookup table: abbreviated list label -> nuance.
# It is applied to the rows whose "Libellé Abrégé Liste" equals the key.
missing_nuances_libelle = {
    "ALLIANCE JAUNE":"EGAUCHE",
    "ALLONS ENFANTS":"GAUCHE",
    "DEBOUT LA FRANCE":"EDROITE",
    "DÉCROISSANCE 2019":"EGAUCHE",
    "DÉMOCRATIE REPRÉSENTATIVE":"EGAUCHE",
    "ENSEMBLE PATRIOTES":"EDROITE",
    "ENSEMBLE POUR LE FREXIT":"EDROITE",
    "ENVIE D'EUROPE":"GAUCHE",
    "ESPERANTO":"EGAUCHE",
    "EUROPE AU SERVICE PEUPLES":"GAUCHE",
    "EUROPE ÉCOLOGIE":"GAUCHE",
    "INITIATIVE CITOYENNE":"EGAUCHE",
    "LA FRANCE INSOUMISE":"GAUCHE",
    "LA LIGNE CLAIRE":"EDROITE",
    "LES EUROPÉENS":"CENTRE",
    "LES OUBLIES DE L'EUROPE":"CENTRE",
    "LISTE CITOYENNE":"GAUCHE",
    "LISTE DE LA RECONQUÊTE":"EDROITE",
    "LUTTE OUVRIÈRE":"EGAUCHE",
    "NEUTRE ET ACTIF":"EDROITE",
    "PACE":"GAUCHE",
    "PARTI ANIMALISTE":"CENTRE",
    "PARTI FED. EUROPÉEN":"EDROITE",
    "PARTI PIRATE":"GAUCHE",
    "POUR L'EUROPE DES GENS":"EGAUCHE",
    "PRENEZ LE POUVOIR":"EDROITE",
    "RENAISSANCE":"CENTRE",
    "RÉVOLUTIONNAIRE":"EGAUCHE",
    "UDLEF":"DROITE",
    "UNE FRANCE ROYALE":"EDROITE",
    "UNION DROITE-CENTRE":"CDROIT",
    "URGENCE ÉCOLOGIE":"EGAUCHE",
    "À VOIX ÉGALES":"GAUCHE",
    "ÉVOLUTION CITOYENNE":"GAUCHE"
}

In [17]:
# Lookup table: value of the "Nom" column -> nuance.
# It is applied to the rows whose "Nom" equals the key.
missing_nuances_nom = {
    "ARTHAUD":"EGAUCHE",
    "ASSELINEAU":"EDROITE",
    "CHEMINADE":"EDROITE",
    "DUPONT-AIGNAN":"EDROITE",
    "FILLON":"DROITE",
    "HAMON":"GAUCHE",
    "HIDALGO":"GAUCHE",
    "JADOT":"GAUCHE",
    "LASSALLE":"DROITE",
    "LE PEN":"EDROITE",
    "MACRON":"CENTRE",
    "MÉLENCHON":"GAUCHE",
    "POUTOU":"EGAUCHE",
    "PÉCRESSE":"DROITE",
    "ROUSSEL":"EGAUCHE",
    "ZEMMOUR":"EDROITE"
}

In [18]:
# Apply both lookup tables in turn: first the one keyed on the list label,
# then the one keyed on the "Nom" column.
for colonne, table_nuances in (
    ("Libellé Abrégé Liste", missing_nuances_libelle),
    ("Nom", missing_nuances_nom),
):
    for valeur, nuance in table_nuances.items():
        fillNuance(df_election, colonne, valeur, nuance)

In [19]:
import urllib.request
import json
# Download nuances.json from Dropbox and parse it into dic_nuances.
# The response object is closed when the with-block exits.
with urllib.request.urlopen('https://www.dropbox.com/scl/fi/trxktezbq12phyqptj545/nuances.json?rlkey=5u4368x6evhct81slh20ffzii&st=mw31zc2i&dl=1') as outfile:
     dic_nuances = json.load(outfile)

In [20]:
def replaceValue(nom_nuance, dic_nuances):
    """Map *nom_nuance* to the key of *dic_nuances* whose value contains it.

    The entries are scanned in the mapping's iteration order and the first
    matching key wins. When no entry contains *nom_nuance*, the value is
    returned unchanged.
    """
    return next(
        (
            famille
            for famille, variantes in dic_nuances.items()
            if nom_nuance in variantes
        ),
        nom_nuance,
    )

# Pass every value of the "Nuance" column through replaceValue, using dic_nuances as the mapping.
df_election['Nuance'] = df_election['Nuance'].apply(lambda nom_nuance: replaceValue(nom_nuance, dic_nuances))

In [21]:
# The two lookup tables are not used past this point: remove the names.
del missing_nuances_nom
del missing_nuances_libelle

In [22]:
# Drop the rows whose id_election contains 'muni', 'dpmt', 'regi' or 'cant'.
# Rows with a missing id_election are kept (na=False, then negated).
df_election = df_election[~df_election["id_election"].str.contains(r'muni|dpmt|regi|cant', na=False)]

In [23]:
# Keep only the columns used in the rest of the notebook.
df_election = df_election[["id_election","Code du département","Code de la commune", "Voix","Code du b.vote", "% Voix/Ins", "Nuance"]]

In [24]:
# Split id_election on '_' into three new columns: annee, type_election and tour.
# NOTE(review): assumes every id_election has exactly three '_'-separated parts — confirm.
df_election[["annee", "type_election", "tour"]] = df_election["id_election"].str.split("_", expand=True)

In [25]:
# Build the full commune code by concatenating the department code and the
# commune code (column-wise operation; an earlier version used a row-wise apply).
df_election['Code de la commune'] = df_election['Code du département'] + df_election['Code de la commune']

In [26]:
# Convert the commune code to a number; values that cannot be parsed become NaN.
df_election["Code de la commune"] = pd.to_numeric(df_election["Code de la commune"], errors='coerce')

In [27]:
# Remove the rows without a department or commune code (including the codes
# that could not be parsed as numbers), then store the commune code as an integer.
df_election.dropna(subset=["Code du département", "Code de la commune"], inplace=True)
df_election["Code de la commune"] = df_election["Code de la commune"].astype(int)

In [28]:
# Rename the source columns to the names used in the rest of the notebook.
df_election.rename(columns={
    "Code du département":"dep",
    "Code de la commune":"codecommune",
    "Code du b.vote":"bureau",
    "% Voix/Ins":"pourcentage_voix_inscrits",
    "Nuance":"nuance",
    "Voix":"nb_voix"
}, inplace=True)

In [29]:
# Convert the pandas DataFrame to a Spark DataFrame and mark it for caching
# (it is read twice by the pivot step that follows).
df_election = spark.createDataFrame(df_election).cache()

In [30]:
# Grouping keys shared by both pivots, and the nuance columns kept in the
# result (listed in output order).
cles_election = ["id_election", "codecommune", "bureau", "annee", "type_election", "tour"]
nuances_retenues = ["AUTRE", "EGAUCHE", "GAUCHE", "CGAUCHE", "CENTRE", "CDROIT", "DROITE", "EDROITE"]


def _pivot_par_nuance(df, mesure):
    """Pivot *df* on "nuance", summing *mesure* for each combination of keys.

    Returns the key columns followed by one column per retained nuance,
    named "<nuance in lower case>_<mesure>".
    """
    pivot = df.groupBy(*cles_election).pivot("nuance").agg(F.sum(mesure))
    colonnes = [col(nuance).alias(f"{nuance.lower()}_{mesure}") for nuance in nuances_retenues]
    return pivot.select(*cles_election, *colonnes)


# Percentages on the left, vote counts on the right, joined on the grouping keys.
df_election = _pivot_par_nuance(df_election, "pourcentage_voix_inscrits").join(
    _pivot_par_nuance(df_election, "nb_voix"),
    cles_election,
)

In [31]:
# for col in df_election_test.columns[6:6+8]:
#     df_election_test = df_election_test.withColumnRenamed(col, col.lower() + "_pourcentage")


# for col in df_election_test.columns[6+8:]:
#     df_election_test = df_election_test.withColumnRenamed(col, col.lower() + "_nb_voix")

In [32]:
# Print the schema of the pivoted DataFrame as a sanity check.
df_election.printSchema()

root
 |-- id_election: string (nullable = true)
 |-- codecommune: long (nullable = true)
 |-- bureau: string (nullable = true)
 |-- annee: string (nullable = true)
 |-- type_election: string (nullable = true)
 |-- tour: string (nullable = true)
 |-- autre_pourcentage_voix_inscrits: double (nullable = true)
 |-- egauche_pourcentage_voix_inscrits: double (nullable = true)
 |-- gauche_pourcentage_voix_inscrits: double (nullable = true)
 |-- cgauche_pourcentage_voix_inscrits: double (nullable = true)
 |-- centre_pourcentage_voix_inscrits: double (nullable = true)
 |-- cdroit_pourcentage_voix_inscrits: double (nullable = true)
 |-- droite_pourcentage_voix_inscrits: double (nullable = true)
 |-- edroite_pourcentage_voix_inscrits: double (nullable = true)
 |-- autre_nb_voix: double (nullable = true)
 |-- egauche_nb_voix: double (nullable = true)
 |-- gauche_nb_voix: double (nullable = true)
 |-- cgauche_nb_voix: double (nullable = true)
 |-- centre_nb_voix: double (nullable = true)
 |-- cdroi

In [33]:
# Nuance prefixes, in the order used for the column names.
prefixes_nuances = ["autre", "egauche", "gauche", "cgauche", "centre", "cdroit", "droite", "edroite"]
colonnes_pourcentage = [f"{p}_pourcentage_voix_inscrits" for p in prefixes_nuances]
colonnes_nb_voix = [f"{p}_nb_voix" for p in prefixes_nuances]

# Replace nulls by 0 in every per-nuance column so that the sums below are defined.
colonnes_a_remplir = colonnes_pourcentage + colonnes_nb_voix
df_election = df_election.fillna(0, subset=colonnes_a_remplir)

del colonnes_a_remplir

# Totals over all nuances: vote count, then percentage of registered voters.
df_election = df_election.withColumn("nb_voix", F.expr(" + ".join(colonnes_nb_voix)))

df_election = df_election.withColumn("pourcentage_voix_inscrits", F.expr(" + ".join(colonnes_pourcentage)))

In [34]:
# Derive nb_inscrits from the total vote count and the total percentage:
# nb_voix * 100 / pourcentage_voix_inscrits, rounded to 0 decimals.
# NOTE(review): pourcentage_voix_inscrits can be 0 once nulls have been replaced by 0,
# in which case the division is undefined — confirm how such rows should be handled.
df_election = df_election.withColumn("nb_inscrits", F.round(F.expr("(nb_voix * 100) / pourcentage_voix_inscrits"), 0))

In [35]:
# Aggregate from (commune, bureau) rows to one row per election and commune by
# summing the counts, then recompute every percentage from the summed counts.
prefixes_nuances_commune = ["autre", "egauche", "gauche", "cgauche", "centre", "cdroit", "droite", "edroite"]
colonnes_sommees = ["nb_inscrits", "nb_voix"] + [f"{p}_nb_voix" for p in prefixes_nuances_commune]

df_election = (
    df_election.drop("bureau")
    .groupBy("id_election", "codecommune", "annee", "type_election", "tour")
    .agg(*[F.sum(nom).alias(nom) for nom in colonnes_sommees])
)

for prefixe in prefixes_nuances_commune:
    df_election = df_election.withColumn(
        f"{prefixe}_pourcentage_voix_inscrits",
        (F.col(f"{prefixe}_nb_voix") / F.col("nb_inscrits")) * 100,
    )

df_election = df_election.withColumn(
    "pourcentage_voix_inscrits",
    (F.col("nb_voix") / F.col("nb_inscrits")) * 100,
).withColumn("annee", col("annee").cast("long"))


In [36]:
# Cast annee to long. The aggregation step just before already ends with the
# same cast, so running this cell again does not change the result.
df_election = df_election.withColumn("annee", col("annee").cast("long"))

In [37]:
# popcommunes.csv: zipped CSV downloaded from Dropbox into a pandas DataFrame.
# NOTE(review): the later conversion of these frames to Spark warns that a str
# value could not be converted to int64, which suggests columns with mixed
# types — consider passing an explicit dtype= for the code columns.
df_demographie = pd.read_csv('https://www.dropbox.com/scl/fi/qayze0v5xxwolw4tj8qmc/popcommunes.csv.zip?rlkey=qnpfvfq0kc096dykwfvz2jrv0&st=gk60a3ff&dl=1', compression='zip', header=0, sep=',', quotechar='"')

# cspcommunes.csv: zipped CSV downloaded from Dropbox into a pandas DataFrame.
df_csp = pd.read_csv('https://www.dropbox.com/scl/fi/opg91qcti1ctwmopr4efj/cspcommunes.csv.zip?rlkey=h8pti3tzoo7tqjctc2h1rfeqx&st=1mnddf1h&dl=1', compression='zip', header=0, sep=',', quotechar='"')

# df_criminalite = pd.read_csv('https://drive.google.com/uc?export=download&id=1F9Jm0UaemicMy4MnvZ47nmZHiIdsmRwC')

  df_demographie = pd.read_csv('https://www.dropbox.com/scl/fi/qayze0v5xxwolw4tj8qmc/popcommunes.csv.zip?rlkey=qnpfvfq0kc096dykwfvz2jrv0&st=gk60a3ff&dl=1', compression='zip', header=0, sep=',', quotechar='"')
  df_csp = pd.read_csv('https://www.dropbox.com/scl/fi/opg91qcti1ctwmopr4efj/cspcommunes.csv.zip?rlkey=h8pti3tzoo7tqjctc2h1rfeqx&st=1mnddf1h&dl=1', compression='zip', header=0, sep=',', quotechar='"')


In [38]:
# Using pandas

# Keep the identification columns plus the yearly population columns (pop
# followed by four digits).
df_demographie_filtre = df_demographie.filter(regex=r"^(dep|nomdep|codecommune|nomcommune|reg|nomreg)$|^pop[0-9]{4}$")

# Keep the identification columns plus the socio-professional columns, with or
# without a four-digit suffix. The pattern is a raw string so that \d reaches
# the regex engine as written instead of being an invalid string escape (which
# recent Python versions report as a warning). The pattern itself is unchanged.
df_csp_filtre = df_csp.filter(regex=r"^(dep|nomdep|codecommune|nomcommune|agri|indp|cadr|pint|empl|ouvr|pact|chom)(\d{4})?$")

In [29]:
# # 1. Create the dimension table for commune data (df_dimension_commune)
# df_dimension_commune = df_demographie_filtre[['dep', 'codecommune', 'nomcommune', 'reg']].drop_duplicates()

# # 2. Extract population columns (columns starting with "pop" followed by 4 digits)
# pop_columns = [col for col in df_demographie_filtre.columns if col.startswith('pop') and len(col) == 7 and col[3:].isdigit()]

# # 3. Pivot the population columns into long format for the facts table (df_faits)
# # We use the `melt` function to transform wide to long format
# df_faits = pd.melt(df_demographie_filtre, 
#                    id_vars=['codecommune'],  # Keep 'codecommune' as ID column
#                    value_vars=pop_columns,   # These are the population columns to melt
#                    var_name='annee',          # Name of the new 'annee' column
#                    value_name='population')  # Name of the new 'population' column

# # Extract annee from 'popXXXX' column
# df_faits['annee'] = df_faits['annee'].str[-4:]

# print("Dimension Commune Table:")
# print(df_dimension_commune)

# print("\nFacts Table (Population Data):")
# print(df_faits)

In [39]:
# Using pySpark

# Convert the filtered pandas DataFrame to a Spark DataFrame (the same name is rebound).
df_demographie_filtre = spark.createDataFrame(df_demographie_filtre)

  Could not convert '28' with type str: tried to convert to int64
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [40]:
# Convert the filtered pandas DataFrame to a Spark DataFrame (the same name is rebound).
df_csp_filtre = spark.createDataFrame(df_csp_filtre)

  Could not convert '29' with type str: tried to convert to int64
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [41]:
# # Using pySpark


# # 1. Create the dimension table for commune data (df_dimension_commune)
# df_dimension_commune = df_demographie_filtre.select("dep", "codecommune", 'nomcommune', "reg").distinct()

# # 2. Extract population columns (columns starting with "pop" followed by 4 digits)
# pop_columns = [col for col in df_demographie_filtre.columns if col.startswith('pop') and len(col) == 7 and col[3:].isdigit()]

# # 3. Pivot the population columns into long format for the facts table (df_faits)
# df_faits = df_demographie_filtre \
#     .withColumn("annee_population", F.explode(F.array(
#         *[F.struct(F.lit(col[-4:]).alias("annee"), F.col(col).alias("population")) for col in pop_columns]
#     )))

# # Split the "annee_population" struct into separate columns for annee and population
# df_faits = df_faits.withColumn("annee", F.col("annee_population.annee")) \
#                    .withColumn("population", F.col("annee_population.population")) \
#                    .drop("annee_population")

# # 4. Join the facts table with the commune table to include commune_id in df_faits
# df_faits = df_faits.select("codecommune", "annee", "population")

# # Display the results
# df_dimension_commune.show()
# df_faits.show()

In [42]:
def getColumnsWithStringAndFourDigits(str, df):
    """Return the columns of *df* named as the prefix *str* followed by exactly four digits.

    The columns are returned in the order of ``df.columns``. The parameter is
    named ``str`` for compatibility with existing callers; it hides the
    builtin inside this function only.
    """
    longueur_attendue = len(str) + 4
    selection = []
    for nom in df.columns:
        if not nom.startswith(str):
            continue
        if len(nom) != longueur_attendue:
            continue
        if nom[-4:].isdigit():
            selection.append(nom)
    return selection

In [43]:
# # popcommunes

# # 1. Optimize commune dimension table creation by dropping duplicates
# df_dimension_commune = df_demographie_filtre.select("dep", "codecommune", 'nomcommune', "reg").drop_duplicates()

# # 2. Extract population columns (columns starting with "pop" followed by 4 digits)
# pop_columns = getColumnsWithStringAndFourDigits("pop", df_demographie_filtre)

# # 3. Use stack() to pivot the population columns more efficiently for large datasets
# df_faits = df_demographie_filtre.select("codecommune", 
#                                         F.expr("stack({0}, {1})".format(
#                                             len(pop_columns),
#                                             ','.join([f"'{col[-4:]}', {col}" for col in pop_columns])
#                                         )).alias("annee", "population"))

# # Display the results
# # df_dimension_commune.show()
# # df_faits.show()

In [44]:
# cspcommunes

from collections import OrderedDict

# 1. Commune dimension: outer-join the identification columns of both sources,
#    remove duplicate rows, drop the rows without region information, and cache.
df_dimension_commune = df_demographie_filtre.select("dep","nomdep", "codecommune", 'nomcommune', "reg", "nomreg").join(
                        df_csp_filtre.select("dep","nomdep", "codecommune", 'nomcommune'), on=["dep", "codecommune", "nomdep", 'nomcommune'], how="outer").drop_duplicates().dropna(subset=["reg", "nomreg"]).cache()

# 2. Extract population columns (columns starting with "pop" followed by 4 digits)
pop_columns = getColumnsWithStringAndFourDigits("pop", df_demographie_filtre)

# Columns of each socio-professional category, keyed by the category prefix.
# The loop variable is deliberately not called `str`: at module level that
# would rebind the builtin for every cell executed afterwards.
csp_columns = OrderedDict()
for prefix in ["agri","indp","cadr","pint","empl","ouvr","pact", "chom"]:
    csp_columns[prefix] = getColumnsWithStringAndFourDigits(prefix, df_csp_filtre)

# Regroup the same columns by year (the last four characters of the name).
# Within a year the columns keep the category order of the list above.
# NOTE(review): the stack() built from this mapping presumably needs every year
# to provide all eight categories — confirm against the source file.
annees_csp = OrderedDict()
for col_list in csp_columns.values():
    for col_name in col_list:
        annees_csp.setdefault(col_name[-4:], []).append(col_name)


In [45]:

# 3. Unpivot the yearly population columns into (annee, population) rows with stack().
pop_pairs = ','.join(f"'{name[-4:]}', {name}" for name in pop_columns)
df_faits_population = df_demographie_filtre.select(
    "codecommune",
    F.expr(f"stack({len(pop_columns)}, {pop_pairs})").alias("annee", "population"),
)


# Same unpivot for the socio-professional columns: one stack() row per year,
# every value cast to DOUBLE.
cast="CAST("
as_double=" AS DOUBLE)"

csp_rows = ','.join(
    "'{0}', {1}".format(annee, ', '.join(cast + name + as_double for name in year_columns))
    for annee, year_columns in annees_csp.items()
)
df_faits_csp = df_csp_filtre.select(
    "codecommune",
    F.expr(f"stack({len(annees_csp)}, {csp_rows})").alias(
        "annee",
        "agriculteurs",
        "independants",
        "cadres",
        "intermediaires",
        "employes",
        "ouvriers",
        "population_active_totale",
        "chomeurs",
    ),
)

# Full outer join of both fact sets on (codecommune, annee); the result is cached.
df_faits = df_faits_csp.join(df_faits_population, on=["codecommune", "annee"], how="outer").cache()

# Display the results
# print(df_dimension_commune.count())
# print(df_faits.count())

34856
8667198


In [46]:
# Cast the commune and department codes of the dimension table to long.
# NOTE(review): presumably a code that is not purely numeric becomes null after this cast — confirm this is intended.
df_dimension_commune = df_dimension_commune.withColumn("codecommune", col("codecommune").cast("long")).withColumn("dep", col("dep").cast("long"))

In [47]:
# Replace NaN values by null, then cast the two join keys to long.
df_faits = df_faits.na.replace(float("nan"), None)
df_faits = df_faits.withColumn("codecommune", col("codecommune").cast("long")).withColumn("annee", col("annee").cast("long"))

In [48]:
# Full outer join with the election results on (codecommune, annee).
df_faits = df_faits.join(df_election, on=["codecommune", "annee"], how="outer")

In [50]:
# Full outer join with the delinquency counts on (codecommune, annee).
df_faits = df_faits.join(delinquance_brut, on=["codecommune", "annee"], how="outer")

In [57]:
# Full outer join with the commune dimension on codecommune.
df_faits = df_faits.join(df_dimension_commune, on=["codecommune"], how="outer")

In [56]:
# Display a very small random sample of the joined table as a visual check.
df_faits.sample(0.000001).show()

+-----------+-----+------------+------------+------+--------------+--------+--------+------------------------+--------+----------+-----------+-------------+----+-----------+-------+-------------+---------------+--------------+---------------+--------------+--------------+--------------+---------------+-------------------------------+---------------------------------+--------------------------------+---------------------------------+--------------------------------+--------------------------------+--------------------------------+---------------------------------+-------------------------+-------------+
|codecommune|annee|agriculteurs|independants|cadres|intermediaires|employes|ouvriers|population_active_totale|chomeurs|population|id_election|type_election|tour|nb_inscrits|nb_voix|autre_nb_voix|egauche_nb_voix|gauche_nb_voix|cgauche_nb_voix|centre_nb_voix|cdroit_nb_voix|droite_nb_voix|edroite_nb_voix|autre_pourcentage_voix_inscrits|egauche_pourcentage_voix_inscrits|gauche_pourcentage_vo

In [54]:
# Columns holding counts: cast each of them to long, one after the other.
colonnes_entieres = [
    "agriculteurs",
    "independants",
    "cadres",
    "intermediaires",
    "employes",
    "ouvriers",
    "population_active_totale",
    "chomeurs",
    "nb_inscrits",
    "nb_voix",
    "autre_nb_voix",
    "egauche_nb_voix",
    "gauche_nb_voix",
    "cgauche_nb_voix",
    "centre_nb_voix",
    "cdroit_nb_voix",
    "droite_nb_voix",
    "edroite_nb_voix",
]
for nom_colonne in colonnes_entieres:
    df_faits = df_faits.withColumn(nom_colonne, col(nom_colonne).cast("long"))

In [60]:
# Print the schema of the joined table as a sanity check.
df_faits.printSchema()

root
 |-- codecommune: long (nullable = true)
 |-- annee: long (nullable = true)
 |-- agriculteurs: long (nullable = true)
 |-- independants: long (nullable = true)
 |-- cadres: long (nullable = true)
 |-- intermediaires: long (nullable = true)
 |-- employes: long (nullable = true)
 |-- ouvriers: long (nullable = true)
 |-- population_active_totale: long (nullable = true)
 |-- chomeurs: long (nullable = true)
 |-- population: long (nullable = true)
 |-- id_election: string (nullable = true)
 |-- type_election: string (nullable = true)
 |-- tour: string (nullable = true)
 |-- nb_inscrits: long (nullable = true)
 |-- nb_voix: long (nullable = true)
 |-- autre_nb_voix: long (nullable = true)
 |-- egauche_nb_voix: long (nullable = true)
 |-- gauche_nb_voix: long (nullable = true)
 |-- cgauche_nb_voix: long (nullable = true)
 |-- centre_nb_voix: long (nullable = true)
 |-- cdroit_nb_voix: long (nullable = true)
 |-- droite_nb_voix: long (nullable = true)
 |-- edroite_nb_voix: long (nullable

In [76]:
from pyspark.sql.window import Window

# Derive identifier columns with dense_rank: rows sharing the same ordering
# value(s) receive the same identifier.
# NOTE(review): none of these windows defines a partitionBy; presumably Spark has
# to rank the whole dataset in a single partition — confirm this is acceptable
# for the data volume.
df_faits = df_faits.withColumn("id_commune", F.dense_rank().over(Window.orderBy("codecommune"))) \
                    .withColumn("id_type_election", F.dense_rank().over(Window.orderBy("type_election","tour")))\
                    .withColumn("id_demographie", F.dense_rank().over(Window.orderBy("population")))\
                    .withColumn("id_criminalite", F.dense_rank().over(Window.orderBy("crimes_delits")))\
                    .withColumn("id_emploi", F.dense_rank().over(Window.orderBy("agriculteurs", "independants", "cadres", "intermediaires", "employes", "ouvriers", "population_active_totale", "chomeurs")))\
                    .withColumn("id_annee", F.dense_rank().over(Window.orderBy("annee")))

In [72]:
# Column lists of the output tables: the election results table first, then one
# list per table holding an identifier and its descriptive columns.
election_columns = ["id_election","id_commune","id_type_election","id_demographie","id_criminalite","id_emploi","id_annee","nb_inscrits","nb_voix","pourcentage_voix_inscrits","autre_nb_voix","egauche_nb_voix",
                    "gauche_nb_voix","cgauche_nb_voix","centre_nb_voix","cdroit_nb_voix","droite_nb_voix","edroite_nb_voix","autre_pourcentage_voix_inscrits","egauche_pourcentage_voix_inscrits","gauche_pourcentage_voix_inscrits",
                    "cgauche_pourcentage_voix_inscrits","centre_pourcentage_voix_inscrits","cdroit_pourcentage_voix_inscrits","droite_pourcentage_voix_inscrits","edroite_pourcentage_voix_inscrits"]
commune_columns = ["id_commune","codecommune","dep","reg","nomcommune","nomdep","nomreg"]
type_election_columns = ["id_type_election","type_election","tour"]
demographie_columns = ["id_demographie","population"]
criminalite_columns = ["id_criminalite","crimes_delits"]
emploi_columns = ["id_emploi","agriculteurs","independants","cadres", "intermediaires", "employes", "ouvriers", "population_active_totale", "chomeurs"]
annee_columns = ["id_annee","annee"]

In [77]:
# Project df_faits onto each column list and remove duplicate rows.
# This rebinds df_election and df_demographie, which referred to other
# DataFrames earlier in the notebook.
df_election = df_faits.select(election_columns).distinct()
df_commune = df_faits.select(commune_columns).distinct()
df_type_election = df_faits.select(type_election_columns).distinct()
df_demographie = df_faits.select(demographie_columns).distinct()
df_criminalite = df_faits.select(criminalite_columns).distinct()
df_emploi = df_faits.select(emploi_columns).distinct()
df_annee = df_faits.select(annee_columns).distinct()

In [83]:
# Count the rows of the commune table as a sanity check.
df_commune.count()

38354

In [81]:
# Display the rows that have no election type.
df_faits.filter(col("type_election").isNull()).show()

+-----------+-----+------------+------------+------+--------------+--------+--------+------------------------+--------+----------+-----------+-------------+----+-----------+-------+-------------+---------------+--------------+---------------+--------------+--------------+--------------+---------------+-------------------------------+---------------------------------+--------------------------------+---------------------------------+--------------------------------+--------------------------------+--------------------------------+---------------------------------+-------------------------+-------------+----+------------+--------------------+---+------+----------+----------------+--------------+--------------+---------+--------+
|codecommune|annee|agriculteurs|independants|cadres|intermediaires|employes|ouvriers|population_active_totale|chomeurs|population|id_election|type_election|tour|nb_inscrits|nb_voix|autre_nb_voix|egauche_nb_voix|gauche_nb_voix|cgauche_nb_voix|centre_nb_voix|cdroit

In [41]:
# ','.join([f"'{col[-4:]}', {col}" for col in pop_columns])

In [42]:
# Tentative infructueuse en utilisant pandas

# df_faits = pd.DataFrame(columns=["population"])
# df_faits = spark.createDataFrame(df_faits)

# df_dimension_commune = pd.DataFrame(columns=["code_departement", "code_commune", "code_region"])
# df_dimension_commune = spark.createDataFrame(df_dimension_commune)

# df_dimension_annee = pd.DataFrame(columns=["annee"])
# df_dimension_annee = spark.createDataFrame(df_dimension_annee)

# for idx, row in df_demographie_filtre.iterrows():
#     df_dimension_commune.loc[len(df_dimension_commune)] = [row["dep"], row["codecommune"], row["reg"]]
#     for index, value in row.filter(regex="^pop[0-9]{4}$").items():
#         df_faits.loc[len(df_faits)] = [value]
#         df_dimension_annee.loc[len(df_dimension_annee)] = [index[-4:]]

# Écriture des données transformées en base de données avec le driver JDBC

In [84]:
import os

# JDBC connection properties for the MySQL target.
# The credentials are read from the environment (MYSQL_USER / MYSQL_PASSWORD) so
# that they do not have to be stored in the notebook; the previous hard-coded
# values remain as defaults, so an existing setup keeps working unchanged.
properties = {
    "user": os.environ.get("MYSQL_USER", "root"),
    "password": os.environ.get("MYSQL_PASSWORD", "root"),
    "driver": "com.mysql.cj.jdbc.Driver"
}

In [85]:
# Write the election results to the MySQL table "resultats_elections_nuance" (overwrite mode).
df_election.write.jdbc(url="jdbc:mysql://mysql:3306/mspr1", table="resultats_elections_nuance", mode="overwrite", properties=properties)

In [86]:
# Write the commune table to the MySQL table "commune" (overwrite mode).
df_commune.write.jdbc(url="jdbc:mysql://mysql:3306/mspr1", table="commune", mode="overwrite", properties=properties)

In [87]:
# Write the election type table to the MySQL table "type_election" (overwrite mode).
df_type_election.write.jdbc(url="jdbc:mysql://mysql:3306/mspr1", table="type_election", mode="overwrite", properties=properties)

In [88]:
# Write the population table to the MySQL table "demographie" (overwrite mode).
df_demographie.write.jdbc(url="jdbc:mysql://mysql:3306/mspr1", table="demographie", mode="overwrite", properties=properties)

In [89]:
# Write the crimes_delits table to the MySQL table "criminalite" (overwrite mode).
df_criminalite.write.jdbc(url="jdbc:mysql://mysql:3306/mspr1", table="criminalite", mode="overwrite", properties=properties)

In [90]:
# Write the employment table to the MySQL table "emploi" (overwrite mode).
df_emploi.write.jdbc(url="jdbc:mysql://mysql:3306/mspr1", table="emploi", mode="overwrite", properties=properties)

In [91]:
# Write the year table to the MySQL table "annee" (overwrite mode).
df_annee.write.jdbc(url="jdbc:mysql://mysql:3306/mspr1", table="annee", mode="overwrite", properties=properties)

In [None]:
# Unpersist the delinquency DataFrame and remove the name.
delinquance_brut.unpersist()
del delinquance_brut

In [None]:
# Unpersist the election DataFrame and remove the name.
# NOTE(review): df_election was rebound several times after the original
# .cache() call; presumably this does not release the originally cached data — confirm.
df_election.unpersist()
del df_election

In [None]:
# Unpersist the remaining Spark DataFrames and remove every leftover name.
# NOTE(review): df_dimension_commune and df_faits were rebound to derived
# DataFrames after their .cache() call; presumably unpersist() on the derived
# frame does not release the originally cached data. The spark.stop() that
# follows ends the session in any case — confirm nothing else is expected here.
df_dimension_commune.unpersist()
del df_dimension_commune
df_faits.unpersist()
del df_faits
df_faits_csp.unpersist()
del df_faits_csp
df_faits_population.unpersist()
del df_faits_population
df_csp_filtre.unpersist()
del df_csp_filtre
df_demographie_filtre.unpersist()
del df_demographie_filtre
del df_demographie
del df_csp

In [None]:
# Stop the Spark session now that every table has been written.
spark.stop()

# Générer une table au format CSV avec une description statistique du DataFrame

In [None]:
# df.describe().toPandas().to_csv("describe_summary.csv", index=False)

# Effectuer un échantillonnage du DataFrame pour en observer les colonnes qui peuvent sembler pertinentes

In [None]:
# FRACTION_SIZE = 0.00001

In [None]:
# columnsToSample = ['quantity', 'serving_size', 'serving_quantity', 'product_quantity']

In [None]:
# samples_not_clean_df = df.select(columnsToSample).dropna(how='all').cache()

In [None]:
# samples_df = samples_not_clean_df.sample(withReplacement=False, fraction=FRACTION_SIZE).cache()

In [None]:
# samples_df.count()

In [None]:
# samples_df.show()

# Extraire les valeurs uniques d'une colonne afin d'en confirmer la pertinence (colonne ingredients_analysis_tags en exemple)

In [None]:
# def flatten_list(li):
#     flat_list = []
#     for row in li:
#         flat_list += row
#     return flat_list

# def make_list_unique(li):
#     return list(dict.fromkeys(li))

# def split_string_list_elements(li, sep):
#     return [x.split(sep) for x in li]

# def column_to_list(col):
#     return col.rdd.flatMap(lambda x: x).collect()

In [None]:
# ingredients_analysis_list = make_list_unique(flatten_list(split_string_list_elements(column_to_list(df.select('ingredients_analysis_tags').dropna()), ",")))

In [None]:
# ingredients_analysis_list

# Sélection des colonnes nécessaires après étude de celles-ci

In [None]:
# kept_columns = ["code", "product_name", "product_quantity", "energy-kcal_100g", "fat_100g", "saturated-fat_100g", "monounsaturated-fat_100g", "polyunsaturated-fat_100g", "trans-fat_100g", 
#                 "carbohydrates_100g", "sugars_100g", "starch_100g", "fiber_100g", "proteins_100g", "allergens", "traces", "vitamin-a_100g", "vitamin-c_100g", "vitamin-d_100g",
#                 "vitamin-e_100g", "vitamin-k_100g", "vitamin-b1_100g", "vitamin-b2_100g", "vitamin-b6_100g", "vitamin-b9_100g", "vitamin-b12_100g", "calcium_100g",
#                 "iron_100g", "magnesium_100g", "potassium_100g", "zinc_100g", "food_groups_tags", "serving_size", "serving_quantity", "cholesterol_100g", "salt_100g", "glycemic-index_100g"]

In [None]:
# df_kept_columns = df.select(kept_columns)

# Qualité des données

## On supprime les lignes qui ont moins de 15 colonnes très pertinentes non nulles

In [None]:
# columns_to_check = ["code", "product_name", "product_quantity", "energy-kcal_100g", "fat_100g", "saturated-fat_100g", "monounsaturated-fat_100g",
#                     "polyunsaturated-fat_100g", "trans-fat_100g", "carbohydrates_100g", "sugars_100g", "starch_100g", 
#                     "fiber_100g", "proteins_100g", "allergens", "traces", "food_groups_tags", "serving_size", "serving_quantity"]

In [None]:
# df_kept_columns = df_kept_columns.dropna(subset=columns_to_check, thresh=15)

## On supprime les lignes dupliquées et on garde le DF en cache pour éviter de le recalculer

In [None]:
# df_kept_columns = df_kept_columns.dropDuplicates().cache()

In [None]:
# print("Nombre de lignes restantes après le traitement : " + str(df_kept_columns.count()))

## Plus besoin du DataFrame de base donc on le retire de la mémoire cache

In [None]:
# df.unpersist()
# print("Mémoire libérée")