# 🚀 FrameShift v3.0: Advanced Visual Difference Engine

**Improvements over v2.0:**
- ✅ PatchCore-style KNN comparison (alignment-tolerant)
- ✅ Adaptive statistical thresholding
- ✅ SAM 2 automatic segmentation (reduced over-segmentation)
- ✅ Optional LoFTR alignment (robust to viewpoint changes)
- ✅ Natural language descriptions (BLIP-2)
- ✅ Severity assessment

**Pipeline:**
1. Fast Filter → 2. Alignment → 3. DINOv2 + PatchCore KNN → 4. Adaptive Threshold → 5. SAM 2 → 6. Description → 7. Report

In [None]:
# ============================================================================
# CELL 1: Setup & Configuration
# ============================================================================

import cv2
import numpy as np
import json
import torch
import torch.nn.functional as F
from transformers import AutoImageProcessor, AutoModel
from PIL import Image
from scipy.ndimage import maximum_filter
from sklearn.neighbors import NearestNeighbors
import matplotlib.pyplot as plt
import os

# Central configuration for the notebook: later cells read their tunables from
# this dict, so change values here rather than inside the individual cells.
CONFIG = {
    # --- Inputs -------------------------------------------------------------
    'golden_image': '/content/Golden_Image.jpg',    # reference frame
    'current_image': '/content/Current_Image.jpg',  # frame compared against it
    'benchmark_image': '',  # not read by any cell visible in this notebook

    # --- Outputs ------------------------------------------------------------
    'output_overlay': '/content/frameshift_v3_overlay.png',
    'output_report': '/content/frameshift_v3_report.json',
    'output_heatmap': '/content/frameshift_v3_heatmap.png',

    # --- Pipeline switches and tunables -------------------------------------
    'use_loftr': False,  # not read by the visible cells (alignment uses SIFT)
    'use_fast_filter': True,
    # Mean squared grayscale difference (pixel values scaled to 0..1) above
    # which the fast filter reports a change.
    'fast_filter_threshold': 0.02,
    # 'low' / 'medium' / 'high' -> 98th / 95th / 90th percentile cut-off.
    'sensitivity': 'medium',
    'dinov2_model': 'facebook/dinov2-base',
    'knn_neighbors': 9,  # k passed to patchcore_difference
    'sam2_model': 'facebook/sam2-hiera-large',
    'min_mask_area': 100,  # masks with at most this many pixels are dropped
    'mask_iou_threshold': 0.88,  # not read by the visible cells
    'use_description': False,  # not read by the visible cells
    'blip_model': 'Salesforce/blip-image-captioning-large',  # not read by the visible cells
    # Named constant instead of the bare integer 2 (same value).
    'heatmap_colormap': cv2.COLORMAP_JET,
    # Painted onto an OpenCV (BGR) image, so [0, 0, 255] is red.
    'mask_color': [0, 0, 255],
    'show_intermediate': True  # not read by the visible cells
}

print('‚úÖ FrameShift v3.0 Configuration Loaded')
print(f'   Golden: {CONFIG["golden_image"]}')
print(f'   Current: {CONFIG["current_image"]}')
print(f'   Sensitivity: {CONFIG["sensitivity"]}')

In [None]:
# ============================================================================
# CELL 2: Stage 1 - Fast Change Filter
# ============================================================================

def _to_gray(image):
    """Return *image* as single-channel: 2-D input is passed through, otherwise BGR -> gray."""
    if image.ndim == 2:
        return image
    return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)


def fast_change_filter(img1, img2, threshold=0.02):
    """Cheap pre-check: do two frames differ enough to justify the full pipeline?

    Both images are downscaled so their longest side is at most 512 px,
    converted to grayscale, and compared by the mean squared absolute
    difference of pixel values scaled to 0..1.

    Args:
        img1: Reference image as a NumPy array (BGR or single-channel).
        img2: Image to compare; resized to ``img1``'s size if the two differ.
        threshold: Energy above which the frames count as changed.

    Returns:
        True if the difference energy is strictly greater than ``threshold``.
    """
    h, w = img1.shape[:2]

    # This filter runs before alignment, so the two frames may not share a
    # size yet; cv2.absdiff requires identical shapes.
    if img2.shape[:2] != (h, w):
        img2 = cv2.resize(img2, (w, h))

    if max(h, w) > 512:
        scale = 512 / max(h, w)
        img1_small = cv2.resize(img1, None, fx=scale, fy=scale)
        img2_small = cv2.resize(img2, None, fx=scale, fy=scale)
    else:
        img1_small, img2_small = img1, img2

    gray1 = _to_gray(img1_small)
    gray2 = _to_gray(img2_small)
    diff = cv2.absdiff(gray1, gray2).astype(float) / 255.0
    energy = np.mean(diff ** 2)
    return bool(energy > threshold)

# Read the reference and current frames as 3-channel BGR arrays.
ref, curr = (
    cv2.imread(CONFIG[path_key], cv2.IMREAD_COLOR)
    for path_key in ('golden_image', 'current_image')
)

# cv2.imread signals an unreadable path by returning None instead of raising.
if any(frame is None for frame in (ref, curr)):
    raise FileNotFoundError('‚ùå Could not load images')

print(f'‚úÖ Images loaded: {ref.shape}')

# The fast filter is informational only: later cells run whatever it reports.
if CONFIG['use_fast_filter']:
    has_change = fast_change_filter(ref, curr, CONFIG['fast_filter_threshold'])
    print(
        '‚ö†Ô∏è Fast filter: CHANGE DETECTED - proceeding...'
        if has_change
        else '‚úÖ Fast filter: NO CHANGE DETECTED'
    )

In [None]:
# ============================================================================
# CELL 3: Stage 2 - SIFT Alignment
# ============================================================================

def align_with_sift(ref, curr):
    """Warp ``curr`` into the pixel grid of ``ref`` using SIFT matches and a homography.

    Args:
        ref: Reference image (BGR).
        curr: Image to align (BGR).

    Returns:
        A tuple ``(aligned, num_matches)``. ``aligned`` has the width and height
        of ``ref`` when alignment succeeds. If fewer than 10 matches are found
        or no homography can be estimated, ``curr`` is returned unchanged with
        ``num_matches == 0``. ``num_matches`` counts all cross-checked matches,
        not only the RANSAC inliers.
    """
    gray_ref = cv2.cvtColor(ref, cv2.COLOR_BGR2GRAY)
    gray_curr = cv2.cvtColor(curr, cv2.COLOR_BGR2GRAY)

    sift = cv2.SIFT_create(nfeatures=5000)
    kp1, des1 = sift.detectAndCompute(gray_ref, None)
    kp2, des2 = sift.detectAndCompute(gray_curr, None)

    # detectAndCompute returns None descriptors when an image yields no
    # keypoints (e.g. a blank frame); BFMatcher.match would raise on that, so
    # treat it as "zero matches" and take the normal fallback path below.
    if des1 is None or des2 is None:
        matches = []
    else:
        bf = cv2.BFMatcher(cv2.NORM_L2, crossCheck=True)
        matches = sorted(bf.match(des1, des2), key=lambda x: x.distance)

    if len(matches) < 10:
        print(f'‚ö†Ô∏è Only {len(matches)} SIFT matches')
        return curr, 0

    # queryIdx indexes the reference keypoints, trainIdx the current ones.
    src_pts = np.float32([kp1[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2)
    dst_pts = np.float32([kp2[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2)
    # Homography maps current-image points onto reference-image points.
    H, mask = cv2.findHomography(dst_pts, src_pts, cv2.RANSAC, 5.0)

    if H is None:
        return curr, 0

    h, w = ref.shape[:2]
    aligned = cv2.warpPerspective(curr, H, (w, h))
    inlier_ratio = np.sum(mask) / len(mask) if mask is not None else 0

    print(f'‚úÖ SIFT: {len(matches)} matches, {inlier_ratio:.1%} inliers')
    return aligned, len(matches)

# Align the current frame to the reference. `aligned` is the image every later
# stage analyses; `num_matches` ends up in the report. When alignment fails,
# align_with_sift hands back `curr` unchanged together with 0 matches.
aligned, num_matches = align_with_sift(ref, curr)

In [None]:
# ============================================================================
# CELL 4: Stage 3 - DINOv2 + PatchCore KNN (KEY INNOVATION)
# ============================================================================

# Loaded (processor, model, device) triples keyed by model name, so the weights
# are read once even though features are extracted for several images.
_DINOV2_CACHE = {}


def _load_dinov2(model_name):
    """Return ``(processor, model, device)`` for ``model_name``, loading it on first use."""
    if model_name not in _DINOV2_CACHE:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        processor = AutoImageProcessor.from_pretrained(model_name)
        model = AutoModel.from_pretrained(model_name).to(device).eval()
        _DINOV2_CACHE[model_name] = (processor, model, device)
    return _DINOV2_CACHE[model_name]


def extract_dinov2_features(image, model_name):
    """Run an image through a Hugging Face vision model and return per-patch features.

    Args:
        image: BGR image as a NumPy array (OpenCV convention).
        model_name: Model identifier passed to ``from_pretrained``.

    Returns:
        A tensor holding ``last_hidden_state`` with the first token removed and
        the leading batch axis squeezed out; it lives on the device the model
        ran on ('cuda' if available, otherwise 'cpu').
    """
    processor, model, device = _load_dinov2(model_name)

    img_pil = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    # NOTE(review): the processor applies its own resize/crop, so the patch
    # grid may not cover the full input image - confirm against its config.
    inputs = processor(img_pil, return_tensors='pt').to(device)

    with torch.no_grad():
        outputs = model(**inputs)
        features = outputs.last_hidden_state

    # Drop token 0 (presumably the CLS token - TODO confirm for model variants
    # that add extra non-patch tokens) and the batch axis.
    patch_features = features[:, 1:, :].squeeze(0)
    return patch_features

def patchcore_difference(feats_ref, feats_curr, k=9):
    """Score each current-image patch by its distance to the reference patch bank.

    For every row of ``feats_curr`` the ``k`` nearest rows of ``feats_ref``
    (cosine distance) are looked up anywhere in the reference, not only at the
    same grid position, and their distances are averaged.

    Args:
        feats_ref: Tensor of reference patch features, one row per patch.
        feats_curr: Tensor of current patch features, one row per patch.
        k: Number of neighbours to average; capped at the number of reference
            patches.

    Returns:
        ``(heatmap, anomaly_scores)`` where ``anomaly_scores`` is a 1-D array
        with one score per current patch and ``heatmap`` is the same data
        reshaped to a square grid.

    Raises:
        ValueError: If the number of current patches is not a perfect square.
    """
    patches_ref = feats_ref.cpu().numpy()
    patches_curr = feats_curr.cpu().numpy()

    # kneighbors() raises when asked for more neighbours than fitted samples.
    k = min(int(k), len(patches_ref))

    knn = NearestNeighbors(n_neighbors=k, metric='cosine')
    knn.fit(patches_ref)
    distances, _ = knn.kneighbors(patches_curr)
    anomaly_scores = np.mean(distances, axis=1)

    num_patches = len(anomaly_scores)
    grid_size = int(np.sqrt(num_patches))
    if grid_size * grid_size != num_patches:
        # Fail with an explanation rather than an opaque reshape error.
        raise ValueError(
            f'Expected a square patch grid, got {num_patches} patches'
        )
    heatmap = anomaly_scores.reshape(grid_size, grid_size)

    return heatmap, anomaly_scores

print('üîÑ Extracting DINOv2 features...')
feats_ref = extract_dinov2_features(ref, CONFIG['dinov2_model'])
feats_curr = extract_dinov2_features(aligned, CONFIG['dinov2_model'])
print(f'‚úÖ Features: {feats_ref.shape}')

print('üîÑ Computing PatchCore KNN difference...')
heatmap_small, anomaly_scores = patchcore_difference(
    feats_ref, feats_curr, k=CONFIG['knn_neighbors']
)

print(f'‚úÖ Heatmap: {heatmap_small.shape}')
print(f'   Score range: [{anomaly_scores.min():.4f}, {anomaly_scores.max():.4f}]')
print(f'   Mean: {anomaly_scores.mean():.4f}')

heatmap_fullsize = cv2.resize(
    heatmap_small, (aligned.shape[1], aligned.shape[0]), 
    interpolation=cv2.INTER_LINEAR
)

heatmap_norm = cv2.normalize(heatmap_fullsize, None, 0, 255, 
                             cv2.NORM_MINMAX, dtype=cv2.CV_8U)
heatmap_colored = cv2.applyColorMap(heatmap_norm, CONFIG['heatmap_colormap'])
cv2.imwrite(CONFIG['output_heatmap'], heatmap_colored)
print(f'‚úÖ Heatmap saved')

In [None]:
# ============================================================================
# CELL 5: Stage 4 - Adaptive Statistical Thresholding
# ============================================================================

def adaptive_threshold(anomaly_scores, sensitivity='medium'):
    """Derive a score cut-off from the distribution of the scores themselves.

    Args:
        anomaly_scores: Array-like of per-patch anomaly scores.
        sensitivity: 'low', 'medium' or 'high'; any other value is treated
            like 'medium'.

    Returns:
        ``(threshold, percentile)`` - the score at the chosen percentile and
        the percentile (98, 95 or 90) that was used.
    """
    percentile_by_level = {'low': 98, 'medium': 95, 'high': 90}
    try:
        pct = percentile_by_level[sensitivity]
    except KeyError:
        # Unknown sensitivity names fall back to the 'medium' percentile.
        pct = 95
    cutoff = np.percentile(anomaly_scores, pct)
    return cutoff, pct

threshold, percentile = adaptive_threshold(
    anomaly_scores, CONFIG['sensitivity']
)

print(f'üìä Adaptive Threshold:')
print(f'   Sensitivity: {CONFIG["sensitivity"]}')
print(f'   Percentile: {percentile}%')
print(f'   Threshold: {threshold:.4f}')
print(f'   Patches flagged: {np.sum(anomaly_scores > threshold)}/{len(anomaly_scores)}')

# Smooth the 0..255 heatmap so isolated patches do not produce speckle.
heatmap_blurred = cv2.GaussianBlur(heatmap_norm, (21, 21), 0)

# heatmap_norm is a NORM_MINMAX stretch of heatmap_fullsize, i.e.
# (value - min) / (max - min) * 255. The score-space threshold has to go
# through the same mapping (including the min offset) before it can be
# compared with the uint8 heatmap.
hm_min = float(heatmap_fullsize.min())
hm_range = float(heatmap_fullsize.max()) - hm_min
if hm_range > 0:
    threshold_u8 = (threshold - hm_min) / hm_range * 255
else:
    # Flat heatmap: nothing stands out, so make the strict '>' test fail everywhere.
    threshold_u8 = 255.0
binary_blurred = (heatmap_blurred > threshold_u8).astype(np.uint8) * 255

# Close small holes, then remove small isolated blobs.
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
binary_clean = cv2.morphologyEx(binary_blurred, cv2.MORPH_CLOSE, kernel)
binary_clean = cv2.morphologyEx(binary_clean, cv2.MORPH_OPEN, kernel)

print('‚úÖ Binary mask created')

In [None]:
# ============================================================================
# CELL 6: Stage 5 - SAM 2 Segmentation (Improved)
# ============================================================================

from transformers import pipeline

print('üîÑ Running SAM 2 segmentation...')

# Use point prompts method (automatic mode requires additional dependencies)
# NOTE(review): confirm that the 'mask-generation' pipeline honours the
# `points` / `labels` call arguments used below and that this checkpoint is
# loadable through it.
mask_generator = pipeline(
    'mask-generation',
    model=CONFIG['sam2_model'],
    device_map='cuda' if torch.cuda.is_available() else 'cpu'
)

# heatmap_blurred is on the 0..255 scale of the min-max normalised heatmap, so
# the score-space threshold must be mapped onto that scale before comparing.
peak_hm_min = float(heatmap_fullsize.min())
peak_hm_range = float(heatmap_fullsize.max()) - peak_hm_min
if peak_hm_range > 0:
    peak_threshold_u8 = (threshold - peak_hm_min) / peak_hm_range * 255
else:
    # Flat heatmap: no pixel can pass the strict '>' test below.
    peak_threshold_u8 = 255.0

# Find peaks in blurred heatmap: pixels equal to the maximum of their 20x20
# neighbourhood that also exceed the threshold.
footprint = np.ones((20, 20))
local_max = maximum_filter(heatmap_blurred, footprint=footprint)
peaks_mask = (heatmap_blurred == local_max) & (heatmap_blurred > peak_threshold_u8)

y_coords, x_coords = np.where(peaks_mask)
# Prompts are (x, y) pairs, all labelled 1.
points = [[int(x), int(y)] for x, y in zip(x_coords, y_coords)]
labels = [1] * len(points)

print(f'   Found {len(points)} peak prompts')

masks = []
if points:
    image_pil = Image.fromarray(cv2.cvtColor(aligned, cv2.COLOR_BGR2RGB))
    results = mask_generator(image_pil, points=[points], labels=[labels])

    # Keep only masks larger than the configured minimum area (no overlap
    # filtering is performed here).
    for m in results['masks']:
        m_np = np.array(m).astype(np.uint8)
        m_resized = cv2.resize(m_np, (aligned.shape[1], aligned.shape[0]),
                               interpolation=cv2.INTER_NEAREST)

        area = np.sum(m_resized > 0)
        if area <= CONFIG['min_mask_area']:
            continue

        coords = np.argwhere(m_resized > 0)
        if len(coords) == 0:
            continue

        # Bounding box as [x, y, w, h]; w and h are max - min of the mask's
        # pixel coordinates.
        y_min, x_min = coords.min(axis=0)
        y_max, x_max = coords.max(axis=0)
        bbox = [int(x_min), int(y_min),
                int(x_max - x_min), int(y_max - y_min)]

        masks.append({
            'segmentation': m_resized > 0,
            'bbox': bbox,
            'area': int(area)
        })

print(f'‚úÖ Found {len(masks)} valid masks')

In [None]:
# ============================================================================
# CELL 7: Stage 6 - Visualization
# ============================================================================

# Paint the detected regions onto a copy of the aligned frame.
overlay = aligned.copy()

# Only the first 10 masks are drawn, in the order they were collected.
for i, mask_data in enumerate(masks[:10]):
    mask = mask_data['segmentation']
    overlay[mask] = CONFIG['mask_color']

    bbox = mask_data['bbox']
    x, y, w, h = bbox
    # First box in red, the rest in green (BGR).
    color = (0, 0, 255) if i == 0 else (0, 255, 0)
    cv2.rectangle(overlay, (x, y), (x+w, y+h), color, 2)

    # Put the label above the box when there is room; otherwise just inside
    # its top edge, so labels of boxes touching the top of the image are not
    # drawn off-canvas.
    label = f'#{i+1}'
    (_, text_h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
    label_y = y - 10 if y - 10 >= text_h else y + text_h + 10
    cv2.putText(overlay, label, (x, label_y),
               cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

cv2.imwrite(CONFIG['output_overlay'], overlay)
print(f'‚úÖ Overlay saved: {CONFIG["output_overlay"]}')

# Six-panel summary figure (2 rows x 3 columns), filled in row-major order.
fig, axes = plt.subplots(2, 3, figsize=(18, 12))

# Heatmap blended over the aligned frame (60% image, 40% heatmap).
heatmap_overlay = cv2.addWeighted(aligned, 0.6, heatmap_colored, 0.4, 0)

title_style = {'fontsize': 12, 'fontweight': 'bold'}

# (axis, image, title, colormap, add a colorbar?)
panels = [
    (axes[0, 0], cv2.cvtColor(ref, cv2.COLOR_BGR2RGB),
     'Reference', None, False),
    (axes[0, 1], cv2.cvtColor(aligned, cv2.COLOR_BGR2RGB),
     'Current (Aligned)', None, False),
    (axes[0, 2], heatmap_fullsize,
     f'Anomaly Heatmap (T={threshold:.3f})', 'hot', True),
    (axes[1, 0], binary_clean,
     f'Binary Mask ({CONFIG["sensitivity"]})', 'gray', False),
    (axes[1, 1], cv2.cvtColor(heatmap_overlay, cv2.COLOR_BGR2RGB),
     'Heatmap Overlay', None, False),
    (axes[1, 2], cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB),
     f'Final ({len(masks)} changes)', None, False),
]

for panel_ax, panel_img, panel_title, panel_cmap, wants_colorbar in panels:
    drawn = panel_ax.imshow(panel_img, cmap=panel_cmap)
    panel_ax.set_title(panel_title, **title_style)
    panel_ax.axis('off')
    if wants_colorbar:
        # Only the raw anomaly heatmap gets a colour scale.
        im = drawn
        plt.colorbar(im, ax=panel_ax)

plt.tight_layout()
plt.savefig('/content/frameshift_v3_viz.png', dpi=150, bbox_inches='tight')
plt.show()

print('‚úÖ Visualization complete')

In [None]:
# ============================================================================
# CELL 8: Stage 7 - Report Generation
# ============================================================================

def assess_severity(area, position, total_area):
    """Rate a change from 1 (smallest) to 5 (largest) by its share of the image.

    Args:
        area: Size of the changed region in pixels.
        position: Accepted for interface compatibility; not used in the rating.
        total_area: Size of the whole image in pixels.

    Returns:
        5 if the region covers more than 10% of the image, 4 for more than 5%,
        3 for more than 2%, 2 for more than 1%, otherwise 1.
    """
    fraction = area / total_area

    # (exclusive lower bound on the area fraction, severity), most severe first.
    bands = ((0.1, 5), (0.05, 4), (0.02, 3), (0.01, 2))
    for lower_bound, level in bands:
        if fraction > lower_bound:
            return level
    return 1

# Region sizes are reported relative to the reference frame's pixel count.
total_area = ref.shape[0] * ref.shape[1]

report = {
    'version': '3.0',
    'num_changes': len(masks),
    'alignment_matches': int(num_matches),
    'threshold': float(threshold),
    'sensitivity': CONFIG['sensitivity'],
    'changes': []
}

# Severity label for each score 1..5; index 0 is an unused placeholder.
severity_labels = ['', 'Minor', 'Low', 'Moderate', 'High', 'Critical']

# One entry per mask, in detection order; ids are 1-based, matching the
# '#n' labels drawn on the overlay.
for i, mask_data in enumerate(masks):
    bbox = mask_data['bbox']
    area = mask_data['area']

    severity = assess_severity(area, bbox[:2], total_area)

    report['changes'].append({
        'id': i + 1,
        'bbox': {'x': bbox[0], 'y': bbox[1], 'w': bbox[2], 'h': bbox[3]},
        'area_pixels': area,
        'area_percent': round(area / total_area * 100, 2),
        'severity': severity,
        'severity_label': severity_labels[severity]
    })

with open(CONFIG['output_report'], 'w', encoding='utf-8') as f:
    json.dump(report, f, indent=2)

print(f'‚úÖ Report saved: {CONFIG["output_report"]}')
print(f'\nüìä SUMMARY:')
print(f'   Changes detected: {len(masks)}')
print(f'   Alignment quality: {num_matches} matches')
print(f'   Threshold: {threshold:.4f}')

if masks:
    print(f'\n   Top 3 changes:')
    # Rank by area so this really is the three largest changes, not just the
    # first three detections; the printed number is the change id.
    largest_first = sorted(
        report['changes'], key=lambda c: c['area_pixels'], reverse=True
    )
    for change in largest_first[:3]:
        print(f'   {change["id"]}. Area: {change["area_percent"]}% | '
              f'Severity: {change["severity_label"]}')

print('\n‚úÖ FrameShift v3.0 Complete!')