<a href="https://colab.research.google.com/github/asmamest/tp_bigdata/blob/main/tp_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession

# Création de la session Spark
spark = SparkSession.builder \
    .appName("TP_Spark") \
    .getOrCreate()

# sc est le SparkContext (important pour les RDD)
sc = spark.sparkContext

In [4]:
# Installer Spark
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!pip install -q findspark

# Configurer les variables d'environnement
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Connected to cloud.r-project.org (108.157.173.97)] [C                                                                               Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
                                                                               Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
0% [2 InRelease 15.6 kB/128 kB 12%] [3 InRelease 28.7 kB/129 kB 22%] [Waiting f                                                                               Get:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
0% [2 InRelease 15.6 kB/128 kB 12%] [3 InRelease 43.1 kB/129 kB 33%] [Connected                                                                               Get:5 https://cli.github.com/packages stable InRelease [3,917 B]
0% [2 InRelease 15.6 kB/128 kB 12%] [3 InRelease 43.1 kB/

In [None]:
# Télécharger le fichier Shakespeare
!wget -q https://raw.githubusercontent.com/apache/spark/master/data/mllib/shakespeare.txt

# Vérifier que le fichier est bien téléchargé
!ls -la shakespeare.txt

In [None]:
from pyspark.sql import SparkSession

# Création de la session Spark
spark = SparkSession.builder \
    .appName("TP_Spark_Colab") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext

# Step 3 : Lecture du fichier (CORRECTION - utiliser le bon nom de fichier)
shakespeare_rdd = sc.textFile("shakespeare.txt")

# Vérification du chargement
print("Nombre de lignes:", shakespeare_rdd.count())
print("Type de l'opération textFile:", type(shakespeare_rdd))
print("Premières lignes:")
shakespeare_rdd.take(5)

In [None]:
# Step 4 : Division en mots
words_rdd = shakespeare_rdd.flatMap(lambda line: line.split())
print("Nombre total de mots:", words_rdd.count())

# Step 5 : Transformation en paires (clé, valeur)
word_pairs_rdd = words_rdd.map(lambda word: (word.lower().strip('.,!?;:"()'), 1))
print("Exemples de paires mot-valeur:")
word_pairs_rdd.take(10)

# Step 6 : Réduction par clé
word_counts_rdd = word_pairs_rdd.reduceByKey(lambda a, b: a + b)
print("10 mots les plus fréquents:")
sorted_counts = word_counts_rdd.sortBy(lambda x: x[1], ascending=False)
sorted_counts.take(10)

# Step 7 : Sauvegarde du résultat
!rm -rf word_counts_output  # Nettoyer si existe déjà

word_counts_rdd.coalesce(1).saveAsTextFile("word_counts_output")

# Vérification du résultat
!ls word_counts_output/
result_rdd = sc.textFile("word_counts_output/part-00000")
print("Résultat sauvegardé:")
result_rdd.take(20)

In [None]:
# Télécharger le dataset daily_weather
!wget -q https://raw.githubusercontent.com/jbrownlee/Datasets/master/daily_weather.csv

# Vérification
!ls -la daily_weather.csv

In [None]:
# Step 1 : Chargement des données
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Chargement du fichier CSV
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("daily_weather.csv")

print("Colonnes:", df.columns)
print("Nombre de lignes:", df.count())

# Step 2 : Visualisation des données
print("\nSchéma:")
df.printSchema()

print("\n5 premiers éléments:")
df.show(5)

# Step 3 : Statistiques récapitulatives
print("Statistiques récapitulatives:")
df.describe().show()

# Step 4 : Manipulations avancées
# 1. Création de la colonne ratio (avec gestion des divisions par zéro)
df1 = df.withColumn("ratio",
                   when(col("rain_accumulation_9am") != 0,
                        col("rain_duration_9am") / col("rain_accumulation_9am"))
                   .otherwise(None))

# 2. Affichage de la nouvelle colonne
print("Colonne ratio:")
df1.select("rain_duration_9am", "rain_accumulation_9am", "ratio").show(10)

# 3. Maximum de rain_duration_9am
from pyspark.sql.functions import max, min, mean, count

max_duration = df.agg(max("rain_duration_9am")).first()[0]
print(f"Maximum rain_duration_9am: {max_duration}")

# 4. Moyenne de rain_accumulation_9am
mean_accumulation = df.agg(mean("rain_accumulation_9am")).first()[0]
print(f"Moyenne rain_accumulation_9am: {mean_accumulation}")

# 5. Max et min de rain_duration_9am
max_min = df.agg(max("rain_duration_9am"), min("rain_duration_9am")).first()
print(f"Max rain_duration_9am: {max_min[0]}, Min: {max_min[1]}")

# 6. Compter les lignes où air_temp_9am > 70
count_high_temp = df.filter(col("air_temp_9am") > 70).count()
print(f"Nombre de fois où air_temp_9am > 70: {count_high_temp}")

In [None]:
# Step 5 : Suppression des valeurs manquantes
print(f"Lignes avant nettoyage: {df.count()}")

# Supprimer les lignes avec des valeurs manquantes dans air_pressure_9am
df_clean = df.filter(col("air_pressure_9am").isNotNull())
print(f"Lignes après nettoyage air_pressure_9am: {df_clean.count()}")

# Step 6 : Calcul de corrélation
correlation = df.corr("rain_accumulation_9am", "rain_duration_9am")
print(f"Corrélation entre rain_accumulation_9am et rain_duration_9am: {correlation:.4f}")

# Interprétation :
# Une corrélation proche de 1 ou -1 indique une forte relation linéaire
# Une corrélation proche de 0 indique une faible relation linéaire

In [None]:
# Faire une copie du DataFrame
df_imputed = df.alias("df_imputed")

# Liste des colonnes numériques pour l'imputation
numeric_columns = [col for col, dtype in df.dtypes if dtype in ['int', 'double', 'float']]

print("Colonnes numériques pour imputation:", numeric_columns)

# Imputer les valeurs manquantes avec la moyenne
for column in numeric_columns:
    # Calculer la moyenne
    mean_value = df.agg(avg(col(column))).first()[0]

    # Imputer les valeurs manquantes
    df_imputed = df_imputed.fillna(mean_value, subset=[column])
    print(f"Colonne {column}: valeur d'imputation = {mean_value:.2f}")

# Comparaison des statistiques
print("=== AVANT IMPUTATION ===")
df.describe().show()

print("=== APRÈS IMPUTATION ===")
df_imputed.describe().show()

# Observation :
# Les counts devraient être égaux après imputation
# Les moyennes peuvent légèrement changer

In [None]:
# Arrêter la session Spark
spark.stop()

# Vérifier les fichiers créés
print("Fichiers créés:")
!ls -la