In [18]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torchvision.models import vgg19
import numpy as np
import matplotlib.pyplot as plt
import time
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from skimage.metrics import peak_signal_noise_ratio as psnr

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {device}")

Device: cuda


# Perceptual Quality Metrics

In [19]:
class PerceptualMetrics:
    """Industry-standard perceptual quality assessment"""
    
    def __init__(self):
        # Load VGG for perceptual metrics
        vgg = vgg19(pretrained=True).features
        self.vgg_layers = {
            'relu1_1': nn.Sequential(*vgg[:2]),
            'relu2_1': nn.Sequential(*vgg[2:7]),
            'relu3_1': nn.Sequential(*vgg[7:12]),
            'relu4_1': nn.Sequential(*vgg[12:21]),
            'relu5_1': nn.Sequential(*vgg[21:30])
        }
        
        # Freeze VGG
        for layer in self.vgg_layers.values():
            for param in layer.parameters():
                param.requires_grad = False
        
        # Move to device
        for name, layer in self.vgg_layers.items():
            self.vgg_layers[name] = layer.to(device)
        
        # Normalization for VGG
        self.normalize = transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
    
    def extract_features(self, image):
        """Extract VGG features for perceptual metrics"""
        if image.dim() == 3:
            image = image.unsqueeze(0)
        
        # Normalize for VGG
        normalized = self.normalize(image)
        
        features = {}
        x = normalized
        
        for name, layer in self.vgg_layers.items():
            x = layer(x)
            features[name] = x
        
        return features
    
    def lpips_distance(self, img1, img2):
        """Learned Perceptual Image Patch Similarity"""
        features1 = self.extract_features(img1)
        features2 = self.extract_features(img2)
        
        lpips_score = 0
        weights = [0.0625, 0.125, 0.25, 0.5, 1.0]  # Standard LPIPS weights
        
        for i, layer in enumerate(['relu1_1', 'relu2_1', 'relu3_1', 'relu4_1', 'relu5_1']):
            # Normalize features
            feat1 = F.normalize(features1[layer], dim=1)
            feat2 = F.normalize(features2[layer], dim=1)
            
            # L2 distance
            diff = (feat1 - feat2) ** 2
            lpips_score += weights[i] * diff.mean()
        
        return lpips_score.item()
    
    def content_similarity(self, stylized, content):
        """Content preservation using relu4_1 features"""
        stylized_features = self.extract_features(stylized)
        content_features = self.extract_features(content)
        
        # Use relu4_1 for content (standard practice)
        stylized_content = stylized_features['relu4_1']
        original_content = content_features['relu4_1']
        
        # Cosine similarity
        similarity = F.cosine_similarity(
            stylized_content.flatten(),
            original_content.flatten(),
            dim=0
        )
        
        return similarity.item()
    
    def style_similarity(self, stylized, style):
        """Style similarity using Gram matrices"""
        stylized_features = self.extract_features(stylized)
        style_features = self.extract_features(style)
        
        def gram_matrix(features):
            b, c, h, w = features.size()
            features = features.view(b, c, h * w)
            gram = torch.bmm(features, features.transpose(1, 2))
            return gram / (c * h * w)
        
        style_similarity_scores = []
        style_layers = ['relu1_1', 'relu2_1', 'relu3_1', 'relu4_1', 'relu5_1']
        
        for layer in style_layers:
            stylized_gram = gram_matrix(stylized_features[layer])
            style_gram = gram_matrix(style_features[layer])
            
            # Cosine similarity between Gram matrices
            similarity = F.cosine_similarity(
                stylized_gram.flatten(),
                style_gram.flatten(),
                dim=0
            )
            style_similarity_scores.append(similarity.item())
        
        return np.mean(style_similarity_scores)


# Traditional Image Quality Metrics

In [20]:
class TraditionalMetrics:
    """Classical image quality metrics"""
    
    def __init__(self):
        pass
    
    def calculate_ssim(self, img1, img2):
        """Structural Similarity Index"""
        # Convert to numpy
        if isinstance(img1, torch.Tensor):
            img1 = img1.squeeze().cpu().numpy()
            if img1.ndim == 3:
                img1 = np.transpose(img1, (1, 2, 0))
        
        if isinstance(img2, torch.Tensor):
            img2 = img2.squeeze().cpu().numpy()
            if img2.ndim == 3:
                img2 = np.transpose(img2, (1, 2, 0))
        
        # Convert to grayscale if RGB
        if img1.ndim == 3:
            img1 = np.dot(img1, [0.299, 0.587, 0.114])
        if img2.ndim == 3:
            img2 = np.dot(img2, [0.299, 0.587, 0.114])
        
        # Calculate SSIM
        ssim_score = ssim(img1, img2, data_range=1.0)
        return ssim_score
    
    def calculate_psnr(self, img1, img2):
        """Peak Signal-to-Noise Ratio"""
        # Convert to numpy
        if isinstance(img1, torch.Tensor):
            img1 = img1.squeeze().cpu().numpy()
            if img1.ndim == 3:
                img1 = np.transpose(img1, (1, 2, 0))
        
        if isinstance(img2, torch.Tensor):
            img2 = img2.squeeze().cpu().numpy()
            if img2.ndim == 3:
                img2 = np.transpose(img2, (1, 2, 0))
        
        psnr_score = psnr(img1, img2, data_range=1.0)
        return psnr_score
    
    def calculate_mse(self, img1, img2):
        """Mean Squared Error"""
        if isinstance(img1, torch.Tensor) and isinstance(img2, torch.Tensor):
            return F.mse_loss(img1, img2).item()
        else:
            return np.mean((img1 - img2) ** 2)

# Industry Benchmark Evaluator

In [21]:
class IndustryBenchmark:
    """Complete industry-standard evaluation"""
    
    def __init__(self):
        self.perceptual_metrics = PerceptualMetrics()
        self.traditional_metrics = TraditionalMetrics()
        
    def evaluate_single_image(self, stylized, content, style):
        """Comprehensive evaluation of single image"""
        
        start_time = time.time()
        
        results = {}
        
        # Ensure all tensors are on the same device
        stylized = stylized.to(device)
        content = content.to(device)
        style = style.to(device)
        
        # Perceptual metrics
        results['lpips_content'] = self.perceptual_metrics.lpips_distance(stylized, content)
        results['lpips_style'] = self.perceptual_metrics.lpips_distance(stylized, style)
        results['content_similarity'] = self.perceptual_metrics.content_similarity(stylized, content)
        results['style_similarity'] = self.perceptual_metrics.style_similarity(stylized, style)
        
        # Traditional metrics (convert to CPU for skimage)
        stylized_cpu = stylized.cpu()
        content_cpu = content.cpu()
        
        results['ssim_content'] = self.traditional_metrics.calculate_ssim(stylized_cpu, content_cpu)
        results['psnr_content'] = self.traditional_metrics.calculate_psnr(stylized_cpu, content_cpu)
        results['mse_content'] = self.traditional_metrics.calculate_mse(stylized, content)
        
        # Composite scores (industry standards)
        results['content_preservation'] = (
            results['content_similarity'] * 0.4 +
            results['ssim_content'] * 0.3 +
            (1.0 - min(results['lpips_content'], 1.0)) * 0.3
        )
        
        results['style_quality'] = (
            results['style_similarity'] * 0.6 +
            (1.0 - min(results['lpips_style'], 1.0)) * 0.4
        )
        
        results['overall_quality'] = (
            results['content_preservation'] * 0.4 +
            results['style_quality'] * 0.6
        )
        
        results['evaluation_time'] = time.time() - start_time
        
        return results
    
    def evaluate_method(self, method_name, model, test_pairs, max_images=10):
        """Evaluate a method on test dataset"""
        
        print(f"\n=== Evaluating {method_name} ===")
        
        model.eval()
        all_results = []
        
        for i, (content, style) in enumerate(test_pairs[:max_images]):
            print(f"Processing image {i+1}/{min(len(test_pairs), max_images)}")
            
            # Generate stylized image
            with torch.no_grad():
                if hasattr(model, 'forward'):
                    stylized = model(content.unsqueeze(0).to(device), style.unsqueeze(0).to(device))
                    stylized = stylized.squeeze(0)
                else:
                    # For simple function-based models
                    stylized = model(content, style)
            
            # Evaluate
            results = self.evaluate_single_image(stylized, content, style)
            results['image_index'] = i
            all_results.append(results)
        
        # Calculate aggregate statistics
        aggregates = self.calculate_aggregates(all_results)
        
        return {
            'method_name': method_name,
            'individual_results': all_results,
            'aggregates': aggregates,
            'num_images': len(all_results)
        }
    
    def calculate_aggregates(self, results):
        """Calculate aggregate statistics"""
        
        metric_names = [
            'content_preservation', 'style_quality', 'overall_quality',
            'content_similarity', 'style_similarity', 'ssim_content',
            'lpips_content', 'lpips_style', 'evaluation_time'
        ]
        
        aggregates = {}
        
        for metric in metric_names:
            values = [r[metric] for r in results if metric in r]
            if values:
                aggregates[metric] = {
                    'mean': np.mean(values),
                    'std': np.std(values),
                    'min': np.min(values),
                    'max': np.max(values),
                    'median': np.median(values)
                }
        
        return aggregates

# Performance Benchmarking

In [22]:
class PerformanceBenchmark:
    """Performance and efficiency metrics"""
    
    def __init__(self):
        pass
    
    def benchmark_speed(self, model, input_size=(1, 3, 256, 256), iterations=50):
        """Benchmark inference speed"""
        
        model.eval()
        
        # Create test inputs
        content = torch.randn(input_size).to(device)
        style = torch.randn(input_size).to(device)
        
        # Warmup
        with torch.no_grad():
            for _ in range(10):
                if hasattr(model, 'forward'):
                    _ = model(content, style)
        
        # Benchmark
        torch.cuda.synchronize() if device.type == 'cuda' else None
        
        times = []
        with torch.no_grad():
            for _ in range(iterations):
                start_time = time.time()
                
                if hasattr(model, 'forward'):
                    output = model(content, style)
                else:
                    output = model(content, style)
                
                torch.cuda.synchronize() if device.type == 'cuda' else None
                times.append(time.time() - start_time)
        
        return {
            'mean_time': np.mean(times),
            'std_time': np.std(times),
            'min_time': np.min(times),
            'max_time': np.max(times),
            'fps': 1.0 / np.mean(times),
            'throughput': iterations / np.sum(times)
        }
    
    def benchmark_memory(self, model, input_size=(1, 3, 256, 256)):
        """Benchmark memory usage"""
        
        if device.type != 'cuda':
            return {'memory_mb': 0, 'peak_memory_mb': 0}
        
        torch.cuda.reset_peak_memory_stats()
        torch.cuda.empty_cache()
        
        content = torch.randn(input_size).to(device)
        style = torch.randn(input_size).to(device)
        
        model.eval()
        with torch.no_grad():
            if hasattr(model, 'forward'):
                _ = model(content, style)
        
        current_memory = torch.cuda.memory_allocated() / (1024 ** 2)
        peak_memory = torch.cuda.max_memory_allocated() / (1024 ** 2)
        
        return {
            'memory_mb': current_memory,
            'peak_memory_mb': peak_memory
        }
    
    def model_complexity(self, model):
        """Calculate model complexity metrics"""
        
        total_params = sum(p.numel() for p in model.parameters())
        trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        
        # Estimate model size
        param_size = sum(p.numel() * p.element_size() for p in model.parameters())
        buffer_size = sum(b.numel() * b.element_size() for b in model.buffers())
        model_size_mb = (param_size + buffer_size) / (1024 ** 2)
        
        return {
            'total_params': total_params,
            'trainable_params': trainable_params,
            'model_size_mb': model_size_mb,
            'param_density': trainable_params / total_params if total_params > 0 else 0
        }

# Comparative Analysis

In [23]:
class ComparativeAnalysis:
    """Compare multiple methods using industry standards"""
    
    def __init__(self):
        self.benchmark = IndustryBenchmark()
        self.performance = PerformanceBenchmark()
    
    def compare_methods(self, methods_dict, test_pairs):
        """Compare multiple methods comprehensively"""
        
        print(f"\n=== Comparative Analysis ===")
        print(f"Evaluating {len(methods_dict)} methods on {len(test_pairs)} test cases")
        
        all_results = {}
        
        for method_name, model in methods_dict.items():
            print(f"\nEvaluating {method_name}...")
            
            # Quality evaluation
            quality_results = self.benchmark.evaluate_method(method_name, model, test_pairs)
            
            # Performance evaluation
            speed_results = self.performance.benchmark_speed(model)
            memory_results = self.performance.benchmark_memory(model)
            complexity_results = self.performance.model_complexity(model)
            
            all_results[method_name] = {
                'quality': quality_results,
                'speed': speed_results,
                'memory': memory_results,
                'complexity': complexity_results
            }
        
        # Generate comparison
        comparison = self.generate_comparison_report(all_results)
        
        return all_results, comparison
    
    def generate_comparison_report(self, results):
        """Generate comprehensive comparison report"""
        
        methods = list(results.keys())
        
        report = {
            'methods': methods,
            'winners': {},
            'rankings': {},
            'trade_offs': {}
        }
        
        # Quality rankings
        quality_metrics = ['overall_quality', 'content_preservation', 'style_quality']
        
        for metric in quality_metrics:
            scores = []
            for method in methods:
                if 'quality' in results[method] and 'aggregates' in results[method]['quality']:
                    score = results[method]['quality']['aggregates'].get(metric, {}).get('mean', 0)
                    scores.append({'method': method, 'score': score})
            
            scores.sort(key=lambda x: x['score'], reverse=True)
            report['rankings'][metric] = scores
            if scores:
                report['winners'][metric] = scores[0]['method']
        
        # Performance rankings
        performance_metrics = ['fps', 'memory_mb', 'total_params']
        
        for metric in performance_metrics:
            scores = []
            for method in methods:
                if metric == 'fps':
                    score = results[method]['speed'].get('fps', 0)
                    reverse = True  # Higher FPS is better
                elif metric == 'memory_mb':
                    score = results[method]['memory'].get('peak_memory_mb', float('inf'))
                    reverse = False  # Lower memory is better
                elif metric == 'total_params':
                    score = results[method]['complexity'].get('total_params', float('inf'))
                    reverse = False  # Fewer parameters is better
                
                scores.append({'method': method, 'score': score})
            
            scores.sort(key=lambda x: x['score'], reverse=reverse)
            report['rankings'][metric] = scores
            if scores:
                report['winners'][metric] = scores[0]['method']
        
        # Identify trade-offs
        report['trade_offs'] = self.identify_trade_offs(results)
        
        return report
    
    def identify_trade_offs(self, results):
        """Identify speed vs quality trade-offs"""
        
        trade_offs = {}
        
        for method, result in results.items():
            quality_score = result['quality']['aggregates'].get('overall_quality', {}).get('mean', 0)
            fps = result['speed'].get('fps', 0)
            memory_mb = result['memory'].get('peak_memory_mb', 0)
            
            trade_offs[method] = {
                'quality_score': quality_score,
                'fps': fps,
                'memory_mb': memory_mb,
                'quality_per_fps': quality_score / fps if fps > 0 else 0,
                'efficiency_score': quality_score / (memory_mb + 1)  # +1 to avoid division by zero
            }
        
        return trade_offs

# Industry Standards Validation

In [24]:
def validate_industry_standards(results):
    """Validate against industry benchmarks"""
    
    print(f"\n=== Industry Standards Validation ===")
    
    # Define industry thresholds
    standards = {
        'Adobe_Quality': {
            'overall_quality': 0.75,
            'content_preservation': 0.80,
            'style_quality': 0.70
        },
        'Google_Performance': {
            'fps': 20.0,
            'inference_time_ms': 50.0
        },
        'Apple_Mobile': {
            'model_size_mb': 50.0,
            'memory_mb': 200.0,
            'mobile_fps': 15.0
        },
        'Meta_RealTime': {
            'fps': 30.0,
            'latency_ms': 33.0
        }
    }
    
    validation_results = {}
    
    for method_name, method_results in results.items():
        validation = {}
        
        # Adobe quality standards
        quality = method_results['quality']['aggregates']
        validation['adobe_quality'] = {
            'overall_quality': quality.get('overall_quality', {}).get('mean', 0) >= standards['Adobe_Quality']['overall_quality'],
            'content_preservation': quality.get('content_preservation', {}).get('mean', 0) >= standards['Adobe_Quality']['content_preservation'],
            'style_quality': quality.get('style_quality', {}).get('mean', 0) >= standards['Adobe_Quality']['style_quality']
        }
        
        # Google performance standards
        speed = method_results['speed']
        validation['google_performance'] = {
            'fps': speed.get('fps', 0) >= standards['Google_Performance']['fps'],
            'inference_time': speed.get('mean_time', 1.0) * 1000 <= standards['Google_Performance']['inference_time_ms']
        }
        
        # Apple mobile standards
        memory = method_results['memory']
        complexity = method_results['complexity']
        validation['apple_mobile'] = {
            'model_size': complexity.get('model_size_mb', float('inf')) <= standards['Apple_Mobile']['model_size_mb'],
            'memory_usage': memory.get('peak_memory_mb', float('inf')) <= standards['Apple_Mobile']['memory_mb'],
            'mobile_performance': speed.get('fps', 0) * 0.5 >= standards['Apple_Mobile']['mobile_fps']  # Estimate mobile perf
        }
        
        # Meta real-time standards
        validation['meta_realtime'] = {
            'fps': speed.get('fps', 0) >= standards['Meta_RealTime']['fps'],
            'latency': speed.get('mean_time', 1.0) * 1000 <= standards['Meta_RealTime']['latency_ms']
        }
        
        validation_results[method_name] = validation
    
    # Print validation summary
    for method_name, validation in validation_results.items():
        print(f"\n{method_name} Industry Compliance:")
        
        adobe_pass = all(validation['adobe_quality'].values())
        google_pass = all(validation['google_performance'].values())
        apple_pass = all(validation['apple_mobile'].values())
        meta_pass = all(validation['meta_realtime'].values())
        
        print(f"  Adobe Quality Standards: {'✓' if adobe_pass else '✗'}")
        print(f"  Google Performance Standards: {'✓' if google_pass else '✗'}")
        print(f"  Apple Mobile Standards: {'✓' if apple_pass else '✗'}")
        print(f"  Meta Real-time Standards: {'✓' if meta_pass else '✗'}")
    
    return validation_results

# Helpers

In [25]:
def create_test_dataset(num_pairs=5):
    """Create synthetic test dataset"""
    
    test_pairs = []
    
    for i in range(num_pairs):
        # Create synthetic content and style images
        content = torch.randn(3, 256, 256)
        style = torch.randn(3, 256, 256)
        
        # Add some structure to make it more realistic
        content = torch.clamp(content * 0.5 + 0.5, 0, 1)
        style = torch.clamp(style * 0.5 + 0.5, 0, 1)
        
        test_pairs.append((content, style))
    
    return test_pairs

def create_dummy_models():
    """Create dummy models for demonstration"""
    
    class DummyFastModel(nn.Module):
        def __init__(self):
            super().__init__()
            self.conv = nn.Conv2d(3, 3, 3, 1, 1)
        
        def forward(self, content, style):
            return torch.tanh(self.conv(content))
    
    class DummyQualityModel(nn.Module):
        def __init__(self):
            super().__init__()
            self.conv1 = nn.Conv2d(3, 64, 3, 1, 1)
            self.conv2 = nn.Conv2d(64, 64, 3, 1, 1)
            self.conv3 = nn.Conv2d(64, 3, 3, 1, 1)
        
        def forward(self, content, style):
            x = F.relu(self.conv1(content))
            x = F.relu(self.conv2(x))
            return torch.tanh(self.conv3(x))
    
    return {
        'Fast_Model': DummyFastModel().to(device),
        'Quality_Model': DummyQualityModel().to(device)
    }

# Execute

In [26]:
# Create test data and models
test_pairs = create_test_dataset(num_pairs=8)
models = create_dummy_models()
    
print(f"Created {len(test_pairs)} test pairs and {len(models)} models")

Created 8 test pairs and 2 models


In [27]:
# Run comparative analysis
analyzer = ComparativeAnalysis()
results, comparison = analyzer.compare_methods(models, test_pairs)


=== Comparative Analysis ===
Evaluating 2 methods on 8 test cases

Evaluating Fast_Model...

=== Evaluating Fast_Model ===
Processing image 1/8
Processing image 2/8
Processing image 3/8
Processing image 4/8
Processing image 5/8
Processing image 6/8
Processing image 7/8
Processing image 8/8

Evaluating Quality_Model...

=== Evaluating Quality_Model ===
Processing image 1/8
Processing image 2/8
Processing image 3/8
Processing image 4/8
Processing image 5/8
Processing image 6/8
Processing image 7/8
Processing image 8/8


In [28]:
# Validate against industry standards
validation = validate_industry_standards(results)


=== Industry Standards Validation ===

Fast_Model Industry Compliance:
  Adobe Quality Standards: ✗
  Google Performance Standards: ✓
  Apple Mobile Standards: ✓
  Meta Real-time Standards: ✓

Quality_Model Industry Compliance:
  Adobe Quality Standards: ✗
  Google Performance Standards: ✓
  Apple Mobile Standards: ✓
  Meta Real-time Standards: ✓


In [29]:
for metric, winner in comparison['winners'].items():
    print(f"Best {metric}: {winner}")

Best overall_quality: Fast_Model
Best content_preservation: Fast_Model
Best style_quality: Fast_Model
Best fps: Fast_Model
Best memory_mb: Fast_Model
Best total_params: Fast_Model


In [30]:
trade_offs = comparison['trade_offs']

# Find best overall efficiency
best_efficiency = max(trade_offs.items(), key=lambda x: x[1]['efficiency_score'])
print(f"Most efficient model: {best_efficiency[0]} (Quality/Memory ratio: {best_efficiency[1]['efficiency_score']:.3f})")
    
# Find best quality/speed balance
best_balance = max(trade_offs.items(), key=lambda x: x[1]['quality_per_fps'])
print(f"Best quality/speed balance: {best_balance[0]} (Quality/FPS: {best_balance[1]['quality_per_fps']:.3f})")

Most efficient model: Fast_Model (Quality/Memory ratio: 0.004)
Best quality/speed balance: Quality_Model (Quality/FPS: 0.008)
