# TP – API “Digital Social Score”

De l’analyse de texte à l’infrastructure Cloud sécurisée, scalable 
et conform

## Etape 1 : Exploration, analyse et anonymisation des données

###  Téléchargons le dataset Toxic Comment Classification Datase (Hugging Face)

In [None]:
from datasets import load_dataset
import pandas as pd

dataset = load_dataset("thesofakillers/jigsaw-toxic-comment-classification-challenge", split="train")

# Afficher les premier lignes
df = pd.DataFrame(dataset)
df


### Utilisons spaCy pour anonymiser les données 

In [None]:
# python -m spacy download en_core_web_sm"
import spacy
import re
import pandas as pd
from tqdm import tqdm

# Charger spaCy pour l'anglais (une seule fois)
nlp = spacy.load("en_core_web_sm")

# Patterns regex pour détecter les données personnelles
EMAIL_PATTERN = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b')
PHONE_PATTERN = re.compile(r'\b(?:\+?1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b')

def detect_personal_data(text):
    """
    Détecte toutes les données personnelles pour la conformité RGPD
    """
    if not text or pd.isna(text):
        return {
            'has_personal_data': False,
            'names': [],
            'emails': [],
            'phones': [],
            'addresses': []
        }
    
    text_str = str(text)
    
    # 1. EMAILS
    emails = EMAIL_PATTERN.findall(text_str)
    
    # 2. TÉLÉPHONES  
    phones = PHONE_PATTERN.findall(text_str)
    
    # 3. NOMS avec spaCy NER
    doc = nlp(text_str)
    names = []
    for ent in doc.ents:
        if ent.label_ == 'PERSON':
            names.append(ent.text)
    
    return {
        'has_personal_data': bool(names or emails or phones),
        'names': names,
        'emails': emails,
        'phones': phones,
    }

def anonymize_text(text, detection_result):
    """
    Anonymise automatiquement le texte pour la conformité RGPD
    """
    if not detection_result['has_personal_data']:
        return text
    
    anonymized_text = str(text)
    
    for email in detection_result['emails']:
        anonymized_text = anonymized_text.replace(email, "[EMAIL]")
    
    for phone in detection_result['phones']:
        anonymized_text = anonymized_text.replace(phone, "[PHONE]")
    
    for name in detection_result['names']:
        anonymized_text = anonymized_text.replace(name, "[NAME]")
    
    return anonymized_text


In [None]:
# CRÉATION DU DATASET ANONYMISÉ (df_anonymized)
print("ANONYMISATION DU DATASET")
print("=" * 30)

# Travaillons avec 10 000 données
max_to_anonymize = 10000

# Taille réelle à traiter
actual_size = min(max_to_anonymize, len(df))

print(f"PARAMETRES D'ANONYMISATION:")
print(f"Dataset total: {len(df):,} commentaires")
print(f"A anonymiser: {actual_size:,} commentaires")

# Créer une copie du DataFrame pour l'anonymisation
df_anonymized = df.copy()

# Statistiques de traitement
comments_with_personal_data = 0
total_anonymizations = 0

print(f"\nAnonymisation en cours...")

# Traiter les commentaires avec barre de progression
for i in tqdm(range(actual_size), desc="Anonymisation"):
    original_comment = df['comment_text'].iloc[i]
    
    # Détecter les données personnelles
    detection = detect_personal_data(original_comment)
    
    # Si des données personnelles sont trouvées, anonymiser
    if detection['has_personal_data']:
        comments_with_personal_data += 1
        
        # Compter le nombre total d'éléments à anonymiser
        num_elements = (len(detection['names']) + len(detection['emails']) + len(detection['phones']))
        total_anonymizations += num_elements
        
        # Anonymiser le texte
        anonymized_comment = anonymize_text(original_comment, detection)
        
        # Remplacer dans le DataFrame anonymisé
        df_anonymized.loc[i, 'comment_text'] = anonymized_comment

print(f"Total commentaires traités: {actual_size:,}")
print(f"Commentaires anonymisés: {comments_with_personal_data:,}")
print(f"Pourcentage de commentaires affectés: {comments_with_personal_data/actual_size*100:.2f}%")
print(f"Total d'éléments anonymisés: {total_anonymizations:,}")

# Sauvegarder le dataset anonymisé
anonymized_filename = "dataset_anonymized_rgpd_10k.csv"
df_anonymized.to_csv(anonymized_filename, index=False)
print(f"\nDataset anonymisé sauvegardé: {anonymized_filename}")

### Comparons la version initiale et anonymisée et justifions chaque choix.

## Etape 1 :  Préparation et entraînement d’un modèle IA

### Nettoyage des textes (ponctuation, emojis, casse)

In [None]:
import re
import string
import pandas as pd
from tqdm import tqdm

def clean_text(text):
    """
    Fonction complète de nettoyage de texte pour le NLP
    """
    if not text or pd.isna(text):
        return ""
    
    # Convertir en string au cas où
    text = str(text)
    
    # 1. CASSE : Convertir en minuscules
    text = text.lower()
    
    # 2. EMOJIS : Supprimer les emojis (patterns Unicode)
    emoji_pattern = re.compile("["
                              u"\U0001F600-\U0001F64F"  # emoticons
                              u"\U0001F300-\U0001F5FF"  # symboles & pictogrammes
                              u"\U0001F680-\U0001F6FF"  # transport & cartes
                              u"\U0001F1E0-\U0001F1FF"  # drapeaux
                              u"\U00002500-\U00002BEF"  # caractères chinois
                              u"\U00002702-\U000027B0"
                              u"\U00002702-\U000027B0"
                              u"\U000024C2-\U0001F251"
                              u"\U0001f926-\U0001f937"
                              u"\U00010000-\U0010ffff"
                              u"\u2640-\u2642"
                              u"\u2600-\u2B55"
                              u"\u200d"
                              u"\u23cf"
                              u"\u23e9"
                              u"\u231a"
                              u"\ufe0f"  # dingbats
                              u"\u3030"
                              "]+", flags=re.UNICODE)
    text = emoji_pattern.sub(r'', text)
    
    # 3. CARACTÈRES SPÉCIAUX : Supprimer les caractères non-ASCII
    text = re.sub(r'[^\x00-\x7F]+', ' ', text)
    
    # 4. URLs : Supprimer les URLs
    text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', ' ', text)
    text = re.sub(r'www\.(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', ' ', text)
    
    # 5. MENTIONS : Supprimer les @mentions
    text = re.sub(r'@\w+', ' ', text)
    
    # 6. HASHTAGS : Supprimer les #hashtags
    text = re.sub(r'#\w+', ' ', text)
    
    # 7. CHIFFRES : Supprimer les chiffres isolés
    text = re.sub(r'\b\d+\b', ' ', text)
    
    # 8. PONCTUATION : Supprimer la ponctuation
    text = text.translate(str.maketrans('', '', string.punctuation))
    
    # 9. ESPACES : Normaliser les espaces multiples
    text = re.sub(r'\s+', ' ', text)
    
    # 10. TRIM : Supprimer les espaces en début/fin
    text = text.strip()
    
    return text

In [None]:
# ÉTAPE 3: Application du nettoyage au dataset anonymisé
print("NETTOYAGE DU DATASET ANONYMISE")
print("=" * 35)

# Vérifier que df_anonymized existe
if 'df_anonymized' not in locals():
    print("df_anonymized n'existe pas encore!")
else:
    # Travailler seulement sur les 10 000 données anonymisées
    max_to_clean = 10000
    actual_size_to_clean = min(max_to_clean, len(df_anonymized))
    
    # Créer une copie du dataset anonymisé pour le nettoyage (seulement les 10k)
    df_cleaned = df_anonymized.head(actual_size_to_clean).copy()
    
    # Statistiques de nettoyage
    total_comments = len(df_cleaned)
    original_total_words = 0
    cleaned_total_words = 0
    empty_after_cleaning = 0
    
    print(f"Nettoyage de {total_comments:,} commentaires anonymisés...")
    
    # Appliquer le nettoyage avec barre de progression
    for i in tqdm(range(total_comments), desc="Nettoyage des commentaires"):
        original_comment = df_cleaned['comment_text'].iloc[i]
        
        if original_comment and not pd.isna(original_comment):
            # Compter les mots avant nettoyage
            original_total_words += len(str(original_comment).split())
            
            # Appliquer le nettoyage standard
            cleaned_comment = clean_text(original_comment)
            
            # Remplacer dans le DataFrame
            df_cleaned.loc[i, 'comment_text'] = cleaned_comment
            
            # Compter les mots après nettoyage
            if cleaned_comment:
                cleaned_total_words += len(cleaned_comment.split())
            else:
                empty_after_cleaning += 1
    

    print(f"Total commentaires traités: {total_comments:,}")
    if original_total_words > 0:
        print(f"Réduction: {((original_total_words - cleaned_total_words) / original_total_words * 100):.1f}%")
    print(f"Commentaires vides après nettoyage: {empty_after_cleaning}")
    
    # Supprimer les commentaires vides
    df_cleaned = df_cleaned[df_cleaned['comment_text'].str.len() > 0]
    print(f"Commentaires conservés: {len(df_cleaned):,}")
    
    # Sauvegarder le dataset nettoyé
    cleaned_filename = "dataset_cleaned_and_anonymized_10k.csv"
    df_cleaned.to_csv(cleaned_filename, index=False)
    print(f"\nDataset nettoyé sauvegardé: {cleaned_filename}")

### Entraînons deux modèles : statistique(SVM) et avancé (BERT)

In [None]:
# PREPARATION DU DATASET FINAL POUR L'ENTRAINEMENT

# Vérifier que df_cleaned existe
if 'df_cleaned' not in locals():
    print("ERREUR: df_cleaned n'existe pas!")
    print("Veuillez d'abord exécuter les étapes précédentes.")
else:
    # Sélectionner uniquement les colonnes nécessaires
    required_columns = ['comment_text', 'toxic']
    
    # Vérifier que les colonnes existent
    available_columns = list(df_cleaned.columns)
    missing_columns = [col for col in required_columns if col not in available_columns]
    
    if missing_columns:
        print(f"ATTENTION: Colonnes manquantes: {missing_columns}")
        # Utiliser les colonnes disponibles
        final_columns = [col for col in required_columns if col in available_columns]
    else:
        final_columns = required_columns
    
    print(f"Colonnes finales utilisées: {final_columns}")
    
    # Créer le dataset final
    df_final = df_cleaned[final_columns].copy()
    
 
    
    # Distribution du label TOXIC
    if 'toxic' in df_final.columns:
        toxic_dist = df_final['toxic'].value_counts()
        print(f"\nDISTRIBUTION DU LABEL TOXIC:")
        print(f"Non-toxic (0): {toxic_dist.get(0, 0):,} ({toxic_dist.get(0, 0)/len(df_final)*100:.1f}%)")
        print(f"Toxic (1): {toxic_dist.get(1, 0):,} ({toxic_dist.get(1, 0)/len(df_final)*100:.1f}%)")
        
        # Vérifier l'équilibre des classes
        toxic_ratio = toxic_dist.get(1, 0) / len(df_final) * 100
        if toxic_ratio < 5:
            print(f"ATTENTION: Classes très déséquilibrées ({toxic_ratio:.1f}% toxic)")
        elif toxic_ratio < 20:
            print(f" Classes modérément déséquilibrées ({toxic_ratio:.1f}% toxic)")
        else:
            print(f" Classes relativement équilibrées ({toxic_ratio:.1f}% toxic)")

Les models sont dans le dossier model

### Comparons précision, rappel, et temps de traitement

In [None]:
import pandas as pd
import numpy as np
import time
import json
from datetime import datetime
import os
import pickle
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

from sklearn.metrics import (
    accuracy_score, precision_recall_fscore_support, 
    confusion_matrix, classification_report, roc_auc_score
)

class ModelBenchmark:
    def __init__(self, test_data_path="test_toxic_10k.csv"):
        """Initialiser le benchmark avec les données de test"""
        print(" BENCHMARK DES MODÈLES - TOXIC DETECTION")
        print("=" * 50)
        
        # Charger les données de test
        self.test_df = pd.read_csv(test_data_path)
        self.X_test = self.test_df['comment_text'].tolist()
        self.y_test = self.test_df['toxic'].tolist()
        
        print(f"Dataset de test: {len(self.X_test)} commentaires")
        print(f"Distribution: {np.bincount(self.y_test)} [Non-toxic, Toxic]")
        print()
        
        self.results = {}
    
    def benchmark_svm(self):
        """Benchmarker le modèle SVM"""
        print(" BENCHMARK SVM")
        print("-" * 20)
        
        try:
            # Charger le modèle SVM sauvegardé
            svm_path = "model/svm_pipeline.pkl"
            if not os.path.exists(svm_path):
                print(f" Modèle SVM non trouvé: {svm_path}")
                return None
            
            with open(svm_path, 'rb') as f:
                svm_model = pickle.load(f)
            
            start_time = time.time()
            
            # Prédictions
            predictions = svm_model.predict(self.X_test)
            
            # Essayer d'obtenir les probabilités
            probabilities = None
            try:
                # SVM n'a pas predict_proba par défaut, utiliser decision_function
                decision_scores = svm_model.decision_function(self.X_test)
                # Convertir en probabilités approximatives avec sigmoid
                probabilities = 1 / (1 + np.exp(-decision_scores))
            except:
                probabilities = None
            
            inference_time = time.time() - start_time
            
            # Calculer les métriques
            accuracy = accuracy_score(self.y_test, predictions)
            precision, recall, f1, _ = precision_recall_fscore_support(
                self.y_test, predictions, average='binary'
            )
            
            # AUC si probabilities disponibles
            auc = None
            if probabilities is not None:
                try:
                    auc = roc_auc_score(self.y_test, probabilities)
                except:
                    pass
            
            # Matrice de confusion
            cm = confusion_matrix(self.y_test, predictions)
            
            svm_results = {
                'model_name': 'SVM',
                'accuracy': accuracy,
                'precision': precision,
                'recall': recall,
                'f1_score': f1,
                'auc': auc,
                'inference_time': inference_time,
                'inference_per_sample': inference_time / len(self.X_test),
                'confusion_matrix': cm.tolist(),
                'predictions': predictions
            }
            
            print(f"SVM - Temps d'inférence: {inference_time:.2f}s")
            print(f"   Accuracy: {accuracy:.4f}")
            print(f"   F1-Score: {f1:.4f}")
            print()
            
            self.results['SVM'] = svm_results
            return svm_results
            
        except Exception as e:
            print(f" Erreur SVM: {e}")
            return None
    
    def benchmark_bert(self):
        """Benchmarker le modèle BERT"""
        print(" BENCHMARK BERT")
        print("-" * 20)
        
        try:
            # Charger le modèle BERT sauvegardé
            bert_path = "./bert_model"
            if not os.path.exists(bert_path):
                print(f" Modèle BERT non trouvé: {bert_path}")
                return None
            
            # Charger tokenizer et modèle
            tokenizer = AutoTokenizer.from_pretrained(bert_path)
            model = AutoModelForSequenceClassification.from_pretrained(bert_path)
            model.eval()
            
            start_time = time.time()
            
            # Tokenisation
            encoded = tokenizer(
                self.X_test, 
                truncation=True, 
                padding=True, 
                max_length=128, 
                return_tensors='pt'
            )
            
            # Prédictions
            with torch.no_grad():
                outputs = model(**encoded)
                logits = outputs.logits
                probabilities = torch.nn.functional.softmax(logits, dim=-1)
                predictions = torch.argmax(logits, dim=-1).numpy()
                proba_toxic = probabilities[:, 1].numpy()  # Probabilité classe toxic (1)
            
            inference_time = time.time() - start_time
            
            # Calculer les métriques
            accuracy = accuracy_score(self.y_test, predictions)
            precision, recall, f1, _ = precision_recall_fscore_support(
                self.y_test, predictions, average='binary'
            )
            
            # AUC
            auc = roc_auc_score(self.y_test, proba_toxic)
            
            # Matrice de confusion
            cm = confusion_matrix(self.y_test, predictions)
            
            bert_results = {
                'model_name': 'BERT',
                'accuracy': accuracy,
                'precision': precision,
                'recall': recall,
                'f1_score': f1,
                'auc': auc,
                'inference_time': inference_time,
                'inference_per_sample': inference_time / len(self.X_test),
                'confusion_matrix': cm.tolist(),
                'predictions': predictions
            }
            
            print(f" BERT - Temps d'inférence: {inference_time:.2f}s")
            print(f"   Accuracy: {accuracy:.4f}")
            print(f"   F1-Score: {f1:.4f}")
            print()
            
            self.results['BERT'] = bert_results
            return bert_results
            
        except Exception as e:
            print(f"❌ Erreur BERT: {e}")
            return None
    
    def compare_models(self):
        """Comparer les résultats des deux modèles"""
        if len(self.results) < 2:
            print("Pas assez de modèles pour la comparaison")
            return
        
        print(" COMPARAISON DES MODÈLES")
        print("=" * 50)
        
        svm_res = self.results.get('SVM', {})
        bert_res = self.results.get('BERT', {})
        
        # Tableau de comparaison
        metrics = ['accuracy', 'precision', 'recall', 'f1_score', 'auc', 'inference_time']
        
        print(f"{'Métrique':<15} {'SVM':<10} {'BERT':<10} {'Gagnant':<10}")
        print("-" * 50)
        
        svm_wins = 0
        bert_wins = 0
        
        for metric in metrics:
            svm_val = svm_res.get(metric, 0)
            bert_val = bert_res.get(metric, 0)
            
            if svm_val is None: svm_val = 0
            if bert_val is None: bert_val = 0
            
            # Pour le temps d'inférence, plus petit = mieux
            if metric == 'inference_time':
                winner = "SVM" if svm_val < bert_val and svm_val > 0 else "BERT"
                if winner == "SVM": svm_wins += 1
                else: bert_wins += 1
            else:
                winner = "SVM" if svm_val > bert_val else "BERT"
                if winner == "SVM": svm_wins += 1
                else: bert_wins += 1
            
            print(f"{metric:<15} {svm_val:<10.4f} {bert_val:<10.4f} {winner:<10}")
        
        print("-" * 50)
        print(f"Victoires SVM: {svm_wins}")
        print(f"Victoires BERT: {bert_wins}")
        
        # Recommandation
        overall_winner = "SVM" if svm_wins > bert_wins else "BERT"
        print(f"\n MODÈLE RECOMMANDÉ: {overall_winner}")
        
        # Analyse détaillée
        print(f"\n ANALYSE DÉTAILLÉE:")
        
        if overall_winner == "SVM":
            print(" Avantages SVM:")
            print("   - Plus rapide en inférence")
            print("   - Moins de ressources nécessaires")
            print("   - Plus simple à déployer")
        else:
            print(" Avantages BERT:")
            print("   - Meilleure compréhension contextuelle")
            print("   - Performance généralement supérieure")
            print("   - Transfert learning efficace")
        
        return overall_winner
    
    def save_results(self):
        """Sauvegarder les résultats"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"benchmark_results_{timestamp}.json"
        
        # Préparer les données pour JSON
        json_results = {}
        for model_name, results in self.results.items():
            json_results[model_name] = {}
            for key, value in results.items():
                if key == 'predictions':
                    continue  # Skip predictions (trop volumineux)
                elif isinstance(value, np.ndarray):
                    json_results[model_name][key] = value.tolist()
                elif isinstance(value, np.floating):
                    json_results[model_name][key] = float(value)
                else:
                    json_results[model_name][key] = value
        
        # Ajouter métadonnées
        json_results['metadata'] = {
            'timestamp': timestamp,
            'test_samples': len(self.X_test),
            'toxic_ratio': sum(self.y_test) / len(self.y_test),
        }
        
        with open(filename, 'w') as f:
            json.dump(json_results, f, indent=2)
        
        print(f"\n Résultats sauvegardés: {filename}")
        return filename
    
    def run_full_benchmark(self):
        """Exécuter le benchmark complet"""
        # Benchmarker SVM
        self.benchmark_svm()
        
        # Benchmarker BERT
        self.benchmark_bert()
        
        # Comparer
        winner = self.compare_models()
        
        # Sauvegarder
        self.save_results()
        
        return winner

# Lancer le benchmark
benchmark = ModelBenchmark()
winner = benchmark.run_full_benchmark()
    
print(f"\n CONCLUSION:")
print(f"Le modèle recommandé est: {winner}")

## Étape 3 : Déploiement du modèle en API Cloud

### Exportons le modèle et le déployons dans le Cloud

### Création d'une API Fast API recevant un texte et renvoyant un score.