# Camera Noise Model Estimation

## Includes

In [None]:
# mass includes
import os
import pickle
import pyexiv2 as exiv2
import numpy as np
import rawpy as rp
import scipy.stats as stat
import multiprocessing as par
import matplotlib.pyplot as plt
import pprint as pp
from matplotlib import rc
from tqdm.notebook import tqdm

## Initialization

In [None]:
# path to dataset
data_root = '/home/lab/Documents/SSD/PlieCNR/cameras/DJI'

# set font type and size
# font = {'family': 'Times New Roman', 'weight': 'normal', 'size': 14}
# rc('font', **font)

In [None]:
# make folder to saves
save_root = os.path.join(data_root, 'noiseModel')
if not os.path.exists(save_root):
    os.makedirs(save_root)

# get file list
data_folder = os.path.join(data_root, 'noise')
file_list = [
    file for file in os.listdir(data_folder) if '.dng' or '.DNG' in file
]
file_list.sort()

cam_iso_list = {}
noise_profile_list = {}
for file in file_list:
    # read metadata
    img_md = exiv2.ImageMetadata(os.path.join(data_folder, file))
    img_md.read()

    # extract metadata
    cam_iso = None
    cam_noise = None
    for key in img_md:
        if 'ISOSpeedRatings' in key:
            cam_iso = img_md[key].value
            continue

        if 'NoiseProfile' in key:
            cam_noise = img_md[key].raw_value.split()
            cam_noise = np.array(cam_noise, dtype=np.float32)
            continue

    if cam_iso is None:
        raise AttributeError('No ISOSpeedRatings found.')
    elif cam_noise is None:
        raise AttributeError('No NoiseProfile found.')
    else:
        print(file, 'ISO:', cam_iso, 'Noise:', cam_noise[:2])

    # add to list
    if cam_iso in cam_iso_list:
        cam_iso_list[cam_iso].append(file)
    else:
        cam_iso_list[cam_iso] = [file]

    if cam_iso not in noise_profile_list:
        noise_profile_list[cam_iso] = cam_noise

## Estimate noise model

In [None]:
# requires dark image
def estNoiseModel(img_seq, a=-0.5, b=0.5, figs=[]):
    row_noise = []
    read_noise = []
    noise_model = {}
    for img in img_seq:
        row_mean = np.mean(img, axis=1)
        row_noise.append(row_mean)
        img = img - row_mean.reshape(-1, 1)
        read_noise.append(img)
    row_noise = np.array(row_noise).flatten()
    read_noise = np.array(read_noise).flatten()

    # fit row noise model
    loc, scale = stat.norm.fit(row_noise)
    noise_model['sig_r'] = scale

    # fit read noise model
    if len(figs) > 0:
        ax1 = figs[0].add_subplot(1, 1, 1)
        ax2 = figs[1].add_subplot(1, 1, 1)
    else:
        ax1 = None
        ax2 = None
    shape, ppcc = stat.ppcc_plot(read_noise, a, b, plot=ax1)
    lam = shape[np.argmax(ppcc)]
    ax1.vlines(lam, 0, 1, colors='r')
    noise_model['lam_tl'] = lam

    _, result = stat.probplot(read_noise,
                              sparams=(lam, ),
                              dist=stat.tukeylambda,
                              plot=ax2)
    noise_model['sig_tl'] = result[0]
    noise_model['r_tl'] = result[2]

    return noise_model


# process given ISO
def processISO(cam_iso):
    # estimate noise models
    file_list = cam_iso_list[cam_iso]

    img_seq = []
    for file in file_list:
        # read raw image
        with rp.imread(os.path.join(data_folder, file)) as raw_obj:
            raw_img = raw_obj.raw_image_visible.copy()
            raw_img = raw_img.astype(np.float32)
            mask = raw_obj.raw_colors_visible
            blk_level = raw_obj.black_level_per_channel
            sat_level = raw_obj.white_level

        # normalize to 0-1
        raw_img[mask == 0] = raw_img[mask == 0] - blk_level[0]
        raw_img[mask == 1] = raw_img[mask == 1] - blk_level[1]
        raw_img[mask == 2] = raw_img[mask == 2] - blk_level[2]
        raw_img[mask == 3] = raw_img[mask == 3] - blk_level[3]
        raw_img = raw_img / float(sat_level - max(blk_level))
        img_seq.append(raw_img)

    # estimate noise model
    fig0 = plt.figure()
    fig1 = plt.figure()
    noise_model = estNoiseModel(img_seq, figs=[fig0, fig1])
    noise_model['k'] = noise_profile_list[cam_iso][0]
    noise_model['hf_step'] = 0.5 / float(sat_level)

    # save figures
    save_path = os.path.join(
        save_root,
        'ISO%d_PPCC.png' % cam_iso,
    )
    fig0.savefig(save_path, bbox_inches='tight')
    save_path = os.path.join(save_root, 'ISO%d_PP.png' % cam_iso)
    fig1.savefig(save_path, bbox_inches='tight')
    plt.close()

    return cam_iso, noise_model

## Linear regression

In [None]:
# requires estimated noise models
def linearFit(data_x, data_y):
    # compute slope and constant
    x_mean = np.mean(data_x)
    y_mean = np.mean(data_y)
    xy_diff = np.sum((data_x - x_mean) * (data_y - y_mean))
    x_diff2 = np.sum((data_x - x_mean)**2)

    slope = xy_diff / x_diff2
    const = y_mean - slope * x_mean

    # compute standard deviation
    length = data_x.shape[0]
    x_sum = np.sum(data_x)
    y_sum = np.sum(data_y)
    xy_sum = np.sum(data_x * data_y)
    x_sum2 = np.sum(data_x)**2
    y_sum2 = np.sum(data_y)**2
    xy_sum2 = np.sum(data_x * data_y)**2
    x2_sum = np.sum(data_x**2)
    y2_sum = np.sum(data_y**2)

    std = y2_sum - (length * xy_sum2 + y_sum2 * x2_sum -
                    2 * x_sum * y_sum * xy_sum) / (length * x2_sum - x_sum2)
    std = std / (length - 2)

    return slope, const, std

## Fit noise models

In [None]:
# estimate noise models
cores = par.cpu_count()
pool = par.Pool(processes=cores)
model_list = {}
iso_seq = [cam_iso for cam_iso in cam_iso_list]

for (cam_iso, noise_model) in tqdm(pool.imap(processISO, iso_seq),
                                   desc='progress',
                                   total=len(iso_seq)):
    model_list[cam_iso] = noise_model
    print('Estimation of ISO%d completed.' % cam_iso)
pool.close()

# save estimated noise models
save_path = os.path.join(save_root, 'model_params.pkl')
with open(save_path, 'wb') as pkl:
    pickle.dump(model_list, pkl)

## Matplotlib trick

In [None]:
# plot a line from slope and intercept
def abline(fig, slope, intercept):
    axes = fig.gca()
    x_vals = np.array(axes.get_xlim())
    y_vals = intercept + slope * x_vals
    plt.plot(x_vals, y_vals, 'r-')

## Statistical analysis

In [None]:
# take log
k_list = []
sig_r_list = []
sig_tl_list = []
lam_tl_list = []
for cam_iso in model_list:
    k_list.append(model_list[cam_iso]['k'])
    sig_r_list.append(model_list[cam_iso]['sig_r'])
    sig_tl_list.append(model_list[cam_iso]['sig_tl'])
    lam_tl_list.append(model_list[cam_iso]['lam_tl'])

k_list = np.log(np.array(k_list))
sig_r_list = np.log(np.array(sig_r_list))
sig_tl_list = np.log(np.array(sig_tl_list))
lam_tl_mean = np.mean(lam_tl_list)
hf_step = model_list[list(model_list.keys())[0]]['hf_step']

# linear regression
r_s, r_c, r_std = linearFit(k_list, sig_r_list)
tl_s, tl_c, tl_std = linearFit(k_list, sig_tl_list)

# print results
print(
    "Stat info for row noise {'slope': %f, 'const': %f, 'std': %e, 'min': %f, 'max': %f}"
    % (r_s, r_c, r_std, np.min(k_list), np.max(k_list)))
print(
    "Stat info for read noise {'slope': %f, 'const': %f, 'std': %e, 'min': %f, 'max': %f}"
    % (tl_s, tl_c, tl_std, np.min(k_list), np.max(k_list)))

# plot results
fig1 = plt.figure()
ax1 = fig1.add_subplot(1, 1, 1)
ax1.plot(k_list, sig_r_list, 'bo', markersize=3)
abline(fig1, r_s, r_c)
plt.xlabel('$\log(K)$')
plt.ylabel('$\log(\sigma_{r})$')
plt.close()

fig2 = plt.figure()
ax2 = fig2.add_subplot(1, 1, 1)
ax2.plot(k_list, sig_tl_list, 'bo', markersize=3)
abline(fig2, tl_s, tl_c)
plt.xlabel('$\log(K)$')
plt.ylabel('$\log(\sigma_{TL})$')
plt.close()

# save to figures
save_path = os.path.join(save_root, 'row_fit.png')
fig1.savefig(save_path, bbox_inches='tight')
save_path = os.path.join(save_root, 'read_fit.png')
fig2.savefig(save_path, bbox_inches='tight')

# save statistics
model_stats = {}
model_stats['hf_step'] = hf_step
model_stats['lam_tl'] = lam_tl_mean
model_stats['k'] = (np.min(k_list), np.max(k_list))
model_stats['sig_r'] = (r_s, r_c, r_std)
model_stats['sig_tl'] = (tl_s, tl_c, tl_std)
save_path = os.path.join(save_root, 'model_stats.pkl')
with open(save_path, 'wb') as pkl:
    pickle.dump(model_stats, pkl)