# PCR report Work In Progress :)

In [None]:
# Folder holding the control-limit tables and config.json that later cells
# load via os.path.join(DATA_DIR, ...).
DATA_DIR = './data'

In [None]:
from pcrep import parse_input

# Example export file name. parse_inputname() turns it into a mapping that
# provides at least the 'date' and 'gn' entries read in the next cell.
# Stored as `input_name` (not `input`) so the builtin input() stays usable
# for the rest of the notebook session.
input_name = '230901_GN004308-086_20230901_112734_186.csv'
dc = parse_input.parse_inputname(input_name)
dc

In [None]:
# Expected report name for the example above: 230901_GN004308-086.md
# NOTE(review): the two fields are concatenated without a separator, so the
# underscore only appears if parse_inputname() already includes it in one of
# the fields — confirm.
out_name = dc['date'] + dc['gn'] + '.md'
out_name

In [None]:
import pandas as pd
import os

# Semicolon-separated export of the example run.
INPUT_PCR_DATA = "./example/230901_GN004308-086/230901_GN004308-086_20230901_112734_186.csv"
df = pd.read_csv(INPUT_PCR_DATA, delimiter=';')
# Turn decimal commas into points so the concentration column can be cast to
# a float dtype below.
# NOTE(review): this rewrites every comma in every string cell, not only
# decimal separators — confirm no text column legitimately contains commas.
df = df.replace(',', '.', regex=True)
CONC_NAME = 'Conc(copies/µL)'
# 'Float64' is the pandas nullable float dtype.
df[CONC_NAME] = df[CONC_NAME].astype('Float64')

# Column names looked up in the per-sample table (df_conc) further down.
DIL_FINAL_FACTOR_NAME = 'final dilution factor'
DIL_SAMPLE_DESCRIPTION_NAME = 'Sample name'
DIL_TYPE_NAME = 'type'


def get_dir(path_name):
    """Return the directory part of a path that points to an existing file.

    Parameters:
    path_name : str
        path that must refer to an existing regular file

    Returns:
    str
        the head of ``os.path.split(path_name)``

    Raises:
    FileNotFoundError
        if `path_name` does not refer to an existing regular file
    """
    if not os.path.isfile(path_name):
        # FileNotFoundError is a subclass of Exception, so code that caught the
        # previous generic Exception keeps working; the path is included so the
        # failing input is visible in the traceback.
        raise FileNotFoundError(f'Not file! {path_name}')
    return os.path.split(path_name)[0]


# Folder that contains the input export (fails if the export file is missing).
WORK_DIR = get_dir(INPUT_PCR_DATA)
display(WORK_DIR)

In [None]:
# Per-sample table for the same run. Once indexed by 'sample_id' it serves as
# the lookup source for dilution factor, sample name and sample type.
INPUT_CONCENTRATION_DATA = "./example/230901_GN004308-086/230901_GN004308-086_conc.csv"
df_conc = pd.read_csv(INPUT_CONCENTRATION_DATA)

df_conc.set_index(['sample_id'], inplace=True)
df_conc

In [None]:
# Expand each 'Well' value into two columns via well2idx() and use them as a
# (wr, wc) MultiIndex.
# NOTE(review): well2idx is neither defined nor imported in the cells shown
# here; it must already exist in the session — confirm where it comes from.
df[["wr", "wc"]] = df.apply(lambda x: well2idx(
    x['Well']), axis='columns', result_type='expand')
df.set_index(['wr', 'wc'], inplace=True)
# df

In [None]:
# Names of the three columns added to df in this cell, plus the existing
# column (SAMPLE_NUM_NAME) whose values are looked up in df_conc's index.
FDL_NAME = "final dilution factor"
SAMPLE_NAME = "Sample"
SAMPLE_TYPE_NAME = "sample type"
SAMPLE_NUM_NAME = "Sample description 1"

# Final dilution factor of the sample each well belongs to.
df.loc[:, [FDL_NAME]] = df[SAMPLE_NUM_NAME].map(
    df_conc[DIL_FINAL_FACTOR_NAME], na_action='ignore')

# Sample name from the per-sample table.
df.loc[:, [SAMPLE_NAME]] = df[SAMPLE_NUM_NAME].map(
    df_conc[DIL_SAMPLE_DESCRIPTION_NAME], na_action='ignore')

# Sample type from the per-sample table.
df.loc[:, [SAMPLE_TYPE_NAME]] = df[SAMPLE_NUM_NAME].map(
    df_conc[DIL_TYPE_NAME], na_action='ignore')

# Rows left without a sample type (e.g. a sample number that is not present
# in df_conc) are dropped.
df = df.dropna(subset=[SAMPLE_TYPE_NAME])
df

## Compute results

In [None]:
def result_fn(conc, dil, a=20.0, b=2.0):
    """Scale a measured concentration to the reported per-well result.

    Parameters:
    conc : float
        measured concentration
    dil : float
        final dilution factor of the sample
    a : float
        ddPCR Volume 20 µL
    b : float
        Sample volume in the ddPCR reaction 2 µL

    Returns:
    float
        ``a * conc * (1000.0 / b) * dil``
    """
    reaction_total = a * conc
    volume_scale = 1000.0 / b
    return (reaction_total * volume_scale) * dil


# Per-well result column: result_fn applied row by row with its default
# volumes (a and b are not passed).
WELL_RESULT_NAME = 'vg/ml'
df.loc[:, [WELL_RESULT_NAME]] = df.apply(lambda x: result_fn(
    x['Conc(copies/µL)'], x['final dilution factor']), axis=1)

### Limits

In [None]:
# Plasmid-control limits table from DATA_DIR, indexed by 'Target'.
# NOTE(review): `palsmid` is a typo for `plasmid`; the name is kept because a
# later cell refers to it.
PLASMID_CONTROL_LIMITS_FILE = 'plasmid_control_limits.csv'
palsmid_control_limits = pd.read_csv(
    os.path.join(DATA_DIR, PLASMID_CONTROL_LIMITS_FILE))
palsmid_control_limits.set_index(['Target'], inplace=True)
palsmid_control_limits

In [None]:
# Reference-control limits table from DATA_DIR, indexed by 'Target'.
REFERENCE_CONTROL_LIMITS_FILE = 'reference_control_limits.csv'
reference_control_limits = pd.read_csv(
    os.path.join(DATA_DIR, REFERENCE_CONTROL_LIMITS_FILE))
reference_control_limits.set_index(['Target'], inplace=True)
reference_control_limits

How to access limits

In [None]:
# Example lookup: select the row for one target, then read a single limit
# column from it.
rcl = reference_control_limits
lmts = rcl.loc['IDT']
lmts['upper 3s action']

In [None]:
from pcrep.check import METHOD_LIMIT_MULTIPLIER_NEGATIVE_CONTROL
METHOD_TARGET_ID = 'IDT'
METHOD_LIMIT_MULTIPLIER = 1.0e3  # conversion μl -> ml
# NOTE(review): this path is hard-coded, whereas the other limit tables are
# located through DATA_DIR.
method_limits = pd.read_csv('./data/method_limits.csv')

# Disabled unit conversion of the limit columns; METHOD_LIMIT_MULTIPLIER is
# only referenced from these commented-out lines in this cell.
# method_limits['Lower [vg/μl]'] = method_limits['Lower [vg/μl]'].multiply(
#     METHOD_LIMIT_MULTIPLIER)
# method_limits.rename(columns={"Lower [vg/μl]": "Lower [vg/ml]"}, inplace=True)

# method_limits['Upper [vg/μl]'] = method_limits['Upper [vg/μl]'].multiply(
#     METHOD_LIMIT_MULTIPLIER)
# method_limits.rename(columns={"Upper [vg/μl]": "Upper [vg/ml]"}, inplace=True)

method_limits.set_index(['target_id'], inplace=True)
display(method_limits)

# All three limit tables in one dict, keyed by the kind of check that uses them.
dc_limits = {'method': method_limits, 'reference_control': reference_control_limits,
             'plasmid_control': palsmid_control_limits}

In [None]:
# Two equivalent ways to read one method limit: chained on one line, or via
# an intermediate row object.
method_limits.loc['IDT']['Lower [vg/μl]']
mlmts = method_limits.loc['IDT']
mlmts['Lower [vg/μl]']

In [None]:
# Lower and upper method limit for the configured target.
print(method_limits.loc[METHOD_TARGET_ID]['Lower [vg/μl]'],
      method_limits.loc[METHOD_TARGET_ID]['Upper [vg/μl]'])

# Negative-control limit: the target's lower method limit scaled by the
# multiplier imported from pcrep.check.
negative_control_limit = method_limits.loc[:,
                                           'Lower [vg/μl]'][METHOD_TARGET_ID] * METHOD_LIMIT_MULTIPLIER_NEGATIVE_CONTROL
negative_control_limit

### Method check

In [None]:
from pcrep.check import method_check_routing

# Column whose value is passed to the method check.
METHOD_CHECK_COLNAME = 'Conc(copies/µL)'


def method_check_fn(s):
    """Run the method check for one row of df.

    Hands the method limits table plus the row's sample type, measured
    concentration and target to method_check_routing and returns its result.
    """
    limits = dc_limits['method']
    sample_type = s[SAMPLE_TYPE_NAME]
    measured = s[METHOD_CHECK_COLNAME]
    return method_check_routing(limits, sample_type, measured, s['Target'])


VALUE_CHECK_NAME = 'method_check'
# The function already takes the row as its only argument, so it is passed to
# apply directly.
df.loc[:, [VALUE_CHECK_NAME]] = df.apply(method_check_fn, axis=1)
df.head()

### Droplets check

In [None]:
from pcrep.check import droplets_check

# Threshold and source column handed to droplets_check for every row.
DROPLET_THRESHOLD = 10000
DROPLET_CHECK_COLNAME = 'Accepted Droplets'


def droplets_check_fn(s):
    """Run droplets_check on one row's accepted-droplet count."""
    accepted = s[DROPLET_CHECK_COLNAME]
    return droplets_check(accepted, DROPLET_THRESHOLD)


DROPLET_CHECK_NAME = 'droplet_check'
df.loc[:, [DROPLET_CHECK_NAME]] = df.apply(droplets_check_fn, axis=1)
df.head()

#### Access data examples

In [None]:
# Distinct targets present in the run.
df['Target'].unique()

In [None]:
# Rows of a single target, selected directly.
df_idt = df.query("Target == 'IDT'")
# df_idt

# One sub-frame per target, keyed by target name.
# NOTE(review): this rebinds `dc`, which earlier held the parsed file-name
# fields; re-running the earlier cells afterwards needs `dc` re-created first.
targets = df['Target'].unique()
dc = dict.fromkeys(targets)
for t in targets:
    dc[t] = df.query("Target == @t")

# dc['ITR']

In [None]:
# Preview of the ITR sub-frame.
dc['ITR'].head()

In [None]:
# Shorthand for the ITR sub-frame.
df_itr = dc['ITR']

In [None]:
# Same preview through the shorthand name.
df_itr.head()

In [None]:
def get_sample(dfa, sample_num):
    """Return the rows of `dfa` whose 'Sample description 1' equals `sample_num`.

    The input frame is not modified; an empty frame is returned when nothing
    matches.
    """
    # TODO: check for valid `type` `and sample_num`
    matches = dfa['Sample description 1'] == sample_num
    return dfa.loc[matches]

In [None]:
# Example: all ITR wells of sample number 6; `s` is reused by later example
# cells.
s = get_sample(dc['ITR'], 6)
s

### Multiindex ????

In [None]:
# Work on a copy re-indexed by (sample_id, Target, Well) so replicate wells of
# one sample/target pair can be grouped by index level.
dfi = df.copy()
# dfi
dfi.reset_index(inplace=True)
# dfi = dfi.set_index(['Well', 'Sample description 1'], append=True)
dfi.rename(columns={'Sample description 1': 'sample_id'}, inplace=True)
dfi.set_index(['sample_id', 'Target', 'Well'], inplace=True)
dfi.sort_index(inplace=True)
# NOTE(review): the result of this call is discarded (no inplace=True and no
# assignment), so the column order of dfi is unchanged — confirm whether
# sorting the columns was intended.
dfi.sort_index(axis=1)
# Remove the helper well indices and the export columns not used below.
dfi.drop(['wr', 'wc',
          'Sample description 2', 'Sample description 3', 'Sample description 4',
          'TargetType', 'Supermix', 'Status', 'Experiment', 'SampleType'],
         axis=1, inplace=True)
dfi

In [None]:
# tmp = dfi.groupby(level=["sample_id", 'Target'])['vg/ml']
# tmean = tmp.mean()
# tmean.head()

# Example aggregation: mean of 'Positives' per (sample_id, Target) group.
display(dfi.groupby(level=["sample_id", 'Target']).apply(
    lambda x: x['Positives'].mean()).head())

#### Control check

Compute mean of `[vg/ml]` and apply control checks

In [None]:
# Mean 'vg/ml' per (sample_id, Target) group, stored in a new column.
# NOTE(review): the right-hand side is indexed by two levels while dfi has
# three; this relies on pandas aligning on the shared levels — confirm that
# every well row receives its group's mean.
dfi.loc[:, ['mean [vg/ml]']
        ] = dfi.groupby(level=["sample_id", 'Target']).apply(lambda x: x['vg/ml'].mean())

In [None]:
from pcrep.check import control_check_routing

# Column whose value is passed to the control check.
CONTROL_CHECK_COLNAME_ORIG = 'mean [vg/ml]'


def control_check_fn(s):
    """Run the control check for one row of dfi.

    The target is read from the row label: position 1 of dfi's
    (sample_id, Target, Well) index.
    """
    sample_type = s[SAMPLE_TYPE_NAME]
    mean_value = s[CONTROL_CHECK_COLNAME_ORIG]
    target = s.name[1]
    return control_check_routing(dc_limits, sample_type, mean_value, target)


CONTROL_CHECK_NAME = 'control_check'
dfi.loc[:, [CONTROL_CHECK_NAME]] = dfi.apply(control_check_fn, axis=1)
dfi.head()

#### CV check

Compute Coefficient of variation (CV) and apply check

In [None]:
# Standard deviation of 'vg/ml' per (sample_id, Target) group; ddof=0 uses
# N (not N-1) as the divisor.
dfi.loc[:, ['STDE']] = dfi.groupby(level=["sample_id", 'Target']).apply(
    lambda x: x['vg/ml'].std(ddof=0))
dfi.head()

In [None]:
def cv_fn(mean_vam: float, std_val: float, stype: str):
    """Return the coefficient of variation in percent, or NaN when undefined.

    Parameters:
    mean_vam : float
        mean of the replicate values
    std_val : float
        standard deviation of the same values
    stype : str
        sample type; 'nc' always yields NaN

    Returns:
    float
        ``100.0 * std_val / mean_vam``, or NaN if `stype` is 'nc', the mean
        is zero, or the mean is not a real number (e.g. None).
    """
    # Local import: the cell this function lives in does not import `numbers`.
    import numbers

    # cv is not applied to negative samples
    if stype == 'nc':
        return float("nan")

    # Accept any real number (int, numpy scalar, ...), not only `float`: an
    # integer-valued mean used to fall through and silently produce NaN.
    # bool is excluded because it is an int subclass but never a valid mean.
    is_number = (isinstance(mean_vam, numbers.Real)
                 and not isinstance(mean_vam, bool))
    if is_number and mean_vam != 0.0:
        return 100.0 * std_val / mean_vam
    return float("nan")


# CV per row, computed from the group mean and standard deviation columns.
CV_COLNAME = 'CV [%]'
dfi.loc[:, [CV_COLNAME]] = dfi.apply(lambda x: cv_fn(
    x['mean [vg/ml]'], x['STDE'], x['sample type']), axis=1)
dfi

In [None]:
from pcrep.check import cv_check


def cv_check_fn(cv_val: float):
    """Return the result of cv_check for a single CV value."""
    verdict = cv_check(cv_val)
    return verdict


# Name of the column that receives the per-row CV verdict.
CV_CHECK_NAME = 'cv_check'
dfi.loc[:, [CV_CHECK_NAME]] = dfi.apply(
    lambda row: cv_check_fn(row[CV_COLNAME]), axis=1)
dfi

In [None]:
# Save the per-well table, including all check columns, next to the example
# input.
dfi.to_excel('./example/230901_GN004308-086/GN004308-086-data_analysis.xlsx')

#### Get sample...

In [None]:
def get_sample_ex(samnple_num, target_type):
    """Return all wells of one sample/target pair from the global `dfi`.

    Selects on the first two levels of dfi's (sample_id, Target, Well) index
    and keeps every well and every column.
    """
    row_key = pd.IndexSlice[samnple_num, target_type, :]
    return dfi.loc[row_key, :]


# Example: the IDT wells of sample 2.
tmps = get_sample_ex(2, 'IDT')
tmps

## Checks control, samples, ...

### Process sample

In [None]:
def process_sample(s):
    """Summarise the wells of one sample into a single record.

    Parameters:
    s : pandas.DataFrame
        non-empty frame with the rows of one sample; must contain the
        'Sample description 1', SAMPLE_TYPE_NAME, SAMPLE_NAME and
        WELL_RESULT_NAME columns

    Returns:
    dict
        'id', 'type' and 'name' taken from the first row, plus 'mean', 'std'
        (ddof=0) and 'cv' (in percent, NaN when the mean is zero or not a
        float) of the WELL_RESULT_NAME column
    """
    values = s.loc[:, WELL_RESULT_NAME]
    m = values.mean()
    t = values.std(ddof=0)
    if isinstance(m, float) and m != 0.0:
        cv = 100.0 * t / m
    else:
        cv = float("nan")

    # Read the first row by position. A plain `[0]` is resolved against the
    # index labels when they are integers (df is indexed by (wr, wc)), so it
    # can fail or select a different set of rows instead of "the first one".
    return {'id': s['Sample description 1'].iloc[0],
            'type': s[SAMPLE_TYPE_NAME].iloc[0],
            'name': s[SAMPLE_NAME].iloc[0],
            'mean': m, 'std': t, 'cv': cv}


# Example: summary of the sample selected into `s` in an earlier cell.
process_sample(s)

In [None]:
import json

# Assessment parameters for the checks.
# NOTE(review): this path is hard-coded rather than built from DATA_DIR.
params_file = './data/params.json'
with open(params_file) as json_file:
    check_params = json.load(json_file)
# print(check_params)


def check_sample_cv(sample, ref):
    """Compare a sample's CV against the configured threshold.

    Parameters:
    sample : dict
        must provide 'cv' in percent
    ref : dict
        must provide ref['assesmentCriteria']['cvThreshold'] as a fraction

    Returns:
    dict
        {'complies': bool, 'comment': str or None}; the comment is set only
        when the CV exceeds the threshold
    """
    limit_pct = ref['assesmentCriteria']['cvThreshold'] * 100.0  # in %
    cv_value = sample['cv']
    if cv_value > limit_pct:
        return {"complies": False,
                "comment": "CV {:.2f} > {:.2f}".format(cv_value, limit_pct)}
    return {"complies": True, "comment": None}


# Example: CV verdict for the sample selected into `s` in an earlier cell.
check_sample_cv(process_sample(s), check_params)

In [None]:
def gen_sample_results(df):
    """Build one summary row per (target, sample number) pair found in `df`.

    Targets and sample numbers are visited in order of first appearance. Each
    pair is reduced with process_sample and appended to the result frame.

    Returns:
    pandas.DataFrame
        columns 'id', SAMPLE_TYPE_NAME, 'target', 'sample name',
        'result [vg/ml]', 'STDE', 'CV [%]'
    """
    result_columns = ['id', SAMPLE_TYPE_NAME, 'target', 'sample name',
                      'result [vg/ml]', 'STDE', 'CV [%]']
    df_res = pd.DataFrame(columns=result_columns)
    for target in df['Target'].unique():
        per_target = df.query("Target == @target")
        for sample_num in per_target[SAMPLE_NUM_NAME].unique():
            summary = process_sample(get_sample(per_target, sample_num))
            # The current row count is the next free integer label.
            df_res.loc[len(df_res)] = [
                summary['id'], summary['type'], target, summary['name'],
                summary['mean'], summary['std'], summary['cv'],
            ]

    return df_res


# Per-sample summary table; rows without a sample name are discarded.
df_t = gen_sample_results(df)
df_t = df_t.dropna(subset=['sample name'])
df_t

### Control check - plasmid & reference

In [None]:
# Plasmid-control limits for the IDT target.
dc_limits['plasmid_control'].loc['IDT']

In [None]:
from pcrep.check import control_check_routing

# Column whose value is passed to the control check.
CONTROL_CHECK_COLNAME = 'result [vg/ml]'


def control_check_fn(s):
    """Run the control check for one row of the per-sample table df_t.

    Unlike the earlier per-well version of this function, the target is read
    from the row's 'target' column.
    """
    sample_type = s[SAMPLE_TYPE_NAME]
    result_value = s[CONTROL_CHECK_COLNAME]
    return control_check_routing(dc_limits, sample_type, result_value,
                                 s['target'])


CONTROL_CHECK_NAME = 'control_check'
df_t.loc[:, [CONTROL_CHECK_NAME]] = df_t.apply(control_check_fn, axis=1)
df_t.head()
# df_t

### Coefficient of variation (CV) check

In [None]:
from pcrep.check import cv_check

# Column whose value is passed to the CV check.
CV_CHECK_COLNAME = 'CV [%]'


def cv_check_fn(s):
    """Return the result of cv_check for one row's CV value."""
    cv_value = s[CV_CHECK_COLNAME]
    return cv_check(cv_value)


CV_CHECK_NAME = 'cv_check'
df_t.loc[:, [CV_CHECK_NAME]] = df_t.apply(cv_check_fn, axis=1)
# df_t.head()
df_t

In [None]:
# IDT rows of the summary table, indexed by sample id, with the columns
# renamed so they stay distinguishable once joined with the ITR rows.
dfidt = df_t.loc[df_t['target'] == 'IDT']
dfidt = dfidt[['id', 'target', 'sample name',
               'result [vg/ml]', CONTROL_CHECK_NAME]]
dfidt.set_index(['id'], inplace=True)
starget = 'IDT'
dfidt.rename(columns={"target": f"target {starget}",
                      "result [vg/ml]": f"result {starget} [vg/ml]",
                      CONTROL_CHECK_NAME: f'comment {starget}'}, inplace=True)
dfidt

In [None]:
# Same preparation for the ITR rows; 'sample name' is not selected here, so
# the joined table carries it only once (from the IDT side).
dfitr = df_t.loc[df_t['target'] == 'ITR']
dfitr = dfitr[['id', 'target', 'result [vg/ml]', CONTROL_CHECK_NAME]]
dfitr.set_index(['id'], inplace=True)
starget = 'ITR'
dfitr.rename(columns={"target": f"target {starget}",
                      "result [vg/ml]": f"result {starget} [vg/ml]",
                      CONTROL_CHECK_NAME: f'comment {starget}'}, inplace=True)
dfitr

In [None]:
# Side-by-side join on sample id; join="inner" keeps only the ids present in
# both the IDT and the ITR table.
dfj = pd.concat([dfidt, dfitr], axis=1, join="inner")

dfj


def fn_concat(s1, s2):
    """Join two labels with '/', falling back to whichever one is non-empty.

    Returns ``s1 + '/' + s2`` when both are truthy and `s1` when only it is
    truthy. Otherwise `s2` is returned unchanged, which may itself be empty
    or None.
    """
    if s1 and s2:
        return s1 + '/' + s2
    # At most one value is truthy here. The former trailing `return None` was
    # unreachable because every branch above it already returned.
    return s1 if s1 else s2


# Combined target label, e.g. 'IDT/ITR'.
dfj.loc[:, ['target']] = dfj.apply(lambda x: fn_concat(
    x['target IDT'], x['target ITR']), axis=1)
# TODO: !!! Uncomment below to add comments; needs to be refactored to generate comments from dictionary
# dfj.loc[:, ['comment']] = dfj.apply(lambda x: fn_concat(
#     x['comment IDT'], x['comment ITR']), axis=1)

# Column selection and order for the report.
# NOTE(review): while the block above stays commented out, dfj has no
# 'comment' column, so reindex adds it filled with NaN.
final = dfj.reindex(
    ['target', 'sample name', 'result IDT [vg/ml]', 'result ITR [vg/ml]', 'comment'], axis=1)

final

## Export report

In [None]:
# Static report text placed before and after the results table; both files
# are looked up relative to the current working directory.
# NOTE(review): no encoding is given, so the platform default is used —
# confirm it matches the encoding of these files.
with open('md_intro.md', 'r') as f:
    md_intro = f.read()
with open('md_end.md', 'r') as f:
    md_end = f.read()

# Results table rendered as Markdown.
md_eval = final.to_markdown()

# Plain concatenation: any blank lines needed around the table have to come
# from the intro/end files themselves.
md = md_intro + md_eval + md_end


def save_md(file_path, md_txt):
    """Write `md_txt` to `file_path` as UTF-8 text, replacing any existing file.

    Best effort: an I/O or encoding failure is reported on stdout and not
    re-raised, so the caller cannot tell from the return value whether the
    file was written.

    Parameters:
    file_path : str
        destination path; its directory must already exist
    md_txt : str
        Markdown text to write
    """
    try:
        # Explicit UTF-8: the platform default encoding is not UTF-8
        # everywhere, and the report is later handed to pandoc (through
        # mdhandling.md2docx), which expects UTF-8 input.
        with open(file_path, 'w', encoding='utf-8') as fl:
            fl.write(md_txt)
    except (OSError, UnicodeError) as e:
        # Only write failures are treated as best effort; programming errors
        # such as a non-string `md_txt` are left to propagate.
        print('Error: ' + str(e))


# Write the Markdown report.
MD_FILE = './example/230901_GN004308-086/230901_GN004308-086.md'
save_md(MD_FILE, md)

# The same table as a spreadsheet, next to the Markdown file (same base name).
xls_path = os.path.splitext(MD_FILE)[0] + '.xlsx'
final.to_excel(xls_path)

In [None]:
from pcrep import mdhandling

# Tool locations and the reference document come from config.json in DATA_DIR.
# NOTE(review): pdflatex_bin is read but not used in the cells shown here.
with open(os.path.join(DATA_DIR, "config.json")) as json_file:
    jd = json.load(json_file)
    reference_doc = jd['reference_docx']
    pdflatex_bin = jd['pdflatex_bin']
    pandoc_bin = jd['pandoc_bin']

# Hand the Markdown report, the pandoc binary and the reference document to
# mdhandling.md2docx.
mdhandling.md2docx(pandoc_bin, reference_doc, MD_FILE)
print("Done.")