# LaQuacco

## Laboratory Quality Control

### Module Imports

In [None]:
import multiprocessing
import os
import platform
import sys
import matplotlib.pyplot as plt
import numpy as np
import definitions as defs  # required by Jupyter

### User Input

In [None]:
# number of concurrent workers: half of the reported CPUs, but never fewer than one
processes = max(1, multiprocessing.cpu_count() // 2)

# relative sample size for normalization
# (presumably a percentage -- confirm against 'defs.get_samples')
sample_perc = 10

# file search settings
data_dir = r"C:\Users\Christian Rickert\Desktop\Polaris"  # use a raw string (r"")
data_ext = "*.tif"  # include files matching pattern
anti_ext = ""  # exclude files matching pattern
recurse = True  # find files in subdirectories

### Check Files

In [None]:
# collect all matching image files and order them case-insensitively
files = list(
    defs.get_files(
        path=data_dir,
        pat=data_ext,
        anti=anti_ext,
        recurse=recurse,
    )
)
files.sort(key=str.lower)

# report the matches with the search directory abbreviated to "."
print(f"Found {len(files)} image files in {data_dir}:")
for file in files:
    print(file.replace(data_dir, "."))

### Main Program

In [None]:
if __name__ == "__main__":
    # safe import of main module avoids spawning multiple processes simultaneously
    if platform.system() == "Windows":
        multiprocessing.freeze_support()  # required by 'multiprocessing'

    # sample experimental image data;
    # the sample size is taken from 'sample_perc' (see "User Input") so that the
    # user setting is honored instead of a hard-coded value
    try:
        samples = sorted(
            defs.get_samples(population=files, perc=sample_perc), key=str.lower
        )
    except ValueError:
        print("Could not draw samples from experimental population.")
        sys.exit(1)
    # no power-transform lambdas exist yet, so 'None' is passed for each sample
    sample_args = [(sample, None) for sample in samples]

    # analyze the sample data
    with multiprocessing.Pool(processes) as pool:
        sample_results = pool.starmap(defs.read_img_data, sample_args)
        pool.close()  # wait for worker tasks to complete
        pool.join()  # wait for worker process to exit
    # each result is a (sample, img_data) pair
    samples_img_data = dict(sample_results)

    # collect the channel names found in the samples, skipping the metadata entry
    chans_set = set()  # avoid duplicate entries
    for img_data in samples_img_data.values():
        for chan in img_data:
            if chan != "metadata":
                chans_set.add(chan)
    chans = sorted(chans_set, key=str.lower)

    # prepare colormap
    color_map = defs.get_colormap(len(chans))

    # prepare lambdas for power transform:
    # one value per channel, the mean of the "chan_lmbda" entries of all samples
    chan_lmbdas = {}
    for chan in chans:
        chan_data = defs.get_chan_data(samples_img_data, chan, "chan_lmbda")
        chan_lmbdas[chan] = defs.get_mean(chan_data)

    # analyze experimental image data, this time with the per-channel lambdas
    image_args = [(image, chan_lmbdas) for image in files]
    with multiprocessing.Pool(processes) as pool:
        image_results = pool.starmap(defs.read_img_data, image_args)
        pool.close()  # wait for worker tasks to complete
        pool.join()  # wait for worker process to exit
    # each result is an (image, img_data) pair
    images_img_data = dict(image_results)

    # sort experimental image data by time stamp;
    # NOTE: from here on the order of 'images_img_data' may differ from 'files'
    images_img_data = dict(
        sorted(images_img_data.items(), key=lambda v: v[1]["metadata"]["date_time"])
    )

### Data Plots I - Distribution Chart

In [None]:
# set the default figure size to the equivalent of 1600 x 1200 pixels
dpi = plt.rcParams["figure.dpi"]
min_pixw, min_pixh = 1600, 1200
min_width = min_pixw / dpi
min_height = min_pixh / dpi
plt.rcParams["figure.figsize"] = [min_width, min_height]

# per-channel data series, in the order of 'chans'
data_means = []  # signal means as read from the image data
data_norms = []  # the same means passed through 'defs.boxcox_transform'

# gather the data for both plots
fig, ax = plt.subplots()
for c, chan in enumerate(chans):
    signal_means = defs.get_chan_data(images_img_data, chan, "sign_mean")
    data_means.append(signal_means)
    transformed = defs.boxcox_transform(
        np.array(signal_means), lmbda=chan_lmbdas[chan]
    )
    data_norms.append(transformed[0])  # only the first element is plotted

# violin plot of the measured values, drawn as plain black bodies
vp = ax.violinplot(data_means, showmeans=False, showmedians=False, showextrema=False)
for body in vp["bodies"]:
    body.set_facecolor("black")
    body.set_edgecolor("black")

# boxplot of the normalized values: solid median line, dashed mean line
bp = ax.boxplot(data_norms, meanline=True, showmeans=True)
plt.setp(bp["medians"], color="black")
plt.setp(bp["means"], color="black", linestyle="dashed")

# one tick per channel, starting at position 1
tick_positions = list(range(1, len(chans) + 1))
ax.set_xticks(
    tick_positions,
    labels=chans,
    rotation=90,
    fontsize="small",
)

# legend to the right of the axes, one entry per plot type
legend_handles = [vp["bodies"][0], bp["boxes"][0]]
legend = plt.legend(
    legend_handles,
    ["measured", "normalized"],
    loc="center left",
    bbox_to_anchor=(1, 0.5),
    fontsize="small",
)

# show plot
plt.show()

### Data Plots I - Extreme Values

In [None]:
# get whisker information:
# the boxplot holds two whisker lines per box; the first one of each pair is
# treated as the lower whisker, the second one as the upper whisker, and the
# last y value of each line is taken as the whisker end
whiskers = bp["whiskers"]
extrema_bp = {}
for lower, upper, chan in zip(whiskers[0::2], whiskers[1::2], chans):
    _, (_, lower_end) = lower.get_data()
    _, (_, upper_end) = upper.get_data()
    extrema_bp[chan] = {"max": upper_end, "min": lower_end}

# the plotted values follow the order of 'images_img_data' (sorted by time
# stamp), which may differ from the name-sorted 'files' list, so the file
# names are resolved from the image data itself
image_names = [os.path.basename(image) for image in images_img_data]

# list all images with extreme values per channel
for c, chan in enumerate(chans):
    print(chan, flush=True)
    outliers_bp = []
    for n, data_norm in enumerate(data_norms[c]):
        if data_norm > extrema_bp[chan]["max"]:
            outliers_bp.append(("↑  ", image_names[n], data_norm))
        elif data_norm < extrema_bp[chan]["min"]:
            outliers_bp.append(("↓  ", image_names[n], data_norm))
    if outliers_bp:
        for indicator, filename, _ in outliers_bp:
            print(f"\t{indicator} {filename}")
    else:
        print("\t(none)")
    print()

### Data Plots II - Levey-Jennings Charts

In [None]:
# set the default figure size to the equivalent of 1600 x 1200 pixels
dpi = plt.rcParams["figure.dpi"]
min_pixw, min_pixh = 1600, 1200
min_width, min_height = min_pixw / dpi, min_pixh / dpi
plt.rcParams["figure.figsize"] = [min_width, min_height]

# Levey-Jennings chart
slice_margin = len(files) - 1  # extend slice to either sides
fit_trend = False  # fit a linear regression model of the mean
file_len = len(files)
slice_size = min(file_len, 2 * slice_margin + 1)
# raised explicitly (not asserted) so the check also runs with assertions disabled;
# the standard deviation below is computed with 'ddof=3'
if slice_size <= 3:
    raise ValueError(
        "Zero degrees of freedom to estimate the standard deviation from the trend line."
    )
xs = np.arange(file_len)  # numeric x positions shared by all plot elements
np_nan = np.full(file_len, np.nan)  # template: positions without a value stay NaN
signals_lj = {}  # per channel: signal means (read by the following cell)
extrema_lj = {}  # per channel: +/- 1 and 2 standard deviation limits
# (upper, lower) key pairs of the shaded bands, widest band first
extrema_lj_keys = [("p2stdev", "m2stdev"), ("p1stdev", "m1stdev")]
# dotted line styles of the running limits, sparser dots for two standard deviations
limit_linestyles = {1.0: (0, (1, 2)), 2.0: (0, (1, 4))}
for c, chan in enumerate(chans):
    # prepare variables
    run_stats = {stat: np_nan.copy() for stat in ["slice", "means", "stdevs"]}
    trend_stats = {stat: np_nan.copy() for stat in ["slice", "vals", "stdevs"]}
    # get image statistics
    signal_means = defs.get_chan_data(images_img_data, chan, "sign_mean")
    signal_stdevs = defs.get_chan_data(images_img_data, chan, "sign_stdev")
    signal_stderrs = defs.get_chan_data(images_img_data, chan, "sign_stderr")
    signals_lj[chan] = signal_means
    # get trend statistics: either a fitted line or the constant overall mean
    if fit_trend:
        slope, inter = np.polyfit(xs, signal_means, deg=1)
        trend_stats["vals"] = slope * xs + inter
    else:
        trend_stats["vals"].fill(defs.get_mean(signal_means))
    # get running statistics;
    # only positions whose slice has the full 'slice_size' are filled in
    for i, _ in enumerate(signal_means):
        run_stats["slice"] = defs.get_run_slice(signal_means, i, slice_margin)
        if run_stats["slice"].size == slice_size:
            run_stats["means"][i] = defs.get_mean(run_stats["slice"])
            run_stats["stdevs"][i] = defs.get_mean(
                defs.get_run_slice(signal_stdevs, i, slice_margin)
            )
            trend_stats["slice"] = defs.get_run_slice(
                trend_stats["vals"], i, slice_margin
            )
            trend_stats["stdevs"][i] = defs.get_stdev(
                run_stats["slice"],
                defs.get_mean(trend_stats["slice"]),
                ddof=3,  # estimated: slope, intercept, and mean
            )
    # get extrema from trend line
    extrema_lj[chan] = {
        extrema_lj_keys[0][0]: trend_stats["vals"] + 2.0 * trend_stats["stdevs"],
        extrema_lj_keys[1][0]: trend_stats["vals"] + 1.0 * trend_stats["stdevs"],
        extrema_lj_keys[1][1]: trend_stats["vals"] - 1.0 * trend_stats["stdevs"],
        extrema_lj_keys[0][1]: trend_stats["vals"] - 2.0 * trend_stats["stdevs"],
    }
    # plot statistics: running mean +/- 1 and 2 running standard deviations
    for dist in [2.0, 1.0, -1.0, -2.0]:
        plt.plot(
            run_stats["means"] + dist * run_stats["stdevs"],
            color="black",
            linewidth=1,
            linestyle=limit_linestyles[abs(dist)],
        )
    # shaded bands around the trend line; overlapping bands darken the inner one
    for upper, lower in extrema_lj_keys:
        plt.fill_between(
            xs,
            extrema_lj[chan][upper],
            extrema_lj[chan][lower],
            color="black",
            alpha=0.2,
        )
    plt.plot(trend_stats["vals"], color="black", linewidth=1, linestyle="solid")
    plt.plot(run_stats["means"], color="black", linewidth=1, linestyle="dashed")
    # the signal is always plotted at numeric positions so that it lines up with
    # the other plot elements, even if several images share the same base name
    plt.errorbar(
        xs,
        signal_means,
        yerr=signal_stderrs,
        fmt="o-",
        linewidth=1,
        markersize=2,
        color=color_map[c],
        label=chan + " [SIG]",
    )
    # only the last chart is labeled with the image file names
    if chan == chans[-1]:
        signal_labels = [os.path.basename(image) for image in images_img_data.keys()]
        plt.xticks(xs, signal_labels, rotation=90, fontsize="small")
    legend = plt.legend(loc="center left", bbox_to_anchor=(1, 0.5), fontsize="small")
    plt.ylim(bottom=0.0)
    plt.show()

### Data Plots II - Extreme Values

In [None]:
# the chart values follow the order of 'images_img_data' (sorted by time stamp),
# which may differ from the name-sorted 'files' list, so the file names are
# resolved from the image data itself
image_names = [os.path.basename(image) for image in images_img_data]

# list all images with extreme values per channel
for c, chan in enumerate(chans):
    print(chan, flush=True)
    limits = extrema_lj[chan]
    outliers_lj = []
    for s, signal_lj in enumerate(signals_lj[chan]):
        # the two-standard-deviation limit is checked first in each direction,
        # so every image is reported with its most severe indicator only
        if signal_lj > limits["p2stdev"][s]:
            indicator = "↑↑ "
        elif signal_lj > limits["p1stdev"][s]:
            indicator = "↑  "
        elif signal_lj < limits["m2stdev"][s]:
            indicator = "↓↓ "
        elif signal_lj < limits["m1stdev"][s]:
            indicator = "↓  "
        else:
            continue  # within one standard deviation (or limit is NaN)
        outliers_lj.append((indicator, s, image_names[s], signal_lj))
    if outliers_lj:
        # 'position' is the index in time-sorted order
        for indicator, position, filename, _ in outliers_lj:
            print(f"\t{indicator} {position} = {filename}")
    else:
        print("\t(none)")
    print()