1️⃣ Module Core - core_modules.py

In [51]:
%%writefile core_modules.py
from dataclasses import dataclass
from typing import Dict, List, Optional
import torch
import logging   # ← remplacement simple

@dataclass
class PredictionResult:
    """Container for the outcome of classifying one text."""

    # Text the prediction refers to.
    text: str
    # Label chosen for the text.
    predicted_label: str
    # Score attached to the chosen label (presumably a probability in [0, 1] — confirm with the producer).
    confidence: float
    # One score per label (presumably keyed by label name — verify against the producer).
    all_scores: Dict[str, float]
    # Optional list of context strings; None when no context is attached.
    context: Optional[List[str]] = None
    # Processing duration; the unit is not established here (likely seconds — TODO confirm).
    processing_time: float = 0.0

class ClimateConfig:
    """Hyper-parameters and paths shared by the training and Q&A pipeline.

    Every setting has a default. Any of them can be replaced by keyword,
    e.g. ``ClimateConfig(batch_size=8, epochs=1)``; calling ``ClimateConfig()``
    yields exactly the default configuration.
    """

    def __init__(self, **overrides):
        """Set the defaults, then apply the keyword overrides.

        Args:
            **overrides: Replacement values for existing settings.

        Raises:
            TypeError: If an override names a setting that does not exist,
                so that a typo is not silently ignored.
        """
        self.model_name = "distilbert-base-uncased"
        self.max_length = 256
        self.batch_size = 16
        self.learning_rate = 2e-4
        self.epochs = 3
        self.lora_r = 16
        self.lora_alpha = 32
        # Use the GPU when one is visible to torch, otherwise the CPU.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.output_dir = "outputs/final_model"
        self.qa_model_name = 'all-MiniLM-L6-v2'
        self.qa_similarity_threshold = 0.1

        unknown = sorted(set(overrides) - set(vars(self)))
        if unknown:
            raise TypeError(f"Unknown ClimateConfig setting(s): {', '.join(unknown)}")
        for name, value in overrides.items():
            setattr(self, name, value)

Overwriting core_modules.py


2️⃣ Module Data Processing - data_modules.py

In [52]:
%%writefile data_modules.py
import pandas as pd
from datasets import Dataset
from sklearn.model_selection import train_test_split
import numpy as np
import re
from typing import Tuple

class DataProcessor:
    """Detect the text/label columns, clean the text and build stratified splits."""

    # Substrings (matched case-insensitively) that mark a column as text or label.
    _TEXT_KEYWORDS = ('text', 'content', 'message', 'comment', 'body', 'description', 'self_text')
    _LABEL_KEYWORDS = ('label', 'sentiment', 'category', 'class', 'target', 'comment_sentiment')

    def __init__(self):
        """Start with no detected columns and empty label mappings."""
        self.text_col = None
        self.label_col = None
        self.label_mapping = {}
        self.reverse_label_mapping = {}

    @staticmethod
    def _first_matching(columns, keywords, skip=()):
        """Return the first column whose lower-cased name contains a keyword, or None."""
        for col in columns:
            if col in skip:
                continue
            name = str(col).lower()
            if any(keyword in name for keyword in keywords):
                return col
        return None

    def detect_columns(self, df: pd.DataFrame) -> Tuple[str, str]:
        """Guess which columns hold the text and the label.

        The label column is resolved first and excluded from the text search,
        because a name such as ``comment_sentiment`` matches both keyword lists
        and would otherwise be returned for both roles.

        Args:
            df: Input frame.

        Returns:
            ``(text_column, label_column)``, always two different columns.

        Raises:
            ValueError: If no usable text column or no distinct label column
                can be found.
        """
        columns = list(df.columns)
        label_col = self._first_matching(columns, self._LABEL_KEYWORDS)
        taken = () if label_col is None else (label_col,)
        text_col = self._first_matching(columns, self._TEXT_KEYWORDS, skip=taken)

        if text_col is None:
            # Fallback: first string-like column (object or dedicated string dtype).
            text_col = next(
                (
                    col
                    for col, dtype in zip(columns, df.dtypes)
                    if col not in taken
                    and (pd.api.types.is_object_dtype(dtype) or pd.api.types.is_string_dtype(dtype))
                ),
                None,
            )
            if text_col is None:
                raise ValueError("❌ Aucune colonne de texte détectée.")

        if label_col is None:
            # Fallback: last column that is not already used for the text.
            label_col = next((col for col in reversed(columns) if col != text_col), None)
            if label_col is None:
                raise ValueError("❌ Aucune colonne de label détectée.")

        return text_col, label_col

    def clean_text(self, text: str) -> str:
        """Normalise one raw text value.

        Decodes ``&gt;``/``&lt;``/``&amp;``, removes ``http...`` tokens and
        collapses whitespace runs.

        Returns:
            The cleaned text, or ``None`` when the input is missing, is one of
            the placeholders ``nan``/``none``/``null`` or is empty after cleaning.
        """
        if pd.isna(text) or str(text).strip().lower() in ['nan', 'none', '', 'null']:
            return None
        text = str(text).strip()
        text = re.sub(r'&gt;|&lt;|&amp;', lambda m: {'&gt;': '>', '&lt;': '<', '&amp;': '&'}[m.group()], text)
        text = re.sub(r'http\S+', '', text)
        text = re.sub(r'\s+', ' ', text).strip()
        return text or None

    def prepare_datasets(self, df: pd.DataFrame, sample_size: int = 8000) -> Tuple[Dataset, Dataset, Dataset]:
        """Clean *df* and split it 60/20/20 into train, validation and test sets.

        Side effects: sets ``text_col``, ``label_col``, ``label_mapping``
        (label string -> id) and ``reverse_label_mapping`` (id -> label string).

        Args:
            df: Raw frame containing a text column and a label column.
            sample_size: Maximum number of rows kept (random sample, seed 42).

        Returns:
            ``(train, validation, test)`` datasets with columns ``text`` and
            ``label_id``.

        Raises:
            ValueError: If no row survives the cleaning.
        """
        self.text_col, self.label_col = self.detect_columns(df)
        df_clean = df[[self.text_col, self.label_col]].copy()
        df_clean.columns = ['text', 'label']
        df_clean['text'] = df_clean['text'].apply(self.clean_text)
        # Drop missing values BEFORE casting labels to str: the cast would turn
        # NaN into the string 'nan', which would then survive as a bogus class.
        df_clean = df_clean.dropna().reset_index(drop=True)
        df_clean['label'] = df_clean['label'].astype(str)
        df_clean = df_clean[df_clean['text'].str.len() >= 10].copy()

        if df_clean.empty:
            raise ValueError("❌ Aucune donnée valide après nettoyage.")

        if len(df_clean) > sample_size:
            df_clean = df_clean.sample(n=sample_size, random_state=42)

        unique_labels = sorted(df_clean['label'].unique())
        self.label_mapping = {str(l): i for i, l in enumerate(unique_labels)}
        self.reverse_label_mapping = {i: label for label, i in self.label_mapping.items()}
        df_clean['label_id'] = df_clean['label'].map(self.label_mapping).astype(int)

        # 60 % train, then the remaining 40 % is halved into validation and test.
        train_df, temp = train_test_split(df_clean, test_size=0.4, stratify=df_clean['label_id'], random_state=42)
        val_df, test_df = train_test_split(temp, test_size=0.5, stratify=temp['label_id'], random_state=42)

        return (
            Dataset.from_pandas(train_df[['text', 'label_id']]),
            Dataset.from_pandas(val_df[['text', 'label_id']]),
            Dataset.from_pandas(test_df[['text', 'label_id']])
        )

Overwriting data_modules.py


3️⃣ Module Modèle - model_modules.py

In [53]:
%%writefile model_modules.py
import os
import logging
import warnings
import torch
import numpy as np
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    DataCollatorWithPadding,
    EarlyStoppingCallback,
)
from peft import LoraConfig, get_peft_model, TaskType
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# Only let ERROR-level (and above) records through the "transformers" logger.
logging.getLogger("transformers").setLevel(logging.ERROR)
# NOTE(review): this ignores every Python warning in the process, not only those raised by transformers.
warnings.filterwarnings("ignore")

class ModelManager:
    """Build the tokenizer, the LoRA-wrapped classifier and the Trainer from a config."""

    def __init__(self, config):
        """Keep the config; tokenizer and model are created by the ``setup_*`` methods."""
        self.tokenizer = None
        self.peft_model = None
        self.config = config

    def setup_tokenizer(self):
        """Load the tokenizer named in the config, store it and return it."""
        self.tokenizer = tok = AutoTokenizer.from_pretrained(self.config.model_name)
        # Reuse the EOS token for padding when the tokenizer defines no pad token.
        if tok.pad_token is None:
            tok.pad_token = tok.eos_token
        return self.tokenizer

    def setup_model(self, num_labels: int):
        """Load the base classifier with *num_labels* outputs and wrap it with LoRA."""
        cfg = self.config
        backbone = AutoModelForSequenceClassification.from_pretrained(
            cfg.model_name,
            num_labels=num_labels,
            torch_dtype=torch.float32,
            problem_type="single_label_classification"
        )
        adapter_settings = dict(
            task_type=TaskType.SEQ_CLS,
            r=cfg.lora_r,
            lora_alpha=cfg.lora_alpha,
            lora_dropout=0.1,
            target_modules=["q_lin", "v_lin"],
            bias="none",
        )
        self.peft_model = get_peft_model(backbone, LoraConfig(**adapter_settings))
        return self.peft_model

    def tokenize_function(self, examples):
        """Tokenize ``examples["text"]`` with truncation and no padding."""
        texts = examples["text"]
        return self.tokenizer(
            texts,
            truncation=True,
            padding=False,
            max_length=self.config.max_length,
        )

    def compute_metrics(self, eval_pred):
        """Return accuracy plus weighted F1/precision/recall for one evaluation pass."""
        logits, references = eval_pred
        predicted = np.argmax(logits, axis=1)
        weighted = {"average": "weighted", "zero_division": 0}
        return {
            "accuracy": accuracy_score(references, predicted),
            "f1_weighted": f1_score(references, predicted, **weighted),
            "precision": precision_score(references, predicted, **weighted),
            "recall": recall_score(references, predicted, **weighted),
        }

    def setup_training_args(self):
        """Create the output directory and return the ``TrainingArguments``."""
        cfg = self.config
        os.makedirs(cfg.output_dir, exist_ok=True)
        settings = {
            "output_dir": cfg.output_dir,
            "num_train_epochs": cfg.epochs,
            "per_device_train_batch_size": cfg.batch_size,
            "per_device_eval_batch_size": cfg.batch_size * 2,
            "learning_rate": cfg.learning_rate,
            "warmup_steps": 200,
            "weight_decay": 0.01,
            # Evaluate and checkpoint once per epoch; log every 50 steps.
            "eval_strategy": "epoch",
            "save_strategy": "epoch",
            "logging_strategy": "steps",
            "logging_steps": 50,
            "save_steps": 500,
            "load_best_model_at_end": True,
            "metric_for_best_model": "eval_accuracy",
            "greater_is_better": True,
            # Mixed precision is disabled everywhere.
            "fp16": False,
            "bf16": False,
            "fp16_full_eval": False,
            "bf16_full_eval": False,
            "save_total_limit": 2,
            "report_to": "none",
            "remove_unused_columns": False,
            "dataloader_pin_memory": False,
        }
        return TrainingArguments(**settings)

    def setup_trainer(self, train_dataset, val_dataset):
        """Assemble a ``Trainer`` with dynamic padding and early stopping (patience 3)."""
        training_args = self.setup_training_args()
        collator = DataCollatorWithPadding(tokenizer=self.tokenizer)
        stop_early = EarlyStoppingCallback(early_stopping_patience=3)
        return Trainer(
            model=self.peft_model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            tokenizer=self.tokenizer,
            compute_metrics=self.compute_metrics,
            data_collator=collator,
            callbacks=[stop_early],
        )

Overwriting model_modules.py


4️⃣ Module Visualisation - visualization_modules.py

In [54]:
%%writefile visualization_modules.py
# visualization_modules.py
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import streamlit as st
import numpy as np
import os
import json
from sklearn.metrics import classification_report, confusion_matrix
from collections import Counter
import re
from typing import List, Dict

# --- NLTK / BLEU / ROUGE ---
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from rouge_score import rouge_scorer

# Quietly download the NLTK tokenizer resources; this runs every time the module is imported.
nltk.download('punkt_tab', quiet=True)
nltk.download('punkt', quiet=True)

# Apply Matplotlib's 'default' style to all figures created afterwards.
plt.style.use('default')

class VisualizationManager:
    """Streamlit visualisations and BLEU/ROUGE helpers for Climate Analyzer.

    Every plotting method renders through Streamlit and closes its Matplotlib
    figure afterwards, so repeated reruns do not accumulate open figures.
    """

    # --------------------------------------------------
    # Internal BLEU / ROUGE tools
    # --------------------------------------------------
    # Built once when the class is created and shared by every call.
    _smoothie = SmoothingFunction().method4
    _rouge_scorer = rouge_scorer.RougeScorer(['rouge1', 'rougeL'], use_stemmer=True)

    @staticmethod
    def _bleu(ref: str, hyp: str) -> float:
        """Return the smoothed sentence BLEU of *hyp* against *ref* (both lower-cased)."""
        ref_tok = nltk.word_tokenize(ref.lower())
        hyp_tok = nltk.word_tokenize(hyp.lower())
        return sentence_bleu([ref_tok], hyp_tok, smoothing_function=VisualizationManager._smoothie)

    @staticmethod
    def _rouge_score(ref: str, hyp: str) -> dict:
        """Return the ROUGE-1 and ROUGE-L F-measures as ``{'rouge-1', 'rouge-l'}``."""
        scores = VisualizationManager._rouge_scorer.score(ref.lower(), hyp.lower())
        return {'rouge-1': scores['rouge1'].fmeasure,
                'rouge-l': scores['rougeL'].fmeasure}

    # --------------------------------------------------
    # 1) Training curves
    # --------------------------------------------------
    @staticmethod
    def plot_training_curves(log_dir: str = "outputs/final_model"):
        """Plot loss, accuracy and F1 from ``<log_dir>/trainer_state.json``.

        Shows a Streamlit warning when the file or its history is missing and
        a Streamlit error when plotting fails; never raises.
        """
        fig = None
        try:
            log_file = os.path.join(log_dir, "trainer_state.json")
            if not os.path.exists(log_file):
                st.warning("📄 Aucun log d'entraînement trouvé.")
                return

            with open(log_file, 'r', encoding='utf-8') as f:
                logs = json.load(f)

            history = logs.get('log_history', [])
            if not history:
                st.warning("📉 Aucune donnée d'historique trouvée.")
                return

            epochs, eval_loss, eval_acc, eval_f1 = [], [], [], []
            train_epochs, train_loss = [], []
            summary_loss = None

            for entry in history:
                if 'eval_loss' in entry:
                    epochs.append(entry.get('epoch', 0))
                    eval_loss.append(entry.get('eval_loss', 0))
                    eval_acc.append(entry.get('eval_accuracy', 0))
                    eval_f1.append(entry.get('eval_f1_weighted', 0))
                elif 'loss' in entry:
                    # Periodic training logs store the running loss under 'loss'.
                    train_epochs.append(entry.get('epoch', 0))
                    train_loss.append(entry['loss'])
                elif 'train_loss' in entry:
                    # 'train_loss' only appears in the end-of-run summary entry.
                    summary_loss = entry['train_loss']

            fig, axes = plt.subplots(2, 2, figsize=(15, 10))
            fig.suptitle("📈 Évolution de l'entraînement", fontsize=16)

            loss_ax = axes[0, 0]
            if train_loss:
                loss_ax.plot(train_epochs, train_loss, 'b-', label='Train Loss', alpha=0.7)
            elif summary_loss is not None:
                # Only the final average is available: draw it as a reference level.
                loss_ax.axhline(summary_loss, color='b', linestyle='--', alpha=0.7, label='Train Loss')
            if eval_loss:
                loss_ax.plot(epochs, eval_loss, 'r-o', label='Eval Loss', markersize=4)
            if train_loss or summary_loss is not None or eval_loss:
                loss_ax.set_title('Loss Evolution')
                loss_ax.legend()
                loss_ax.grid(True, alpha=0.3)

            if eval_acc:
                axes[0, 1].plot(epochs, eval_acc, 'g-o', label='Accuracy', markersize=4)
                axes[0, 1].set_title('Accuracy Evolution')
                axes[0, 1].legend()
                axes[0, 1].grid(True, alpha=0.3)

            if eval_f1:
                axes[1, 0].plot(epochs, eval_f1, 'm-o', label='F1-Weighted', markersize=4)
                axes[1, 0].set_title('F1-Score Evolution')
                axes[1, 0].legend()
                axes[1, 0].grid(True, alpha=0.3)

            if eval_acc and eval_f1:
                final_metrics = ['Accuracy', 'F1-Score']
                final_values = [eval_acc[-1], eval_f1[-1]]
                bars = axes[1, 1].bar(final_metrics, final_values, color=['green', 'purple'], alpha=0.7)
                axes[1, 1].set_title('Final Metrics')
                axes[1, 1].set_ylim(0, 1)
                for bar, value in zip(bars, final_values):
                    axes[1, 1].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                                   f'{value:.3f}', ha='center', va='bottom')

            plt.tight_layout()
            st.pyplot(fig)
        except Exception as e:
            st.error(f"❌ Erreur lors de l'affichage des courbes : {e}")
        finally:
            if fig is not None:
                plt.close(fig)

    # --------------------------------------------------
    # 2) Confusion matrix
    # --------------------------------------------------
    @staticmethod
    def show_confusion_matrix(trainer, test_dataset, label_names: List[str]):
        """Show raw and row-normalised confusion matrices plus a classification report.

        Args:
            trainer: Object exposing ``predict(dataset)`` whose result has
                ``predictions`` and ``label_ids``.
            test_dataset: Dataset handed to ``trainer.predict``.
            label_names: Display names indexed by class id. Ids without a
                name are shown as ``Classe <id>``.
        """
        fig = None
        try:
            predictions_output = trainer.predict(test_dataset)
            logits = predictions_output.predictions
            # Presumably some models return (logits, ...) — keep the first element then.
            if isinstance(logits, tuple):
                logits = logits[0]
            if predictions_output.label_ids is None:
                raise ValueError("le jeu de test ne contient pas de labels")
            predictions = np.asarray(logits).argmax(axis=1)
            true_labels = np.asarray(predictions_output.label_ids)

            # Fix the class list explicitly so the matrix and the report keep one
            # row per known class even when a class is absent from the test set.
            names = list(label_names or [])
            observed = set(true_labels.tolist()) | set(predictions.tolist())
            class_ids = sorted(set(range(len(names))) | observed)
            display_names = [names[i] if 0 <= i < len(names) else f"Classe {i}" for i in class_ids]

            cm = confusion_matrix(true_labels, predictions, labels=class_ids)

            fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
            sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                       xticklabels=display_names, yticklabels=display_names, ax=ax1)
            ax1.set_title("Matrice de confusion")
            ax1.set_xlabel("Prédictions")
            ax1.set_ylabel("Vraies valeurs")

            # Rows with no true sample stay at 0 instead of producing NaN.
            row_totals = cm.sum(axis=1, keepdims=True)
            cm_normalized = np.divide(cm, row_totals,
                                      out=np.zeros(cm.shape, dtype=float),
                                      where=row_totals != 0)
            sns.heatmap(cm_normalized, annot=True, fmt='.2f', cmap='Blues',
                       xticklabels=display_names, yticklabels=display_names, ax=ax2)
            ax2.set_title("Matrice de confusion (normalisée)")
            plt.tight_layout()
            st.pyplot(fig)

            report = classification_report(true_labels, predictions,
                                         labels=class_ids,
                                         target_names=display_names,
                                         output_dict=True, zero_division=0)
            report_df = pd.DataFrame(report).transpose()
            st.subheader("📊 Rapport de classification")
            st.dataframe(report_df.round(3))
        except Exception as e:
            st.error(f"❌ Erreur lors de l'affichage de la matrice de confusion : {e}")
        finally:
            if fig is not None:
                plt.close(fig)

    # --------------------------------------------------
    # 3) Class distribution
    # --------------------------------------------------
    @staticmethod
    def plot_class_distribution(labels, label_names: List[str] = None, title: str = "Distribution des classes"):
        """Draw a bar chart of how many samples each class id has.

        Args:
            labels: Iterable of class ids (anything with ``tolist`` is converted first).
            label_names: Optional display names indexed by class id; ids
                without a name are shown as ``Classe <id>``.
            title: Chart title.
        """
        fig = None
        try:
            if hasattr(labels, 'tolist'):
                labels = labels.tolist()
            label_counts = Counter(int(x) for x in labels)
            if not label_counts:
                st.info("Aucune donnée à afficher")
                return

            class_ids = sorted(label_counts)
            counts = [label_counts[i] for i in class_ids]
            x_labels = [
                label_names[i] if label_names and 0 <= i < len(label_names) else f"Classe {i}"
                for i in class_ids
            ]

            fig, ax = plt.subplots(figsize=(10, 6))
            bars = ax.bar(range(len(x_labels)), counts, color=plt.cm.Set3(np.linspace(0, 1, len(x_labels))))
            ax.set_title(title, fontsize=14, fontweight='bold')
            ax.set_xlabel("Classes")
            ax.set_ylabel("Nombre d'échantillons")
            ax.set_xticks(range(len(x_labels)))
            # Tilt the tick labels only when at least one of them is long.
            ax.set_xticklabels(x_labels, rotation=45 if max(map(len, x_labels)) > 10 else 0)

            for bar, count in zip(bars, counts):
                ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(counts)*0.01,
                       str(count), ha='center', va='bottom')
            total = sum(counts)
            ax.text(0.02, 0.98, f"Total: {total}\nClasses: {len(x_labels)}",
                   transform=ax.transAxes, va='top', ha='left',
                   bbox=dict(boxstyle="round,pad=0.3", facecolor="lightgray", alpha=0.7))
            plt.tight_layout()
            st.pyplot(fig)
        except Exception as e:
            st.error(f"❌ Erreur lors de l'affichage de la distribution : {e}")
        finally:
            if fig is not None:
                plt.close(fig)

    # --------------------------------------------------
    # 4) Q&A result analysis
    # --------------------------------------------------
    @staticmethod
    def plot_qa_results_analysis(qa_results: List[Dict], question: str):
        """Plot score statistics for a list of Q&A hits.

        Each hit is read through its ``'score'``, ``'rank'`` and ``'text'`` keys.
        Shows an info message and returns when *qa_results* is empty.
        """
        if not qa_results:
            st.info("Aucun résultat à analyser")
            return
        fig = None
        try:
            scores = [r['score'] for r in qa_results]
            ranks = [r['rank'] for r in qa_results]

            fig, axes = plt.subplots(2, 2, figsize=(15, 10))
            fig.suptitle(f"Analyse des résultats pour: '{question[:50]}...'", fontsize=14)

            axes[0, 0].hist(scores, bins=min(10, len(scores)), alpha=0.7, color='skyblue', edgecolor='black')
            axes[0, 0].set_title("Distribution des scores de similarité")
            axes[0, 0].axvline(np.mean(scores), color='red', linestyle='--', label=f'Moyenne: {np.mean(scores):.3f}')
            axes[0, 0].legend()

            axes[0, 1].bar(ranks, scores, color='lightcoral', alpha=0.7)
            axes[0, 1].set_title("Scores par rang")
            axes[0, 1].set_xlabel("Rang")

            text_lengths = [len(r['text']) for r in qa_results]
            axes[1, 0].scatter(text_lengths, scores, alpha=0.6, color='green')
            axes[1, 0].set_title("Score vs Longueur du texte")
            axes[1, 0].set_xlabel("Longueur du texte")

            top_scores = scores[:5]
            top_ranks = ranks[:5]
            axes[1, 1].barh(range(len(top_scores)), top_scores, color='purple', alpha=0.7)
            axes[1, 1].set_title("Top 5 des scores")
            axes[1, 1].set_yticks(range(len(top_scores)))
            axes[1, 1].set_yticklabels([f"Rang {r}" for r in top_ranks])

            plt.tight_layout()
            st.pyplot(fig)

            st.subheader("📈 Statistiques détaillées")
            stats_df = pd.DataFrame({
                "Métrique": ["Score moyen", "Score médian", "Score max", "Score min", "Écart-type"],
                "Valeur": [np.mean(scores), np.median(scores), np.max(scores), np.min(scores), np.std(scores)]
            })
            st.dataframe(stats_df.round(4))
        except Exception as e:
            st.error(f"❌ Erreur lors de l'analyse des résultats Q&A : {e}")
        finally:
            if fig is not None:
                plt.close(fig)

    # --------------------------------------------------
    # 5) BLEU / ROUGE methods
    # --------------------------------------------------
    def calculate_bleu_score(self, reference: str, candidate: str) -> float:
        """Return the BLEU score of *candidate* against *reference*."""
        return self._bleu(reference, candidate)

    def calculate_rouge_score(self, reference: str, candidate: str) -> dict:
        """Return the ROUGE-1 / ROUGE-L scores of *candidate* against *reference*."""
        return self._rouge_score(reference, candidate)

    def visualize_bleu_rouge_scores(self, qa_results, references):
        """Plot BLEU, ROUGE-1 and ROUGE-L for each (reference, result) pair.

        Pairs are formed with ``zip``, so extra items on either side are
        ignored. Shows an info message when there is no pair to plot.
        """
        bleus, r1s, rls = [], [], []
        for ref, res in zip(references, qa_results):
            candidate = res['text']
            rouge = self._rouge_score(ref, candidate)  # scored once, both values reused
            bleus.append(self._bleu(ref, candidate))
            r1s.append(rouge['rouge-1'])
            rls.append(rouge['rouge-l'])

        if not bleus:
            st.info("Aucun résultat à analyser")
            return

        x = np.arange(1, len(bleus) + 1)
        width = 0.27
        fig, ax = plt.subplots(figsize=(10, 4))
        try:
            ax.bar(x - width, bleus, width, label='BLEU')
            ax.bar(x, r1s, width, label='ROUGE-1')
            ax.bar(x + width, rls, width, label='ROUGE-L')
            ax.set_xticks(x)
            ax.set_xlabel('Rang')
            ax.set_ylabel('Score')
            ax.set_title('BLEU & ROUGE vs références')
            ax.legend()
            fig.tight_layout()
            st.pyplot(fig)
        finally:
            plt.close(fig)

    def evaluate_qa_performance(self, qa_module, questions, references):
        """Show the average BLEU/ROUGE of the top answer for each question.

        Args:
            qa_module: Object exposing ``query_with_fallback(question, top_k=...)``.
            questions: Questions to ask.
            references: Expected answers, paired with *questions* by position.
        """
        bleus, r1s, rls = [], [], []
        for q, ref in zip(questions, references):
            res = qa_module.query_with_fallback(q, top_k=1)
            if res:
                cand = res[0]['text']
                rouge = self._rouge_score(ref, cand)  # scored once, both values reused
                bleus.append(self._bleu(ref, cand))
                r1s.append(rouge['rouge-1'])
                rls.append(rouge['rouge-l'])

        st.write("### 📊 Global Q-A metrics")
        if not bleus:
            # Averaging an empty list would display NaN metrics.
            st.info("Aucun résultat à analyser")
            return
        col1, col2, col3 = st.columns(3)
        col1.metric("Avg BLEU", f"{np.mean(bleus):.4f}")
        col2.metric("Avg ROUGE-1", f"{np.mean(r1s):.4f}")
        col3.metric("Avg ROUGE-L", f"{np.mean(rls):.4f}")

Overwriting visualization_modules.py


5️⃣ Module Q&A - qa_modules.py

In [55]:
%%writefile qa_modules.py
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import re
from typing import List, Dict, Any

class QAModule:
    """Retrieve corpus texts for a question, semantically or by keyword overlap."""

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        """Load the sentence encoder; on failure the module stays usable for keyword search only."""
        self.model_name = model_name
        self.encoder = None
        self.corpus_texts = []
        self.labels = []
        self.corpus_embeddings = None
        self.is_fitted = False

        try:
            print(f"Chargement du modèle de similarité: {model_name}")
            self.encoder = SentenceTransformer(model_name)
            print("✅ Modèle de similarité chargé avec succès")
        except Exception as e:
            print(f"❌ Erreur lors du chargement du modèle de similarité: {e}")
            self.encoder = None

    def fit(self, dataset: List[Dict[str, Any]]):
        """Index *dataset* for retrieval.

        Keeps the items whose ``'text'`` is longer than 10 characters, stores
        their ``'label_id'`` (0 when absent) and encodes the texts.

        Returns:
            True on success, False when the encoder is unavailable, no text
            is usable or encoding fails.
        """
        try:
            if self.encoder is None:
                print("❌ Encodeur non disponible pour le module Q&A")
                return False

            print("Initialisation du module Q&A...")

            # Invalidate any previous index first: if encoding fails below, stale
            # embeddings must not be paired with the new corpus.
            self.is_fitted = False
            self.corpus_embeddings = None
            self.corpus_texts = []
            self.labels = []

            for item in dataset:
                if 'text' in item and item['text']:
                    text = str(item['text']).strip()
                    if len(text) > 10:  # skip texts that are too short
                        self.corpus_texts.append(text)
                        self.labels.append(item.get('label_id', 0))

            if not self.corpus_texts:
                print("❌ Aucun texte valide trouvé dans le dataset")
                return False

            print(f"Génération des embeddings pour {len(self.corpus_texts)} textes...")

            self.corpus_embeddings = self.encoder.encode(
                self.corpus_texts,
                convert_to_tensor=False,
                show_progress_bar=True,
                batch_size=32
            )

            self.is_fitted = True
            print(f"✅ Module Q&A initialisé avec {len(self.corpus_texts)} textes")
            return True

        except Exception as e:
            print(f"❌ Erreur lors de l'initialisation du module Q&A: {e}")
            return False

    def query(self, question: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return up to *top_k* corpus texts ranked by cosine similarity to *question*.

        Only hits with a strictly positive similarity are returned. Each hit is
        a dict with ``text``, ``label_id``, ``score`` and ``rank`` (1-based).
        Returns an empty list when the module is not fitted, the question is
        blank, *top_k* is not positive or an error occurs.
        """
        try:
            if not self.is_fitted or self.encoder is None:
                print("Module Q&A non initialisé")
                return []

            # A non-positive top_k must yield nothing: the slice [-0:] would
            # otherwise select the whole corpus.
            if top_k <= 0 or not question.strip():
                return []

            question_embedding = self.encoder.encode([question.strip()], convert_to_tensor=False)
            similarities = cosine_similarity(question_embedding, self.corpus_embeddings)[0]

            # Indices of the best matches, highest similarity first.
            top_indices = np.argsort(similarities)[::-1][:top_k]

            results = []
            for rank, idx in enumerate(top_indices, start=1):
                if similarities[idx] > 0:  # minimal similarity threshold
                    results.append({
                        "text": self.corpus_texts[idx],
                        "label_id": int(self.labels[idx]),
                        "score": float(similarities[idx]),
                        "rank": rank
                    })

            return results

        except Exception as e:
            print(f"Erreur lors de la recherche sémantique: {e}")
            return []

    def keyword_search(self, question: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return up to *top_k* corpus texts ranked by word overlap with *question*.

        The score is the Jaccard index of the two word sets. Each hit also
        carries the shared words under ``common_words``. Returns an empty list
        when the corpus or the question is empty, *top_k* is not positive or
        an error occurs.
        """
        try:
            if not self.corpus_texts or top_k <= 0:
                return []

            if not question.strip():
                return []

            question_words = set(re.findall(r'\b\w+\b', question.lower()))
            if not question_words:
                return []

            scored_results = []
            for i, text in enumerate(self.corpus_texts):
                text_words = set(re.findall(r'\b\w+\b', text.lower()))
                common_words = question_words & text_words
                if common_words:
                    score = len(common_words) / max(len(question_words | text_words), 1)
                    scored_results.append({
                        "text": text,
                        "label_id": int(self.labels[i]),
                        "score": score,
                        "rank": 0,
                        "common_words": list(common_words)
                    })

            # Best score first; the sort is stable, so ties keep corpus order.
            scored_results.sort(key=lambda x: x['score'], reverse=True)

            best = scored_results[:top_k]
            for rank, result in enumerate(best, start=1):
                result['rank'] = rank
            return best

        except Exception as e:
            print(f"Erreur lors de la recherche par mots-clés: {e}")
            return []

    def query_with_fallback(self, question: str, top_k: int = 5, min_score: float = 0.1) -> List[Dict[str, Any]]:
        """Run the semantic search and fall back to keyword search when it is not convincing.

        Args:
            question: Question text.
            top_k: Maximum number of hits.
            min_score: The keyword fallback is used unless at least one
                semantic hit scores strictly above this value.
        """
        results = self.query(question, top_k)

        if not any(r['score'] > min_score for r in results):
            print("Fallback vers la recherche par mots-clés")
            results = self.keyword_search(question, top_k)

        return results

    def get_debug_info(self) -> Dict[str, Any]:
        """Return a snapshot of the module state for debugging."""
        return {
            "is_fitted": self.is_fitted,
            "encoder_available": self.encoder is not None,
            "corpus_size": len(self.corpus_texts),
            "embeddings_generated": self.corpus_embeddings is not None,
            "model_name": self.model_name
        }

Overwriting qa_modules.py


6️⃣ Module Knowledge Base - knowledge_modules.py

In [56]:
%%writefile knowledge_modules.py

# knowledge_modules.py
import numpy as np
from typing import List, Optional
import re

class KnowledgeBase:
    """Small in-memory knowledge base searched by word overlap (no sentence-transformers)."""

    def __init__(self):
        self.knowledge_base = []
        self.setup_knowledge_base()

    def setup_knowledge_base(self):
        """Load the built-in climate facts, replacing any current content."""
        facts = [
            "Le réchauffement climatique est principalement causé par les émissions de gaz à effet de serre d'origine humaine.",
            "Les énergies renouvelables comme le solaire et l'éolien sont essentielles pour décarboner notre économie.",
            "La déforestation massive contribue significativement au changement climatique.",
            "Le secteur des transports représente environ 24% des émissions mondiales de gaz à effet de serre.",
            "L'amélioration de l'efficacité énergétique des bâtiments peut réduire jusqu'à 50% de leur consommation.",
            "L'agriculture durable et régénératrice peut séquestrer du carbone tout en produisant de la nourriture.",
            "Les océans absorbent 25% du CO2 atmosphérique mais s'acidifient, menaçant les écosystèmes marins.",
            "Les politiques de taxation du carbone incitent les entreprises à réduire leurs émissions.",
            "L'adaptation au changement climatique est aussi cruciale que l'atténuation des émissions.",
            "Les technologies de capture et stockage du carbone pourraient permettre d'atteindre la neutralité carbone."
        ]
        self.knowledge_base = facts
        print("✅ Base de connaissances initialisée avec recherche par mots-clés")

    def find_context(self, query: str, top_k: int = 3) -> List[str]:
        """Return the documents most similar to *query*.

        Similarity is the Jaccard index of the lower-cased word sets. Only the
        *top_k* best documents are considered, and of those only the ones
        scoring above 0.1 are returned. Errors are printed and yield ``[]``.
        """
        if not (query and self.knowledge_base):
            return []

        word_pattern = r'\b\w+\b'
        try:
            wanted = set(re.findall(word_pattern, query.lower()))

            ranked = []
            for doc in self.knowledge_base:
                doc_terms = set(re.findall(word_pattern, doc.lower()))
                total = len(wanted | doc_terms)
                if total > 0:
                    ranked.append((doc, len(wanted & doc_terms) / total))

            # Highest score first; stable, so ties keep insertion order.
            ranked = sorted(ranked, key=lambda pair: pair[1], reverse=True)

            # Relevance threshold applied to the top_k candidates only.
            return [doc for doc, score in ranked[:top_k] if score > 0.1]

        except Exception as e:
            print(f"⚠️ Erreur recherche contexte: {e}")
            return []

    def add_knowledge(self, new_knowledge: str):
        """Append *new_knowledge* unless it is empty or already stored."""
        if not new_knowledge or new_knowledge in self.knowledge_base:
            return
        self.knowledge_base.append(new_knowledge)
        print(f"✅ Nouvelle connaissance ajoutée: {new_knowledge[:50]}...")

    def get_stats(self):
        """Return the document count and the mean document length in characters."""
        docs = self.knowledge_base
        if docs:
            mean_length = np.mean([len(doc) for doc in docs])
        else:
            mean_length = 0
        return {
            "total_documents": len(docs),
            "avg_length": mean_length,
        }

Overwriting knowledge_modules.py


7️⃣ Module Streamlit - streamlit_app.py

In [57]:
%%writefile streamlit_app.py
# streamlit_app.py
import streamlit as st
import pandas as pd
import os
import torch
import json
import sys
from datasets import Dataset
from contextlib import contextmanager

st.set_page_config(page_title="🌍 Climate Analyzer", page_icon="🌍", layout="wide")

# ---------------------------------------------------------
# Barre de progression persistante
# ---------------------------------------------------------
@contextmanager
def st_progress(title="Progress", max_value=100):
    """Yield a Streamlit progress bar starting at 0 and remove it on exit.

    The bar is emptied even when the ``with`` body raises. ``max_value`` is
    accepted but not used by this function.
    """
    progress_bar = st.progress(0, text=title)
    try:
        yield progress_bar
    finally:
        progress_bar.empty()

# Session persistence: create each key once, with its initial value, so that
# values already stored in the session are left untouched.
for _key, _initial in (
    ("trainer", None),
    ("label_names", None),
    ("test_ds", None),
    ("training", False),
    ("raw_train_data", None),
):
    if _key not in st.session_state:
        st.session_state[_key] = _initial

sys.path.append('/content')
from core_modules import ClimateConfig
from data_modules import DataProcessor
from model_modules import ModelManager
from qa_modules import QAModule
from visualization_modules import VisualizationManager


class ClimateAnalyzerApp:
    def __init__(self):
        """Build the application's collaborators and try to restore a saved model.

        The app object is created at module level, and Streamlit re-executes
        the script on every user interaction, so this constructor runs on
        each rerun. Anything that must outlive a rerun therefore has to be
        kept in ``st.session_state``.
        """
        self.config = ClimateConfig()
        self.data_processor = DataProcessor()
        self.model_manager = ModelManager(self.config)

        # The Q&A module is fitted during training; a fresh instance per
        # rerun would discard that fitted state before the Q&A page can use
        # it. Keep a single instance per browser session instead.
        if st.session_state.get("qa_module") is None:
            st.session_state.qa_module = QAModule()
        self.qa_module = st.session_state.qa_module

        self.visualizer = VisualizationManager()
        self.load_saved_model()

    # ------------------------------------------------------------------
    # Chargement automatique du modèle si déjà présent
    # ------------------------------------------------------------------
    def load_saved_model(self):
        """Restore a previously saved model into the session, if one exists.

        Does nothing when a trainer is already stored in the session or when
        ``outputs/final_model/config.json`` is absent. Any failure while
        loading is reported as a Streamlit warning instead of being raised.
        """
        if st.session_state.trainer is not None:
            return
        if not os.path.exists("outputs/final_model/config.json"):
            return

        try:
            self.model_manager.setup_tokenizer()
            # NOTE(review): when the label mapping is empty this falls back
            # to 2 labels and stores an empty label_names list — confirm
            # this matches the saved model.
            label_mapping = self.data_processor.label_mapping
            self.model_manager.setup_model(len(label_mapping) or 2)

            restored_trainer = self.model_manager.setup_trainer(None, None)
            restored_trainer.model = restored_trainer.model.from_pretrained("outputs/final_model")

            st.session_state.trainer = restored_trainer
            st.session_state.label_names = list(self.data_processor.label_mapping.keys())
            st.success("✅ Modèle chargé depuis le disque.")
        except Exception as e:
            st.warning(f"⚠️ Chargement impossible : {e}")

    # ------------------------------------------------------------------
    # MENU PRINCIPAL
    # ------------------------------------------------------------------
    def run(self):
        """Render the page title and dispatch to the page chosen in the sidebar."""
        st.title("🌍 Climate Sentiment Analyzer")

        # Sidebar label -> page renderer; insertion order is the menu order.
        pages = {
            "🚀 Pipeline Complet": self.run_complete_pipeline,
            "❓ Q&A": self.run_qa_interface,
            "📈 Visualisations": self.run_visualizations,
        }
        mode = st.sidebar.selectbox("Mode", list(pages))

        render_page = pages.get(mode)
        if render_page is not None:
            render_page()

    # ------------------------------------------------------------------
    # 1) PIPELINE COMPLET
    # ------------------------------------------------------------------
    def run_complete_pipeline(self):
        """Render the upload / configuration page and start training on demand.

        Nothing beyond the uploader is shown until a CSV file is provided.
        The ``training`` session flag is set while ``train_pipeline`` runs
        and is always reset afterwards.
        """
        uploaded_file = st.file_uploader("Téléchargez votre CSV", type=["csv"])
        if not uploaded_file:
            return

        df = pd.read_csv(uploaded_file)
        st.dataframe(df.head())

        # Training parameters chosen by the user.
        sample_size = st.slider("Taille échantillon", 1000, 10000, value=4000)
        self.config.epochs = st.slider("Epochs", 1, 5, value=3)

        # Normalise the training flag: a stored None is treated as "not training".
        is_training = st.session_state.get("training", False)
        if is_training is None:
            is_training = False
            st.session_state.training = False

        launch_requested = st.button(
            "🚀 Lancer l'entraînement",
            type="primary",
            disabled=bool(is_training)
        )
        if not launch_requested:
            return

        st.session_state.training = True
        try:
            self.train_pipeline(df, sample_size)
        finally:
            st.session_state.training = False
    def train_pipeline(self, df: pd.DataFrame, sample_size: int):
        """Prepare the data, train the model, save it and publish results to the session.

        Args:
            df: Raw dataframe loaded from the uploaded CSV.
            sample_size: Sample size forwarded to ``DataProcessor.prepare_datasets``.

        Any exception is caught and displayed in the UI (message plus
        traceback); the ``training`` session flag is reset in that case.
        """
        try:
            # 1/4 — data analysis and splitting
            with st_progress("1/4  Analyse des données …") as progress:
                train_ds, val_ds, test_ds = self.data_processor.prepare_datasets(df, sample_size)
                progress.progress(25)

            # Keep the raw text / label pairs before tokenisation: they are
            # what the Q&A module is fitted on further down.
            st.session_state.raw_train_data = [
                {"text": example["text"], "label_id": example["label_id"]}
                for example in train_ds
            ]

            # 2/4 — tokenizer
            with st_progress("2/4  Chargement du tokenizer …") as progress:
                self.model_manager.setup_tokenizer()
                progress.progress(50)

            # 3/4 — model
            with st_progress("3/4  Initialisation du modèle …") as progress:
                self.model_manager.setup_model(len(self.data_processor.label_mapping))
                progress.progress(75)

            # 4/4 — tokenisation + torch format
            def prep(dataset):
                """Tokenise one split and keep only the tensors the trainer needs."""
                with st_progress("4/4  Tokenisation …") as progress:
                    tokenized = dataset.map(
                        self.model_manager.tokenize_function,
                        batched=True,
                        desc="Tokenisation"
                    )
                    tokenized = tokenized.rename_column("label_id", "labels")

                    wanted = {"input_ids", "attention_mask", "labels"}
                    unwanted = [name for name in tokenized.column_names if name not in wanted]
                    if unwanted:
                        tokenized = tokenized.remove_columns(unwanted)

                    tokenized.set_format(type="torch", columns=list(wanted))
                    progress.progress(100)
                    return tokenized

            train_ds_processed = prep(train_ds)
            val_ds_processed = prep(val_ds)
            test_ds_processed = prep(test_ds)

            trainer = self.model_manager.setup_trainer(train_ds_processed, val_ds_processed)

            with st.spinner("Entraînement en cours … (cela peut prendre quelques minutes)"):
                trainer.train()

            trainer.save_model("outputs/final_model")
            trainer.state.save_to_json("outputs/final_model/trainer_state.json")

            # Fit the Q&A module on the raw (untokenised) training examples.
            if st.session_state.raw_train_data:
                self.qa_module.fit(st.session_state.raw_train_data)

            # Publish the results so the other pages can use them.
            st.session_state.trainer = trainer
            st.session_state.label_names = list(self.data_processor.label_mapping.keys())
            st.session_state.test_ds = test_ds_processed
            st.success("🎉 Entraînement terminé !")
            st.balloons()

        except Exception as e:
            import traceback

            st.error(f"❌ Erreur : {e}")
            st.error(f"Détail: {traceback.format_exc()}")
            st.session_state.training = False

    # ------------------------------------------------------------------
    # 2) INTERFACE Q&A
    # ------------------------------------------------------------------
    def run_qa_interface(self):
        """Render the Q&A page: a question box, a search mode and the matching texts.

        Requires a trainer in the session. Search errors are shown in the UI
        rather than raised.
        """
        st.header("❓ Interface Q&A")
        if st.session_state.trainer is None:
            st.warning("⚠️ Aucun modèle entraîné.")
            return

        question = st.text_input("Posez votre question :")
        mode = st.selectbox("Mode de recherche", ["Auto", "Sémantique", "Mots-clés"])
        top_k = st.slider("Résultats", 1, 10, value=5)

        if not question:
            return

        # Search mode -> QAModule method name; any other mode ("Auto") uses
        # the fallback search.
        method_by_mode = {"Sémantique": "query", "Mots-clés": "keyword_search"}

        try:
            method_name = method_by_mode.get(mode, "query_with_fallback")
            hits = getattr(self.qa_module, method_name)(question, top_k)

            if not hits:
                st.info("Aucun résultat trouvé.")
                return

            for hit in hits:
                with st.expander(f"Score: {hit['score']:.3f}"):
                    st.write(hit["text"])

        except Exception as e:
            st.error(f"❌ Erreur lors de la recherche : {e}")

    # ------------------------------------------------------------------
    # 3) VISUALISATIONS
    # ------------------------------------------------------------------
    def run_visualizations(self):
        """Render the visualisation page and dispatch to the selected chart.

        Requires a trainer in the session. The class-distribution and
        confusion-matrix views also need the test dataset stored by
        ``train_pipeline``; when it is missing (or empty) an explanatory
        message is shown instead of silently rendering nothing. Errors raised
        by a visualisation are displayed with their traceback.
        """
        st.header("📈 Visualisations")
        if st.session_state.trainer is None:
            st.warning("⚠️ Aucun modèle entraîné.")
            return

        viz = st.selectbox(
            "Choisir",
            ["Distribution des classes", "Matrice de confusion", "Courbes d'entraînement",
             "📊 Métriques BLEU/ROUGE", "🔍 Évaluation Q&A"]
        )
        test_ds = st.session_state.get("test_ds")
        label_names = st.session_state.get("label_names")

        # Views that cannot be drawn without the test split.
        needs_test_ds = ("Distribution des classes", "Matrice de confusion")

        try:
            if viz in needs_test_ds and not test_ds:
                # Without this branch the page would stay blank with no
                # explanation when no test split is in the session.
                st.info(
                    "ℹ️ Données de test indisponibles : relancez l'entraînement "
                    "pour afficher cette visualisation."
                )
            elif viz == "Distribution des classes":
                self.visualizer.plot_class_distribution(test_ds["labels"], label_names)
            elif viz == "Matrice de confusion":
                self.visualizer.show_confusion_matrix(st.session_state.trainer, test_ds, label_names)
            elif viz == "Courbes d'entraînement":
                self.visualizer.plot_training_curves("outputs/final_model")
            elif viz == "📊 Métriques BLEU/ROUGE":
                self.run_bleu_rouge_analysis()
            elif viz == "🔍 Évaluation Q&A":
                self.run_qa_evaluation()
        except Exception as e:
            import traceback

            st.error(f"❌ Erreur lors de la visualisation : {e}")
            st.error(f"Détail: {traceback.format_exc()}")

    def run_bleu_rouge_analysis(self):
        """Render the BLEU/ROUGE page.

        Section 1 scores a user-supplied candidate text against a reference
        text. Section 2 runs a test question through the Q&A module and
        scores each hit against the training text at the same position in
        ``raw_train_data``.
        """
        st.subheader("📊 Analyse des métriques BLEU et ROUGE")

        # Section 1: scoring of user-provided texts
        st.write("### 🧪 Test avec vos propres textes")

        reference_col, candidate_col = st.columns(2)
        reference_text = reference_col.text_area(
            "Texte de référence:",
            "Le réchauffement climatique est un phénomène global causé par les activités humaines.",
            height=100
        )
        candidate_text = candidate_col.text_area(
            "Texte candidat:",
            "Le changement climatique est un problème mondial dû aux actions humaines.",
            height=100
        )

        if st.button("Calculer les scores"):
            bleu_score = self.visualizer.calculate_bleu_score(reference_text, candidate_text)
            rouge_scores = self.visualizer.calculate_rouge_score(reference_text, candidate_text)

            bleu_col, rouge1_col, rougel_col = st.columns(3)
            bleu_col.metric("BLEU Score", f"{bleu_score:.4f}")
            rouge1_col.metric("ROUGE-1", f"{rouge_scores['rouge-1']:.4f}")
            rougel_col.metric("ROUGE-L", f"{rouge_scores['rouge-l']:.4f}")

        # Section 2: scoring of Q&A search results
        st.write("### 🔍 Analyse des résultats de recherche")

        if not st.session_state.raw_train_data:
            st.info("Entraînez d'abord un modèle pour utiliser cette fonctionnalité.")
            return

        test_question = st.text_input(
            "Question de test:",
            "Quelles sont les causes du réchauffement climatique ?"
        )
        # The button is only rendered when a question is present.
        if not (test_question and st.button("Analyser la question")):
            return

        results = self.qa_module.query_with_fallback(test_question, top_k=5)
        if not results:
            st.info("Aucun résultat trouvé pour cette question.")
            return

        # The first training texts serve as references, one per result.
        sample_references = [
            entry['text'] for entry in st.session_state.raw_train_data[:len(results)]
        ]

        self.visualizer.visualize_bleu_rouge_scores(results, sample_references)

        st.subheader("📋 Résultats détaillés")
        for rank, hit in enumerate(results, start=1):
            with st.expander(f"Résultat {rank} - Score: {hit['score']:.3f}"):
                st.write("**Texte trouvé:**")
                st.write(hit['text'])

                # No reference available for this position: text only.
                if rank > len(sample_references):
                    continue

                reference = sample_references[rank - 1]
                bleu = self.visualizer.calculate_bleu_score(reference, hit['text'])
                rouge = self.visualizer.calculate_rouge_score(reference, hit['text'])

                bleu_col, rouge1_col, rougel_col = st.columns(3)
                bleu_col.metric("BLEU", f"{bleu:.4f}")
                rouge1_col.metric("ROUGE-1", f"{rouge['rouge-1']:.4f}")
                rougel_col.metric("ROUGE-L", f"{rouge['rouge-l']:.4f}")

    def run_qa_evaluation(self):
        """Render the Q&A evaluation page.

        The user evaluates either the predefined climate questions or up to
        ten custom ones. For each question a reference answer is taken from
        the top search hit, or from a random training text when the search
        returns nothing, and everything is handed to
        ``VisualizationManager.evaluate_qa_performance``.
        """
        import random

        st.subheader("🔍 Évaluation complète du système Q&A")

        if not st.session_state.raw_train_data:
            st.info("Entraînez d'abord un modèle pour utiliser cette fonctionnalité.")
            return

        # Predefined climate-related test questions
        default_questions = [
            "Quelles sont les principales causes du réchauffement climatique ?",
            "Comment les énergies renouvelables peuvent-elles aider ?",
            "Quel est l'impact de la déforestation sur le climat ?",
            "Comment réduire les émissions de gaz à effet de serre ?",
            "Que peut-on faire pour s'adapter au changement climatique ?"
        ]

        # Question selection UI
        st.write("### 📝 Questions de test")

        use_default = st.checkbox("Utiliser les questions prédéfinies", value=True)

        if use_default:
            test_questions = default_questions
            st.write("Questions utilisées:")
            for position, text in enumerate(test_questions, 1):
                st.write(f"{position}. {text}")
        else:
            num_questions = st.number_input("Nombre de questions", min_value=1, max_value=10, value=3)
            entered = [
                st.text_input(f"Question {idx+1}:", key=f"q_{idx}")
                for idx in range(num_questions)
            ]
            # Blank inputs are dropped.
            test_questions = [text for text in entered if text]

        # Evaluation only starts when there is at least one question and the
        # button is pressed (the button is not rendered without questions).
        if not (test_questions and st.button("🚀 Lancer l'évaluation")):
            return

        # Build one reference answer per question from the training data.
        reference_answers = []
        for question in test_questions:
            top_hits = self.qa_module.query_with_fallback(question, top_k=1)
            if top_hits:
                reference_answers.append(top_hits[0]['text'])
            else:
                # Fall back to a random training text.
                reference_answers.append(random.choice(st.session_state.raw_train_data)['text'])

        self.visualizer.evaluate_qa_performance(
            self.qa_module,
            test_questions,
            reference_answers
        )


# Script entry point: build the app and render the selected page.
if __name__ == "__main__":
    ClimateAnalyzerApp().run()

Overwriting streamlit_app.py


6️⃣ Script d'Installation - setup_pipeline.py

In [58]:
%%writefile setup_pipeline.py
import subprocess
import sys

def install_dependencies():
    """Install the pipeline's Python packages and fetch the NLTK tokenizer data.

    Packages are installed one at a time with the current interpreter's pip
    so that a failure on one package is reported without aborting the others.
    The NLTK step is skipped with a warning when NLTK cannot be imported, and
    the success message is only printed when at least one tokenizer resource
    was actually downloaded.
    """
    packages = [
        "transformers>=4.36.0",
        "datasets>=2.16.0",
        "torch>=2.1.0",
        "peft>=0.7.0",
        "sentence-transformers>=2.2.0",
        "faiss-cpu>=1.7.0",
        "streamlit>=1.29.0",
        "plotly>=5.17.0",
        "scikit-learn>=1.3.0",
        "matplotlib>=3.7.0",
        "seaborn>=0.12.0",
        "pandas>=1.5.0",
        "numpy>=1.24.0",
        "rouge-score",
        "nltk",
    ]

    for package in packages:
        cmd = [sys.executable, "-m", "pip", "install", package]
        try:
            subprocess.check_call(cmd)
        except subprocess.CalledProcessError as e:
            print(f"⚠️ Erreur avec {package}: {e}")
        else:
            print(f"✅ {package} installé")

    # NLTK resources. The import is guarded because the pip step above
    # tolerates failures, so NLTK may be missing at this point.
    try:
        import nltk
    except ImportError as e:
        print(f"⚠️ NLTK indisponible, ressources non téléchargées: {e}")
        return

    # Presumably both names are requested to cover different NLTK versions
    # (verify); nltk.download returns False for a resource it cannot fetch.
    resources = ("punkt_tab", "punkt")
    downloaded = [name for name in resources if nltk.download(name, quiet=True)]
    missing = [name for name in resources if name not in downloaded]

    if missing:
        print(f"⚠️ Ressources NLTK non téléchargées: {', '.join(missing)}")
    if downloaded:
        print("✅ Ressources NLTK prêtes")

# Run the installation only when executed as a script (not when imported).
if __name__ == "__main__":
    install_dependencies()

Overwriting setup_pipeline.py


In [59]:
!python setup_pipeline.py

✅ transformers>=4.36.0 installé
✅ datasets>=2.16.0 installé
✅ torch>=2.1.0 installé
✅ peft>=0.7.0 installé
✅ sentence-transformers>=2.2.0 installé
✅ faiss-cpu>=1.7.0 installé
✅ streamlit>=1.29.0 installé
✅ plotly>=5.17.0 installé
✅ scikit-learn>=1.3.0 installé
✅ matplotlib>=3.7.0 installé
✅ seaborn>=0.12.0 installé
✅ pandas>=1.5.0 installé
✅ numpy>=1.24.0 installé
✅ rouge-score installé
✅ nltk installé
✅ Ressources NLTK prêtes


In [60]:
!pip install streamlit



In [61]:
!pip install pyngrok



In [63]:
# 🔧 Launch Streamlit and expose it through an ngrok tunnel
import os
import subprocess
import time
from getpass import getpass

from pyngrok import ngrok

# 1️⃣ ngrok token
# SECURITY: never hard-code the authtoken in the notebook. It is read from
# the NGROK_AUTHTOKEN environment variable, or asked for interactively
# (hidden input) when the variable is not set. A token that was previously
# committed in this cell must be considered leaked and revoked in the ngrok
# dashboard.
TOKEN = os.environ.get("NGROK_AUTHTOKEN") or getpass("ngrok authtoken: ")
# Configure the token through pyngrok instead of an IPython shell magic, so
# the cell is plain Python.
ngrok.set_auth_token(TOKEN)

# 2️⃣ Start the main application in the background
subprocess.Popen(
    ["streamlit", "run", "streamlit_app.py", "--server.port=8501", "--server.address=0.0.0.0"],
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL
)

# 3️⃣ Give the server a few seconds to start, then open the tunnel
time.sleep(5)
public_url = ngrok.connect(8501)
print("🚀 Interface Streamlit disponible à :")
print(public_url)

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml
🚀 Interface Streamlit disponible à :
NgrokTunnel: "https://ca05fe247825.ngrok-free.app" -> "http://localhost:8501"
