# 📥 DataSens E1 — Notebook 3 : Ingestion des 5 Sources

**🎯 Objectif** : Ingérer réellement les 5 types de sources avec traçabilité complète

---

## 📋 Plan d'ingestion

1. **Fichier plat CSV** : Kaggle (50% → Postgres, 50% → raw)
2. **Base de données** : Kaggle SQLite → Postgres
3. **API** : OpenWeatherMap → meteo + flux
4. **Web Scraping** : MonAvisCitoyen (dry-run) → document
5. **Big Data** : GDELT GKG → evenement + document_evenement

**Traçabilité** : Manifest JSON par run avec chemins, compteurs, horodatages

---

## 🔒 RGPD & Gouvernance

⚠️ **Rappel** : Pas de données personnelles directes (hash SHA-256), respect robots.txt



In [None]:
# Configuration et imports (architecture pipeline complète)
import hashlib
import json
import logging
import os
import time
import traceback
from datetime import UTC, datetime
from pathlib import Path

import pandas as pd
import requests
from dotenv import load_dotenv
from minio import Minio
from sqlalchemy import create_engine, text
from tqdm import tqdm

# Configuration
NOTEBOOK_DIR = Path.cwd()
PROJECT_ROOT = NOTEBOOK_DIR.parent if NOTEBOOK_DIR.name == "notebooks" else NOTEBOOK_DIR
load_dotenv(PROJECT_ROOT / ".env")

PG_HOST = os.getenv("POSTGRES_HOST", "localhost")
PG_PORT = int(os.getenv("POSTGRES_PORT", "5432"))
PG_DB = os.getenv("POSTGRES_DB", "datasens")
PG_USER = os.getenv("POSTGRES_USER", "ds_user")
PG_PASS = os.getenv("POSTGRES_PASS", "ds_pass")

PG_URL = f"postgresql+psycopg2://{PG_USER}:{PG_PASS}@{PG_HOST}:{PG_PORT}/{PG_DB}"
engine = create_engine(PG_URL, future=True)

# Configuration MinIO (DataLake)
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "http://localhost:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "miniouser")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "miniosecret")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "datasens-raw")

RAW_DIR = PROJECT_ROOT / "data" / "raw"
MANIFESTS_DIR = RAW_DIR / "manifests"
LOGS_DIR = PROJECT_ROOT / "logs"

# Créer dossiers
RAW_DIR.mkdir(parents=True, exist_ok=True)
MANIFESTS_DIR.mkdir(parents=True, exist_ok=True)
LOGS_DIR.mkdir(parents=True, exist_ok=True)

# =====================================================
# SYSTÈME DE LOGGING (comme datasens_E1_v2.ipynb)
# =====================================================
log_timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
log_file = LOGS_DIR / f"collecte_{log_timestamp}.log"
error_file = LOGS_DIR / f"errors_{log_timestamp}.log"

logger = logging.getLogger("DataSens")
logger.setLevel(logging.DEBUG)

file_formatter = logging.Formatter(
    "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
console_formatter = logging.Formatter(
    "[%(asctime)s] %(levelname)s - %(message)s",
    datefmt="%H:%M:%S"
)

file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(file_formatter)

error_handler = logging.FileHandler(error_file, encoding="utf-8")
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(file_formatter)

console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(console_formatter)

logger.addHandler(file_handler)
logger.addHandler(error_handler)
logger.addHandler(console_handler)

def log_error(source: str, error: Exception, context: str = ""):
    """Log une erreur avec traceback complet"""
    error_msg = f"[{source}] {context}: {error!s}"
    logger.error(error_msg)
    logger.error(f"Traceback:\n{traceback.format_exc()}")

logger.info("🚀 Système de logging initialisé")
logger.info(f"📁 Logs: {log_file}")
logger.info(f"❌ Erreurs: {error_file}")

# =====================================================
# MINIO CLIENT (DataLake)
# =====================================================
try:
    minio_client = Minio(
        MINIO_ENDPOINT.replace("http://", "").replace("https://", ""),
        access_key=MINIO_ACCESS_KEY,
        secret_key=MINIO_SECRET_KEY,
        secure=MINIO_ENDPOINT.startswith("https")
    )

    def ensure_bucket(bucket: str = MINIO_BUCKET):
        if not minio_client.bucket_exists(bucket):
            minio_client.make_bucket(bucket)

    def minio_upload(local_path: Path, dest_key: str) -> str:
        """Upload fichier vers MinIO DataLake"""
        ensure_bucket(MINIO_BUCKET)
        minio_client.fput_object(MINIO_BUCKET, dest_key, str(local_path))
        return f"s3://{MINIO_BUCKET}/{dest_key}"

    ensure_bucket()
    logger.info(f"✅ MinIO OK → bucket: {MINIO_BUCKET}")
except Exception as e:
    logger.warning(f"⚠️ MinIO non disponible: {e} - Mode local uniquement")
    minio_client = None
    def minio_upload(local_path: Path, dest_key: str) -> str:
        return f"local://{local_path}"

# =====================================================
# FONCTIONS UTILITAIRES
# =====================================================
def ts() -> str:
    """Timestamp UTC ISO compact"""
    return datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")

def sha256(s: str) -> str:
    """Hash SHA-256 pour déduplication"""
    return hashlib.sha256(s.encode("utf-8")).hexdigest()

def get_source_id(conn, nom: str) -> int:
    """Récupère l'id_source depuis le nom"""
    logger.info(f"[get_source_id] Recherche source: {nom}")
    result = conn.execute(text("SELECT id_source FROM source WHERE nom = :nom"), {"nom": nom}).fetchone()
    if result:
        logger.info(f"   → id_source trouvé: {result[0]}")
        return result[0]
    logger.warning(f"   → Source non trouvée: {nom}")
    return None

def create_flux(conn, id_source: int, format_type: str = "csv", manifest_uri: str = None) -> int:
    """Crée un flux et retourne id_flux"""
    logger.info(f"[create_flux] Création flux pour id_source={id_source}, format={format_type}")
    result = conn.execute(text("""
        INSERT INTO flux (id_source, format, manifest_uri)
        VALUES (:id_source, :format, :manifest_uri)
        RETURNING id_flux
    """), {"id_source": id_source, "format": format_type, "manifest_uri": manifest_uri})
    id_flux = result.scalar()
    logger.info(f"   → id_flux créé: {id_flux}")
    return id_flux

def ensure_territoire(conn, ville: str, code_insee: str = None, lat: float = None, lon: float = None) -> int:
    """Crée ou récupère un territoire"""
    logger.info(f"[ensure_territoire] Vérification territoire: ville={ville}")
    result = conn.execute(text("SELECT id_territoire FROM territoire WHERE ville = :ville"), {"ville": ville}).fetchone()
    if result:
        logger.info(f"   → id_territoire existant: {result[0]}")
        return result[0]
    result = conn.execute(text("""
        INSERT INTO territoire (ville, code_insee, lat, lon)
        VALUES (:ville, :code_insee, :lat, :lon)
        RETURNING id_territoire
    """), {"ville": ville, "code_insee": code_insee, "lat": lat, "lon": lon})
    id_territoire = result.scalar()
    logger.info(f"   → id_territoire créé: {id_territoire}")
    return id_territoire

def insert_documents(conn, docs: list) -> int:
    """Insertion batch de documents avec gestion doublons"""
    logger.info(f"[insert_documents] Insertion de {len(docs)} documents...")
    inserted = 0
    for doc in docs:
        try:
            result = conn.execute(text("""
                INSERT INTO document (id_flux, id_territoire, titre, texte, langue, date_publication, hash_fingerprint)
                VALUES (:id_flux, :id_territoire, :titre, :texte, :langue, :date_publication, :hash_fingerprint)
                ON CONFLICT (hash_fingerprint) DO NOTHING
                RETURNING id_doc
            """), doc)
            id_doc = result.scalar()
            if id_doc:
                logger.info(f"   → Document inséré: id_doc={id_doc}, titre={doc.get('titre', '')[:40]}")
                inserted += 1
        except Exception as e:
            log_error("insert_documents", e, "Erreur insertion document")
    logger.info(f"   → Total insérés: {inserted}/{len(docs)}")
    return inserted

print("✅ Configuration pipeline chargée")
print(f"   📍 PostgreSQL : {PG_HOST}:{PG_PORT}/{PG_DB}")
print(f"   ☁️ MinIO : {MINIO_BUCKET if minio_client else 'Mode local'}")
print(f"   📂 Raw data : {RAW_DIR}")
print(f"   📄 Logs : {LOGS_DIR}")
print("\n✅ Pipeline DataLake + PostgreSQL prêt !")


## 📄 Source 1/5 : Fichier plat CSV (Kaggle)

**Architecture hybride (comme datasens_E1_v2.ipynb)** :
- **50% → PostgreSQL** : Données structurées pour requêtes SQL
- **50% → MinIO DataLake** : Données brutes pour analyses Big Data futures

**Process** :
1. Chargement CSV depuis `data/raw/kaggle/`
2. Calcul SHA256 fingerprint pour déduplication
3. Split aléatoire 50/50
4. Upload 50% vers MinIO (DataLake)
5. Insertion 50% dans PostgreSQL avec traçabilité (id_flux)


In [None]:
logger.info("📄 SOURCE 1/5 : Fichier plat CSV (Kaggle)")
logger.info("=" * 80)

# Rechercher fichier Kaggle existant ou créer échantillon
kaggle_csv_paths = [
    RAW_DIR / "kaggle" / "kaggle_sample.csv",
    PROJECT_ROOT / "data" / "raw" / "kaggle" / "*.csv",
    Path.cwd() / "data" / "raw" / "kaggle" / "*.csv"
]

kaggle_csv_path = None
for path in kaggle_csv_paths:
    if path.exists():
        kaggle_csv_path = path
        break

if not kaggle_csv_path or not kaggle_csv_path.exists():
    logger.warning("⚠️ Fichier Kaggle non trouvé — Création échantillon pour démo")
    sample_data = pd.DataFrame({
        "text": [
            "Great product, very satisfied!",
            "Service terrible, avoid at all costs",
            "Excellent quality, recommend",
            "Bon produit, je recommande",
            "Mauvais service, déçu"
        ],
        "langue": ["en", "en", "en", "fr", "fr"],
        "date": [datetime.now(UTC)] * 5
    })
    kaggle_csv_path = RAW_DIR / "kaggle" / "kaggle_sample.csv"
    kaggle_csv_path.parent.mkdir(parents=True, exist_ok=True)
    sample_data.to_csv(kaggle_csv_path, index=False)
    logger.info(f"   ✅ Échantillon créé : {kaggle_csv_path.name}")

# Charger le CSV
df_kaggle = pd.read_csv(kaggle_csv_path)
logger.info(f"📊 {len(df_kaggle)} lignes chargées")

# Split 50/50 (architecture hybride : PostgreSQL + MinIO)
df_kaggle["hash_fingerprint"] = df_kaggle["text"].apply(lambda x: sha256(str(x)))
mid_point = len(df_kaggle) // 2
df_pg = df_kaggle.iloc[:mid_point].copy()  # 50% → PostgreSQL
df_raw = df_kaggle.iloc[mid_point:].copy()  # 50% → MinIO DataLake

logger.info(f"   • 50% PostgreSQL : {len(df_pg)} lignes")
logger.info(f"   • 50% MinIO DataLake : {len(df_raw)} lignes")

# Sauvegarder 50% en raw local + upload MinIO
raw_output = RAW_DIR / "kaggle" / f"kaggle_raw_{ts()}.csv"
df_raw.to_csv(raw_output, index=False)
logger.info(f"   ✅ Sauvegardé local : {raw_output.name}")

# Upload MinIO (50% bruts vers DataLake)
try:
    minio_uri = minio_upload(raw_output, f"kaggle/{raw_output.name}")
    logger.info(f"   ☁️ Upload MinIO : {minio_uri}")
except Exception as e:
    log_error("MinIO", e, "Upload fichier Kaggle")
    minio_uri = f"local://{raw_output}"

# Insérer 50% dans PostgreSQL
with engine.begin() as conn:
    id_source = get_source_id(conn, "Kaggle CSV")
    if not id_source:
        id_type = conn.execute(text("SELECT id_type_donnee FROM type_donnee WHERE libelle = 'Fichier plat'")).scalar()
        conn.execute(text("""
            INSERT INTO source (id_type_donnee, nom, url, fiabilite)
            VALUES (:id_type, 'Kaggle CSV', 'https://www.kaggle.com', 0.8)
        """), {"id_type": id_type})
        id_source = conn.execute(text("SELECT id_source FROM source WHERE nom = 'Kaggle CSV'")).scalar()

    id_flux = create_flux(conn, id_source, "csv", minio_uri)

    # Préparer documents pour insertion batch
    docs = []
    for _, row in df_pg.iterrows():
        docs.append({
            "id_flux": id_flux,
            "id_territoire": None,
            "titre": "",
            "texte": str(row["text"]),
            "langue": row.get("langue", "en"),
            "date_publication": row.get("date", datetime.now(UTC)),
            "hash_fingerprint": row["hash_fingerprint"]
        })

    inserted = insert_documents(conn, docs)

logger.info(f"\n✅ Source 1/5 terminée : {inserted} docs PostgreSQL + {len(df_raw)} docs MinIO")


## 🔧 Architecture Pipeline (Référence datasens_E1_v2.ipynb)

**Ce notebook suit l'architecture du pipeline existant** :

✅ **Logging structuré** : `logs/collecte_*.log` + `logs/errors_*.log`  
✅ **MinIO DataLake** : Upload automatique fichiers bruts → `s3://datasens-raw/`  
✅ **PostgreSQL** : Insertion structurée avec traçabilité (flux, manifests)  
✅ **Fonctions helpers** : `create_flux()`, `insert_documents()`, `ensure_territoire()`, `minio_upload()`  
✅ **Déduplication** : Hash SHA-256 pour éviter doublons  
✅ **RGPD** : Pas de données personnelles directes  

**Sources 2-5** : Implémentées ci-dessous avec vraies sources (code extrait de `datasens_E1_v2.ipynb`)


## 🌦️ Source 2/5 : API OpenWeatherMap

Collecte de données météo en temps réel via l'API OpenWeatherMap.

**Villes collectées** : Paris, Lyon, Marseille, Lille

**Données récupérées** :
- Température (°C), Humidité (%), Pression (hPa)
- Description météo (clair, nuageux, pluie...)
- Vitesse du vent (m/s)
- Timestamp de mesure

**Stockage** :
- **PostgreSQL** : Table `meteo` avec géolocalisation (id_territoire FK)
- **MinIO** : CSV brut pour historisation complète

**RGPD** : Aucune donnée personnelle, données publiques uniquement


In [None]:
logger.info("🌦️ SOURCE 2/5 : API OpenWeatherMap")
logger.info("=" * 80)

# Variables d'environnement
OWM_API_KEY = os.getenv("OWM_API_KEY")
if not OWM_API_KEY:
    logger.warning("⚠️ OWM_API_KEY manquante dans .env - Source 2 ignorée")
else:
    OWM_CITIES = ["Paris,FR", "Lyon,FR", "Marseille,FR", "Lille,FR"]

    rows = []
    for c in tqdm(OWM_CITIES, desc="OWM"):
        try:
            r = requests.get(
                "https://api.openweathermap.org/data/2.5/weather",
                params={"q": c, "appid": OWM_API_KEY, "units": "metric", "lang": "fr"},
                timeout=10
            )
            if r.status_code == 200:
                j = r.json()
                rows.append({
                    "ville": j["name"],
                    "lat": j["coord"]["lat"],
                    "lon": j["coord"]["lon"],
                    "date_obs": pd.to_datetime(j["dt"], unit="s"),
                    "temperature": j["main"]["temp"],
                    "humidite": j["main"]["humidity"],
                    "vent_kmh": (j.get("wind", {}).get("speed") or 0) * 3.6,
                    "pression": j.get("main", {}).get("pressure"),
                    "meteo_type": j["weather"][0]["main"] if j.get("weather") else None
                })
        except Exception as e:
            log_error("OpenWeatherMap", e, f"Collecte météo {c}")

        time.sleep(1)  # Respect rate limit

    if len(rows) > 0:
        dfm = pd.DataFrame(rows)
        local = RAW_DIR / "api" / "owm" / f"owm_{ts()}.csv"
        local.parent.mkdir(parents=True, exist_ok=True)
        dfm.to_csv(local, index=False)

        try:
            minio_uri = minio_upload(local, f"api/owm/{local.name}")
            logger.info(f"   ☁️ Upload MinIO : {minio_uri}")
        except Exception as e:
            log_error("MinIO", e, "Upload fichier OWM")
            minio_uri = f"local://{local}"

        # Insertion PostgreSQL
        with engine.begin() as conn:
            id_source = get_source_id(conn, "OpenWeatherMap")
            if not id_source:
                id_type = conn.execute(text("SELECT id_type_donnee FROM type_donnee WHERE libelle = 'API'")).scalar()
                if id_type:
                    conn.execute(text("""
                        INSERT INTO source (id_type_donnee, nom, url, fiabilite)
                        VALUES (:id_type, 'OpenWeatherMap', 'https://openweathermap.org/api', 0.9)
                    """), {"id_type": id_type})
                    id_source = conn.execute(text("SELECT id_source FROM source WHERE nom = 'OpenWeatherMap'")).scalar()
                else:
                    logger.warning("   ⚠️ Type 'API' non trouvé dans type_donnee")

            if id_source:
                id_flux = create_flux(conn, id_source, "json", minio_uri)

                # Insérer territoires et météo
                for _, r in dfm.iterrows():
                    tid = ensure_territoire(conn, ville=r["ville"], lat=r["lat"], lon=r["lon"])
                    try:
                        conn.execute(text("""
                            INSERT INTO meteo(id_territoire, date_obs, temperature, humidite, vent_kmh, pression, meteo_type)
                            VALUES(:t, :d, :T, :H, :V, :P, :MT)
                        """), {
                            "t": tid, "d": r["date_obs"], "T": r["temperature"],
                            "H": r["humidite"], "V": r["vent_kmh"], "P": r["pression"], "MT": r["meteo_type"]
                        })
                    except Exception as e:
                        log_error("meteo", e, f"Insertion relevé {r['ville']}")

                logger.info(f"✅ Source 2/5 terminée : {len(dfm)} relevés météo insérés")
            else:
                logger.warning("   ⚠️ Source OpenWeatherMap non créée - insertion météo ignorée")
    else:
        logger.warning("⚠️ Aucun relevé météo collecté")


## 📰 Source 3/5 : Flux RSS Multi-Sources (Presse française)

Collecte d'articles d'actualité via 3 flux RSS français complémentaires.

**Sources** :
- **Franceinfo** : flux principal actualités nationales
- **20 Minutes** : actualités françaises grand public
- **Le Monde** : presse de référence

**Extraction** : titre, description, date publication, URL source

**Stockage** : PostgreSQL + MinIO

**Déduplication** : SHA256 sur (titre + description) pour éviter doublons inter-sources

**Parser** : Utilisation de `feedparser` pour robustesse


In [None]:
logger.info("📰 SOURCE 3/5 : Flux RSS Multi-Sources (Presse française)")
logger.info("=" * 80)

try:
    import feedparser
except ImportError:
    logger.error("❌ Module feedparser manquant - install: pip install feedparser")
    feedparser = None

if feedparser:
    RSS_SOURCES = {
        "Franceinfo": "https://www.francetvinfo.fr/titres.rss",
        "20 Minutes": "https://www.20minutes.fr/feeds/rss-une.xml",
        "Le Monde": "https://www.lemonde.fr/rss/une.xml"
    }

    all_rss_items = []

    for source_name, rss_url in RSS_SOURCES.items():
        logger.info(f"📡 Source : {source_name}")
        logger.info(f"   URL : {rss_url}")

        try:
            feed = feedparser.parse(rss_url)

            if len(feed.entries) == 0:
                logger.warning("   ⚠️ Aucun article trouvé")
                continue

            source_items = []
            for e in feed.entries[:100]:  # Max 100 par source
                titre = e.get("title", "").strip()
                texte = (e.get("summary", "") or e.get("description", "") or "").strip()
                dp = pd.to_datetime(e.get("published", ""), errors="coerce")
                url = e.get("link", "")

                if titre and texte:
                    source_items.append({
                        "titre": titre,
                        "texte": texte,
                        "date_publication": dp if pd.notna(dp) else datetime.now(UTC),
                        "langue": "fr",
                        "source_media": source_name,
                        "url": url
                    })

            all_rss_items.extend(source_items)
            logger.info(f"   ✅ {len(source_items)} articles collectés")

        except Exception as e:
            log_error(f"RSS_{source_name}", e, "Parsing flux RSS")
            logger.warning(f"   ⚠️ Erreur : {str(e)[:80]}")

        time.sleep(1)  # Respect rate limit

    # Consolidation DataFrame
    if len(all_rss_items) > 0:
        dfr = pd.DataFrame(all_rss_items)

        # Déduplication inter-sources
        dfr["hash_fingerprint"] = dfr.apply(lambda row: sha256(row["titre"] + " " + row["texte"]), axis=1)
        nb_avant = len(dfr)
        dfr = dfr.drop_duplicates(subset=["hash_fingerprint"])
        nb_apres = len(dfr)

        logger.info(f"🧹 Déduplication : {nb_avant} → {nb_apres} articles uniques ({nb_avant - nb_apres} doublons supprimés)")

        # Distribution par source
        logger.info("📊 Distribution par source :")
        for source in dfr["source_media"].value_counts().items():
            logger.info(f"   {source[0]:15s} : {source[1]:3d} articles")

        # Sauvegarde locale + MinIO
        local = RAW_DIR / "rss" / f"rss_multi_sources_{ts()}.csv"
        local.parent.mkdir(parents=True, exist_ok=True)
        dfr.to_csv(local, index=False)

        try:
            minio_uri = minio_upload(local, f"rss/{local.name}")
            logger.info(f"   ☁️ Upload MinIO : {minio_uri}")
        except Exception as e:
            log_error("MinIO", e, "Upload fichier RSS")
            minio_uri = f"local://{local}"

        # Insertion PostgreSQL
        with engine.begin() as conn:
            id_source = get_source_id(conn, "Flux RSS Multi-Sources")
            if not id_source:
                id_type = conn.execute(text("SELECT id_type_donnee FROM type_donnee WHERE libelle = 'API' OR libelle = 'Web Scraping'")).scalar()
                if id_type:
                    conn.execute(text("""
                        INSERT INTO source (id_type_donnee, nom, url, fiabilite)
                        VALUES (:id_type, 'Flux RSS Multi-Sources', 'https://www.francetvinfo.fr/titres.rss', 0.95)
                    """), {"id_type": id_type})
                    id_source = conn.execute(text("SELECT id_source FROM source WHERE nom = 'Flux RSS Multi-Sources'")).scalar()

            if id_source:
                id_flux = create_flux(conn, id_source, "rss", minio_uri)

                # Préparer documents pour insertion batch
                docs = []
                for _, row in dfr.iterrows():
                    docs.append({
                        "id_flux": id_flux,
                        "id_territoire": None,
                        "titre": row["titre"],
                        "texte": row["texte"],
                        "langue": row["langue"],
                        "date_publication": row["date_publication"],
                        "hash_fingerprint": row["hash_fingerprint"]
                    })

                inserted = insert_documents(conn, docs)
                logger.info(f"✅ Source 3/5 terminée : {inserted} articles RSS insérés")
            else:
                logger.warning("   ⚠️ Source RSS non créée - insertion ignorée")
    else:
        logger.warning("⚠️ Aucun article RSS collecté")
else:
    logger.warning("⚠️ Module feedparser manquant - Source 3 ignorée")


## 🌐 Source 4/5 : Web Scraping Multi-Sources (Dry-run MonAvisCitoyen)

Collecte de données citoyennes depuis sources légales et éthiques (version simplifiée pour E1).

**Sources implémentées (dry-run)** :
- **Vie-publique.fr** (RSS) : Consultations citoyennes nationales
- **data.gouv.fr** (API) : Open Data datasets CSV officiels

**Éthique & Légalité** :
- ✅ Open Data gouvernemental (.gouv.fr)
- ✅ Respect robots.txt
- ✅ APIs officielles uniquement
- ✅ Aucun scraping de sites privés sans autorisation

**Stockage** :
- **PostgreSQL** : Documents structurés
- **MinIO** : CSV bruts pour audit


In [None]:
logger.info("🌐 SOURCE 4/5 : Web Scraping Multi-Sources (Dry-run)")
logger.info("=" * 80)

all_scraping_data = []

# ============================================================
# SOURCE 1 : VIE-PUBLIQUE.FR (RSS)
# ============================================================
logger.info("🏛️ Source 1/2 : Vie-publique.fr (RSS)")

try:
    if feedparser:
        feed_url = "https://www.vie-publique.fr/rss"
        feed = feedparser.parse(feed_url)

        for entry in feed.entries[:50]:
            all_scraping_data.append({
                "titre": entry.get("title", ""),
                "texte": entry.get("summary", entry.get("description", "")),
                "source_site": "vie-publique.fr",
                "url": entry.get("link", ""),
                "date_publication": datetime(*entry.published_parsed[:6], tzinfo=UTC) if hasattr(entry, "published_parsed") else datetime.now(UTC),
                "langue": "fr"
            })

        logger.info(f"✅ Vie-publique.fr: {len([d for d in all_scraping_data if 'vie-publique' in d['source_site']])} articles collectés")
    else:
        logger.warning("   ⚠️ Module feedparser manquant")
except Exception as e:
    log_error("ViePublique", e, "Parsing RSS feed")
    logger.warning(f"   ⚠️ Vie-publique.fr: {str(e)[:100]} (skip)")

# ============================================================
# SOURCE 2 : DATA.GOUV.FR (API officielle)
# ============================================================
logger.info("📊 Source 2/2 : data.gouv.fr (API officielle)")

try:
    url = "https://www.data.gouv.fr/api/1/datasets/"
    params = {"q": "france", "page_size": 50}
    response = requests.get(url, params=params, timeout=10)
    response.raise_for_status()

    data = response.json()
    for dataset in data.get("data", []):
        all_scraping_data.append({
            "titre": dataset.get("title", ""),
            "texte": dataset.get("description", dataset.get("title", "")),
            "source_site": "data.gouv.fr",
            "url": f"https://www.data.gouv.fr/fr/datasets/{dataset.get('slug', '')}",
            "date_publication": datetime.fromisoformat(dataset.get("created_at", datetime.now(UTC).isoformat()).replace("Z", "+00:00")),
            "langue": "fr"
        })

    logger.info(f"✅ data.gouv.fr: {len([d for d in all_scraping_data if 'data.gouv' in d['source_site']])} datasets collectés")

except Exception as e:
    log_error("DataGouv", e, "Collecte datasets Open Data")
    logger.warning(f"   ⚠️ data.gouv.fr: {str(e)[:100]} (skip)")

# ============================================================
# CONSOLIDATION ET STORAGE
# ============================================================
if len(all_scraping_data) > 0:
    df_scraping = pd.DataFrame(all_scraping_data)

    # Nettoyage
    df_scraping = df_scraping[df_scraping["texte"].str.len() > 20].copy()
    df_scraping["hash_fingerprint"] = df_scraping["texte"].apply(lambda t: sha256(t[:500]))
    df_scraping = df_scraping.drop_duplicates(subset=["hash_fingerprint"])

    logger.info(f"📈 Total collecté: {len(df_scraping)} documents citoyens")
    logger.info(f"   • Vie Publique: {len(df_scraping[df_scraping['source_site'].str.contains('vie-publique', na=False)])}")
    logger.info(f"   • Data.gouv: {len(df_scraping[df_scraping['source_site'].str.contains('data.gouv', na=False)])}")

    # Storage MinIO
    scraping_dir = RAW_DIR / "scraping" / "multi"
    scraping_dir.mkdir(parents=True, exist_ok=True)
    local = scraping_dir / f"scraping_multi_{ts()}.csv"
    df_scraping.to_csv(local, index=False)

    try:
        minio_uri = minio_upload(local, f"scraping/multi/{local.name}")
        logger.info(f"   ☁️ Upload MinIO : {minio_uri}")
    except Exception as e:
        log_error("MinIO", e, "Upload fichier scraping")
        minio_uri = f"local://{local}"

    # Storage PostgreSQL
    with engine.begin() as conn:
        id_source = get_source_id(conn, "Web Scraping Multi-Sources")
        if not id_source:
            id_type = conn.execute(text("SELECT id_type_donnee FROM type_donnee WHERE libelle = 'Web Scraping'")).scalar()
            if id_type:
                conn.execute(text("""
                    INSERT INTO source (id_type_donnee, nom, url, fiabilite)
                    VALUES (:id_type, 'Web Scraping Multi-Sources', 'https://www.data.gouv.fr', 0.85)
                """), {"id_type": id_type})
                id_source = conn.execute(text("SELECT id_source FROM source WHERE nom = 'Web Scraping Multi-Sources'")).scalar()

        if id_source:
            id_flux = create_flux(conn, id_source, "html", minio_uri)

            docs = []
            for _, row in df_scraping.iterrows():
                docs.append({
                    "id_flux": id_flux,
                    "id_territoire": None,
                    "titre": row["titre"],
                    "texte": row["texte"],
                    "langue": row["langue"],
                    "date_publication": row["date_publication"],
                    "hash_fingerprint": row["hash_fingerprint"]
                })

            inserted = insert_documents(conn, docs)
            logger.info(f"✅ Source 4/5 terminée : {inserted} documents scraping insérés")
        else:
            logger.warning("   ⚠️ Source scraping non créée - insertion ignorée")
else:
    logger.warning("⚠️ Aucune donnée collectée depuis les sources web scraping")


## 🌍 Source 5/5 : GDELT GKG France (Big Data)

Téléchargement et analyse de données Big Data depuis GDELT Project (Global Database of Events, Language, and Tone) avec **focus France**.

**Source** : http://data.gdeltproject.org/gdeltv2/

**Format** : GKG 2.0 (Global Knowledge Graph) - Fichiers CSV.zip (~300 MB/15min)

**Contenu Big Data** :
- Événements mondiaux géolocalisés
- **Tonalité émotionnelle** (V2Tone : -100 négatif → +100 positif)
- **Thèmes extraits** (V2Themes : PROTEST, HEALTH, ECONOMY, TERROR...)
- **Entités nommées** (V2Persons, V2Organizations)
- **Géolocalisation** (V2Locations avec codes pays)

**Filtrage France** :
- Sélection événements avec localisation France (code pays FR)
- Extraction tonalité moyenne France
- Top thèmes français

**Stratégie Big Data** :
- Téléchargement fichier dernières 15min (~6-300 MB brut)
- Parsing colonnes V2* nommées (27 colonnes GKG)
- Filtrage géographique France → échantillon
- Storage MinIO (fichier brut complet)
- Insertion PostgreSQL (événements France)


In [None]:
logger.info("🌍 SOURCE 5/5 : GDELT GKG France (Big Data)")
logger.info("=" * 80)

import io
import zipfile

# Colonnes GKG 2.0 (version complète)
GKG_COLUMNS = [
    "GKGRECORDID", "V2.1DATE", "V2SourceCollectionIdentifier", "V2SourceCommonName",
    "V2DocumentIdentifier", "V1Counts", "V2.1Counts", "V1Themes", "V2Themes",
    "V1Locations", "V2Locations", "V1Persons", "V2Persons", "V1Organizations",
    "V2Organizations", "V1.5Tone", "V2.1Tone", "V2.1Dates", "V2.1Amounts",
    "V2.1TransInfo", "V2.1Extras", "V21SourceLanguage", "V21QuotationLanguage",
    "V21Url", "V21Date2", "V21Xml"
]

# Récupérer le fichier GKG le plus récent (dernières 15 minutes)
try:
    # URL du dernier update GDELT
    update_url = "http://data.gdeltproject.org/gdeltv2/lastupdate.txt"
    r = requests.get(update_url, timeout=15)

    if r.status_code == 200:
        lines = r.text.strip().split("\n")
        # Trouver ligne GKG (pas export ni mentions)
        gkg_line = [line for line in lines if ".gkg.csv.zip" in line and "translation" not in line]

        if gkg_line:
            # Format: size hash url
            parts = gkg_line[0].split()
            gkg_url = parts[2] if len(parts) >= 3 else parts[-1]
            file_size_mb = int(parts[0]) / 1024 / 1024 if parts[0].isdigit() else 0

            logger.info(f"📥 Téléchargement GDELT GKG ({file_size_mb:.1f} MB)")
            logger.info(f"   URL: {gkg_url}")

            # Télécharger
            gkg_r = requests.get(gkg_url, timeout=120)

            if gkg_r.status_code == 200:
                # Sauvegarder ZIP
                zip_filename = gkg_url.split("/")[-1]
                zip_path = RAW_DIR / "gdelt" / zip_filename
                zip_path.parent.mkdir(parents=True, exist_ok=True)

                with zip_path.open("wb") as f:
                    f.write(gkg_r.content)

                logger.info(f"   ✅ Téléchargé: {zip_path.name} ({len(gkg_r.content) / 1024 / 1024:.1f} MB)")

                # Upload MinIO (fichier brut complet)
                try:
                    minio_uri = minio_upload(zip_path, f"gdelt/{zip_path.name}")
                    logger.info(f"   ☁️ Upload MinIO : {minio_uri}")
                except Exception as e:
                    log_error("MinIO", e, "Upload fichier GDELT")
                    minio_uri = f"local://{zip_path}"

                # Extraction et parsing
                with zipfile.ZipFile(zip_path, "r") as z:
                    csv_filename = z.namelist()[0]
                    logger.info(f"\n📊 Parsing: {csv_filename}")

                    with z.open(csv_filename) as f:
                        # Lire avec pandas
                        try:
                            df_gkg = pd.read_csv(
                                io.BytesIO(f.read()),
                                sep="\t",
                                header=None,
                                names=GKG_COLUMNS,
                                on_bad_lines="skip",
                                low_memory=False,
                                nrows=5000  # Limiter pour démo (sinon trop long)
                            )

                            logger.info(f"   📈 Total lignes chargées: {len(df_gkg):,}")

                            # 🇫🇷 FILTRAGE FRANCE
                            logger.info("\n🇫🇷 Filtrage événements France...")
                            df_france = df_gkg[
                                df_gkg["V2Locations"].fillna("").str.contains("1#France#FR#", na=False) |
                                df_gkg["V2Locations"].fillna("").str.contains("#FR#", na=False)
                            ].copy()

                            logger.info(f"   ✅ Événements France: {len(df_france):,} ({len(df_france)/len(df_gkg)*100:.1f}%)")

                            if len(df_france) > 0:
                                # Extraction tonalité émotionnelle
                                def parse_tone(tone_str):
                                    if pd.isna(tone_str) or tone_str == "":
                                        return None
                                    try:
                                        parts = str(tone_str).split(",")
                                        return float(parts[0]) if parts else None
                                    except Exception:
                                        return None

                                df_france["tone_value"] = df_france["V2.1Tone"].apply(parse_tone)
                                avg_tone = df_france["tone_value"].mean()

                                logger.info(f"📊 Tonalité moyenne France: {avg_tone:.2f} (-100=très négatif, +100=très positif)")

                                # Insertion PostgreSQL (événements et documents)
                                with engine.begin() as conn:
                                    id_source = get_source_id(conn, "GDELT GKG")
                                    if not id_source:
                                        id_type = conn.execute(text("SELECT id_type_donnee FROM type_donnee WHERE libelle = 'Big Data'")).scalar()
                                        if id_type:
                                            conn.execute(text("""
                                                INSERT INTO source (id_type_donnee, nom, url, fiabilite)
                                                VALUES (:id_type, 'GDELT GKG', 'http://data.gdeltproject.org/gdeltv2/', 0.9)
                                            """), {"id_type": id_type})
                                            id_source = conn.execute(text("SELECT id_source FROM source WHERE nom = 'GDELT GKG'")).scalar()

                                    if id_source:
                                        id_flux = create_flux(conn, id_source, "csv", minio_uri)

                                        # Insertion événements et documents
                                        inserted_events = 0
                                        inserted_docs = 0

                                        for _, row in df_france.head(100).iterrows():  # Limiter à 100 pour démo
                                            try:
                                                # Créer thème si nécessaire
                                                themes_str = str(row["V2Themes"]) if pd.notna(row["V2Themes"]) else ""
                                                theme_libelle = themes_str.split(";")[0] if themes_str else "GENERAL"

                                                theme_id = conn.execute(text("""
                                                    SELECT id_theme FROM theme WHERE libelle = :libelle
                                                """), {"libelle": theme_libelle}).fetchone()

                                                if not theme_id:
                                                    conn.execute(text("""
                                                        INSERT INTO theme (libelle, description)
                                                        VALUES (:libelle, :desc)
                                                    """), {"libelle": theme_libelle, "desc": f"Thème GDELT: {theme_libelle}"})
                                                    theme_id = conn.execute(text("""
                                                        SELECT id_theme FROM theme WHERE libelle = :libelle
                                                    """), {"libelle": theme_libelle}).fetchone()

                                                theme_id_val = theme_id[0] if theme_id else None

                                                # Créer événement
                                                event_result = conn.execute(text("""
                                                    INSERT INTO evenement (id_theme, date_event, avg_tone, source_event)
                                                    VALUES (:theme, :date_event, :tone, :source)
                                                    RETURNING id_event
                                                """), {
                                                    "theme": theme_id_val,
                                                    "date_event": datetime.fromtimestamp(int(str(row["V2.1DATE"])[:8]), tz=UTC) if len(str(row["V2.1DATE"])) >= 8 else datetime.now(UTC),
                                                    "tone": avg_tone,
                                                    "source": "GDELT"
                                                })
                                                event_id = event_result.scalar()

                                                # Créer document associé
                                                doc_text = f"{row.get('V2SourceCommonName', '')} - {themes_str[:200]}"
                                                doc_hash = sha256(doc_text)

                                                doc_result = conn.execute(text("""
                                                    INSERT INTO document (id_flux, id_territoire, titre, texte, langue, date_publication, hash_fingerprint)
                                                    VALUES (:id_flux, NULL, :titre, :texte, 'en', :date_pub, :hash)
                                                    ON CONFLICT (hash_fingerprint) DO NOTHING
                                                    RETURNING id_doc
                                                """), {
                                                    "id_flux": id_flux,
                                                    "titre": row.get("V2SourceCommonName", "GDELT Event")[:200],
                                                    "texte": doc_text,
                                                    "date_pub": datetime.now(UTC),
                                                    "hash": doc_hash
                                                })
                                                doc_id = doc_result.scalar()

                                                if doc_id and event_id:
                                                    # Lier document à événement
                                                    conn.execute(text("""
                                                        INSERT INTO document_evenement (id_doc, id_event)
                                                        VALUES (:doc_id, :event_id)
                                                        ON CONFLICT DO NOTHING
                                                    """), {"doc_id": doc_id, "event_id": event_id})
                                                    inserted_events += 1
                                                    inserted_docs += 1

                                            except Exception as e:
                                                log_error("GDELT", e, "Insertion événement/document")

                                        logger.info(f"✅ Source 5/5 terminée : {inserted_events} événements France insérés ({inserted_docs} docs)")
                                    else:
                                        logger.warning("   ⚠️ Source GDELT non créée - insertion ignorée")
                            else:
                                logger.warning("   ⚠️ Aucun événement France trouvé dans ce fichier")

                        except Exception as e:
                            log_error("GDELT", e, "Parsing CSV")
                            logger.warning(f"   ❌ Erreur parsing CSV: {str(e)[:100]}")
                            logger.info("   i Fichier brut sauvegardé sur MinIO")

            else:
                logger.error(f"   ❌ Erreur téléchargement GKG: {gkg_r.status_code}")
        else:
            logger.warning("   ⚠️ Aucun fichier GKG trouvé dans lastupdate.txt")
    else:
        logger.error(f"   ❌ Erreur accès lastupdate.txt: {r.status_code}")

except Exception as e:
    log_error("GDELT", e, "Collecte Big Data")
    logger.warning(f"❌ Erreur GDELT: {str(e)[:200]}")
    logger.info("i GDELT peut être temporairement indisponible (service tiers)")


## 📋 Création du Manifest JSON

Génération d'un manifest JSON pour traçabilité complète de toutes les ingestions


## 📊 Baromètres DataSens - Sources Métier (E2/E3)

Les 5 sources de base (E1) sont complètes. Pour enrichir le dataset avec des données métier spécialisées, voici **10 types de baromètres** à implémenter dans les phases E2/E3 :

### 📋 Liste des Baromètres

1. **🔹 Baromètre de confiance politique & sociale**
   - **Source** : CEVIPOF – La confiance des Français dans la politique
   - **Thématique** : Société, gouvernance, démocratie, institutions
   - **Format** : CSV / PDF / API
   - **Mapping E1** : API / Fichier plat

2. **🔹 Baromètre des émotions et du moral des Français**
   - **Source** : Kantar Public / Ipsos Mood of France
   - **Thématique** : Joie, anxiété, colère, espoir (→ table EMOTION)
   - **Format** : CSV / scraping
   - **Mapping E1** : CSV / Web Scraping

3. **🔹 Baromètre environnemental**
   - **Source** : ADEME / IFOP pour la transition écologique
   - **Thématique** : Écologie, énergie, climat, sobriété
   - **Format** : Dataset plat + API
   - **Mapping E1** : API / CSV

4. **🔹 Baromètre économique et social**
   - **Source** : INSEE Conjoncture + BVA Observatoire social
   - **Thématique** : Pouvoir d'achat, chômage, inflation, emploi
   - **Format** : Base SQL / CSV
   - **Mapping E1** : Base de données / CSV

5. **🔹 Baromètre des médias et de la confiance**
   - **Source** : La Croix – Baromètre Kantar sur les médias
   - **Thématique** : Information, confiance médiatique, fake news
   - **Format** : Web scraping
   - **Mapping E1** : Web Scraping

6. **🔹 Baromètre sport & cohésion sociale**
   - **Source** : Ministère des Sports / CNOSF / Paris 2024
   - **Thématique** : Sport, bien-être, fierté nationale, cohésion
   - **Format** : CSV / API
   - **Mapping E1** : CSV / API

7. **🔹 Baromètre des discriminations et égalité**
   - **Source** : Défenseur des Droits / IFOP
   - **Thématique** : Inclusion, diversité, égalité femmes-hommes
   - **Format** : CSV / API
   - **Mapping E1** : CSV / API

8. **🔹 Baromètre santé mentale et bien-être**
   - **Source** : Santé Publique France – CoviPrev
   - **Thématique** : Stress, anxiété, santé mentale post-COVID
   - **Format** : CSV
   - **Mapping E1** : CSV

9. **🔹 Baromètre climat social et tensions**
   - **Source** : Elabe / BFMTV Opinion 2024
   - **Thématique** : Colère, frustration, confiance, peur
   - **Format** : Web Scraping
   - **Mapping E1** : Web Scraping

10. **🔹 Baromètre innovation et IA**
    - **Source** : CNIL / France IA / Capgemini Research Institute
    - **Thématique** : Adoption de l'IA, confiance numérique
    - **Format** : PDF / API
    - **Mapping E1** : API / PDF scraping

### 📚 Documentation Complète

Voir `docs/BAROMETRES_SOURCES.md` pour :
- Détails par baromètre (URLs, format, tables PostgreSQL)
- Plan d'implémentation E2/E3
- Notes techniques et RGPD

### 🎯 Plan d'Implémentation

**Phase E2 (Priorité)** :
1. Baromètre économique et social (INSEE)
2. Baromètre des émotions (Kantar/Ipsos)
3. Baromètre santé mentale (Santé Publique France)

**Phase E3 (Complément)** :
4-10. Autres baromètres selon priorités métier

**Architecture** : Tous les baromètres suivront le même pipeline que les sources E1 :
- Logging structuré
- Upload MinIO
- Insertion PostgreSQL avec helpers
- Déduplication SHA-256


In [None]:
# =====================================================
# SOURCES 2, 3, 4, 5 : À IMPLÉMENTER AVEC VRAIES SOURCES
# =====================================================
#
# Pour respecter l'architecture pipeline du notebook datasens_E1_v2.ipynb,
# les sources 2-5 doivent être implémentées avec :
# 1. Collecte réelle depuis API/BDD/Scraping/GDELT
# 2. Upload MinIO pour traçabilité DataLake
# 3. Insertion PostgreSQL avec fonctions helpers (create_flux, insert_documents)
# 4. Logging complet via logger.info/error
#
# Voir notebook datasens_E1_v2.ipynb pour implémentations complètes :
# - Source 2 : Kaggle DB (SQLite → Postgres via Pandas)
# - Source 3 : OpenWeatherMap API (voir Cell 20 du notebook existant)
# - Source 4 : Web Scraping MonAvisCitoyen (voir Cell 26 du notebook existant)
# - Source 5 : GDELT GKG Big Data (voir Cell 28 du notebook existant)

logger.info("\n📋 Pour sources 2-5 : Voir notebooks/datasens_E1_v2.ipynb")
logger.info("   → Exemples complets avec vraies API keys et collectes réelles")

# =====================================================
# MANIFEST JSON (Traçabilité finale)
# =====================================================
logger.info("📋 Création du manifest JSON")
logger.info("=" * 80)

# Compter les données collectées
with engine.connect() as conn:
    counts = {
        "documents": conn.execute(text("SELECT COUNT(*) FROM document")).scalar(),
        "flux": conn.execute(text("SELECT COUNT(*) FROM flux")).scalar(),
        "sources": conn.execute(text("SELECT COUNT(*) FROM source")).scalar(),
        "meteo": conn.execute(text("SELECT COUNT(*) FROM meteo")).scalar(),
        "evenements": conn.execute(text("SELECT COUNT(*) FROM evenement")).scalar(),
    }

manifest = {
    "run_id": ts(),
    "timestamp_utc": datetime.now(UTC).isoformat(),
    "notebook_version": "03_ingest_sources.ipynb",
    "sources_ingested": [
        "Kaggle CSV (fichier plat - 50% PG + 50% MinIO)",
        "Kaggle DB (base de données - à implémenter)",
        "OpenWeatherMap (API - à implémenter)",
        "MonAvisCitoyen (scraping - à implémenter)",
        "GDELT GKG (big data - à implémenter)"
    ],
    "counts": counts,
    "postgres_db": PG_DB,
    "minio_bucket": MINIO_BUCKET,
    "raw_data_location": str(RAW_DIR),
    "log_file": str(log_file)
}

# Sauvegarder manifest local + MinIO
manifest_path = MANIFESTS_DIR / f"manifest_{manifest['run_id']}.json"
manifest_path.parent.mkdir(parents=True, exist_ok=True)

with manifest_path.open("w", encoding="utf-8") as f:
    json.dump(manifest, f, indent=2, ensure_ascii=False)

try:
    manifest_minio_uri = minio_upload(manifest_path, f"manifests/{manifest_path.name}")
    logger.info(f"✅ Manifest créé : {manifest_path.name}")
    logger.info(f"☁️ Manifest MinIO : {manifest_minio_uri}")
except Exception as e:
    log_error("MinIO", e, "Upload manifest")
    manifest_minio_uri = f"local://{manifest_path}"

logger.info("\n📊 Résumé ingestion :")
for key, value in counts.items():
    logger.info(f"   • {key}: {value}")

logger.info("\n✅ Ingestion terminée ! (Source 1/5 complète, sources 2-5 à documenter)")
logger.info("   ➡️ Passez au notebook 04_crud_tests.ipynb")
