# Parallelism in Computer Vision

## Setup

### Imports

In [1]:
import time
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from pathlib import Path
import zipfile
import urllib.request
from typing import List

### Setup Data paths for Caltech101 dataset

In [2]:
DATA_ROOT = Path("../data")
ARCHIVE_URL = "https://www.kaggle.com/api/v1/datasets/download/imbikramsaha/caltech-101"
ARCHIVE_PATH = DATA_ROOT / "caltech101.zip"
EXTRACT_DIR = DATA_ROOT / "caltech-101"


In [3]:
def time_it(func):
    """Decorator to measure execution time of functions."""
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} took {end - start:.4f}s")
        return result
    return wrapper

### Download Caltech101 dataset

In [None]:
def ensure_caltech101():
    if EXTRACT_DIR.exists() and any(EXTRACT_DIR.iterdir()):
        print("Caltech101 dataset already exists.")
        return
    print("Downloading Caltech101 archive...")
    urllib.request.urlretrieve(ARCHIVE_URL, ARCHIVE_PATH)
    print("Extracting Caltech101 archive...")
    with zipfile.ZipFile(ARCHIVE_PATH, "r") as zip_ref:
        zip_ref.extractall(path=DATA_ROOT)
    print("Caltech101 dataset extracted to", EXTRACT_DIR)

ensure_caltech101()


Downloading Caltech101 archive...
Extracting Caltech101 archive...
Caltech101 dataset extracted to ../data/caltech101


### Sample Images

In [4]:
def sample_images(root: Path, limit: int=500) -> List[Path]:
    extensions = {".jpg", ".jpeg", ".png", ".bmp"}
    paths = [path for path in root.rglob("*") if path.suffix.lower() in extensions]
    return paths[:limit]

paths = sample_images(EXTRACT_DIR)
len(paths), paths[:5]


(500,
 [PosixPath('../data/caltech-101/yin_yang/image_0042.jpg'),
  PosixPath('../data/caltech-101/yin_yang/image_0002.jpg'),
  PosixPath('../data/caltech-101/yin_yang/image_0053.jpg'),
  PosixPath('../data/caltech-101/yin_yang/image_0009.jpg'),
  PosixPath('../data/caltech-101/yin_yang/image_0019.jpg')])

## Serial Preprocessing

### Resize, grayscale, Sobel edges.

In [8]:
def load_resize(path, size=(224,224)):
    """
    Open an image, convert it to RGB to ensure we have 3 channels, and resize it.
    Returns a NumPy array of shape (height, width, 3).
    """
    with Image.open(path) as img:
        img = img.convert("RGB").resize(size, Image.BILINEAR)
        return np.array(img, dtype=np.uint8)

def to_gray(img_rgb):
    """
    Convert an RGB image to grayscale using perceptual weights.
    Returns a NumPy array of shape (height, width).
    """
    # Use ellipsis (...) to select all pixels
    r, g, b = img_rgb[..., 0], img_rgb[..., 1], img_rgb[..., 2]
    # Apply ITU-R 601 to calculate luminance
    # Cast float32 to avoid overflow later
    return (0.299 * r + 0.587 * g + 0.114 * b).astype(np.float32)

def sobel_edges(img_gray):
    """
    Apply the Sobel operator to a grayscale image of shape (height, width) to detect edges.
    Returns a NumPy array of the same shape as the input image.
    """
    # Create two 3x3 Sobel kernels
    sobel_x = np.array([[-1, 0, 1],
                        [-2, 0, 2],
                        [-1, 0, 1]], dtype=np.float32)
    sobel_y = np.array([[ 1, 2, 1],
                        [ 0, 0, 0],
                        [-1,-2,-1]], dtype=np.float32)

    grad = np.pad(img_gray, ((1, 1), (1, 1)), mode='edge')
    grad_x = (
        grad[:-2, :-2] * sobel_x[0, 0] + grad[:-2, 1:-1] * sobel_x[0, 1] + grad[:-2, 2:] * sobel_x[0, 2] +
        grad[1:-1, :-2] * sobel_x[1, 0] + grad[1:-1, 1:-1] * sobel_x[1, 1] + grad[1:-1, 2:] * sobel_x[1, 2] +
        grad[2:, :-2] * sobel_x[2, 0] + grad[2:, 1:-1] * sobel_x[2, 1] + grad[2:, 2:] * sobel_x[2, 2]
    )
    grad_y = (
        grad[:-2, :-2] * sobel_y[0, 0] + grad[:-2, 1:-1] * sobel_y[0, 1] + grad[:-2, 2:] * sobel_y[0, 2] +
        grad[1:-1, :-2] * sobel_y[1, 0] + grad[1:-1, 1:-1] * sobel_y[1, 1] + grad[1:-1, 2:] * sobel_y[1, 2] +
        grad[2:, :-2] * sobel_y[2, 0] + grad[2:, 1:-1] * sobel_y[2, 1] + grad[2:, 2:] * sobel_y[2, 2]
    )
    return np.sqrt(grad_x**2 + grad_y**2).astype(np.float32)

### Serial Preprocessing (full dictionary)

In [5]:
def preprocess_one(path, size=(224, 224), bins=32):
    """
    Preprocess pipeline for one image.
    Returns a dictionary containing components for reuse.
    """
    img_resized = load_resize(path, size)
    img_gray = to_gray(img_resized)
    img_edges = sobel_edges(img_gray)

    hists = []
    for channel in range(3):
        hist_channel, _ = np.histogram(img_resized[..., channel], bins=bins, range=(0, 255))
        hists.append(hist_channel.astype(np.float32))

    color_hist = np.concatenate(hists, axis=0)

    return {
        "path": str(path),
        "resized": img_resized,
        "gray": img_gray,
        "edges": img_edges,
        "color_hist": color_hist,
        "size": size,
        "bins": bins
    }

def preprocess_all_serial(image_paths, size=(224, 224), bins=32, limit=None):
    """
    Preprocess pipeline for a list of images.
    Returns a list of dictionaries, each containing components for reuse.
    """
    if limit is not None:
        image_paths = image_paths[:limit]
    return [preprocess_one(path, size=size, bins=bins) for path in image_paths]


### Serial Preprocessing (minimal dictionary)

In [None]:
def preprocess_one_min(path, size=(224,224), bins=32):
    img = load_resize(path, size)
    gray = to_gray(img)
    edges = sobel_edges(gray)
    # Simple feature: edge mean + color hist (density)
    hists = []
    for c in range(3):
        h, _ = np.histogram(img[..., c], bins=bins, range=(0,255), density=True)
        hists.append(h.astype(np.float32))
    color_hist = np.concatenate(hists)
    return {
        "path": str(path),
        "edge_mean": float(edges.mean()),
        "edge_max": float(edges.max()),
        "color_hist": color_hist
    }


### Performance test on different image size

In [6]:
import time, statistics as stats

def measure_components(paths, limit=60, size=(224,224)):
    t_load = []
    t_compute = []
    for p in paths[:limit]:
        t0 = time.perf_counter()
        img = load_resize(p, size=size)
        t1 = time.perf_counter()
        gray = to_gray(img)
        edges = sobel_edges(gray)
        t2 = time.perf_counter()
        t_load.append(t1 - t0)
        t_compute.append(t2 - t1)
    return {
        "n": limit,
        "load_mean": stats.mean(t_load),
        "compute_mean": stats.mean(t_compute),
        "load_total": sum(t_load),
        "compute_total": sum(t_compute)
    }


#### 224x224

In [40]:
component_stats_224 = measure_components(paths, limit=80, size=(224,224))
component_stats_224

{'n': 80,
 'load_mean': 0.001820724237950344,
 'compute_mean': 0.0012321253870140937,
 'load_total': 0.14565793903602753,
 'compute_total': 0.09857003096112749}

#### 112x112

In [41]:
component_stats_112 = measure_components(paths, limit=80, size=(112,112))
component_stats_112

{'n': 80,
 'load_mean': 0.0013956687629615772,
 'compute_mean': 0.0005102350501147157,
 'load_total': 0.11165350103692617,
 'compute_total': 0.040818804009177256}

#### 56x56

In [51]:
component_stats_56 = measure_components(paths, limit=80, size=(56,56))
component_stats_56

{'n': 80,
 'load_mean': 0.0012900930746582162,
 'compute_mean': 0.00035793070046565847,
 'load_total': 0.1032074459726573,
 'compute_total': 0.028634456037252676}

## Parallel Preprocessing

### Parallel Preprocessing (full dictionary)

In [9]:
from concurrent.futures import ProcessPoolExecutor, as_completed
import os, math

def preprocess_parallel(paths, size=(224,224), bins=32, max_workers=None, limit=None):
    if limit: paths = paths[:limit]
    t0 = time.perf_counter()
    out = []
    with ProcessPoolExecutor(max_workers=max_workers) as ex:
        futures = [ex.submit(preprocess_one, p, size, bins) for p in paths]
        for f in as_completed(futures):
            out.append(f.result())
    elapsed = time.perf_counter() - t0
    return out, elapsed

par_samples, par_time = preprocess_parallel(paths, limit=200)
print(f"ProcessPool full objects: {len(par_samples)} images in {par_time:.2f}s")

ProcessPool full objects: 200 images in 1.35s


### Parallel Preprocessing (minimal dictionary)

In [11]:
def preprocess_parallel_min(paths, size=(224,224), bins=32, max_workers=None, limit=None):
    if limit: paths = paths[:limit]
    t0 = time.perf_counter()
    out = []
    with ProcessPoolExecutor(max_workers=max_workers) as ex:
        futures = [ex.submit(preprocess_one_min, p, size, bins) for p in paths]
        for f in as_completed(futures):
            out.append(f.result())
    elapsed = time.perf_counter() - t0
    return out, elapsed

par_min_samples, par_min_time = preprocess_parallel_min(paths, limit=200)
print(f"ProcessPool minimal dict: {len(par_min_samples)} images in {par_min_time:.2f}s")

ProcessPool minimal dict: 200 images in 1.20s


### Preprocessing using threads

In [12]:
from concurrent.futures import ThreadPoolExecutor

def preprocess_threads(paths, size=(224,224), bins=32, limit=None):
    if limit: paths = paths[:limit]
    t0 = time.perf_counter()
    with ThreadPoolExecutor() as ex:
        results = list(ex.map(lambda p: preprocess_one(p, size, bins), paths))
    return results, time.perf_counter() - t0

thr_samples, thr_time = preprocess_threads(paths, limit=200)
print(f"ThreadPool: {len(thr_samples)} images in {thr_time:.2f}s")

ThreadPool: 200 images in 2.81s


### Evaluating Process Speedup

In [14]:
t0 = time.perf_counter()
serial_subset = preprocess_all_serial(paths[:200])
serial_time = time.perf_counter() - t0
print(f"Serial (200 images): {serial_time:.2f}s")

print(f"Speedups vs serial:")
print(f"  Processes (full objects): {serial_time / par_time:.2f}x")
print(f"  Processes (minimal dict): {serial_time / par_min_time:.2f}x")
print(f"  Threads: {serial_time / thr_time:.2f}x")

Serial (200 images): 1.22s
Speedups vs serial:
  Processes (full objects): 0.90x
  Processes (minimal dict): 1.02x
  Threads: 0.44x
