# 📥 DataSens E1 — Notebook 3 : Ingestion des 5 Sources

**🎯 Objectif** : Ingérer réellement les 5 types de sources avec traçabilité complète

---

## 📋 Plan d'ingestion

1. **Fichier plat CSV** : Kaggle (50% → Postgres, 50% → raw)
2. **Base de données** : Kaggle SQLite → Postgres
3. **API** : OpenWeatherMap → meteo + flux
4. **Web Scraping** : MonAvisCitoyen (dry-run) → document
5. **Big Data** : GDELT GKG → evenement + document_evenement

**Traçabilité** : Manifest JSON par run avec chemins, compteurs, horodatages

---

## 🔒 RGPD & Gouvernance

⚠️ **Rappel** : Pas de données personnelles directes (hash SHA-256), respect robots.txt



In [None]:
# Configuration et imports (architecture pipeline complète)
import os
import hashlib
import json
import time
import logging
import traceback
from datetime import datetime, timezone
from pathlib import Path

import pandas as pd
import requests
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
from minio import Minio
from tqdm import tqdm

# Configuration
NOTEBOOK_DIR = Path.cwd()
PROJECT_ROOT = NOTEBOOK_DIR.parent if NOTEBOOK_DIR.name == "notebooks" else NOTEBOOK_DIR
load_dotenv(PROJECT_ROOT / ".env")

PG_HOST = os.getenv("POSTGRES_HOST", "localhost")
PG_PORT = int(os.getenv("POSTGRES_PORT", "5432"))
PG_DB = os.getenv("POSTGRES_DB", "datasens")
PG_USER = os.getenv("POSTGRES_USER", "ds_user")
PG_PASS = os.getenv("POSTGRES_PASS", "ds_pass")

PG_URL = f"postgresql+psycopg2://{PG_USER}:{PG_PASS}@{PG_HOST}:{PG_PORT}/{PG_DB}"
engine = create_engine(PG_URL, future=True)

# Configuration MinIO (DataLake)
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "http://localhost:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "miniouser")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "miniosecret")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "datasens-raw")

RAW_DIR = PROJECT_ROOT / "data" / "raw"
MANIFESTS_DIR = RAW_DIR / "manifests"
LOGS_DIR = PROJECT_ROOT / "logs"

# Créer dossiers
RAW_DIR.mkdir(parents=True, exist_ok=True)
MANIFESTS_DIR.mkdir(parents=True, exist_ok=True)
LOGS_DIR.mkdir(parents=True, exist_ok=True)

# =====================================================
# SYSTÈME DE LOGGING (comme datasens_E1_v2.ipynb)
# =====================================================
log_timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
log_file = LOGS_DIR / f"collecte_{log_timestamp}.log"
error_file = LOGS_DIR / f"errors_{log_timestamp}.log"

logger = logging.getLogger("DataSens")
logger.setLevel(logging.DEBUG)

file_formatter = logging.Formatter(
    "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
console_formatter = logging.Formatter(
    "[%(asctime)s] %(levelname)s - %(message)s",
    datefmt="%H:%M:%S"
)

file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(file_formatter)

error_handler = logging.FileHandler(error_file, encoding="utf-8")
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(file_formatter)

console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(console_formatter)

logger.addHandler(file_handler)
logger.addHandler(error_handler)
logger.addHandler(console_handler)

def log_error(source: str, error: Exception, context: str = ""):
    """Log une erreur avec traceback complet"""
    error_msg = f"[{source}] {context}: {error!s}"
    logger.error(error_msg)
    logger.error(f"Traceback:\n{traceback.format_exc()}")

logger.info("🚀 Système de logging initialisé")
logger.info(f"📁 Logs: {log_file}")
logger.info(f"❌ Erreurs: {error_file}")

# =====================================================
# MINIO CLIENT (DataLake)
# =====================================================
try:
    minio_client = Minio(
        MINIO_ENDPOINT.replace("http://", "").replace("https://", ""),
        access_key=MINIO_ACCESS_KEY,
        secret_key=MINIO_SECRET_KEY,
        secure=MINIO_ENDPOINT.startswith("https")
    )

    def ensure_bucket(bucket: str = MINIO_BUCKET):
        if not minio_client.bucket_exists(bucket):
            minio_client.make_bucket(bucket)

    def minio_upload(local_path: Path, dest_key: str) -> str:
        """Upload fichier vers MinIO DataLake"""
        ensure_bucket(MINIO_BUCKET)
        minio_client.fput_object(MINIO_BUCKET, dest_key, str(local_path))
        return f"s3://{MINIO_BUCKET}/{dest_key}"

    ensure_bucket()
    logger.info(f"✅ MinIO OK → bucket: {MINIO_BUCKET}")
except Exception as e:
    logger.warning(f"⚠️ MinIO non disponible: {e} - Mode local uniquement")
    minio_client = None
    def minio_upload(local_path: Path, dest_key: str) -> str:
        return f"local://{local_path}"

# =====================================================
# FONCTIONS UTILITAIRES
# =====================================================
def ts() -> str:
    """Timestamp UTC ISO compact"""
    return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

def sha256(s: str) -> str:
    """Hash SHA-256 pour déduplication"""
    return hashlib.sha256(s.encode("utf-8")).hexdigest()

def get_source_id(conn, nom: str) -> int:
    """Récupère l'id_source depuis le nom"""
    logger.info(f"[get_source_id] Recherche source: {nom}")
    result = conn.execute(text("SELECT id_source FROM source WHERE nom = :nom"), {"nom": nom}).fetchone()
    if result:
        logger.info(f"   → id_source trouvé: {result[0]}")
        return result[0]
    logger.warning(f"   → Source non trouvée: {nom}")
    return None

def create_flux(conn, id_source: int, format_type: str = "csv", manifest_uri: str = None) -> int:
    """Crée un flux et retourne id_flux"""
    logger.info(f"[create_flux] Création flux pour id_source={id_source}, format={format_type}")
    result = conn.execute(text("""
        INSERT INTO flux (id_source, format, manifest_uri)
        VALUES (:id_source, :format, :manifest_uri)
        RETURNING id_flux
    """), {"id_source": id_source, "format": format_type, "manifest_uri": manifest_uri})
    id_flux = result.scalar()
    logger.info(f"   → id_flux créé: {id_flux}")
    return id_flux

def ensure_territoire(conn, ville: str, code_insee: str = None, lat: float = None, lon: float = None) -> int:
    """Crée ou récupère un territoire"""
    logger.info(f"[ensure_territoire] Vérification territoire: ville={ville}")
    result = conn.execute(text("SELECT id_territoire FROM territoire WHERE ville = :ville"), {"ville": ville}).fetchone()
    if result:
        logger.info(f"   → id_territoire existant: {result[0]}")
        return result[0]
    result = conn.execute(text("""
        INSERT INTO territoire (ville, code_insee, lat, lon)
        VALUES (:ville, :code_insee, :lat, :lon)
        RETURNING id_territoire
    """), {"ville": ville, "code_insee": code_insee, "lat": lat, "lon": lon})
    id_territoire = result.scalar()
    logger.info(f"   → id_territoire créé: {id_territoire}")
    return id_territoire

def insert_documents(conn, docs: list) -> int:
    """Insertion batch de documents avec gestion doublons"""
    logger.info(f"[insert_documents] Insertion de {len(docs)} documents...")
    inserted = 0
    for doc in docs:
        try:
            result = conn.execute(text("""
                INSERT INTO document (id_flux, id_territoire, titre, texte, langue, date_publication, hash_fingerprint)
                VALUES (:id_flux, :id_territoire, :titre, :texte, :langue, :date_publication, :hash_fingerprint)
                ON CONFLICT (hash_fingerprint) DO NOTHING
                RETURNING id_doc
            """), doc)
            id_doc = result.scalar()
            if id_doc:
                logger.info(f"   → Document inséré: id_doc={id_doc}, titre={doc.get('titre', '')[:40]}")
                inserted += 1
        except Exception as e:
            log_error("insert_documents", e, f"Erreur insertion document")
    logger.info(f"   → Total insérés: {inserted}/{len(docs)}")
    return inserted

print("✅ Configuration pipeline chargée")
print(f"   📍 PostgreSQL : {PG_HOST}:{PG_PORT}/{PG_DB}")
print(f"   ☁️ MinIO : {MINIO_BUCKET if minio_client else 'Mode local'}")
print(f"   📂 Raw data : {RAW_DIR}")
print(f"   📄 Logs : {LOGS_DIR}")
print("\n✅ Pipeline DataLake + PostgreSQL prêt !")


## 📄 Source 1/5 : Fichier plat CSV (Kaggle)

**Architecture hybride (comme datasens_E1_v2.ipynb)** :
- **50% → PostgreSQL** : Données structurées pour requêtes SQL
- **50% → MinIO DataLake** : Données brutes pour analyses Big Data futures

**Process** :
1. Chargement CSV depuis `data/raw/kaggle/`
2. Calcul SHA256 fingerprint pour déduplication
3. Split aléatoire 50/50
4. Upload 50% vers MinIO (DataLake)
5. Insertion 50% dans PostgreSQL avec traçabilité (id_flux)


In [None]:
logger.info("📄 SOURCE 1/5 : Fichier plat CSV (Kaggle)")
logger.info("=" * 80)

# Rechercher fichier Kaggle existant ou créer échantillon
kaggle_csv_paths = [
    RAW_DIR / "kaggle" / "kaggle_sample.csv",
    PROJECT_ROOT / "data" / "raw" / "kaggle" / "*.csv",
    Path.cwd() / "data" / "raw" / "kaggle" / "*.csv"
]

kaggle_csv_path = None
for path in kaggle_csv_paths:
    if path.exists():
        kaggle_csv_path = path
        break

if not kaggle_csv_path or not kaggle_csv_path.exists():
    logger.warning("⚠️ Fichier Kaggle non trouvé — Création échantillon pour démo")
    sample_data = pd.DataFrame({
        "text": [
            "Great product, very satisfied!",
            "Service terrible, avoid at all costs",
            "Excellent quality, recommend",
            "Bon produit, je recommande",
            "Mauvais service, déçu"
        ],
        "langue": ["en", "en", "en", "fr", "fr"],
        "date": [datetime.now(timezone.utc)] * 5
    })
    kaggle_csv_path = RAW_DIR / "kaggle" / "kaggle_sample.csv"
    kaggle_csv_path.parent.mkdir(parents=True, exist_ok=True)
    sample_data.to_csv(kaggle_csv_path, index=False)
    logger.info(f"   ✅ Échantillon créé : {kaggle_csv_path.name}")

# Charger le CSV
df_kaggle = pd.read_csv(kaggle_csv_path)
logger.info(f"📊 {len(df_kaggle)} lignes chargées")

# Split 50/50 (architecture hybride : PostgreSQL + MinIO)
df_kaggle["hash_fingerprint"] = df_kaggle["text"].apply(lambda x: sha256(str(x)))
mid_point = len(df_kaggle) // 2
df_pg = df_kaggle.iloc[:mid_point].copy()  # 50% → PostgreSQL
df_raw = df_kaggle.iloc[mid_point:].copy()  # 50% → MinIO DataLake

logger.info(f"   • 50% PostgreSQL : {len(df_pg)} lignes")
logger.info(f"   • 50% MinIO DataLake : {len(df_raw)} lignes")

# Sauvegarder 50% en raw local + upload MinIO
raw_output = RAW_DIR / "kaggle" / f"kaggle_raw_{ts()}.csv"
df_raw.to_csv(raw_output, index=False)
logger.info(f"   ✅ Sauvegardé local : {raw_output.name}")

# Upload MinIO (50% bruts vers DataLake)
try:
    minio_uri = minio_upload(raw_output, f"kaggle/{raw_output.name}")
    logger.info(f"   ☁️ Upload MinIO : {minio_uri}")
except Exception as e:
    log_error("MinIO", e, "Upload fichier Kaggle")
    minio_uri = f"local://{raw_output}"

# Insérer 50% dans PostgreSQL
with engine.begin() as conn:
    id_source = get_source_id(conn, "Kaggle CSV")
    if not id_source:
        id_type = conn.execute(text("SELECT id_type_donnee FROM type_donnee WHERE libelle = 'Fichier plat'")).scalar()
        conn.execute(text("""
            INSERT INTO source (id_type_donnee, nom, url, fiabilite)
            VALUES (:id_type, 'Kaggle CSV', 'https://www.kaggle.com', 0.8)
        """), {"id_type": id_type})
        id_source = conn.execute(text("SELECT id_source FROM source WHERE nom = 'Kaggle CSV'")).scalar()

    id_flux = create_flux(conn, id_source, "csv", minio_uri)

    # Préparer documents pour insertion batch
    docs = []
    for _, row in df_pg.iterrows():
        docs.append({
            "id_flux": id_flux,
            "id_territoire": None,
            "titre": "",
            "texte": str(row["text"]),
            "langue": row.get("langue", "en"),
            "date_publication": row.get("date", datetime.now(timezone.utc)),
            "hash_fingerprint": row["hash_fingerprint"]
        })

    inserted = insert_documents(conn, docs)

logger.info(f"\n✅ Source 1/5 terminée : {inserted} docs PostgreSQL + {len(df_raw)} docs MinIO")


## 🔧 Architecture Pipeline (Référence datasens_E1_v2.ipynb)

**Ce notebook suit l'architecture du pipeline existant** :

✅ **Logging structuré** : `logs/collecte_*.log` + `logs/errors_*.log`  
✅ **MinIO DataLake** : Upload automatique fichiers bruts → `s3://datasens-raw/`  
✅ **PostgreSQL** : Insertion structurée avec traçabilité (flux, manifests)  
✅ **Fonctions helpers** : `create_flux()`, `insert_documents()`, `ensure_territoire()`, `minio_upload()`  
✅ **Déduplication** : Hash SHA-256 pour éviter doublons  
✅ **RGPD** : Pas de données personnelles directes  

**Sources 2-5** : Implémenter avec vraies API keys (voir `datasens_E1_v2.ipynb` pour exemples complets)


## 📋 Création du Manifest JSON

Génération d'un manifest JSON pour traçabilité complète de toutes les ingestions


In [None]:
# =====================================================
# SOURCES 2, 3, 4, 5 : À IMPLÉMENTER AVEC VRAIES SOURCES
# =====================================================
#
# Pour respecter l'architecture pipeline du notebook datasens_E1_v2.ipynb,
# les sources 2-5 doivent être implémentées avec :
# 1. Collecte réelle depuis API/BDD/Scraping/GDELT
# 2. Upload MinIO pour traçabilité DataLake
# 3. Insertion PostgreSQL avec fonctions helpers (create_flux, insert_documents)
# 4. Logging complet via logger.info/error
#
# Voir notebook datasens_E1_v2.ipynb pour implémentations complètes :
# - Source 2 : Kaggle DB (SQLite → Postgres via Pandas)
# - Source 3 : OpenWeatherMap API (voir Cell 20 du notebook existant)
# - Source 4 : Web Scraping MonAvisCitoyen (voir Cell 26 du notebook existant)
# - Source 5 : GDELT GKG Big Data (voir Cell 28 du notebook existant)

logger.info("\n📋 Pour sources 2-5 : Voir notebooks/datasens_E1_v2.ipynb")
logger.info("   → Exemples complets avec vraies API keys et collectes réelles")

# =====================================================
# MANIFEST JSON (Traçabilité finale)
# =====================================================
logger.info("📋 Création du manifest JSON")
logger.info("=" * 80)

# Compter les données collectées
with engine.connect() as conn:
    counts = {
        "documents": conn.execute(text("SELECT COUNT(*) FROM document")).scalar(),
        "flux": conn.execute(text("SELECT COUNT(*) FROM flux")).scalar(),
        "sources": conn.execute(text("SELECT COUNT(*) FROM source")).scalar(),
        "meteo": conn.execute(text("SELECT COUNT(*) FROM meteo")).scalar(),
        "evenements": conn.execute(text("SELECT COUNT(*) FROM evenement")).scalar(),
    }

manifest = {
    "run_id": ts(),
    "timestamp_utc": datetime.now(timezone.utc).isoformat(),
    "notebook_version": "03_ingest_sources.ipynb",
    "sources_ingested": [
        "Kaggle CSV (fichier plat - 50% PG + 50% MinIO)",
        "Kaggle DB (base de données - à implémenter)",
        "OpenWeatherMap (API - à implémenter)",
        "MonAvisCitoyen (scraping - à implémenter)",
        "GDELT GKG (big data - à implémenter)"
    ],
    "counts": counts,
    "postgres_db": PG_DB,
    "minio_bucket": MINIO_BUCKET,
    "raw_data_location": str(RAW_DIR),
    "log_file": str(log_file)
}

# Sauvegarder manifest local + MinIO
manifest_path = MANIFESTS_DIR / f"manifest_{manifest['run_id']}.json"
manifest_path.parent.mkdir(parents=True, exist_ok=True)

with manifest_path.open("w", encoding="utf-8") as f:
    json.dump(manifest, f, indent=2, ensure_ascii=False)

try:
    manifest_minio_uri = minio_upload(manifest_path, f"manifests/{manifest_path.name}")
    logger.info(f"✅ Manifest créé : {manifest_path.name}")
    logger.info(f"☁️ Manifest MinIO : {manifest_minio_uri}")
except Exception as e:
    log_error("MinIO", e, "Upload manifest")
    manifest_minio_uri = f"local://{manifest_path}"

logger.info(f"\n📊 Résumé ingestion :")
for key, value in counts.items():
    logger.info(f"   • {key}: {value}")

logger.info(f"\n✅ Ingestion terminée ! (Source 1/5 complète, sources 2-5 à documenter)")
logger.info("   ➡️ Passez au notebook 04_crud_tests.ipynb")
