<a href="https://colab.research.google.com/github/Keizerbub/weather/blob/main/wrangling_weather.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import framework

In [1]:
!pip install pyspark



In [2]:
import pandas as pd
import os
import gzip
import shutil
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
import pandas as pd

# Ouverture des fichiers

In [15]:
class Wrangling:
    """Load ';'-separated CSV files with Spark and report per-column null rates.

    The instance keeps the most recently loaded DataFrame in ``self.df``;
    ``open_file`` and ``select_columns`` replace it. ``self.df`` is ``None``
    whenever no file is loaded (before the first ``open_file`` call, or after
    a failed one).

    Every public method reports failures by printing ``"Error occurred: ..."``
    and returning ``None`` instead of raising.
    """

    # Columns kept by ``select_columns`` when the caller names none.
    DEFAULT_COLUMNS = ('LAT', 'LON', 'AAAAMMJJHH', 'RR1', 'FF', 'TN50')

    def __init__(self):
        """Create (or reuse) the Spark session named "HandlingFile"."""
        self.spark = SparkSession.builder \
            .appName("HandlingFile") \
            .getOrCreate()
        # Nothing is loaded yet; methods that need a frame check for this.
        self.df = None

    def _require_frame(self):
        """Return the loaded DataFrame, or raise if no file has been opened."""
        frame = getattr(self, "df", None)
        if frame is None:
            raise RuntimeError("no DataFrame loaded; call open_file() first")
        return frame

    def open_file(self, file_path):
        """Read ``file_path`` as a ';'-separated CSV with header and inferred schema.

        Args:
            file_path: Path handed to ``spark.read.csv``.

        Returns:
            The loaded DataFrame (also stored in ``self.df``), or ``None`` if
            the read failed. On failure ``self.df`` is reset to ``None``.
        """
        try:
            self.df = self.spark.read.csv(file_path, header=True, inferSchema=True, sep=";")
        except Exception as e:
            # Drop any previously loaded frame so that later calls cannot
            # silently run against another file's data.
            self.df = None
            print("Error occurred:", e)
        return self.df

    def close_spark(self):
        """Stop the Spark session held by this instance."""
        self.spark.stop()

    def select_columns(self, *columns):
        """Restrict ``self.df`` to the given columns.

        Args:
            *columns: Column names to keep. When none are given,
                ``DEFAULT_COLUMNS`` is used.

        Returns:
            The narrowed DataFrame (also stored in ``self.df``), or ``None`` if
            no file is loaded or the selection failed. ``self.df`` is left
            unchanged on failure.
        """
        if not columns:
            columns = self.DEFAULT_COLUMNS

        try:
            self.df = self._require_frame().select(*columns)
            return self.df
        except Exception as e:
            print("Error occurred:", e)
            return None

    def check_column_null(self, column_name, total_rows=None):
        """Return the percentage (0-100) of null values in one column.

        Args:
            column_name: Name of the column to inspect.
            total_rows: Optional pre-computed row count of ``self.df``. Passing
                it avoids an extra ``count()`` when several columns of the
                same frame are checked in a row.

        Returns:
            The null percentage as a float; ``None`` if the frame has no rows
            (the ratio is undefined), if no file is loaded, or on any error.
        """
        try:
            frame = self._require_frame()
            if total_rows is None:
                total_rows = frame.count()
            if total_rows == 0:
                # Avoid a division by zero: the rate is undefined without rows.
                return None
            null_count = frame.filter(frame[column_name].isNull()).count()
            return (null_count / total_rows) * 100
        except Exception as e:
            print("Error occurred:", e)
            return None

    def check_all_columns_null(self):
        """Return ``{column name: null percentage}`` for every column of ``self.df``.

        The row count is computed once and shared by all per-column checks.

        Returns:
            A dict mapping each column name to the value returned by
            ``check_column_null``, or ``None`` if no file is loaded or an
            error occurred.
        """
        try:
            frame = self._require_frame()
            total_rows = frame.count()
            return {
                name: self.check_column_null(name, total_rows=total_rows)
                for name in frame.columns
            }
        except Exception as e:
            print("Error occurred:", e)
            return None

    def process_folder(self, folder_path):
        """Build a null-rate report for one CSV file or every ``.csv`` in a folder.

        For each file: load it, keep ``DEFAULT_COLUMNS`` and compute the null
        percentage of each kept column. A file that cannot be loaded, lacks one
        of the columns, or cannot be analysed is skipped with a message; the
        other files are still reported.

        Args:
            folder_path: Path to a single file or to a directory. In a
                directory, only names ending in ``.csv`` are processed, in
                sorted order.

        Returns:
            A pandas DataFrame indexed by file name, with one column per
            analysed column plus ``nom`` (the file name); ``None`` if the path
            is neither a file nor a directory or an unexpected error occurred.
        """
        try:
            if os.path.isfile(folder_path):
                file_paths = [folder_path]
            elif os.path.isdir(folder_path):
                # Sorted so that the report's row order is reproducible.
                file_paths = [
                    os.path.join(folder_path, entry)
                    for entry in sorted(os.listdir(folder_path))
                    if entry.endswith('.csv')
                ]
            else:
                print("Invalid folder path or file path provided.")
                return None

            result_dict = {}

            for file_path in file_paths:
                file_name = os.path.basename(file_path)

                if self.open_file(file_path) is None or self.select_columns() is None:
                    print("Skipping file:", file_name)
                    continue

                null_percentages = self.check_all_columns_null()
                if null_percentages is None:
                    print("Skipping file:", file_name)
                    continue

                null_percentages['nom'] = file_name  # keep the file name in the row
                result_dict[file_name] = null_percentages

            # One row per file, one column per analysed column.
            return pd.DataFrame.from_dict(result_dict, orient='index')
        except Exception as e:
            print("Error occurred:", e)
            return None

In [16]:
# Instantiate the helper; its constructor builds (or reuses) the Spark session.
data=Wrangling()

In [17]:
# Compute the per-column null percentages for a single combined CSV file
# (process_folder accepts either one file or a directory of .csv files).
# The resulting pandas DataFrame is displayed as the cell output.
data.process_folder(folder_path="/content/H_59__combined.csv")

Unnamed: 0,LAT,LON,AAAAMMJJHH,RR1,FF,TN50,nom
H_59__combined.csv,0.0,0.0,0.0,4.545624,17.209192,82.832443,H_59__combined.csv
