In [2]:
import os
import re
import json
import random
from pathlib import Path
import spacy
from spacy.training import Example
from spacy.tokens import DocBin
from spacy.util import minibatch

# === PARAMÈTRES ===
MODEL_DIR = "model_output"
DATA_DIR = "data_entraînement"
TEXTS_DIR = "annonces_legales"
LABEL = "SIREN"
PATTERN_SIREN = r"\b\d{9}\b"  # Un SIREN = 9 chiffres exactement

class SirenExtractor:
    def __init__(self):
        self.model_dir = MODEL_DIR
        self.data_dir = DATA_DIR
        self.texts_dir = TEXTS_DIR
        self.label = LABEL
        self.pattern = PATTERN_SIREN
        self.nlp = None
    
    def load_or_create_model(self):
        """Charge un modèle existant ou crée un nouveau modèle vierge"""
        model_path = Path(self.model_dir)
        meta_path = model_path / "meta.json"
        if model_path.exists() and meta_path.exists():
            print("🔁 Modèle existant chargé.")
            self.nlp = spacy.load(self.model_dir)
        else:
            print("🆕 Nouveau modèle vierge créé.")
            self.nlp = spacy.blank("fr")
        return self.nlp
    
    def extract_siren_positions(self, text):
        """Extrait les positions des numéros SIREN dans le texte"""
        return [(m.start(), m.end()) for m in re.finditer(self.pattern, text)]
    
    def validate_siren(self, siren):
        """Validation basique du numéro SIREN (peut être étendue)"""
        if len(siren) != 9 or not siren.isdigit():
            return False
        # Ici vous pourriez ajouter l'algorithme de validation Luhn si nécessaire
        return True
    
    def create_training_data(self, output_path):
        """Convertit les fichiers .txt en données d'entraînement JSONL"""
        if not os.path.exists(self.texts_dir):
            print(f"❌ Dossier {self.texts_dir} introuvable.")
            return False
        
        training_data = []
        processed_files = 0
        
        for filename in os.listdir(self.texts_dir):
            if filename.endswith(".txt"):
                filepath = os.path.join(self.texts_dir, filename)
                try:
                    with open(filepath, "r", encoding="utf-8") as f:
                        text = f.read().strip()
                    
                    if not text:
                        continue
                    
                    # Extraire les positions des SIREN
                    entities = self.extract_siren_positions(text)
                    
                    # Filtrer les SIREN valides
                    valid_entities = []
                    for start, end in entities:
                        siren = text[start:end]
                        if self.validate_siren(siren):
                            valid_entities.append((start, end, self.label))
                    
                    if valid_entities:  # Seulement si on a trouvé des SIREN valides
                        training_data.append({
                            "text": text,
                            "entities": valid_entities,
                            "source": filename
                        })
                        processed_files += 1
                
                except Exception as e:
                    print(f"❌ Erreur lors du traitement de {filename}: {e}")
        
        # Sauvegarder en JSONL
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as f:
            for record in training_data:
                json.dump(record, f, ensure_ascii=False)
                f.write("\n")
        
        print(f"✅ {len(training_data)} échantillons créés à partir de {processed_files} fichiers")
        print(f"✅ Données sauvegardées dans {output_path}")
        return len(training_data) > 0
    
    def convert_to_docbin(self, jsonl_path):
        """Convertit les données JSONL en DocBin pour spaCy"""
        if not os.path.exists(jsonl_path):
            print(f"❌ Fichier {jsonl_path} introuvable.")
            return None
        
        db = DocBin()
        examples_count = 0
        
        with open(jsonl_path, "r", encoding="utf-8") as f:
            for line_num, line in enumerate(f, 1):
                try:
                    record = json.loads(line.strip())
                    text = record["text"]
                    entities = record["entities"]
                    
                    # Créer le document
                    doc = self.nlp.make_doc(text)
                    
                    # Créer les entités
                    ents = []
                    for start, end, label in entities:
                        span = doc.char_span(start, end, label=label, alignment_mode="contract")
                        if span is not None:
                            ents.append(span)
                        else:
                            print(f"⚠️  Entité ignorée ligne {line_num}: '{text[start:end]}'")
                    
                    doc.ents = ents
                    db.add(doc)
                    examples_count += 1
                
                except Exception as e:
                    print(f"❌ Erreur ligne {line_num}: {e}")
        
        print(f"✅ {examples_count} exemples convertis en DocBin")
        return db
    
    def train_model(self, docbin, n_iter=20, batch_size=4, dropout=0.3):
        """Entraîne le modèle NER avec les données fournies"""
        if not docbin:
            print("❌ Pas de données d'entraînement.")
            return False
        
        # Configurer le pipeline NER
        if "ner" not in self.nlp.pipe_names:
            ner = self.nlp.add_pipe("ner", last=True)
        else:
            ner = self.nlp.get_pipe("ner")
        
        # Ajouter le label
        ner.add_label(self.label)
        
        # Préparer les données d'entraînement
        train_examples = []
        docs = list(docbin.get_docs(self.nlp.vocab))
        
        for doc in docs:
            example = Example.from_dict(doc, {"entities": [(ent.start_char, ent.end_char, ent.label_) for ent in doc.ents]})
            train_examples.append(example)
        
        # Initialiser le modèle
        self.nlp.initialize(lambda: train_examples)
        
        print(f"🚀 Début de l'entraînement ({n_iter} époques, {len(train_examples)} exemples)")
        
        # Entraînement
        for epoch in range(n_iter):
            random.shuffle(train_examples)
            losses = {}
            
            # Entraîner par batch
            batches = minibatch(train_examples, size=batch_size)
            for batch in batches:
                self.nlp.update(batch, drop=dropout, losses=losses)
            
            # Afficher les progrès
            if (epoch + 1) % 5 == 0 or epoch == 0:
                print(f"📊 Époque {epoch + 1:2d}/{n_iter} - Perte NER: {losses.get('ner', 0):.4f}")
        
        # Sauvegarder le modèle
        self.nlp.to_disk(self.model_dir)
        print(f"💾 Modèle sauvegardé dans {self.model_dir}")
        return True
    
    def predict(self, text):
        """Prédit les entités SIREN dans un texte"""
        if not self.nlp:
            print("❌ Modèle non chargé.")
            return []
        
        doc = self.nlp(text)
        predictions = []
        
        for ent in doc.ents:
            if ent.label_ == self.label:
                predictions.append({
                    "text": ent.text,
                    "start": ent.start_char,
                    "end": ent.end_char,
                    "confidence": getattr(ent, 'confidence', 'N/A')
                })
        
        return predictions
    
    def evaluate_model(self, test_text_samples=None):
        """Évalue les performances du modèle sur des échantillons de test"""
        if not test_text_samples:
            test_text_samples = [
                "La société DUPONT SARL est enregistrée sous le SIREN 123456789 à Paris.",
                "Les entreprises 987654321 et 111222333 ont fusionné.",
                "SIREN: 555666777 - Société MARTIN & FILS",
                "Aucun numéro SIREN dans ce texte.",
                "Contact: 01.23.45.67.89 - SIREN 444555666"
            ]
        
        print("🔍 Évaluation du modèle:")
        print("-" * 50)
        
        for i, text in enumerate(test_text_samples, 1):
            predictions = self.predict(text)
            print(f"Test {i}: {text[:50]}...")
            if predictions:
                for pred in predictions:
                    print(f"  ✅ SIREN trouvé: {pred['text']} (pos: {pred['start']}-{pred['end']})")
            else:
                print("  ❌ Aucun SIREN détecté")
            print()

def main():
    """Fonction principale"""
    print("🏗️  Initialisation de l'extracteur SIREN")
    extractor = SirenExtractor()
    
    # Créer les dossiers nécessaires
    os.makedirs(DATA_DIR, exist_ok=True)
    
    # Charger ou créer le modèle
    extractor.load_or_create_model()
    
    # Étape 1: Créer les données d'entraînement
    jsonl_path = os.path.join(DATA_DIR, "train.jsonl")
    print("\n📚 Création des données d'entraînement...")
    if not extractor.create_training_data(jsonl_path):
        print("❌ Impossible de créer les données d'entraînement.")
        return
    
    # Étape 2: Convertir en DocBin
    print("\n🔄 Conversion en format spaCy...")
    docbin = extractor.convert_to_docbin(jsonl_path)
    if not docbin:
        return
    
    # Étape 3: Entraîner le modèle
    print("\n🎯 Entraînement du modèle...")
    if extractor.train_model(docbin, n_iter=20):
        print("\n✅ Entraînement terminé avec succès!")
    else:
        print("\n❌ Échec de l'entraînement.")
        return
    
    # Étape 4: Évaluer le modèle
    print("\n🧪 Évaluation du modèle...")
    extractor.evaluate_model()
    
    print("\n🎉 Pipeline terminé!")

if __name__ == "__main__":
    main()

🏗️  Initialisation de l'extracteur SIREN
🆕 Nouveau modèle vierge créé.

📚 Création des données d'entraînement...
❌ Dossier annonces_legales introuvable.
❌ Impossible de créer les données d'entraînement.
