<a href="https://colab.research.google.com/github/ashwin-yedte/visual-intelligence-travel-finance/blob/main/notebooks/Visual%20Intelligence%20Layer/image_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**VLM INTELLIGENCE LAYER** - STEP 1: IMAGE ANALYSIS
Comprehensive image validation, preprocessing, and CLIP embedding extraction

Features:
- Strict validation (size, format, integrity)
- EXIF orientation fixing
- Aspect ratio preservation
- Color statistics extraction
- Error handling with user-friendly messages

# =================================================================
Step 1: CONFIGURATION
# =================================================================


In [None]:
# Banner marking the start of the image-analysis step.
print("=" * 80, "STEP 1: IMAGE ANALYSIS - VLM INTELLIGENCE LAYER", "=" * 80, sep="\n")

class Config:
    """Single place holding every tunable used by the image-analysis step."""

    # --- CLIP model ---
    # Hugging Face checkpoint name passed to from_pretrained().
    CLIP_MODEL_NAME = "openai/clip-vit-base-patch32"
    # Embedding size reported when the model is loaded.
    EMBEDDING_DIMENSION = 512

    # --- Upload validation ---
    # (width, height) every preprocessed image is resized/padded to.
    TARGET_IMAGE_SIZE = (224, 224)
    # Largest accepted upload, in megabytes.
    MAX_IMAGE_SIZE_MB = 10.0
    # Lower-case format names accepted by the validator.
    SUPPORTED_FORMATS = ['jpg', 'jpeg', 'png']
    # Bounds (in pixels) applied to both width and height.
    MIN_DIMENSION = 100
    MAX_DIMENSION = 4_000

    # --- Batch limits ---
    # Fewest / most images accepted in one analysis call.
    MIN_IMAGES = 1
    MAX_IMAGES = 5

    # --- Color analysis ---
    # Number of k-means clusters reported as dominant colors.
    NUM_DOMINANT_COLORS = 3
    # Pixel count above which the image is randomly sub-sampled before clustering.
    COLOR_SAMPLE_SIZE = 10_000

    # --- Output ---
    # File name of the JSON analysis written by the save step.
    OUTPUT_FILE = "step1_user_analysis.json"

# Confirm that the configuration cell finished executing.
print("Configuration loaded", "=" * 80, sep="\n")

STEP 1: IMAGE ANALYSIS - VLM INTELLIGENCE LAYER
Configuration loaded


# =================================================================
Step 2: IMAGE VALIDATION AND PREPROCESSING CLASS
# =================================================================


In [None]:
import io
import numpy as np
from PIL import Image, ImageOps
from typing import Dict, Any, List, Tuple
from sklearn.cluster import KMeans

class ImageValidator:
    """
    Validate uploaded image bytes and report problems in user-facing terms.

    ``validate_image`` never raises for a bad image: every outcome is returned
    as a plain dictionary so it can be forwarded to a frontend. The limits are
    copied from ``Config`` at construction time, and the error messages are
    built from those limits so they stay accurate when the limits change.
    """

    def __init__(self):
        self.max_size_mb = Config.MAX_IMAGE_SIZE_MB
        self.supported_formats = Config.SUPPORTED_FORMATS
        self.min_dimension = Config.MIN_DIMENSION
        self.max_dimension = Config.MAX_DIMENSION

    @staticmethod
    def _failure(error, error_code, size_mb, img_format=None, dimensions=None):
        """Build the result dictionary for a failed check."""
        return {
            'valid': False,
            'error': error,
            'error_code': error_code,
            'size_mb': size_mb,
            'format': img_format,
            'dimensions': dimensions
        }

    def validate_image(self, image_bytes: bytes, filename: str) -> Dict[str, Any]:
        """
        Run the size, format, dimension and integrity checks in that order.

        Args:
            image_bytes: Raw file content.
            filename: Name of the uploaded file. Accepted for interface
                compatibility; the checks inspect only the bytes.

        Returns:
            Dictionary with keys ``valid``, ``error``, ``error_code``,
            ``size_mb``, ``format`` and ``dimensions``. ``error`` and
            ``error_code`` are ``None`` when ``valid`` is True; ``format`` and
            ``dimensions`` are ``None`` when the check failed before they
            could be determined.
        """

        # Check 1: File size
        size_mb = len(image_bytes) / (1024 * 1024)

        if size_mb > self.max_size_mb:
            return self._failure(
                # ':g' renders 10.0 as "10", keeping the default message unchanged.
                f"Image is too large. Maximum allowed is {self.max_size_mb:g}MB. "
                "Please compress or resize the image.",
                'FILE_TOO_LARGE',
                size_mb
            )

        if size_mb == 0:
            return self._failure(
                "Image appears to be empty. Please select a valid image file.",
                'FILE_EMPTY',
                0
            )

        try:
            # Attempt to open image (reads the header only)
            img = Image.open(io.BytesIO(image_bytes))

            # Check 2: Format validation
            img_format = img.format.lower() if img.format else 'unknown'

            if img_format not in self.supported_formats:
                return self._failure(
                    "Unsupported format. Please upload JPG or PNG images only.",
                    'UNSUPPORTED_FORMAT',
                    size_mb,
                    img_format
                )

            # Check 3: Dimensions validation
            width, height = img.size
            dimensions = (width, height)

            if width < self.min_dimension or height < self.min_dimension:
                return self._failure(
                    f"Image is too small. Minimum size is "
                    f"{self.min_dimension}x{self.min_dimension} pixels.",
                    'IMAGE_TOO_SMALL',
                    size_mb,
                    img_format,
                    dimensions
                )

            if width > self.max_dimension or height > self.max_dimension:
                return self._failure(
                    f"Image is too large. Maximum size is "
                    f"{self.max_dimension}x{self.max_dimension} pixels. Please resize.",
                    'IMAGE_TOO_LARGE',
                    size_mb,
                    img_format,
                    dimensions
                )

            # Check 4: Image integrity
            img.verify()

            # The image object is re-created before loading pixel data, as the
            # original flow does after verify().
            img = Image.open(io.BytesIO(image_bytes))

            # Decoding the pixel data is a separate check from verify().
            try:
                img.load()
            except Exception:
                return self._failure(
                    "Image appears to be corrupted. Please try a different image.",
                    'IMAGE_CORRUPTED',
                    size_mb,
                    img_format,
                    dimensions
                )

            # All checks passed
            return {
                'valid': True,
                'error': None,
                'error_code': None,
                'size_mb': size_mb,
                'format': img_format,
                'dimensions': dimensions
            }

        except IOError:
            return self._failure(
                "Unable to read file. The file may be corrupted or not a valid image.",
                'INVALID_IMAGE_FILE',
                size_mb
            )

        except Exception as e:
            # Boundary handler: anything unexpected is reported, not raised,
            # so a single bad upload cannot abort the whole batch.
            return self._failure(
                "Error processing image: " + str(e),
                'PROCESSING_ERROR',
                size_mb
            )


In [None]:
class ImagePreprocessor:
    """
    Prepare uploaded images for CLIP and derive simple color statistics.

    The preprocessing pipeline applies the EXIF orientation, flattens any
    transparency onto a white background, converts to RGB, and letterboxes the
    result onto a white canvas of ``Config.TARGET_IMAGE_SIZE``.
    """

    def __init__(self):
        self.target_size = Config.TARGET_IMAGE_SIZE
        self.validator = ImageValidator()

    def preprocess_image(self, image_bytes: bytes) -> Image.Image:
        """
        Run the complete preprocessing pipeline on raw image bytes.

        Args:
            image_bytes: Raw file content of the image.

        Returns:
            RGB PIL image whose size equals ``self.target_size``.
        """

        # Load image
        img = Image.open(io.BytesIO(image_bytes))

        # Fix orientation from EXIF
        img = self._fix_orientation(img)

        # Convert to RGB, compositing transparent pixels onto white
        img = self._to_rgb(img)

        # Resize with padding
        return self._resize_with_padding(img, self.target_size)

    def _to_rgb(self, img: Image.Image) -> Image.Image:
        """Return ``img`` as RGB, flattening transparency onto white."""
        if img.mode == 'RGB':
            return img

        # 'LA' and palette images with a transparency entry carry alpha too;
        # a plain convert('RGB') would drop it instead of blending onto white.
        has_alpha = img.mode in ('RGBA', 'LA') or (
            img.mode == 'P' and 'transparency' in img.info
        )
        if not has_alpha:
            return img.convert('RGB')

        rgba = img.convert('RGBA')
        background = Image.new('RGB', rgba.size, (255, 255, 255))
        # Band 3 of an RGBA image is the alpha channel, used as the paste mask.
        background.paste(rgba, mask=rgba.split()[3])
        return background

    def _fix_orientation(self, img: Image.Image) -> Image.Image:
        """Auto-rotate image based on EXIF orientation tag (best effort)."""
        try:
            transposed = ImageOps.exif_transpose(img)
        except Exception:
            # Deliberate best-effort: on any failure while applying the EXIF
            # orientation, continue with the image as stored.
            return img
        # Guard against a None result so the caller always gets an image.
        return transposed if transposed is not None else img

    def _resize_with_padding(self, img: Image.Image, target_size: Tuple[int, int]) -> Image.Image:
        """
        Resize maintaining aspect ratio, then center on a white canvas.

        Args:
            img: Source image.
            target_size: ``(width, height)`` of the returned canvas.

        Returns:
            RGB canvas of exactly ``target_size`` with the scaled image centered.
        """

        # Calculate aspect ratios
        img_ratio = img.width / img.height
        target_ratio = target_size[0] / target_size[1]

        # Fit the limiting side to the target; derive the other from the ratio.
        # max(1, ...) keeps extreme aspect ratios from truncating a side to 0,
        # which would make resize() fail.
        if img_ratio > target_ratio:
            new_width = target_size[0]
            new_height = max(1, int(new_width / img_ratio))
        else:
            new_height = target_size[1]
            new_width = max(1, int(new_height * img_ratio))

        # Resize using LANCZOS filter
        img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)

        # Create white canvas
        canvas = Image.new('RGB', target_size, (255, 255, 255))

        # Center image
        offset_x = (target_size[0] - new_width) // 2
        offset_y = (target_size[1] - new_height) // 2
        canvas.paste(img, (offset_x, offset_y))

        return canvas

    def extract_color_statistics(self, img: Image.Image) -> Dict[str, Any]:
        """
        Extract color features for analysis.

        Args:
            img: PIL image; converted to RGB first if it is in another mode.

        Returns:
            Dictionary with ``dominant_colors`` (list of ``[r, g, b]`` lists,
            most frequent first), ``brightness`` (mean of all channel values)
            and ``color_variance`` (variance of all channel values).
        """

        # The dominant-color step reshapes to (-1, 3), which is only
        # meaningful for a three-channel image.
        if img.mode != 'RGB':
            img = img.convert('RGB')

        img_array = np.array(img)

        return {
            'dominant_colors': self._get_dominant_colors(img_array),
            'brightness': float(np.mean(img_array)),
            'color_variance': float(np.var(img_array))
        }

    def _get_dominant_colors(self, img_array: np.ndarray) -> List[List[int]]:
        """
        Use K-means clustering to find dominant colors.

        Args:
            img_array: Array whose values can be viewed as rows of 3 channels.

        Returns:
            Cluster centers truncated to ints, ordered by descending number of
            assigned pixels. Empty list when the array holds no pixels.
        """

        # Reshape to list of pixels
        pixels = img_array.reshape(-1, 3)
        if len(pixels) == 0:
            return []

        # Sample if too many pixels. A fixed seed makes the sample (and so the
        # reported colors) reproducible, matching the seeded KMeans below.
        if len(pixels) > Config.COLOR_SAMPLE_SIZE:
            rng = np.random.default_rng(42)
            indices = rng.choice(len(pixels), Config.COLOR_SAMPLE_SIZE, replace=False)
            pixels = pixels[indices]

        # K-means clustering; KMeans cannot fit more clusters than samples.
        n_clusters = min(Config.NUM_DOMINANT_COLORS, len(pixels))
        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        kmeans.fit(pixels)

        # Get cluster centers and counts
        colors = kmeans.cluster_centers_.astype(int)
        counts = np.bincount(kmeans.labels_)

        # Sort by frequency
        sorted_indices = np.argsort(-counts)
        dominant_colors = colors[sorted_indices].tolist()

        return dominant_colors


# Confirm that the validator and preprocessor classes are defined.
print("Validation and Preprocessing classes loaded", "=" * 80, sep="\n")

Validation and Preprocessing classes loaded


# =================================================================
Step 3: INITIALIZE PREPROCESSOR
# =================================================================


In [None]:
# Create the module-level preprocessor used by the analysis function below.
print("\nInitializing preprocessor...")
preprocessor = ImagePreprocessor()
print("Preprocessor ready", "=" * 80, sep="\n")



Initializing preprocessor...
Preprocessor ready


# =================================================================
Step 4: LOAD CLIP MODEL
# =================================================================


In [None]:
# Announce the model-loading stage.
print("\n" + "=" * 80, "LOADING CLIP MODEL", "=" * 80, sep="\n")

import torch
from transformers import CLIPModel, CLIPProcessor

# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Device: {device}")

# Model first, then its processor, both from the checkpoint named in Config.
model = CLIPModel.from_pretrained(Config.CLIP_MODEL_NAME)
processor = CLIPProcessor.from_pretrained(Config.CLIP_MODEL_NAME)

# Move the weights to the chosen device and switch to evaluation mode.
model.to(device)
model.eval()

print(f"Model loaded: {Config.CLIP_MODEL_NAME}")
print(f"Embedding dimension: {Config.EMBEDDING_DIMENSION}")
print("=" * 80)



LOADING CLIP MODEL
Device: cpu


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json: 0.00B [00:00, ?B/s]

pytorch_model.bin:   0%|          | 0.00/605M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/605M [00:00<?, ?B/s]

Loading weights:   0%|          | 0/398 [00:00<?, ?it/s]

CLIPModel LOAD REPORT from: openai/clip-vit-base-patch32
Key                                  | Status     |  | 
-------------------------------------+------------+--+-
text_model.embeddings.position_ids   | UNEXPECTED |  | 
vision_model.embeddings.position_ids | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


preprocessor_config.json:   0%|          | 0.00/316 [00:00<?, ?B/s]

The image processor of type `CLIPImageProcessor` is now loaded as a fast processor by default, even if the model checkpoint was saved with a slow processor. This is a breaking change and may produce slightly different outputs. To continue using the slow processor, instantiate this class with `use_fast=False`. 


tokenizer_config.json:   0%|          | 0.00/592 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/389 [00:00<?, ?B/s]

Model loaded: openai/clip-vit-base-patch32
Embedding dimension: 512


# =================================================================
Step 5: CLIP EMBEDDING EXTRACTION
# =================================================================


In [None]:
def extract_clip_features(outputs):
    """
    Return the feature tensor carried by a CLIP output, whatever its type.

    Lookup order:
    1. ``outputs`` itself when it is already a tensor.
    2. The first non-None attribute among ``pooler_output`` and ``image_embeds``.
    3. Index 0 along dim 1 of ``last_hidden_state``.

    Raises:
        ValueError: if none of the above is available.
    """
    if torch.is_tensor(outputs):
        return outputs

    # Missing attributes and attributes set to None are treated the same way.
    for attr_name in ('pooler_output', 'image_embeds'):
        candidate = getattr(outputs, attr_name, None)
        if candidate is not None:
            return candidate

    hidden_states = getattr(outputs, 'last_hidden_state', None)
    if hidden_states is None:
        raise ValueError("Cannot extract features from output type: " + str(type(outputs)))

    return hidden_states[:, 0, :]


def extract_clip_embedding(image: Image.Image) -> np.ndarray:
    """
    Extract an L2-normalized CLIP image embedding.

    Uses the module-level ``processor``, ``model`` and ``device``.

    Args:
        image: PIL image to embed.

    Returns:
        1-D numpy array holding the normalized embedding of the image
        (the first row of the batch output).

    Raises:
        RuntimeError: if any step fails. The original exception is attached
            as ``__cause__`` and its text is included in the message.
    """

    try:
        # Process with CLIP
        inputs = processor(images=image, return_tensors="pt", padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}

        # Inference only: no gradients are needed.
        with torch.no_grad():
            outputs = model.get_image_features(**inputs)
            # get_image_features may hand back a tensor or an output object;
            # extract_clip_features handles both.
            image_features = extract_clip_features(outputs)
            # L2 normalization
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)

        # Single image in, so take row 0 of the batch.
        return image_features.cpu().numpy()[0]

    except Exception as e:
        # RuntimeError is still caught by callers handling Exception; 'from e'
        # keeps the original traceback attached for debugging.
        raise RuntimeError("CLIP embedding extraction failed: " + str(e)) from e


# Confirm that the CLIP extraction helpers are defined.
print("CLIP extraction functions ready", "=" * 80, sep="\n")


CLIP extraction functions ready


# =================================================================
Step 6: MAIN ANALYSIS FUNCTION
# =================================================================


In [None]:
def analyze_user_images(files_dict: Dict[str, bytes]) -> Dict[str, Any]:
    """
    Complete Step 1: validate, preprocess, and extract embeddings.

    Each file is validated, preprocessed, color-analysed and embedded
    independently; a failure in one file is recorded and does not stop the
    others. Progress is printed to stdout.

    Args:
        files_dict: Dictionary mapping filenames to image bytes.

    Returns:
        On a batch-size violation: ``{'status': 'error', 'error', 'error_code'}``.
        When every image fails: the same keys plus ``validation_errors``.
        Otherwise ``status`` is ``'success'`` together with ``num_uploaded``,
        ``num_processed``, ``num_failed``, ``validation_errors``, ``results``
        (one entry per processed image), ``embeddings`` (list of numpy arrays,
        same order as ``results``) and ``summary``.
    """

    num_uploaded = len(files_dict)
    rule = "=" * 80

    print("\n" + rule)
    print(f"ANALYZING {num_uploaded} USER IMAGES")
    print(rule)

    # Check batch size
    if num_uploaded < Config.MIN_IMAGES:
        return {
            'status': 'error',
            'error': f"Please upload at least {Config.MIN_IMAGES} image(s).",
            'error_code': 'TOO_FEW_IMAGES'
        }

    if num_uploaded > Config.MAX_IMAGES:
        return {
            'status': 'error',
            'error': f"Maximum {Config.MAX_IMAGES} images allowed. You uploaded {num_uploaded}.",
            'error_code': 'TOO_MANY_IMAGES'
        }

    validation_errors = []
    results = []
    embeddings = []

    for filename, image_bytes in files_dict.items():
        print(f"\nProcessing: {filename}")

        # Step 1: Validate
        validation = preprocessor.validator.validate_image(image_bytes, filename)

        if not validation['valid']:
            validation_errors.append({
                'filename': filename,
                'error': validation['error'],
                'error_code': validation['error_code']
            })
            print(f"  Validation failed: {validation['error']}")
            continue

        width, height = validation['dimensions']
        print(f"  Validated ({validation['format'].upper()}, {width}x{height}, "
              f"{round(validation['size_mb'], 2)}MB)")

        try:
            # Step 2: Preprocess
            processed_img = preprocessor.preprocess_image(image_bytes)
            print(f"  Preprocessed to {processed_img.size}")

            # Step 3: Extract colors
            color_stats = preprocessor.extract_color_statistics(processed_img)
            print("  Color analysis complete")

            # Step 4: Extract CLIP embedding
            embedding = extract_clip_embedding(processed_img)
            print(f"  CLIP embedding extracted ({embedding.shape})")

            # Store results; results[i] and embeddings[i] describe the same image.
            results.append({
                'filename': filename,
                'original_dimensions': validation['dimensions'],
                'file_size_mb': validation['size_mb'],
                'format': validation['format'],
                'color_statistics': color_stats,
                'embedding_shape': embedding.shape
            })

            embeddings.append(embedding)

        except Exception as e:
            # Per-image boundary: record the failure and move on to the next file.
            validation_errors.append({
                'filename': filename,
                'error': f"Processing failed: {e}",
                'error_code': 'PROCESSING_FAILED'
            })
            print(f"  Processing error: {e}")

    # Check if we have any successful results
    if not embeddings:
        return {
            'status': 'error',
            'error': 'All images failed validation or processing. Please check the error messages and try again.',
            'error_code': 'ALL_IMAGES_FAILED',
            'validation_errors': validation_errors
        }

    # Calculate summary statistics
    avg_brightness = np.mean([r['color_statistics']['brightness'] for r in results])

    print("\n" + rule)
    print("STEP 1 COMPLETE")
    print(rule)
    print(f"Successfully processed: {len(embeddings)}/{num_uploaded} images")
    if validation_errors:
        print(f"Failed: {len(validation_errors)} images")
    print(f"Average brightness: {round(avg_brightness, 1)!s}")
    print(rule)

    return {
        # At least one image succeeded here (the empty case returned above),
        # so the former "else 'partial'" branch could never be taken.
        # Per-image failures are reported via num_failed / validation_errors.
        # NOTE(review): if a distinct 'partial' status is wanted for mixed
        # outcomes, save_step1_outputs (which requires 'success') must change too.
        'status': 'success',
        'num_uploaded': num_uploaded,
        'num_processed': len(embeddings),
        'num_failed': len(validation_errors),
        'validation_errors': validation_errors,
        'results': results,
        'embeddings': embeddings,
        'summary': {
            'avg_brightness': float(avg_brightness),
            'total_images': len(embeddings)
        }
    }


# Confirm that the main analysis function is defined.
print("Main analysis function ready", "=" * 80, sep="\n")

Main analysis function ready


# =================================================================
Step 7: SAVE EMBEDDINGS FOR NEXT STEPS
# =================================================================


In [None]:
def save_step1_outputs(analysis_output: Dict, output_dir: str = '/content') -> None:
    """
    Save Step 1 outputs for use in later steps.

    Writes two files into ``output_dir``:
    - ``user_embeddings.npy``: the individual embeddings stacked into one array
    - ``Config.OUTPUT_FILE``: JSON with status, num_processed, results,
      summary and validation_errors (the embeddings themselves are not
      included in the JSON)

    Args:
        analysis_output: Result dictionary from ``analyze_user_images``.
        output_dir: Directory to write into; created if missing. Defaults to
            ``/content``, the path this notebook originally hard-coded.

    Nothing is written unless ``analysis_output['status']`` is ``'success'``;
    a missing ``status`` key is treated the same as a failed analysis.
    """
    import json
    from pathlib import Path

    if analysis_output.get('status') != 'success':
        print("Cannot save - analysis did not succeed")
        return

    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    # Save embeddings as numpy array
    embeddings_array = np.array(analysis_output['embeddings'])
    np.save(str(target_dir / 'user_embeddings.npy'), embeddings_array)
    print("\nSaved embeddings: " + str(embeddings_array.shape))

    # Save full analysis (everything except the raw embeddings)
    analysis_json = {
        'status': analysis_output['status'],
        'num_processed': analysis_output['num_processed'],
        'results': analysis_output['results'],
        'summary': analysis_output['summary'],
        'validation_errors': analysis_output['validation_errors']
    }

    with open(target_dir / Config.OUTPUT_FILE, 'w', encoding='utf-8') as f:
        json.dump(analysis_json, f, indent=2)

    print("Saved analysis: " + Config.OUTPUT_FILE)
    print("\nReady for Step 2: Destination Matching")


# Final status banner for the notebook setup cells.
print(
    "Save functions ready",
    "=" * 80,
    "\nSTEP 1 INITIALIZED - Ready to analyze images",
    "=" * 80,
    sep="\n",
)

Save functions ready

STEP 1 INITIALIZED - Ready to analyze images
