In [None]:
"""
Script am√©lior√© de collecte de donn√©es - INSD Burkina Faso
Collecte des indicateurs statistiques depuis burkinafaso.opendataforafrica.org
"""

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import logging
from urllib.parse import urljoin, urlparse
from typing import List, Dict, Set, Optional
from datetime import datetime
import json
from pathlib import Path

# Configuration du logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'scraping_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Configuration
class Config:
    BASE_URL = "https://www.insd.bf/sites/default/files/2024-08/"
    ENTRY_POINTS = [
        "",
        "fr/definitions-concept",
        "fr/methodes",
        "fr/resultats",
        "fr/data",
    ]
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (compatible; DataScraper/1.0; +https://example.org/bot)",
        "Accept-Language": "fr-FR,fr;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    }
    TIMEOUT = 20
    DELAY_BETWEEN_REQUESTS = 1.5
    MAX_RETRIES = 3
    OUTPUT_DIR = Path("output")
    

class DataScraper:
    """Classe principale pour le scraping de donn√©es INSD"""
    
    def __init__(self, config: Config = Config()):
        self.config = config
        self.session = requests.Session()
        self.session.headers.update(config.HEADERS)
        self.visited_urls: Set[str] = set()
        self.all_records: List[Dict] = []
        
        # Cr√©er le dossier de sortie
        self.config.OUTPUT_DIR.mkdir(exist_ok=True)
    
    def fetch_page(self, url: str, retry_count: int = 0) -> Optional[str]:
        """
        R√©cup√®re le contenu HTML d'une page avec gestion des erreurs et retry
        """
        if url in self.visited_urls:
            logger.debug(f"URL d√©j√† visit√©e: {url}")
            return None
            
        try:
            logger.info(f"Chargement de: {url}")
            resp = self.session.get(url, timeout=self.config.TIMEOUT)
            resp.raise_for_status()
            self.visited_urls.add(url)
            return resp.text
            
        except requests.Timeout:
            logger.warning(f"Timeout pour {url}")
            if retry_count < self.config.MAX_RETRIES:
                time.sleep(self.config.DELAY_BETWEEN_REQUESTS * 2)
                return self.fetch_page(url, retry_count + 1)
                
        except requests.HTTPError as e:
            logger.error(f"Erreur HTTP {e.response.status_code} pour {url}")
            
        except requests.RequestException as e:
            logger.error(f"√âchec du chargement {url}: {e}")
            if retry_count < self.config.MAX_RETRIES:
                time.sleep(self.config.DELAY_BETWEEN_REQUESTS * 2)
                return self.fetch_page(url, retry_count + 1)
        
        return None
    
    def is_valid_url(self, url: str) -> bool:
        """V√©rifie si l'URL est valide et appartient au domaine cible"""
        try:
            parsed = urlparse(url)
            base_parsed = urlparse(self.config.BASE_URL)
            return parsed.netloc == base_parsed.netloc or parsed.netloc == ''
        except:
            return False
    
    def extract_links(self, soup: BeautifulSoup, base_url: str) -> List[str]:
        """
        Extrait les liens pertinents d'une page
        """
        links = []
        keywords = [
            '/publication/', '/statistique/', '/data/', '/indicator/',
            'rapport', 'enquete', 'recensement', 'annuaire'
        ]
        
        for a in soup.select("a[href]"):
            href = a.get('href', '')
            text = a.get_text(strip=True).lower()
            
            # V√©rifier si le lien est pertinent
            is_relevant = any(kw in href.lower() or kw in text for kw in keywords)
            
            if is_relevant and href:
                full_url = urljoin(base_url, href)
                if self.is_valid_url(full_url) and full_url not in self.visited_urls:
                    links.append(full_url)
        
        return list(set(links))  # √âliminer les doublons
    
    def parse_table_data(self, table) -> List[Dict]:
        """
        Parse un tableau HTML et extrait les donn√©es
        """
        data = []
        rows = table.select("tr")
        
        # Essayer de d√©tecter les en-t√™tes
        headers = []
        first_row = rows[0] if rows else None
        
        if first_row:
            header_cells = first_row.find_all(['th', 'td'])
            headers = [cell.get_text(strip=True) for cell in header_cells]
        
        # Parser les donn√©es
        for row in rows[1:] if headers else rows:
            cells = row.find_all(['td', 'th'])
            if len(cells) >= 2:
                if headers and len(cells) == len(headers):
                    # Utiliser les en-t√™tes comme cl√©s
                    row_data = {headers[i]: cells[i].get_text(strip=True) 
                               for i in range(len(cells))}
                    data.append(row_data)
                else:
                    # Format cl√©-valeur simple
                    label = cells[0].get_text(strip=True)
                    value = cells[1].get_text(strip=True)
                    if label and value:
                        data.append({"indicateur": label, "valeur": value})
        
        return data
    
    def parse_indicators(self, soup: BeautifulSoup) -> List[Dict]:
        """
        Extrait les indicateurs d'une page
        """
        data = []
        
        # Chercher dans les tableaux
        for table in soup.select("table"):
            table_data = self.parse_table_data(table)
            data.extend(table_data)
        
        # Chercher dans les divs avec classes sp√©cifiques
        for bloc in soup.select(".indicator, .data-block, .statistics, .metric"):
            title = bloc.select_one(".title, h3, h4, strong")
            value = bloc.select_one(".value, .number, .data")
            
            if title and value:
                data.append({
                    "indicateur": title.get_text(strip=True),
                    "valeur": value.get_text(strip=True)
                })
        
        # Chercher des listes de d√©finitions
        for dl in soup.select("dl"):
            terms = dl.select("dt")
            descriptions = dl.select("dd")
            
            for term, desc in zip(terms, descriptions):
                data.append({
                    "indicateur": term.get_text(strip=True),
                    "valeur": desc.get_text(strip=True)
                })
        
        return data
    
    def process_page(self, url: str) -> List[Dict]:
        """
        Traite une page compl√®te et retourne les donn√©es extraites
        """
        html = self.fetch_page(url)
        if not html:
            return []
        
        soup = BeautifulSoup(html, "html.parser")
        indicators = self.parse_indicators(soup)
        
        # Ajouter les m√©tadonn√©es
        for record in indicators:
            record['source_url'] = url
            record['date_extraction'] = datetime.now().isoformat()
        
        logger.info(f"Extrait {len(indicators)} indicateurs de {url}")
        return indicators
    
    def scrape(self) -> pd.DataFrame:
        """
        Lance le processus de scraping complet
        """
        logger.info("D√©marrage du scraping...")
        
        for path in self.config.ENTRY_POINTS:
            url = urljoin(self.config.BASE_URL, path)
            
            # Traiter la page principale
            records = self.process_page(url)
            self.all_records.extend(records)
            
            # R√©cup√©rer et traiter les sous-pages
            html = self.fetch_page(url)
            if html:
                soup = BeautifulSoup(html, "html.parser")
                sub_links = self.extract_links(soup, url)
                
                logger.info(f"Trouv√© {len(sub_links)} sous-pages √† explorer")
                
                for link in sub_links[:50]:  # Limiter pour √©viter trop de requ√™tes
                    time.sleep(self.config.DELAY_BETWEEN_REQUESTS)
                    sub_records = self.process_page(link)
                    self.all_records.extend(sub_records)
            
            time.sleep(self.config.DELAY_BETWEEN_REQUESTS)
        
        logger.info(f"Scraping termin√©. Total: {len(self.all_records)} enregistrements")
        return self.create_dataframe()
    
    def create_dataframe(self) -> pd.DataFrame:
        """
        Cr√©e un DataFrame √† partir des donn√©es collect√©es
        """
        if not self.all_records:
            logger.warning("Aucune donn√©e collect√©e")
            return pd.DataFrame()
        
        df = pd.DataFrame(self.all_records)
        
        # Nettoyage des donn√©es
        if 'valeur' in df.columns:
            # Supprimer les espaces multiples
            df['valeur'] = df['valeur'].str.replace(r'\s+', ' ', regex=True)
        
        # Supprimer les doublons
        df = df.drop_duplicates()
        
        return df
    
    def export_data(self, df: pd.DataFrame):
        """
        Exporte les donn√©es dans plusieurs formats
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        if df.empty:
            logger.warning("Aucune donn√©e √† exporter")
            return
        
        # Export CSV
        csv_path = self.config.OUTPUT_DIR / f"insd_data_{timestamp}.csv"
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')
        logger.info(f"Export CSV: {csv_path}")
        
        # Export JSON
        json_path = self.config.OUTPUT_DIR / f"insd_data_{timestamp}.json"
        df.to_json(json_path, orient='records', force_ascii=False, indent=2)
        logger.info(f"Export JSON: {json_path}")
        
        # Export Excel
        try:
            excel_path = self.config.OUTPUT_DIR / f"insd_data_{timestamp}.xlsx"
            df.to_excel(excel_path, index=False, engine='openpyxl')
            logger.info(f"Export Excel: {excel_path}")
        except ImportError:
            logger.warning("openpyxl non install√©, export Excel ignor√©")
        
        # Statistiques
        stats = {
            "total_records": len(df),
            "columns": list(df.columns),
            "sources_uniques": df['source_url'].nunique() if 'source_url' in df.columns else 0,
            "date_extraction": timestamp
        }
        
        stats_path = self.config.OUTPUT_DIR / f"stats_{timestamp}.json"
        with open(stats_path, 'w', encoding='utf-8') as f:
            json.dump(stats, f, indent=2, ensure_ascii=False)
        logger.info(f"Statistiques: {stats_path}")
        
        return csv_path


def main():
    """Point d'entr√©e principal"""
    try:
        scraper = DataScraper()
        df = scraper.scrape()
        
        if not df.empty:
            scraper.export_data(df)
            print(f"\n‚úÖ Scraping termin√© avec succ√®s!")
            print(f"üìä {len(df)} enregistrements collect√©s")
            print(f"üìÅ Fichiers sauvegard√©s dans: {scraper.config.OUTPUT_DIR}")
        else:
            print("‚ö†Ô∏è Aucune donn√©e extraite")
            
    except KeyboardInterrupt:
        logger.info("Interruption par l'utilisateur")
    except Exception as e:
        logger.error(f"Erreur fatale: {e}", exc_info=True)
        raise


if __name__ == "__main__":
    main()

2025-11-01 10:27:05,799 - INFO - D√©marrage du scraping...


2025-11-01 10:27:05,801 - INFO - Chargement de: https://insd.bf
2025-11-01 10:27:06,841 - ERROR - √âchec du chargement https://insd.bf: HTTPSConnectionPool(host='insd.bf', port=443): Max retries exceeded with url: / (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x0000029102AA7250>: Failed to resolve 'insd.bf' ([Errno 11001] getaddrinfo failed)"))
2025-11-01 10:27:09,843 - INFO - Chargement de: https://insd.bf
2025-11-01 10:27:09,849 - ERROR - √âchec du chargement https://insd.bf: HTTPSConnectionPool(host='insd.bf', port=443): Max retries exceeded with url: / (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x0000029102AA79D0>: Failed to resolve 'insd.bf' ([Errno 11001] getaddrinfo failed)"))
2025-11-01 10:27:12,851 - INFO - Chargement de: https://insd.bf
2025-11-01 10:27:13,190 - ERROR - √âchec du chargement https://insd.bf: HTTPSConnectionPool(host='insd.bf', port=443): Max retries exceeded with url: / (Caused by NameResolut

‚ö†Ô∏è Aucune donn√©e extraite


In [14]:
"""
Script am√©lior√© de collecte de donn√©es - INSD Burkina Faso
Collecte des indicateurs statistiques depuis burkinafaso.opendataforafrica.org
Inclut l'extraction de donn√©es depuis les PDFs
"""

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import logging
from urllib.parse import urljoin, urlparse
from typing import List, Dict, Set, Optional, Tuple
from datetime import datetime
import json
from pathlib import Path
import re

# Imports pour le traitement des PDFs
try:
    import PyPDF2
    PDF_SUPPORT = True
except ImportError:
    PDF_SUPPORT = False
    logging.warning("PyPDF2 non install√©. Pour activer l'extraction PDF: pip install PyPDF2")

try:
    import pdfplumber
    PDFPLUMBER_SUPPORT = True
except ImportError:
    PDFPLUMBER_SUPPORT = False
    logging.warning("pdfplumber non install√©. Pour une meilleure extraction: pip install pdfplumber")

try:
    import tabula
    TABULA_SUPPORT = True
except ImportError:
    TABULA_SUPPORT = False
    logging.warning("tabula-py non install√©. Pour l'extraction de tableaux: pip install tabula-py")

# Configuration du logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'scraping_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Configuration
class Config:
    BASE_URL = "https://www.insd.bf/sites/default/files/2024-08/"
    ENTRY_POINTS = [
        "",
        
    ]
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (compatible; DataScraper/1.0; +https://example.org/bot)",
        "Accept-Language": "fr-FR,fr;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    }
    TIMEOUT = 20
    DELAY_BETWEEN_REQUESTS = 1.5
    MAX_RETRIES = 3
    OUTPUT_DIR = Path("output")
    PDF_DIR = Path("output/pdfs")
    EXTRACT_PDFS = True
    MAX_PDFS = 20  # Limite de PDFs √† t√©l√©charger
    

class PDFExtractor:
    """Classe pour l'extraction de donn√©es depuis les PDFs"""
    
    def __init__(self, pdf_path: Path):
        self.pdf_path = pdf_path
        self.text_content = ""
        self.tables = []
        self.metadata = {}
    
    def extract_with_pypdf2(self) -> str:
        """Extraction de texte basique avec PyPDF2"""
        if not PDF_SUPPORT:
            return ""
        
        try:
            text = ""
            with open(self.pdf_path, 'rb') as file:
                reader = PyPDF2.PdfReader(file)
                self.metadata = {
                    'pages': len(reader.pages),
                    'metadata': reader.metadata
                }
                
                for page in reader.pages:
                    text += page.extract_text() + "\n"
            
            return text
        except Exception as e:
            logger.error(f"Erreur PyPDF2 pour {self.pdf_path}: {e}")
            return ""
    
    def extract_with_pdfplumber(self) -> Tuple[str, List[pd.DataFrame]]:
        """Extraction avanc√©e avec pdfplumber (texte + tableaux)"""
        if not PDFPLUMBER_SUPPORT:
            return "", []
        
        try:
            text = ""
            tables = []
            
            with pdfplumber.open(self.pdf_path) as pdf:
                self.metadata['pages'] = len(pdf.pages)
                
                for page in pdf.pages:
                    # Extraire le texte
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + "\n"
                    
                    # Extraire les tableaux
                    page_tables = page.extract_tables()
                    for table in page_tables:
                        if table:
                            df = pd.DataFrame(table[1:], columns=table[0])
                            tables.append(df)
            
            return text, tables
        except Exception as e:
            logger.error(f"Erreur pdfplumber pour {self.pdf_path}: {e}")
            return "", []
    
    def extract_with_tabula(self) -> List[pd.DataFrame]:
        """Extraction de tableaux avec tabula-py"""
        if not TABULA_SUPPORT:
            return []
        
        try:
            tables = tabula.read_pdf(
                str(self.pdf_path),
                pages='all',
                multiple_tables=True,
                pandas_options={'header': 'infer'}
            )
            return tables
        except Exception as e:
            logger.error(f"Erreur tabula pour {self.pdf_path}: {e}")
            return []
    
    def extract_indicators_from_text(self, text: str) -> List[Dict]:
        """Extrait des indicateurs statistiques du texte"""
        indicators = []
        
        # Patterns pour d√©tecter les indicateurs
        patterns = [
            # Format: "Indicateur : valeur"
            r'([A-Z√Ä-≈∏][^:]{10,100})\s*:\s*([0-9.,\s%]+(?:\s*[A-Za-z√Ä-√ø]+)?)',
            # Format: "- Indicateur: valeur"
            r'-\s*([A-Z√Ä-≈∏][^:]{10,80})\s*:\s*([0-9.,\s%]+)',
            # Format: tableaux simples "Description | Valeur"
            r'([A-Z√Ä-≈∏][^\|]{10,80})\s*\|\s*([0-9.,\s%]+)',
        ]
        
        for pattern in patterns:
            matches = re.finditer(pattern, text, re.MULTILINE | re.IGNORECASE)
            for match in matches:
                label = match.group(1).strip()
                value = match.group(2).strip()
                
                # Filtrer les r√©sultats non pertinents
                if len(label) > 15 and len(value) > 0:
                    indicators.append({
                        'indicateur': label,
                        'valeur': value,
                        'type': 'text_extraction'
                    })
        
        return indicators
    
    def extract_all(self) -> Dict:
        """Extraction compl√®te du PDF avec toutes les m√©thodes disponibles"""
        logger.info(f"Extraction du PDF: {self.pdf_path.name}")
        
        result = {
            'filename': self.pdf_path.name,
            'text': "",
            'tables': [],
            'indicators': [],
            'metadata': {}
        }
        
        # Essayer pdfplumber en premier (le plus complet)
        if PDFPLUMBER_SUPPORT:
            text, tables = self.extract_with_pdfplumber()
            result['text'] = text
            result['tables'] = tables
            logger.info(f"pdfplumber: {len(text)} chars, {len(tables)} tableaux")
        
        # Si pdfplumber n'est pas disponible, utiliser PyPDF2
        elif PDF_SUPPORT:
            text = self.extract_with_pypdf2()
            result['text'] = text
            logger.info(f"PyPDF2: {len(text)} chars")
        
        # Essayer tabula pour les tableaux (compl√©ment)
        if TABULA_SUPPORT and not result['tables']:
            tabula_tables = self.extract_with_tabula()
            result['tables'].extend(tabula_tables)
            logger.info(f"tabula: {len(tabula_tables)} tableaux suppl√©mentaires")
        
        # Extraire les indicateurs du texte
        if result['text']:
            indicators = self.extract_indicators_from_text(result['text'])
            result['indicators'] = indicators
            logger.info(f"Indicateurs extraits: {len(indicators)}")
        
        result['metadata'] = self.metadata
        
        return result


class DataScraper:
    """Classe principale pour le scraping de donn√©es INSD"""
    
    def __init__(self, config: Config = Config()):
        self.config = config
        self.session = requests.Session()
        self.session.headers.update(config.HEADERS)
        self.visited_urls: Set[str] = set()
        self.all_records: List[Dict] = []
        self.pdf_data: List[Dict] = []
        self.downloaded_pdfs: List[Path] = []
        
        # Cr√©er les dossiers de sortie
        self.config.OUTPUT_DIR.mkdir(exist_ok=True)
        self.config.PDF_DIR.mkdir(exist_ok=True)
    
    def fetch_page(self, url: str, retry_count: int = 0) -> Optional[str]:
        """R√©cup√®re le contenu HTML d'une page avec gestion des erreurs et retry"""
        if url in self.visited_urls:
            logger.debug(f"URL d√©j√† visit√©e: {url}")
            return None
            
        try:
            logger.info(f"Chargement de: {url}")
            resp = self.session.get(url, timeout=self.config.TIMEOUT)
            resp.raise_for_status()
            self.visited_urls.add(url)
            return resp.text
            
        except requests.Timeout:
            logger.warning(f"Timeout pour {url}")
            if retry_count < self.config.MAX_RETRIES:
                time.sleep(self.config.DELAY_BETWEEN_REQUESTS * 2)
                return self.fetch_page(url, retry_count + 1)
                
        except requests.HTTPError as e:
            logger.error(f"Erreur HTTP {e.response.status_code} pour {url}")
            
        except requests.RequestException as e:
            logger.error(f"√âchec du chargement {url}: {e}")
            if retry_count < self.config.MAX_RETRIES:
                time.sleep(self.config.DELAY_BETWEEN_REQUESTS * 2)
                return self.fetch_page(url, retry_count + 1)
        
        return None
    
    def download_pdf(self, url: str, filename: Optional[str] = None) -> Optional[Path]:
        """T√©l√©charge un PDF"""
        try:
            logger.info(f"T√©l√©chargement PDF: {url}")
            resp = self.session.get(url, timeout=self.config.TIMEOUT * 2)
            resp.raise_for_status()
            
            # D√©terminer le nom du fichier
            if not filename:
                filename = url.split('/')[-1]
                if not filename.endswith('.pdf'):
                    filename += '.pdf'
            
            # Nettoyer le nom de fichier
            filename = re.sub(r'[^\w\-_\. ]', '_', filename)
            filepath = self.config.PDF_DIR / filename
            
            # Sauvegarder le PDF
            with open(filepath, 'wb') as f:
                f.write(resp.content)
            
            logger.info(f"PDF t√©l√©charg√©: {filepath}")
            return filepath
            
        except Exception as e:
            logger.error(f"√âchec du t√©l√©chargement PDF {url}: {e}")
            return None
    
    def is_valid_url(self, url: str) -> bool:
        """V√©rifie si l'URL est valide et appartient au domaine cible"""
        try:
            parsed = urlparse(url)
            base_parsed = urlparse(self.config.BASE_URL)
            return parsed.netloc == base_parsed.netloc or parsed.netloc == ''
        except:
            return False
    
    def extract_pdf_links(self, soup: BeautifulSoup, base_url: str) -> List[Tuple[str, str]]:
        """Extrait les liens vers des PDFs avec leur titre"""
        pdf_links = []
        
        for a in soup.select("a[href]"):
            href = a.get('href', '')
            text = a.get_text(strip=True)
            
            # V√©rifier si c'est un PDF
            is_pdf = (
                href.lower().endswith('.pdf') or
                'pdf' in href.lower() or
                'document' in href.lower() or
                'rapport' in text.lower() or
                'publication' in text.lower()
            )
            
            if is_pdf:
                full_url = urljoin(base_url, href)
                if self.is_valid_url(full_url) or full_url.endswith('.pdf'):
                    pdf_links.append((full_url, text))
        
        return pdf_links
    
    def extract_links(self, soup: BeautifulSoup, base_url: str) -> List[str]:
        """Extrait les liens pertinents d'une page"""
        links = []
        keywords = [
            '/publication/', '/statistique/', '/data/', '/indicator/',
            'rapport', 'enquete', 'recensement', 'annuaire'
        ]
        
        for a in soup.select("a[href]"):
            href = a.get('href', '')
            text = a.get_text(strip=True).lower()
            
            # V√©rifier si le lien est pertinent
            is_relevant = any(kw in href.lower() or kw in text for kw in keywords)
            
            if is_relevant and href and not href.lower().endswith('.pdf'):
                full_url = urljoin(base_url, href)
                if self.is_valid_url(full_url) and full_url not in self.visited_urls:
                    links.append(full_url)
        
        return list(set(links))  # √âliminer les doublons
    
    def parse_table_data(self, table) -> List[Dict]:
        """Parse un tableau HTML et extrait les donn√©es"""
        data = []
        rows = table.select("tr")
        
        # Essayer de d√©tecter les en-t√™tes
        headers = []
        first_row = rows[0] if rows else None
        
        if first_row:
            header_cells = first_row.find_all(['th', 'td'])
            headers = [cell.get_text(strip=True) for cell in header_cells]
        
        # Parser les donn√©es
        for row in rows[1:] if headers else rows:
            cells = row.find_all(['td', 'th'])
            if len(cells) >= 2:
                if headers and len(cells) == len(headers):
                    row_data = {headers[i]: cells[i].get_text(strip=True) 
                               for i in range(len(cells))}
                    data.append(row_data)
                else:
                    label = cells[0].get_text(strip=True)
                    value = cells[1].get_text(strip=True)
                    if label and value:
                        data.append({"indicateur": label, "valeur": value})
        
        return data
    
    def parse_indicators(self, soup: BeautifulSoup) -> List[Dict]:
        """Extrait les indicateurs d'une page"""
        data = []
        
        # Chercher dans les tableaux
        for table in soup.select("table"):
            table_data = self.parse_table_data(table)
            data.extend(table_data)
        
        # Chercher dans les divs avec classes sp√©cifiques
        for bloc in soup.select(".indicator, .data-block, .statistics, .metric"):
            title = bloc.select_one(".title, h3, h4, strong")
            value = bloc.select_one(".value, .number, .data")
            
            if title and value:
                data.append({
                    "indicateur": title.get_text(strip=True),
                    "valeur": value.get_text(strip=True)
                })
        
        # Chercher des listes de d√©finitions
        for dl in soup.select("dl"):
            terms = dl.select("dt")
            descriptions = dl.select("dd")
            
            for term, desc in zip(terms, descriptions):
                data.append({
                    "indicateur": term.get_text(strip=True),
                    "valeur": desc.get_text(strip=True)
                })
        
        return data
    
    def process_page(self, url: str) -> List[Dict]:
        """Traite une page compl√®te et retourne les donn√©es extraites"""
        html = self.fetch_page(url)
        if not html:
            return []
        
        soup = BeautifulSoup(html, "html.parser")
        indicators = self.parse_indicators(soup)
        
        # Ajouter les m√©tadonn√©es
        for record in indicators:
            record['source_url'] = url
            record['source_type'] = 'html'
            record['date_extraction'] = datetime.now().isoformat()
        
        logger.info(f"Extrait {len(indicators)} indicateurs de {url}")
        return indicators
    
    def process_pdfs(self):
        """Traite tous les PDFs t√©l√©charg√©s"""
        if not self.downloaded_pdfs:
            logger.info("Aucun PDF √† traiter")
            return
        
        logger.info(f"Traitement de {len(self.downloaded_pdfs)} PDFs...")
        
        for pdf_path in self.downloaded_pdfs:
            try:
                extractor = PDFExtractor(pdf_path)
                pdf_data = extractor.extract_all()
                
                # Ajouter les indicateurs extraits
                for indicator in pdf_data['indicators']:
                    indicator['source_file'] = pdf_path.name
                    indicator['source_type'] = 'pdf'
                    indicator['date_extraction'] = datetime.now().isoformat()
                
                self.all_records.extend(pdf_data['indicators'])
                
                # Ajouter les tableaux
                for i, table in enumerate(pdf_data['tables']):
                    if not table.empty:
                        for _, row in table.iterrows():
                            record = row.to_dict()
                            record['source_file'] = pdf_path.name
                            record['source_type'] = 'pdf_table'
                            record['table_index'] = i
                            record['date_extraction'] = datetime.now().isoformat()
                            self.all_records.append(record)
                
                self.pdf_data.append(pdf_data)
                
            except Exception as e:
                logger.error(f"Erreur lors du traitement de {pdf_path}: {e}")
    
    def scrape(self) -> pd.DataFrame:
        """Lance le processus de scraping complet"""
        logger.info("D√©marrage du scraping...")
        pdf_count = 0
        
        for path in self.config.ENTRY_POINTS:
            url = urljoin(self.config.BASE_URL, path)
            
            # Traiter la page principale
            records = self.process_page(url)
            self.all_records.extend(records)
            
            # R√©cup√©rer le contenu pour extraire les liens
            html = self.fetch_page(url)
            if html:
                soup = BeautifulSoup(html, "html.parser")
                
                # Extraire et t√©l√©charger les PDFs
                if self.config.EXTRACT_PDFS and pdf_count < self.config.MAX_PDFS:
                    pdf_links = self.extract_pdf_links(soup, url)
                    logger.info(f"Trouv√© {len(pdf_links)} PDFs potentiels")
                    
                    for pdf_url, pdf_title in pdf_links:
                        if pdf_count >= self.config.MAX_PDFS:
                            break
                        
                        time.sleep(self.config.DELAY_BETWEEN_REQUESTS)
                        pdf_path = self.download_pdf(pdf_url, pdf_title)
                        if pdf_path:
                            self.downloaded_pdfs.append(pdf_path)
                            pdf_count += 1
                
                # Extraire et traiter les sous-pages
                sub_links = self.extract_links(soup, url)
                logger.info(f"Trouv√© {len(sub_links)} sous-pages √† explorer")
                
                for link in sub_links[:50]:
                    time.sleep(self.config.DELAY_BETWEEN_REQUESTS)
                    sub_records = self.process_page(link)
                    self.all_records.extend(sub_records)
            
            time.sleep(self.config.DELAY_BETWEEN_REQUESTS)
        
        # Traiter les PDFs t√©l√©charg√©s
        if self.config.EXTRACT_PDFS:
            self.process_pdfs()
        
        logger.info(f"Scraping termin√©. Total: {len(self.all_records)} enregistrements")
        logger.info(f"PDFs trait√©s: {len(self.downloaded_pdfs)}")
        
        return self.create_dataframe()
    
    def create_dataframe(self) -> pd.DataFrame:
        """Cr√©e un DataFrame √† partir des donn√©es collect√©es"""
        if not self.all_records:
            logger.warning("Aucune donn√©e collect√©e")
            return pd.DataFrame()
        
        df = pd.DataFrame(self.all_records)
        
        # Nettoyage des donn√©es
        for col in df.columns:
            if df[col].dtype == 'object':
                df[col] = df[col].astype(str).str.replace(r'\s+', ' ', regex=True)
                df[col] = df[col].str.strip()
        
        # Supprimer les doublons
        df = df.drop_duplicates()
        
        return df
    
    def export_data(self, df: pd.DataFrame):
        """Exporte les donn√©es dans plusieurs formats"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        if df.empty:
            logger.warning("Aucune donn√©e √† exporter")
            return
        
        # Export CSV
        csv_path = self.config.OUTPUT_DIR / f"insd_data_{timestamp}.csv"
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')
        logger.info(f"Export CSV: {csv_path}")
        
        # Export JSON
        json_path = self.config.OUTPUT_DIR / f"insd_data_{timestamp}.json"
        df.to_json(json_path, orient='records', force_ascii=False, indent=2)
        logger.info(f"Export JSON: {json_path}")
        
        # Export Excel
        try:
            excel_path = self.config.OUTPUT_DIR / f"insd_data_{timestamp}.xlsx"
            
            with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
                # Feuille principale avec toutes les donn√©es
                df.to_excel(writer, sheet_name='Toutes_donnees', index=False)
                
                # Feuilles s√©par√©es par type de source
                if 'source_type' in df.columns:
                    for source_type in df['source_type'].unique():
                        df_type = df[df['source_type'] == source_type]
                        sheet_name = f"Source_{source_type}"[:31]  # Limite Excel
                        df_type.to_excel(writer, sheet_name=sheet_name, index=False)
            
            logger.info(f"Export Excel: {excel_path}")
        except ImportError:
            logger.warning("openpyxl non install√©, export Excel ignor√©")
        except Exception as e:
            logger.error(f"Erreur export Excel: {e}")
        
        # Statistiques d√©taill√©es
        stats = {
            "total_records": len(df),
            "columns": list(df.columns),
            "date_extraction": timestamp,
            "pdfs_downloaded": len(self.downloaded_pdfs),
            "pdf_files": [p.name for p in self.downloaded_pdfs]
        }
        
        if 'source_type' in df.columns:
            stats['records_by_source'] = df['source_type'].value_counts().to_dict()
        
        if 'source_url' in df.columns:
            stats['sources_html_uniques'] = df[df.get('source_type', '') == 'html']['source_url'].nunique()
        
        stats_path = self.config.OUTPUT_DIR / f"stats_{timestamp}.json"
        with open(stats_path, 'w', encoding='utf-8') as f:
            json.dump(stats, f, indent=2, ensure_ascii=False)
        logger.info(f"Statistiques: {stats_path}")
        
        # Export des m√©tadonn√©es PDF
        if self.pdf_data:
            pdf_metadata = []
            for pdf in self.pdf_data:
                pdf_metadata.append({
                    'filename': pdf['filename'],
                    'text_length': len(pdf['text']),
                    'tables_count': len(pdf['tables']),
                    'indicators_count': len(pdf['indicators']),
                    'metadata': pdf['metadata']
                })
            
            pdf_meta_path = self.config.OUTPUT_DIR / f"pdf_metadata_{timestamp}.json"
            with open(pdf_meta_path, 'w', encoding='utf-8') as f:
                json.dump(pdf_metadata, f, indent=2, ensure_ascii=False)
            logger.info(f"M√©tadonn√©es PDF: {pdf_meta_path}")
        
        return csv_path


def main():
    """Point d'entr√©e principal"""
    
    # Afficher les d√©pendances disponibles
    print("üîç V√©rification des d√©pendances PDF:")
    print(f"  - PyPDF2: {'‚úÖ Install√©' if PDF_SUPPORT else '‚ùå Non install√©'}")
    print(f"  - pdfplumber: {'‚úÖ Install√©' if PDFPLUMBER_SUPPORT else '‚ùå Non install√©'}")
    print(f"  - tabula-py: {'‚úÖ Install√©' if TABULA_SUPPORT else '‚ùå Non install√©'}")
    print()
    
    if not any([PDF_SUPPORT, PDFPLUMBER_SUPPORT, TABULA_SUPPORT]):
        print("‚ö†Ô∏è  Aucune biblioth√®que PDF install√©e.")
        print("üì¶ Pour installer toutes les d√©pendances:")
        print("   pip install PyPDF2 pdfplumber tabula-py openpyxl")
        print()
        response = input("Continuer sans extraction PDF? (o/N): ")
        if response.lower() != 'o':
            return
        Config.EXTRACT_PDFS = False
    
    try:
        scraper = DataScraper()
        df = scraper.scrape()
        
        if not df.empty:
            scraper.export_data(df)
            print(f"\n‚úÖ Scraping termin√© avec succ√®s!")
            print(f"üìä {len(df)} enregistrements collect√©s")
            print(f"üìÑ {len(scraper.downloaded_pdfs)} PDFs t√©l√©charg√©s et trait√©s")
            print(f"üìÅ Fichiers sauvegard√©s dans: {scraper.config.OUTPUT_DIR}")
        else:
            print("‚ö†Ô∏è  Aucune donn√©e extraite")
            
    except KeyboardInterrupt:
        logger.info("Interruption par l'utilisateur")
    except Exception as e:
        logger.error(f"Erreur fatale: {e}", exc_info=True)
        raise


if __name__ == "__main__":
    main()

2025-11-01 10:35:13,861 - INFO - D√©marrage du scraping...
2025-11-01 10:35:13,862 - INFO - Chargement de: https://www.insd.bf/sites/default/files/2024-08/


üîç V√©rification des d√©pendances PDF:
  - PyPDF2: ‚úÖ Install√©
  - pdfplumber: ‚úÖ Install√©
  - tabula-py: ‚ùå Non install√©



2025-11-01 10:35:14,266 - ERROR - Erreur HTTP 404 pour https://www.insd.bf/sites/default/files/2024-08/
2025-11-01 10:35:14,268 - INFO - Chargement de: https://www.insd.bf/sites/default/files/2024-08/
2025-11-01 10:35:14,415 - ERROR - Erreur HTTP 404 pour https://www.insd.bf/sites/default/files/2024-08/
2025-11-01 10:35:15,917 - INFO - Aucun PDF √† traiter
2025-11-01 10:35:15,919 - INFO - Scraping termin√©. Total: 0 enregistrements
2025-11-01 10:35:15,920 - INFO - PDFs trait√©s: 0


‚ö†Ô∏è  Aucune donn√©e extraite


In [3]:
"""
main.py
Impl√©mentation RAG minimaliste, locale et open-source :
- Embeddings : sentence-transformers (all-MiniLM-L6-v2)
- Vector DB : FAISS + SQLite (sqlite-utils for convenience)
- LLM : llama-cpp-python (local ggml) OR transformers fallback

Endpoints:
- POST /ingest  -> {"docs": [{"id": "doc1", "text": "...", "meta": {...}}, ...]}
- POST /query   -> {"question": "..." , "k": 5}
"""

import os
import json
import sqlite3
from typing import List, Optional
#from pydantic import BaseModel
from fastapi import FastAPI, HTTPException
import numpy as np

# Embeddings
from sentence_transformers import SentenceTransformer

# FAISS
import faiss

# SQLite helper
from sqlite_utils import Database

# LLM (two options)
try:
    # Prefer llama-cpp-python if available (local ggml)
    from llama_cpp import Llama
    LLAMA_AVAILABLE = True
except Exception:
    LLAMA_AVAILABLE = False

try:
    from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
    TRANSFORMERS_AVAILABLE = True
except Exception:
    TRANSFORMERS_AVAILABLE = False

# --------------------------
# Configuration
# --------------------------
DATA_DIR = os.environ.get("RAG_DATA_DIR", "./rag_data")
os.makedirs(DATA_DIR, exist_ok=True)
FAISS_INDEX_PATH = os.path.join(DATA_DIR, "faiss.index")
SQLITE_PATH = os.path.join(DATA_DIR, "metastore.db")
EMBED_MODEL_NAME = os.environ.get("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
# LLM options (set via env or edit here)
LLAMA_GGML_PATH = os.environ.get("LLAMA_GGML_PATH", None)  # ex: "./models/ggml-model-q4_0.bin"
FALLBACK_TRANSFORMER = os.environ.get("FALLBACK_MODEL", "tiiuae/falcon-7b-instruct")  # user can change

EMBED_DIM = 384  # all-MiniLM-L6-v2 ‚Üí 384 dims

# --------------------------
# Init components
# --------------------------
# Embedding model
embedder = SentenceTransformer(EMBED_MODEL_NAME)

# SQLite metadata DB via sqlite-utils for convenience
db = Database(SQLITE_PATH)
if "documents" not in db.table_names():
    db["documents"].create({
        "id": str,
        "text": str,
        "meta": str,
        "embedding_id": int
    }, pk="id")

# FAISS index (IndexFlatIP for cosine-similarity with normalized vectors)
# We'll store normalized embeddings to use inner product as cosine-similarity
if os.path.exists(FAISS_INDEX_PATH):
    print("Loading FAISS index from disk...")
    index = faiss.read_index(FAISS_INDEX_PATH)
    # ensure index has correct dimension (assume embedded dimension matches)
else:
    index = faiss.IndexFlatIP(EMBED_DIM)
    # optionally wrap in IndexIDMap to keep stable ids
    index = faiss.IndexIDMap(index)

# Keep track of next embedding id
def get_next_embedding_id():
    cur = db["documents"].conn.execute("SELECT MAX(embedding_id) FROM documents").fetchone()[0]
    return (int(cur) + 1) if cur is not None else 0

# --------------------------
# LLM Setup
# --------------------------
llm_client = None
use_llama = False
if LLAMA_AVAILABLE and LLAMA_GGML_PATH and os.path.exists(LLAMA_GGML_PATH):
    print("Using llama-cpp-python with model:", LLAMA_GGML_PATH)
    llm_client = Llama(model_path=LLAMA_GGML_PATH)
    use_llama = True
elif TRANSFORMERS_AVAILABLE:
    # fallback: load a small-to-medium instruct model (user responsibility to provide one that fits RAM)
    print("Using transformers fallback model:", FALLBACK_TRANSFORMER)
    tokenizer = AutoTokenizer.from_pretrained(FALLBACK_TRANSFORMER)
    model = AutoModelForCausalLM.from_pretrained(FALLBACK_TRANSFORMER, device_map="auto")
    text_gen = pipeline("text-generation", model=model, tokenizer=tokenizer, device=0 if hasattr(model, "device") else -1)
else:
    print("No LLM backend available. Install llama-cpp-python or transformers.")

# --------------------------
# Utilities
# --------------------------
def normalize_embeddings(vectors: np.ndarray) -> np.ndarray:
    """Normalize rows to unit length (for cosine similarity using inner product)."""
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    norms[norms==0] = 1e-10
    return vectors / norms

def embed_texts(texts: List[str]) -> np.ndarray:
    embs = embedder.encode(texts, convert_to_numpy=True, show_progress_bar=False)
    if embs.ndim == 1:
        embs = embs.reshape(1, -1)
    return normalize_embeddings(embs)

def add_documents(docs: List[dict]):
    """
    docs: list of {"id": str, "text": str, "meta": dict}
    """
    if not docs:
        return
    texts = [d["text"] for d in docs]
    embeddings = embed_texts(texts)
    start_id = get_next_embedding_id()
    ids = np.arange(start_id, start_id + len(docs)).astype("int64")
    # add to FAISS
    index.add_with_ids(embeddings.astype("float32"), ids)
    # store metadata in sqlite
    for doc, emb_id in zip(docs, ids):
        db["documents"].insert({
            "id": doc["id"],
            "text": doc["text"],
            "meta": json.dumps(doc.get("meta", {}), ensure_ascii=False),
            "embedding_id": int(emb_id)
        }, replace=True)
    # persist faiss
    faiss.write_index(index, FAISS_INDEX_PATH)

def search(query: str, k: int = 5):
    q_emb = embed_texts([query]).astype("float32")
    if index.ntotal == 0:
        return []
    scores, ids = index.search(q_emb, k)
    scores = scores[0].tolist()
    ids = ids[0].tolist()
    results = []
    for sid, sc in zip(ids, scores):
        if sid == -1:
            continue
        r = db["documents"].get(sid, where="embedding_id = ?", columns=["id", "text", "meta", "embedding_id"])
        # Because we used embedding_id as numeric IDs, retrieve via query
        cur = db["documents"].conn.execute("SELECT id, text, meta, embedding_id FROM documents WHERE embedding_id = ?", (sid,))
        row = cur.fetchone()
        if not row:
            continue
        doc_id, text, meta_json, emb_id = row
        meta = json.loads(meta_json) if meta_json else {}
        results.append({
            "id": doc_id,
            "text": text,
            "meta": meta,
            "score": float(sc),
            "embedding_id": int(emb_id)
        })
    return results

# --------------------------
# Prompt assembly for RAG
# --------------------------
def build_rag_prompt(question: str, docs: List[dict], max_len_chars: int = 3000) -> str:
    """
    Assemble prompt: include question, retrieved docs (with source), and instructions.
    Trim docs if needed to fit max_len_chars.
    """
    instruction = (
        "Tu es un assistant utile. Utilise les documents fournis pour r√©pondre pr√©cis√©ment √† la question.\n"
        "Cite les sources sous forme [source:id] √† la fin de la r√©ponse.\n\n"
    )
    context_blocks = []
    total = 0
    for d in docs:
        block = f"[source:{d['id']}]\n{d['text']}\n\n"
        if total + len(block) > max_len_chars:
            break
        context_blocks.append(block)
        total += len(block)
    context = "\n".join(context_blocks)
    prompt = instruction + "Contexte :\n" + context + "\nQuestion : " + question + "\nR√©ponse :"
    return prompt

# --------------------------
# LLM Generator
# --------------------------
def generate_answer(prompt: str, max_tokens: int = 512, temperature: float = 0.2) -> str:
    if use_llama and llm_client:
        # llama-cpp-python interface
        resp = llm_client.create(prompt=prompt, max_tokens=max_tokens, temperature=temperature)
        return resp.get("choices", [{}])[0].get("text", "").strip()
    elif TRANSFORMERS_AVAILABLE:
        # transformers pipeline
        outputs = text_gen(prompt, max_length=len(prompt.split()) + max_tokens, do_sample=True, temperature=temperature, num_return_sequences=1)
        return outputs[0]["generated_text"][len(prompt):].strip()
    else:
        raise RuntimeError("Aucun backend LLM disponible. Installez llama-cpp-python (et un mod√®le ggml) ou transformers.")

# --------------------------
# FastAPI
# --------------------------
app = FastAPI(title="RAG Open-Source (FAISS + SentenceTransformers + LLM)")

class DocItem(BaseModel):
    id: str
    text: str
    meta: Optional[dict] = {}

class IngestRequest(BaseModel):
    docs: List[DocItem]

class QueryRequest(BaseModel):
    question: str
    k: Optional[int] = 5
    max_context_chars: Optional[int] = 3000

@app.post("/ingest")
def ingest(payload: IngestRequest):
    try:
        items = []
        for d in payload.docs:
            items.append({"id": d.id, "text": d.text, "meta": d.meta})
        add_documents(items)
        return {"status": "ok", "added": len(items)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/query")
def query(payload: QueryRequest):
    if index.ntotal == 0:
        raise HTTPException(400, "Index vide ‚Äî ing√©rez d'abord des documents via /ingest.")
    results = search(payload.question, k=payload.k)
    if not results:
        return {"answer": "", "sources": [], "retrieved": []}
    prompt = build_rag_prompt(payload.question, results, max_len_chars=payload.max_context_chars)
    try:
        answer = generate_answer(prompt)
    except Exception as e:
        raise HTTPException(500, f"Erreur LLM: {e}")
    sources = [r["id"] for r in results]
    return {"answer": answer, "sources": sources, "retrieved": results}

# health
@app.get("/health")
def health():
    return {"status": "ok", "index_size": int(index.ntotal)}

# --------------------------
# Simple CLI ingestion helper (optional)
# --------------------------
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--run", action="store_true", help="Run API")
    parser.add_argument("--ingest-file", type=str, help="JSON file with docs list [{id,text,meta},...]")
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()

    if args.ingest_file:
        with open(args.ingest_file, "r", encoding="utf-8") as f:
            docs = json.load(f)
        add_documents(docs)
        print(f"Ing√©r√© {len(docs)} documents.")
    if args.run:
        import uvicorn
        uvicorn.run("main:app", host=args.host, port=args.port, reload=True)


#python -m venv venv
#venv\Scripts\activate.bat
#pip install -r requirements.txt

ModuleNotFoundError: No module named 'fastapi'