In [1]:
import os
import numpy as np
import pandas as pd
from collections import defaultdict
from typing import List, Dict, Tuple
import re
import json

In [5]:
import sys
!{sys.executable} -m pip install nltk


Collecting nltk
  Downloading nltk-3.9.2-py3-none-any.whl (1.5 MB)
                                              0.0/1.5 MB ? eta -:--:--
                                              0.0/1.5 MB ? eta -:--:--
                                              0.0/1.5 MB ? eta -:--:--
     -                                        0.1/1.5 MB 1.9 MB/s eta 0:00:01
     --                                       0.1/1.5 MB 1.3 MB/s eta 0:00:02
     ---                                      0.1/1.5 MB 901.1 kB/s eta 0:00:02
     ---                                      0.1/1.5 MB 774.0 kB/s eta 0:00:02
     ----                                     0.2/1.5 MB 702.7 kB/s eta 0:00:02
     ----                                     0.2/1.5 MB 702.7 kB/s eta 0:00:02
     ----                                     0.2/1.5 MB 581.0 kB/s eta 0:00:03
     ----                                     0.2/1.5 MB 581.0 kB/s eta 0:00:03
     -----                                    0.2/1.5 MB 452.9 kB/s eta 0:00:03
    


[notice] A new release of pip is available: 23.1.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [7]:
pip install rouge_score

Collecting rouge_score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: rouge_score
  Building wheel for rouge_score (setup.py): started
  Building wheel for rouge_score (setup.py): finished with status 'done'
  Created wheel for rouge_score: filename=rouge_score-0.1.2-py3-none-any.whl size=24972 sha256=2602ea1f5f6aca5bfc2a61aee731dff356aa31d54ddf7c6f3bb0fdf0bed5d7d5
  Stored in directory: c:\users\asus\appdata\local\pip\cache\wheels\1e\19\43\8a442dc83660ca25e163e1bd1f89919284ab0d0c1475475148
Successfully built rouge_score
Installing collected packages: rouge_score
Successfully installed rouge_score-0.1.2
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.1.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [9]:
from nltk.translate.bleu_score import sentence_bleu, corpus_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score
from rouge_score import rouge_scorer

In [10]:
class BLEUScorer:
    
    def __init__(self):
        self.smoothing = SmoothingFunction()
    
    def compute_sentence_bleu(self, reference: List[str], hypothesis: str, n: int = 4) -> float:
        # Tokenize
        ref_tokens = [ref.split() for ref in reference]
        hyp_tokens = hypothesis.split()
        
        # Set weights based on n
        if n == 1:
            weights = (1.0, 0, 0, 0)
        elif n == 2:
            weights = (0.5, 0.5, 0, 0)
        elif n == 3:
            weights = (0.33, 0.33, 0.33, 0)
        else:  # n == 4
            weights = (0.25, 0.25, 0.25, 0.25)
        
        # Compute BLEU
        score = sentence_bleu(
            ref_tokens, 
            hyp_tokens, 
            weights=weights,
            smoothing_function=self.smoothing.method1
        )
        
        return score
    
    def compute_corpus_bleu(self, references: List[List[str]], hypotheses: List[str]) -> Dict[str, float]:
        # Tokenize all
        refs_tokens = [[ref.split() for ref in refs] for refs in references]
        hyps_tokens = [hyp.split() for hyp in hypotheses]
        
        results = {}
        
        # BLEU-1
        results['BLEU-1'] = corpus_bleu(
            refs_tokens, hyps_tokens, 
            weights=(1.0, 0, 0, 0),
            smoothing_function=self.smoothing.method1
        )
        
        # BLEU-2
        results['BLEU-2'] = corpus_bleu(
            refs_tokens, hyps_tokens, 
            weights=(0.5, 0.5, 0, 0),
            smoothing_function=self.smoothing.method1
        )
        
        # BLEU-3
        results['BLEU-3'] = corpus_bleu(
            refs_tokens, hyps_tokens, 
            weights=(0.33, 0.33, 0.33, 0),
            smoothing_function=self.smoothing.method1
        )
        
        # BLEU-4
        results['BLEU-4'] = corpus_bleu(
            refs_tokens, hyps_tokens, 
            weights=(0.25, 0.25, 0.25, 0.25),
            smoothing_function=self.smoothing.method1
        )
        
        return results

In [11]:
class METEORScorer:
    
    def compute_sentence_meteor(self, reference: List[str], hypothesis: str) -> float:
        """Compute METEOR for single sentence"""
        # METEOR expects single reference and hypothesis as strings
        # We'll average over multiple references
        scores = []
        for ref in reference:
            score = meteor_score([ref.split()], hypothesis.split())
            scores.append(score)
        
        return np.mean(scores)
    
    def compute_corpus_meteor(self, references: List[List[str]], hypotheses: List[str]) -> float:
        """Compute METEOR for entire corpus"""
        scores = []
        
        for refs, hyp in zip(references, hypotheses):
            score = self.compute_sentence_meteor(refs, hyp)
            scores.append(score)
        
        return np.mean(scores)

In [12]:
class ROUGEScorer:
    """
    ROUGE-L: Measures longest common subsequence
    Focus on recall rather than precision
    
    Range: 0-1 (higher is better)
    Returns: Precision, Recall, F1-score
    Good F1 score: > 0.4
    """
    
    def __init__(self):
        self.scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
    
    def compute_sentence_rouge(self, reference: List[str], hypothesis: str) -> Dict[str, float]:
        """Compute ROUGE-L for single sentence"""
        scores_list = []
        
        for ref in reference:
            score = self.scorer.score(ref, hypothesis)
            scores_list.append({
                'precision': score['rougeL'].precision,
                'recall': score['rougeL'].recall,
                'f1': score['rougeL'].fmeasure
            })
        
        # Average over references
        avg_scores = {
            'precision': np.mean([s['precision'] for s in scores_list]),
            'recall': np.mean([s['recall'] for s in scores_list]),
            'f1': np.mean([s['f1'] for s in scores_list])
        }
        
        return avg_scores
    
    def compute_corpus_rouge(self, references: List[List[str]], hypotheses: List[str]) -> Dict[str, float]:
        """Compute ROUGE-L for entire corpus"""
        all_precisions = []
        all_recalls = []
        all_f1s = []
        
        for refs, hyp in zip(references, hypotheses):
            scores = self.compute_sentence_rouge(refs, hyp)
            all_precisions.append(scores['precision'])
            all_recalls.append(scores['recall'])
            all_f1s.append(scores['f1'])
        
        return {
            'ROUGE-L-P': np.mean(all_precisions),
            'ROUGE-L-R': np.mean(all_recalls),
            'ROUGE-L-F1': np.mean(all_f1s)
        }

In [13]:
class CIDErScorer:
    """
    CIDEr: Specialized metric for image captioning
    Measures consensus between generated caption and human captions
    Uses TF-IDF weighting
    
    Range: 0-10+ (higher is better)
    Good score: > 0.8
    """
    
    def __init__(self, n=4, sigma=6.0):
        """
        Args:
            n: max n-gram order
            sigma: standard deviation for Gaussian penalty
        """
        self.n = n
        self.sigma = sigma
    
    def _compute_doc_freq(self, refs_ngrams):
        """Compute document frequency for IDF"""
        doc_freq = defaultdict(int)
        
        for ngrams_dict in refs_ngrams:
            for ngram in ngrams_dict.keys():
                doc_freq[ngram] += 1
        
        return doc_freq
    
    def _get_ngrams(self, tokens, n):
        """Get n-grams from tokens"""
        ngrams = defaultdict(int)
        
        for i in range(len(tokens) - n + 1):
            ngram = tuple(tokens[i:i+n])
            ngrams[ngram] += 1
        
        return ngrams
    
    def compute_cider(self, references: List[List[str]], hypotheses: List[str]) -> float:
        """
        Compute CIDEr score
        
        This is a simplified implementation. For exact CIDEr scores,
        use the official pycocoevalcap package.
        """
        scores = []
        
        # Get all n-grams
        all_refs_ngrams = []
        all_hyps_ngrams = []
        
        for refs, hyp in zip(references, hypotheses):
            # Tokenize
            ref_tokens_list = [ref.split() for ref in refs]
            hyp_tokens = hyp.split()
            
            # Compute n-grams for references
            refs_ngrams = []
            for ref_tokens in ref_tokens_list:
                ngrams_dict = {}
                for n in range(1, self.n + 1):
                    ngrams_dict.update(self._get_ngrams(ref_tokens, n))
                refs_ngrams.append(ngrams_dict)
            
            # Compute n-grams for hypothesis
            hyp_ngrams = {}
            for n in range(1, self.n + 1):
                hyp_ngrams.update(self._get_ngrams(hyp_tokens, n))
            
            all_refs_ngrams.append(refs_ngrams)
            all_hyps_ngrams.append(hyp_ngrams)
        
        # Compute document frequencies
        all_ngrams = []
        for refs_ngrams in all_refs_ngrams:
            for ngrams_dict in refs_ngrams:
                all_ngrams.append(ngrams_dict)
        
        doc_freq = self._compute_doc_freq(all_ngrams)
        num_docs = len(all_ngrams)
        
        # Compute CIDEr for each hypothesis
        for refs_ngrams, hyp_ngrams in zip(all_refs_ngrams, all_hyps_ngrams):
            # Compute TF-IDF vectors
            vec_hyp = {}
            vec_refs = []
            
            # Hypothesis vector
            for ngram, count in hyp_ngrams.items():
                tf = count / len(hyp_ngrams)
                idf = np.log((num_docs + 1) / (doc_freq[ngram] + 1))
                vec_hyp[ngram] = tf * idf
            
            # Reference vectors (average)
            for ref_ngrams in refs_ngrams:
                vec_ref = {}
                for ngram, count in ref_ngrams.items():
                    tf = count / len(ref_ngrams)
                    idf = np.log((num_docs + 1) / (doc_freq[ngram] + 1))
                    vec_ref[ngram] = tf * idf
                vec_refs.append(vec_ref)
            
            # Compute cosine similarity
            similarities = []
            for vec_ref in vec_refs:
                # Dot product
                dot_product = sum(vec_hyp.get(k, 0) * v for k, v in vec_ref.items())
                
                # Norms
                norm_hyp = np.sqrt(sum(v**2 for v in vec_hyp.values()))
                norm_ref = np.sqrt(sum(v**2 for v in vec_ref.values()))
                
                if norm_hyp > 0 and norm_ref > 0:
                    sim = dot_product / (norm_hyp * norm_ref)
                else:
                    sim = 0.0
                
                similarities.append(sim)
            
            # Average similarity
            score = np.mean(similarities) * 10.0  # Scale to 0-10
            scores.append(score)
        
        return np.mean(scores)


In [14]:
class CaptionEvaluator:
    """
    Complete evaluator for image captioning
    Computes all major metrics
    """
    
    def __init__(self):
        self.bleu_scorer = BLEUScorer()
        self.meteor_scorer = METEORScorer()
        self.rouge_scorer = ROUGEScorer()
        self.cider_scorer = CIDErScorer()
    
    def evaluate(self, references: List[List[str]], hypotheses: List[str], 
                 verbose: bool = True) -> Dict[str, float]:
        """
        Evaluate generated captions against references
        
        Args:
            references: List of [list of reference captions] for each image
            hypotheses: List of generated captions (one per image)
            verbose: Print results
        
        Returns:
            Dictionary with all metric scores
        """
        if len(references) != len(hypotheses):
            raise ValueError("Number of references and hypotheses must match")
        
        results = {}
        
        # 1. BLEU scores
        if verbose:
            print("Computing BLEU scores...")
        bleu_scores = self.bleu_scorer.compute_corpus_bleu(references, hypotheses)
        results.update(bleu_scores)
        
        # 2. METEOR score
        if verbose:
            print("Computing METEOR score...")
        meteor = self.meteor_scorer.compute_corpus_meteor(references, hypotheses)
        results['METEOR'] = meteor
        
        # 3. ROUGE-L score
        if verbose:
            print("Computing ROUGE-L scores...")
        rouge_scores = self.rouge_scorer.compute_corpus_rouge(references, hypotheses)
        results.update(rouge_scores)
        
        # 4. CIDEr score
        if verbose:
            print("Computing CIDEr score...")
        cider = self.cider_scorer.compute_cider(references, hypotheses)
        results['CIDEr'] = cider
        
        if verbose:
            print("\n" + "="*60)
            print("EVALUATION RESULTS")
            print("="*60)
            print(f"Number of samples: {len(hypotheses)}")
            print()
            print("BLEU Scores:")
            print(f"  BLEU-1:  {results['BLEU-1']:.4f}")
            print(f"  BLEU-2:  {results['BLEU-2']:.4f}")
            print(f"  BLEU-3:  {results['BLEU-3']:.4f}")
            print(f"  BLEU-4:  {results['BLEU-4']:.4f}")
            print()
            print(f"METEOR:   {results['METEOR']:.4f}")
            print()
            print("ROUGE-L Scores:")
            print(f"  Precision: {results['ROUGE-L-P']:.4f}")
            print(f"  Recall:    {results['ROUGE-L-R']:.4f}")
            print(f"  F1-Score:  {results['ROUGE-L-F1']:.4f}")
            print()
            print(f"CIDEr:    {results['CIDEr']:.4f}")
            print("="*60)
        
        return results
    
    def evaluate_single(self, references: List[str], hypothesis: str) -> Dict[str, float]:
        """Evaluate single caption"""
        results = {}
        
        # BLEU
        for n in [1, 2, 3, 4]:
            score = self.bleu_scorer.compute_sentence_bleu(references, hypothesis, n)
            results[f'BLEU-{n}'] = score
        
        # METEOR
        results['METEOR'] = self.meteor_scorer.compute_sentence_meteor(references, hypothesis)
        
        # ROUGE-L
        rouge = self.rouge_scorer.compute_sentence_rouge(references, hypothesis)
        results['ROUGE-L-F1'] = rouge['f1']
        
        return results

In [15]:
def evaluate_model(model, test_images: List[str], caption_dict: Dict[str, List[str]], 
                   img_folder: str, tokenizer, config, 
                   use_beam_search: bool = False) -> Dict[str, float]:
    """
    Evaluate model on test set
    
    Args:
        model: Trained captioning model
        test_images: List of test image names
        caption_dict: Dictionary mapping image names to reference captions
        img_folder: Folder containing images
        tokenizer: Fitted tokenizer
        config: Configuration object
        use_beam_search: Use beam search for generation
    
    Returns:
        Dictionary with all evaluation metrics
    """
    from image_captioning import generate_caption, generate_beam_search
    
    print(f"Evaluating on {len(test_images)} test images...")
    
    references = []
    hypotheses = []
    
    for i, img_name in enumerate(test_images):
        if (i + 1) % 50 == 0:
            print(f"Progress: {i+1}/{len(test_images)}")
        
        img_path = os.path.join(img_folder, img_name)
        
        if not os.path.exists(img_path):
            continue
        
        # Get reference captions
        refs = caption_dict[img_name]
        references.append(refs)
        
        # Generate caption
        try:
            if use_beam_search:
                hyp = generate_beam_search(model, img_path, tokenizer, config)
            else:
                hyp = generate_caption(model, img_path, tokenizer, config)
            hypotheses.append(hyp)
        except Exception as e:
            print(f"Error generating caption for {img_name}: {e}")
            hypotheses.append("")  # Empty caption for failed generation
    
    # Evaluate
    evaluator = CaptionEvaluator()
    results = evaluator.evaluate(references, hypotheses, verbose=True)
    
    return results, references, hypotheses

In [16]:
def analyze_predictions(references: List[List[str]], hypotheses: List[str], 
                        image_names: List[str] = None, n_samples: int = 10):
    """
    Show qualitative examples of predictions
    
    Args:
        references: Reference captions
        hypotheses: Generated captions
        image_names: Optional image names
        n_samples: Number of samples to show
    """
    evaluator = CaptionEvaluator()
    
    print("\n" + "="*80)
    print("QUALITATIVE ANALYSIS")
    print("="*80)
    
    # Select random samples
    indices = np.random.choice(len(hypotheses), min(n_samples, len(hypotheses)), replace=False)
    
    for idx in indices:
        refs = references[idx]
        hyp = hypotheses[idx]
        
        # Compute metrics for this sample
        scores = evaluator.evaluate_single(refs, hyp)
        
        print(f"\nSample {idx+1}")
        if image_names:
            print(f"Image: {image_names[idx]}")
        print("-" * 80)
        print("Reference Captions:")
        for i, ref in enumerate(refs, 1):
            print(f"  {i}. {ref}")
        print(f"\nGenerated Caption:")
        print(f"  â†’ {hyp}")
        print(f"\nScores:")
        print(f"  BLEU-4: {scores['BLEU-4']:.4f}")
        print(f"  METEOR: {scores['METEOR']:.4f}")
        print(f"  ROUGE-L: {scores['ROUGE-L-F1']:.4f}")
        print("=" * 80)

In [17]:
def save_evaluation_results(results: Dict[str, float], references: List[List[str]], 
                            hypotheses: List[str], image_names: List[str] = None,
                            output_dir: str = "evaluation_results"):
    """Save evaluation results to files"""
    os.makedirs(output_dir, exist_ok=True)
    
    # 1. Save metrics
    with open(os.path.join(output_dir, "metrics.json"), 'w') as f:
        json.dump(results, f, indent=2)
    
    # 2. Save predictions
    predictions_data = []
    for i, (refs, hyp) in enumerate(zip(references, hypotheses)):
        entry = {
            'id': i,
            'references': refs,
            'hypothesis': hyp
        }
        if image_names:
            entry['image'] = image_names[i]
        predictions_data.append(entry)
    
    with open(os.path.join(output_dir, "predictions.json"), 'w') as f:
        json.dump(predictions_data, f, indent=2)
    
    # 3. Save as CSV for easy viewing
    df = pd.DataFrame({
        'image': image_names if image_names else range(len(hypotheses)),
        'generated': hypotheses,
        'reference_1': [refs[0] if len(refs) > 0 else "" for refs in references],
        'reference_2': [refs[1] if len(refs) > 1 else "" for refs in references],
    })
    df.to_csv(os.path.join(output_dir, "predictions.csv"), index=False)
    
    print(f"\nResults saved to {output_dir}/")

In [18]:
def example_usage():
    """Example of how to use the evaluation functions"""
    
    # Example data
    references = [
        ["a dog playing in the park", "a brown dog running on grass"],
        ["a cat sitting on a chair", "an orange cat on furniture"],
        ["a car on the street", "a red vehicle on the road"]
    ]
    
    hypotheses = [
        "a dog running in a park",
        "a cat sitting on a chair",
        "a red car on the street"
    ]
    
    # Evaluate
    evaluator = CaptionEvaluator()
    results = evaluator.evaluate(references, hypotheses, verbose=True)
    
    # Show qualitative analysis
    analyze_predictions(references, hypotheses, n_samples=3)
    
    return results

In [19]:
def main_evaluation(model_path: str, test_images: List[str], 
                    caption_dict: Dict[str, List[str]], img_folder: str,
                    tokenizer_path: str, config, use_beam_search: bool = True):
    """
    Complete evaluation pipeline
    
    Args:
        model_path: Path to saved model
        test_images: List of test image filenames
        caption_dict: Dictionary of captions
        img_folder: Folder containing images
        tokenizer_path: Path to saved tokenizer
        config: Configuration object
        use_beam_search: Use beam search for generation
    """
    import tensorflow as tf
    import pickle
    
    # Load model and tokenizer
    print("Loading model and tokenizer...")
    model = tf.keras.models.load_model(model_path)
    
    with open(tokenizer_path, 'rb') as f:
        tokenizer = pickle.load(f)
    
    # Evaluate
    results, references, hypotheses = evaluate_model(
        model, test_images, caption_dict, img_folder, 
        tokenizer, config, use_beam_search
    )
    
    # Qualitative analysis
    analyze_predictions(references, hypotheses, test_images, n_samples=10)
    
    # Save results
    save_evaluation_results(results, references, hypotheses, test_images)
    
    return results

In [20]:
if __name__ == "__main__":
    # Simple test
    print("Testing evaluation metrics...\n")
    example_usage()

Testing evaluation metrics...

Computing BLEU scores...
Computing METEOR score...


LookupError: 
**********************************************************************
  Resource [93mwordnet[0m not found.
  Please use the NLTK Downloader to obtain the resource:

  [31m>>> import nltk
  >>> nltk.download('wordnet')
  [0m
  For more information see: https://www.nltk.org/data.html

  Attempted to load [93mcorpora/wordnet[0m

  Searched in:
    - 'C:\\Users\\ASUS/nltk_data'
    - 'c:\\Users\\ASUS\\AppData\\Local\\Programs\\Python\\Python311\\nltk_data'
    - 'c:\\Users\\ASUS\\AppData\\Local\\Programs\\Python\\Python311\\share\\nltk_data'
    - 'c:\\Users\\ASUS\\AppData\\Local\\Programs\\Python\\Python311\\lib\\nltk_data'
    - 'C:\\Users\\ASUS\\AppData\\Roaming\\nltk_data'
    - 'C:\\nltk_data'
    - 'D:\\nltk_data'
    - 'E:\\nltk_data'
**********************************************************************
