In [1]:
# Runtime dependencies for this notebook.
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -q torch torchvision torchaudio transformers
%pip install -q openai-whisper pillow opencv-python scikit-image

# ImageBind is installed straight from its GitHub repository.
%pip install -q git+https://github.com/facebookresearch/ImageBind.git

  Preparing metadata (setup.py) ... [?25l[?25hdone


In [2]:
# torchcodec is imported by the next cell; %pip targets the running kernel's environment.
%pip install torchcodec




In [3]:
import torch
import numpy as np
from PIL import Image
from typing import Dict, List, Tuple, Optional
import warnings
import torchcodec
from imagebind import data
from imagebind.models import imagebind_model
from imagebind.models.imagebind_model import ModalityType
import tempfile
import whisper
import cv2
from transformers import CLIPModel, CLIPProcessor

warnings.filterwarnings('ignore')

In [4]:
class ImageBindEvaluator:
    """Audio-image alignment scorer built on ImageBind embeddings.

    The audio clip and the image are embedded by the same ImageBind model and
    compared by cosine similarity; the raw cosine is then linearly rescaled
    and clamped to [0, 1].
    """

    # Linear rescale applied to the raw cosine before clamping:
    # (similarity + 0.3) / 0.8 maps the interval [-0.3, 0.5] onto [0, 1].
    SIMILARITY_OFFSET = 0.3
    SIMILARITY_SCALE = 0.8

    # Image modes Pillow's JPEG writer accepts as-is. Anything else (RGBA, P,
    # LA, ...) is converted to RGB before being written to the temp file.
    _JPEG_SAFE_MODES = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')

    def __init__(self, device=None):
        """Load the pretrained ImageBind "huge" model.

        Args:
            device: Target device. When None, 'cuda' is used if available,
                otherwise 'cpu'.

        Raises:
            Exception: Any error raised while loading the model is reported
                on stdout and then re-raised unchanged.
        """
        print("Loading ImageBind")

        try:
            # Keep handles to the ImageBind helpers on the instance so the
            # scoring method does not depend on module-level names.
            self.data = data
            self.ModalityType = ModalityType

            if device is None:
                device = 'cuda' if torch.cuda.is_available() else 'cpu'
            self.device = device

            self.model = imagebind_model.imagebind_huge(pretrained=True)
            self.model.eval()
            self.model.to(self.device)

            print(f"ImageBind loaded on {self.device}")

            if self._on_cuda():
                mem = torch.cuda.memory_allocated() / 1024**3
                print(f"   GPU Memory: {mem:.2f} GB")

        except Exception as e:
            print(f" ImageBind loading failed: {e}")
            raise

    def _on_cuda(self) -> bool:
        """Return True for any CUDA device spelling ('cuda', 'cuda:0', torch.device)."""
        return str(self.device).startswith('cuda')

    @classmethod
    def _normalize_similarity(cls, similarity: float) -> float:
        """Rescale a raw cosine similarity and clamp the result to [0, 1]."""
        scaled = (similarity + cls.SIMILARITY_OFFSET) / cls.SIMILARITY_SCALE
        return float(max(0.0, min(1.0, scaled)))

    def compute_audio_image_similarity(
        self,
        audio_path: str,
        image: Image.Image
    ) -> float:
        """Score how well ``image`` matches the audio clip at ``audio_path``.

        The image is written to a temporary JPEG because the vision loader is
        called with a list of file paths; the file is removed afterwards even
        when scoring fails.

        Args:
            audio_path: Path of the audio file to embed.
            image: Image to compare against the audio.

        Returns:
            The rescaled cosine similarity, clamped to [0, 1].
        """
        import os

        # JPEG cannot store alpha or palette data; saving such modes raises.
        if image.mode not in self._JPEG_SAFE_MODES:
            image = image.convert('RGB')

        # Reserve a path and close the handle before writing to it, so the
        # save below never competes with an open handle on the same file.
        tmp = tempfile.NamedTemporaryFile(suffix='.jpg', delete=False)
        img_path = tmp.name
        tmp.close()

        try:
            # Saving inside the try block guarantees cleanup if it fails.
            image.save(img_path)

            inputs = {
                self.ModalityType.AUDIO: self.data.load_and_transform_audio_data(
                    [audio_path], self.device
                ),
                self.ModalityType.VISION: self.data.load_and_transform_vision_data(
                    [img_path], self.device
                ),
            }

            with torch.no_grad():
                embeddings = self.model(inputs)

            audio_emb = embeddings[self.ModalityType.AUDIO]
            vision_emb = embeddings[self.ModalityType.VISION]

            # L2-normalize so the dot product below is a cosine similarity.
            audio_emb = audio_emb / audio_emb.norm(dim=-1, keepdim=True)
            vision_emb = vision_emb / vision_emb.norm(dim=-1, keepdim=True)

            similarity = (audio_emb @ vision_emb.T).squeeze().item()
        finally:
            os.unlink(img_path)
            if self._on_cuda():
                torch.cuda.empty_cache()

        return self._normalize_similarity(similarity)


class WhisperCLIPEvaluator:
    """Audio-image alignment via speech transcription plus CLIP.

    The audio is transcribed with Whisper and the resulting text is compared
    with the image using CLIP text/image embeddings.

    Note: the score returned here is the raw cosine similarity (range
    [-1, 1]); it is not rescaled to [0, 1].
    """

    def __init__(
        self,
        whisper_model_size: str = 'base',
        clip_model_name: str = 'openai/clip-vit-base-patch32',
        device: Optional[str] = None,
        language: Optional[str] = 'ru'
    ):
        """Load the Whisper and CLIP models.

        Args:
            whisper_model_size: Model name passed to ``whisper.load_model``.
            clip_model_name: Checkpoint name passed to ``from_pretrained``.
            device: Target device. When None, 'cuda' is used if available,
                otherwise 'cpu'.
            language: Language code forwarded to Whisper's ``transcribe``.
                Defaults to 'ru' (the previously hard-coded value). None is
                forwarded as-is; presumably Whisper then auto-detects the
                language - confirm against the Whisper documentation.
        """
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.device = device
        self.language = language

        print("Loading Whisper + CLIP...")
        self.whisper_model = whisper.load_model(whisper_model_size, device=device)
        print("Whisper ready")

        self.clip_model = CLIPModel.from_pretrained(clip_model_name).to(device)
        self.clip_processor = CLIPProcessor.from_pretrained(clip_model_name)
        self.clip_model.eval()
        print(" CLIP ready")

        if self._on_cuda():
            # Report memory use the same way ImageBindEvaluator does; the
            # value used to be computed here and then discarded.
            mem = torch.cuda.memory_allocated() / 1024**3
            print(f"   GPU Memory: {mem:.2f} GB")

    def _on_cuda(self) -> bool:
        """Return True for any CUDA device spelling ('cuda', 'cuda:0', torch.device)."""
        return str(self.device).startswith('cuda')

    def transcribe_audio(self, audio_path: str) -> str:
        """Transcribe the audio file and return the stripped text."""
        result = self.whisper_model.transcribe(
            audio_path,
            language=self.language,
            # Half precision is only requested when running on a GPU.
            fp16=self._on_cuda()
        )
        return result["text"].strip()

    def compute_text_image_similarity(
        self,
        text: str,
        image: Image.Image
    ) -> float:
        """Return the cosine similarity between CLIP embeddings of text and image.

        Args:
            text: Text to embed.
            image: Image to embed.

        Returns:
            Raw cosine similarity as a Python float.
        """
        inputs = self.clip_processor(
            text=[text],
            images=[image],
            return_tensors="pt",
            padding=True,
            # CLIP's text encoder has a fixed maximum sequence length, so an
            # over-long transcription is truncated instead of failing.
            truncation=True
        ).to(self.device)

        with torch.no_grad():
            outputs = self.clip_model(**inputs)

            text_embeds = outputs.text_embeds
            image_embeds = outputs.image_embeds

            # L2-normalize so the dot product below is a cosine similarity.
            text_embeds = text_embeds / text_embeds.norm(dim=-1, keepdim=True)
            image_embeds = image_embeds / image_embeds.norm(dim=-1, keepdim=True)

            similarity = (text_embeds @ image_embeds.T).squeeze().item()

        if self._on_cuda():
            torch.cuda.empty_cache()

        return float(similarity)

    def compute_audio_image_similarity(
        self,
        audio_path: str,
        image: Image.Image,
        verbose: bool = False
    ) -> Tuple[float, str]:
        """Transcribe the audio and score the transcription against the image.

        Args:
            audio_path: Path of the audio file.
            image: Image to compare with the transcription.
            verbose: When True, print the transcription and the score.

        Returns:
            ``(similarity, transcribed_text)``.
        """
        if verbose:
            print("Transcribing audio...", end=" ")

        text = self.transcribe_audio(audio_path)

        if verbose:
            print(f'"{text}"')

        similarity = self.compute_text_image_similarity(text, image)

        if verbose:
            print(f"{similarity:.4f}")

        return similarity, text

class PerceptualQualityEvaluator:
    """No-reference image quality heuristics, each reported in [0, 1].

    Four simple statistics (sharpness, contrast, color diversity and
    brightness) are computed from the pixel data and blended into a single
    ``quality_score``.
    """

    def __init__(self):
        pass

    @staticmethod
    def _grayscale(image: Image.Image, dtype) -> np.ndarray:
        """Return the image's 'L' (grayscale) pixels as an array of ``dtype``."""
        return np.array(image.convert('L')).astype(dtype)

    def compute_sharpness(self, image: Image.Image) -> float:
        """Variance of the Laplacian of the grayscale image, scaled by 1/500 and capped at 1."""
        gray = self._grayscale(image, np.uint8)
        laplacian_variance = cv2.Laplacian(gray, cv2.CV_64F).var()
        return float(min(1.0, laplacian_variance / 500.0))

    def compute_contrast(self, image: Image.Image) -> float:
        """RMS contrast: grayscale standard deviation divided by 128, capped at 1."""
        gray = self._grayscale(image, np.float32)
        rms = gray.std() / 128.0
        return float(min(1.0, rms))

    def compute_color_diversity(self, image: Image.Image) -> float:
        """Mean per-channel standard deviation of the RGB pixels, scaled by 1/80 and capped at 1."""
        rgb = image if image.mode == 'RGB' else image.convert('RGB')
        pixels = np.array(rgb).astype(np.float32)

        # One standard deviation per channel, averaged over the channels.
        mean_channel_std = pixels.std(axis=(0, 1)).mean()
        return float(min(1.0, mean_channel_std / 80.0))

    def compute_brightness(self, image: Image.Image) -> float:
        """Score that is 1 when mean gray level is 128 and falls linearly with distance from it."""
        gray = self._grayscale(image, np.float32)
        off_center = abs(gray.mean() - 128) / 128.0
        return float(max(0.0, 1.0 - off_center))

    def evaluate(self, image: Image.Image) -> Dict[str, float]:
        """Compute every metric plus the weighted ``quality_score``.

        Returns:
            Dict with keys 'sharpness', 'contrast', 'color_diversity',
            'brightness' and 'quality_score'.
        """
        metrics = (
            ('sharpness', self.compute_sharpness, 0.35),
            ('contrast', self.compute_contrast, 0.25),
            ('color_diversity', self.compute_color_diversity, 0.25),
            ('brightness', self.compute_brightness, 0.15),
        )

        scores = {name: compute(image) for name, compute, _ in metrics}

        # Accumulate left to right so the floating-point result matches a
        # plain chained sum of the four weighted terms.
        blended = 0.0
        for name, _, weight in metrics:
            blended += scores[name] * weight
        scores['quality_score'] = blended

        return scores


class EditConsistencyEvaluator:
    """Compares an edited image with its original.

    Two measurements are combined: how much the pixels changed (normalized
    MSE) and how much structure was kept (SSIM).
    """

    def __init__(self):
        pass

    @staticmethod
    def _prepare_pair(original: Image.Image, edited: Image.Image):
        """Bring both images to the original's size and to RGB mode.

        The edited image is resized first (in its own mode) and converted
        afterwards, matching the order the metrics have always used.
        """
        if original.size != edited.size:
            edited = edited.resize(original.size, Image.LANCZOS)

        if original.mode != 'RGB':
            original = original.convert('RGB')
        if edited.mode != 'RGB':
            edited = edited.convert('RGB')

        return original, edited

    @staticmethod
    def _change_magnitude_prepared(original: Image.Image, edited: Image.Image) -> float:
        """Normalized MSE for images already aligned by ``_prepare_pair``."""
        orig_array = np.array(original).astype(np.float32)
        edit_array = np.array(edited).astype(np.float32)

        mse = np.mean((orig_array - edit_array) ** 2)

        # An MSE of 10000 or more saturates the score at 1.0.
        return float(min(1.0, mse / 10000.0))

    @staticmethod
    def _structural_preservation_prepared(original: Image.Image, edited: Image.Image) -> float:
        """SSIM for images already aligned by ``_prepare_pair``."""
        # Imported lazily so scikit-image is only required when SSIM is used.
        from skimage.metrics import structural_similarity as ssim

        orig_array = np.array(original).astype(np.uint8)
        edit_array = np.array(edited).astype(np.uint8)

        try:
            score = ssim(orig_array, edit_array, channel_axis=2, data_range=255)
        except Exception as e:
            # Best effort: report the failure and fall back to a neutral 0.5.
            print(f" error: {e}, returning 0.5")
            score = 0.5

        return float(score)

    def compute_change_magnitude(
        self,
        original: Image.Image,
        edited: Image.Image
    ) -> float:
        """Return the pixel-wise MSE between the images, scaled to [0, 1].

        The edited image is resized to the original's size if they differ and
        both are compared in RGB.
        """
        original, edited = self._prepare_pair(original, edited)
        return self._change_magnitude_prepared(original, edited)

    def compute_structural_preservation(
        self,
        original: Image.Image,
        edited: Image.Image
    ) -> float:
        """Return the SSIM between the images (0.5 if SSIM cannot be computed).

        The edited image is resized to the original's size if they differ and
        both are compared in RGB.
        """
        original, edited = self._prepare_pair(original, edited)
        return self._structural_preservation_prepared(original, edited)

    def evaluate(
        self,
        original: Image.Image,
        edited: Image.Image
    ) -> Dict[str, float]:
        """Compute both measurements and the blended consistency score.

        Returns:
            Dict with keys 'change_magnitude', 'structural_preservation' and
            'consistency_score'.
        """
        # Align the pair once; both metrics then share the same inputs
        # instead of each resizing and converting on its own.
        original, edited = self._prepare_pair(original, edited)

        change_mag = self._change_magnitude_prepared(original, edited)
        struct_pres = self._structural_preservation_prepared(original, edited)

        # This term rewards change: it rises linearly with the change
        # magnitude and saturates at 1.0 once the magnitude exceeds 0.1.
        change_score = 1.0 if change_mag > 0.1 else change_mag * 10

        consistency = (change_score * 0.3 + struct_pres * 0.7)

        return {
            'change_magnitude': change_mag,
            'structural_preservation': struct_pres,
            'consistency_score': float(consistency)
        }


class BlindEvaluator:
    """Reference-free scorer for audio-instructed image edits.

    Combines three components into one weighted score:
    audio-image alignment, edit consistency with the original image, and
    perceptual quality of the edited image.

    Note: the alignment score comes from whichever backend is active. The
    ImageBind backend returns a value rescaled to [0, 1]; the Whisper+CLIP
    backend returns a raw cosine similarity, so combined scores from the two
    backends are not directly comparable.
    """

    # Keys every ``weights`` mapping must provide.
    REQUIRED_WEIGHT_KEYS = ('alignment', 'consistency', 'quality')

    def __init__(
        self,
        alignment_method: str = 'imagebind',  # 'imagebind' or 'whisper_clip'
        weights: Optional[Dict[str, float]] = None,
        device: Optional[str] = None
    ):
        """Create the component evaluators.

        Args:
            alignment_method: 'imagebind' selects ImageBind (falling back to
                Whisper+CLIP if it fails to load); any other value selects
                Whisper+CLIP.
            weights: Mapping with the keys 'alignment', 'consistency' and
                'quality'. Defaults to 0.50 / 0.35 / 0.15.
            device: Device forwarded to the alignment evaluator.

        Raises:
            ValueError: If ``weights`` lacks one of the required keys.
        """
        print(f"Alignment method: {alignment_method}")
        print()

        # Default weights
        if weights is None:
            weights = {
                'alignment': 0.50,      # most important: the instruction was carried out
                'consistency': 0.35,    # only what was needed changed
                'quality': 0.15        # no obvious defects
            }

        # Fail before any model is loaded rather than with a KeyError
        # in the middle of the first evaluation.
        missing = [key for key in self.REQUIRED_WEIGHT_KEYS if key not in weights]
        if missing:
            raise ValueError(f"weights is missing required keys: {missing}")

        # Copy so later changes to the caller's dict do not alter scoring.
        self.weights = dict(weights)

        # 1. Audio-Image Alignment
        if alignment_method == 'imagebind':
            try:
                self.alignment_evaluator = ImageBindEvaluator(device=device)
                self.alignment_method = 'imagebind'
            except Exception as e:
                print(f"ImageBind failed ({e}), falling back to Whisper+CLIP")
                self.alignment_evaluator = WhisperCLIPEvaluator(device=device)
                self.alignment_method = 'whisper_clip'
        else:
            self.alignment_evaluator = WhisperCLIPEvaluator(device=device)
            self.alignment_method = 'whisper_clip'

        # 2. Perceptual Quality
        self.quality_evaluator = PerceptualQualityEvaluator()
        print("Quality Evaluator ready")

        # 3. Edit Consistency
        self.consistency_evaluator = EditConsistencyEvaluator()
        print("Consistency Evaluator ready")

        print(" Evaluator initialized!")

    def evaluate_single(
        self,
        original_image: Image.Image,
        edited_image: Image.Image,
        audio_path: str,
        verbose: bool = False
    ) -> Dict[str, object]:
        """Score one edited image against the original and the audio clip.

        Args:
            original_image: Image before editing.
            edited_image: Image after editing.
            audio_path: Path of the audio file the edit is scored against.
            verbose: When True, print each component score as it is computed.

        Returns:
            Dict with 'alignment_score', 'transcribed_text' (None for the
            ImageBind backend), 'quality_scores', 'quality_score',
            'consistency_scores', 'consistency_score' and 'combined_score'.
        """
        if verbose:
            print("Evaluating...")

        results = {}

        # 1. Audio-Image Alignment
        if verbose:
            print("  1. Audio-Image Alignment...", end=" ")

        if self.alignment_method == 'imagebind':
            alignment_score = self.alignment_evaluator.compute_audio_image_similarity(
                audio_path, edited_image
            )
            results['alignment_score'] = alignment_score
            results['transcribed_text'] = None
        else:  # whisper_clip
            alignment_score, transcribed = self.alignment_evaluator.compute_audio_image_similarity(
                audio_path, edited_image, verbose=False
            )
            results['alignment_score'] = alignment_score
            results['transcribed_text'] = transcribed

        if verbose:
            print(f"{alignment_score:.4f}")
            if results['transcribed_text']:
                print(f"Transcription: \"{results['transcribed_text']}\"")

        # 2. Perceptual Quality
        if verbose:
            print(" 2. Perceptual Quality...", end=" ")

        quality_scores = self.quality_evaluator.evaluate(edited_image)
        results['quality_scores'] = quality_scores
        results['quality_score'] = quality_scores['quality_score']

        if verbose:
            print(f"{results['quality_score']:.4f}")

        # 3. Edit Consistency
        if verbose:
            print("  3. Edit Consistency...", end=" ")

        consistency_scores = self.consistency_evaluator.evaluate(
            original_image, edited_image
        )
        results['consistency_scores'] = consistency_scores
        results['consistency_score'] = consistency_scores['consistency_score']

        if verbose:
            print(f"{results['consistency_score']:.4f}")

        # Combined Score
        combined = (
            results['alignment_score'] * self.weights['alignment'] +
            results['quality_score'] * self.weights['quality'] +
            results['consistency_score'] * self.weights['consistency']
        )

        results['combined_score'] = float(combined)

        if verbose:
            print(f"\n  → Combined Score: {combined:.4f}")

        return results

    def evaluate_batch(
        self,
        original_image: Image.Image,
        edited_images: List[Image.Image],
        audio_path: str,
        model_names: Optional[List[str]] = None,
        verbose: bool = True
    ) -> List[Dict[str, object]]:
        """Score several edited images of the same original.

        Args:
            original_image: Image before editing.
            edited_images: Candidate edits, one per model.
            audio_path: Path of the audio file the edits are scored against.
            model_names: Display names, one per edited image. Defaults to
                'Model_1', 'Model_2', ...
            verbose: When True, print progress and component scores.

        Returns:
            One result dict per edited image (see ``evaluate_single``),
            extended with 'model_name' and 'model_index'.

        Raises:
            ValueError: If ``model_names`` and ``edited_images`` differ in
                length.
        """
        if model_names is None:
            model_names = [f"Model_{i+1}" for i in range(len(edited_images))]
        elif len(model_names) != len(edited_images):
            # zip() would silently drop the unmatched entries.
            raise ValueError(
                f"model_names has {len(model_names)} entries but "
                f"edited_images has {len(edited_images)}"
            )

        results = []

        for i, (edited_img, model_name) in enumerate(zip(edited_images, model_names)):
            if verbose:
                print(f"\n[{i+1}/{len(edited_images)}] {model_name}:")

            scores = self.evaluate_single(
                original_image, edited_img, audio_path, verbose=verbose
            )

            scores['model_name'] = model_name
            scores['model_index'] = i

            results.append(scores)

        return results

    def select_best(
        self,
        original_image: Image.Image,
        edited_images: List[Image.Image],
        audio_path: str,
        model_names: Optional[List[str]] = None,
        verbose: bool = True
    ) -> Tuple[int, Dict[str, object]]:
        """Evaluate all candidates and return the one with the highest combined score.

        Args:
            original_image: Image before editing.
            edited_images: Candidate edits; must not be empty.
            audio_path: Path of the audio file the edits are scored against.
            model_names: Optional display names, one per edited image.
            verbose: Forwarded to ``evaluate_batch``.

        Returns:
            ``(index, result)`` of the best candidate. On ties the earliest
            candidate wins.

        Raises:
            ValueError: If ``edited_images`` is empty.
        """
        if len(edited_images) == 0:
            raise ValueError("edited_images must contain at least one image")

        results = self.evaluate_batch(
            original_image, edited_images, audio_path, model_names, verbose=verbose
        )

        best_idx = max(range(len(results)),
                      key=lambda i: results[i]['combined_score'])

        return best_idx, results[best_idx]



In [5]:
# Build the evaluator with custom component weights (default alignment method).
evaluator = BlindEvaluator(
    weights=dict(alignment=0.40, consistency=0.50, quality=0.10)
)


Alignment method: imagebind

Loading ImageBind
ImageBind loaded on cpu
Quality Evaluator ready
Consistency Evaluator ready
 Evaluator initialized!


In [6]:
# Inputs for the comparison run: the source image, two candidate edits and the audio clip.
audio = '/content/test222.wav'
original, edited1, edited2 = map(Image.open, (
    '/content/original.jpg',
    '/content/ground_truth.jpg',
    '/content/model_output.jpg',
))

# Score both candidates against the same original and audio.
results = evaluator.evaluate_batch(
    original_image=original,
    edited_images=[edited1, edited2],
    audio_path=audio,
    model_names=['Model1', 'Model2'],
    verbose=True,
)



[1/2] Model1:
Evaluating...
  1. Audio-Image Alignment... 0.276425614953041
0.2764
 2. Perceptual Quality... 0.7430
  3. Edit Consistency... 0.6767

  → Combined Score: 0.5232

[2/2] Model2:
Evaluating...
  1. Audio-Image Alignment... 0.2990792170166969
0.2991
 2. Perceptual Quality... 0.9141
  3. Edit Consistency... 0.7274

  → Combined Score: 0.5748
