In [85]:
import re
import logging
import pandas as pd
import numpy as np
from typing import Dict, Optional, List, Any, Tuple
from datetime import datetime
import PyPDF2
from pathlib import Path

# Import Camelot
try:
    import camelot
    print(f"‚úÖ Camelot version: {camelot.__version__}")
except ImportError:
    print("‚ùå Camelot non install√©. Ex√©cutez: pip install camelot-py[cv]")
    camelot = None

class BaseExtractor:
    """Classe de base pour tous les extracteurs."""
    
    def __init__(self, logger: Optional[logging.Logger] = None):
        self.logger = logger or self._create_default_logger()

    def log(self, message: str, level: int = logging.INFO) -> None:
        if self.logger:
            self.logger.log(level, message)

    def _create_default_logger(self) -> logging.Logger:
        logger = logging.getLogger(f"{self.__class__.__name__}")
        
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
            console_handler.setFormatter(formatter)
            logger.addHandler(console_handler)
        
        return logger

    def clean_text(self, text: str) -> str:
        """Nettoie et normalise le texte."""
        if not text or pd.isna(text):
            return ""
        cleaned = " ".join(str(text).split())
        replacements = {
            "\u2019": "'", "\u2018": "'", "\u201c": '"', "\u201d": '"',
            "\u2013": "-", "\u2014": "-", "\u00a0": " ",
        }
        for old, new in replacements.items():
            cleaned = cleaned.replace(old, new)
        return cleaned.strip()

‚úÖ Camelot version: 1.0.0


In [89]:
import re
import logging
import pandas as pd
import numpy as np
from typing import Dict, Optional, List, Any, Tuple
from datetime import datetime
import PyPDF2
from pathlib import Path

try:
    import camelot
    print(f"‚úÖ Camelot version: {camelot.__version__}")
except ImportError:
    print("‚ùå Camelot non install√©")
    camelot = None


class BaseExtractor:
    def __init__(self, logger: Optional[logging.Logger] = None):
        self.logger = logger or self._create_default_logger()

    def log(self, message: str, level: int = logging.INFO) -> None:
        if self.logger:
            self.logger.log(level, message)

    def _create_default_logger(self) -> logging.Logger:
        logger = logging.getLogger(f"{self.__class__.__name__}")
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            formatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            console_handler.setFormatter(formatter)
            logger.addHandler(console_handler)
        return logger


class SmartGDSExtractor(BaseExtractor):
    """
    Extracteur GDS intelligent qui :
    1. Garde les dates comme headers
    2. Coupe la table √† FiO2=100
    3. Efface les colonnes vides par section
    4. Prend l'info la plus r√©cente avec le bon pourcentage FiO2
    """

    def __init__(self, logger: Optional[logging.Logger] = None, debug: bool = False):
        super().__init__(logger=logger)
        self.debug = debug

        if camelot is None:
            raise ImportError("Camelot requis: pip install camelot-py[cv]")

    def extract_gds_data_from_pdf(self, pdf_path: str, pages: str = "all") -> Dict[str, Any]:
        """Extrait les donn√©es GDS du PDF avec la nouvelle logique."""
        if not Path(pdf_path).exists():
            self.log(f"Fichier non trouv√©: {pdf_path}", level=logging.ERROR)
            return {}

        try:
            # Extraction Camelot
            tables = camelot.read_pdf(pdf_path, pages=pages, flavor='stream')
            self.log(f"Trouv√© {len(tables)} tables")

            # Trouve la table GDS principale
            gds_table = self._find_main_gds_table(tables)
            if gds_table is None:
                self.log("Aucune table GDS trouv√©e", level=logging.WARNING)
                return {}

            # Applique la nouvelle logique de traitement
            return self._process_gds_table_smart(gds_table)

        except Exception as e:
            self.log(f"Erreur extraction: {str(e)}", level=logging.ERROR)
            return {}

    def _find_main_gds_table(self, tables) -> Optional[object]:
        """Trouve la table GDS principale (la plus compl√®te)."""
        best_table = None
        best_score = 0

        for table in tables:
            df = table.df
            table_text = df.to_string().lower()

            # Score bas√© sur indicateurs GDS
            indicators = ['fio2', 'ph', 'paco2', 'pao2', 'sao2', 'peep', 'co3h', 'mmhg']
            score = sum(1 for ind in indicators if ind in table_text)

            # Bonus pour structure attendue
            if 'fio2<100' in table_text and 'fio2=100' in table_text:
                score += 5

            if score > best_score:
                best_score = score
                best_table = table

        if self.debug and best_table is not None:
            print(f"üéØ Table GDS s√©lectionn√©e (score: {best_score})")
            print(f"   Shape: {best_table.df.shape}")

        return best_table


    def _process_gds_table_smart(self, table) -> Dict[str, Any]:
        """Version corrig√©e avec gestion des cas sans split."""
        df = table.df.copy()

        if self.debug:
            print(f"\nüîç Traitement table GDS:")
            print(f"   Shape originale: {df.shape}")

        # √âtape 1: Extrait les headers (dates/heures)
        headers = self._extract_headers(df)

        # √âtape 2: Trouve la ligne de s√©paration FiO2=100
        split_row = self._find_fio2_100_split(df)

        if split_row is None:
            # Cas particulier : pas de split trouv√©, traite tout comme une section
            self.log("Aucun split FiO2=100 d√©tect√©, analyse comme section mixte", level=logging.WARNING)
            return self._process_mixed_section(df, headers)

        # √âtape 3: Divise la table
        fio2_less_100_df = df.iloc[:split_row].copy()
        fio2_100_df = df.iloc[split_row:].copy()

        # Important: Ajoute les headers aux deux sections
        if len(headers) > 0:
            header_rows = df.iloc[:2].copy()  # Lignes dates et heures
            
            # Pour FiO2=100, ajoute les headers au d√©but
            fio2_100_df = pd.concat([header_rows, fio2_100_df], ignore_index=True)

        if self.debug:
            print(f"   Split √† la ligne {split_row}")
            print(f"   FiO2<100 shape: {fio2_less_100_df.shape}")
            print(f"   FiO2=100 shape: {fio2_100_df.shape} (headers ajout√©s)")

        # √âtape 4: Traite chaque section si elle n'est pas vide
        all_data = []
        
        if len(fio2_less_100_df) > 0:
            fio2_less_100_clean, fio2_less_100_headers = self._remove_empty_columns_per_section(
                fio2_less_100_df, headers, "FiO2<100"
            )
            fio2_less_100_data = self._extract_section_data_smart(
                fio2_less_100_clean, fio2_less_100_headers, "FiO2<100"
            )
            all_data.extend(fio2_less_100_data)
        
        if len(fio2_100_df) > 0:
            fio2_100_clean, fio2_100_headers = self._remove_empty_columns_per_section(
                fio2_100_df, headers, "FiO2=100"
            )
            fio2_100_data = self._extract_section_data_smart(
                fio2_100_clean, fio2_100_headers, "FiO2=100"
            )
            all_data.extend(fio2_100_data)

        # √âtape 5: S√©lectionne les meilleures donn√©es
        if all_data:
            return self._select_best_data_smart(all_data)
        else:
            return {}
        


        
    def _process_mixed_section(self, df: pd.DataFrame, headers: List[Dict]) -> Dict[str, Any]:
        """Traite une table mixte en analysant les colonnes individuellement."""
        if self.debug:
            print("   üîÑ Analyse intelligente des colonnes par type FiO2")
        
        # Nettoie les colonnes vides
        df_clean, clean_headers = self._remove_empty_columns_per_section(df, headers, "Mixte")
        
        # NOUVELLE APPROCHE: Analyse chaque colonne individuellement
        all_data = []
        
        for header in clean_headers:
            col_idx = header['column_index']
            timestamp = f"{header['date']} {header['time']}".strip()
            
            if self.debug:
                print(f"   üìä Analyse colonne {col_idx} ({timestamp})")
            
            # D√©termine le type de cette colonne sp√©cifique
            column_type, fio2_percentage = self._analyze_column_type(df_clean, col_idx)
            
            if column_type and fio2_percentage:
                if self.debug:
                    print(f"      ‚Üí Type d√©tect√©: {column_type}, FiO2: {fio2_percentage}%")
                
                # Extrait les donn√©es pour cette colonne sp√©cifique
                column_data = self._extract_column_data(
                    df_clean, col_idx, timestamp, column_type, fio2_percentage
                )
                
                if column_data:
                    all_data.append(column_data)
            else:
                if self.debug:
                    print(f"      ‚Üí Aucune donn√©es dans cette colonne")
        
        if all_data:
            return self._select_best_data_smart(all_data)
        else:
            return {}
        




    def _extract_column_data(
        self, 
        df: pd.DataFrame, 
        col_idx: int, 
        timestamp: str, 
        fio2_type: str, 
        fio2_percentage: float
    ) -> Optional[Dict[str, Any]]:
        """Extrait les donn√©es m√©dicales d'une colonne sp√©cifique."""
        
        # Trouve les lignes de param√®tres
        param_rows = self._find_medical_parameter_rows(df)
        
        data_entry = {
            'timestamp': timestamp,
            'fio2_type': fio2_type,
            'fio2_percentage': fio2_percentage,
            'column_index': col_idx
        }
        
        extracted_params = []
        
        # Extrait chaque param√®tre
        for param_name, row_idx in param_rows.items():
            if row_idx < len(df) and col_idx < len(df.columns):
                numeric_value = self._extract_parameter_value_from_row(
                    df, row_idx, col_idx, param_name
                )
                
                if numeric_value is not None:
                    data_entry[param_name] = numeric_value
                    extracted_params.append(param_name)
        
        if len(extracted_params) >= 3:  # Au moins 3 param√®tres pour √™tre valide
            if self.debug:
                print(f"      ‚úÖ {timestamp} ({fio2_type}, {fio2_percentage}%): "
                    f"{len(extracted_params)} params [{', '.join(extracted_params)}]")
            return data_entry
        else:
            if self.debug:
                print(f"      ‚ùå {timestamp}: seulement {len(extracted_params)} params")
            return None

    def _analyze_column_type(self, df: pd.DataFrame, col_idx: int) -> Tuple[Optional[str], Optional[float]]:
        """Analyse une colonne sp√©cifique pour d√©terminer son type FiO2."""
        
        # Compte les valeurs m√©dicales pr√©sentes dans cette colonne
        medical_values = 0
        has_percentage = False
        found_percentage = None
        
        for row_idx, row in df.iterrows():
            if col_idx < len(row):
                cell_value = str(row.iloc[col_idx]).strip()
                
                if cell_value and cell_value not in ['nan', '']:
                    # Cherche des valeurs m√©dicales
                    if re.search(r'\d+(?:\.\d+)?\s*(?:mmhg|mmol|%|cm)', cell_value.lower()):
                        medical_values += 1
                    
                    # Cherche pH
                    if re.search(r'\d\.\d{2}', cell_value):
                        medical_values += 1
                    
                    # Cherche des pourcentages FiO2 sp√©cifiques
                    percentage_match = re.search(r'(\d+)\s*%', cell_value)
                    if percentage_match:
                        pct = float(percentage_match.group(1))
                        # V√©rifie si c'est sur une ligne FiO2
                        row_text = ' '.join(str(c) for c in row if str(c).strip()).lower()
                        if 'fio2' in row_text or 'pourcentage' in row_text:
                            has_percentage = True
                            found_percentage = pct
        
        if self.debug:
            print(f"      Colonne {col_idx}: {medical_values} valeurs m√©dicales, "
                f"pourcentage: {found_percentage}%")
        
        # D√©termine le type bas√© sur l'analyse
        if medical_values >= 3:  # Au moins 3 param√®tres m√©dicaux
            if has_percentage and found_percentage and found_percentage < 100:
                return "FiO2<100", found_percentage
            elif has_percentage and found_percentage == 100:
                return "FiO2=100", found_percentage
            else:
                # Pas de pourcentage explicite, devine bas√© sur les valeurs PaO2
                pao2_value = self._get_pao2_from_column(df, col_idx)
                if pao2_value and pao2_value > 300:  # PaO2 √©lev√© sugg√®re FiO2=100
                    return "FiO2=100", 100.0
                else:
                    return "FiO2<100", 21.0  # D√©faut air ambiant
        
        return None, None

    def _get_pao2_from_column(self, df: pd.DataFrame, col_idx: int) -> Optional[float]:
        """Extrait la valeur PaO2 d'une colonne pour aider √† d√©terminer le type."""
        for row_idx, row in df.iterrows():
            row_text = ' '.join(str(c) for c in row if str(c).strip()).lower()
            if 'pao2' in row_text and col_idx < len(row):
                cell_value = str(row.iloc[col_idx]).strip()
                pao2_match = re.search(r'(\d+(?:\.\d+)?)\s*mmhg', cell_value.lower())
                if pao2_match:
                    return float(pao2_match.group(1))
        return None



    def _extract_headers(self, df: pd.DataFrame) -> List[Dict[str, str]]:
        """
        Extrait les headers (dates/heures) de la table.

        Returns:
            Liste de dictionnaires avec 'date', 'time', 'column_index'
        """
        headers = []

        # Cherche dans les premi√®res lignes
        for row_idx in range(min(3, len(df))):
            row = df.iloc[row_idx]

            for col_idx, cell in enumerate(row):
                cell_str = str(cell).strip()

                if not cell_str or cell_str == 'nan':
                    continue

                # Cherche des dates
                date_match = re.search(r'(\d{2}/\d{2}/\d{4})', cell_str)
                time_match = re.search(r'(\d{2}:\d{2})', cell_str)

                if date_match or time_match:
                    # Trouve ou cr√©e l'entr√©e header pour cette colonne
                    header_entry = next(
                        (h for h in headers if h['column_index'] == col_idx), None
                    )

                    if header_entry is None:
                        header_entry = {'column_index': col_idx, 'date': '', 'time': ''}
                        headers.append(header_entry)

                    if date_match:
                        header_entry['date'] = date_match.group(1)
                    if time_match:
                        header_entry['time'] = time_match.group(1)

        # Trie par index de colonne
        headers.sort(key=lambda x: x['column_index'])

        if self.debug:
            print(f"   üìÖ Headers extraits: {len(headers)}")
            for h in headers:
                print(f"      Col {h['column_index']}: {h['date']} {h['time']}")

        return headers

    def _find_fio2_100_split(self, df: pd.DataFrame) -> Optional[int]:
        """Trouve la VRAIE ligne de s√©paration FiO2=100, pas le titre g√©n√©ral."""
        
        if self.debug:
            print(f"   üîç Recherche du split FiO2=100 dans {len(df)} lignes")
            # Debug: affiche quelques lignes pour comprendre la structure
            for i in range(min(len(df), 12)):
                row_text = ' '.join(str(cell) for cell in df.iloc[i] if str(cell).strip())[:100]
                print(f"      Ligne {i}: {row_text}")
        
        # √âTAPE 1: Cherche la ligne qui marque le D√âBUT des donn√©es FiO2=100
        # Cette ligne contient souvent "FiO2=100" ET "pH" ou d'autres param√®tres
        for idx in range(1, len(df)):  # Skip ligne 0 (titre g√©n√©ral)
            row_text = ' '.join(str(cell) for cell in df.iloc[idx] if str(cell).strip()).lower()
            
            # Patterns sp√©cifiques pour la ligne de transition
            transition_patterns = [
                r'fio2\s*=\s*100.*ph',      # "FiO2=100 : pH" 
                r'fio2\s*=\s*100\s*:',      # "FiO2=100 :"
                r'^fio2\s*=\s*100\s*$',     # Ligne avec juste "FiO2=100"
                r'.*fio2\s*=\s*100.*',      # Toute ligne contenant "FiO2=100"
            ]
            
            for pattern in transition_patterns:
                if re.search(pattern, row_text):
                    if self.debug:
                        print(f"   ‚úÖ Split FiO2=100 trouv√© ligne {idx}: {row_text[:80]}")
                    return idx
        
        # √âTAPE 2: M√©thode alternative - cherche le changement de structure
        # Apr√®s les donn√©es FiO2<100, il y a souvent une ligne diff√©rente avant FiO2=100
        for idx in range(2, len(df) - 2):
            current_row = ' '.join(str(cell) for cell in df.iloc[idx] if str(cell).strip()).lower()
            next_row = ' '.join(str(cell) for cell in df.iloc[idx + 1] if str(cell).strip()).lower()
            
            # Cherche une ligne avec "fio2=100" suivie d'une ligne avec des param√®tres
            if ('fio2=100' in current_row and 
                any(param in next_row for param in ['ph', 'paco2', 'pao2', 'mmhg'])):
                if self.debug:
                    print(f"   ‚úÖ Split alternatif ligne {idx}: {current_row[:60]}")
                return idx
                
        # √âTAPE 3: Cherche par analyse de contenu - d√©tecte le changement de pattern
        # Les lignes FiO2<100 ont des donn√©es, puis il y a une transition
        data_lines = []
        for idx in range(1, len(df)):
            row_text = ' '.join(str(cell) for cell in df.iloc[idx] if str(cell).strip())
            
            # Compte les donn√©es m√©dicales dans cette ligne
            medical_count = len(re.findall(r'\d+(?:\.\d+)?\s*(?:mmhg|mmol|%|cm)', row_text.lower()))
            medical_count += len(re.findall(r'\d\.\d{2}', row_text))  # pH
            
            data_lines.append((idx, medical_count, row_text.lower()))
            
            if self.debug and medical_count > 0:
                print(f"      Ligne {idx}: {medical_count} valeurs m√©dicales")
        
        # Cherche le point o√π le pattern change
        for i in range(1, len(data_lines) - 1):
            idx, count, text = data_lines[i]
            
            # Si cette ligne contient "fio2=100" et que les lignes suivantes ont des donn√©es
            if 'fio2=100' in text and count == 0:
                # V√©rifie que les lignes suivantes ont des donn√©es
                following_data = sum(data_lines[j][1] for j in range(i + 1, min(i + 4, len(data_lines))))
                if following_data > 3:  # Au moins quelques param√®tres dans les lignes suivantes
                    if self.debug:
                        print(f"   ‚úÖ Split par analyse ligne {idx}: changement de pattern d√©tect√©")
                    return idx
        
        if self.debug:
            print("   ‚ùå Aucun split FiO2=100 trouv√©")
        return None

    def _remove_empty_columns_per_section(
        self,
        df: pd.DataFrame,
        headers: List[Dict],
        section_type: str
    ) -> Tuple[pd.DataFrame, List[Dict]]:
        """Version am√©lior√©e pour d√©tecter correctement les colonnes avec donn√©es."""
        # Colonnes √† garder : toujours la colonne 0 (labels)
        columns_to_keep = {0}

        if self.debug:
            print(f"   üîç Analyse colonnes pour {section_type}:")
            print(f"      DataFrame shape: {df.shape}")

        # Analyser chaque colonne pour voir si elle a du contenu m√©dical
        for col_idx in range(1, len(df.columns)):
            col_content = []
            numeric_values = 0
            medical_units = 0
            
            for row_idx in range(len(df)):
                cell_value = str(df.iloc[row_idx, col_idx]).strip()
                if cell_value and cell_value != 'nan':
                    col_content.append(cell_value.lower())
                    
                    # Compte les valeurs num√©riques
                    if re.search(r'\d+(?:\.\d+)?', cell_value):
                        numeric_values += 1
                    
                    # Compte les unit√©s m√©dicales
                    if re.search(r'mmhg|mmol|%|cm.*eau', cell_value.lower()):
                        medical_units += 1

            col_text = ' '.join(col_content)

            # Crit√®res plus stricts pour garder une colonne
            has_timestamps = bool(re.search(r'\d{2}/\d{2}/\d{4}|\d{2}:\d{2}', col_text))
            has_medical_values = numeric_values >= 2  # Au moins 2 valeurs num√©riques
            has_medical_units = medical_units >= 1    # Au moins 1 unit√© m√©dicale
            has_ph_values = bool(re.search(r'\d\.\d{2}', col_text))  # pH typique

            should_keep = has_timestamps or (has_medical_values and (has_medical_units or has_ph_values))

            if should_keep:
                columns_to_keep.add(col_idx)
                
            if self.debug:
                print(f"      Col {col_idx}: nums={numeric_values}, units={medical_units}, "
                    f"timestamps={has_timestamps}, pH={has_ph_values} -> {'GARD√âE' if should_keep else 'SUPPRIM√âE'}")
                if should_keep:
                    print(f"         Contenu: {col_text[:80]}...")

        # Filtre le DataFrame
        columns_to_keep = sorted(list(columns_to_keep))
        df_clean = df.iloc[:, columns_to_keep]

        # Met √† jour les headers
        old_to_new_mapping = {old_idx: new_idx for new_idx, old_idx in enumerate(columns_to_keep)}

        updated_headers = []
        for header in headers:
            if header['column_index'] in old_to_new_mapping:
                new_header = header.copy()
                new_header['column_index'] = old_to_new_mapping[header['column_index']]
                new_header['original_column'] = header['column_index']
                updated_headers.append(new_header)

        if self.debug:
            print(f"   üßπ {section_type} nettoyage: {df.shape} -> {df_clean.shape}")
            print(f"      Colonnes gard√©es: {columns_to_keep}")
            print(f"      Headers finaux: {[(h['original_column'], h['date'], h['time']) for h in updated_headers]}")

        return df_clean, updated_headers







    def _extract_section_data_smart(
        self,
        df: pd.DataFrame,
        headers: List[Dict],
        section_type: str
    ) -> List[Dict[str, Any]]:
        """
        Extrait les donn√©es d'une section avec la logique am√©lior√©e.
        """
        if self.debug:
            print(f"\n   üìä Extraction section {section_type}:")
            print(f"      Shape: {df.shape}")

        data_entries = []

        # Trouve les pourcentages FiO2 pour cette section
        fio2_percentages = self._extract_fio2_percentages(df, section_type)

        # Trouve les lignes de param√®tres m√©dicaux
        param_rows = self._find_medical_parameter_rows(df)

        if self.debug:
            print(f"      FiO2 percentages: {fio2_percentages}")
            print(f"      Param rows: {list(param_rows.keys())}")

        # Pour chaque header (timestamp), extrait les valeurs
        for header in headers:
            col_idx = header['column_index']

            if col_idx >= len(df.columns):
                continue

            timestamp = f"{header['date']} {header['time']}".strip()
            if not timestamp:
                continue

            # D√©termine le pourcentage FiO2 pour cette colonne
            fio2_percentage = self._get_fio2_for_column(
                fio2_percentages, col_idx, section_type
            )

            data_entry = {
                'timestamp': timestamp,
                'fio2_type': section_type,
                'fio2_percentage': fio2_percentage,
                'column_index': col_idx
            }

            # Extrait les valeurs des param√®tres pour cette colonne
            has_medical_data = False
            extracted_params = []

            for param_name, row_idx in param_rows.items():
                if row_idx < len(df) and col_idx < len(df.columns):
                    # Am√©lioration: extraction cibl√©e par param√®tre
                    numeric_value = self._extract_parameter_value_from_row(
                        df, row_idx, col_idx, param_name
                    )

                    if numeric_value is not None:
                        data_entry[param_name] = numeric_value
                        has_medical_data = True
                        extracted_params.append(param_name)
                        if self.debug:
                            print(f"      ‚Üí {param_name}: {numeric_value} (ligne {row_idx}, col {col_idx}) ‚úì")
                    else:
                        if self.debug:
                            cell_value = str(df.iloc[row_idx, col_idx]).strip()
                            print(f"      ‚Üí {param_name}: √âCHEC - cellule='{cell_value}' (ligne {row_idx}, col {col_idx}) ‚úó")

            # Ajoute seulement si on a des donn√©es m√©dicales
            if has_medical_data:
                data_entries.append(data_entry)
                if self.debug:
                    print(f"      ‚úÖ {timestamp}: {len(extracted_params)} params [{', '.join(extracted_params)}]")

        return data_entries


    def _extract_parameter_value_from_row(
        self,
        df: pd.DataFrame,
        row_idx: int,
        col_idx: int,
        param_name: str,
    ) -> Optional[float]:
        """
        Extrait la valeur sp√©cifique d'un param√®tre avec recherche √©tendue.
        """
        if row_idx >= len(df) or col_idx >= len(df.columns):
            return None

        cell_value = str(df.iloc[row_idx, col_idx]).strip()

        if self.debug:
            print(f"         Analyse cellule [{row_idx}, {col_idx}] pour {param_name}: '{cell_value}'")

        # NOUVELLE LOGIQUE: Recherche √©tendue si cellule vide
        search_cells = [cell_value]
        
        if not cell_value or cell_value in ["nan", "", "√Ä ajouter"]:
            # Cherche dans un rayon de 3 lignes autour
            for offset in [1, -1, 2, -2, 3]:
                check_row = row_idx + offset
                if 0 <= check_row < len(df):
                    next_cell = str(df.iloc[check_row, col_idx]).strip()
                    if next_cell and next_cell not in ["nan", "", "√Ä ajouter"]:
                        search_cells.append(next_cell)
                        if self.debug:
                            print(f"         ‚Üí Cellule alternative ligne {check_row}: '{next_cell}'")

        # Teste chaque cellule trouv√©e
        for cell_candidate in search_cells:
            if not cell_candidate:
                continue
                
            extracted_value = self._extract_parameter_by_type(cell_candidate, param_name)
            
            if extracted_value is not None:
                if self.debug:
                    print(f"         ‚Üí Valeur extraite pour {param_name}: {extracted_value}")
                return extracted_value

        if self.debug:
            print(f"         ‚Üí Aucune valeur valide trouv√©e pour {param_name}")
        
        return None

    def _extract_parameter_by_type(self, cell_value: str, param_name: str) -> Optional[float]:
        """Extraction sp√©cialis√©e par type de param√®tre."""
        
        if param_name == "pH":
            ph_match = re.search(r"(\d+\.\d{1,2})", cell_value)
            if ph_match:
                value = float(ph_match.group(1))
                if 6.5 <= value <= 8.0:
                    return value

        elif param_name in ["PaCO2", "PaO2"]:
            # Cherche avec unit√© mmHg d'abord
            pressure_match = re.search(r"(\d+(?:\.\d+)?)\s*mmhg", cell_value.lower())
            if pressure_match:
                value = float(pressure_match.group(1))
                if self.debug:
                    print(f"         ‚Üí Trouv√© {param_name} avec mmHg: {value}")
                return value
            
            # Cherche un nombre seul SEULEMENT si pas d'unit√© parasite
            if not re.search(r"mmol|%|cm", cell_value.lower()):
                num_match = re.search(r"(\d+(?:\.\d+)?)", cell_value)
                if num_match:
                    value = float(num_match.group(1))
                    if self.debug:
                        print(f"         ‚Üí Trouv√© {param_name} sans unit√©: {value}")
                    
                    # Validation stricte selon le param√®tre
                    if param_name == "PaCO2" and 10 <= value <= 100:
                        return value
                    elif param_name == "PaO2" and 30 <= value <= 600:
                        return value

        elif param_name == "CO3H":
            # Cherche avec unit√© mmol d'abord
            co3h_match = re.search(r"(\d+(?:\.\d+)?)\s*mmol", cell_value.lower())
            if co3h_match:
                value = float(co3h_match.group(1))
                if 10 <= value <= 35:
                    return value

        elif param_name == "SaO2":
            sao2_match = re.search(r"(\d+(?:\.\d+)?)\s*%", cell_value)
            if sao2_match:
                value = float(sao2_match.group(1))
                if 70 <= value <= 100:
                    return value

        elif param_name == "PEEP":
            peep_match = re.search(r"(\d+(?:\.\d+)?)\s*cm", cell_value.lower())
            if peep_match:
                value = float(peep_match.group(1))
                if 0 <= value <= 25:
                    return value

        return None




    def _is_value_plausible(self, param_name: str, value: float) -> bool:
        """V√©rifie si une valeur est plausible pour un param√®tre donn√©."""
        plausible_ranges = {
            "pH": (6.5, 8.0),
            "PaCO2": (10, 100),
            "PaO2": (30, 600),
            "SaO2": (70, 100),
            "CO3H": (10, 35),
            "PEEP": (0, 25),
        }
        if param_name in plausible_ranges:
            min_val, max_val = plausible_ranges[param_name]
            return min_val <= value <= max_val
        return True




    def _extract_fio2_percentages(self, df: pd.DataFrame, section_type: str) -> Dict[int, float]:
        """Version corrig√©e pour mieux identifier les pourcentages FiO2."""
        percentages = {}

        if self.debug:
            print(f"      Recherche pourcentages FiO2 dans section {section_type}")

        # Patterns plus sp√©cifiques selon le type de section
        if section_type == "FiO2<100":
            fio2_indicators = [
                "fio2<100.*pourcentage",
                "fio2.*<.*100.*pourcentage", 
                "pourcentage.*:",  # si isol√© dans section FiO2<100
            ]
        else:  # FiO2=100
            fio2_indicators = [
                "fio2=100",
                "fio2.*=.*100",
            ]

        # √âTAPE 1: Identifier les lignes FiO2 sp√©cifiques
        fio2_indicator_rows = []
        
        for row_idx, row in df.iterrows():
            row_text = " ".join(str(cell) for cell in row if str(cell).strip()).lower()
            
            if any(re.search(indicator, row_text) for indicator in fio2_indicators):
                fio2_indicator_rows.append(row_idx)
                if self.debug:
                    print(f"         Ligne FiO2 identifi√©e {row_idx}: {row_text[:60]}...")

        # √âTAPE 2: Chercher les pourcentages dans ces lignes et adjacentes
        for row_idx in fio2_indicator_rows:
            # Cherche dans la ligne courante et les 2 suivantes
            for offset in range(0, 3):
                check_row = row_idx + offset
                if check_row >= len(df):
                    continue
                    
                row = df.iloc[check_row]
                
                for col_idx, cell in enumerate(row):
                    cell_str = str(cell).strip()
                    percentage_match = re.search(r"(\d+)\s*%", cell_str)
                    
                    if percentage_match:
                        percentage = float(percentage_match.group(1))
                        
                        # Validation plus stricte
                        is_valid = False
                        if section_type == "FiO2<100" and 21 <= percentage < 100:
                            is_valid = True
                        elif section_type == "FiO2=100" and percentage == 100:
                            is_valid = True
                        
                        if is_valid:
                            percentages[col_idx] = percentage
                            if self.debug:
                                print(f"         ‚Üí FiO2 {percentage}% pour col {col_idx}")

        if self.debug:
            print(f"         Pourcentages FiO2 finaux: {percentages}")

        return percentages




    def _get_fio2_for_column(
        self,
        percentages: Dict[int, float],
        col_idx: int,
        section_type: str
    ) -> float:
        """D√©termine le pourcentage FiO2 pour une colonne sp√©cifique."""
        if col_idx in percentages:
            return percentages[col_idx]

        # Valeurs par d√©faut selon le type
        if section_type == "FiO2=100":
            return 100.0
        else:
            return 21.0  # Air ambiant par d√©faut



    def _find_medical_parameter_rows(self, df: pd.DataFrame) -> Dict[str, int]:
        """Trouve les lignes contenant les param√®tres m√©dicaux avec am√©lioration."""
        param_rows = {}

        # Param√®tres √† chercher avec patterns am√©lior√©s
        param_patterns = {
            'pH': r'\.{0,5}ph\b(?!\w)|ph\s*[:=]|fio2.*ph',
            'PaCO2': r'\.{0,5}paco2\b|paco2\s*[:=]',
            'PaO2': r'\.{0,5}pao2\b|pao2\s*[:=]', 
            'SaO2': r'\.{0,5}sao2\b|sao2\s*[:=]',
            'CO3H': r'\.{0,5}co3h-?\b|co3h\s*[:=]',
            'PEEP': r'\.{0,5}peep\b|peep\s*[:=]'
        }

        for row_idx, row in df.iterrows():
            row_text = ' '.join(str(cell) for cell in row if str(cell).strip()).lower()

            if self.debug and any(param.lower() in row_text for param in ["ph", "paco2", "pao2", "sao2", "co3h", "peep"]):
                print(f"      Ligne {row_idx}: {row_text[:80]}...")

            # Priorit√© stricte : un seul param√®tre par ligne
            for param_name, pattern in param_patterns.items():
                if re.search(pattern, row_text):
                    # V√©rifie qu'il y a des donn√©es num√©riques dans cette ligne ou les suivantes
                    has_data_nearby = self._check_data_nearby(df, row_idx)
                    if has_data_nearby:
                        param_rows[param_name] = row_idx
                        if self.debug:
                            print(f"      ‚Üí {param_name} trouv√© ligne {row_idx}")
                        break

        return param_rows

    def _check_data_nearby(self, df: pd.DataFrame, row_idx: int) -> bool:
        """V√©rifie s'il y a des donn√©es num√©riques pr√®s de cette ligne."""
        # Cherche dans un rayon de 3 lignes
        for offset in range(-2, 4):
            check_row = row_idx + offset
            if 0 <= check_row < len(df):
                row = df.iloc[check_row]
                row_text = ' '.join(str(cell) for cell in row if str(cell).strip())
                
                # Cherche des patterns num√©riques m√©dicaux
                if re.search(r'\d+(?:\.\d+)?\s*(?:mmhg|mmol|%|cm)', row_text.lower()):
                    return True
                if re.search(r'\d+\.\d{2}', row_text):  # pH style
                    return True
        
        return False






    def _extract_numeric_value(self, text: str) -> Optional[float]:
        """Extrait une valeur num√©rique d'une cha√Æne."""
        if not text or text.strip() == '' or text.strip().lower() in ['nan', '']:
            return None

        # Nettoie le texte (garde seulement chiffres, points, virgules)
        cleaned = re.sub(r'[^\d\.\,\-]', ' ', str(text))

        # Cherche des nombres d√©cimaux
        decimal_matches = re.findall(r'\d+[,\.]\d+', cleaned)
        if decimal_matches:
            value_str = decimal_matches[0].replace(',', '.')
            try:
                return float(value_str)
            except ValueError:
                pass

        # Cherche des entiers
        int_matches = re.findall(r'\d+', cleaned)
        if int_matches:
            try:
                return float(int_matches[0])
            except ValueError:
                pass

        return None


    def _select_best_data_smart(self, all_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Version corrig√©e de la s√©lection avec priorit√© temporelle."""
        if not all_data:
            return {}

        def parse_timestamp(entry: Dict) -> datetime:
            ts_str = entry.get('timestamp', '')
            try:
                return datetime.strptime(ts_str, "%d/%m/%Y %H:%M")
            except ValueError:
                try:
                    return datetime.strptime(ts_str, "%d/%m/%Y")
                except ValueError:
                    return datetime.min

        def priority_score(entry: Dict) -> Tuple:
            timestamp = parse_timestamp(entry)
            # CORRECTION: Priorit√© au timestamp le plus r√©cent en PREMIER
            # Puis FiO2=100 > FiO2<100
            # Puis FiO2% le plus √©lev√©
            fio2_type_priority = 2 if entry.get('fio2_type') == 'FiO2=100' else 1
            fio2_percentage = entry.get('fio2_percentage', 0)
            return (timestamp, fio2_type_priority, fio2_percentage)

        # Trie par priorit√© (le plus √©lev√© en premier)
        sorted_data = sorted(all_data, key=priority_score, reverse=True)
        
        if self.debug:
            print(f"\nüîÑ S√©lection parmi {len(all_data)} entr√©es:")
            for i, entry in enumerate(sorted_data[:3]):  # Affiche top 3
                score = priority_score(entry)
                print(f"   {i+1}. {entry.get('timestamp')} | {entry.get('fio2_type')} | "
                    f"{entry.get('fio2_percentage')}% | Score: {score}")
        
        selected = sorted_data[0]

        # Construit le r√©sultat final
        result = {}
        exclude_keys = {'timestamp', 'fio2_type', 'column_index'}

        for key, value in selected.items():
            if key not in exclude_keys:
                result[key] = value

        if self.debug:
            print(f"\nüéØ Donn√©es s√©lectionn√©es:")
            print(f"   Timestamp: {selected.get('timestamp')} (le plus r√©cent)")
            print(f"   Type: {selected.get('fio2_type')}")
            print(f"   FiO2: {selected.get('fio2_percentage')}%")
            print(f"   Param√®tres: {len([k for k in result.keys() if k != 'fio2_percentage'])}")

        return result






    def validate_gds_data(self, data: Dict[str, float]) -> List[str]:
        """Valide les donn√©es extraites."""
        warnings = []

        ranges = {
            "pH": (6.8, 8.0),
            "PaCO2": (10, 100),
            "PaO2": (30, 600),
            "SaO2": (70, 100),
            "CO3H": (10, 35),
            "PEEP": (0, 20),
            "fio2_percentage": (21, 100)
        }

        for param, (min_val, max_val) in ranges.items():
            if param in data:
                value = data[param]
                if not (min_val <= value <= max_val):
                    warnings.append(f"{param} = {value} hors plage ({min_val}-{max_val})")

        return warnings


# Fonctions de test
def test_smart_gds_extractor(pdf_path: str, pages: str = "all", debug: bool = True):
    """Teste l'extracteur GDS intelligent."""
    print(f"üß† Test Smart GDS Extractor")
    print(f"üìÅ Fichier: {pdf_path}")
    print("=" * 60)

    if not Path(pdf_path).exists():
        print(f"‚ùå Fichier non trouv√©: {pdf_path}")
        return None

    # Initialise l'extracteur
    extractor = SmartGDSExtractor(debug=debug)

    # Extrait les donn√©es
    print("\nüîÑ Extraction avec logique intelligente...")
    gds_data = extractor.extract_gds_data_from_pdf(pdf_path, pages)

    # Affiche les r√©sultats
    print(f"\nüìä R√©sultats finaux:")
    print("=" * 40)

    if gds_data:
        print("‚úÖ Donn√©es GDS extraites avec succ√®s!")
        for param, value in gds_data.items():
            print(f"   {param:20}: {value}")

        # Validation
        warnings = extractor.validate_gds_data(gds_data)

        if warnings:
            print(f"\n‚ö†Ô∏è  Avertissements:")
            for warning in warnings:
                print(f"   ‚Ä¢ {warning}")
        else:
            print(f"\n‚úÖ Toutes les valeurs sont valides")
    else:
        print("‚ùå Aucune donn√©e extraite")

    return gds_data



‚úÖ Camelot version: 1.0.0


In [93]:

# Pour tester
PDF_PATH = r"C:\Users\benysar\Documents\GitHub\pulmo-cristal\sandbox\sample_pdfs\53628.pdf"  # Changez ce chemin
result = test_smart_gds_extractor(PDF_PATH)


üß† Test Smart GDS Extractor
üìÅ Fichier: C:\Users\benysar\Documents\GitHub\pulmo-cristal\sandbox\sample_pdfs\53628.pdf

üîÑ Extraction avec logique intelligente...


2025-08-27 09:50:16,651 - SmartGDSExtractor - INFO - Trouv√© 19 tables


üéØ Table GDS s√©lectionn√©e (score: 13)
   Shape: (9, 4)

üîç Traitement table GDS:
   Shape originale: (9, 4)
   üìÖ Headers extraits: 3
      Col 1: 05/01/2009 12:00
      Col 2: 05/01/2009 15:40
      Col 3: 06/01/2009 03:45
   üîç Recherche du split FiO2=100 dans 9 lignes
      Ligne 0: Gaz du sang FiO2 <100% et FiO2=100%
      Ligne 1: 05/01/2009 12:00 05/01/2009 15:40 06/01/2009 03:45
      Ligne 2: FiO2<100 : pourcentage : 60 %
      Ligne 3: ...pH 7.48
      Ligne 4: ...PaCO2 30 mmHg
      Ligne 5: ...PaO2 236 mmHg
      Ligne 6: ...CO3H- 23 mmol/l
      Ligne 7: ...SaO2 99.6 %
      Ligne 8: ...PEEP 5 cm d'eau
      Ligne 2: 1 valeurs m√©dicales
      Ligne 3: 1 valeurs m√©dicales
      Ligne 4: 1 valeurs m√©dicales
      Ligne 5: 1 valeurs m√©dicales
      Ligne 6: 1 valeurs m√©dicales
      Ligne 7: 1 valeurs m√©dicales
      Ligne 8: 1 valeurs m√©dicales
   ‚ùå Aucun split FiO2=100 trouv√©
   üîÑ Analyse intelligente des colonnes par type FiO2
   üîç Analyse colonnes