### TP2 — Pipeline d'acquisition et transformation de données

#### Partie 1 : Choix de l'API et exploration 

##### 1.1 APIs choisi

| **OpenFoodFacts** | Alimentation | [Lien](https://openfoodfacts.github.io/openfoodfacts-server/api/) | ⭐⭐ |

##### 1.3 Explorer l'API avec l'IA

Créez `exploration_api.py` :

In [3]:
import httpx
from litellm import completion
from dotenv import load_dotenv

load_dotenv()

def ask_api_assistant(question: str, api_doc: str = "") -> str:
    """Assistant spécialisé dans les APIs."""
    response = completion(
        model="gemini/gemini-2.5-flash-lite",
        messages=[
            {
                "role": "system", 
                "content": """Tu es un expert en APIs REST et en data engineering.
                Tu aides à comprendre et utiliser des APIs Open Data.
                Génère du code Python avec httpx quand on te le demande."""
            },
            {"role": "user", "content": f"{api_doc}\n\nQuestion: {question}"}
        ]
    )
    return response.choices[0].message.content


In [4]:
API_DOC = """
API OpenFoodFacts :
- Base URL: https://world.openfoodfacts.org/api/v2
- Endpoint produits: /product/{barcode}.json
- Endpoint recherche: /search.json?categories_tags={category}&page_size={n}
- Pas d'authentification requise
- Rate limit: soyez raisonnables (1 req/sec)
"""


In [5]:
# Demander à l'IA comment utiliser l'API
print(ask_api_assistant(
    "Comment récupérer les 100 premiers produits de la catégorie 'chocolats' ?",
    API_DOC
))

Absolument ! Voici comment récupérer les 100 premiers produits de la catégorie 'chocolats' en utilisant l'API OpenFoodFacts avec Python et la librairie `httpx`.

```python
import httpx
import time

# Configuration de l'API
BASE_URL = "https://world.openfoodfacts.org/api/v2"
SEARCH_ENDPOINT = "/search.json"

# Paramètres de la recherche
CATEGORY = "chocolats"
PAGE_SIZE = 100
REQUEST_DELAY = 1  # Délai entre les requêtes en secondes pour respecter le rate limit

# Construit l'URL complète pour la recherche
url = f"{BASE_URL}{SEARCH_ENDPOINT}"
params = {
    "categories_tags": CATEGORY,
    "page_size": PAGE_SIZE
}

print(f"Récupération des {PAGE_SIZE} premiers produits de la catégorie '{CATEGORY}'...")

try:
    # Effectue la requête GET
    response = httpx.get(url, params=params)
    response.raise_for_status()  # Lève une exception pour les codes d'erreur HTTP (4xx, 5xx)

    # Parse la réponse JSON
    data = response.json()

    # Vérifie si des produits ont été trouvés
    if data 

In [6]:
import httpx
import time

# Configuration de l'API OpenFoodFacts
BASE_URL = "https://world.openfoodfacts.org/api/v2"
SEARCH_ENDPOINT = "/search.json"

# Paramètres de la requête
category = "chocolats"
page_size = 100
# On pourrait aussi spécifier une page si on voulait une autre tranche de résultats
# page = 1

# Construction de l'URL de recherche
# url = f"{BASE_URL}{SEARCH_ENDPOINT}?categories_tags={category}&page_size={page_size}&page={page}"
url = f"{BASE_URL}{SEARCH_ENDPOINT}?categories_tags={category}&page_size={page_size}"

print(f"Requête vers : {url}")

try:
    # Utilisation de httpx avec un timeout plus long
    with httpx.Client(timeout=60.0) as client:
        response = client.get(url)

    # Vérifier si la requête a réussi
    response.raise_for_status()

    # Charger la réponse JSON
    data = response.json()

    # Extraire la liste des produits
    products = data.get("products", [])

    if products:
        print(f"\n--- Les {len(products)} premiers produits de la catégorie '{category}' ---")
        for i, product in enumerate(products):
            product_name = product.get("product_name", "Nom non disponible")
            barcode = product.get("code", "Code-barres non disponible")
            print(f"{i+1}. Nom: {product_name}, Code-barres: {barcode}")
    else:
        print(f"Aucun produit trouvé pour la catégorie '{category}'.")

except httpx.HTTPStatusError as e:
    print(f"Erreur HTTP : {e.response.status_code}")
except httpx.RequestError as e:
    print(f"Erreur réseau : {e}")
except Exception as e:
    print(f"Erreur inattendue : {e}")

# Respect du rate limit (1 requête par seconde)
# Dans cet exemple, nous n'avons qu'une seule requête, donc ce n'est pas strictement
# nécessaire. Mais si vous faisiez plusieurs requêtes successives dans une boucle,
# il faudrait ajouter ceci entre chaque requête.
# time.sleep(1)


Requête vers : https://world.openfoodfacts.org/api/v2/search.json?categories_tags=chocolats&page_size=100

--- Les 6 premiers produits de la catégorie 'chocolats' ---
1. Nom: Dolca chocolate negro, Code-barres: 7613036723671
2. Nom: Haselnuss Schokolade, Code-barres: 4099200057323
3. Nom: faire Schokolade 85% Kakao, Code-barres: 4006040412892
4. Nom: , Code-barres: 7627535092121
5. Nom: Nocciolata Dark Chocolate With Whole Hazelnuts, Dark, Code-barres: 8002996303129
6. Nom: Pistole chocolat noir, Code-barres: 3760377561241


**Explication du code :**

1.  **Importation des librairies :**
    *   `httpx` : Une librairie moderne pour effectuer des requêtes HTTP asynchrones et synchrones. Elle est recommandée pour sa simplicité et ses performances.
    *   `time` : Utilisé ici pour illustrer le respect du rate limit (même si ce n'est pas strictement nécessaire pour une seule requête).

2.  **Configuration :**
    *   `BASE_URL` : L'adresse de base de l'API OpenFoodFacts.
    *   `SEARCH_ENDPOINT` : Le chemin spécifique pour effectuer des recherches.

3.  **Paramètres de la requête :**
    *   `category` : La catégorie que nous recherchons ('chocolats').
    *   `page_size` : Le nombre de résultats que nous voulons par page (ici, 100).

4.  **Construction de l'URL :**
    *   L'URL complète est créée en combinant la base URL, le endpoint de recherche et les paramètres (`categories_tags`, `page_size`).

5.  **Exécution de la requête :**
    *   `httpx.get(url)` : Envoie une requête HTTP GET à l'URL construite.
    *   `response.raise_for_status()` : Vérifie si le code de statut de la réponse indique un succès (par exemple, 200 OK). Si ce n'est pas le cas (par exemple, 404 Not Found, 500 Internal Server Error), une exception `httpx.HTTPStatusError` sera levée.
    *   `response.json()` : Parse la réponse JSON du serveur en un dictionnaire Python.

6.  **Traitement des résultats :**
    *   `data.get("products", [])` : Extrait la liste des produits du dictionnaire `data`. Si la clé `"products"` n'existe pas, elle retourne une liste vide pour éviter une erreur.
    *   La boucle `for` itère sur chaque produit trouvé et affiche son nom (`product_name`) et son code-barres (`code`). Des messages par défaut sont utilisés si ces informations sont manquantes.

7.  **Gestion des erreurs :**
    *   Les blocs `try...except` permettent de capturer différentes erreurs qui pourraient survenir :
        *   `httpx.HTTPStatusError` : Pour les erreurs liées au statut de la réponse HTTP.
        *   `httpx.RequestError` : Pour les erreurs de connexion réseau ou d'autres problèmes liés à la requête elle-même.
        *   `Exception` : Pour toute autre erreur inattendue.

8.  **Respect du rate limit :**
    *   Bien que non exécuté dans cet exemple, `time.sleep(1)` serait utilisé après chaque requête si vous effectuez plusieurs appels à l'API de manière consécutive dans une boucle, pour respecter la limite de 1 requête par seconde.

Ce script donne les informations sur les 100 premiers produits trouvés dans la catégorie 'chocolats' par l'API OpenFoodFacts.

##### 1.4 Premier appel API

In [7]:
"""Test de l'API."""
import httpx

# Exemple OpenFoodFacts - adapter selon votre API
BASE_URL = "https://world.openfoodfacts.org/api/v2"

def test_api():
    """Test un appel simple à l'API."""
    response = httpx.get(
        f"{BASE_URL}/search",
        params={
            "categories_tags": "chocolats",
            "page_size": 5,
            "fields": "code,product_name,brands,nutriscore_grade"
        },
        timeout=30
    )
    response.raise_for_status()
    data = response.json()
    
    print(f"Nombre de produits : {data.get('count', 'N/A')}")
    for product in data.get("products", []):
        print(f"- {product.get('product_name', 'N/A')} ({product.get('brands', 'N/A')})")
    
    return data

if __name__ == "__main__":
    test_api()

Nombre de produits : 6
- Dolca chocolate negro (Dolca,Nestlé)
- Haselnuss Schokolade (Choceur)
- faire Schokolade 85% Kakao (Rapunzel)
-  (Coop,Karma)
- Nocciolata Dark Chocolate With Whole Hazelnuts, Dark (Venchi)


6 produits ont été récupérés. Parmi ceux-ci, on trouve des chocolats de marques variées, allant des grandes marques internationales comme Nestlé et Venchi, à des marques bio ou locales comme Rapunzel et Choceur. Certains produits contiennent des informations complètes sur le nom et la marque, tandis qu’un produit semble avoir une marque renseignée mais pas de nom ((Coop,Karma)). Ce jeu de données initial met en évidence la nécessité d’un nettoyage pour gérer les valeurs manquantes et standardiser les noms et marques avant toute analyse ou transformation ultérieure.

### Partie 2 : Construction du pipeline d'acquisition 

##### 2.4 Exercice : Adapter à votre API

In [8]:
# Dans votre notebook ou script
prompt = f"""
J'ai cette API : OpenFoodFacts** | Alimentation | 
Documentation : https://openfoodfacts.github.io/openfoodfacts-server/api/

Adapte le code de fetcher.py pour récupérer tous les produits contenant les champs code, product_name, brands, categories, nutriscore_grade, energy_100g, fat_100g, sugars_100g, salt_100g, proteins_100g

Garde la même structure avec retry et pagination.
"""

print(ask_api_assistant(prompt))

Absolument ! Voici le code Python adapté du `fetcher.py` pour interroger l'API OpenFoodFacts et récupérer les champs spécifiés, tout en conservant la logique de retry et de pagination.

```python
import time
import httpx
from typing import Dict, Any, List, Optional

# Configuration de l'API OpenFoodFacts
API_URL = "https://fr.openfoodfacts.org/cgi/search.pl"
# Champs que nous souhaitons récupérer pour chaque produit
FIELDS = [
    "code",
    "product_name",
    "brands",
    "categories",
    "nutriscore_grade",
    "energy_100g",
    "fat_100g",
    "sugars_100g",
    "salt_100g",
    "proteins_100g",
]

# Paramètres de recherche initiaux (vous pouvez les modifier)
# Ici, on cherche des produits dont le nom contient "pomme" en France
SEARCH_PARAMS = {
    "search_terms": "pomme",
    "search_simple": 1,
    "action": "process",
    "country": "France",
    "page_size": 24,  # Nombre de résultats par page
    "fields": ",".join(FIELDS),
    "json": 1,
}

# Paramètres de retry
MAX_RETR

In [9]:
import httpx
import asyncio
import logging
import nest_asyncio

# Permet de réutiliser la boucle asyncio existante dans Jupyter
nest_asyncio.apply()

# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# URL de base de l'API OpenFoodFacts
BASE_URL = "https://world.openfoodfacts.org/cgi/search.pl"

# Paramètres de recherche
SEARCH_PARAMS = {
    "search_terms": "chocolat",
    "search_simple": 1,
    "action": "process",
    "json": 1,
    "page_size": 100,
}

# Champs spécifiques à récupérer
FIELDS_TO_FETCH = "code,product_name,brands,categories,nutriscore_grade,energy_100g,fat_100g,sugars_100g,salt_100g,proteins_100g"

# Configuration du retry
MAX_RETRIES = 3
RETRY_DELAY_SECONDS = 5


MAX_PAGES = 200  # Limite à 200 pages

async def fetch_products_paginated():
    all_products = []
    page = 1
    logging.info(f"Début de la récupération des produits. Terme de recherche: '{SEARCH_PARAMS.get('search_terms')}'")

    while page <= MAX_PAGES:  # <-- limite à 200 pages
        current_params = SEARCH_PARAMS.copy()
        current_params["page"] = page
        current_params["fields"] = FIELDS_TO_FETCH

        retries = 0
        while retries < MAX_RETRIES:
            try:
                logging.info(f"Requête pour la page {page}...")
                async with httpx.AsyncClient(timeout=60.0) as client:
                    response = await client.get(BASE_URL, params=current_params)
                response.raise_for_status()
                data = response.json()

                if not data.get("products"):
                    logging.info(f"Plus de produits trouvés ou fin de la pagination après la page {page-1}.")
                    return all_products

                logging.info(f"Page {page} récupérée avec succès. Nombre de produits: {len(data['products'])}")
                all_products.extend(data["products"])
                page += 1
                break  # Sortir de la boucle retry

            except (httpx.RequestError, httpx.HTTPStatusError) as exc:
                retries += 1
                logging.error(f"Erreur pour la page {page}: {exc}")
                if retries >= MAX_RETRIES:
                    logging.error(f"Nombre maximal de tentatives atteint pour la page {page}. Abandon.")
                    return all_products
                logging.info(f"Nouvelle tentative dans {RETRY_DELAY_SECONDS} secondes...")
                await asyncio.sleep(RETRY_DELAY_SECONDS)

            except Exception as exc:
                retries += 1
                logging.error(f"Erreur inattendue pour la page {page}: {exc}")
                if retries >= MAX_RETRIES:
                    logging.error(f"Nombre maximal de tentatives atteint pour la page {page}. Abandon.")
                    return all_products
                await asyncio.sleep(RETRY_DELAY_SECONDS)

    return all_products



# Exemple d'utilisation directement dans Jupyter
products = await fetch_products_paginated()

if products:
    print(f"Voici les {min(5, len(products))} premiers produits récupérés :")
    for i, product in enumerate(products[:5]):
        print(f"\n--- Produit {i+1} ---")
        for field in FIELDS_TO_FETCH.split(','):
            print(f"{field}: {product.get(field, 'N/A')}")
else:
    print("Aucun produit n'a été récupéré.")


2025-12-16 20:31:31,108 - INFO - Début de la récupération des produits. Terme de recherche: 'chocolat'
2025-12-16 20:31:31,109 - INFO - Requête pour la page 1...
2025-12-16 20:31:36,001 - INFO - Page 1 récupérée avec succès. Nombre de produits: 100
2025-12-16 20:31:36,003 - INFO - Requête pour la page 2...
2025-12-16 20:31:37,087 - INFO - Page 2 récupérée avec succès. Nombre de produits: 100
2025-12-16 20:31:37,087 - INFO - Requête pour la page 3...
2025-12-16 20:31:38,167 - INFO - Page 3 récupérée avec succès. Nombre de produits: 100
2025-12-16 20:31:38,168 - INFO - Requête pour la page 4...
2025-12-16 20:31:39,803 - INFO - Page 4 récupérée avec succès. Nombre de produits: 100
2025-12-16 20:31:39,804 - INFO - Requête pour la page 5...
2025-12-16 20:31:41,068 - INFO - Page 5 récupérée avec succès. Nombre de produits: 100
2025-12-16 20:31:41,069 - INFO - Requête pour la page 6...
2025-12-16 20:31:42,488 - INFO - Page 6 récupérée avec succès. Nombre de produits: 100
2025-12-16 20:31:42,4

Voici les 5 premiers produits récupérés :

--- Produit 1 ---
code: 7622210449283
product_name: Prince Goût Chocolat
brands: LU
categories: Snacks,Breakfasts,Sweet snacks,Biscuits and cakes,Biscuits and crackers,Sandwich biscuits
nutriscore_grade: e
energy_100g: 1960
fat_100g: 17.5
sugars_100g: 31.5
salt_100g: 0.5
proteins_100g: 6.5

--- Produit 2 ---
code: 3046920029759
product_name: Excellence Noir Prodigieux 90% Cacao
brands: Lindt
categories: Snacks,Snacks sucrés,Cacao et dérivés,Chocolats,Chocolats noirs,Chocolats noirs en tablette,Chocolats noirs extra fin
nutriscore_grade: e
energy_100g: 2477
fat_100g: 55
sugars_100g: 7
salt_100g: 0.03
proteins_100g: 10

--- Produit 3 ---
code: 3017620425035
product_name: Nutella
brands: Ferrero
categories: Petit-déjeuners,Produits à tartiner,Produits à tartiner sucrés,Pâtes à tartiner,Pâtes à tartiner au chocolat,Pâtes à tartiner aux noisettes,Pâtes à tartiner aux noisettes et au cacao
nutriscore_grade: e
energy_100g: 2252
fat_100g: 30.9
sugars_

La récupération des produits a été effectuée avec succès pour le terme de recherche "chocolat", couvrant un total de 200 pages, chaque page contenant 100 produits, soit un total de 20 000 produits extraits. Chaque produit inclut des informations détaillées telles que le code-barres, le nom du produit, la marque, les catégories, le Nutri-Score, ainsi que les valeurs nutritionnelles pour 100 g (énergie, graisses, sucres, sel, protéines). Les premiers produits récupérés illustrent la diversité des articles, allant des biscuits chocolatés aux tablettes de chocolat noir en passant par les pâtes à tartiner, avec des profils nutritionnels variés. Ce dataset permet ainsi une analyse fine des produits chocolatés disponibles sur le marché.

#### 3.2 Exercice : Personnaliser le nettoyage

1. Exécutez `generate_cleaning_code()` avec vos données réelles


In [10]:
from pipeline.transformer import raw_to_dataframe, generate_cleaning_code

df_raw = raw_to_dataframe(products)


DataFrame créé : (20000, 10)


In [11]:
cleaning_code = generate_cleaning_code(df_raw)
print(cleaning_code)


[92m20:39:15 - LiteLLM:INFO[0m: utils.py:3443 - 
LiteLLM completion() model= mistral; provider = ollama
2025-12-16 20:39:15,034 - INFO - 
LiteLLM completion() model= mistral; provider = ollama
[92m20:43:00 - LiteLLM:INFO[0m: utils.py:1311 - Wrapper: Completed Call, calling success_handler
2025-12-16 20:43:00,427 - INFO - Wrapper: Completed Call, calling success_handler


ici est un exemple de code Python Pandas qui effectue la gestion des valeurs manquantes, types et normalisation pour votre DataFrame :

```python
import pandas as pd
from sklearn.preprocessing import StandardScaler

# Créez un DataFrame à partir de vos données
df = pd.DataFrame(data)

# Génération des noms de colonnes pour les types standardisés
standardized_columns = ['brands', 'categories', 'code', 'energy_100g', 'fat_100g', 'nutriscore_grade', 'product_name', 'proteins_100g', 'salt_100g', 'sugars_100g']

# Génération des noms de colonnes pour les types catégories
categorical_columns = ['brands', 'categories', 'nutriscore_grade']

# Normalisation des données numériques (StandardScaler)
scaler = StandardScaler()
df[standardized_columns] = scaler.fit_transform(df[standardized_columns])

# Génération de la liste des valeurs manquantes par colonne
missing_values = df.isnull().sum()
print("Nombre de valeurs manquantes par colonne :\n", missing_values)

# Remplacement des valeurs manquante

2. Analysez le code proposé par l'IA


Le code proposé par l’IA couvre les principales étapes du nettoyage de données (gestion des valeurs manquantes, normalisation, encodage). Cependant, il présente plusieurs erreurs critiques, notamment l’application de StandardScaler à des colonnes non numériques et une mauvaise utilisation de OneHotEncoder. De plus, le code mélange nettoyage générique et préparation pour le machine learning, ce qui nuit à la modularité du pipeline. Une adaptation est donc nécessaire pour garantir un code robuste, réutilisable et exécutable.

3. Intégrez les transformations pertinentes dans `clean_dataframe()`


Clean_dataframe() modifié avec les transformations proposées par l'IA

4. Testez et ajustez

In [24]:
import importlib
from pipeline.transformer import clean_dataframe, raw_to_dataframe
from pipeline.storage import save_clean_parquet

df_clean = clean_dataframe(df_raw)
print(df_clean.head())

# Sauvegarde
save_clean_parquet(df_clean, "chocolats_clean")


Doublons supprimés : 0
DataFrame nettoyé : (20000, 10)
            brands                                         categories  \
0               lu  snacks,breakfasts,sweet snacks,biscuits and ca...   
1            lindt  snacks,snacks sucrés,cacao et dérivés,chocolat...   
2          ferrero  petit-déjeuners,produits à tartiner,produits à...   
3      j. d. gross  snacks,sweet snacks,cocoa and its products,cho...   
4  green & black's  snacks,snacks sucrés,cacao et dérivés,chocolat...   

            code  energy_100g  fat_100g nutriscore_grade  \
0  7622210449283       1960.0      17.5                e   
1  3046920029759       2477.0      55.0                e   
2  3017620425035       2252.0      30.9                e   
3       20995553       2510.0      48.0                d   
4  7622210578464       2428.0      42.0                e   

                           product_name  proteins_100g  salt_100g  sugars_100g  
0                  prince goût chocolat            6.5      0.50

'data\\processed\\chocolats_clean_20251216_215516.parquet'

### Partie 4 : Stockage et orchestration

##### 4.3 Exécution

Création du package __init__.py

Exécuter
uv run python -m pipeline.main --category chocolats --name chocolats_fr

### Partie 5 : Vérification avec DuckDB

In [27]:
"""Vérification des données nettoyées avec DuckDB."""
import duckdb

# Connexion
con = duckdb.connect()

# Lire uniquement les fichiers Parquet des données nettoyées
df = con.execute("""
    SELECT * 
    FROM read_parquet('data/processed/chocolats_clean_*.parquet')
""").df()

# Identifier les colonnes numériques
num_cols = df.select_dtypes(include=['float', 'int']).columns.tolist()
print("Colonnes numériques détectées :", num_cols)

# Statistiques globales pour toutes les colonnes numériques
for col in num_cols:
    avg_val = df[col].mean()
    print(f"Moyenne de {col} : {avg_val:.2f}")

# Valeurs manquantes par colonne
missing_values = df.isnull().sum()
print("\nValeurs manquantes par colonne :")
print(missing_values[missing_values > 0])

# Identifier les colonnes de marques
brand_cols = [c for c in df.columns if "brands" in c.lower()]
print("\nColonnes marques détectées :", brand_cols)

# Nombre de valeurs uniques par colonne marque
print("\nValeurs uniques par colonne marque :")
for col in brand_cols:
    unique_count = df[col].nunique()
    print(f"{col}: {unique_count} valeurs uniques")


Colonnes numériques détectées : ['energy_100g', 'fat_100g', 'proteins_100g', 'salt_100g', 'sugars_100g']
Moyenne de energy_100g : 1887.27
Moyenne de fat_100g : 24.73
Moyenne de proteins_100g : 6.99
Moyenne de salt_100g : 0.33
Moyenne de sugars_100g : 33.05

Valeurs manquantes par colonne :
Series([], dtype: int64)

Colonnes marques détectées : ['brands']

Valeurs uniques par colonne marque :
brands: 4146 valeurs uniques


In [None]:
brand_col = 'brands'
unique_brands = df[brand_col].nunique()
print(f"\nNombre de marques uniques : {unique_brands}")
print(df[brand_col].value_counts().head(10))  



Nombre de marques uniques : 4146
brands
non renseigné    950
u                425
carrefour        386
lindt            383
                 337
leader price     299
casino           269
nestlé           242
cora             229
auchan           219
Name: count, dtype: int64


Nous avons identifié 4146 marques uniques dans la colonne brands. Parmi ces marques, certaines valeurs sont manquantes ou mal renseignées, par exemple non renseigné apparaît 950 fois et des entrées ambiguës comme u apparaissent 425 fois. Les marques les plus fréquentes sont carrefour (386 occurrences), lindt (383), une valeur vide (337), leader price (299), casino (269), nestlé (242), cora (229) et auchan (219). Cette distribution montre que, bien que le dataset soit riche en marques différentes, une proportion significative des données nécessite un nettoyage ou une standardisation supplémentaire pour éviter les doublons et homogénéiser les noms des marques.