In [49]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, max ,avg, min
from pyspark.sql.functions import count, when

In [50]:
#exo1
spark = SparkSession.builder.appName("Analyse des ventes").getOrCreate()
#1.	Chargez le fichier CSV dans PySpark.
data_path = "ventes.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

#2.	Calculez le chiffre d’affaires (Quantité × Prix_unitaire) pour chaque ligne.
df = df.withColumn("CA", col("Quantité") * col("Prix_unitaire"))

#3.	Calculez le chiffre d’affaires total.
Total_CA = df.agg(sum("CA").alias("Total_CA")).collect()[0]["Total_CA"]

#4.	Trouvez le produit le plus vendu
produit_plus_vendu = (
    df.groupBy("Produit")
    .agg(sum("Quantité").alias("Total_Quantité"))
    .orderBy(col("Total_Quantité").desc())
    .first()
)

spark.stop()

print(f"Chiffre d'affaires total : {Total_CA} ")
print(f"Produit le plus vendu : {produit_plus_vendu['Produit']} ({produit_plus_vendu['Total_Quantité']} unités)")

Chiffre d'affaires total : 2710 
Produit le plus vendu : Souris (10 unités)


In [51]:
#exo2
spark = SparkSession.builder.appName("Analyse des utilisateurs").getOrCreate()

#1.	Chargez le fichier JSON dans PySpark.
data_path = "utilisateurs.json"
df = spark.read.option("multiline", "true").json(data_path)

#2.	Calculez l’âge moyen des utilisateurs.
Age_moyen = df.agg(avg("age").alias("Age_moyen")).collect()[0]["Age_moyen"]

#3.	Comptez combien d’utilisateurs habitent dans chaque ville.
utilisateurs_par_ville = df.groupBy("ville").agg(count("id").alias("Nombre_utilisateurs"))

#4.	Trouvez le plus jeune utilisateur.
plus_jeune = df.orderBy(col("age").asc()).first()


print(f"age moyen des utilisateurs : {Age_moyen:.2f} ans")
print("Nombre d'utilisateurs par ville :")
for row in utilisateurs_par_ville.collect():
    print(f" - {row['ville']}: {row['Nombre_utilisateurs']} ")
print(f"Le plus jeune utilisateur est {plus_jeune['nom']} avec {plus_jeune['age']} ans")

spark.stop()




age moyen des utilisateurs : 30.40 ans
Nombre d'utilisateurs par ville :
 - Marseille: 1 
 - Paris: 2 
 - Lyon: 2 
Le plus jeune utilisateur est Emma avec 22 ans


In [52]:
#exo3
spark = SparkSession.builder.appName("Nettoyage de données").getOrCreate()

#1.	Chargez le fichier CSV dans PySpark.
data_path = "clients.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

#2.	Remplissez les valeurs manquantes pour l’âge avec la moyenne des âges.
moyenne_age = df.filter(df["Âge"].isNotNull()).agg(avg("Âge")).collect()[0][0]
df = df.withColumn("Âge", when(col("Âge").isNull(), moyenne_age).otherwise(col("Âge")))

#3.	Remplissez les valeurs manquantes pour la ville avec “Inconnue”.
df = df.withColumn("Ville", when(col("Ville").isNull(), "Inconnue").otherwise(col("Ville")))

#4.	Supprimez les lignes où le revenu est manquant.
df = df.filter(col("Revenu").isNotNull())

#5.	Affichez le DataFrame nettoyé.
df.show()

spark.stop()


+------+----+---------+------+
|   Nom| Âge|    Ville|Revenu|
+------+----+---------+------+
| Alice|25.0|    Paris| 50000|
|   Bob|30.5|     Lyon| 40000|
|Claire|35.0|Marseille| 35000|
| David|40.0| Inconnue| 45000|
+------+----+---------+------+



In [55]:
#exo4
spark = SparkSession.builder.appName("Analyse des transactions").getOrCreate()

#1.	Chargez le fichier CSV dans PySpark.
data_path = "produits.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

#2.	Identifiez les 3 produits les plus chers.
df_trie = df.orderBy(df["Prix"].desc()).limit(3)

#3.	Affichez le résultat sous forme de DataFrame.
df_trie.show()

spark.stop()

+----------+------------+----+
|   Produit|   Catégorie|Prix|
+----------+------------+----+
|Ordinateur|Électronique| 800|
|     Table|      Bureau| 150|
|Imprimante|Électronique| 120|
+----------+------------+----+



In [46]:
#exo5
spark = SparkSession.builder.appName("Analyse des transactions").getOrCreate()

#1.	Chargez le fichier CSV dans PySpark.
data_path = "transactions.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

#2.	Calculez les dépenses totales de chaque client.
df_depenses = df.groupBy("Client").agg(sum("Montant").alias("Dépenses_totales"))

#3.	Identifiez le client ayant dépensé le plus.
df_depenses = df_depenses.orderBy(col("Dépenses_totales").desc())

df_depenses.show()

spark.stop()

+------+----------------+
|Client|Dépenses_totales|
+------+----------------+
|  Emma|             350|
| Alice|             250|
| David|             250|
|   Bob|             200|
+------+----------------+



In [54]:
#exo5
spark = SparkSession.builder.appName("Agrégation de données par catégorie").getOrCreate()

#1.	Chargez le fichier CSV dans PySpark.
data_path = "produits.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

#2.	Calculez le prix moyen et le prix total par catégorie.
df = df.groupBy("Catégorie").agg(
    avg("Prix").alias("Prix_moyen"),
    sum("Prix").alias("Prix_total")
)
#3.	Affichez les résultats dans un DataFrame.
df.show()

spark.stop()

+------------+----------+----------+
|   Catégorie|Prix_moyen|Prix_total|
+------------+----------+----------+
|Électronique|    238.75|       955|
|      Bureau|     115.0|       230|
+------------+----------+----------+

