# Threshold Optimization on Real VLM Data

**Goal:** Find optimal IoU thresholds using real Qwen3-VL predictions under perturbation.

**Approach:** Information-theoretic optimization
$$t^* = \arg\max_t I(C_t(f(x)); S(x))$$

Where:
- $C_t$ = behavioral classes using thresholds $t$
- $S(x)$ = signal (perturbation magnitude OR embedding distance)

**Data:** RefCOCO small subset (51 samples × 25 perturbations = 1,275 data points)

**Expected runtime:** ~60 minutes (Qwen3-VL 8b)

In [1]:
# Fix imports after reorganization
import sys
sys.path.insert(0, 'scripts')

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import optimize
from sklearn.metrics import mutual_info_score
from PIL import Image, ImageEnhance, ImageFilter
import ollama_proxy as ollama
import io
import base64
import re
from tqdm.auto import tqdm
import time

from refcoco_loader import load_refcoco, get_sample_info

sns.set_style('whitegrid')
np.random.seed(42)

## 1. Load RefCOCO Small Subset

Load the 51 samples saved from notebook 01.

In [2]:
# Load small subset indices
small_indices = np.load('small_subset_indices.npy')
print(f"Loading {len(small_indices)} samples from RefCOCO...")

# Load full validation set
dataset = load_refcoco('val')

# Extract subset
samples = [dataset[int(idx)] for idx in small_indices]

print(f"\nLoaded {len(samples)} samples for threshold optimization")
print(f"First sample:")
info = get_sample_info(samples[0])
print(f"  Image size: {info['image_size']}")
print(f"  Expressions: {info['expressions'][:2]}")
print(f"  Bbox: {info['bbox_normalized']}")

Loading 51 samples from RefCOCO...

Loaded 51 samples for threshold optimization
First sample:
  Image size: (299, 409)
  Expressions: ['guy in plaid', 'mr plaid']
  Bbox: [0.00307692 0.02246944 0.38117058 0.43819071]


## 2. VLM Setup

Configure Qwen3-VL via Ollama for bbox prediction and embedding extraction.

In [3]:
class VLMPredictor:
    """
    Wrapper for Qwen3-VL bbox prediction.
    
    See QWEN3VL_GROUNDING.md for format details.
    """
    def __init__(self, model_name="qwen3-vl:8b", embed_model="qwen3-embedding:latest"):
        self.model_name = model_name
        self.embed_model = embed_model
        self.parse_failures = 0
        self.total_calls = 0
        print(f"Initialized VLM: {model_name}")
        print(f"Embedding model: {embed_model}")
    
    def image_to_base64(self, image: Image.Image) -> str:
        """Convert PIL image to base64."""
        # Ensure RGB mode (Qwen3-VL requires RGB)
        if image.mode != 'RGB':
            image = image.convert('RGB')
        
        buffered = io.BytesIO()
        image.save(buffered, format="PNG")
        return base64.b64encode(buffered.getvalue()).decode()
    
    def predict_bbox(self, image: Image.Image, expression: str) -> np.ndarray:
        """
        Predict bbox for referring expression.
        
        Returns: [x1, y1, x2, y2] normalized [0, 1]
        """
        self.total_calls += 1
        
        # Qwen3-VL uses [0, 1000] coordinate system
        prompt = f'Where is "{expression}" in this image? Output the bounding box in format: {{"bbox_2d": [x_min, y_min, x_max, y_max]}} using coordinates 0-1000.'
        
        img_b64 = self.image_to_base64(image)
        
        try:
            response = ollama.chat(
                model=self.model_name,
                messages=[{
                    'role': 'user',
                    'content': prompt,
                    'images': [img_b64]
                }]
            )
            
            content = response['message']['content']
            bbox = self.parse_bbox(content)
            
            return bbox
            
        except Exception as e:
            print(f"Error predicting bbox: {e}")
            self.parse_failures += 1
            return np.array([0.0, 0.0, 0.0, 0.0])  # Zero-IoU default
    
    def parse_bbox(self, text: str) -> np.ndarray:
        """
        Extract bbox from model response and convert to [0,1].
        
        Qwen3-VL returns coordinates in [0, 1000] range.
        """
        # Try to extract JSON format first
        json_match = re.search(r'\{"bbox_2d"\s*:\s*\[([^\]]+)\]\}', text)
        if json_match:
            coords_str = json_match.group(1)
            numbers = re.findall(r'[-+]?[0-9]*\.?[0-9]+', coords_str)
        else:
            # Fallback: extract any numbers
            numbers = re.findall(r'[-+]?[0-9]*\.?[0-9]+', text)
        
        if len(numbers) >= 4:
            # Parse from [0, 1000] range
            bbox_1000 = np.array([float(numbers[i]) for i in range(4)])
            
            # Convert to [0, 1]
            bbox = bbox_1000 / 1000.0
            bbox = np.clip(bbox, 0, 1)
            
            # Validate
            if bbox[0] >= bbox[2] or bbox[1] >= bbox[3]:
                self.parse_failures += 1
                if self.parse_failures <= 3:  # Only print first 3
                    print(f"Warning: Invalid bbox {bbox} from: '{text[:100]}'")
                return np.array([0.0, 0.0, 0.0, 0.0])  # Zero-area box = IoU 0
            
            return bbox
        else:
            self.parse_failures += 1
            if self.parse_failures <= 3:  # Only print first 3
                print(f"Warning: Could not parse bbox. Response length: {len(text)} chars")
                print(f"  First 200 chars: '{text[:200]}'")
            return np.array([0.0, 0.0, 0.0, 0.0])  # Zero-area box = IoU 0
    
    def get_embedding(self, text: str) -> np.ndarray:
        """Get text embedding."""
        try:
            response = ollama.embeddings(model=self.embed_model, prompt=text)
            return np.array(response['embedding'])
        except Exception as e:
            print(f"Error getting embedding: {e}")
            return np.zeros(4096)  # Fallback
    
    def print_stats(self):
        """Print parsing statistics."""
        success_rate = 100 * (1 - self.parse_failures / max(1, self.total_calls))
        print(f"\nVLM Statistics:")
        print(f"  Total calls: {self.total_calls}")
        print(f"  Parse failures: {self.parse_failures}")
        print(f"  Success rate: {success_rate:.1f}%")

# Initialize
vlm = VLMPredictor()

Initialized VLM: qwen3-vl:8b
Embedding model: qwen3-embedding:latest


## 3. Perturbation Functions

In [4]:
def apply_perturbation(image: Image.Image, brightness=0, contrast=1.0, blur=0, noise=0) -> Image.Image:
    """
    Apply perturbations to image.
    
    Args:
        brightness: -0.3 to 0.3 (additive)
        contrast: 0.7 to 1.3 (multiplicative)
        blur: 0 to 2 (sigma)
        noise: 0 to 0.1 (sigma)
    """
    img = image.copy()
    
    # Brightness
    if brightness != 0:
        enhancer = ImageEnhance.Brightness(img)
        img = enhancer.enhance(1.0 + brightness)
    
    # Contrast
    if contrast != 1.0:
        enhancer = ImageEnhance.Contrast(img)
        img = enhancer.enhance(contrast)
    
    # Blur
    if blur > 0:
        img = img.filter(ImageFilter.GaussianBlur(radius=blur*2))
    
    # Noise
    if noise > 0:
        img_array = np.array(img).astype(np.float32)
        noise_array = np.random.normal(0, noise*255, img_array.shape)
        img_array = np.clip(img_array + noise_array, 0, 255).astype(np.uint8)
        img = Image.fromarray(img_array)
    
    return img

def perturbation_magnitude(brightness=0, contrast=1.0, blur=0, noise=0) -> float:
    """Compute L2 norm of perturbation parameters."""
    return np.sqrt(brightness**2 + (contrast-1)**2 + blur**2 + noise**2)

# Define perturbation grid (5×5 = 25 perturbations)
perturbation_grid = [
    {'brightness': b, 'contrast': c, 'blur': 0, 'noise': 0}
    for b in [-0.2, -0.1, 0, 0.1, 0.2]
    for c in [0.8, 0.9, 1.0, 1.1, 1.2]
]

print(f"Perturbation grid: {len(perturbation_grid)} configurations")
print(f"Example: {perturbation_grid[0]}")
print(f"Magnitude range: [{min(perturbation_magnitude(**p) for p in perturbation_grid):.3f}, {max(perturbation_magnitude(**p) for p in perturbation_grid):.3f}]")

Perturbation grid: 25 configurations
Example: {'brightness': -0.2, 'contrast': 0.8, 'blur': 0, 'noise': 0}
Magnitude range: [0.000, 0.283]


## 4. IoU Computation

In [5]:
def compute_iou(bbox1: np.ndarray, bbox2: np.ndarray) -> float:
    """
    Compute IoU between two normalized bboxes [x1, y1, x2, y2].
    """
    x1_inter = max(bbox1[0], bbox2[0])
    y1_inter = max(bbox1[1], bbox2[1])
    x2_inter = min(bbox1[2], bbox2[2])
    y2_inter = min(bbox1[3], bbox2[3])
    
    intersection = max(0, x2_inter - x1_inter) * max(0, y2_inter - y1_inter)
    
    area1 = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
    area2 = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
    union = area1 + area2 - intersection
    
    return intersection / union if union > 0 else 0

## 5. Run Perturbation Experiment

**WARNING:** This cell will make ~1,275 API calls to Ollama (51 samples × 25 perturbations).

Expected runtime: ~60 minutes with Qwen3-VL 8b (~3 sec/inference)

In [None]:
# Storage for results
results = {
    'iou_original': [],
    'iou_perturbed': [],  # (n_samples, n_perturbations)
    'perturbation_magnitude': [],
    'embedding_distance': [],
    'sample_indices': small_indices.tolist()
}

print(f"Starting perturbation experiment...")
print(f"Samples: {len(samples)}")
print(f"Perturbations per sample: {len(perturbation_grid)}")
total_inferences = len(samples) * (1 + len(perturbation_grid))
print(f"Total inferences: {total_inferences}")
print()

start_time = time.time()

# Single progress bar for all inferences
pbar = tqdm(total=total_inferences, desc="Processing", unit="inference")

for sample_idx, sample in enumerate(samples):
    info = get_sample_info(sample)
    image = info['image']
    expression = info['expressions'][0]  # Use first expression
    bbox_gt = info['bbox_normalized']
    
    # Original prediction
    bbox_pred_orig = vlm.predict_bbox(image, expression)
    iou_orig = compute_iou(bbox_pred_orig, bbox_gt)
    embed_orig = vlm.get_embedding(expression)
    
    results['iou_original'].append(iou_orig)
    
    pbar.update(1)
    pbar.set_postfix({
        'Sample': f'{sample_idx+1}/{len(samples)}',
        'IoU_orig': f'{iou_orig:.3f}'
    })
    
    # Perturbed predictions
    ious_pert = []
    mags = []
    embed_dists = []
    
    for pert_idx, pert_config in enumerate(perturbation_grid):
        # Apply perturbation
        img_pert = apply_perturbation(image, **pert_config)
        
        # Predict
        bbox_pred_pert = vlm.predict_bbox(img_pert, expression)
        iou_pert = compute_iou(bbox_pred_pert, bbox_gt)
        
        # Get embedding
        embed_pert = vlm.get_embedding(expression)
        embed_dist = np.linalg.norm(embed_orig - embed_pert)
        
        ious_pert.append(iou_pert)
        mags.append(perturbation_magnitude(**pert_config))
        embed_dists.append(embed_dist)
        
        pbar.update(1)
        
        # Update postfix every 5 perturbations to reduce overhead
        if (pert_idx + 1) % 5 == 0 or (pert_idx + 1) == len(perturbation_grid):
            elapsed = time.time() - start_time
            pbar.set_postfix({
                'Sample': f'{sample_idx+1}/{len(samples)}',
                'Pert': f'{pert_idx+1}/{len(perturbation_grid)}',
                'IoU': f'{iou_pert:.3f}'
            })
    
    results['iou_perturbed'].append(ious_pert)
    results['perturbation_magnitude'].append(mags)
    results['embedding_distance'].append(embed_dists)

pbar.close()

# Convert to numpy
for key in ['iou_original', 'iou_perturbed', 'perturbation_magnitude', 'embedding_distance']:
    results[key] = np.array(results[key])

elapsed_total = time.time() - start_time

print(f"\n✓ Experiment complete!")
print(f"  Total time: {elapsed_total/60:.1f} minutes")
print(f"  Average per sample: {elapsed_total/len(samples):.1f} seconds")
print(f"  Mean IoU (original): {results['iou_original'].mean():.3f}")
print(f"  Mean IoU (perturbed): {results['iou_perturbed'].mean():.3f}")

# Print VLM parsing statistics
vlm.print_stats()

# Save results
np.savez('data/threshold_optimization_results.npz', **results)
print(f"\nSaved results to threshold_optimization_results.npz")

Starting perturbation experiment...
Samples: 51
Perturbations per sample: 25
Total inferences: 1326



Processing:   0%|          | 0/1326 [00:00<?, ?inference/s]

Error predicting bbox: Ollama API call timed out
Error predicting bbox: Ollama API call timed out
[
	{"bbox_2d": [100, 0, 285, 266], "label": "top left water"}
]
```'
Error predicting bbox: Ollama API call timed out
Error predicting bbox: Ollama API call timed out
Error predicting bbox: Ollama API call timed out
Error predicting bbox: Ollama API call timed out
Error predicting bbox: Ollama API call timed out
Error predicting bbox: Ollama API call timed out
Error predicting bbox: Ollama API call timed out
Error predicting bbox: Ollama API call timed out
Error predicting bbox: Ollama API call timed out
Error predicting bbox: Ollama API call timed out
Error predicting bbox: Ollama API call timed out
Error predicting bbox: Ollama API call timed out


## 6. Data Quality Check

In [None]:
# Load results (if restarting from saved file)
# results = dict(np.load('data/threshold_optimization_results.npz'))

print("Dataset Quality Check:")
print(f"  Samples: {len(results['iou_original'])}")
print(f"  Perturbations per sample: {results['iou_perturbed'].shape[1]}")
print(f"\nIoU Statistics:")
print(f"  Original - mean: {results['iou_original'].mean():.3f}, std: {results['iou_original'].std():.3f}")
print(f"  Perturbed - mean: {results['iou_perturbed'].mean():.3f}, std: {results['iou_perturbed'].std():.3f}")
print(f"\nIoU Degradation:")
iou_drop = results['iou_original'][:, np.newaxis] - results['iou_perturbed']
print(f"  Mean drop: {iou_drop.mean():.3f}")
print(f"  Std drop: {iou_drop.std():.3f}")
print(f"  Max drop: {iou_drop.max():.3f}")

# Check for sensitivity
iou_std = results['iou_perturbed'].std(axis=1).mean()
print(f"\nPerturbation Sensitivity:")
print(f"  Mean IoU std across perturbations: {iou_std:.3f}")
if iou_std > 0.1:
    print(f"  ✓ Model shows perturbation sensitivity")
else:
    print(f"  ⚠ Model may be too robust (low sensitivity)")

## 7. Threshold Optimization Functions

In [None]:
def iou_to_class(iou: np.ndarray, thresholds: list) -> np.ndarray:
    """
    Discretize IoU values into classes.
    
    Args:
        iou: IoU values (any shape)
        thresholds: Sorted list [t1, t2, ...] creating len+1 classes
    
    Returns:
        Integer class labels (0, 1, 2, ...)
    """
    classes = np.zeros_like(iou, dtype=int)
    for i, t in enumerate(sorted(thresholds)):
        classes[iou >= t] = i + 1
    return classes

def compute_mutual_information(classes: np.ndarray, signal: np.ndarray, n_bins=20) -> float:
    """
    MI between discrete classes and continuous signal.
    
    Signal is binned into n_bins buckets.
    """
    classes_flat = classes.flatten()
    signal_flat = signal.flatten()
    
    # Bin signal
    signal_binned = np.digitize(signal_flat, np.linspace(signal_flat.min(), signal_flat.max(), n_bins))
    
    return mutual_info_score(classes_flat, signal_binned)

def optimize_thresholds(iou: np.ndarray, signal: np.ndarray, n_thresholds=3, n_restarts=10):
    """
    Find thresholds maximizing MI via multi-start L-BFGS-B.
    
    Returns:
        optimal_thresholds, best_mi
    """
    def objective(thresholds):
        thresholds = np.sort(np.clip(thresholds, 0.01, 0.99))
        classes = iou_to_class(iou, thresholds.tolist())
        mi = compute_mutual_information(classes, signal)
        return -mi  # Minimize negative MI
    
    best_result = None
    best_mi = -np.inf
    
    for _ in range(n_restarts):
        x0 = np.sort(np.random.uniform(0.1, 0.9, n_thresholds))
        bounds = [(0.05, 0.95)] * n_thresholds
        
        result = optimize.minimize(
            objective,
            x0,
            method='L-BFGS-B',
            bounds=bounds
        )
        
        if -result.fun > best_mi:
            best_mi = -result.fun
            best_result = result
    
    return np.sort(best_result.x), best_mi

## 8. Optimize Thresholds for Different Signals

In [None]:
# Define signals
signals = {
    'perturbation_magnitude': results['perturbation_magnitude'],
    'embedding_distance': results['embedding_distance'],
    'iou_drop': results['iou_original'][:, np.newaxis] - results['iou_perturbed']
}

# Optimize for each signal
optimization_results = {}

print("Optimizing thresholds for different signals...\n")

for signal_name, signal in signals.items():
    print(f"Signal: {signal_name}")
    
    # Optimize
    optimal_thresh, optimal_mi = optimize_thresholds(
        results['iou_perturbed'],
        signal,
        n_thresholds=3,
        n_restarts=10
    )
    
    # Baseline (uniform thresholds)
    baseline_thresh = [0.3, 0.5, 0.7]
    baseline_classes = iou_to_class(results['iou_perturbed'], baseline_thresh)
    baseline_mi = compute_mutual_information(baseline_classes, signal)
    
    improvement = ((optimal_mi - baseline_mi) / baseline_mi) * 100
    
    optimization_results[signal_name] = {
        'optimal_thresholds': optimal_thresh,
        'optimal_mi': optimal_mi,
        'baseline_mi': baseline_mi,
        'improvement_pct': improvement
    }
    
    print(f"  Optimal thresholds: {optimal_thresh.round(3)}")
    print(f"  Optimal MI: {optimal_mi:.4f}")
    print(f"  Baseline MI: {baseline_mi:.4f}")
    print(f"  Improvement: {improvement:+.1f}%")
    print()

# Save optimization results
np.savez('data/optimization_results.npz', **optimization_results)
print("Saved optimization results to optimization_results.npz")

## 9. Visualize Results

In [None]:
# Threshold comparison
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Plot 1: Threshold positions
ax = axes[0]
colors = plt.cm.Set2.colors

for i, (name, res) in enumerate(optimization_results.items()):
    thresholds = res['optimal_thresholds']
    y_pos = i
    
    for t in thresholds:
        ax.axvline(t, ymin=y_pos/len(optimization_results), 
                   ymax=(y_pos+0.8)/len(optimization_results), 
                   color=colors[i], linewidth=3)
    
    ax.text(0.02, y_pos + 0.4, 
            f"{name}\nMI={res['optimal_mi']:.3f} (+{res['improvement_pct']:.1f}%)",
            transform=ax.get_yaxis_transform(), fontsize=9)

ax.set_xlim(0, 1)
ax.set_ylim(-0.5, len(optimization_results))
ax.set_xlabel('IoU Threshold')
ax.set_yticks([])
ax.set_title('Optimal Thresholds by Signal Type')

# Plot 2: MI comparison
ax = axes[1]
signal_names = list(optimization_results.keys())
optimal_mis = [optimization_results[s]['optimal_mi'] for s in signal_names]
baseline_mis = [optimization_results[s]['baseline_mi'] for s in signal_names]

x = np.arange(len(signal_names))
width = 0.35

ax.bar(x - width/2, baseline_mis, width, label='Baseline [0.3, 0.5, 0.7]', alpha=0.7)
ax.bar(x + width/2, optimal_mis, width, label='Optimized', alpha=0.7)

ax.set_ylabel('Mutual Information')
ax.set_title('MI: Baseline vs Optimized Thresholds')
ax.set_xticks(x)
ax.set_xticklabels([s.replace('_', '\n') for s in signal_names], fontsize=8)
ax.legend()
ax.grid(axis='y', alpha=0.3)

plt.tight_layout()
plt.show()

## 10. Boundary Detection

In [None]:
# Use optimal thresholds from perturbation magnitude signal
optimal_thresh = optimization_results['perturbation_magnitude']['optimal_thresholds']

# Classify samples
classes_orig = iou_to_class(results['iou_original'], optimal_thresh)
classes_pert = iou_to_class(results['iou_perturbed'], optimal_thresh)

# Detect boundaries: samples where ANY perturbation changes class
class_changes = classes_pert != classes_orig[:, np.newaxis]
boundary_mask = class_changes.any(axis=1)

# Severity: max class jump
severity = np.abs(classes_pert - classes_orig[:, np.newaxis]).max(axis=1)

print(f"Boundary Detection Results:")
print(f"  Optimal thresholds: {optimal_thresh.round(3)}")
print(f"  Boundary samples: {boundary_mask.sum()} / {len(boundary_mask)} ({100*boundary_mask.mean():.1f}%)")
print(f"  Severity distribution: {np.bincount(severity)}")
print(f"\nClass Distribution:")
print(f"  Original: {np.bincount(classes_orig)}")
print(f"  Perturbed: {np.bincount(classes_pert.flatten())}")

# Sanity check
if boundary_mask.mean() < 0.2 or boundary_mask.mean() > 0.6:
    print(f"\n⚠ Warning: Boundary rate {100*boundary_mask.mean():.1f}% outside expected range (20-60%)")
else:
    print(f"\n✓ Boundary rate within realistic range")

## Summary

### Key Findings:
1. **MI Improvement:** Check if optimized thresholds beat baseline by >5%
2. **Signal Differences:** Do geometric and semantic signals yield different thresholds?
3. **Boundary Rate:** Is it realistic (20-40%)?

### Next Steps:
- If results look good → Scale to validation set (99 samples) in notebook 03
- If MI improvement <5% → Check data quality, model sensitivity
- Apply learned thresholds to new samples for boundary detection