In [None]:
# Importer les bibliothèques nécessaires
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as F

# Initialiser la session Spark
spark = SparkSession.builder \
    .appName("DataProcessing") \
    .getOrCreate()





In [None]:
#Pour CO2

In [None]:
# Chemin du fichier CO2.csv (à adapter selon l'emplacement du fichier)
path_co2 = 'CO2.csv'  # Assurez-vous que le fichier est dans le même répertoire ou indiquez le chemin absolu

# Charger le fichier CSV
CO2 = spark.read.csv(path_co2, header=True, inferSchema=True)

# Afficher le schéma et un aperçu des données
print("Schéma pour CO2:")
CO2.printSchema()
CO2.show(5)

In [None]:
# Nettoyage des données
# Liste des pays à inclure
list_of_countries = [
    "United States", "China", "India", "Russia", "Germany", "France",
    "United Kingdom", "Japan", "Canada", "Australia"
]

# Colonnes à sélectionner
columns_to_select = [
    "year", "country", "co2", "co2_per_capita", "population",
    "gdp", "cement_co2", "coal_co2", "oil_co2", "share_global_co2",
]

# Filtrer les données pour l'année >= 1880 et les pays dans la liste
CO2_cleaned = CO2.filter((F.col("year") >= 1880) & (F.col("country").isin(list_of_countries)))

# Sélectionner les colonnes pertinentes
CO2_cleaned = CO2_cleaned.select(*columns_to_select)

# Remplacer les valeurs nulles par 0
CO2_cleaned = CO2_cleaned.fillna(0)

# Convertir les colonnes numériques en DoubleType si nécessaire
columns_to_cast = ["co2", "co2_per_capita", "population", "gdp", "cement_co2", "coal_co2", "oil_co2"]
for col_name in columns_to_cast:
    CO2_cleaned = CO2_cleaned.withColumn(col_name, F.col(col_name).cast(DoubleType()))

# Vérification des valeurs nulles dans les colonnes après remplacement
CO2_cleaned.select([F.count(F.col(c)).alias(c) for c in columns_to_select]).show()

# Vérification et suppression des doublons
CO2_cleaned = CO2_cleaned.dropDuplicates()

# Renommer la colonne "year" en "Year"
CO2_cleaned = CO2_cleaned.withColumnRenamed("year", "Year")

# Trier les données par année, puis par pays
CO2_cleaned = CO2_cleaned.orderBy(F.col("Year"), F.col("country"))

# Afficher les premières lignes pour vérifier le nettoyage
CO2_cleaned.show(5, truncate=False)

In [None]:
#Temperature Global

In [None]:
# Charger le fichier CSV
path_tm = 'TM.csv'  # Remplacez par le chemin correct de votre fichier TM.csv
TM = spark.read.csv(path_tm, header=True, inferSchema=True)

# Afficher le schéma initial
print("Schéma initial pour TM:")
TM.printSchema()



In [None]:
# Convertir les colonnes des mois en DoubleType
months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
for month in months:
    TM = TM.withColumn(month, col(month).cast(DoubleType()))

# Convertir la colonne 'DJF' en float et remplacer '***' par 0
TM = TM.withColumn("DJF", when(col("DJF") == "***", 0).otherwise(col("DJF").cast("float")))

# Supprimer la colonne 'D-N'
TM = TM.drop("D-N")

# Renommer les colonnes pour les saisons
TM = TM.withColumnRenamed("DJF", "mean_winter") \
         .withColumnRenamed("MAM", "mean_spring") \
         .withColumnRenamed("JJA", "mean_summer") \
         .withColumnRenamed("SON", "mean_autumn") \
         .withColumnRenamed("J-D", "mean_year")

# Supprimer les lignes avec des valeurs nulles
TM_cleaned = TM.dropna()

# Vérification et suppression des doublons
TM_cleaned = TM_cleaned.dropDuplicates()

# Afficher les premières lignes pour vérifier
print("Données nettoyées:")
TM_cleaned.show(5, truncate=False)



In [None]:
#Pour l'Hémisphère Nord

In [None]:
path_thn = 'THN.csv'  # Remplacez par le chemin correct de votre fichier TM.csv
THN = spark.read.csv(path_thn, header=True, inferSchema=True)

# Afficher le schéma initial
print("Schéma initial pour THN:")
THN.printSchema()

In [None]:
# Convertir les colonnes des mois en DoubleType
months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
for month in months:
    THN = THN.withColumn(month, col(month).cast(DoubleType()))

# Convertir la colonne 'DJF' en float et remplacer '***' par 0
THN = THN.withColumn("DJF", when(col("DJF") == "***", 0).otherwise(col("DJF").cast("float")))

# Supprimer la colonne 'D-N'
THN = THN.drop("D-N")

# Renommer les colonnes pour les saisons
THN = THN.withColumnRenamed("DJF", "mean_winter") \
         .withColumnRenamed("MAM", "mean_spring") \
         .withColumnRenamed("JJA", "mean_summer") \
         .withColumnRenamed("SON", "mean_autumn") \
         .withColumnRenamed("J-D", "mean_year")

# Supprimer les lignes avec des valeurs nulles
THN_cleaned = THN.dropna()

# Vérification et suppression des doublons
THN_cleaned = THN_cleaned.dropDuplicates()

# Afficher les premières lignes pour vérifier
print("Données nettoyées:")
THN_cleaned.show(5, truncate=False)



In [None]:
#Pour l'hémisphère Sud

In [None]:
# Charger le fichier CSV
path_ths = 'THS.csv'  # Remplacez par le chemin correct de votre fichier TM.csv
THS = spark.read.csv(path_ths, header=True, inferSchema=True)

# Afficher le schéma initial
print("Schéma initial pour THS:")
THS.printSchema()



In [None]:
# Convertir les colonnes des mois en DoubleType
months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
for month in months:
    THS = THS.withColumn(month, col(month).cast(DoubleType()))

# Convertir la colonne 'DJF' en float et remplacer '***' par 0
THS = THS.withColumn("DJF", when(col("DJF") == "***", 0).otherwise(col("DJF").cast("float")))

# Supprimer la colonne 'D-N'
THS = THS.drop("D-N")

# Renommer les colonnes pour les saisons
THS = THS.withColumnRenamed("DJF", "mean_winter") \
         .withColumnRenamed("MAM", "mean_spring") \
         .withColumnRenamed("JJA", "mean_summer") \
         .withColumnRenamed("SON", "mean_autumn") \
         .withColumnRenamed("J-D", "mean_year")

# Supprimer les lignes avec des valeurs nulles
THS_cleaned = THS.dropna()

# Vérification et suppression des doublons
THScleaned = THS_cleaned.dropDuplicates()

# Afficher les premières lignes pour vérifier
print("Données nettoyées:")
THS_cleaned.show(5, truncate=False)



In [None]:


# Stopper la session Spark après l'exécution (optionnel)
spark.stop()

In [None]:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

In [None]:
# Chargement des datasets.
datasets = {
    "CO2": pd.read_csv(r"C:\projet\ficher_traiter_spark\CO2_cleaned.csv\CO2.csv"),  
    "THN": pd.read_csv(r"C:\projet\ficher_traiter_spark\THN_cleaned.csv\THN.csv"),  
    "THS": pd.read_csv(r"C:\projet\ficher_traiter_spark\THS_cleaned.csv\THS.csv"),
    "TM": pd.read_csv(r"C:\projet\ficher_traiter_spark\TM_cleaned.csv\TM.csv"), 
}
# Affichage des premières lignes de chaque dataset.
print("Dataset CO2:")
print(datasets["CO2"].head(5))
print("\nDataset THN:")
print(datasets["THN"].head(5))
print("\nDataset THS:")
print(datasets["THS"].head(5))
print("\nDataset TM:")
print(datasets["TM"].head(5))

In [None]:
# Fonction pour générer des graphiques interactifs afin de pouvoir augmenter la précision de la visualisation.

def generate_graph(dataset_name, variable, graph_type):
    df = datasets[dataset_name]  
    title = f"{graph_type} - {variable} ({dataset_name})"
    
    if graph_type == "Line":
        fig = px.line(df, x="Year", y=variable, title=title)
        fig.update_layout(xaxis_title='Années', yaxis_title=variable)
        fig.update_traces(line_color='#456987')
        fig.update_layout(title={
            'y': 0.9,
            'x': 0.5,
            'xanchor': 'center',
            'yanchor': 'top'
        })
    elif graph_type == "Bar":
        fig = px.bar(df, x="Year", y=variable, color=variable, title=title)
        fig.update_layout(
            barmode="stack",
            xaxis_title="Années",
            yaxis_title=variable
        )
        fig.layout.coloraxis.colorbar.title = variable
        fig.update_layout(title={
            'y': 0.9,
            'x': 0.4,
            'xanchor': 'center',
            'yanchor': 'top'
        })
    elif graph_type == "Box":
        fig = px.box(df, y=variable, title=title)
        fig.update_layout(
            yaxis_title=f"{variable} (°C)" if 'Temp' in dataset_name else variable,
            title={
                'y': 0.9,
                'x': 0.5,
                'xanchor': 'center',
                'yanchor': 'top'
            }
        )
    else:
        raise ValueError("Invalid graph type. Use 'Line' or 'Bar',or Box.")

    fig.show()

# Interface pour sélectionner les paramètres.
dataset_name = input("Choisissez un dataset (CO2, THN, THS, THS): ")
variable = input(f"Choisissez une variable à visualiser dans {dataset_name} : ")
graph_type = input("Choisissez un type de graphique (Line, Bar ou Box) : ")

# Génération du graphique.
generate_graph(dataset_name, variable, graph_type)

In [None]:
#Compte tenu de certains résultats obtenus lors de la visualisation (présence d'outliers), une analyse et un traitement plus approfondis sont nécessaires 
#pour approfondir l'analyse de nos données tout en évitant les biais."