In [3]:
import numpy as np
from PIL import Image
from pathlib import Path
from typing import Dict, Tuple
import uuid
from scipy.interpolate import RegularGridInterpolator


def parse_cube_file(cube_path: Path) -> Tuple[int, np.ndarray, float, float]:
    """Parse a .cube LUT file."""
    with open(cube_path, "r") as f:
        lines = f.readlines()
    
    lut_size = None
    lut_data = []
    domain_min = 0.0
    domain_max = 1.0
    
    for line in lines:
        line = line.strip()
        
        if not line or line.startswith("#"):
            continue
        
        upper = line.upper()
        
        if upper.startswith("LUT_3D_SIZE"):
            lut_size = int(line.split()[1])
            continue
        
        if upper.startswith("DOMAIN_MIN"):
            domain_min = float(line.split()[1])
            continue
        
        if upper.startswith("DOMAIN_MAX"):
            domain_max = float(line.split()[1])
            continue
        
        if upper.startswith(("TITLE", "LUT_1D_SIZE", "LUT_1D_INPUT_RANGE", "LUT_3D_INPUT_RANGE")):
            continue
        
        parts = line.split()
        if len(parts) >= 3:
            try:
                r, g, b = float(parts[0]), float(parts[1]), float(parts[2])
                lut_data.append([r, g, b])
            except ValueError:
                continue
    
    if lut_size is None:
        raise ValueError(f"LUT_3D_SIZE not found in {cube_path}")
    
    expected = lut_size ** 3
    if len(lut_data) != expected:
        raise ValueError(f"Expected {expected} entries, got {len(lut_data)} in {cube_path}")
    
    lut_data = np.array(lut_data, dtype=np.float32)
    lut_3d = lut_data.reshape(lut_size, lut_size, lut_size, 3)
    lut_3d = np.transpose(lut_3d, (2, 1, 0, 3))
    
    return lut_size, lut_3d, domain_min, domain_max


def apply_lut_to_image(
    lut_path: str | Path,
    image_path: str | Path,
    output_dir: str | Path,
    lut_cache: dict = None,
) -> Dict[str, str]:
    """Apply a 3D LUT to an image and save with a unique UUID filename."""
    lut_path = Path(lut_path)
    image_path = Path(image_path)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    
    cache_key = str(lut_path)
    if lut_cache is not None and cache_key in lut_cache:
        lut_size, lut_3d, domain_min, domain_max = lut_cache[cache_key]
    else:
        lut_size, lut_3d, domain_min, domain_max = parse_cube_file(lut_path)
        if lut_cache is not None:
            lut_cache[cache_key] = (lut_size, lut_3d, domain_min, domain_max)
    
    with Image.open(image_path) as img:
        if img.mode != "RGB":
            img = img.convert("RGB")
        img_array = np.array(img, dtype=np.float32)
    
    h, w = img_array.shape[:2]
    img_array /= 255.0
    
    domain_range = domain_max - domain_min
    if domain_range != 1.0 or domain_min != 0.0:
        img_array = img_array * domain_range + domain_min
    
    np.clip(img_array, domain_min, domain_max, out=img_array)
    
    rgb = img_array.reshape(-1, 3)
    del img_array
    
    grid_coords = np.linspace(domain_min, domain_max, lut_size)
    interpolator = RegularGridInterpolator(
        (grid_coords, grid_coords, grid_coords),
        lut_3d,
        method="linear",
        bounds_error=False,
        fill_value=None,
    )
    
    transformed = interpolator(rgb)
    del rgb
    
    transformed = transformed.reshape(h, w, 3)
    np.clip(transformed, 0.0, 1.0, out=transformed)
    transformed *= 255.0
    output_array = transformed.astype(np.uint8)
    del transformed
    
    output_img = Image.fromarray(output_array, mode="RGB")
    del output_array
    
    unique_id = str(uuid.uuid4())
    output_path = output_dir / f"{unique_id}.jpg"
    output_img.save(output_path, quality=95)
    output_img.close()
    
    return {
        "uuid": unique_id,
        "source_image": image_path.name,
        "lut_file": lut_path.name,
        "output_path": str(output_path),
    }

In [4]:
import numpy as np
from PIL import Image
from pathlib import Path
from typing import Dict, List, Tuple
import uuid
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock

def process_images_with_luts(
    image_paths: List[str | Path],
    lut_paths: List[str | Path],
    output_dir: str | Path,
    metadata_file: str | Path = "metadata.json",
    max_workers: int = 4,
) -> None:
    """Process all images with all LUTs and generate metadata."""
    if isinstance(image_paths, (str, Path)):
        image_paths = [image_paths]
    if isinstance(lut_paths, (str, Path)):
        lut_paths = [lut_paths]
    
    image_paths = [Path(p).resolve() for p in image_paths]
    lut_paths = [Path(p).resolve() for p in lut_paths]
    
    for p in image_paths:
        if not p.is_file():
            raise FileNotFoundError(f"Image not found: {p}")
    for p in lut_paths:
        if not p.is_file():
            raise FileNotFoundError(f"LUT not found: {p}")
    
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    metadata_file = Path(metadata_file)
    
    # Pre-cache all LUTs
    lut_cache = {}
    for lut_path in lut_paths:
        cache_key = str(lut_path)
        lut_cache[cache_key] = parse_cube_file(lut_path)
    print(f"[INFO] Cached {len(lut_cache)} LUTs")
    
    tasks = [(img, lut) for img in image_paths for lut in lut_paths]
    total_tasks = len(tasks)
    
    print(f"[INFO] Processing {len(image_paths)} images x {len(lut_paths)} LUTs = {total_tasks} combinations")
    
    metadata = []
    metadata_lock = Lock()
    completed = 0
    completed_lock = Lock()
    
    def process_task(image_path: Path, lut_path: Path):
        nonlocal completed
        try:
            result = apply_lut_to_image(lut_path, image_path, output_dir, lut_cache)
            
            with metadata_lock:
                metadata.append(result)
            
            with completed_lock:
                completed += 1
                if completed % 10 == 0 or completed == total_tasks:
                    print(f"[PROGRESS] {completed}/{total_tasks} ({completed * 100 // total_tasks}%)")
            
            return "ok", result["uuid"]
        except Exception as e:
            with completed_lock:
                completed += 1
            return "error", f"{image_path.name} + {lut_path.name}: {e}"
    
    errors = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_task, img, lut) for img, lut in tasks]
        
        for future in as_completed(futures):
            status, info = future.result()
            if status == "error":
                errors.append(info)
                print(f"[ERROR] {info}")
    
    with open(metadata_file, "w") as f:
        json.dump(metadata, f, indent=2)
    
    print(f"\nComplete: {len(metadata)} processed, {len(errors)} errors")
    print(f"Output: {output_dir}, Metadata: {metadata_file}")

In [5]:
import random
from pathlib import Path
from typing import List


def get_random_images(
    image_dir: str | Path,
    num_images: int,
    seed: int = 42,
    extensions: List[str] = None,
) -> List[str]:
    """
    Get a random sample of images from a directory.
    
    Args:
        image_dir: Directory containing images
        num_images: Number of random images to select
        seed: Random seed for reproducibility
        extensions: List of image extensions to include (default: ['jpg', 'jpeg', 'png'])
    
    Returns:
        List of file paths as strings
    """
    if extensions is None:
        extensions = ['jpg', 'jpeg', 'png', 'JPG', 'JPEG', 'PNG']
    
    image_dir = Path(image_dir).resolve()
    
    if not image_dir.is_dir():
        raise FileNotFoundError(f"Directory not found: {image_dir}")
    
    # Gather all images with specified extensions
    all_images = [
        str(f) for ext in extensions
        for f in image_dir.glob(f"*.{ext}")
        if f.is_file()
    ]
    
    if not all_images:
        raise ValueError(f"No images found in {image_dir}")
    
    # Set random seed for reproducibility
    random.seed(seed)
    
    # Sample images
    num_to_select = min(num_images, len(all_images))
    selected = random.sample(all_images, num_to_select)
    
    if num_to_select < num_images:
        print(f"[WARNING] Only {num_to_select} images available, requested {num_images}")
    
    return sorted(selected)

In [6]:
import random
from pathlib import Path
from typing import List


def get_lut_files(
    lut_dir: str | Path,
    limit: int = None,
    extensions: List[str] = None,
    seed: int = None,
) -> List[str]:
    """
    Get .cube LUT files from a directory.
    
    Args:
        lut_dir: Directory containing LUT files
        limit: Maximum number of files to return (None = all)
        extensions: LUT extensions to search for (default: ['cube', 'CUBE'])
        seed: Random seed for sampling (None = no randomization, sorted order)
    
    Returns:
        List of file paths as strings
    """
    if extensions is None:
        extensions = ["cube", "CUBE"]
    
    lut_dir = Path(lut_dir).resolve()
    
    if not lut_dir.is_dir():
        raise FileNotFoundError(f"Directory not found: {lut_dir}")
    
    lut_files = [
        str(f) for ext in extensions
        for f in lut_dir.glob(f"*.{ext}")
        if f.is_file()
    ]
    
    if not lut_files:
        raise ValueError(f"No LUT files found in {lut_dir}")
    
    if seed is not None:
        random.seed(seed)
        random.shuffle(lut_files)
    else:
        lut_files.sort()
    
    if limit is not None:
        lut_files = lut_files[:limit]
    
    print(f"[INFO] Selected {len(lut_files)} LUT files from {lut_dir}" + 
          (f" (seed={seed})" if seed is not None else ""))
    
    return lut_files

In [7]:
images = get_random_images("./unsplash_raw_images", num_images=200, seed=42)
luts = get_lut_files("./ml_luts", limit = 32, seed = 42)

[INFO] Selected 32 LUT files from /Users/joseluna/camera_apps_project/lut_ML_model/ml_luts (seed=42)


In [8]:
process_images_with_luts(
        image_paths=images,
        lut_paths=luts,
        output_dir="./processed_images",
        metadata_file="./processed_images/metadata.json",
        max_workers=8
    )

[INFO] Cached 32 LUTs
[INFO] Processing 200 images x 32 LUTs = 6400 combinations


  output_img = Image.fromarray(output_array, mode="RGB")


[PROGRESS] 10/6400 (0%)
[PROGRESS] 20/6400 (0%)
[PROGRESS] 30/6400 (0%)
[PROGRESS] 40/6400 (0%)
[PROGRESS] 50/6400 (0%)
[PROGRESS] 60/6400 (0%)
[PROGRESS] 70/6400 (1%)
[PROGRESS] 80/6400 (1%)
[PROGRESS] 90/6400 (1%)
[PROGRESS] 100/6400 (1%)
[PROGRESS] 110/6400 (1%)
[PROGRESS] 120/6400 (1%)
[PROGRESS] 130/6400 (2%)
[PROGRESS] 140/6400 (2%)
[PROGRESS] 150/6400 (2%)
[PROGRESS] 160/6400 (2%)
[PROGRESS] 170/6400 (2%)
[PROGRESS] 180/6400 (2%)
[PROGRESS] 190/6400 (2%)
[PROGRESS] 200/6400 (3%)
[PROGRESS] 210/6400 (3%)
[PROGRESS] 220/6400 (3%)
[PROGRESS] 230/6400 (3%)
[PROGRESS] 240/6400 (3%)
[PROGRESS] 250/6400 (3%)
[PROGRESS] 260/6400 (4%)
[PROGRESS] 270/6400 (4%)
[PROGRESS] 280/6400 (4%)
[PROGRESS] 290/6400 (4%)
[PROGRESS] 300/6400 (4%)
[PROGRESS] 310/6400 (4%)
[PROGRESS] 320/6400 (5%)
[PROGRESS] 330/6400 (5%)
[PROGRESS] 340/6400 (5%)
[PROGRESS] 350/6400 (5%)
[PROGRESS] 360/6400 (5%)
[PROGRESS] 370/6400 (5%)
[PROGRESS] 380/6400 (5%)
[PROGRESS] 390/6400 (6%)
[PROGRESS] 400/6400 (6%)
[PROGRESS