<a href="https://colab.research.google.com/github/Faresffa/Champollion-/blob/main/Spark_Fares.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Exercice 1 : Création et Manipulation de RDDs**


In [None]:
from pyspark import SparkContext

# Initialisation du contexte Spark
sc = SparkContext("local", "Exercice 1: Manipulation de RDDs")

# Création d'un RDD à partir d'une liste de nombres
nombres = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd_nombres = sc.parallelize(nombres)

# 1. Filtrer les nombres pairs
rdd_pairs = rdd_nombres.filter(lambda x: x % 2 == 0)
print("Nombres pairs:", rdd_pairs.collect())

# 2. Calculer le carré de chaque nombre filtré
rdd_carres = rdd_pairs.map(lambda x: x ** 2)
print("Carrés des nombres pairs:", rdd_carres.collect())

# 3. Calculer la somme des nombres obtenus
somme = rdd_carres.reduce(lambda x, y: x + y)
print("Somme des carrés:", somme)

# Fermeture du contexte Spark
sc.stop()

Nombres pairs: [2, 4, 6, 8, 10]
Carrés des nombres pairs: [4, 16, 36, 64, 100]
Somme des carrés: 220


## **Exercice 2 : Opérations de Base sur les DataFrames**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

# Initialisation de la session Spark
spark = SparkSession.builder.appName("Exercice 2: DataFrames").getOrCreate()

# Création d'un DataFrame à partir d'une liste de tuples
employes = [
    ("Alice", "RH", 55000),
    ("Bob", "Informatique", 65000),
    ("Charlie", "Ventes", 48000),
    ("David", "Informatique", 70000),
    ("Eve", "RH", 52000),
    ("Frank", "Ventes", 49000)
]

# Création du DataFrame avec les noms de colonnes
df_employes = spark.createDataFrame(employes, ["Nom", "Departement", "Salaire"])

# 1. Afficher le schéma du DataFrame
print("Schéma du DataFrame:")
df_employes.printSchema()

# 2. Afficher les données
print("Données du DataFrame:")
df_employes.show()

# 3. Filtrer les employés dont le salaire est supérieur à 50000
print("Employés avec un salaire > 50000:")
df_employes.filter(df_employes.Salaire > 50000).show()

# 4. Calculer le salaire moyen par département
print("Salaire moyen par département:")
df_employes.groupBy("Departement").agg(avg("Salaire").alias("Salaire_Moyen")).show()

# Arrêt de la session Spark
spark.stop()

Schéma du DataFrame:
root
 |-- Nom: string (nullable = true)
 |-- Departement: string (nullable = true)
 |-- Salaire: long (nullable = true)

Données du DataFrame:
+-------+------------+-------+
|    Nom| Departement|Salaire|
+-------+------------+-------+
|  Alice|          RH|  55000|
|    Bob|Informatique|  65000|
|Charlie|      Ventes|  48000|
|  David|Informatique|  70000|
|    Eve|          RH|  52000|
|  Frank|      Ventes|  49000|
+-------+------------+-------+

Employés avec un salaire > 50000:
+-----+------------+-------+
|  Nom| Departement|Salaire|
+-----+------------+-------+
|Alice|          RH|  55000|
|  Bob|Informatique|  65000|
|David|Informatique|  70000|
|  Eve|          RH|  52000|
+-----+------------+-------+

Salaire moyen par département:
+------------+-------------+
| Departement|Salaire_Moyen|
+------------+-------------+
|Informatique|      67500.0|
|      Ventes|      48500.0|
|          RH|      53500.0|
+------------+-------------+



## **Exercice 3 : Chargement de Données à Partir d'un Fichier CSV**

In [None]:
import csv

# Nom du fichier CSV à créer
nom_fichier = "produits.csv"

# Données à écrire dans le fichier CSV
donnees = [
    ["ID", "Nom", "Categorie", "Prix"],  # En-têtes
    [1, "iPhone", "Electronique", 999.99],
    [2, "Galaxy S22", "Electronique", 899.99],
    [3, "MacBook Pro", "Electronique", 1999.99],
    [4, "T-shirt", "Vetements", 19.99],
    [5, "Jeans", "Vetements", 59.99],
    [6, "Chaussures", "Chaussures", 89.99],
    [7, "Ballon de football", "Sport", 29.99],
    [8, "Raquette de tennis", "Sport", 149.99]
]

# Création du fichier CSV
with open(nom_fichier, mode="w", newline="", encoding="utf-8") as fichier:
    writer = csv.writer(fichier)
    writer.writerows(donnees)

print(f"Le fichier {nom_fichier} a été créé avec succès !")

Le fichier produits.csv a été créé avec succès !


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

# Initialisation de la session Spark
spark = SparkSession.builder.appName("Exercice 3: CSV").getOrCreate()

# Supposons que nous avons un fichier produits.csv avec cette structure:
# ID,Nom,Categorie,Prix
# 1,iPhone,Electronique,999.99
# 2,Galaxy S22,Electronique,899.99
# 3,T-shirt,Vetements,19.99
# ...



# Création du DataFrame
#df_produits = spark.createDataFrame(produits_data, ["ID", "Nom", "Categorie", "Prix"])

# Commentez les lignes ci-dessus et décommentez la ligne ci-dessous pour charger un vrai fichier CSV
df_produits = spark.read.option("header", "true").option("inferSchema", "true").csv("produits.csv")

# 1. Afficher les 5 premières lignes
print("Les 5 premières lignes:")
df_produits.show(5)

# 2. Filtrer les produits d'une catégorie spécifique (Electronique)
print("Produits électroniques:")
df_produits.filter(df_produits.Categorie == "Electronique").show()

# 3. Calculer le prix moyen des produits par catégorie
print("Prix moyen par catégorie:")
df_produits.groupBy("Categorie").agg(avg("Prix").alias("Prix_Moyen")).show()

# Arrêt de la session Spark
spark.stop()

Les 5 premières lignes:
+---+-----------+------------+-------+
| ID|        Nom|   Categorie|   Prix|
+---+-----------+------------+-------+
|  1|     iPhone|Electronique| 999.99|
|  2| Galaxy S22|Electronique| 899.99|
|  3|MacBook Pro|Electronique|1999.99|
|  4|    T-shirt|   Vetements|  19.99|
|  5|      Jeans|   Vetements|  59.99|
+---+-----------+------------+-------+
only showing top 5 rows

Produits électroniques:
+---+-----------+------------+-------+
| ID|        Nom|   Categorie|   Prix|
+---+-----------+------------+-------+
|  1|     iPhone|Electronique| 999.99|
|  2| Galaxy S22|Electronique| 899.99|
|  3|MacBook Pro|Electronique|1999.99|
+---+-----------+------------+-------+

Prix moyen par catégorie:
+------------+-----------------+
|   Categorie|       Prix_Moyen|
+------------+-----------------+
|   Vetements|            39.99|
|  Chaussures|            89.99|
|       Sport|89.99000000000001|
|Electronique|          1299.99|
+------------+-----------------+



## **Exercice 4 : Utilisation de Fonctions UDF (User Defined Functions) texte en gras**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Initialisation de la session Spark
spark = SparkSession.builder.appName("Exercice 4: UDF").getOrCreate()

# Chargement des données depuis le fichier CSV
df_produits = spark.read.option("header", "true").option("inferSchema", "true").csv("produits.csv")

# Vérification du schéma pour s'assurer que "Prix" est bien un type numérique
df_produits.printSchema()

# Définition de la fonction UDF pour catégoriser les prix
def categorie_prix(prix, seuil=100):
    if prix >= seuil:
        return "Élevé"
    else:
        return "Bas"

# Enregistrement de la fonction UDF
categorie_prix_udf = udf(categorie_prix, StringType())

# Application de l'UDF au DataFrame
df_produits_avec_categorie = df_produits.withColumn("Categorie_Prix", categorie_prix_udf(df_produits["Prix"]))

# Affichage du résultat
print("Produits avec catégorie de prix:")
df_produits_avec_categorie.show()

# Arrêt de la session Spark
spark.stop()


root
 |-- ID: integer (nullable = true)
 |-- Nom: string (nullable = true)
 |-- Categorie: string (nullable = true)
 |-- Prix: double (nullable = true)

Produits avec catégorie de prix:
+---+------------------+------------+-------+--------------+
| ID|               Nom|   Categorie|   Prix|Categorie_Prix|
+---+------------------+------------+-------+--------------+
|  1|            iPhone|Electronique| 999.99|         Élevé|
|  2|        Galaxy S22|Electronique| 899.99|         Élevé|
|  3|       MacBook Pro|Electronique|1999.99|         Élevé|
|  4|           T-shirt|   Vetements|  19.99|           Bas|
|  5|             Jeans|   Vetements|  59.99|           Bas|
|  6|        Chaussures|  Chaussures|  89.99|           Bas|
|  7|Ballon de football|       Sport|  29.99|           Bas|
|  8|Raquette de tennis|       Sport| 149.99|         Élevé|
+---+------------------+------------+-------+--------------+



## **Exercice 5 : Jointure de Deux DataFrames**

In [None]:
from pyspark.sql import SparkSession

# Initialisation de la session Spark
spark = SparkSession.builder.appName("Exercice 5: Jointures").getOrCreate()

# Création du DataFrame des commandes
commandes_data = [
    (1, 101, 150.0),
    (2, 102, 200.5),
    (3, 101, 300.0),
    (4, 103, 550.75),
    (5, 104, 170.25),
    (6, 102, 125.0)
]
df_commandes = spark.createDataFrame(commandes_data, ["ID_Commande", "ID_Client", "Montant"])

# Création du DataFrame des clients
clients_data = [
    (101, "Alice", "France"),
    (102, "Bob", "Belgique"),
    (103, "Charlie", "Suisse"),
    (104, "David", "France"),
    (105, "Eve", "Allemagne")
]
df_clients = spark.createDataFrame(clients_data, ["ID_Client", "Nom", "Pays"])

# Affichage des DataFrames sources
print("DataFrame des commandes:")
df_commandes.show()

print("DataFrame des clients:")
df_clients.show()

# Jointure des deux DataFrames sur ID_Client
df_commandes_clients = df_commandes.join(df_clients, "ID_Client", "inner")

# Affichage du résultat de la jointure
print("Résultat de la jointure:")
df_commandes_clients.show()

# Sélection des colonnes pertinentes
df_commandes_clients_select = df_commandes_clients.select("ID_Commande", "Nom", "Pays", "Montant")

# Affichage du résultat final
print("Résultat final avec les colonnes sélectionnées:")
df_commandes_clients_select.show()

# Arrêt de la session Spark
spark.stop()

DataFrame des commandes:
+-----------+---------+-------+
|ID_Commande|ID_Client|Montant|
+-----------+---------+-------+
|          1|      101|  150.0|
|          2|      102|  200.5|
|          3|      101|  300.0|
|          4|      103| 550.75|
|          5|      104| 170.25|
|          6|      102|  125.0|
+-----------+---------+-------+

DataFrame des clients:
+---------+-------+---------+
|ID_Client|    Nom|     Pays|
+---------+-------+---------+
|      101|  Alice|   France|
|      102|    Bob| Belgique|
|      103|Charlie|   Suisse|
|      104|  David|   France|
|      105|    Eve|Allemagne|
+---------+-------+---------+

Résultat de la jointure:
+---------+-----------+-------+-------+--------+
|ID_Client|ID_Commande|Montant|    Nom|    Pays|
+---------+-----------+-------+-------+--------+
|      101|          1|  150.0|  Alice|  France|
|      101|          3|  300.0|  Alice|  France|
|      102|          2|  200.5|    Bob|Belgique|
|      102|          6|  125.0|    Bob|Be

### ***Exercice 6 : Agrégation et Groupement***

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.functions import desc

# Initialisation de la session Spark
spark = SparkSession.builder.appName("Exercice 6: Agrégation").getOrCreate()

# Utilisation des mêmes DataFrames que l'exercice 5
commandes_data = [
    (1, 101, 150.0),
    (2, 102, 200.5),
    (3, 101, 300.0),
    (4, 103, 550.75),
    (5, 104, 170.25),
    (6, 102, 125.0)
]
df_commandes = spark.createDataFrame(commandes_data, ["ID_Commande", "ID_Client", "Montant"])

clients_data = [
    (101, "Alice", "France"),
    (102, "Bob", "Belgique"),
    (103, "Charlie", "Suisse"),
    (104, "David", "France"),
    (105, "Eve", "Allemagne")
]
df_clients = spark.createDataFrame(clients_data, ["ID_Client", "Nom", "Pays"])

# Jointure des DataFrames
df_joint = df_commandes.join(df_clients, "ID_Client", "inner")

# Calcul du montant total des commandes par pays
df_montant_par_pays = df_joint.groupBy("Pays").agg(spark_sum("Montant").alias("Montant_Total"))

# Tri par montant décroissant
df_montant_tri = df_montant_par_pays.orderBy(desc("Montant_Total"))

# Affichage du résultat
print("Montant total des commandes par pays (trié par montant décroissant):")
df_montant_tri.show()

# Arrêt de la session Spark
spark.stop()

Montant total des commandes par pays (trié par montant décroissant):
+--------+-------------+
|    Pays|Montant_Total|
+--------+-------------+
|  France|       620.25|
|  Suisse|       550.75|
|Belgique|        325.5|
+--------+-------------+



### ***Exercice 7 : Gestion des Valeurs Manquantes***

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, when

# Initialisation de la session Spark
spark = SparkSession.builder.appName("Exercice 7: Valeurs Manquantes").getOrCreate()

# Création d'un DataFrame avec des valeurs manquantes
employes_data = [
    ("Alice", "RH", 55000),
    ("Bob", "Informatique", None),
    ("Charlie", "Ventes", 48000),
    ("David", "Informatique", 70000),
    ("Eve", None, 52000),
    ("Frank", "Ventes", None)
]
df_employes = spark.createDataFrame(employes_data, ["Nom", "Departement", "Salaire"])

# Affichage du DataFrame original
print("DataFrame original avec valeurs manquantes:")
df_employes.show()

# Comptage des valeurs manquantes par colonne
print("Nombre de valeurs manquantes par colonne:")
for col_name in df_employes.columns:
    null_count = df_employes.filter(col(col_name).isNull()).count()
    print(f"Colonne {col_name}: {null_count} valeurs manquantes")

# Méthode 1: Suppression des lignes avec des valeurs manquantes
df_sans_nulls = df_employes.dropna()
print("\nMéthode 1: Suppression des lignes avec des valeurs manquantes:")
df_sans_nulls.show()

# Méthode 2: Remplacement des valeurs manquantes par des constantes
df_rempli_constantes = df_employes.fillna({
    "Departement": "Non affecté",
    "Salaire": 0
})
print("\nMéthode 2: Remplacement par des constantes:")
df_rempli_constantes.show()

# Méthode 3: Remplacement des salaires manquants par la moyenne des salaires
# D'abord, calculer la moyenne des salaires non-nuls
salaire_moyen = df_employes.filter(col("Salaire").isNotNull()).select(avg("Salaire")).collect()[0][0]
print(f"\nSalaire moyen: {salaire_moyen}")

# Ensuite, remplacer les salaires manquants par cette moyenne
df_rempli_moyenne = df_employes.fillna({
    "Departement": "Non affecté"
}).withColumn(
    "Salaire",
    when(col("Salaire").isNull(), salaire_moyen).otherwise(col("Salaire"))
)
print("\nMéthode 3: Remplacement des salaires par la moyenne:")
df_rempli_moyenne.show()

# Arrêt de la session Spark
spark.stop()

DataFrame original avec valeurs manquantes:
+-------+------------+-------+
|    Nom| Departement|Salaire|
+-------+------------+-------+
|  Alice|          RH|  55000|
|    Bob|Informatique|   NULL|
|Charlie|      Ventes|  48000|
|  David|Informatique|  70000|
|    Eve|        NULL|  52000|
|  Frank|      Ventes|   NULL|
+-------+------------+-------+

Nombre de valeurs manquantes par colonne:
Colonne Nom: 0 valeurs manquantes
Colonne Departement: 1 valeurs manquantes
Colonne Salaire: 2 valeurs manquantes

Méthode 1: Suppression des lignes avec des valeurs manquantes:
+-------+------------+-------+
|    Nom| Departement|Salaire|
+-------+------------+-------+
|  Alice|          RH|  55000|
|Charlie|      Ventes|  48000|
|  David|Informatique|  70000|
+-------+------------+-------+


Méthode 2: Remplacement par des constantes:
+-------+------------+-------+
|    Nom| Departement|Salaire|
+-------+------------+-------+
|  Alice|          RH|  55000|
|    Bob|Informatique|      0|
|Charli

# ***Exercice 8 : Partitionnement et Optimisation***

In [None]:
from pyspark.sql import SparkSession
import time

# Initialisation de la session Spark
spark = SparkSession.builder.appName("Exercice 8: Partitionnement").getOrCreate()

# Génération de données pour l'exemple
from pyspark.sql.functions import lit, expr
import string
import random

# Fonction pour générer des données aléatoires
def generate_random_data(n):
    return [(
        i,
        ''.join(random.choices(string.ascii_letters, k=10)),
        random.randint(18, 65),
        random.choice(["A", "B", "C", "D", "E"]),
        random.uniform(1000, 10000)
    ) for i in range(n)]

# Génération de 100 000 lignes de données ()
data_size = 100000
data = generate_random_data(data_size)
df = spark.createDataFrame(data, ["ID", "Nom", "Age", "Categorie", "Valeur"])

# Affichage d'un échantillon du DataFrame
print("Échantillon du DataFrame:")
df.show(5)

# Vérification du nombre de partitions par défaut
num_partitions_default = df.rdd.getNumPartitions()
print(f"Nombre de partitions par défaut: {num_partitions_default}")

# Benchmark avec le nombre de partitions par défaut
start_time = time.time()
df.groupBy("Categorie").count().show()
default_time = time.time() - start_time
print(f"Temps d'exécution avec {num_partitions_default} partitions: {default_time:.2f} secondes")

# Repartitionnement du DataFrame avec un nombre différent de partitions
num_partitions_new = 8  # Ajustez selon votre machine
df_repartitionne = df.repartition(num_partitions_new)

# Benchmark avec le nouveau nombre de partitions
start_time = time.time()
df_repartitionne.groupBy("Categorie").count().show()
new_time = time.time() - start_time
print(f"Temps d'exécution avec {num_partitions_new} partitions: {new_time:.2f} secondes")

# Optimisation avec le partitionnement par clé (ici, Categorie)
df_partitionne_par_cle = df.repartition("Categorie")

# Benchmark avec partitionnement par clé
start_time = time.time()
df_partitionne_par_cle.groupBy("Categorie").count().show()
key_time = time.time() - start_time
print(f"Temps d'exécution avec partitionnement par clé: {key_time:.2f} secondes")

# Mise en cache pour améliorer les performances
df_cache = df.cache()

# Benchmark avec mise en cache
start_time = time.time()
df_cache.groupBy("Categorie").count().show()
cache_time = time.time() - start_time
print(f"Temps d'exécution avec mise en cache: {cache_time:.2f} secondes")

# Deuxième exécution sur le DataFrame en cache
start_time = time.time()
df_cache.groupBy("Categorie").count().show()
cache_time2 = time.time() - start_time
print(f"Temps d'exécution pour la deuxième exécution avec cache: {cache_time2:.2f} secondes")

# Comparaison des performances
print("\nComparaison des performances:")
print(f"Partitions par défaut ({num_partitions_default}): {default_time:.2f} secondes")
print(f"Repartitionnement ({num_partitions_new}): {new_time:.2f} secondes")
print(f"Partitionnement par clé: {key_time:.2f} secondes")
print(f"Mise en cache (première exécution): {cache_time:.2f} secondes")
print(f"Mise en cache (deuxième exécution): {cache_time2:.2f} secondes")

# Arrêt de la session Spark
spark.stop()

Échantillon du DataFrame:
+---+----------+---+---------+------------------+
| ID|       Nom|Age|Categorie|            Valeur|
+---+----------+---+---------+------------------+
|  0|XXOcWqUkjR| 32|        A| 5156.828501116695|
|  1|bQDbSJOgBY| 40|        C| 5202.276294863273|
|  2|PGxcGrYnKd| 30|        D| 7573.119742411222|
|  3|SIJnjCaxdc| 36|        B|1090.9879207287124|
|  4|kmYmhlmBcx| 39|        E| 5802.764012410424|
+---+----------+---+---------+------------------+
only showing top 5 rows

Nombre de partitions par défaut: 2
+---------+-----+
|Categorie|count|
+---------+-----+
|        E|19899|
|        B|20065|
|        D|20135|
|        C|19893|
|        A|20008|
+---------+-----+

Temps d'exécution avec 2 partitions: 1.15 secondes
+---------+-----+
|Categorie|count|
+---------+-----+
|        E|19899|
|        B|20065|
|        D|20135|
|        C|19893|
|        A|20008|
+---------+-----+

Temps d'exécution avec 8 partitions: 1.97 secondes
+---------+-----+
|Categorie|count|


## ***Exercice 9 : Utilisation de Spark SQL***

In [None]:
from pyspark.sql import SparkSession

# Initialisation de la session Spark
spark = SparkSession.builder.appName("Exercice 9: Spark SQL").getOrCreate()

# Création de DataFrames pour l'exemple
produits_data = [
    (1, "iPhone", "Electronique", 999.99),
    (2, "Galaxy S22", "Electronique", 899.99),
    (3, "MacBook Pro", "Electronique", 1999.99),
    (4, "T-shirt", "Vetements", 19.99),
    (5, "Jeans", "Vetements", 59.99),
    (6, "Chaussures", "Chaussures", 89.99),
    (7, "Ballon de football", "Sport", 29.99),
    (8, "Raquette de tennis", "Sport", 149.99)
]

ventes_data = [
    (101, 1, 2, "2023-01-15"),
    (102, 3, 1, "2023-01-16"),
    (103, 5, 3, "2023-01-17"),
    (104, 2, 1, "2023-01-18"),
    (105, 4, 5, "2023-01-19"),
    (106, 6, 2, "2023-01-20"),
    (107, 7, 1, "2023-01-21"),
    (108, 8, 1, "2023-01-22"),
    (109, 1, 1, "2023-01-23")
]

# Création des DataFrames
df_produits = spark.createDataFrame(produits_data, ["ID", "Nom", "Categorie", "Prix"])
df_ventes = spark.createDataFrame(ventes_data, ["ID_Vente", "ID_Produit", "Quantite", "Date"])

# Affichage des DataFrames
print("DataFrame des produits:")
df_produits.show()

print("DataFrame des ventes:")
df_ventes.show()

# Enregistrement des DataFrames comme tables temporaires
df_produits.createOrReplaceTempView("produits")
df_ventes.createOrReplaceTempView("ventes")

# Requête 1: Liste des produits avec leur prix
print("Requête 1: Liste des produits avec leur prix:")
spark.sql("""
SELECT ID, Nom, Prix
FROM produits
ORDER BY Prix DESC
""").show()

# Requête 2: Total des ventes par catégorie de produit
print("Requête 2: Total des ventes par catégorie de produit:")
spark.sql("""
SELECT p.Categorie, SUM(p.Prix * v.Quantite) as Total_Ventes
FROM produits p
JOIN ventes v ON p.ID = v.ID_Produit
GROUP BY p.Categorie
ORDER BY Total_Ventes DESC
""").show()

# Requête 3: Produits les plus vendus
print("Requête 3: Produits les plus vendus:")
spark.sql("""
SELECT p.Nom, SUM(v.Quantite) as Total_Quantite
FROM produits p
JOIN ventes v ON p.ID = v.ID_Produit
GROUP BY p.Nom
ORDER BY Total_Quantite DESC
""").show()

# Requête 4: Ventes quotidiennes
print("Requête 4: Ventes quotidiennes:")
spark.sql("""
SELECT v.Date, COUNT(*) as Nombre_Ventes, SUM(p.Prix * v.Quantite) as Total_Ventes
FROM ventes v
JOIN produits p ON v.ID_Produit = p.ID
GROUP BY v.Date
ORDER BY v.Date
""").show()

# Arrêt de la session Spark
spark.stop()

DataFrame des produits:
+---+------------------+------------+-------+
| ID|               Nom|   Categorie|   Prix|
+---+------------------+------------+-------+
|  1|            iPhone|Electronique| 999.99|
|  2|        Galaxy S22|Electronique| 899.99|
|  3|       MacBook Pro|Electronique|1999.99|
|  4|           T-shirt|   Vetements|  19.99|
|  5|             Jeans|   Vetements|  59.99|
|  6|        Chaussures|  Chaussures|  89.99|
|  7|Ballon de football|       Sport|  29.99|
|  8|Raquette de tennis|       Sport| 149.99|
+---+------------------+------------+-------+

DataFrame des ventes:
+--------+----------+--------+----------+
|ID_Vente|ID_Produit|Quantite|      Date|
+--------+----------+--------+----------+
|     101|         1|       2|2023-01-15|
|     102|         3|       1|2023-01-16|
|     103|         5|       3|2023-01-17|
|     104|         2|       1|2023-01-18|
|     105|         4|       5|2023-01-19|
|     106|         6|       2|2023-01-20|
|     107|         7|  

## ***Exercice 10 : Implémentation d'une Fonction de Fenêtre***

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, row_number, col

# Initialisation de la session Spark
spark = SparkSession.builder.appName("Exercice 10: Fonctions de Fenêtre").getOrCreate()

# Création d'un DataFrame d'employés
employes_data = [
    ("Alice", "RH", 55000),
    ("Bob", "Informatique", 65000),
    ("Charlie", "Ventes", 48000),
    ("David", "Informatique", 70000),
    ("Eve", "RH", 52000),
    ("Frank", "Ventes", 49000),
    ("Grace", "Informatique", 67000),
    ("Hannah", "Ventes", 55000),
    ("Ian", "RH", 60000),
    ("Julia", "Informatique", 72000)
]
df_employes = spark.createDataFrame(employes_data, ["Nom", "Departement", "Salaire"])

# Affichage du DataFrame original
print("DataFrame des employés:")
df_employes.show()

# Définition d'une fenêtre partitionnée par département et ordonnée par salaire décroissant
window_spec = Window.partitionBy("Departement").orderBy(col("Salaire").desc())

# Calcul du rang (rank) des employés par salaire au sein de chaque département
df_avec_rank = df_employes.withColumn("Rang", rank().over(window_spec))

# Affichage du résultat avec le rang
print("Classement des employés par salaire dans chaque département (rank):")
df_avec_rank.show()

# Calcul du rang dense (dense_rank) des employés par salaire au sein de chaque département
df_avec_dense_rank = df_employes.withColumn("Dense_Rang", dense_rank().over(window_spec))

# Affichage du résultat avec le rang dense
print("Classement des employés par salaire dans chaque département (dense_rank):")
df_avec_dense_rank.show()

# Calcul du numéro de ligne (row_number) des employés par salaire au sein de chaque département
df_avec_row_number = df_employes.withColumn("Numero_Ligne", row_number().over(window_spec))

# Affichage du résultat avec le numéro de ligne
print("Classement des employés par salaire dans chaque département (row_number):")
df_avec_row_number.show()

# Combinaison des trois fonctions de fenêtre dans un seul DataFrame
df_combine = df_employes \
    .withColumn("Rang", rank().over(window_spec)) \
    .withColumn("Dense_Rang", dense_rank().over(window_spec)) \
    .withColumn("Numero_Ligne", row_number().over(window_spec))

# Affichage du résultat combiné
print("Tableau combiné avec les trois fonctions de fenêtre:")
df_combine.show()

# Arrêt de la session Spark
spark.stop()

DataFrame des employés:
+-------+------------+-------+
|    Nom| Departement|Salaire|
+-------+------------+-------+
|  Alice|          RH|  55000|
|    Bob|Informatique|  65000|
|Charlie|      Ventes|  48000|
|  David|Informatique|  70000|
|    Eve|          RH|  52000|
|  Frank|      Ventes|  49000|
|  Grace|Informatique|  67000|
| Hannah|      Ventes|  55000|
|    Ian|          RH|  60000|
|  Julia|Informatique|  72000|
+-------+------------+-------+

Classement des employés par salaire dans chaque département (rank):
+-------+------------+-------+----+
|    Nom| Departement|Salaire|Rang|
+-------+------------+-------+----+
|  Julia|Informatique|  72000|   1|
|  David|Informatique|  70000|   2|
|  Grace|Informatique|  67000|   3|
|    Bob|Informatique|  65000|   4|
|    Ian|          RH|  60000|   1|
|  Alice|          RH|  55000|   2|
|    Eve|          RH|  52000|   3|
| Hannah|      Ventes|  55000|   1|
|  Frank|      Ventes|  49000|   2|
|Charlie|      Ventes|  48000|   3|
+----

## ***Partie 2***

## ***Exercice 1 : Mise en route avec PySpark***

In [None]:
import random
import string

# Fonction pour générer des données aléatoires
def generate_random_data(n):
    data = []
    for _ in range(n):
        # Générer une ligne de texte aléatoire
        nom = ''.join(random.choices(string.ascii_letters, k=5))  # 5 caractères aléatoires
        age = random.randint(18, 65)
        ville = random.choice(["Paris", "Londres", "New York", "Berlin", "Madrid"])
        ligne = f"{nom},{age},{ville}\n"
        data.append(ligne)
    return data

# Nombre de lignes à générer
n = 100  # Générer 100 lignes

# Générer les données
data = generate_random_data(n)

# Spécifier le chemin du fichier où les données seront enregistrées
chemin_fichier = "fichier_exemple.txt"

# Écrire les données dans un fichier texte
with open(chemin_fichier, 'w') as f:
    f.writelines(data)

print(f"Le fichier '{chemin_fichier}' a été généré avec {n} lignes.")


Le fichier 'fichier_exemple.txt' a été généré avec 100 lignes.


In [None]:
from pyspark import SparkContext, SparkConf

# Configuration de Spark
conf = SparkConf().setAppName("Exercice Mise en Route").setMaster("local[*]")
sc = SparkContext(conf=conf)

# Lecture du fichier texte
chemin_fichier = "fichier_exemple.txt"
rddStr = sc.textFile(chemin_fichier)

# Afficher le contenu
print("Contenu du fichier (10 premières lignes):")
for ligne in rddStr.take(10):
    print(ligne)

# Lecture du fichier texte
chemin_fichieR = "worldcitiespop.txt"
rddStr = sc.textFile(chemin_fichieR)

print("\nPremière 10 lignes de worldcitiespop.txt:")
for ligne in rddStr.take(10):
    print(ligne)

# Arrêt du contexte Spark
sc.stop()

Contenu du fichier (10 premières lignes):
MtJtY,32,Berlin
tnhGR,33,Londres
iOnUF,63,New York
VmKmT,56,Berlin
psiWY,54,Londres
hkyMJ,22,Berlin
dKxXQ,53,Berlin
zOJKw,28,Paris
ZNiVw,35,Berlin
ESNpK,23,Paris

Première 10 lignes de worldcitiespop.txt:
Country,City,AccentCity,Region,Population,Latitude,Longitude
ad,aixas,Aix�s,06,,42.4833333,1.4666667
ad,aixirivali,Aixirivali,06,,42.4666667,1.5
ad,aixirivall,Aixirivall,06,,42.4666667,1.5
ad,aixirvall,Aixirvall,06,,42.4666667,1.5
ad,aixovall,Aixovall,06,,42.4666667,1.4833333
ad,andorra,Andorra,07,,42.5,1.5166667
ad,andorra la vella,Andorra la Vella,07,20430,42.5,1.5166667
ad,andorra-vieille,Andorra-Vieille,07,,42.5,1.5166667
ad,andorre,Andorre,07,,42.5,1.5166667


## ***Exercice 2 : Nettoyage du fichier worldcitiespop.***

In [None]:
from pyspark import SparkContext
from math import *

# Initialisation du contexte Spark
sc = SparkContext("local", "Nettoyage worldcitiespop")

# Chemin vers le fichier - à remplacer par votre chemin
fichier_path = "worldcitiespop.txt"

# Lecture du fichier
rdd = sc.textFile(fichier_path)

# Afficher la première ligne pour comprendre la structure
premiere_ligne = rdd.first()
print("Première ligne du fichier:")
print(premiere_ligne)

# En examinant la première ligne, on s'aperçoit que les données sont séparées par des virgules
# On transforme chaque ligne en liste d'éléments
rdd_split = rdd.map(lambda line: line.split(','))

# Conservation uniquement des lignes avec une population non vide (colonne 4)
# La première ligne est souvent l'en-tête, donc on vérifie également que la colonne 4 est numérique
def est_valide(elements):
    try:
        # Vérifie si la ligne a au moins 5 colonnes et si la population est un nombre
        return len(elements) >= 5 and elements[4] and elements[4].isdigit()
    except:
        return False

rdd_valide = rdd_split.filter(est_valide)

# Comptage des lignes valides
nb_lignes_valides = rdd_valide.count()
print(f"Nombre de lignes valides avec population: {nb_lignes_valides}")

# Affichage des 5 premières lignes valides
print("5 premières lignes valides:")
for ligne in rdd_valide.take(5):
    print(ligne)

# Arrêt du contexte Spark
sc.stop()

Première ligne du fichier:
Country,City,AccentCity,Region,Population,Latitude,Longitude
Nombre de lignes valides avec population: 47980
5 premières lignes valides:
['ad', 'andorra la vella', 'Andorra la Vella', '07', '20430', '42.5', '1.5166667']
['ad', 'canillo', 'Canillo', '02', '3292', '42.5666667', '1.6']
['ad', 'encamp', 'Encamp', '03', '11224', '42.5333333', '1.5833333']
['ad', 'la massana', 'La Massana', '04', '7211', '42.55', '1.5166667']
['ad', 'les escaldes', 'Les Escaldes', '08', '15854', '42.5', '1.5333333']


## ***Exercice 3 : Statistiques***

In [None]:
from pyspark import SparkContext
from math import *
import numpy as np

# Initialisation du contexte Spark
sc = SparkContext("local", "Statistiques worldcitiespop")

# Chemin vers le fichier
fichier_path = "worldcitiespop.txt"

# Lecture du fichier
rdd = sc.textFile(fichier_path)

# Transformation en liste et filtrage des lignes valides avec population
def est_valide(elements):
    try:
        return len(elements) >= 5 and elements[4] and elements[4].isdigit()
    except:
        return False

rdd_split = rdd.map(lambda line: line.split(','))
rdd_valide = rdd_split.filter(est_valide)

# Extraction des populations (conversion en float)
rdd_populations = rdd_valide.map(lambda x: float(x[4]))

# Calcul des statistiques
min_pop = rdd_populations.min()
max_pop = rdd_populations.max()
sum_pop = rdd_populations.sum()
count_pop = rdd_populations.count()
avg_pop = sum_pop / count_pop

# Calcul de l'écart-type
# Pour calculer l'écart-type, nous avons besoin de la moyenne des carrés des différences
# On calcule d'abord la somme des carrés des différences
sum_squared_diff = rdd_populations.map(lambda x: (x - avg_pop) ** 2).sum()
# Puis on divise par le nombre d'éléments pour obtenir la variance
variance = sum_squared_diff / count_pop
# L'écart-type est la racine carrée de la variance
stdev_pop = sqrt(variance)

print(f"Statistiques sur les populations des villes:")
print(f"Nombre de villes: {count_pop}")
print(f"Population minimum: {min_pop}")
print(f"Population maximum: {max_pop}")
print(f"Somme des populations: {sum_pop}")
print(f"Population moyenne: {avg_pop}")
print(f"Écart-type des populations: {stdev_pop}")

# Format attendu dans l'exercice
print(f"(count: {count_pop}, mean: {avg_pop}, stdev: {stdev_pop}, max: {max_pop}, min: {min_pop})")

# Arrêt du contexte Spark
sc.stop()

Statistiques sur les populations des villes:
Nombre de villes: 47980
Population minimum: 7.0
Population maximum: 31480498.0
Somme des populations: 2289584999.0
Population moyenne: 47719.57063359733
Écart-type des populations: 302885.5592040371
(count: 47980, mean: 47719.57063359733, stdev: 302885.5592040371, max: 31480498.0, min: 7.0)


## ***Exercice 4 : Histogrammes***

In [None]:
from pyspark import SparkContext
from math import *

# Initialisation du contexte Spark
sc = SparkContext("local", "Histogramme worldcitiespop")

# Chemin vers le fichier
fichier_path = "worldcitiespop.txt"

# Lecture du fichier
rdd = sc.textFile(fichier_path)

# Transformation en liste et filtrage des lignes valides avec population
def est_valide(elements):
    try:
        return len(elements) >= 5 and elements[4] and elements[4].isdigit()
    except:
        return False

rdd_split = rdd.map(lambda line: line.split(','))
rdd_valide = rdd_split.filter(est_valide)

# Extraction des populations (conversion en float)
rdd_populations = rdd_valide.map(lambda x: float(x[4]))

# Fonction pour déterminer la classe logarithmique
def classe_log(population):
    if population == 0:
        return 0
    return int(log10(population))

# Calcul de l'histogramme (classe_log, count)
rdd_histo = rdd_populations.map(lambda pop: (classe_log(pop), 1)).reduceByKey(lambda a, b: a + b)

# Tri de l'histogramme par classe
histo_sorted = rdd_histo.sortByKey().collect()

print("Histogramme des populations (échelle logarithmique):")
for classe, count in histo_sorted:
    print(f"Classe {classe} [10^{classe}..10^{classe+1}[: {count} villes")

# Format attendu dans l'exercice
print(histo_sorted)

# Arrêt du contexte Spark
sc.stop()

Histogramme des populations (échelle logarithmique):
Classe 0 [10^0..10^1[: 5 villes
Classe 1 [10^1..10^2[: 174 villes
Classe 2 [10^2..10^3[: 2187 villes
Classe 3 [10^3..10^4[: 20537 villes
Classe 4 [10^4..10^5[: 21550 villes
Classe 5 [10^5..10^6[: 3248 villes
Classe 6 [10^6..10^7[: 269 villes
Classe 7 [10^7..10^8[: 10 villes
[(0, 5), (1, 174), (2, 2187), (3, 20537), (4, 21550), (5, 3248), (6, 269), (7, 10)]


## ***Exercice 5 : TopK***

In [None]:
from pyspark import SparkContext
from math import *

# Initialisation du contexte Spark
sc = SparkContext("local", "TopK worldcitiespop")

# Chemin vers le fichier
fichier_path = "worldcitiespop.txt"

# Lecture du fichier
rdd = sc.textFile(fichier_path)

# Transformation en liste et filtrage des lignes valides avec population
def est_valide(elements):
    try:
        return len(elements) >= 5 and elements[4] and elements[4].isdigit()
    except:
        return False

rdd_split = rdd.map(lambda line: line.split(','))
rdd_valide = rdd_split.filter(est_valide)

# Extraction des villes avec leurs populations
# Format: (pays, ville asciiname, ville nom, région, population, latitude, longitude)
rdd_villes = rdd_valide.map(lambda x: (x[0], x[1], x[2], x[3], float(x[4]), float(x[5]) if len(x) > 5 and x[5] else 0, float(x[6]) if len(x) > 6 and x[6] else 0))

# Tri par population décroissante
rdd_top_villes = rdd_villes.sortBy(lambda x: -x[4])

# Récupération du top 10
top10 = rdd_top_villes.take(10)

print("Top 10 des villes les plus peuplées:")
for i, ville in enumerate(top10, 1):
    print(f"{i}. {ville[2]} ({ville[0]}): {int(ville[4])} habitants")

# Affichage au format demandé dans l'exercice
print("\nFormat demandé:")
for ville in top10:
    print(f"{ville[0]},{ville[1]},{ville[2]},{ville[3]},{int(ville[4])},{ville[5]},{ville[6]}")

# Arrêt du contexte Spark
sc.stop()

Top 10 des villes les plus peuplées:
1. Tokyo (jp): 31480498 habitants
2. Shanghai (cn): 14608512 habitants
3. Bombay (in): 12692717 habitants
4. Karachi (pk): 11627378 habitants
5. Delhi (in): 10928270 habitants
6. New Delhi (in): 10928270 habitants
7. Manila (ph): 10443877 habitants
8. Moscow (ru): 10381288 habitants
9. Seoul (kr): 10323448 habitants
10. S�o Paulo (br): 10021437 habitants

Format demandé:
jp,tokyo,Tokyo,40,31480498,35.685,139.751389
cn,shanghai,Shanghai,23,14608512,31.045556,121.399722
in,bombay,Bombay,16,12692717,18.975,72.825833
pk,karachi,Karachi,05,11627378,24.9056,67.0822
in,delhi,Delhi,07,10928270,28.666667,77.216667
in,new delhi,New Delhi,07,10928270,28.6,77.2
ph,manila,Manila,D9,10443877,14.6042,120.9822
ru,moscow,Moscow,48,10381288,55.752222,37.615556
kr,seoul,Seoul,11,10323448,37.5985,126.9783
br,sao paulo,S�o Paulo,27,10021437,-23.473293,-46.665803


## ***Exercice 6 : Re-cleaning***

In [None]:
from pyspark import SparkContext
from math import *

# Initialisation du contexte Spark
sc = SparkContext("local", "Re-cleaning worldcitiespop")

# Chemin vers le fichier
fichier_path = "worldcitiespop.txt"

# Lecture du fichier
rdd = sc.textFile(fichier_path)

# Transformation en liste et filtrage des lignes valides avec population
def est_valide(elements):
    try:
        return len(elements) >= 5 and elements[4] and elements[4].isdigit()
    except:
        return False

rdd_split = rdd.map(lambda line: line.split(','))
rdd_valide = rdd_split.filter(est_valide)

# Extraction des villes avec leurs populations
# Format: ((latitude, longitude), (pays, ville asciiname, ville nom, région, population))
# On arrondit les coordonnées à 2 décimales pour regrouper les villes proches
def extraire_ville(x):
    lat = round(float(x[5]), 2) if len(x) > 5 and x[5] else 0
    lon = round(float(x[6]), 2) if len(x) > 6 and x[6] else 0
    return ((lat, lon), (x[0], x[1], x[2], x[3], float(x[4]), float(x[5]) if len(x) > 5 and x[5] else 0, float(x[6]) if len(x) > 6 and x[6] else 0))

rdd_villes = rdd_valide.map(extraire_ville)

# Élimination des doublons en gardant les villes avec la plus grande population
rdd_villes_unique = rdd_villes.reduceByKey(lambda a, b: a if a[4] > b[4] else b)

# Convertir à nouveau en format plat pour les stats
rdd_villes_clean = rdd_villes_unique.map(lambda x: x[1])

# Extraction des populations pour les statistiques
rdd_populations = rdd_villes_clean.map(lambda x: x[4])

# Calcul des statistiques
min_pop = rdd_populations.min()
max_pop = rdd_populations.max()
sum_pop = rdd_populations.sum()
count_pop = rdd_populations.count()
avg_pop = sum_pop / count_pop

# Calcul de l'écart-type
sum_squared_diff = rdd_populations.map(lambda x: (x - avg_pop) ** 2).sum()
variance = sum_squared_diff / count_pop
stdev_pop = sqrt(variance)

print(f"Statistiques après élimination des doublons:")
print(f"(count: {count_pop}, mean: {avg_pop}, stdev: {stdev_pop}, max: {max_pop}, min: {min_pop})")

# Calcul de l'histogramme
def classe_log(population):
    if population == 0:
        return 0
    return int(log10(population))

rdd_histo = rdd_populations.map(lambda pop: (classe_log(pop), 1)).reduceByKey(lambda a, b: a + b)
histo_sorted = rdd_histo.sortByKey().collect()
print(histo_sorted)

# Top 20 des villes les plus peuplées après élimination des doublons
top20 = rdd_villes_clean.sortBy(lambda x: -x[4]).take(20)

# Affichage au format demandé
for ville in top20:
    print(f"{ville[0]},{ville[1]},{ville[2]},{ville[3]},{int(ville[4])},{ville[5]},{ville[6]}")

# Arrêt du contexte Spark
sc.stop()

Statistiques après élimination des doublons:
(count: 47792, mean: 47763.707356879815, stdev: 303310.60189777106, max: 31480498.0, min: 7.0)
[(0, 5), (1, 167), (2, 2144), (3, 20446), (4, 21513), (5, 3240), (6, 267), (7, 10)]
jp,tokyo,Tokyo,40,31480498,35.685,139.751389
cn,shanghai,Shanghai,23,14608512,31.045556,121.399722
in,bombay,Bombay,16,12692717,18.975,72.825833
pk,karachi,Karachi,05,11627378,24.9056,67.0822
in,delhi,Delhi,07,10928270,28.666667,77.216667
in,new delhi,New Delhi,07,10928270,28.6,77.2
ph,manila,Manila,D9,10443877,14.6042,120.9822
ru,moscow,Moscow,48,10381288,55.752222,37.615556
kr,seoul,Seoul,11,10323448,37.5985,126.9783
br,sao paulo,S�o Paulo,27,10021437,-23.473293,-46.665803
tr,istanbul,Istanbul,34,9797536,41.018611,28.964722
ng,lagos,Lagos,05,8789133,6.453056,3.395833
mx,mexico,Mexico,09,8720916,19.434167,-99.138611
id,jakarta,Jakarta,04,8540306,-6.174444,106.829444
us,new york,New York,NY,8107916,40.7141667,-74.0063889
cd,kinshasa,Kinshasa,06,7787832,-4.3,15.3
eg,