In [None]:
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import itertools

import sys
sys.path.append('../')
import plotting

# Read data from Data Sheet for beads

Data from Beckman Coulter for AMPure XP beads (https://media.beckman.com/-/media/pdf-assets/data-sheets/genomics-data-sheet-ampure-xp-purification-and-clean-up.pdf) digitized with WebPlotDigitizer (https://apps.automeris.io/wpd/)

In [None]:
full_df = pd.read_csv('wpd_datasets.csv', header=[0,1], skipinitialspace=True)

full_df

# Convert multilevel columns to long format

In [None]:
dfs = []

for i in range(len(full_df.columns.levels[0])//2):
    idf = full_df.iloc[:, 2*i:2*(i+1)].copy()
    name = idf.columns[0][0]
    idf.columns = idf.columns.droplevel(0)
    idf.dropna(inplace=True)
    idf['group'] = name
    dfs.append(idf)
long_df = pd.concat(dfs).reset_index(drop=True)

long_df

# Generate transform for x-axis

In [None]:
axis_points = long_df[long_df.group == "xaxis"].copy().sort_values(by="X").X.values
real_points = [35, 50, 100, 150, 200, 300, 400, 500, 600, 700]

xvals = np.linspace(35, 700, 1000)
yinterp = np.interp(xvals, axis_points, real_points)
fig = px.scatter(x=xvals, y=yinterp)
fig.add_trace(go.Scatter(x=axis_points, y=real_points, mode='markers', name='Real points'))
fig.show()

# Transform x-data and clean up

In [None]:
analysis_df = long_df[long_df.group != "xaxis"].copy()

# clip signal (Y) to zero
analysis_df['signal'] = analysis_df.Y.clip(lower=0)
# transform X to real size
analysis_df['size'] = np.interp(analysis_df.X, axis_points, real_points)

# drop uninteresting region
analysis_df.drop(analysis_df[analysis_df['size'] <= 40].index, inplace=True)
analysis_df.drop(analysis_df[analysis_df['size'] >= 700].index, inplace=True)

px.line(analysis_df, x='size', y='signal', color='group').show()
analysis_df

# Interpolate to equal grid

In [None]:
x_vals = np.linspace(40, 700, 100)
def interp(df):
    return pd.Series(np.interp(x_vals, np.array(df["size"]), np.array(df["signal"])))

interp_df = analysis_df.groupby('group').apply(interp).T
interp_df['size'] = x_vals

interp_df

# Normalize signal to Input

In [None]:
interp_df[['norm_0.6x', 'norm_0.7x', 'norm_0.9x', 'norm_1.8x']] = interp_df[['0.6x', '0.7x', '0.9x', '1.8x']].div(interp_df['Input'], axis=0)

# clip signal (Y) to one
interp_df[['norm_0.6x', 'norm_0.7x', 'norm_0.9x', 'norm_1.8x']] = interp_df[['norm_0.6x', 'norm_0.7x', 'norm_0.9x', 'norm_1.8x']].clip(upper=1)

px.line(interp_df, x='size', y=['norm_0.6x', 'norm_0.7x', 'norm_0.9x', 'norm_1.8x']).show()
interp_df

# Fit piecewise linear function

In [None]:
def recovery_model(x, start, stop):
    if stop < start: return np.zeros_like(x)
    y = np.zeros_like(x)
    y[x >= stop] = 1
    y[(x > start) & (x < stop)] = np.linspace(0, 1, np.sum((x > start) & (x < stop)))
    return y

# get all possible combinations of start and stop points
combinations = itertools.product(*[np.arange(40, 700, 10)]*2)
combinations = [c for c in combinations if c[0] < c[1]]

results = {}

for exp in ['0.6x', '0.7x', '0.9x', '1.8x']:
    df = interp_df[['size', f'norm_{exp}']].copy()
    guess_start = df['size'][df[f'norm_{exp}'] < 0.01].max()
    guess_stop = df['size'][df[f'norm_{exp}'] > 0.99].min()
    idf = df[(df['size'] > max(40, guess_start-40)) & (df['size'] < min(700, guess_stop+40))].copy()

    values = []
    for combination in combinations:
        y_pred = recovery_model(idf['size'], *combination)
        values.append(np.sum((idf[f'norm_{exp}'] - y_pred)**2))
    params = combinations[np.argmin(values)]
    results[exp] = {"fit_range": (idf['size'].min(), idf['size'].max()), "params": params}
    print(f"Best combination for {exp}: {params} with {values[np.argmin(values)]:.5f} loss")

# Plot results

In [None]:
colormap = {
    'norm_0.6x': '#e41a1c', 
    'norm_0.7x': '#377eb8', 
    'norm_0.9x': '#984ea3', 
    'norm_1.8x': '#ff7f00'
}

fig = px.line(
    interp_df, 
    x='size', 
    y=['norm_0.6x', 'norm_0.7x', 'norm_0.9x', 'norm_1.8x'],
    color_discrete_map=colormap,
)


for exp in ['0.6x', '0.7x', '0.9x', '1.8x']:
    data = results[exp]
    fig.add_trace(go.Scatter(
        x=np.linspace(*data["fit_range"], 100),
        y=recovery_model(np.linspace(*data["fit_range"], 100), *data["params"]), 
        mode='lines', 
        line_dash='dot',
        line_color='black',
    ))

    fig.add_annotation(
        x=0.875*data["fit_range"][1],
        y=0.7,
        text=f"{exp}",
        showarrow=False,
        font_color=colormap[f"norm_{exp}"],
    )


fig.update_layout(
    width=300,
    height=200,
    showlegend=False,
    margin=dict(l=0, r=10, t=10, b=0),
)
fig.update_xaxes(title_text='Size', range=[40, 650])
fig.update_yaxes(title_text='Normalized signal', range=[0, 1.01])

fig = plotting.standardize_plot(fig)
fig.write_image("figures/recovery_model.svg")
fig.show()