# Generate report  - POC

Automatic report generation from Hamilton measurements.  

Fast iteration in an agile way.  
Generic approach - different plates setup, prameters, ... all with the same code, no changes needed.  

**Python** programming language.  
**jupyter** notebook is currently used, with some functions divided into small modules.  
**Visual Studio Code** IDE (Integrated Development Environment).  
**Markdown** (*.md) format for generated report (Simple, humanly redable).  

**Input**:
 - Worklist file path (*.xls) as used for Hamilton input.
   - Sample name
   - Dilution
   - Viscosity
 - Measurement results file path (*.xls) as output from Hamilton.
 - Parameters; constants in code (file path *.json)
   - CV (Coefficient of variation) threshold
   - Referennce value (1.7954e+10 cp/ml)
   - Dilutions [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0]
   - Decimal digits for output

**Output**:
  - Report (*.md, printable to pdf)
    - Could be manually edited
    - Image files
    - Result sheets
  - Estimated size <2kB (current)

**TODO**:
  - Invalid sample:
    - CV >THRESHOLD
    - Only one point
  - Running modes
    - Python script - automatic run (command line with parameters)
    - GUI; use modules to crete an App (code remains the same, but used from GUI)
  - Finalize the report
  - Parameters file (*.json)
  - Multiple plates (in worklist file)
  - Tests (unit, integration)
  - Modules
  - Extensive testing...
  - Automatic print to *.pdf ?


## Read data

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import r2_score
from os import path
import os
from decimal import Decimal
import math

In [None]:
def make_input_paths(base_path, sample_num):
    input_dir = os.path.dirname(os.path.abspath(base_path))
    base_name = os.path.basename(os.path.abspath(base_path))
    # print(base_name)
    # print(base_path, input_dir)
    worklist = base_path + 'worklist-ELISA.xls'
    if not os.path.isfile(worklist):
        raise Exception("Worklist file path is invlaid: {}".format(worklist))
    results = base_path + 'calc{}.xlsx'.format(sample_num)
    if not os.path.isfile(results):
        raise Exception("Rewsults file path is invlaid: {}".format(results))
    
    report = path.join(input_dir, 'results_{}'.format(sample_num))
    report = path.join(report, '{}report_{}.md'.format(base_name, sample_num))

    return {'worklist': worklist, 'results': results, 'report': report}

In [None]:
WORKLIST_BASE_PATH = './data/input/230426_GN004240-033_-_'
PLATE_ID = 1 # plate id

input_files = make_input_paths(WORKLIST_BASE_PATH, 1)
WORKLIST_FILE_PATH = input_files['worklist']
RESULT_FILE_PATH = input_files['results']
REPORT_FILE_PATH = input_files['report']
REPORT_DIR = os.path.dirname(os.path.abspath(REPORT_FILE_PATH))

data = pd.read_excel(RESULT_FILE_PATH, sheet_name=None)
data["Result_Overview"]

In [None]:
data["Data"]

In [None]:
df = data["Data"]
df_450 = df.iloc[2:10, 2:14].copy().reset_index(drop=True)
df_450.columns = range(df_450.columns.size)
df_450

In [None]:
df_630 = df.iloc[2:10, 15:28].copy().reset_index(drop=True)
df_630.columns = range(df_630.columns.size)
df_630

In [None]:
def get_data_crop(df, row_span, col_span):
  crop = df.iloc[row_span, col_span].copy()
  crop.reset_index(drop=True, inplace=True)
  crop.set_index([['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']], inplace=True)
  crop.columns = range(1, crop.columns.size+1)
  return crop

def read_data_xls(file_path):
  data = pd.read_excel(file_path, sheet_name=None)
  df_450 = get_data_crop(data["Data"], range(2, 10), range(2, 14))
  df_630 = get_data_crop(data["Data"], range(2, 10), range(15, 27))

  return df_450, df_630


df_450, df_630 = read_data_xls(RESULT_FILE_PATH)

In [None]:
df_450

In [None]:
df_630

In [None]:
df_delta = df_450 - df_630
df_delta

In [None]:
def to_multi_index(df_single_index, name):
  df_multi_idx = df_single_index.stack().to_frame()
  df_multi_idx.columns = [name]

  return df_multi_idx


In [None]:
def read_concat_data(data_file_path):
  df_450, df_630 = read_data_xls(data_file_path)
  df_delta = df_450 - df_630
  df_delta_all = to_multi_index(df_delta, "OD_delta")
  df_450_all = to_multi_index(df_450, "OD_450")
  df_630_all = to_multi_index(df_630, "OD_630")

  return pd.merge(df_delta_all, 
                  pd.merge(df_450_all, df_630_all, 
                  left_index=True, right_index=True),
                  left_index=True, right_index=True)

In [None]:
m = read_concat_data(RESULT_FILE_PATH)
m

### Layouts

First read layouts as arrays and re-arage to dataframe, using plate layout indexing.

In [None]:
def to_matrix(l, n):
  return [l[i:i+n] for i in range(0, len(l), n)]

def index_plate_layout(plate_layout):
  plate_layout.set_index([['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']], inplace=True)
  plate_layout.columns = range(1, plate_layout.columns.size + 1)

  return plate_layout

def to_plate_layout(lst):
  l_2d = to_matrix(lst, 8)
  plate_layout = pd.DataFrame(l_2d).T
  
  return index_plate_layout(plate_layout)

In [None]:
import data.layouts as layouts
to_plate_layout(layouts.l_plate_layout)

Save layouts to CSV, so that ib the future a layout could be defined in i.e. Excel.

In [None]:
def save_plate_layout_csv(layout_list, out_file):
  l = to_plate_layout(layout_list)
  l.to_csv(out_file, index=False)

SAVE_LAYOUTS_CSV = False
if SAVE_LAYOUTS_CSV:
  save_plate_layout_csv(layouts.l_plate_layout, './data/plate_layout.csv')
  save_plate_layout_csv(layouts.l_plate_layout_ident, './data/plate_layout_ident.csv')
  save_plate_layout_csv(layouts.l_plate_layout_num, './data/plate_layout_num.csv')
  save_plate_layout_csv(layouts.l_plate_layout_dil_id, './data/plate_layout_dil_id.csv')


Read CSV layout for check

In [None]:
def read_plate_layout(file_path):
  plate_layout = pd.read_csv(file_path)
  index_plate_layout(plate_layout)

  return plate_layout

plate_layout = read_plate_layout('./data/plate_layout.csv')
plate_layout_id = read_plate_layout('./data/plate_layout_ident.csv')
plate_layout_num = read_plate_layout('./data/plate_layout_num.csv')
plate_layout_dil_id = read_plate_layout('./data/plate_layout_dil_id.csv')

In [None]:
display(plate_layout_id)
display(plate_layout_num)
display(plate_layout_dil_id)

### Combine read data from XLSX with layouts

In [None]:
def concat_data_and_layout(df_data, df_layout):
  return pd.merge(df_data, df_layout,
                  left_index=True, right_index=True)

# df_all = concat_data_and_layout(m, to_multi_index(plate_layout, 'plate_layout'))
df_all = concat_data_and_layout(m, to_multi_index(plate_layout_id, 'plate_layout_ident'))
df_all = concat_data_and_layout(df_all, to_multi_index(plate_layout_num, 'plate_layout_num'))
df_all = concat_data_and_layout(df_all, to_multi_index(plate_layout_dil_id, 'plate_layout_dil_id'))
display(df_all)

Filter data

In [None]:
display(df_all.loc[(df_all['plate_layout_ident']=='r')])

In [None]:
def get_sample(dfa, type, sample_num):
    # TODO: check for valid `type` `and sample_num`
    dfa = dfa.loc[(dfa['plate_layout_ident']==type) & (dfa['plate_layout_num']==sample_num)]
    return dfa

# type ['k', 'b', 's']
# sample_num [1:21, 'b']
display(get_sample(df_all, 's', 1))

### Dilution to Concentration

Define dilution dataframe. The dataframe is indexed according plate layout, index of refference dataframe corresponds to refference of the `plate_layout_dil`.

In [None]:
# TODO: read reference value from parameters
REF_VAL_MAX = 1.7954e+10
DILUTIONS = [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0]

def make_concentration(ref_val_max, dilution):
    conc  = pd.DataFrame({'dilution': dilution})
    conc.loc[:, ['concentration']] = conc.apply(lambda x: ref_val_max / x['dilution'], axis=1)
    conc.index = range(1, len(dilution) + 1)
    return conc

reference_conc = make_concentration(REF_VAL_MAX, DILUTIONS)
display(reference_conc)

Check the `reference_set_conc` indexing

In [None]:
print(reference_conc.loc[1])
print(reference_conc.loc[7])

## Fit

In [None]:
import fitdata
import matplotlib.pyplot as plt
import warnings
from scipy.optimize import OptimizeWarning

warnings.simplefilter('ignore', RuntimeWarning)
warnings.simplefilter('ignore', OptimizeWarning)

### Get the fitting data from dataframe

In [None]:
ref = df_all.loc[(df_all['plate_layout_ident']=='r')].copy()
ref['plate_layout_dil'] = ref['plate_layout_dil_id'].map(reference_conc['concentration'])
display(ref)

### Fit with confidence interval

In [None]:
CONFIDENCE_INTERVAL = 95.0 # 95% confidence interval = 100*(1-alpha)

In [None]:
from scipy.stats import distributions

def fit_image(x, y, popt, pcov, file_path, confidence_interval=CONFIDENCE_INTERVAL,
    confidence=None, interval_ratio=2.0,
    rm_index=[],
    sx=None, sy=None, mask_index=[], sna_idx=[],
    verbose=False, valid_sample=True, show=True):
    r"""Plot the fitted function with confidence intervals.

    Confidence intervals coud be set using `confidence` parameter.
    'student-t' method is a correct one producing wider confidence intervals.

    Parameters
    ----------
    x : array_like
        x-axis reference values.
    y : array_like
        y-axis reference values.
    popt : iterable
        Fit parameters.
    pcov : 
        Covariance matrix from fitting algorithm.
    file_path : string
        Path where the graph image is saved or `None`
    confidence_interval : float 
        Confidence interval in %
    interval_ratio: float
        Ration of min and max extention of x axis for fitted curve plot.
    rm_index: list
        Index of a removed point.
    sx : array_like
        Sample x-values.
    sy : array_like
        Sample x-values.
    mask_index : array_like
        Indices of masked samples.
    verbose : bool
        Print verbose output.
    show : bool
        Show the graph. If `False` image is saved if file path is given. 
    """

    # confidence [None, 'student-t', 'sqrt_err'] 
    if verbose:
        print('parameter', popt)
    perr = np.sqrt(np.diag(pcov))
    sigma_err = 1.0
    chisq = np.sum((perr / sigma_err) ** 2)
    if verbose:
        print('chisq={0:.4}; error={1}'.format(np.sqrt(chisq), perr))
        # print('function calls', infodict['nfev'])

    kwargs = { 'marker': 'x'}
    if not valid_sample:
        kwargs = { 'marker': 'o', 'facecolors':'none'}
    if (sx is not None) and (sy is not None):
        if len(sx.drop(mask_index, axis=0)) != 0:
            plt.scatter(sx.drop(mask_index, axis=0), sy.drop(mask_index, axis=0),
                        s=48, linewidths=0.6, label='sample valid', color='forestgreen', **kwargs)
        if (len(sx.iloc[mask_index]) != 0) and (list(mask_index) != list(sna_idx)):
            plt.scatter(sx.iloc[mask_index], sy.iloc[mask_index],
                        s=48, linewidths=0.8, label='sample masked', color='r', **kwargs)

    if len(x.drop(rm_index, axis=0)) != 0:
        plt.scatter(x.drop(rm_index, axis=0), y.drop(rm_index, axis=0), marker='+',
                color='royalblue', s=48, linewidths=0.8, label='reference')
    if len(x.iloc[rm_index]) != 0:
        plt.scatter(x.iloc[rm_index], y.iloc[rm_index], marker='.',
                color='r', s=48, linewidths=0.8, label='reference masked')
    plt.xscale('log')
    
    alpha = (100.0 - confidence_interval) / 100.0 
    n = len(y)    # number of data points
    p = len(popt) # number of parameters
    dof = max(0, n - p) # number of degrees of freedom

    # student-t value for the dof and confidence level
    tval = distributions.t.ppf(1.0 - alpha / 2., dof) 
    sigma_popt = np.empty(len(popt), dtype=np.float64)
    param_names = ['a', 'b', 'c', 'd']
    for i, p, var, pname in zip(range(n), popt, np.diag(pcov), param_names):
        sigma = var ** 0.5
        st = sigma * tval
        sigma_popt[i] = st
        if verbose:
            print('{0}: {1:.3} [{2:.3}, {3:.3}]; err={4:.3}[{5:.2f}%]'.format(pname, p, p - st, p + st, st, 100*st/p))

    if confidence=='None' or confidence=='student-t':
        if verbose: print('student-t is used for error estimation using {} degrees of freedom'.format(dof))
        popt_high = popt + sigma_popt
        popt_low = popt - sigma_popt
    else:
        if verbose: print('sqrt of covariance matrix diagonal is used for error estimation')
        popt_high = popt + perr
        popt_low = popt - perr

    num_pts = 100
    x_min = x.min()
    x_max = x.max()
    t = np.arange(x_min, x_max, (x_max - x_min) / num_pts)
    plt.plot(t, fitdata.func(t, *popt), color='slategray', linewidth=0.2)

    sx_n = sx[~sx.isna()] if sx is not None else None
    x_min_ext = x.min() / interval_ratio
    if sx is not None: x_min_ext = min(x_min_ext, sx_n.min())
    if x_min != x_min_ext:
        t = np.arange(x_min_ext, x_min, (x_min - x_min_ext) / (num_pts / 10.0))
        plt.plot(t, fitdata.func(t, *popt), color='red', linestyle=(0, (5, 10)), linewidth=0.2, label='ext')

    x_max_ext = x.max() * interval_ratio
    if sx is not None: x_max_ext = max(x_max_ext, sx_n.max())
    if x_max != x_max_ext:
        t = np.arange(x_max, x_max_ext, (x_max_ext - x_max) / (num_pts / 10.0))
        # no label, only one extension labe
        plt.plot(t, fitdata.func(t, *popt), color='red', linestyle=(0, (5, 10)), linewidth=0.2)

    # show NaN concentration values somewhere -> show OD
    sx_na = sx[sx.isna()] if sx is not None else None
    if sx is not None:
        if verbose: print("NaN indices:", sx_na.index)
        sna_idx = sx[~sx.isna()].index
        if len(sx_na) != 0:
            x_na = np.full(shape=len(sx_na), fill_value=x_min_ext * 0.5)
            y_na = sy.drop(sna_idx, axis=0)
            plt.scatter(x_na, y_na, marker='_', color='red', s=48, linewidths=0.8, label='backfit failed')

    plt.xlabel('concentration [cp/ml]')
    plt.ylabel('Optical density')

    bound_upper = fitdata.func(t, *popt_high)
    bound_lower = fitdata.func(t, *popt_low)
    # plotting the confidence intervals
    plt.fill_between(t, bound_lower, bound_upper,
                    color = 'black', alpha = 0.15)
    
    plt.legend()

    if file_path is not None:
         plt.savefig(file_path)
    if show:
        plt.show()
    else:
        plt.clf()

In [None]:
def fit_sheet(popt, pcov, n, confidence_interval=95.0):
    # `confidence_interval` 95% confidence interval = 100*(1-alpha)
    alpha = (100.0 - confidence_interval) / 100.0 
    p = len(popt) # number of parameters
    dof = max(0, n - p) # number of degrees of freedom

    # student-t value for the dof and confidence level
    tval = distributions.t.ppf(1.0 - alpha / 2.0, dof) 

    sigma_popt = np.empty(len(popt), dtype=np.float64)
    confidence_interval = [None] * 4
    for i, p, var in zip(range(n), popt, np.diag(pcov)):
        sigma = var ** 0.5
        st = sigma * tval
        sigma_popt[i] = st
        confidence_interval[i] = '[{0:.3}, {1:.3}]'.format(p - st, p + st)

    param_names = ['a', 'b', 'c', 'd']
    perr = np.sqrt(np.diag(pcov))

    return pd.DataFrame({'Parameter name': param_names, 'Estimated value':popt,
        'Error': perr, 'Confidence interval': confidence_interval})

Backfit

In [None]:
def backfit(df, param):
    bf = df[['OD_delta', 'plate_layout_dil']].copy()
    bf = bf.reindex(['plate_layout_dil', 'OD_delta'], axis=1)
    bf.rename(columns={'plate_layout_dil': 'Standard Value [cp/ml]', 'OD_delta': 'Optical density'}, inplace=True)
    bf.loc[:, ['Concentration backfit [cp/ml]']] = bf.apply(lambda x: fitdata.inv_func(x['Optical density'], *param), axis=1)
    bf.loc[:, ['SV to OD fit']] = bf.apply(lambda x: fitdata.func(x['Standard Value [cp/ml]'], *param), axis=1)
    bf.index.name = 'Well'
    bf = bf.reindex(['Standard Value [cp/ml]', 'Concentration backfit [cp/ml]', 'Optical density', 'SV to OD fit'], axis=1)
    bf.loc[:, ['Recovery rate [%]']] = bf.apply(lambda x: (x['Concentration backfit [cp/ml]'] / x['Standard Value [cp/ml]']) * 100.0, axis=1)
    
    return bf

In [None]:
def fit_reference_auto_rm(x, y, err_threshold=0.998, verbose=False):
    """Fits the reference and removes a point to fing min error

    Parameters
    ----------
    x : array_like
        x-axis values.
    y : array_like
        y-axis value.
    err_threshold : Eroor threshold to to skip point removal

    Returns
    -------
    array
        parameters of the filt
    int
        index of the removed point, -1 if no point is removed
    float64
        error
    """
    fc = fitdata.fit_reference(fitdata.func, x, y)
    bfn = lambda l: fitdata.inv_func(l, *fc[0])
    x_hat = bfn(y)
    r2_max = r2_score(x, x_hat)
    idx = []
    fit_stats = pd.DataFrame(columns=['idx', 'metric', 'note'])
    if verbose:
        print('R-squared is invalid for nonlinear models!')
        print('metric, index')
        print('{0:.5f}, {1}'.format(r2_max, idx))
    
    if r2_max > err_threshold:
        fit_stats.loc[len(fit_stats)] = [-1, r2_max, '']
        return fc, idx, r2_max
    fit_stats.loc[len(fit_stats)] = [-1, r2_max, 'metric < threshold ({:.4f} < {:.4f})'.format(r2_max, err_threshold)]

    for i in range(len(x)):
        xd = x.drop([i], axis=0)
        yd = y.drop([i], axis=0)
        try:
            fc_i = fitdata.fit_reference(fitdata.func, xd.to_numpy(), yd.to_numpy())
        except Exception as e:
            estr = '{0}, {1} Reason: {2}'.format(np.nan, [i], str(e))
            if verbose: print(e)
            fit_stats.loc[len(fit_stats)] = [i, np.nan, str(e)]
            continue

        bfn = lambda l: fitdata.inv_func(l, *fc_i[0])
        x_hat = bfn(yd)
        r_squared = r2_score(xd, x_hat)
        if np.isinf(fc_i[1]).any():
            fit_stats.loc[len(fit_stats)] = [i, r_squared, 'Invalid covariance matrix.']
            continue

        fit_stats.loc[len(fit_stats)] = [i, r_squared, '']
        if verbose: print('{0:.4f}, {1}'.format(r_squared, i))
        if (r_squared > r2_max) and not np.isinf(fc_i[1]).any():
            r2_max = r_squared
            fc = fc_i
            idx = [i]

    fit_stats.set_index('idx', inplace=True)
    fit_stats.loc[idx, 'note'] = 'Maximum.'
    
    return fc, idx, r2_max, fit_stats

In [None]:
x = ref.reset_index(level=[0,1])['plate_layout_dil']
y = ref.reset_index(level=[0,1])['OD_delta']
fit_result = fit_reference_auto_rm(x, y, verbose=False)
popt = fit_result[0][0]
pcov = fit_result[0][1]

fit_image(x, y, fit_result[0][0], fit_result[0][1], None, confidence='student-t', rm_index=fit_result[1])
display(fit_result[3])
display(fit_result[1])

In [None]:
bf = backfit(ref, popt)
display(bf)

od_min = bf['Optical density'].min()
od_max = bf['Optical density'].max()
print('Optical density range <{0:.4f}, {1:.4f}>'.format(od_min, od_max))

od_fit_min = bf['SV to OD fit'].min()
od_fit_max = bf['SV to OD fit'].max()
print('SV to OD fit range <{0:.4f}, {1:.4f}>'.format(od_fit_min, od_fit_max))

sv_min = bf['Standard Value [cp/ml]'].min()
sv_max = bf['Standard Value [cp/ml]'].max()
print('Standard Value [cp/ml] range <{0}, {1}>'.format(sv_min, sv_max))

cb_min = bf['Concentration backfit [cp/ml]'].min()
cb_max = bf['Concentration backfit [cp/ml]'].max()
print('Concentration backfit [cp/ml] range <{0}, {1}>'.format(cb_min, cb_min))

## Sample evaluation

In [None]:
display(get_sample(df_all, 's', 1))

Fit the data, and apply the inverse function as a check...

In [None]:
samplesk = df_all.loc[(df_all['plate_layout_ident']=='s') | (df_all['plate_layout_ident']=='k') | (df_all['plate_layout_ident']=='r')]
samplesk.loc[:, ['plate_layout_dil']] = samplesk['plate_layout_dil_id'].map(reference_conc['dilution'])
display(samplesk)

def unique_sample_numbers(df):
  sample_nums = df['plate_layout_num'].astype(int).unique()
  sample_nums.sort()
  return sample_nums

sample_nums = unique_sample_numbers(samplesk)
display(sample_nums)

For Coefficient of variation compution we need to use ddof (degrees of freedom) parameter set to `ddof=1`.

In [None]:
from scipy.stats import variation
import numpy

def conc_func(x, dil, *popt):
    return fitdata.inv_func(x, *popt) * dil


# process sample
sample_num_t = 11
sample_type_t = 's'
a_sample = get_sample(samplesk, sample_type_t, sample_num_t)
a_sample.loc[:, ['concentration']] = a_sample.apply(lambda x: conc_func(x['OD_delta'], x['plate_layout_dil'], *popt), axis=1)
display(a_sample)

smp_t = a_sample[~a_sample.concentration.isna()]
display(smp_t)
test_cv = variation(smp_t['concentration'], ddof=1)
test_mean = numpy.mean(smp_t['concentration'])
print("CV %f" % test_cv)
print("mean %f" % test_mean)

### Compute concentration for all `s` and `k` samples

In [None]:
samplesk.loc[:, ['concentration']] = samplesk.apply(lambda x: conc_func(x['OD_delta'], x['plate_layout_dil'], *popt), axis=1)
samplesk.loc[:, ['backfit']] = samplesk.apply(lambda x: fitdata.inv_func(x['OD_delta'], *popt), axis=1)
samplesk

### Sample masking

In [None]:
def mask_reason_fn(val, odmin, odmax, note):
    if val < odmin:
        return '{2} {0:.3e} < {1:.3e}'.format(Decimal(val), Decimal(odmin), note)
    if val > odmax:
        return '{2} {0:.3e} > {1:.3e}'.format(Decimal(val), Decimal(odmin), note)
    if math.isnan(val):
        return 'NaN'
    return None

def mask_reason_short_fn(val, vmin, vmax, dil, note):
    if val < vmin:
        return '<{:.3e}'.format(Decimal(vmin * dil))
    if val > vmax:
        return '>{:.3e}'.format(Decimal(vmax * dil))
    if math.isnan(val):
        return 'Backfit failed.'
    return None

print('Optical density range = <{0:.4f}, {1:.4f}>'.format(od_min, od_max))
print('Backfit range = <{0:.4e}, {1:.4e}>'.format(Decimal(sv_min), sv_max))
od_note = 'Measured OD'
samplesk.loc[:, ['od_mask_reason']] = samplesk.apply(lambda x: mask_reason_fn(x['OD_delta'], od_min, od_max, od_note), axis=1)
sv_note = 'Backfit'
samplesk.loc[:, ['mask_reason']] = samplesk.apply(lambda x: mask_reason_short_fn(x['backfit'], cb_min, cb_max, x['plate_layout_dil'], ''), axis=1)
samplesk

In [None]:
SAMPLE_TYPES = {'s':'sample', 'k':'controll', 'r':'refference', 'b':'blank'}

def process_sample(samples, stype, sample_num):
    sample = get_sample(samples, stype, sample_num)
    smp_t = sample[sample.mask_reason.isna()]
    cv = np.nan
    mean = np.nan
    if len(smp_t['concentration']) > 1:
        cv = variation(smp_t['concentration'], ddof=1)
        mean = numpy.mean(smp_t['concentration'])
    elif len(smp_t['concentration']) == 1:
        mean = numpy.mean(smp_t['concentration'])

    return sample, cv, mean


CV_THRESHOLD = 0.2 # 20%
MIN_VALID_SAMPLE_POINTS = 2

def sample_check(samples, stype, sample_num, cv_thresh=CV_THRESHOLD,
                 min_valid_pts=MIN_VALID_SAMPLE_POINTS):
    s = process_sample(samples, stype, sample_num)
    valid = True
    note = ''
    if s[1] > cv_thresh:
        note = 'CV > {}; '.format(cv_thresh)
        valid = False
    smp = s[0]
    valid_pts = smp['mask_reason'].isna().sum()
    if valid_pts < min_valid_pts:
        note += 'Not enough valid sample points. Required {}, available {};'.format(min_valid_pts, valid_pts)
        valid = False
    elif valid_pts != len(smp['mask_reason']):
        note += 'Reduced number of sample points. Measured {}, valid {};'.format(len(smp['mask_reason']), valid_pts)
        valid &= True

    note_cols = smp[~smp['mask_reason'].isna()]
    if len(note_cols)!= 0:
        if (note_cols['mask_reason'] == note_cols['mask_reason'][0]).all():
            note += note_cols['mask_reason'][0] + ';' + note_cols['od_mask_reason'][0]
        # else:
        #     note += note_cols['mask_reason'].str.cat(sep=', ')

    return {'sample':smp, 'cv':s[1], 'mean':s[2], 'note':note, 'type':stype, 'num':sample_num, 'valid':valid, 'valid_pts': valid_pts}


def print_sample(number, stype, sample, cv, mean):
    display(sample[['OD_delta', 'plate_layout_dil', 'concentration', 'backfit', 'mask_reason']])
    print("{1} '{2}' {0}".format(number, SAMPLE_TYPES[stype], stype))
    print("CV = {:2.3} [%]".format(100 * cv))
    print("mean = {:.4} [cp/ml]".format(mean))


def print_sample_dc(sample_dict):
    display(sample_dict['sample'][['OD_delta', 'plate_layout_dil', 'concentration', 'backfit', 'mask_reason']])
    print("{1} '{2}' {0}".format(sample_dict['num'], SAMPLE_TYPES[sample_dict['type']], sample_dict['type']))
    print("CV = {:2.3} [%]".format(100 * sample_dict['cv']))
    print("mean = {:.4} [cp/ml]".format(sample_dict['mean']))
    print("valid = {}".format(sample_dict['valid']))
    print("note: {}".format(sample_dict['note']))

sc = sample_check(samplesk, 'k', 1)
sample_results = pd.DataFrame(columns=['id', 'cv', 'cp_mean', 'Note', 'Valid'])
for i in [14]:#sample_nums: [5, 6, 9]
    stype = 's'
    # s = process_sample(samplesk, 's', i)
    # print_sample(i, 's', *s)
    sc = sample_check(samplesk, 's', i)
    print_sample_dc(sc)
    sample_results.loc[len(sample_results)] = ['sample {:02d}'.format(i),
                                               sc['cv'], sc['mean'], sc['note'], sc['valid']]
    if i == 3: break;

display(sample_results)

In [None]:
from enum import Enum


class SampleInfo(str, Enum):
    NAN_LOW = 'NaN below reference'
    NAN_HIGH = 'NaN above reference'
    LOW = 'value below reference'
    HIGH = 'value above reference'
    CV = 'CV above threshold'
    VALID_PTS = 'few valid points'


def sampleinfo_to_str(info, multiplier=1.0):
    if info is None:
        return None

    if not info:
        return None;
    
    if info['enum'] == SampleInfo.CV:
        return 'CV>{:.1f}%({:.1f}%)'.format(CV_THRESHOLD * 100, float(info['value']) * 100.0)

    if info['enum'] == SampleInfo.VALID_PTS:
        return '{} valid point'.format(info['value'])

    return '{}{:.4e}'.format(info['sign'], float(info['value']) * multiplier)


def sample_info(samples, stype, sample_num, verbose=False):
    s = get_sample(samples, stype, sample_num)
    sc = sample_check(samples, stype, sample_num)
    if verbose:
        display(s)
        # display(k)
        print('OD=[{}, {}]'.format(od_min, od_max))
        print('OD_fit=[{:.3}, {:.3}]'.format(Decimal(od_fit_min), Decimal(od_fit_max)))
        print('SV=[{:.3e}, {:.3e}]'.format(Decimal(sv_min), Decimal(sv_max)))
        print('CB=[{}, {}]'.format(cb_min, cb_max))
    above_ref_od_max = s['OD_delta'] > od_fit_max
    below_ref_od_min = s['OD_delta'] < od_fit_min
    msgdc = {}
    if s['backfit'].isna().all():
        if above_ref_od_max.all():
            msgdc = {'sign': '>', 'value': Decimal(sv_max), 'enum': SampleInfo.NAN_HIGH}
        if below_ref_od_min.all():
            msgdc = {'sign': '<', 'value': Decimal(sv_min), 'enum': SampleInfo.NAN_LOW}
    elif sc['cv'] > CV_THRESHOLD:
        msgdc = {'sign': '>{:.2f}'.format(CV_THRESHOLD), 'value': sc['cv'], 'enum': SampleInfo.CV}
    elif not s['mask_reason'].isna().all():
        t = s[['OD_delta', 'plate_layout_dil', 'concentration', 'backfit']]
        t_not_na = t[~t['backfit'].isna()]
        
        if t_not_na['OD_delta'].max() < od_fit_min:
            t_below_ref = t_not_na[below_ref_od_min]
            # msgdc = {'sign': '<', 'value': t_below_ref['concentration'].max(), 'enum': SampleInfo.LOW}
            msgdc = {'sign': '<', 'value': Decimal(sv_min * sc['sample']['plate_layout_dil'].min()), 'enum': SampleInfo.LOW}
        elif t_not_na['OD_delta'].min() > od_fit_max:
            t_above_ref = t_not_na[above_ref_od_max]
            # display(sc['sample'])
            print('*** {} *  {} = {}'.format(sv_max, sc['sample']['plate_layout_dil'].max(), sv_max * sc['sample']['plate_layout_dil'].max()))
            msgdc = {'sign': '>', 'value': Decimal(sv_max * sc['sample']['plate_layout_dil'].max()), 'enum': SampleInfo.HIGH}
    
    if sc['valid_pts'] < MIN_VALID_SAMPLE_POINTS and sc['valid_pts'] != 0:
        msgdc = {'sign': '', 'value': sc['valid_pts'], 'enum': SampleInfo.VALID_PTS}

    del sc['sample']
    del sc['note']
    sc['info'] = msgdc
    return sc

In [None]:
si = sample_info(samplesk, 's', 6)
si

In [None]:
sample_results = pd.DataFrame(columns=['id', 'CV [%]', 'Reader Data [cp/ml]', 'Note', 'Valid', 'info'])
knum = 1
s = sample_check(samplesk, 'k', knum)
si = sample_info(samplesk, 'k', knum)
display(si)
sample_results.loc[len(sample_results)] = ['control {:02d}'.format(knum), s['cv'], s['mean'], s['note'], s['valid'], si]

rnum = 1
s = sample_check(samplesk, 'r', rnum)
si = sample_info(samplesk, 'r', knum)
sample_results.loc[len(sample_results)] = ['reference {:02d}'.format(knum), s['cv'], s['mean'], s['note'], s['valid'], si]

for i in sample_nums:
    stype = 's'
    s = sample_check(samplesk, 's', i)
    si = sample_info(samplesk, 's', i)
    sample_results.loc[len(sample_results)] = ['sample {:02d}'.format(i), s['cv'], s['mean'], s['note'], s['valid'], si]

sample_results.set_index(sample_results['id'], inplace=True)
sample_results = sample_results.drop('id', axis=1)
sl = sample_results
display(sl)

In [None]:
si = sample_info(samplesk, 's', 6, True)
display(si)
sampleinfo_to_str(si['info'])

### Plot sample with referene curve

In [None]:
def mask_index(df):
    b = df.reset_index(level=[0,1])
    b = b[b['mask_reason'].notna()]

    return b.index


def na_index(df):
    b = df.reset_index(level=[0,1])
    b = b[b['backfit'].isna()]
    
    return b.index


def sample_img(samples, sample_type, sample_num, img_file=None, show=True, verbose=False):
    sd = sample_check(samplesk, sample_type, sample_num)
    if verbose:
        print(sample_type, sample_num)
        display(sd['sample'])

    mask_idx = mask_index(sd['sample'])
    x = ref.reset_index(level=[0,1])['plate_layout_dil']
    y = ref.reset_index(level=[0,1])['OD_delta']
    fit_result = fit_reference_auto_rm(x, y, verbose=verbose)
    # compute original concenmtration 
    sd['sample'].loc[:, ['conc_plot']] = sd['sample'].apply(lambda x: x['concentration'] / x['plate_layout_dil'], axis=1)
    sx = sd['sample'].reset_index(level=[0,1])['conc_plot']
    sy = sd['sample'].reset_index(level=[0,1])['OD_delta']
    fit_image(x, y, fit_result[0][0], fit_result[0][1], img_file, confidence='student-t',
              rm_index=fit_result[1], mask_index=mask_idx,
              sx=sx, sy=sy, sna_idx=na_index(sd['sample']), show=show, valid_sample=sd['valid'], interval_ratio=1.0)
    # display(na_index(sd['sample']))
    
sample_img(samplesk, 's', 6)

## Worklist

In [None]:
def worklist_sample(worklist_file, sample_id):
    wl = pd.read_excel(worklist_file)
    wl.set_index([['control 01', 'reference 01', 'blank', 'sample 01', 'sample 02', 'sample 03',
        'sample 04', 'sample 05', 'sample 06', 'sample 07', 'sample 08', 'sample 09', 'sample 10',
        'sample 11', 'sample 12', 'sample 13', 'sample 14', 'sample 15', 'sample 16', 'sample 17',
        'sample 18', 'sample 19', 'sample 20', 'sample 21']], inplace=True)
    wl.drop('blank', axis=0, inplace=True)
    wl.index.name = 'Sample type'
    cols_id =['SampleID', 'Dilution', 'Viscosity']
    cols = [x + '_' + str(sample_id) for x in cols_id]
    cols_dict = {x : y for x,y in zip(cols_id, cols)}

    invalid_sample = wl['SampleID_{}'.format(sample_id)].isnull().values.any()
    if invalid_sample:
        return None, None
    
    return wl[cols], cols_dict

wl, wl_cols_dict = worklist_sample(WORKLIST_FILE_PATH, PLATE_ID)

In [None]:
def final_sample_info(all_info, pre_dilution, verbose=False):
    # print('INFO', all_info)
    info = all_info['info']
    if not info:
        return ''
    
    msg = ''
    if info['enum'] == SampleInfo.NAN_HIGH:
        msg = '>{:.4e}'.format(info['value'] * pre_dilution)
        # print(SampleInfo.NAN_HIGH)
    elif info['enum'] == SampleInfo.NAN_LOW:
        msg = '<{:.4e}'.format(info['value'] * pre_dilution)
        # print(SampleInfo.NAN_LOW)
    elif info['enum'] == SampleInfo.HIGH:
        # msg = '>{:.4e}'.format(info['value'] * pre_dilution)
        msg = '>{:.4e}'.format(info['value'] * pre_dilution)
        # print(SampleInfo.HIGH, pre_dilution, info['value'])
    elif info['enum'] == SampleInfo.LOW:
        msg = '<{:.4e}'.format(info['value'] * pre_dilution)
        # print(SampleInfo.LOW)
    elif info['enum'] == SampleInfo.VALID_PTS:
        msg = '{} valid point'.format(all_info['valid_pts'])
        # print(SampleInfo.VALID_PTS)
    elif info['enum'] == SampleInfo.CV:
        msg = 'CV>{:.2f}%({:.2f}%)'.format(CV_THRESHOLD * 100.0, info['value'] * 100.0)
    else:
        msg = ''

    # print(msg)
    return msg

In [None]:
# TODO: nasty, using globals!!!
def make_final():
    final = pd.concat([wl, sl], axis=1)
    cd = wl_cols_dict
    final.loc[:, ['Result [cp/ml]']] = final.apply(lambda x: x['Reader Data [cp/ml]'] * x[cd['Dilution']], axis=1)
    final.loc[:, ['CV [%]']] = final.apply(lambda x: x['CV [%]'] * 100, axis=1)
    # reorder columns
    final = final.reindex([cd['SampleID'], cd['Dilution'], cd['Viscosity'], 'Reader Data [cp/ml]', 'Result [cp/ml]', 'CV [%]', 'Valid', 'info'], axis=1)
    final.rename(columns={cd['SampleID']: 'Sample Name', cd['Dilution']: 'Pre-dilution'}, inplace=True)
    final.drop('Viscosity_1', axis=1, inplace=True)
    final.index.name = 'Sample type'
    return final

final = make_final()
final

In [None]:
final = make_final()
final.loc[:, ['info_ex']] = final.apply(lambda x: final_sample_info(x['info'], x['Pre-dilution']), axis=1)
final

## Plate Layout

In [None]:
df = plate_layout_num.replace({'b':-99}).astype(float)
df

In [None]:
from matplotlib import pyplot as plt

vals = np.around(df.values, 2)
norm = plt.Normalize(vals.min()-1, vals.max()+1)
colours = plt.cm.hot(norm(vals))

fig = plt.figure(figsize=(8,6))
ax = fig.add_subplot(111, frameon=False, xticks=[], yticks=[])

the_table=plt.table(cellText=vals, rowLabels=df.index, colLabels=df.columns,
                    loc='center', cellColours=colours)
plt.show()

## Report  
We build a report here...

### Fit Reference Curve

In [None]:
def fit_section_md(df_ref, popt, pcov, out_dir):
    x = df_ref.reset_index(level=[0,1])['plate_layout_dil']
    y = df_ref.reset_index(level=[0,1])['OD_delta']
    fit_result = fit_reference_auto_rm(x, y)
    result_img = path.join(out_dir, 'fit.png')
    fit_image(x, y, fit_result[0][0], fit_result[0][1], result_img, confidence='student-t', rm_index=fit_result[1])
 
    n = len(x) - len(fit_result[1])
    df_fit = fit_sheet(popt, pcov, n)
    display(df_fit)

    md = '## Reference Curve Fit\n\n'
    md += '$\LARGE y = {d + {a - d \over {1 + ({ x \over c })^b}} }$  \n\n'
    md += '!["alt text"](./img/fit.png)'

    md += '\n\n'
    md += 'Verbose fitting progress, metric is R-squared:\n\n'
    md += fit_result[3].to_markdown() + '\n\n'

    md += 'Fit parameters\n\n'
    md += df_fit.to_markdown(index=False) + '\n\n'
    md += 'Backfit...'
    fit_result = fit_reference_auto_rm(x, y)
    df_backfit = backfit(df_ref, fit_result[0][0])
    md += '\n\n' + df_backfit.to_markdown() + '\n\n'

    # cv = variation(df_backfit['concentration'], ddof=1)

    return md

# fit_section_md(ref, popt, pcov, REPORT_DIR)

### Sample

In [None]:
def sample_to_md(dc):
    s_view = dc['sample'][['OD_delta', 'plate_layout_dil', 'concentration', 'mask_reason']]
    md = "### **Sample: {0} '{1}' {2}**\n\n".format(SAMPLE_TYPES[dc['type']], dc['type'], dc['num'])
    md += s_view.to_markdown()
    md += '\n\n'
    md += "CV = {:2.3} [%]  \n".format(100 * dc['cv'])
    md += "mean = {:.4} [cp/ml]  \n".format(dc['mean'])
    md += "valid = {}  \n".format(dc['valid'])
    if dc['note']:
         md += "note: {}  ".format(dc['note'])

    return md

def sample_section_md(samples, img_dir):
    md = '## Sample evaluation\n\n' 
    k = sample_check(samples, 'k', 1)
    md += sample_to_md(k)
    sfile = 'control_{0:02d}.png'.format(1)
    img_file = path.join(img_dir, sfile)
    sample_img(samplesk, 'k', 1, img_file, show=False)
    md += '!["alt text"]({})\n\n'.format(sfile)
    sample_n = samples['plate_layout_num'].astype(int).unique()
    sample_n.sort()
    for i in sample_n:
        stype = 's'
        s = sample_check(samples, stype, i)
        md += sample_to_md(s)
        # sample info
        si = sample_info(samples, stype, i, verbose=False)
        si_str = sampleinfo_to_str(si['info'])
        if si_str:
            md += '\n'
            md += 'info: ' + si_str + '  '
        md += '\n'
        img_file = path.join(img_dir, 'sample_{0:02d}.png'.format(i))
        sample_img(samplesk, stype, i, img_file=img_file, show=False, verbose=False)
        md += '![{0}](img/{0})\n\n'.format(sfile)
    return md

def save_md(file_path, md_txt):
    try:
        with open(file_path, 'w') as fl:
            fl.write(md_txt)
    except Exception as e:
        print('Error: ' + str(e))

### Results

In [None]:
final_result = make_final()

def format_results(df):
    df.loc[:, ['Comment']] = df.apply(lambda x: final_sample_info(x['info'], x['Pre-dilution']), axis=1)
    df.loc[:, ['CV [%]']] = df.apply(lambda x:'{:.2f}'.format(x['CV [%]']), axis=1)
    df.loc[:, ['Result [cp/ml]']] = df.apply(lambda x: x['Comment'] if math.isnan(x['Result [cp/ml]']) else '{:.4e}'.format(x['Result [cp/ml]']), axis=1)

    df.drop(['info', 'Valid', 'Reader Data [cp/ml]'], axis=1, inplace=True)
    
    return df

# format_results(final_result)
# final_result

In [None]:
def comment_fn(info, multiplier):
    return sampleinfo_to_str(info['info'], multiplier)

def result_section(df):
    # df.loc[:, ['Comment']] = df.apply(lambda x: comment_fn(x['info'], x['Pre-dilution']), axis=1)

    # df.drop(['info', 'Valid'], axis=1, inplace=True)
    md = '## Analysis Results\n\n'

    md += format_results(df).to_markdown()
    md += '\n\n'
    
    return md


### Header

In [None]:
def header_section(date, id, msg):
    md =  '## Header\n\n'

    md += 'Date: {}\n\n'.format(date)
    md += 'Identification: {}\n\n'.format(id)
    md += 'Comment: {}\n\n'.format(msg)

    return md;

### Report Assembly

In [None]:
report = '''
# Automatically Generated Markdown report

This a PoC for automatic report generation...  

'''

report += header_section('05 May 2023', 'GN004240-033', 'Plate 01')
report += result_section(final_result.drop('reference 01', axis=0))
img_dir = path.join(REPORT_DIR, 'img')
os.makedirs(img_dir, exist_ok=True)
report += fit_section_md(ref, popt, pcov, img_dir) # TODO: !!! global fit_result[3]

report += sample_section_md(samplesk, img_dir)

save_md(REPORT_FILE_PATH, report)

### Export to PDF

In [None]:
from md2pdf.core import md2pdf
PDF_FILE_PATH = "./result/my.pdf"

In [None]:
md2pdf(PDF_FILE_PATH,
       md_content=report,
       md_file_path=None,
       css_file_path=None,
       base_url=None)