In [None]:
import rapidfuzz
import Bio.SeqIO
import tqdm.auto
import pandas as pd
import gzip
import plotly.express as px
import plotly.graph_objects as go

In [None]:
def compare_to_references(sequences, reference_sequences):
    """Match each sequence to a reference sequence and measure how far apart they are.

    For every entry in ``sequences`` one reference is selected with
    ``rapidfuzz.process.extractOne`` (called with its default scorer). The
    Levenshtein distance and normalized similarity between the sequence and
    that selected reference are then computed.

    Args:
        sequences (dict): Dictionary of {seq_id: sequence}.
        reference_sequences (dict): Dictionary of {seq_id: sequence}.

    Returns:
        pd.DataFrame: One row per input sequence, with the following columns:
            seqid (str): Sequence ID.
            ref_seqid (str): ID of the selected reference sequence.
            edit_distance (int): Levenshtein distance to the selected reference.
            similarity (float): Normalized Levenshtein similarity to the selected reference.
            length (int): Sequence length.

    Raises:
        ValueError: If no reference could be selected for a sequence, e.g.
            because ``reference_sequences`` is empty.
    """
    columns = ["seqid", "ref_seqid", "edit_distance", "similarity", "length"]
    levenshtein = rapidfuzz.distance.Levenshtein

    rows = []
    for seq_id, seq in tqdm.auto.tqdm(sequences.items()):
        # NOTE(review): the reference is picked by extractOne's default scorer
        # (WRatio in current rapidfuzz releases -- TODO confirm), not by the
        # Levenshtein distance reported below, so the selected reference is not
        # guaranteed to be the one with the smallest edit distance.
        match = rapidfuzz.process.extractOne(seq, reference_sequences)
        if match is None:
            # extractOne yields None when there is nothing to choose from;
            # fail with a clear message instead of an unpacking TypeError.
            raise ValueError(
                f"No reference sequence could be selected for {seq_id!r}; "
                "is reference_sequences empty?"
            )

        # The third element of the result is used as the key into reference_sequences.
        _, _, ref_seqid = match
        reference = reference_sequences[ref_seqid]
        rows.append((
            seq_id,
            ref_seqid,
            levenshtein.distance(seq, reference),
            levenshtein.normalized_similarity(seq, reference),
            len(seq),
        ))

    # Passing the columns explicitly keeps the schema intact for empty input.
    return pd.DataFrame(rows, columns=columns)


# Reference sequences parsed from the FASTA file, keyed by record ID.
ref_seqs = dict(
    (entry.id, str(entry.seq))
    for entry in Bio.SeqIO.parse("./design_files.fasta", "fasta")
)

# Unmapped reads: edit distance to the selected reference sequence

In [None]:
# Comparison of the unmapped reads of every PCR experiment against the references.
# The result is cached in "unmapped_data.csv"; it is only recomputed from the
# FASTQ files when that cache file does not exist yet.
try:
    df = pd.read_csv("unmapped_data.csv")
except FileNotFoundError:
    dfs = []
    for i in range(1, 6+1):
        # Load all reads inside the `with` so the gzip handle is closed right away.
        with gzip.open(f"./PCR{i}/unmapped.fq.gz", "rt") as handle:
            sequences = {record.id: str(record.seq) for record in Bio.SeqIO.parse(handle, "fastq")}
        exp_df = compare_to_references(sequences, ref_seqs)
        exp_df["exp"] = f"PCR{i}"
        dfs.append(exp_df)

    pd.concat(dfs).reset_index(drop=True).to_csv("unmapped_data.csv", index=False)
    # Read the cache back so that df always comes from the CSV, whichever path ran.
    df = pd.read_csv("unmapped_data.csv")

In [None]:
# Edit-distance histograms of the unmapped reads: one facet per value of "exp",
# wrapped into a grid that is two facets wide.
panel_font = dict(size=28/3, family="Inter")
axis_style = dict(
    minor_ticks="outside",
    title_font_family="Inter",
    title_font_size=28/3,
    tickfont_size=28/3,
)

fig = px.histogram(
    data_frame=df,
    x="edit_distance",
    facet_col="exp",
    facet_col_wrap=2,
    facet_row_spacing=0.1,
    facet_col_spacing=0.05,
    range_x=[0, 120],
    range_y=[0, 175],
)

# Identical bins in every facet; bars are drawn without an outline.
fig.update_traces(
    xbins={"start": 0, "end": 150, "size": 2},
    marker={"line": {"width": 0}},
)

# Axis titles only on row 1 (x) and column 1 (y).
fig.update_xaxes(title_text="Edit distance", row=1)
fig.update_yaxes(title_text="Count", col=1)

# Blank out the existing annotation texts; custom labels are added below.
fig.for_each_annotation(lambda facet_title: facet_title.update(text=""))

# One label per experiment: the row index counts down from 3 and the column
# alternates between 1 and 2 as the experiment number increases.
for exp_index in range(6):
    exp_number = exp_index + 1
    wrapped_row, wrapped_col = divmod(exp_index, 2)
    n_reads = len(df[df["exp"] == f"PCR{exp_number}"])
    fig.add_annotation(
        text=f"<b>PCR {exp_number}</b><br>n = {n_reads}",
        x=20,
        y=150,
        showarrow=False,
        font=panel_font,
        row=3 - wrapped_row,
        col=wrapped_col + 1,
    )

fig.update_layout(
    width=320,
    height=300,
    margin={"l": 0, "r": 10, "t": 10, "b": 0},
    template="simple_white",
    font_family="Inter",
    legend_font_size=28/3,
)
for apply_axis_style in (fig.update_yaxes, fig.update_xaxes):
    apply_axis_style(**axis_style)
fig.for_each_annotation(lambda note: note.update(font_size=28/3, font_family="Inter"))

fig.show()
fig.write_image("unmapped_distances.svg")

# Unassigned reads: mapped read counts versus total read counts per sequence

In [None]:
# Per-sequence "assignedReads" column from the scafstats table, with the
# "#name" column renamed to "seq_id" so it can serve as the join key.
unassigned = (
    pd.read_csv("./unassigned/scafstats.txt", sep="\t", dtype={"#name": str})
    .rename(columns={"#name": "seq_id"})
    .loc[:, ["seq_id", "assignedReads"]]
    .copy()
)

# Join the parameter table and the per-experiment abundance table on "seq_id".
# Sequence IDs are read as strings in all three tables so the keys compare equal.
params = pd.read_csv("params.csv", dtype={"seq_id": str})
counts = pd.read_csv("abundance_by_experiment.csv", dtype={"seq_id": str})
total_df = unassigned.merge(params, on="seq_id").merge(counts, on="seq_id")
total_df

In [None]:
# Total read count per sequence: row-wise sum of the columns PCR1 ... PCR6.
pcr_columns = [f"PCR{exp_number}" for exp_number in range(1, 7)]
total_df["totalCount"] = total_df.loc[:, pcr_columns].sum(axis="columns")

In [None]:
# Scatter plot of "assignedReads" against "totalCount", one point per row of
# total_df, coloured by the "eff" column.
axis_style = dict(
    minor_ticks="outside",
    title_font_family="Inter",
    title_font_size=28/3,
    tickfont_size=28/3,
)

fig = px.scatter(
    total_df,
    x="assignedReads",
    y="totalCount",
    color="eff",
    color_continuous_scale=px.colors.diverging.RdYlBu,
    range_color=[0.95,1.0],
)

fig.update_traces(marker=dict(size=2, opacity=0.5, line=dict(width=0)))

fig.update_layout(coloraxis_colorbar=dict(
    title="PCR eff.",
    title_font_family="Inter",
    title_font_size=28/3,
    tickfont_size=28/3,
    thicknessmode="pixels",
    thickness=20,
    lenmode="pixels",
    len=200,
    yanchor="top", y=0.8,
))

# Summary statistics are computed outside the f-string: reusing double quotes
# inside a double-quoted f-string is a SyntaxError before Python 3.12.
n_assigned = total_df.assignedReads.sum()
spearman = total_df.assignedReads.corr(total_df.totalCount, method="spearman")
# NOTE(review): the label uses "σ" for a Spearman correlation coefficient, which
# is more commonly written "ρ" -- confirm which symbol the figure should use.
fig.add_annotation(
    x=1.2,
    y=0.95,
    xref="paper",
    yref="paper",
    text=f"n = {n_assigned}<br>σ = {spearman:.2f}",
    showarrow=False,
    font=dict(size=28/3, family="Inter"),
    xanchor="center",
    yanchor="top",
)

# Guide lines through the origin along assignedReads = fac * totalCount.
# Each entry maps a fraction to the hard-coded [x, y] position of its label.
lineannotations = {
    0.01: [80, 3900],
    0.05: [240, 3900],
    0.1: [445, 3900],
    0.2: [600, 2700],
}
for fac, (label_x, label_y) in lineannotations.items():
    fig.add_trace(go.Scatter(
        x=[0, 1e6*fac],
        y=[0, 1e6],
        marker_color="#bdbdbd",
        mode="lines",
    ))
    fig.add_annotation(
        x=label_x,
        y=label_y,
        text=f"{fac:.0%}",
        showarrow=False,
        font=dict(size=28/3, family="Inter", color="#bdbdbd"),
    )
    # Fraction of rows with assignedReads below fac * totalCount.
    below_line = total_df.assignedReads < total_df.totalCount*fac
    print(fac, total_df.loc[below_line].shape[0] / total_df.shape[0])

# Share of rows falling between two consecutive guide lines, i.e. with
# start * totalCount <= assignedReads < end * totalCount, at hard-coded positions.
rangeannotations = {
    (0.00, 0.01): [5, 3000],
    (0.01, 0.05): [90, 3000],
    (0.05, 0.1): [210, 2900],
    (0.1, 0.2): [380, 2600],
    (0.2, 1.0): [550, 900],
}
for (start, end), pos in rangeannotations.items():
    in_band = (total_df.assignedReads < total_df.totalCount*end) & (total_df.assignedReads >= total_df.totalCount*start)
    share = total_df.loc[in_band].shape[0] / total_df.shape[0]
    fig.add_annotation(
        x=pos[0],
        y=pos[1],
        text=f"{share:.1%}",
        showarrow=False,
        font=dict(size=20/3, family="Inter", color="#bdbdbd"),
    )


# Reverse the trace order so that the scatter trace (added first) comes last,
# after the guide lines.
fig.data = fig.data[::-1]

fig.update_layout(
    template="simple_white",
    font_family="Inter",
    legend_font_size=28/3,
    margin=dict(l=0, r=10, t=10, b=0),
    width=320,
    height=300,
    showlegend=False,
)
fig.update_xaxes(
    title_text="Mapped reads without index-based assignment",
    range=[-10, 1.01*total_df.assignedReads.max()],
    **axis_style,
)
fig.update_yaxes(
    title_text="Total reads across sequencing run",
    range=[-100, 1.01*total_df.totalCount.max()],
    **axis_style,
)
fig.show()
fig.write_image("unassigned_abundance.svg")

# Compose the two panels into one figure

In [None]:
import svgutils.transform as sg

# Combine the two exported SVG panels into a single 680px x 315px figure.
# Each entry: (panel file, horizontal offset of the panel, panel label).
panel_layout = [
    ("unmapped_distances.svg", 0, "a"),
    ("unassigned_abundance.svg", 680 - 320, "b"),
]

fig = sg.SVGFigure("680px", "315px")

# All panels are appended before any label, so the labels come last in the figure.
for svg_path, x_offset, label in panel_layout:
    panel = sg.fromfile(svg_path).getroot()
    panel.moveto(x_offset, 10)
    fig.append(panel)

# Bold panel letters, placed 2 units right of each panel's horizontal offset.
for svg_path, x_offset, label in panel_layout:
    txt = sg.TextElement(x_offset + 2, 10, label, size=28/3, weight="bold", font="Inter")
    fig.append([txt])

fig.save("unmapped_composed.svg")