## Pipeline de Transformation & Fusion des Données Météo
Ce script constitue le cœur du pipeline : il normalise les données issues des stations personnelles, enrichit chaque mesure avec des métadonnées, intègre les données InfoClimat, fusionne les deux sources en comblant les valeurs manquantes, puis génère un fichier final prêt pour ingestion dans MongoDB.

 1. Métadonnées des stations personnelles
Deux stations personnelles sont définies :

ILAMAD25 → La Madeleine (FR)

IICHTE19 → Ichtegem (BE)

Chaque station possède : nom, coordonnées, altitude, matériel, logiciel et type de source.

2. Fonctions utilitaires
Le script inclut plusieurs fonctions essentielles :
   - Chargement & sauvegarde JSON
   - Conversion sécurisée
   - Normalisation du schéma
   - Enrichissement des mesures
   - Test d’intégrité

3. Pipeline stations personnelles
process_personal_station(input_file, station_id) :
  - Charge le JSON brut
  - Normalise chaque mesure
  - Enrichit avec les métadonnées station
  - Ajoute au tableau global ALL_DOCS

4. Pipeline InfoClimat
process_infoclimat(input_file) :
  - Ajoute dynamiquement les stations InfoClimat au dictionnaire STATIONS_META
  - Normalise toutes les mesures horaires
  - Enrichit avec les métadonnées InfoClimat
  - Retourne une liste complète de documents prêts à fusionner

5. Fusion des données InfoClimat → Stations personnelles
merge_infoclimat(personal_docs, infoclimat_docs, mapping) :
 - Associe chaque station personnelle à une station InfoClimat via un mapping
 - Pour chaque mesure personnelle :
 - recherche la mesure InfoClimat la plus proche dans le temps
 - remplace uniquement les champs manquants ou nuls
 - Conserve l’intégrité des données d’origine

6. Exécution principale
Le bloc MAIN :
 - Charge et traite les deux stations personnelles
 - Charge et traite InfoClimat
 - Applique le mapping :
        * ILAMAD25 → 07015
        * IICHTE19 → 07016
 - Fusionne les données
 - Exécute les tests d’intégrité
 - Sauvegarde le fichier final :
output/final_mongo.json

In [1]:
import json
import re
from pathlib import Path
import pandas as pd
from uuid import uuid4
import math
from datetime import datetime
# =====================================================
# MÉTADONNÉES STATIONS PERSONNELLES
# =====================================================
STATIONS_META = {
    "ILAMAD25": {
        "name": "La Madeleine",
        "lat": 50.659,
        "lon": 3.07,
        "elevation": 23,
        "city": "La Madeleine",
        "hardware": "other",
        "software": "EasyWeatherPro_V5.1.6",
        "source": "Personal Station"
    },
    "IICHTE19": {
        "name": "WeerstationBS",
        "lat": 51.092,
        "lon": 2.999,
        "elevation": 15,
        "city": "Ichtegem",
        "hardware": "other",
        "software": "EasyWeatherV1.6.6",
        "source": "Personal Station"
    }
}

# =====================================================
# FONCTIONS UTILITAIRES
# =====================================================
def is_missing(x):
    return x is None or (isinstance(x, float) and math.isnan(x))

def load_json(path):
    with open(path, encoding="utf-8") as f:
        return json.load(f)

def save_json(path, content):
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(content, f, indent=4, ensure_ascii=False)
def to_float(v):
    if v is None:
        return None

    if isinstance(v, (int, float)):
        return float(v)

    if isinstance(v, str):
        # extraire le premier nombre (gère "0.01 in", "65.0 °F", "72 %")
        match = re.search(r"-?\d+(\.\d+)?", v)
        if match:
            return float(match.group())

    return None


#def to_float(v):
 #   try:
  #      return float(v)
   # except:
    #    return None


def parse_personal_datetime(date_str, time_str):
    """
    Convertit '071024' + '14:44:00' → '2024-10-07T14:44:00Z'
    Format perso = DDMMYY
    """
    try:
        dt = datetime.strptime(date_str + " " + time_str, "%d%m%y %H:%M:%S")
        return dt.isoformat() + "Z"
    except:
        return None


def normalize_measure(record):
    if not isinstance(record, dict):
        return {
            "timestamp": None,
            "temperature": None,
            "humidity": None,
            "pressure": None,
            "wind_speed": None,
            "wind_gust": None,
            "rain_rate": 0.0,
            "rain_total": 0.0,
            "uv": None,
            "solar": None
        }

    def get(*keys):
        for k in keys:
            if k in record and record[k] not in ["", None, " "]:
                return record[k]
        return None

    # ====== CORRECTION DATE / TIME ======
    ts = None

    date_str = record.get("date")      # ex: "071024"
    time_str = record.get("time")      # ex: "14:44:00"

    if date_str and time_str and len(date_str) == 6:
        day = date_str[:2]
        month = date_str[2:4]
        year = "20" + date_str[4:6]

        ts_str = f"{year}-{month}-{day} {time_str}"
        ts = pd.to_datetime(ts_str, utc=True, errors="coerce")

    # fallback (InfoClimat)
    if ts is None:
        raw_ts = get("timestamp", "datetime", "dh_utc")
        if raw_ts:
            ts = pd.to_datetime(raw_ts, utc=True, errors="coerce")

    ts = ts.isoformat() if ts is not None and not pd.isna(ts) else None


   


    return {
        "timestamp": ts,
        "temperature": to_float(get("temperature", "temp")),
        "humidity": to_float(get("humidity", "humidite")),
        "pressure": to_float(get("pressure", "pression")),
        "wind_speed": to_float(get("wind_speed", "vent_moyen")),
        "wind_gust": to_float(get("wind_gust", "vent_rafales")),
        "rain_rate": to_float(get("pluie_1h", "precip_1h", "precip_rate")) or 0.0,
        "rain_total": to_float(get("pluie_3h", "precip_3h", "precip_accum")) or 0.0,
        "uv": to_float(get("uv")),
        "solar": to_float(get("solar"))
    }

def enrich_measure(station_id, rec):
    """Ajout des métadonnées station + record_id"""
    meta = STATIONS_META.get(station_id, {})
    return {
        "record_id": uuid4().hex,
        "station_id": station_id,
        "name": meta.get("name"),
        "city": meta.get("city"),
        "location": {
            "latitude": meta.get("lat"),
            "longitude": meta.get("lon"),
            "elevation": meta.get("elevation")
        },
        "hardware": meta.get("hardware"),
        "software": meta.get("software"),
        "source": meta.get("source"),
        "timestamp": rec["timestamp"],
        "temperature": rec["temperature"],
        "humidity": rec["humidity"],
        "pressure": rec["pressure"],
        "wind_speed": rec["wind_speed"],
        "wind_gust": rec["wind_gust"],
        "rain_rate": rec["rain_rate"],
        "rain_total": rec["rain_total"],
        "uv": rec["uv"],
        "solar": rec["solar"]
    }

def test_integrity(records, label):
    """Test automatique de l'intégrité des données"""
    df = pd.DataFrame(records)
    print(f"\n===== TEST → {label} =====")
    print(f"Documents : {len(df)}")
    if df.empty:
        return
    missing = df.isna().sum()
    print("Valeurs manquantes :")
    print(missing[missing > 0] if missing.sum() else "RAS")
    if "station_id" in df.columns and "timestamp" in df.columns:
        dup = df.duplicated(["station_id", "timestamp"]).sum()
        print(f"Doublons détectés : {dup}")


def is_missing(x):
    return x is None or (isinstance(x, float) and math.isnan(x))

#Fusionne les valeurs InfoClimat pour remplacer les null des stations perso.
   # On cherche la mesure InfoClimat la plus proche de chaque timestamp perso.

def merge_infoclimat(personal_docs, infoclimat_docs, mapping):
    # Index InfoClimat par station
    ic_index = {}
    for ic in infoclimat_docs:
        ic_station = ic.get("station_id")
        if ic_station:
            ic_index.setdefault(ic_station, []).append(ic)

    merged = []

    for r in personal_docs:
        station = r.get("station_id")
        ic_station = mapping.get(station)

        # Pas de station InfoClimat correspondante
        if not ic_station or ic_station not in ic_index:
            merged.append(r)
            continue

        # Timestamp perso
        t_r = pd.to_datetime(r.get("timestamp"), errors="coerce")
        # Si timestamp invalide → on ne fusionne pas
        if pd.isna(t_r):
            merged.append(r)
            continue
            # On fusionne InfoClimat uniquement pour octobre
        if t_r.month != 10:
            merged.append(r)
            continue

        # Chercher la mesure IC la plus proche
        closest = None
        min_diff = pd.Timedelta("1h")  # fenêtre max 1h

        for ic_r in ic_index[ic_station]:
            ts_ic = ic_r.get("timestamp")
            if ts_ic is None:
                continue

            t_ic = pd.to_datetime(ts_ic, errors="coerce")
            if pd.isna(t_ic):
                continue

            diff = abs(t_r - t_ic)
            if diff < min_diff:
                min_diff = diff
                closest = ic_r

        # Fusion
        if closest:
            normal_fields = ["temperature", "humidity", "pressure", "wind_speed", "wind_gust"]
            rain_fields = ["rain_rate", "rain_total"]

            # Champs météo classiques
            for f in normal_fields:
                if is_missing(r.get(f)) and not is_missing(closest.get(f)):
                    r[f] = closest[f]

            # Pluie : 0.0 est une valeur valide → on ne remplace que les None
            for f in rain_fields:
                if is_missing(r.get(f)) and not is_missing(closest.get(f)):
                    r[f] = closest[f]

        merged.append(r)

    return merged


# =====================================================
# PIPELINE STATIONS PERSONNELLES
# =====================================================
ALL_DOCS = []

def process_personal_station(input_file, station_id):
    raw = load_json(input_file)
    measures = raw if isinstance(raw, list) else raw.get("data", [])
    normalized = [normalize_measure(m) for m in measures]
    #  FILTRAGE DES LIGNES INVALIDES (sans timestamp)
    normalized = [m for m in normalized if m["timestamp"] is not None]
    enriched = [enrich_measure(station_id, m) for m in normalized]
    ALL_DOCS.extend(enriched)

# =====================================================
# PIPELINE INFOCLIMAT
# =====================================================
def process_infoclimat(input_file):
    infoclimat_docs = []
    raw = load_json(input_file)
    # Ajouter les stations au meta
    for s in raw.get("stations", []):
        STATIONS_META[s["id"]] = {
            "name": s["name"],
            "lat": s["latitude"],
            "lon": s["longitude"],
            "elevation": s["elevation"],
            "city": s["name"],
            "hardware": "professional",
            "software": "InfoClimat",
            "source": "InfoClimat"
        }

    for station_id, measures in raw.get("hourly", {}).items():
        measures = [normalize_measure(m) for m in measures if isinstance(m, dict)]
        enriched = [enrich_measure(station_id, m) for m in measures]
        infoclimat_docs.extend(enriched)

    return infoclimat_docs

# =====================================================
# MAIN
# =====================================================
if __name__ == "__main__":
    print("===== PIPELINE DE TRANSFORMATION & FUSION =====")

    # Stations perso
    process_personal_station("data/la_madeleine.json", "ILAMAD25")
    process_personal_station("data/ichtegem.json", "IICHTE19")
    print("=== échantillon IICHTE19 (perso) ===")
    for r in ALL_DOCS:
        if r["station_id"] == "IICHTE19":
            print(r) 
            break

    
    # InfoClimat
    infoclimat_docs = process_infoclimat("data/InfoClimat.json")

    print("=== Stations InfoClimat disponibles ===")
    print(sorted({r["station_id"] for r in infoclimat_docs}))
    print("=== Exemple données InfoClimat pour 07016 ===")
    for r in infoclimat_docs:
        if r["station_id"] == "07015":
            print(r)
            break


    # Mapping perso → InfoClimat
    mapping = {
        "ILAMAD25": "07015",
        "IICHTE19": "000R5"
        #"IICHTE19": "07016"
    }
    
    # Fusion des données InfoClimat dans stations perso
    ALL_DOCS[:] = merge_infoclimat(ALL_DOCS, infoclimat_docs, mapping)

    # Tests d’intégrité
    test_integrity(ALL_DOCS, "FINAL JSON FUSIONNÉ")

    # Sauvegarde finale
    save_json("output/final_mongo.json", ALL_DOCS)
    print(f"\n FINI — {len(ALL_DOCS)} documents prêts pour MongoDB")


===== PIPELINE DE TRANSFORMATION & FUSION =====
=== échantillon IICHTE19 (perso) ===
{'record_id': 'ef17bc9731094d35a3469b0afd8ad446', 'station_id': 'IICHTE19', 'name': 'WeerstationBS', 'city': 'Ichtegem', 'location': {'latitude': 51.092, 'longitude': 2.999, 'elevation': 15}, 'hardware': 'other', 'software': 'EasyWeatherV1.6.6', 'source': 'Personal Station', 'timestamp': '2024-10-01T00:04:00+00:00', 'temperature': 56.8, 'humidity': 87.0, 'pressure': 29.48, 'wind_speed': 8.2, 'wind_gust': 10.4, 'rain_rate': 0.0, 'rain_total': 0.0, 'uv': 0.0, 'solar': 0.0}
=== Stations InfoClimat disponibles ===
['00052', '000R5', '07015', 'STATIC0010']
=== Exemple données InfoClimat pour 07016 ===
{'record_id': '903adc1eb5e04500989463d800f5466a', 'station_id': '07015', 'name': 'Lille-Lesquin', 'city': 'Lille-Lesquin', 'location': {'latitude': 50.575, 'longitude': 3.092, 'elevation': 47}, 'hardware': 'professional', 'software': 'InfoClimat', 'source': 'InfoClimat', 'timestamp': '2024-10-05T00:00:00+00:00