In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
from sqlalchemy import create_engine
import logging
from tqdm import tqdm

In [7]:
# Configuration
DATA_DIR = "data_part1/Data_All_Variables_2019_2024"  # Remplacer par votre chemin
DB_CONFIG = {
    'postgresql': 'postgresql://user:password@localhost:5432/db_name',
    'mysql': 'mysql+pymysql://user:password@localhost:3306/db_name'
}
OUTPUT_CSV = "./meteo_data_cleaned.csv"
TABLE_NAME = "meteo_data"

# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MeteoETL')

In [8]:
# --------------------------------------------------
# √âtape 2 : Chargement des donn√©es
# --------------------------------------------------
def load_data(directory):
    """Charge les donn√©es CSV depuis les sous-dossiers annuels"""
    logger.info("D√©marrage du chargement des donn√©es...")
    
    all_dfs = []
    years = [str(y) for y in range(2019, 2025)]  # 2019 √† 2024
    
    for year in tqdm(years, desc="Ann√©es"):
        year_path = os.path.join(directory, year)
        if not os.path.exists(year_path):
            logger.warning(f"Dossier {year} non trouv√© - Ignor√©")
            continue
            
        for file in tqdm(os.listdir(year_path), desc=f"Fichiers {year}", leave=False):
            if file.endswith(".csv"):
                file_path = os.path.join(year_path, file)
                try:
                    # Chargement en sp√©cifiant les colonnes
                    df = pd.read_csv(
                        file_path,
                        header=None,
                        usecols=[1, 2, 3, 4, 5],
                        names=['datetime', 'temperature', 'irradiance', 'humidity', 'wind_speed']
                    )
                    all_dfs.append(df)
                except Exception as e:
                    logger.error(f"Erreur sur {file_path}: {str(e)}")
    
    if not all_dfs:
        raise ValueError("Aucune donn√©e charg√©e - V√©rifiez le chemin des donn√©es")
    
    full_df = pd.concat(all_dfs, ignore_index=True)
    logger.info(f"Chargement termin√© : {len(full_df)} lignes charg√©es")
    return full_df

# --------------------------------------------------
# √âtape 3 : Nettoyage des donn√©es
# --------------------------------------------------
def clean_data(df):
    """Nettoie et transforme le dataframe"""
    logger.info("D√©marrage du nettoyage des donn√©es...")
    
    # Conversion des dates (format jour/mois)
    df['datetime'] = pd.to_datetime(
        df['datetime'],
        format='%d/%m/%Y %H:%M:%S.%f',
        errors='coerce'
    )
    
    # Suppression des dates invalides
    initial_count = len(df)
    df = df.dropna(subset=['datetime'])
    logger.info(f"Dates invalides supprim√©es : {initial_count - len(df)} lignes")
    
    # Filtrage des valeurs aberrantes
    df = df[
        (df['temperature'].between(-50, 60)) &
        (df['irradiance'] >= 0) &
        (df['humidity'].between(0, 100)) &
        (df['wind_speed'] >= 0)
    ]
    logger.info(f"Valeurs aberrantes supprim√©es : {initial_count - len(df)} lignes")
    
    # Suppression des doublons temporels
    df = df.sort_values('datetime')
    df = df.drop_duplicates(subset=['datetime'], keep='last')
    logger.info(f"Doublons temporels supprim√©s : {initial_count - len(df)} lignes")
    
    # Arrondissement des valeurs
    df = df.round({
        'temperature': 1,
        'irradiance': 3,
        'humidity': 1,
        'wind_speed': 2
    })
    
    logger.info(f"Donn√©es nettoy√©es : {len(df)} lignes restantes")
    return df


# --------------------------------------------------
# √âtape 4 : Sauvegarde dans les bases de donn√©es
# --------------------------------------------------
def save_to_db(df, db_type, table_name=TABLE_NAME):
    """Sauvegarde les donn√©es dans la base sp√©cifi√©e"""
    logger.info(f"D√©marrage de la sauvegarde dans {db_type.upper()}...")
    
    try:
        engine = create_engine(DB_CONFIG[db_type])
        
        # Cr√©ation de la table avec types optimis√©s
        dtype_map = {
            'datetime': 'TIMESTAMP PRIMARY KEY',
            'temperature': 'FLOAT',
            'irradiance': 'FLOAT',
            'humidity': 'FLOAT',
            'wind_speed': 'FLOAT'
        }
        
        # Insertion par batch
        df.to_sql(
            name=table_name,
            con=engine,
            if_exists='append',
            index=False,
            chunksize=10000,
            dtype=dtype_map,
            method='multi'  # Insertion multi-lignes
        )
        
        logger.info(f"Donn√©es sauvegard√©es avec succ√®s dans {db_type.upper()}!")
        return True
    except Exception as e:
        logger.error(f"Erreur DB {db_type}: {str(e)}")
        return False

# %%
# --------------------------------------------------
# √âtape 5 : Pipeline complet
# --------------------------------------------------
def run_pipeline():
    """Ex√©cute le pipeline ETL complet"""
    try:
        # Extraction
        raw_df = load_data(DATA_DIR)
        
        # Transformation
        cleaned_df = clean_data(raw_df)
        
        # Chargement
        save_to_db(cleaned_df, 'postgresql')
        save_to_db(cleaned_df, 'mysql')
        
        # Sauvegarde locale
        cleaned_df.to_csv(OUTPUT_CSV, index=False)
        logger.info(f"Sauvegarde CSV locale : {OUTPUT_CSV}")
        
        # Rapport final
        logger.info("TRAITEMENT TERMIN√â AVEC SUCC√àS!")
        return cleaned_df
    
    except Exception as e:
        logger.exception("ERREUR CRITIQUE DANS LE PIPELINE")
        return None

---

---

In [9]:
YEARS = [str(y) for y in range(2019, 2025)]  # 2019-2024

print("‚úÖ Configuration initiale termin√©e")
print(f"üìÅ Dossier de donn√©es: {DATA_DIR}")
print(f"üìÖ Ann√©es √† traiter: {', '.join(YEARS)}")

‚úÖ Configuration initiale termin√©e
üìÅ Dossier de donn√©es: data_part1/Data_All_Variables_2019_2024
üìÖ Ann√©es √† traiter: 2019, 2020, 2021, 2022, 2023, 2024


In [10]:
# --------------------------------------------------
# 2. Exploration de la structure des fichiers
# --------------------------------------------------
print("\nüîç Exploration de la structure des fichiers...")

file_counts = {}
for year in YEARS:
    year_path = os.path.join(DATA_DIR, year)
    if os.path.exists(year_path):
        num_files = len([f for f in os.listdir(year_path) if f.endswith('.csv')])
        file_counts[year] = num_files
        print(f" - {year}: {num_files} fichiers CSV")
    else:
        print(f" - {year}: Dossier introuvable!")


üîç Exploration de la structure des fichiers...
 - 2019: 13 fichiers CSV
 - 2020: 12 fichiers CSV
 - 2021: 13 fichiers CSV
 - 2022: 12 fichiers CSV
 - 2023: 12 fichiers CSV
 - 2024: 11 fichiers CSV


In [20]:
# --------------------------------------------------
# 3. Chargement d'un fichier exemple
# --------------------------------------------------
sample_file = None
for year in YEARS:
    year_path = os.path.join(DATA_DIR, year)
    if os.path.exists(year_path) and os.listdir(year_path):
        sample_file = os.path.join(year_path, os.listdir(year_path)[0])
        break

if sample_file:
    print(f"\nüìÇ Fichier exemple: {sample_file}")
    
    # Chargement sans traitement
    sample_df = pd.read_csv(sample_file, delimiter=';')
    sample_df = sample_df.iloc[:, :-1]
    
    print("\nüìä Structure brute du fichier:")
    print(f"- Dimensions: {sample_df.shape[0]} lignes x {sample_df.shape[1]} colonnes")
    print("- Aper√ßu des donn√©es:")
    display(sample_df.head(3))
    
    # V√©rification des colonnes
    print("\nüîç Analyse des colonnes:")
    for i in range(sample_df.shape[1]):
        col_data = sample_df.iloc[:, i]
        null_count = col_data.isnull().sum()
        unique_count = col_data.nunique()
        print(f"Colonne {i}: {null_count} valeurs manquantes, {unique_count} valeurs uniques")
else:
    print("‚ùå Aucun fichier CSV trouv√©!")


üìÇ Fichier exemple: data_part1/Data_All_Variables_2019_2024/2019/140419-14052019AllVariables.csv

üìä Structure brute du fichier:
- Dimensions: 1444 lignes x 5 colonnes
- Aper√ßu des donn√©es:


Unnamed: 0,FechaHora,ROOT.meteoPDL.AirTemperature.hf,ROOT.meteoPDL.Pyr1IrradianceCompensated.hf,ROOT.meteoPDL.RelativeHumidity.hf,ROOT.meteoPDL.WindSpeed.hf
0,14/04/2019 00:00:59.000,31.700001,0.02737,17.251152,0.84757
1,14/04/2019 00:30:59.000,31.874424,0.09228,16.5,1.893272
2,14/04/2019 01:00:59.000,32.0,0.0,15.3,2.09578



üîç Analyse des colonnes:
Colonne 0: 0 valeurs manquantes, 1444 valeurs uniques
Colonne 1: 0 valeurs manquantes, 733 valeurs uniques
Colonne 2: 0 valeurs manquantes, 1317 valeurs uniques
Colonne 3: 0 valeurs manquantes, 1291 valeurs uniques
Colonne 4: 0 valeurs manquantes, 1270 valeurs uniques


In [32]:
sample_df.columns

Index(['FechaHora', 'ROOT.meteoPDL.AirTemperature.hf',
       'ROOT.meteoPDL.Pyr1IrradianceCompensated.hf',
       'ROOT.meteoPDL.RelativeHumidity.hf', 'ROOT.meteoPDL.WindSpeed.hf'],
      dtype='object')

In [31]:
# --------------------------------------------------
# 4. Chargement complet des donn√©es
# --------------------------------------------------
print("\nüöö D√©but du chargement des donn√©es...")

all_dfs = []
problem_files = []

for year in tqdm(YEARS, desc="Ann√©es"):
    year_path = os.path.join(DATA_DIR, year)
    if not os.path.exists(year_path):
        print(f"‚ö†Ô∏è Dossier {year} introuvable - Ignor√©")
        continue
        
    year_files = [f for f in os.listdir(year_path) if f.endswith('.csv')]
    for file in tqdm(year_files, desc=f"Fichiers {year}", leave=False):
        file_path = os.path.join(year_path, file)
        try:
            # Chargement des 6 colonnes attendues
            df = pd.read_csv(
                file_path,
                delimiter=';'
            )
            df = df.iloc[:, :-1]
            df.rename(columns=['datetime', 'airtemperature', 'irradiance', 'humidity', 'wind_speed'])
            all_dfs.append(df)
        except Exception as e:
            problem_files.append((file_path, str(e)))
    
if not all_dfs:
    raise ValueError("‚ùå Aucune donn√©e charg√©e!")

meteo_df = pd.concat(all_dfs, ignore_index=True)
print(f"\n‚úÖ Chargement termin√©: {len(meteo_df):,} lignes charg√©es")
print(f"‚ö†Ô∏è {len(problem_files)} fichiers avec erreurs")

# Affichage des probl√®mes
if problem_files:
    print("\nProbl√®mes rencontr√©s:")
    for i, (file, error) in enumerate(problem_files[:3]):
        print(f" - {file}: {error}")
    if len(problem_files) > 3:
        print(f" - ... et {len(problem_files)-3} autres")


üöö D√©but du chargement des donn√©es...


Ann√©es: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 6/6 [00:00<00:00, 14.95it/s]


ValueError: ‚ùå Aucune donn√©e charg√©e!

In [29]:
meteo_df.head(10)

Unnamed: 0,datetime,airtemperature,irradiance,humidity,wind_speed
0,31.700001,0.02737,17.251152,0.84757,
1,31.874424,0.09228,16.5,1.893272,
2,32.0,0.0,15.3,2.09578,
3,31.981928,0.021053,15.171184,2.0,
4,28.387892,0.57,21.265627,2.2422,
5,30.5,0.0,17.071901,2.683918,
6,29.0,0.368471,19.300001,0.697779,
7,29.961927,0.0,16.338074,1.557688,
8,30.428099,0.0,15.343803,1.1,
9,30.200001,0.096841,14.770206,1.355424,
