# Module 1: Calibration of CCD Imaging Process

We implement the algorithm described in "Statistical Calibration of CCD Imaging Process", Yanghai Tsin, Visvanathan Ramesh and Takeo Kanade, Conference Paper, Proceedings of (ICCV) International Conference on Computer Vision, Vol. 1, pp. 480 - 487, July, 2001 
https://ri.cmu.edu/pub_files/pub3/tsin_yanghai_2001_3/tsin_yanghai_2001_3.pdf

In [11]:
import math
import multiprocessing
import pickle
import glob

import cv2
import numpy as np
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")

from joblib import Parallel, delayed
from scipy.optimize import minimize

# for the later heavy-duty calculations, we use several CPU cores
# during debugging, this could be set to 1
num_cores = multiprocessing.cpu_count()


First we need some helper functions to load a series of images from either a folder or a video file.
These are the values _I_ in the original paper.

In [12]:
def get_images_from_folder(pattern, subsampling=32):
    # use the provided file name pattern to get all image files
    files = glob.glob(pattern)
    num_files = len(files)

    # read first file to get image dimensions
    img = cv2.imread(files[0])
    NUM_PIXELS = (img.shape[0] // subsampling) * (img.shape[1] // subsampling)
    data = np.ndarray((num_files, NUM_PIXELS, 3))
    for i, f in enumerate(files):
        img = cv2.imread(f)
        data[i, :, :] = img[::subsampling, ::subsampling, :].reshape((NUM_PIXELS, 3))
    return data

def get_images_from_video(filename, subsampling=32):
    video = cv2.VideoCapture(filename)

    num_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    vidcap = cv2.VideoCapture(filename)
    success, img = vidcap.read()

    NUM_PIXELS = math.ceil(img.shape[0] / subsampling) * math.ceil(img.shape[1] / subsampling)
    data = np.ndarray((num_frames, NUM_PIXELS, 3))

    i = 0
    while success:
        data[i, :, :] = img[::subsampling, ::subsampling, :].reshape((NUM_PIXELS, 3))
        success, img = vidcap.read()
        i += 1
    return data

For this example we use an video file with pedestrians walking across an empty place.

In [3]:
!wget https://github.com/ccc-frankfurt/aisel-hands-on/blob/main/data/epflpedestshort.avi?raw=true

Will not apply HSTS. The HSTS database must be a regular and non-world-writable file.
ERROR: could not open HSTS store at '/home/thraki/.wget-hsts'. HSTS will be disabled.
--2020-11-18 10:45:48--  https://github.com/ccc-frankfurt/aisel-hands-on/blob/main/data/epflpedestshort.avi?raw=true
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://github.com/ccc-frankfurt/aisel-hands-on/raw/main/data/epflpedestshort.avi [following]
--2020-11-18 10:45:48--  https://github.com/ccc-frankfurt/aisel-hands-on/raw/main/data/epflpedestshort.avi
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/ccc-frankfurt/aisel-hands-on/main/data/epflpedestshort.avi [following]
--2020-11-18 10:45:49--  https://raw.githubusercontent.com/ccc-frankfurt/aisel-hands-on/main/data/epflpedestshort.avi


In [5]:
# get the pixel data for all
data = get_images_from_video("./epflpedestshort.avi?raw=true")
num_files = data.shape[0]
num_pixels = data.shape[1]

Next we initialize all arrays we will later on use with the correct dimensions.

In [6]:
a = [1 for i in range(num_files)]
b = [0 for i in range(num_files)]
g = [i for i in range(256)]
E = np.zeros(num_pixels)
w = np.ones(num_files) / num_files
sigma = np.ones(num_files)
F = np.zeros(num_files)

### Step 1: Predict irradiance for each pixel

First step in each loop will be the calculation of the predicted irradiance _E_ (formula 9 in the paper):

$E = \sum_i w_i \frac{g(I_i) - b_i}{a_i}$

In [7]:
# we use a function instead of a array index so we can np.vectorize it
def g_func(x):
    return g[int(x)]
g_func = np.vectorize(g_func)

def calc_E(w, data, b, a):
    def calc_E_single(w, data, b, a, i):
        # using numpy array broadcast the irradiance is calculated without looping over each pixel
        return w[i] * (g_func(data[i, :]) - b[i]) / a[i]

    # calculate the irradiance for each file seperately and sum the results
    results = Parallel(n_jobs=num_cores, mmap_mode=None)(
        delayed(calc_E_single)(w, data, b, a, i) for i in range(num_files))
    return sum(results)

### Step 2: calculated residual error for each pixel

Next we calculate the residual errors _e_ and the median errors across each pool of pixels (formulas 11 and 12).

In [8]:
def pool(data, k, z, eps=1):
    # find each pool of pixels around z with a maximum difference of epsilon=1
    image = data[k, :]
    mask = np.abs(image - z) < eps
    return mask.nonzero()


def calc_e(data, E, a, b):
    # we use the median instead of the 66-percentage median due to numpy not having a vectorized version of the latter
    pool_errors = np.ndarray((num_pixels, num_files))
    median_errors = np.ndarray((num_files,))
    for k in range(num_files):
        errors_per_pixel = a[k] * E + b[k] - g_func(data[k, :])
        for i, z in enumerate(data[k, :]):
            pool_z = pool(data, k, z)
            pool_errors[i, k] = np.mean([errors_per_pixel[y] for y in pool_z])

        # the value c=1.265 is due to us correcting the green bands only (paragraph 3.1)
        # if we want to correct other colors, use c = 1.333
        median_errors[k] = 1.265 * np.median(errors_per_pixel - pool_errors[:, k])
    return pool_errors, median_errors

### Step 3: Optimize the parameters

After we have calculated the mean errors we optimize the formula 15 with regards to our vectors  _g , a and b_

In [9]:
def calc_gradients():
    #since our optimzing library only supports one vector argument, pack delta_a, delta_b and delta_g into one vector
    args = np.zeros((2 * num_files + 256,))
    args[0:num_files] = 1

    res = minimize(calc_total_F_error, args, options={"maxiter": 10})

    # unpack the optimized values
    da = res.x[0: num_files]
    db = res.x[num_files:2 * num_files]
    dg = res.x[2 * num_files:]
    return da, db, dg


def calc_total_F_error(args):
    #caluclates formula 15, the variance-weighted difference between our measured residuals and the theoretical error
    def calc_total_F_error_single(E, da, db, dg, w, data, k):
        F = calc_F(E, da, db, dg, w, data, k)
        return (e[:, k] - F) ** 2 / sigma[k]

    #unpack the arguments
    da = args[0:num_files]
    db = args[num_files:2 * num_files]
    dg = args[2 * num_files:]

    results = [calc_total_F_error_single(E, da, db, dg, w, green_channel, i) for i in range(num_files)]
    results = sum(results).sum()
    return results


def calc_F(E, da, db, dg, w, data, k):
    #calucate formula 13, the theoretical error at each iteration
    def calc_F_single(E, da, db, dg, w, data, k, i):
        if i == k:
            return 0
        dg_of_data = np.ndarray((num_pixels,))
        dg_of_data[:] = [dg[int(x)] for x in data[i, :]]
        return w[i] * a[k] / da[i] * (dg_of_data - db[i] - E[:] * da[i])

    results = Parallel(n_jobs=num_cores, mmap_mode=None)(
        delayed(calc_F_single)(E, da, db, dg, w, data, k, i) for i in range(num_files))
    total = sum(results)
    total += (w[k] - 1) * (g_func(data[k, :]) - b[k] - E[:] * a[k])
    return total

### Step 4: Iterating the previous steps

Last step is putting it all together, and running several iterations.

In [10]:
LEARN_RATE = 0.1
NUM_LEARNING_ITERATIONS = 10

for j in range(NUM_LEARNING_ITERATIONS):
    # we are interested in green only
    green_channel = data[:, :, 1]

    # see step 1
    E = calc_E(w, green_channel, b, a)

    # see step 2
    e, median_errors = calc_e(green_channel, E, a, b)
    w = 1 / median_errors
    sigma = median_errors

    # see step 3
    da, db, dg = calc_gradients()

    #update the process parameters (formula 16)
    a = a - LEARN_RATE * da
    b = b - LEARN_RATE * db
    g = g - LEARN_RATE * dg

    #plot img over time
    example_img = data[0]
    plt.imshow(calibrate(example_img))
    plt.canvas.draw()


# lastly, save our calibration parameters in the file system so it can later be used
pickle.dump(a, open("a.param", "wb"))
pickle.dump(b, open("b.param", "wb"))
pickle.dump(g, open("g.param", "wb"))

  del sys.path[0]


KeyboardInterrupt: 

To calibrate using the estimated parameters is a simple function:
$\frac{g(I) - b}{a}$

In [None]:
def calibrate(img, a, b, g):
    def apply_g(x):
        return g[int(x)]
    apply_g = np.vectorize(apply_g)

    return (apply_g(img) - b) / a