# Data Sourcing : De Débutant à Pro 🚀

## Table des matières

1. [Setup et imports](#setup)
2. [Partie 1 : Maîtriser les fichiers CSV](#csv)
3. [Partie 2 : APIs - Collecter des données depuis le web](#api)
4. [Partie 3 : Web Scraping avec BeautifulSoup](#scraping)
5. [Partie 4 : Projet complet - Pipeline de données](#project)
6. [Exercices et défis](#exercises)

## 1. Setup et imports {#setup}

Commençons par installer et importer toutes les bibliothèques nécessaires.

In [1]:
# Installation des packages nécessaires (à exécuter une seule fois)
!pip install requests beautifulsoup4 pandas lxml

# Imports essentiels
import csv
import json
import time
import os
from datetime import datetime
import requests
from bs4 import BeautifulSoup
import pandas as pd
from pprint import pprint
import warnings
warnings.filterwarnings('ignore')

print("✅ Tous les packages sont importés avec succès!")

Collecting pandas
  Downloading pandas-2.3.0-cp313-cp313-macosx_11_0_arm64.whl.metadata (91 kB)
Collecting lxml
  Downloading lxml-5.4.0-cp313-cp313-macosx_10_13_universal2.whl.metadata (3.5 kB)
Collecting numpy>=1.26.0 (from pandas)
  Downloading numpy-2.3.0-cp313-cp313-macosx_14_0_arm64.whl.metadata (62 kB)
Collecting pytz>=2020.1 (from pandas)
  Using cached pytz-2025.2-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Using cached tzdata-2025.2-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pandas-2.3.0-cp313-cp313-macosx_11_0_arm64.whl (10.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.7/10.7 MB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hDownloading lxml-5.4.0-cp313-cp313-macosx_10_13_universal2.whl (8.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.1/8.1 MB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading numpy-2.3.0-cp313-cp313-macos

In [5]:
# Installation des packages nécessaires (à exécuter une seule fois)
!pip install requests beautifulsoup4 pandas lxml

# Imports essentiels
import csv
import json
import time
import os
from datetime import datetime
import requests
from bs4 import BeautifulSoup
import pandas as pd
from pprint import pprint
import warnings
warnings.filterwarnings('ignore')

print("✅ Tous les packages sont importés avec succès!")

✅ Tous les packages sont importés avec succès!


In [6]:
# Créer la structure de dossiers pour notre projet
folders = ['data', 'data/raw', 'data/processed', 'outputs', 'cache']
for folder in folders:
    os.makedirs(folder, exist_ok=True)

print("📁 Structure de dossiers créée:")
for folder in folders:
    print(f"  └── {folder}/")

📁 Structure de dossiers créée:
  └── data/
  └── data/raw/
  └── data/processed/
  └── outputs/
  └── cache/


---

## 2. Partie 1 : Maîtriser les fichiers CSV {#csv}

### 2.1 Télécharger un fichier CSV depuis internet

In [7]:
# Méthode 1 : Avec requests
def download_csv(url, filename):
    """Télécharge un fichier CSV depuis une URL"""
    response = requests.get(url)
    response.raise_for_status()  # Vérifier les erreurs

    filepath = f'data/raw/{filename}'
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(response.text)

    print(f"✅ Fichier téléchargé: {filepath}")
    return filepath

In [8]:
# Télécharger un fichier d'exemple
url = 'https://people.sc.fsu.edu/~jburkardt/data/csv/addresses.csv'
csv_file = download_csv(url, 'addresses.csv')

✅ Fichier téléchargé: data/raw/addresses.csv


In [9]:
# Aperçu rapide du fichier
print("📄 Aperçu du fichier CSV:")
print("-" * 50)
with open(csv_file, 'r') as f:
    for i, line in enumerate(f):
        if i < 5:  # Afficher les 5 premières lignes
            print(line.strip())
print("-" * 50)

📄 Aperçu du fichier CSV:
--------------------------------------------------
John,Doe,120 jefferson st.,Riverside, NJ, 08075
Jack,McGinnis,220 hobo Av.,Phila, PA,09119
"John ""Da Man""",Repici,120 Jefferson St.,Riverside, NJ,08075
Stephen,Tyler,"7452 Terrace ""At the Plaza"" road",SomeTown,SD, 91234
,Blankman,,SomeTown, SD, 00298
--------------------------------------------------


### 2.2 Lire un CSV sans pandas (méthode native Python)

In [None]:
# Lecture basique avec le module csv
print("🔍 Lecture avec csv.reader():")
print("-" * 50)

with open(csv_file, 'r') as f:
    reader = csv.reader(f, skipinitialspace=True)
    for i, row in enumerate(reader):
        if i < 3:  # Afficher les 3 premières lignes
            print(f"Ligne {i+1}: {row}")
            print(f"  → Prénom: {row[0]}")
            print(f"  → Nom: {row[1]}")
            print(f"  → Adresse: {row[2]}")
            print()

### 2.3 CSV avec en-têtes

In [None]:
# Télécharger un CSV avec en-têtes
url_with_headers = 'https://people.sc.fsu.edu/~jburkardt/data/csv/biostats.csv'
csv_with_headers = download_csv(url_with_headers, 'biostats.csv')

# Lire avec DictReader
print("📊 Lecture avec csv.DictReader():")
print("-" * 50)

with open(csv_with_headers, 'r') as f:
    reader = csv.DictReader(f, skipinitialspace=True)

    # Afficher les colonnes disponibles
    print(f"Colonnes: {reader.fieldnames}\n")

    # Lire quelques lignes
    for i, row in enumerate(reader):
        if i < 3:
            print(f"Personne {i+1}:")
            print(f"  Nom: {row['Name']}")
            print(f"  Sexe: {row['Sex']}")
            print(f"  Âge: {row['Age']} ans")
            print(f"  Taille: {row['Height (in)']} pouces")
            print()

### 2.4 Analyser et nettoyer les données CSV

In [None]:
def analyze_csv(filepath):
    """Analyse un fichier CSV et retourne des statistiques"""
    stats = {
        'total_rows': 0,
        'total_columns': 0,
        'empty_cells': 0,
        'column_names': [],
        'sample_data': []
    }

    with open(filepath, 'r') as f:
        reader = csv.DictReader(f, skipinitialspace=True)
        stats['column_names'] = reader.fieldnames
        stats['total_columns'] = len(reader.fieldnames)

        for i, row in enumerate(reader):
            stats['total_rows'] += 1

            # Compter les cellules vides
            for value in row.values():
                if not value or value.strip() == '':
                    stats['empty_cells'] += 1

            # Garder quelques exemples
            if i < 3:
                stats['sample_data'].append(row)

    return stats

# Analyser notre CSV
print("📈 Analyse du fichier CSV:")
print("=" * 50)
stats = analyze_csv(csv_with_headers)
print(f"Total de lignes: {stats['total_rows']}")
print(f"Total de colonnes: {stats['total_columns']}")
print(f"Cellules vides: {stats['empty_cells']}")
print(f"Colonnes: {', '.join(stats['column_names'])}")

### 2.5 Écrire un fichier CSV

In [None]:
# Créer des données à sauvegarder
beatles = [
    {'prénom': 'John', 'nom': 'Lennon', 'instrument': 'guitare', 'année_naissance': 1940},
    {'prénom': 'Paul', 'nom': 'McCartney', 'instrument': 'basse', 'année_naissance': 1942},
    {'prénom': 'George', 'nom': 'Harrison', 'instrument': 'guitare', 'année_naissance': 1943},
    {'prénom': 'Ringo', 'nom': 'Starr', 'instrument': 'batterie', 'année_naissance': 1940}
]

# Écrire le CSV
output_file = 'data/processed/beatles.csv'

with open(output_file, 'w', newline='', encoding='utf-8') as f:
    fieldnames = ['prénom', 'nom', 'instrument', 'année_naissance']
    writer = csv.DictWriter(f, fieldnames=fieldnames)

    writer.writeheader()
    for beatle in beatles:
        writer.writerow(beatle)

print(f"✅ Fichier créé: {output_file}")

# Vérifier le résultat
print("\n📄 Contenu du fichier créé:")
print("-" * 50)
with open(output_file, 'r') as f:
    print(f.read())

### 2.6 Exercice pratique : Fusionner plusieurs CSV

In [None]:
def merge_csv_files(file_list, output_file, key_column=None):
    """
    Fusionne plusieurs fichiers CSV en un seul
    Si key_column est spécifié, supprime les doublons basés sur cette colonne
    """
    all_data = []
    all_fieldnames = set()

    # Lire tous les fichiers
    for filepath in file_list:
        with open(filepath, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            all_fieldnames.update(reader.fieldnames)
            all_data.extend(list(reader))

    # Supprimer les doublons si nécessaire
    if key_column and key_column in all_fieldnames:
        seen = set()
        unique_data = []
        for row in all_data:
            key = row.get(key_column)
            if key not in seen:
                seen.add(key)
                unique_data.append(row)
        all_data = unique_data

    # Écrire le fichier fusionné
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=sorted(all_fieldnames))
        writer.writeheader()
        writer.writerows(all_data)

    print(f"✅ {len(file_list)} fichiers fusionnés → {output_file}")
    print(f"   Total: {len(all_data)} lignes, {len(all_fieldnames)} colonnes")

    return all_data

# Test de fusion (créons d'abord des fichiers d'exemple)
# Vous pouvez tester cette fonction avec vos propres fichiers CSV

## 3. Partie 2 : APIs - Collecter des données depuis le web {#api}

### 3.1 Comprendre les APIs REST

In [None]:

# Anatomie d'une URL d'API
def explain_api_url(url):
    """Décompose et explique une URL d'API"""
    from urllib.parse import urlparse, parse_qs

    parsed = urlparse(url)
    params = parse_qs(parsed.query)

    print("🔍 Analyse de l'URL:")
    print(f"  📍 Protocole: {parsed.scheme}")
    print(f"  🌐 Domaine: {parsed.netloc}")
    print(f"  📂 Chemin: {parsed.path}")
    if params:
        print(f"  ❓ Paramètres:")
        for key, values in params.items():
            print(f"     - {key}: {', '.join(values)}")

# Exemple
url_example = "https://api.github.com/users/octocat?type=user&sort=created"
explain_api_url(url_example)

### 3.2 Première requête API

In [None]:
# API simple sans authentification : GitHub
def get_github_user(username):
    """Récupère les informations d'un utilisateur GitHub"""
    url = f'https://api.github.com/users/{username}'

    print(f"🔄 Requête vers: {url}")
    response = requests.get(url)

    # Vérifier le statut
    print(f"📊 Status code: {response.status_code}")

    if response.status_code == 200:
        data = response.json()
        return data
    else:
        print(f"❌ Erreur: {response.status_code}")
        return None

# Tester avec un utilisateur
user_data = get_github_user('torvalds')  # Linus Torvalds

if user_data:
    print("\n👤 Informations utilisateur:")
    print(f"  Nom: {user_data['name']}")
    print(f"  Entreprise: {user_data['company']}")
    print(f"  Bio: {user_data['bio'][:100]}..." if user_data['bio'] else "  Bio: N/A")
    print(f"  Repos publics: {user_data['public_repos']}")
    print(f"  Followers: {user_data['followers']}")

In [None]:
### 3.3 API avec paramètres

In [None]:
# API de livres Open Library
def search_books(query, limit=5):
    """Recherche des livres via l'API Open Library"""
    base_url = 'https://openlibrary.org/search.json'

    params = {
        'q': query,
        'limit': limit,
        'fields': 'title,author_name,first_publish_year,isbn,number_of_pages_median'
    }

    print(f"🔍 Recherche de livres: '{query}'")
    response = requests.get(base_url, params=params)

    if response.status_code == 200:
        data = response.json()
        return data['docs']
    else:
        print(f"❌ Erreur: {response.status_code}")
        return []

# Rechercher des livres
books = search_books('Python programming', limit=3)

print(f"\n📚 {len(books)} livres trouvés:\n")
for i, book in enumerate(books, 1):
    print(f"{i}. {book.get('title', 'Sans titre')}")
    authors = book.get('author_name', ['Auteur inconnu'])
    print(f"   Auteur(s): {', '.join(authors[:2])}")
    print(f"   Année: {book.get('first_publish_year', 'N/A')}")
    print(f"   Pages: {book.get('number_of_pages_median', 'N/A')}")
    print()
```

### 3.4 Gestion avancée des APIs

```python
class APIClient:
    """Client API générique avec fonctionnalités avancées"""

    def __init__(self, base_url, headers=None, timeout=10):
        self.base_url = base_url.rstrip('/')
        self.headers = headers or {}
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update(self.headers)

        # Statistiques
        self.stats = {
            'requests': 0,
            'errors': 0,
            'total_time': 0
        }

    def _make_request(self, method, endpoint, **kwargs):
        """Effectue une requête avec gestion d'erreurs"""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        start_time = time.time()
        self.stats['requests'] += 1

        try:
            response = self.session.request(
                method, url, timeout=self.timeout, **kwargs
            )
            response.raise_for_status()

            self.stats['total_time'] += time.time() - start_time
            return response

        except requests.exceptions.RequestException as e:
            self.stats['errors'] += 1
            print(f"❌ Erreur: {e}")
            return None

    def get(self, endpoint, params=None):
        """Requête GET"""
        return self._make_request('GET', endpoint, params=params)

    def post(self, endpoint, data=None, json=None):
        """Requête POST"""
        return self._make_request('POST', endpoint, data=data, json=json)

    def get_stats(self):
        """Affiche les statistiques"""
        print("\n📊 Statistiques API:")
        print(f"  Total requêtes: {self.stats['requests']}")
        print(f"  Erreurs: {self.stats['errors']}")
        if self.stats['requests'] > 0:
            avg_time = self.stats['total_time'] / self.stats['requests']
            print(f"  Temps moyen: {avg_time:.2f}s")
            success_rate = (1 - self.stats['errors']/self.stats['requests']) * 100
            print(f"  Taux de succès: {success_rate:.1f}%")

# Utilisation du client
github_client = APIClient('https://api.github.com')

# Faire plusieurs requêtes
endpoints = ['users/github', 'users/microsoft', 'users/google']
for endpoint in endpoints:
    response = github_client.get(endpoint)
    if response:
        data = response.json()
        print(f"✅ {data['name']}: {data['public_repos']} repos")

github_client.get_stats()

### 3.5 API avec pagination

In [None]:
def get_all_pages(base_url, params=None, max_pages=5):
    """
    Récupère toutes les pages d'une API paginée
    Exemple avec l'API GitHub
    """
    all_results = []
    page = 1

    while page <= max_pages:
        # Ajouter la pagination aux paramètres
        current_params = params.copy() if params else {}
        current_params['page'] = page
        current_params['per_page'] = 30  # GitHub default

        print(f"📄 Récupération page {page}...")
        response = requests.get(base_url, params=current_params)

        if response.status_code != 200:
            print(f"❌ Erreur page {page}: {response.status_code}")
            break

        data = response.json()

        # Si pas de données, on arrête
        if not data:
            print(f"✅ Fin de la pagination à la page {page}")
            break

        all_results.extend(data)
        page += 1

        # Respecter le rate limit
        time.sleep(0.5)

    return all_results

# Exemple : récupérer les repos d'une organisation
org_repos = get_all_pages(
    'https://api.github.com/orgs/python/repos',
    params={'type': 'public', 'sort': 'stars'},
    max_pages=3
)

print(f"\n📦 {len(org_repos)} repositories récupérés")
print("\nTop 5 par étoiles:")
for i, repo in enumerate(org_repos[:5], 1):
    print(f"{i}. {repo['name']} ⭐ {repo['stargazers_count']}")

### 3.6 Exercice : Créer un wrapper d'API complet

In [None]:
class BookAPI:
    """Wrapper pour l'API Open Library"""

    def __init__(self):
        self.base_url = 'https://openlibrary.org'
        self.cache = {}

    def search_books(self, query, **kwargs):
        """Recherche de livres"""
        endpoint = f"{self.base_url}/search.json"
        params = {'q': query, **kwargs}

        # Simple cache en mémoire
        cache_key = f"search_{query}_{str(kwargs)}"
        if cache_key in self.cache:
            print("📦 Résultat depuis le cache")
            return self.cache[cache_key]

        response = requests.get(endpoint, params=params)
        if response.status_code == 200:
            data = response.json()
            self.cache[cache_key] = data
            return data
        return None

    def get_book_by_isbn(self, isbn):
        """Récupère un livre par ISBN"""
        endpoint = f"{self.base_url}/api/books"
        params = {
            'bibkeys': f'ISBN:{isbn}',
            'format': 'json',
            'jscmd': 'data'
        }

        response = requests.get(endpoint, params=params)
        if response.status_code == 200:
            data = response.json()
            return data.get(f'ISBN:{isbn}')
        return None

    def get_author(self, author_id):
        """Récupère les infos d'un auteur"""
        endpoint = f"{self.base_url}/authors/{author_id}.json"
        response = requests.get(endpoint)
        if response.status_code == 200:
            return response.json()
        return None

In [None]:
# Utilisation
book_api = BookAPI()

# Rechercher
results = book_api.search_books('Harry Potter', limit=1)
if results and results['docs']:
    book = results['docs'][0]
    print(f"📖 Trouvé: {book.get('title')}")
    print(f"   Auteur: {book.get('author_name', ['Unknown'])[0]}")

    # Récupérer par ISBN si disponible
    if 'isbn' in book and book['isbn']:
        isbn = book['isbn'][0]
        print(f"\n🔍 Recherche détails pour ISBN: {isbn}")
        details = book_api.get_book_by_isbn(isbn)
        if details:
            print(f"   Éditeur: {details.get('publishers', [{}])[0].get('name', 'N/A')}")
            print(f"   Pages: {details.get('number_of_pages', 'N/A')}")

---

## 4. Partie 3 : Web Scraping avec BeautifulSoup {#scraping}

### 4.1 Introduction au HTML et BeautifulSoup

In [None]:
# HTML d'exemple pour comprendre la structure
html_example = """
<html>
    <head>
        <title>Ma Page d'Exemple</title>
    </head>
    <body>
        <h1 class="main-title">Bienvenue sur ma page</h1>
        <div class="content">
            <p id="intro">Ceci est une introduction.</p>
            <ul class="list">
                <li class="item">Premier élément</li>
                <li class="item">Deuxième élément</li>
                <li class="item special">Élément spécial</li>
            </ul>
        </div>
        <div class="footer">
            <p>© 2024 Mon Site</p>
        </div>
    </body>
</html>
"""

# Parser avec BeautifulSoup
soup = BeautifulSoup(html_example, 'html.parser')

print("🌳 Structure HTML parsée")
print("=" * 50)

# Naviguer dans le HTML
print(f"Titre de la page: {soup.title.string}")
print(f"H1 principal: {soup.h1.string}")
print(f"Paragraphe intro: {soup.find(id='intro').string}")

# Trouver tous les éléments de liste
print("\nÉléments de la liste:")
for li in soup.find_all('li'):
    classes = li.get('class', [])
    print(f"  - {li.string} (classes: {', '.join(classes)})")

### 4.2 Scraping d'un vrai site web

In [None]:
def scrape_quotes():
    """Scrape des citations depuis quotes.toscrape.com"""
    url = 'http://quotes.toscrape.com/'

    print(f"🕷️ Scraping de {url}")
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    quotes_data = []

    # Trouver toutes les citations
    quotes = soup.find_all('div', class_='quote')

    for quote in quotes:
        # Extraire le texte
        text = quote.find('span', class_='text').text

        # Extraire l'auteur
        author = quote.find('small', class_='author').text

        # Extraire les tags
        tags = [tag.text for tag in quote.find_all('a', class_='tag')]

        quotes_data.append({
            'text': text,
            'author': author,
            'tags': tags
        })

    return quotes_data

# Scraper les citations
quotes = scrape_quotes()

print(f"\n📜 {len(quotes)} citations trouvées:\n")
for i, quote in enumerate(quotes[:3], 1):
    print(f"{i}. {quote['text'][:60]}...")
    print(f"   - {quote['author']}")
    print(f"   Tags: {', '.join(quote['tags'])}")
    print()

### 4.3 Scraping avancé : IMDB

In [None]:
def scrape_imdb_movies(num_movies=10):
    """
    Scrape les films populaires d'IMDB
    Note: Dans un cas réel, respectez robots.txt et les conditions d'utilisation
    """
    url = 'https://www.imdb.com/chart/moviemeter/'

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept-Language': 'en-US,en;q=0.9'
    }

    print(f"🎬 Scraping IMDB: {url}")
    response = requests.get(url, headers=headers)

    if response.status_code != 200:
        print(f"❌ Erreur: {response.status_code}")
        return []

    soup = BeautifulSoup(response.content, 'html.parser')
    movies = []

    # Chercher le tableau des films
    movie_table = soup.find('tbody', class_='lister-list')
    if not movie_table:
        print("❌ Structure de la page a changé")
        return []

    movie_rows = movie_table.find_all('tr')[:num_movies]

    for row in movie_rows:
        try:
            # Titre
            title_column = row.find('td', class_='titleColumn')
            title = title_column.find('a').text.strip()

            # Année
            year = title_column.find('span', class_='secondaryInfo').text.strip('()')

            # Note
            rating_column = row.find('td', class_='ratingColumn')
            rating = rating_column.find('strong')
            rating = rating.text if rating else 'N/A'

            # Rang
            rank = row.find('td', class_='posterColumn').find('span')['data-value']

            movies.append({
                'rank': rank,
                'title': title,
                'year': year,
                'rating': rating
            })

        except Exception as e:
            print(f"⚠️ Erreur lors du parsing d'un film: {e}")
            continue

    return movies

# Note: Cette fonction est à des fins éducatives
# Toujours vérifier robots.txt et respecter les limites
print("⚠️ Note: Ceci est un exemple éducatif.")
print("   En production, utilisez l'API IMDB ou respectez robots.txt\n")

# Pour l'exemple, nous allons créer des données simulées
mock_movies = [
    {'rank': '1', 'title': 'The Shawshank Redemption', 'year': '1994', 'rating': '9.3'},
    {'rank': '2', 'title': 'The Godfather', 'year': '1972', 'rating': '9.2'},
    {'rank': '3', 'title': 'The Dark Knight', 'year': '2008', 'rating': '9.0'},
]

print("🎬 Top 3 films (données simulées):")
for movie in mock_movies:
    print(f"{movie['rank']}. {movie['title']} ({movie['year']}) - ⭐ {movie['rating']}")

### 4.4 Techniques de navigation avancées

In [None]:
# HTML complexe pour démonstration
complex_html = """
<div class="container">
    <article class="post" id="post1">
        <header>
            <h2>Premier Article</h2>
            <span class="author">Par Alice</span>
            <time>2024-01-15</time>
        </header>
        <div class="content">
            <p>Premier paragraphe de l'article.</p>
            <p>Deuxième paragraphe avec <a href="/link1">un lien</a>.</p>
        </div>
        <footer>
            <span class="tags">Python, Web Scraping</span>
            <span class="comments">5 commentaires</span>
        </footer>
    </article>

    <article class="post" id="post2">
        <header>
            <h2>Deuxième Article</h2>
            <span class="author">Par Bob</span>
            <time>2024-01-16</time>
        </header>
        <div class="content">
            <p>Contenu du deuxième article.</p>
        </div>
    </article>
</div>
"""
soup = BeautifulSoup(complex_html, 'html.parser')

print("🧭 Navigation avancée dans le HTML")
print("=" * 50)

# 1. Naviguer avec les relations parent/enfant
article = soup.find('article')
header = article.find('header')
print(f"1. Titre de l'article: {header.h2.string}")
print(f"   Parent du header: {header.parent.name}")

# 2. Naviguer entre siblings
print(f"\n2. Siblings du header:")
for sibling in header.find_next_siblings():
    print(f"   - {sibling.name}: {sibling.get('class', [])}")

# 3. Recherche avec fonctions personnalisées
def has_multiple_classes(tag):
    """Trouve les tags avec plusieurs classes"""
    return tag.has_attr('class') and len(tag['class']) > 1

tags_with_multiple_classes = soup.find_all(has_multiple_classes)
print(f"\n3. Tags avec plusieurs classes: {len(tags_with_multiple_classes)}")

# 4. Extraction de données structurées
def extract_article_data(article):
    """Extrait toutes les données d'un article"""
    return {
        'id': article.get('id'),
        'title': article.find('h2').string,
        'author': article.find('span', class_='author').text.replace('Par ', ''),
        'date': article.find('time').string,
        'content': ' '.join(p.get_text(strip=True) for p in article.find_all('p')),
        'tags': article.find('span', class_='tags').text if article.find('span', class_='tags') else 'N/A'
    }

print("\n4. Données extraites:")
for article in soup.find_all('article'):
    data = extract_article_data(article)
    print(f"\n   Article: {data['title']}")
    print(f"   Auteur: {data['author']}")
    print(f"   Date: {data['date']}")

### 4.5 Gestion des erreurs et robustesse

In [None]:
class RobustScraper:
    """Scraper robuste avec gestion d'erreurs"""

    def __init__(self, retry_count=3, delay=1):
        self.retry_count = retry_count
        self.delay = delay
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; Educational Bot)'
        })

    def fetch_page(self, url):
        """Récupère une page avec retry"""
        for attempt in range(self.retry_count):
            try:
                print(f"🔄 Tentative {attempt + 1}/{self.retry_count} pour {url}")
                response = self.session.get(url, timeout=10)
                response.raise_for_status()
                return response

            except requests.RequestException as e:
                print(f"❌ Erreur: {e}")
                if attempt < self.retry_count - 1:
                    print(f"⏳ Attente {self.delay}s avant retry...")
                    time.sleep(self.delay)
                else:
                    print("❌ Échec après toutes les tentatives")
                    return None

    def safe_extract(self, soup, selector, attribute=None, default='N/A'):
        """Extraction sécurisée d'éléments"""
        try:
            element = soup.select_one(selector)
            if element:
                if attribute:
                    return element.get(attribute, default)
                return element.get_text(strip=True)
            return default
        except Exception as e:
            print(f"⚠️ Erreur d'extraction pour {selector}: {e}")
            return default

    def scrape_with_structure(self, url, structure):
        """
        Scrape selon une structure définie
        structure = {
            'title': {'selector': 'h1', 'attribute': None},
            'image': {'selector': 'img.main', 'attribute': 'src'}
        }
        """
        response = self.fetch_page(url)
        if not response:
            return None

        soup = BeautifulSoup(response.content, 'html.parser')
        data = {}

        for field, config in structure.items():
            data[field] = self.safe_extract(
                soup,
                config['selector'],
                config.get('attribute'),
                config.get('default', 'N/A')
            )

        return data

# Utilisation
scraper = RobustScraper()

# Définir la structure à extraire
structure = {
    'title': {'selector': 'h1.main-title'},
    'author': {'selector': 'span.author'},
    'date': {'selector': 'time', 'attribute': 'datetime'},
    'content': {'selector': 'div.content'}
}

# Test avec une URL (utiliser une vraie URL ici)
# data = scraper.scrape_with_structure('https://example.com', structure)

---

## 5. Partie 4 : Projet complet - Pipeline de données {#project}

### 5.1 Architecture du pipeline

In [None]:
import hashlib
import pickle
from abc import ABC, abstractmethod

class DataSource(ABC):
    """Classe abstraite pour les sources de données"""

    @abstractmethod
    def extract(self):
        pass

    @abstractmethod
    def validate(self, data):
        pass

class CSVSource(DataSource):
    def __init__(self, filepath):
        self.filepath = filepath

    def extract(self):
        """Extrait les données du CSV"""
        data = []
        with open(self.filepath, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                data.append(row)
        return data

    def validate(self, data):
        """Valide les données CSV"""
        if not data:
            return False
        # Vérifier que toutes les lignes ont les mêmes clés
        keys = set(data[0].keys())
        return all(set(row.keys()) == keys for row in data)

class APISource(DataSource):
    def __init__(self, url, params=None):
        self.url = url
        self.params = params or {}

    def extract(self):
        """Extrait les données de l'API"""
        response = requests.get(self.url, params=self.params)
        if response.status_code == 200:
            return response.json()
        return []

    def validate(self, data):
        """Valide les données API"""
        return isinstance(data, (list, dict)) and len(str(data)) > 0

class WebSource(DataSource):
    def __init__(self, url, extractor_func):
        self.url = url
        self.extractor_func = extractor_func

    def extract(self):
        """Extrait les données du site web"""
        response = requests.get(self.url)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'html.parser')
            return self.extractor_func(soup)
        return []

    def validate(self, data):
        """Valide les données web"""
        return isinstance(data, list) and len(data) > 0

print("✅ Classes de base définies")

### 5.2 Pipeline de transformation

In [None]:
class DataPipeline:
    """Pipeline complet de traitement de données"""

    def __init__(self, name):
        self.name = name
        self.sources = []
        self.transformations = []
        self.data = []
        self.metadata = {
            'created': datetime.now(),
            'sources_count': 0,
            'records_count': 0,
            'errors': []
        }

    def add_source(self, source):
        """Ajoute une source de données"""
        self.sources.append(source)
        self.metadata['sources_count'] = len(self.sources)
        return self

    def add_transformation(self, func):
        """Ajoute une transformation"""
        self.transformations.append(func)
        return self

    def extract_all(self):
        """Extrait données de toutes les sources"""
        print(f"\n🚀 Démarrage pipeline: {self.name}")
        print("=" * 50)

        all_data = []

        for i, source in enumerate(self.sources, 1):
            print(f"\n📥 Source {i}/{len(self.sources)}: {source.__class__.__name__}")

            try:
                data = source.extract()

                if source.validate(data):
                    if isinstance(data, dict):
                        data = [data]

                    all_data.extend(data)
                    print(f"   ✅ {len(data)} enregistrements extraits")
                else:
                    error = f"Validation échouée pour {source.__class__.__name__}"
                    print(f"   ❌ {error}")
                    self.metadata['errors'].append(error)

            except Exception as e:
                error = f"Erreur extraction {source.__class__.__name__}: {str(e)}"
                print(f"   ❌ {error}")
                self.metadata['errors'].append(error)

        self.data = all_data
        self.metadata['records_count'] = len(all_data)
        return self

    def transform_all(self):
        """Applique toutes les transformations"""
        print(f"\n🔄 Application de {len(self.transformations)} transformations")

        for i, transform in enumerate(self.transformations, 1):
            print(f"   📝 Transformation {i}: {transform.__name__}")
            try:
                self.data = transform(self.data)
                print(f"      ✅ Succès ({len(self.data)} enregistrements)")
            except Exception as e:
                error = f"Erreur transformation {transform.__name__}: {str(e)}"
                print(f"      ❌ {error}")
                self.metadata['errors'].append(error)

        return self

    def save(self, format='json', output_dir='outputs'):
        """Sauvegarde les données"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        if format == 'json':
            filepath = f"{output_dir}/{self.name}_{timestamp}.json"
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump({
                    'metadata': {k: str(v) for k, v in self.metadata.items()},
                    'data': self.data
                }, f, indent=2, ensure_ascii=False)

        elif format == 'csv':
            filepath = f"{output_dir}/{self.name}_{timestamp}.csv"
            if self.data:
                with open(filepath, 'w', newline='', encoding='utf-8') as f:
                    writer = csv.DictWriter(f, fieldnames=self.data[0].keys())
                    writer.writeheader()
                    writer.writerows(self.data)

        print(f"\n💾 Données sauvegardées: {filepath}")
        return filepath

    def get_summary(self):
        """Résumé du pipeline"""
        print(f"\n📊 Résumé du pipeline: {self.name}")
        print("=" * 50)
        print(f"Sources: {self.metadata['sources_count']}")
        print(f"Enregistrements: {self.metadata['records_count']}")
        print(f"Erreurs: {len(self.metadata['errors'])}")

        if self.metadata['errors']:
            print("\n⚠️ Erreurs rencontrées:")
            for error in self.metadata['errors']:
                print(f"   - {error}")

# Définir des transformations
def clean_text(data):
    """Nettoie les champs texte"""
    for record in data:
        for key, value in record.items():
            if isinstance(value, str):
                record[key] = value.strip()
    return data

def add_timestamp(data):
    """Ajoute un timestamp à chaque enregistrement"""
    timestamp = datetime.now().isoformat()
    for record in data:
        record['processed_at'] = timestamp
    return data

def remove_duplicates(data):
    """Supprime les doublons basés sur un hash"""
    seen = set()
    unique_data = []

    for record in data:
        # Créer un hash unique pour chaque enregistrement
        record_hash = hashlib.md5(
            json.dumps(record, sort_keys=True).encode()
        ).hexdigest()

        if record_hash not in seen:
            seen.add(record_hash)
            unique_data.append(record)

    print(f"      Doublons supprimés: {len(data) - len(unique_data)}")
    return unique_data

print("✅ Pipeline et transformations définis")

### 5.3 Exemple d'utilisation complète

In [None]:
# Créer un pipeline complet
pipeline = DataPipeline("donnees_combinees")

# 1. Ajouter une source CSV
csv_source = CSVSource('data/processed/beatles.csv')
pipeline.add_source(csv_source)

# 2. Ajouter une source API (exemple avec une API de test)
api_source = APISource(
    'https://jsonplaceholder.typicode.com/users',
    params={'_limit': 5}
)
pipeline.add_source(api_source)

# 3. Ajouter une source Web
def extract_quotes_simple(soup):
    """Extracteur simple pour les citations"""
    quotes = []
    # Simuler l'extraction (en production, utiliser le vrai scraping)
    quotes.append({
        'text': 'La vie est belle',
        'author': 'Anonyme',
        'source': 'web'
    })
    return quotes

web_source = WebSource('http://quotes.toscrape.com/', extract_quotes_simple)
pipeline.add_source(web_source)

# 4. Ajouter des transformations
pipeline.add_transformation(clean_text)
pipeline.add_transformation(add_timestamp)
pipeline.add_transformation(remove_duplicates)

# 5. Exécuter le pipeline
pipeline.extract_all().transform_all()

# 6. Sauvegarder les résultats
json_file = pipeline.save(format='json')
csv_file = pipeline.save(format='csv')

# 7. Afficher le résumé
pipeline.get_summary()

# 8. Aperçu des données finales
print("\n👀 Aperçu des données finales:")
for i, record in enumerate(pipeline.data[:3], 1):
    print(f"\nEnregistrement {i}:")
    for key, value in list(record.items())[:4]:  # Afficher 4 premiers champs
        print(f"  {key}: {value}")

### 5.4 Cache et optimisation

In [None]:
class CachedDataSource:
    """Wrapper pour ajouter du cache à n'importe quelle source"""

    def __init__(self, source, cache_dir='cache', expire_hours=24):
        self.source = source
        self.cache_dir = cache_dir
        self.expire_hours = expire_hours
        os.makedirs(cache_dir, exist_ok=True)

    def _get_cache_key(self):
        """Génère une clé de cache unique"""
        source_str = f"{self.source.__class__.__name__}_{self.source.__dict__}"
        return hashlib.md5(source_str.encode()).hexdigest()

    def _get_cache_path(self):
        """Chemin du fichier cache"""
        return os.path.join(self.cache_dir, f"{self._get_cache_key()}.pkl")

    def _is_cache_valid(self, cache_path):
        """Vérifie si le cache est encore valide"""
        if not os.path.exists(cache_path):
            return False

        # Vérifier l'âge du cache
        file_time = datetime.fromtimestamp(os.path.getmtime(cache_path))
        age = datetime.now() - file_time
        return age.total_seconds() < self.expire_hours * 3600

    def extract(self):
        """Extrait avec mise en cache"""
        cache_path = self._get_cache_path()

        # Vérifier le cache
        if self._is_cache_valid(cache_path):
            print(f"   📦 Utilisation du cache: {os.path.basename(cache_path)}")
            with open(cache_path, 'rb') as f:
                return pickle.load(f)

        # Extraire les données
        print(f"   🔄 Extraction depuis la source...")
        data = self.source.extract()

        # Sauvegarder en cache
        with open(cache_path, 'wb') as f:
            pickle.dump(data, f)

        return data

    def validate(self, data):
        """Délègue la validation à la source originale"""
        return self.source.validate(data)

# Utilisation avec cache
print("🧪 Test du système de cache")
print("=" * 50)

# Source API avec cache
api_source = APISource('https://jsonplaceholder.typicode.com/posts', {'_limit': 10})
cached_api = CachedDataSource(api_source, expire_hours=1)

# Premier appel - pas de cache
print("\n1️⃣ Premier appel:")
data1 = cached_api.extract()
print(f"   Données récupérées: {len(data1)} éléments")

# Deuxième appel - depuis le cache
print("\n2️⃣ Deuxième appel:")
data2 = cached_api.extract()
print(f"   Données récupérées: {len(data2)} éléments")

print("\n✅ Le cache fonctionne!")

---

## 6. Exercices et défis {#exercises}

### Défi 1 : Analyseur CSV avancé

In [None]:
# TODO: Implémenter cette classe
class AdvancedCSVAnalyzer:
    """
    Votre mission : Créer un analyseur CSV qui peut :
    1. Détecter automatiquement le délimiteur (,;|\t)
    2. Détecter l'encodage du fichier
    3. Gérer les fichiers avec ou sans en-têtes
    4. Fournir des statistiques sur chaque colonne
    5. Détecter les types de données
    """

    def __init__(self, filepath):
        self.filepath = filepath
        # TODO: Implémenter

    def detect_delimiter(self):
        """Détecte automatiquement le délimiteur utilisé"""
        # TODO: Lire quelques lignes et analyser
        pass

    def detect_encoding(self):
        """Détecte l'encodage du fichier"""
        # TODO: Essayer différents encodages
        pass

    def analyze_columns(self):
        """Analyse chaque colonne (type, valeurs uniques, etc.)"""
        # TODO: Parcourir et analyser
        pass

    def generate_report(self):
        """Génère un rapport complet sur le fichier"""
        # TODO: Compiler toutes les analyses
        pass

# Test
# analyzer = AdvancedCSVAnalyzer('data/votre_fichier.csv')
# report = analyzer.generate_report()
# print(report)

### Défi 2 : Multi-API Aggregator

In [None]:
# TODO: Implémenter
class MultiAPIAggregator:
    """
    Votre mission : Créer un agrégateur qui peut :
    1. Interroger plusieurs APIs en parallèle
    2. Normaliser les formats de réponse différents
    3. Fusionner les résultats intelligemment
    4. Gérer les erreurs par API
    5. Respecter les rate limits de chaque API
    """

    def __init__(self):
        self.apis = {}
        # TODO: Implémenter

    def register_api(self, name, config):
        """Enregistre une nouvelle API"""
        # config = {
        #     'base_url': '...',
        #     'rate_limit': 60,  # requêtes par minute
        #     'normalizer': function,  # pour normaliser les réponses
        # }
        pass

    def query_all(self, search_term):
        """Interroge toutes les APIs enregistrées"""
        # TODO: Utiliser threading ou asyncio
        pass

    def merge_results(self, results):
        """Fusionne intelligemment les résultats"""
        # TODO: Dédupliquer, scorer, trier
        pass

# Exemple d'utilisation attendu :
# aggregator = MultiAPIAggregator()
# aggregator.register_api('github', {...})
# aggregator.register_api('gitlab', {...})
# results = aggregator.query_all('python scraping')

### Défi 3 : Smart Web Scraper

In [None]:
# TODO: Implémenter
class SmartWebScraper:
    """
    Votre mission : Créer un scraper intelligent qui peut :
    1. Détecter automatiquement la structure d'une page
    2. Identifier les patterns de données (listes, tableaux, etc.)
    3. S'adapter aux changements de structure
    4. Générer des sélecteurs CSS optimaux
    5. Exporter dans plusieurs formats
    """

    def __init__(self, url):
        self.url = url
        # TODO: Implémenter

    def analyze_structure(self):
        """Analyse la structure de la page"""
        # TODO: Identifier les patterns répétitifs
        pass

    def generate_selectors(self):
        """Génère automatiquement les meilleurs sélecteurs"""
        # TODO: Trouver les sélecteurs les plus stables
        pass

    def extract_data(self):
        """Extrait les données identifiées"""
        # TODO: Utiliser les sélecteurs générés
        pass

    def monitor_changes(self):
        """Monitore les changements de structure"""
        # TODO: Comparer avec version précédente
        pass

# Test
# scraper = SmartWebScraper('https://example.com/products')
# scraper.analyze_structure()
# data = scraper.extract_data()

### Challenge Final : Pipeline de veille concurrentielle

In [None]:
"""
CHALLENGE FINAL : Créer un système complet de veille concurrentielle

Objectif : Surveiller automatiquement les prix/produits de plusieurs sites

Fonctionnalités requises :
1. Configuration par fichier YAML/JSON
2. Support de multiples sources (API, Web, CSV)
3. Détection automatique des changements
4. Alertes (email, webhook, etc.)
5. Dashboard de visualisation
6. Historique des données
7. Export automatique

Structure suggérée :
competitive_intelligence/
├── config/
│   └── sources.yaml
├── extractors/
│   ├── api_extractor.py
│   ├── web_extractor.py
│   └── csv_extractor.py
├── processors/
│   ├── normalizer.py
│   ├── comparator.py
│   └── alerting.py
├── storage/
│   ├── database.py
│   └── cache.py
├── dashboard/
│   └── visualizer.py
└── main.py

Bonus :
- Interface web (Flask/Streamlit)
- Scheduling automatique (cron)
- Machine Learning pour prédictions
- API REST pour accéder aux données
"""

# Commencez ici :
class CompetitiveIntelligence:
    def __init__(self, config_file):
        self.config = self.load_config(config_file)
        self.sources = []
        self.processors = []
        self.storage = None

    def load_config(self, config_file):
        """Charge la configuration depuis un fichier"""
        # TODO: Implémenter
        pass

    def run(self):
        """Lance le pipeline complet"""
        # TODO: Implémenter
        pass

# Bonne chance ! 🚀

---

## 📚 Ressources et conseils finaux

### Meilleures pratiques à retenir

1. **Toujours respecter les sites web**
   - Vérifier robots.txt
   - Ajouter des délais entre requêtes
   - Utiliser un User-Agent descriptif

2. **Gérer les erreurs gracieusement**
   - Try/except partout
   - Logging détaillé
   - Retry avec backoff exponentiel

3. **Optimiser les performances**
   - Cache intelligent
   - Requêtes asynchrones quand possible
   - Batch processing

4. **Maintenir la qualité des données**
   - Validation à chaque étape
   - Tests unitaires
   - Monitoring continu

### Prochaines étapes

1. **Approfondir les bases de données**
   - SQLite pour le stockage local
   - PostgreSQL pour la production
   - MongoDB pour les données non-structurées

2. **Explorer les outils avancés**
   - Scrapy pour le web scraping industriel
   - Selenium pour les sites JavaScript
   - Apache Airflow pour l'orchestration

3. **Apprendre l'analyse de données**
   - Pandas pour la manipulation
   - Matplotlib/Seaborn pour la visualisation
   - Scikit-learn pour le ML

### Commandes utiles


In [None]:
# Vérifier robots.txt
curl https://example.com/robots.txt

# Tester une API
curl -X GET "https://api.example.com/endpoint" -H "accept: application/json"

# Monitorer les requêtes
netstat -an | grep :80

# Analyser un fichier CSV
head -n 10 file.csv | column -t -s ','

---

🎉 **Félicitations !** Vous avez maintenant toutes les bases pour devenir un expert en Data Sourcing !

N'oubliez pas : la pratique est la clé. Commencez par de petits projets et augmentez progressivement la complexité.

Bon code ! 🐍✨