In [None]:
#@title Install PYIR and setup the database

# Install Biopython & PyIR
%pip install crowelab_pyir
%pip install Biopython
%pip install XlsxWriter
!pip install mpl-chord-diagram
# Builds databases in pyir library directory
!pyir setup

In [None]:
from google.colab import drive

drive.mount('/content/drive')
folder_path = "/content/drive/Shared drives/BIIE_PROGRAM-LABS/DEEPIR/3_RESEARCH/IRP-110_MAMMALIAN-AB/1_PILOT-PHASE/2_DATA-FILES/Sanger_Seq_Clones/4392194_4392777" #@param {type:"string"}


In [None]:
from getpass import getpass
import os

#Securely input your GitHub token
token = getpass('Enter your GitHub token: ')

# Your GitHub username and repo
username = "Maria-Chatzinikolaou"
repo = "Ab-Clone-Selenction"
repo_url = f"https://{token}:x-oauth-basic@github.com/{username}/{repo}.git"

# Clone the repository
!git config --global user.name "Maria-Chatzinikolaou"
!git config --global user.email "maria.chatzinikolaou@immune.engineering"

!git config --global user.email "you@example.com"
!git config --global user.name "Your Name"

# (Optional) If already exists, skip cloning
if not os.path.exists(repo):
    !git clone {repo_url}


In [None]:
print("tralala")

In [None]:
from getpass import getpass
import os

# 🔐 Input GitHub token securely
token = getpass('Enter your GitHub token: ')

# 🧾 Repo info
username = "Maria-Chatzinikolaou"
repo = "Ab-Clone-Selenction"
repo_url = f"https://{token}:x-oauth-basic@github.com/{username}/{repo}.git"
repo_path = f"/content/{repo}"

# 🛠️ Configure Git
!git config --global user.name "Maria-Chatzinikolaou"
!git config --global user.email "maria.chatzinikolaou@immune.engineering"


# 🔽 Clone if not already cloned
if not os.path.exists(repo):
    !git clone {repo_url}

# 📁 Set source and destination paths
source_path = "/content/drive/Shared drives/BIIE_PROGRAM-LABS/DEEPIR/3_RESEARCH/IRP-110_MAMMALIAN-AB/1_PILOT-PHASE/2_DATA-FILES/Sanger_Seq_Clones/Copy of SangerCloneIdentifier_DoublePrimer_Cov2RBD_high_binders.ipynb"
target_filename = "SangerCloneIdentifier_DoublePrimer_Cov2RBD_high_binders.ipynb"
target_path = f"{repo_path}/{target_filename}"

# 📝 Copy and rename notebook into the repo directory
!cp "{source_path}" "{target_path}"

# 🧽 Clean outputs
%cd {repo_path}
!pip install nbstripout --quiet
!nbstripout "{target_filename}"

# ✅ Stage the notebook
!git add "{target_filename}"

# 💬 Commit
!git commit -m "Clean upload from Colab"

# 🚀 Push to GitHub
!git push {repo_url}


In [None]:
#@title Import Libraries and define input & output
import numpy as np
import os
import io
import pandas as pd
from google.colab import drive
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from crowelab_pyir import PyIR
import matplotlib.pyplot as plt
from mpl_chord_diagram import chord_diagram
import seaborn as sns
import regex as re  # Use regex module for fuzzy matching
import warnings


# Define a list to store plot images globally
plot_images = []

def save_plot_to_memory(fig, dpi=500):
    """Save matplotlib figure to memory for Excel export."""
    img = io.BytesIO()
    fig.savefig(img, format='png', bbox_inches='tight', dpi=dpi)
    img.seek(0)
    plot_images.append(img)
    plt.close(fig)

# Mount Google Drive
drive.mount('/content/drive')
folder_path = "/content/drive/Shared drives/BIIE_PROGRAM-LABS/DEEPIR/3_RESEARCH/IRP-110_MAMMALIAN-AB/1_PILOT-PHASE/2_DATA-FILES/Sanger_Seq_Clones/4392194_4392777" #@param {type:"string"}
output_csv_file = "CoV2_RBD_high_binders"  #@param {type:"string")
sanger_quality_threshold = 30  #@param {type:"string"}
remove_superimposed_signal_reads = True   #@param {type:"string"}
sanger_report_superimposed_signal_reads = "()"  #@param {type:"string"}


## Phred Quality Score
Phred quality score, or q-score, is a quality measure that estimates the probability that a base was called incorrectly, given on a negative log scale (Q=−log10P(incorrect)) so that a higher q-score indicates a more confident base call.


* Phred score of 50 indicates that there is 99.999% that this base has been assigned *incorrectly*

In [None]:
def get_phred_quality(folder_path, plot=False, sanger_quality_threshold=20):
    """
    Reads all .ab1 files in the specified folder and extracts sequencing quality data.
    If plot=True, generates and saves a histogram of average Phred quality scores.
    """

    quality_data = []

    for filename in os.listdir(folder_path):
        if filename.endswith('.ab1'):
            file_path = os.path.join(folder_path, filename)
            record = SeqIO.read(file_path, "abi")

            quality_scores = record.letter_annotations["phred_quality"]
            avg_quality = sum(quality_scores) / len(quality_scores) if quality_scores else 0
            clone_name = filename.rsplit(".ab1", 1)[0]
            threshold_status = "pass" if avg_quality > sanger_quality_threshold else "failed"

            quality_data.append({
                "File Name": filename,
                "Clone Name": clone_name,
                "Avg Quality": avg_quality,
                "Threshold": threshold_status
            })

    df = pd.DataFrame(quality_data)

    if plot and not df.empty:
        fig1, ax1 = plt.subplots(figsize=(10, 6))
        sns.histplot(df["Avg Quality"], bins=100, kde=False, ax=ax1)
        ax1.set_xlabel("Average Sequencing Quality")
        ax1.set_ylabel("Frequency")
        ax1.set_title("Distribution of Average Phred Sequencing Quality")
        ax1.axvline(sanger_quality_threshold, color='blue', linestyle='dotted', linewidth=2,
                   label=f'Threshold: {sanger_quality_threshold}')
        ax1.legend()
        plt.tight_layout()
        plt.show()
        save_plot_to_memory(fig1, dpi=600)

    return df


In [None]:
df_quality = get_phred_quality(folder_path, plot=True, sanger_quality_threshold=sanger_quality_threshold)

In [None]:
# Parse superimposed reads
if isinstance(sanger_report_superimposed_signal_reads, str):
    try:
        superimposed_clones = list(eval(sanger_report_superimposed_signal_reads))
    except:
        superimposed_clones = []
else:
    superimposed_clones = list(sanger_report_superimposed_signal_reads)

#List of failed clones
failed_clones = df_quality[df_quality['Threshold'] == 'failed']['Clone Name'].tolist()

#Remove reads that were reported with "superimposed signals" by Nanopore and very short in length
exclusion_prefixes = failed_clones.copy()

if remove_superimposed_signal_reads:
    exclusion_prefixes += superimposed_clones

if exclusion_prefixes:
    print("The following clones will be filtered from the quality processing of the reads:")
    for clone in exclusion_prefixes:
        print(f" - {clone}")
else:
    print("No clones to filter out based on quality threshold.")

# Count excluded entries
excluded_from_df = df_quality[df_quality["Clone Name"].str.contains('|'.join(exclusion_prefixes))] if exclusion_prefixes else pd.DataFrame()
num_excluded_clones = excluded_from_df["Clone Name"].nunique()
print(f"Total number of sequence IDs excluded due to quality: {num_excluded_clones}")



# Processing of the .fasta files from Sanger Sequencing
##Fetch Ab Framework Sequences using PyIR

In [None]:
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio import SeqIO
import os

output_fasta = os.path.join(folder_path, "combined_parts.fasta")


with open(output_fasta, 'w') as out_handle:
    for filename in os.listdir(folder_path):
        if (("PR186" in filename) or ("PR171" in filename)) and filename.endswith(".fasta"):
            file_path = os.path.join(folder_path, filename)
            for record in SeqIO.parse(file_path, "fasta"):
                # Skip records whose ID starts with any excluded prefix that didnt pass the QC
                if record.id.startswith(tuple(exclusion_prefixes)):
                  continue
                seq_str = str(record.seq)
                if "PR171" in filename:
                    front_seq = seq_str[:700]
                    description_suffix = "[front 700bp]"
                elif "PR186" in filename:
                    front_seq = seq_str[:500]
                    description_suffix = "[front 500bp]"
                else:
                    front_seq = seq_str
                    description_suffix = "[full sequence]"

                front_record = SeqRecord(
                    Seq(front_seq),
                    id=record.id,
                    description=record.description + " " + description_suffix
                )
                SeqIO.write(front_record, out_handle, "fasta")
def predict_frameworks(input_file):
    pyir = PyIR(
        query=input_file,
        args=[
            '--input_type', 'fasta',
            '--outfmt', 'dict',
            '--receptor', 'Ig',
            '--species', 'human'
        ]
    )
    result = pyir.run()
    df = pd.DataFrame.from_dict(result, orient='index')
    df = df.sort_index()
    return df
#-------------------------------------------------------------------------------------------------->
df = predict_frameworks(output_fasta)
#Keep necessary columns
df_reduced = df[['sequence_id', 'sequence', 'locus', 'sequence_alignment', 'sequence_alignment_aa', 'v_family', 'd_family', 'j_family',
                  'cdr1_aa', 'cdr2_aa', 'cdr3_aa', 'cdr3_aa_length', 'j_sequence_alignment', 'v_identity', 'd_identity', 'j_identity']] #j_sequece_alignment will be needed to reconstruct the j gene
cols = ['v_identity', 'd_identity', 'j_identity']

df_reduced[cols] = df_reduced[cols].apply(pd.to_numeric, errors='coerce')
#<--------------------------------------------------------------------------------------------------

#Rretrieve heavy & light chain sequence with nuceotide mismatch allowance in the search *motifs*

In [None]:
def sanger_reads_to_dict(input_folder):
    seq_dict = {}
    seq_lengths = {}
    for filename in os.listdir(input_folder):
        if ("PR186" in filename or "PR171" in filename) and filename.endswith(".fasta"):
            file_path = os.path.join(input_folder, filename)
            for record in SeqIO.parse(file_path, "fasta"):
                seq_dict[record.id] = str(record.seq)
                seq_lengths[record.id] = len(record.seq)
    return seq_dict, seq_lengths

def trim_sequence_by_id(seq_id, seq):
    """
    Trims sequences based on start and end motifs and returns only the sequence between them (excluding motifs).

    - PR171: Extracts the sequence between motifs **as-is**, ensuring length >600.
    - PR186: Extracts the sequence and **returns its reverse complement**, ensuring length >300.
    - If no exact match is found for a motif, allows **1 mismatch**, but only if the length condition is met.
    - If the first mismatch attempt does not fulfill the length condition, tries another mismatch combination.
    - If still unsuccessful, searches for the second occurrence of the start or end motif with 1 mismatch at a different position.
    """
    if not isinstance(seq, str) or pd.isna(seq):  # Handle missing sequences
        return "", 0

    if "PR186" in seq_id:
        start_motif = "GTGACC"
        end_motif = "GGGGCCGGG"
        min_length = 300
        reverse_complement = True
    elif "PR171" in seq_id:
        start_motif = "GCCACCATG"
        end_motif = "CTGGAGAC"
        min_length = 600
        reverse_complement = False
    else:
        return "", 0  # If not recognized, return empty

    def find_motif_with_mismatch(motif, sequence, occurrence=1):
        exact_matches = [m.end() for m in re.finditer(motif, sequence)]
        if len(exact_matches) >= occurrence:
            return exact_matches[occurrence - 1]

        mismatch_matches = [m.end() for m in re.finditer(f'({motif}){{s<=1}}', sequence)]
        if len(mismatch_matches) >= occurrence:
            return mismatch_matches[occurrence - 1]

        return None

    # Try to find start motif
    start_idx = find_motif_with_mismatch(start_motif, seq)
    if start_idx is None:
        return "", 0

    remaining_seq = seq[start_idx:]

    # Try to find end motif
    end_idx = None
    for mismatch_count in range(2):  # Try 0 or 1 mismatch
        end_matches = [m.start() for m in re.finditer(f'({end_motif}){{s<={mismatch_count}}}', remaining_seq)]
        if end_matches:
            for end_position in end_matches:
                trimmed_seq = seq[start_idx:start_idx + end_position]
                if len(trimmed_seq) > min_length:
                    end_idx = start_idx + end_position
                    break  # Stop if valid length is found
        if end_idx:
            break

    # If length is still not fulfilled, search for the second occurrence of start or end motif
    if end_idx is None:
        start_idx_alt = find_motif_with_mismatch(start_motif, seq, occurrence=2)
        if start_idx_alt:
            remaining_seq_alt = seq[start_idx_alt:]
            for mismatch_count in range(2):
                end_matches_alt = [m.start() for m in re.finditer(f'({end_motif}){{s<={mismatch_count}}}', remaining_seq_alt)]
                if end_matches_alt:
                    for end_position_alt in end_matches_alt:
                        trimmed_seq_alt = seq[start_idx_alt:start_idx_alt + end_position_alt]
                        if len(trimmed_seq_alt) > min_length:
                            start_idx, end_idx = start_idx_alt, start_idx_alt + end_position_alt
                            break
                if end_idx:
                    break

    if end_idx is None:
        return "", 0

    trimmed_seq = seq[start_idx:end_idx]

    # Apply reverse complement if needed (only for PR186)
    if reverse_complement:
        trimmed_seq = str(Seq(trimmed_seq).reverse_complement())

    return trimmed_seq, len(trimmed_seq)


def process_sanger_reads(df, input_folder, sanger_reads_to_dict, trim_sequence_by_id):
    """
    Processes Sanger sequencing reads by mapping original sequences and lengths,
    and applying sequence trimming.
    """
    # Build the dictionary of full original sequences and their lengths
    sanger_read_dict, sanger_length_dict = sanger_reads_to_dict(input_folder)

    # Store original sequences and lengths in DataFrame
    df['original_sanger_read'] = df['sequence_id'].map(sanger_read_dict)
    df['original_length'] = df['sequence_id'].map(sanger_length_dict)

    # Apply trimming and store processed sequences and lengths
    df['processed_sanger_read'], df['processed_length'] = zip(*df.apply(
        lambda row: trim_sequence_by_id(row['sequence_id'], row['original_sanger_read']), axis=1
    ))
    return df
#-------------------------------------------------------------------------------------------------->
df_reduced = process_sanger_reads(df_reduced, folder_path, sanger_reads_to_dict, trim_sequence_by_id)
#<--------------------------------------------------------------------------------------------------

# Function to find motifs with up to 2 mismatches but only in either start or end motif (not both)
def find_motifs_with_mismatches(sequence, start_motif, end_motif, allow_mismatch_in_start=True):
    """
    Searches for start and end motifs allowing mismatches in only one of them.
    """
    if allow_mismatch_in_start:
        start_regex = f"({start_motif}){{s<=2}}"  # Allow mismatches in start motif
        end_regex = f"({end_motif})"  # Exact match for end motif
    else:
        start_regex = f"({start_motif})"  # Exact match for start motif
        end_regex = f"({end_motif}){{s<=2}}"  # Allow mismatches in end motif

    start_match = re.search(start_regex, sequence)
    end_match = re.search(end_regex, sequence)

    if start_match and end_match:
        start_pos = start_match.start()
        end_pos = end_match.end()
        return sequence[start_pos:end_pos], end_pos - start_pos
    return "", 0

def find_second_motif(sequence, motif, is_start=True):
    """
    Finds the second occurrence of the given motif in the sequence and extracts the subsequence.

    """
    matches = list(re.finditer(motif, sequence))

    if len(matches) < 2:
        return "", 0  # Not enough motif occurrences

    if is_start:
        # Take the second start motif position and slice from there to the end
        start_pos = matches[1].start()
        return sequence[start_pos:], len(sequence[start_pos:])
    else:
        # Take the second end motif position and slice from start to there
        end_pos = matches[1].end()
        return sequence[:end_pos], len(sequence[:end_pos])

#-------------------------------------------------------------------------------------------------->

# Recheck for sequences where processed_length is 0 or below the thresholds

df_recheck = df_reduced[
    ((df_reduced["processed_length"] == 0) |
     ((df_reduced["processed_length"] < 600) & df_reduced["sequence_id"].str.contains("PR171")) |
     ((df_reduced["processed_length"] < 300) & df_reduced["sequence_id"].str.contains("PR186")))
].copy()

# Process only rechecked rows
for idx, row in df_recheck.iterrows():
    seq_id = row["sequence_id"]
    sequence = row["original_sanger_read"]

    if "PR186" in seq_id:
        start_motif = "GTGACC"
        end_motif = "GGGGCCGGG"
    elif "PR171" in seq_id:
        start_motif = "GCCACCATG"
        end_motif = "CTGGAGAC"
    else:
        continue

    # Step 1: Try allowing mismatches only in the start motif
    processed_seq, processed_len = find_motifs_with_mismatches(sequence, start_motif, end_motif, allow_mismatch_in_start=True)

    # Step 2: If still below threshold, try allowing mismatches only in the end motif
    if ((processed_len == 0) or
        (processed_len < 600 and "PR171" in seq_id) or
        (processed_len < 300 and "PR186" in seq_id)):
        processed_seq, processed_len = find_motifs_with_mismatches(sequence, start_motif, end_motif, allow_mismatch_in_start=False)

    # Step 3: If still below threshold, find second occurrence of start_motif
    if ((processed_len == 0) or
        (processed_len < 600 and "PR171" in seq_id) or
        (processed_len < 300 and "PR186" in seq_id)):
        processed_seq, processed_len = find_second_motif(sequence, start_motif, is_start=True)

    # Step 4: If still below threshold, find second occurrence of end_motif
    if ((processed_len == 0) or
        (processed_len < 600 and "PR171" in seq_id) or
        (processed_len < 300 and "PR186" in seq_id)):
        processed_seq, processed_len = find_second_motif(sequence, end_motif, is_start=False)

    # Update the rechecked DataFrame
    df_recheck.at[idx, "processed_sanger_read"] = processed_seq
    df_recheck.at[idx, "processed_length"] = processed_len


# Merge the updated values back into the original DataFrame
df_reduced.update(df_recheck)
#df_reduced.to_csv('/content/drive/Shared drives/BIIE_PROGRAM-LABS/DEEPIR/3_RESEARCH/IRP-110_MAMMALIAN-AB/1_PILOT-PHASE/2_DATA-FILES/Sanger_Seq_Clones/4318714_4314914/df_reduced_new_.csv')

#<--------------------------------------------------------------------------------------------------

#Stich last part of J gene
def stitch_j_gene(df, motif="GGTCAC"):
    """
    Matches the motif 'GGTCAC' between the processed_sanger_read and the j_sequence_alignment
    for _PR186 sequences and appends the remaining part of the j gene to processed_sanger_read.
    """
    def append_j_gene(row):
        if "PR186" in row["sequence_id"] and isinstance(row["j_sequence_alignment"], str):
            # Find the motif in the sequence
            match = re.search(motif, row["j_sequence_alignment"])
            if match:
                start_idx = match.start()
                remaining_j_gene = row["j_sequence_alignment"][start_idx:]

                 # Extract sequence from motif onward
                return row["processed_sanger_read"] + remaining_j_gene

        return row["processed_sanger_read"]  # Return unmodified for non-PR186

    # Apply function to update processed_sanger_read
    df["processed_sanger_read"] = df.apply(append_j_gene, axis=1)

    return df


#                if updated_sequence.endswith("G"):
        #            updated_sequence = updated_sequence[:-1]
              #  return updated_sequence
       # return row["processed_sanger_read"]

#-------------------------------------------------------------------------------------------------->
df_reduced = stitch_j_gene(df_reduced)
#<--------------------------------------------------------------------------------------------------

#Merge heavy&light chain reads and check for gene identity coverage

In [None]:
remove_identity_failures = True  #@param {type:"boolean"}
remove_length_failures = True  #@param {type:"boolean"}
remove_stopcodon_failures = False  #@param {type:"boolean"}
remove_repeated_genes = True #@param {type:"boolean"}
remove_clonotype_duplicates=True  #@param {type:"boolean"}
prefix = "_INS_" #@param {type:"string"}

In [None]:
def merge_sanger_reads(df,
                       remove_identity_failures=remove_identity_failures,
                       remove_length_failures=remove_length_failures,
                       remove_stopcodon_failures=remove_stopcodon_failures,
                       remove_repeated_genes=remove_repeated_genes,
                       remove_clonotype_duplicates=remove_clonotype_duplicates):
    """
    Merges Sanger sequencing reads by combining heavy (PR186) and light (PR171) chains into a single row.
    Adds QC flags and clonotype frequency, with options to filter out:
      - Length <80 aa
      - Identity <80%
      - Stop codons
      - Repeated gene segments
      - Clonotype duplicates (Frequency > 1)
    Prints summary statistics and final counts.
    Returns the filtered and annotated DataFrame.
    """


    # ------------------- Merge Paired Reads -------------------



    df["clone_name"] = df["sequence_id"].str.replace(r'(_\d+)?_PR\d+$', '', regex=True)
    df_heavy = df[df['sequence_id'].str.contains('_PR186')].copy()
    df_light = df[df['sequence_id'].str.contains('_PR171')].copy()

    df_light = df_light.rename(columns=lambda x: x + "_light" if x not in ["clone_name", "sequence_id"] else x)
    df_heavy = df_heavy.rename(columns=lambda x: x + "_heavy" if x not in ["clone_name", "sequence_id"] else x)

    df_merged = pd.merge(df_heavy, df_light, on="clone_name", how="outer")

    # ------------------- Build Recombination Info -------------------
    recomb_cols = ['v_family_heavy', 'd_family_heavy', 'j_family_heavy', 'v_family_light', 'j_family_light']
    df_merged["info"] = df_merged[recomb_cols].apply(lambda x: "/".join(str(i) for i in x), axis=1)

    # ------------------- Add QC Flags -------------------
    df_merged['length<80'] = (
        (df_merged['sequence_alignment_aa_heavy'].str.len() < 80) |
        (df_merged['sequence_alignment_aa_light'].str.len() < 80)
    )

    identity_cols = ['v_identity', 'd_identity', 'j_identity']
    for col in identity_cols:
        for chain in ['heavy', 'light']:
            colname = f"{col}_{chain}"
            if colname in df_merged.columns:
                df_merged[colname] = pd.to_numeric(df_merged[colname], errors='coerce')

    df_merged['identity<80%'] = (
        (df_merged['v_identity_heavy'] < 80) |
        (df_merged['d_identity_heavy'] < 80) |
        (df_merged['j_identity_heavy'] < 80) |
        (df_merged['v_identity_light'] < 80) |
        (df_merged['d_identity_light'] < 80) |
        (df_merged['j_identity_light'] < 80)
    )

    df_merged['sequence_alignment_aa_heavy'] = df_merged['sequence_alignment_aa_heavy'].astype(str)
    df_merged['sequence_alignment_aa_light'] = df_merged['sequence_alignment_aa_light'].astype(str)
    df_merged['stopcodon_present'] = (
        df_merged['sequence_alignment_aa_heavy'].str.contains(r'\*', na=False) |
        df_merged['sequence_alignment_aa_light'].str.contains(r'\*', na=False)
    )

    df_merged["repeated_gene_present"] = df_merged["info"].apply(
        lambda x: len(x.split('/')) != len(set(x.split('/')))
    )

    # ------------------- Clonotype Frequency -------------------
    group_cols = ['v_family_heavy', 'd_family_heavy', 'j_family_heavy', 'cdr1_aa_heavy',
                  'cdr2_aa_heavy', 'cdr3_aa_heavy', 'cdr1_aa_light', 'cdr2_aa_light',
                  'v_family_light', 'j_family_light', 'cdr3_aa_light']

    frequency_df = df_merged.groupby(group_cols).size().reset_index(name='Frequency')
    df_merged = df_merged.merge(frequency_df, on=group_cols, how='left')

    # ------------------- Print QC and Frequency Stats -------------------
    print(f"Number of clones that did not pass the QC (Phred):", len(exclusion_prefixes))
    print(f"Number of clones with aa sequence length < 80: {df_merged['length<80'].sum()}")
    print(f"Number of clones with v/d/j identity < 80%: {df_merged['identity<80%'].sum()}")
    print(f"Number of clones with stop codons present: {df_merged['stopcodon_present'].sum()}")
    print(f"Number of clones with repeated gene segments: {df_merged['repeated_gene_present'].sum()}")

    total_clones = len(df_merged)
    num_unique_clonotypes = frequency_df.shape[0]
    num_duplicates = total_clones - num_unique_clonotypes
    percent_unique = (num_unique_clonotypes / total_clones) * 100
    percent_duplicates = 100 - percent_unique

    print(f"Τotal number of clonotypes: {total_clones}")
    print(f"Number of duplicate clonotypes: {num_duplicates}")
    print(f"Percentage of duplicates: {percent_duplicates:.2f}%")
    print(f"\nTotal number of unique clonotypes: {num_unique_clonotypes}")
    print(f"Percentage of unique remaining clones: {percent_unique:.2f}%")


    # Print duplicate clone names
    high_freq = df_merged[df_merged["Frequency"] > 1][["clone_name", "info", "Frequency"]].drop_duplicates()
    if not high_freq.empty:
        print("\nClone names with repeated clonotypes:")
        print(high_freq.to_string(index=False))

    # ------------------- Apply Filtering -------------------
    filter_mask = pd.Series(False, index=df_merged.index)
    # Apply filtering based on all selected QC flags
    filter_mask = pd.Series(False, index=df_merged.index)

    if remove_length_failures:
        filter_mask |= df_merged['length<80']
    if remove_identity_failures:
        filter_mask |= df_merged['identity<80%']
    if remove_stopcodon_failures:
        filter_mask |= df_merged['stopcodon_present']
    if remove_repeated_genes:
        filter_mask |= df_merged['repeated_gene_present']
    #if remove_clonotype_duplicates:
     #   filter_mask |= df_merged.duplicated(subset=group_cols, keep='first')
    if remove_clonotype_duplicates:
        filter_mask |= df_merged['Frequency'] > 1

    num_removed = filter_mask.sum()
    num_remaining = (~filter_mask).sum()


    # ------------------- Report which duplicate clones are kept ------------------
    duplicates_df = df_merged[df_merged["Frequency"] > 1].copy()
    duplicate_resolution_records = []
    duplicate_resolution_df = pd.DataFrame()  # Ensure this is always defined

    if remove_clonotype_duplicates:
        for _, group in duplicates_df.groupby(group_cols):
            clone_ids = group["clone_name"].unique().tolist()
            kept_clone = clone_ids[0]
            duplicate_clones = clone_ids[1:]
            recomb = group["info"].iloc[0]
            if duplicate_clones:
              duplicate_resolution_records.append({
              "Kept Clone": kept_clone,
             "Removed Clones": ", ".join(duplicate_clones),
              "Clonotype": recomb
          })
            print(f" - Keeping {kept_clone}, removing {', '.join(duplicate_clones)} (clonotype: {recomb})")

            print(f" - Keeping {kept_clone}, removing {', '.join(duplicate_clones)} (clonotype: {recomb})")
            print(f"\nTotal clones removed after filtering: {num_removed}")
            print(f"Total clones remaining: {num_remaining}")
            duplicate_resolution_df = pd.DataFrame(duplicate_resolution_records)
    else:
      print(f"\nClonotype deduplication is disabled.")
      print(f"Total clones removed after filtering: {num_removed}")
      print(f"Total clones remaining: {num_remaining}")

    # ------------------- Remove duplicate clonotypes from output -------------------
    # Keep only the first clone per unique clonotype
    df_merged = df_merged.drop_duplicates(subset=group_cols, keep='first')

    df_merged.rename(columns={
        "sequence_alignment_heavy": "vh_dna",
        "sequence_alignment_light": "vl_dna",
        "sequence_alignment_aa_heavy": "vh_aa",
        "sequence_alignment_aa_light": "vl_aa"
    }, inplace=True)

    # Define columns to keep if not keeping all columns
    columns_to_keep = [
        "clone_name", "vh_dna",  "vh_aa", "locus_heavy",
        "vl_dna", "vl_aa", "locus_light",
        "processed_sanger_read_heavy", "processed_sanger_read_light","info", "identity<80%", "length<80", "stopcodon_present", "repeated_gene_present"
    ]



    # Keep specific columns

    df_processed = df_merged[columns_to_keep]


    # Substitute "Clone" with "C" in clone_name
    df_processed["clone_name"] = df_processed["clone_name"].astype(str).str.replace(r'^clone', 'C', regex=True)

    # Add prefix to clone_name
    df_processed["clone_name"] = df_processed["clone_name"].apply(lambda x: f"{prefix}{x}" if isinstance(x, str) else x)

    # Natural sorting function to order C1, C2, C3 instead of C1, C10, C2
    def natural_sort_key(text):
        return [int(num) if num.isdigit() else num for num in re.split(r'(\d+)', text)]

    # Sort DataFrame by clone_name using natural sorting
    df_processed = df_processed.sort_values(by="clone_name", key=lambda x: x.map(natural_sort_key))

    # ------------------- Collect summary statistics in a DataFrame ------------------

    # ---- Create summary section as DataFrame ----
    summary_stats = [
    ["Number of clones that did not pass the QC", len(exclusion_prefixes)],
    ["Number of clones with aa sequence length < 80", df_merged['length<80'].sum()],
    ["Number of clones with v/d/j identity < 80%", df_merged['identity<80%'].sum()],
    ["Number of clones with stop codons present", df_merged['stopcodon_present'].sum()],
    ["Number of clones with repeated gene segments", df_merged['repeated_gene_present'].sum()],
    ["", ""],
    ["Number of duplicate clonotypes", num_duplicates],
    ["Percentage of duplicates", f"{percent_duplicates:.2f}%"],
    ["Total number of unique clonotypes", num_unique_clonotypes],
    ["Percentage of unique remaining clones", f"{percent_unique:.2f}%"],
    ["", ""],
    ["Total clones removed after filtering", num_removed],
    ["Total unique clones remaining", num_remaining],

]

    summary_df = pd.DataFrame(summary_stats, columns=["Metric", "Value"])

    # ---- Clone names with repeated clonotypes ----
    high_freq = df_merged[df_merged["Frequency"] > 1][["clone_name", "info", "Frequency"]].drop_duplicates()
    repeated_clones_df = high_freq.rename(columns={"clone_name": "Clone with Duplicates"})

    return df_processed[~filter_mask].copy(), df_merged[~filter_mask].copy(), summary_df, repeated_clones_df, duplicate_resolution_df

#-------------------------------------------------------------------------------------------------->

with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    df_processed, df_merged, summary_df, repeated_clones_df, duplicate_resolution_df = merge_sanger_reads(df_reduced)
#<--------------------------------------------------------------------------------------------------

def process_merged_data(df_merged, keep_all_columns=False, save_csv=False, output_path="/content/drive/MyDrive/processed_data.csv", prefix=""):
    """
    Processes the merged Sanger sequencing DataFrame by formatting and filtering columns, and optionally saving the data as a CSV file.
    Allows adding a prefix to the clone name and sorting clone names numerically.

    """

    # ------------------- Sequence Processing ------------------

    #Define adaptor sequence (PaqCl + attP + Kozak + ATG), added extra "AA" to distinguish easier the start
    adaptor_seq_light = "AACACCTGCGCCCATCAGTGGTTTGTCTGGTCAACCACCGCGGACTCAGTGGTGTACGGTACAAACCCAGCCGCCACCATG"
    # Define P2A sequence
    P2A = "CTGCTGAAGCAGGCTGGAGACGTGGAAGAGAACCCCGGCCCC"
    #Define adaptor sequence (w PaqCl)
    adaptor_seq_heavy = "GGTAAGCAGGTGTT"

    #Remove "G" at the end of the heavy chain sequence since it doesnt belong to a codon
    df_processed = df_merged.copy()
    df_processed["processed_sanger_read_heavy"] = df_processed["processed_sanger_read_heavy"].apply(lambda x: x[:-1] if isinstance(x, str) and x.endswith("G") else x)
    df_processed["vh_dna"] = df_processed ["vh_dna"].apply(lambda x: x[:-1] if isinstance(x, str) and x.endswith("G") else x)

    # Construct full Sanger read with P2A sequence in between
    df_processed["full_sanger_read"] = adaptor_seq_light +  df_processed["processed_sanger_read_light"] + P2A + df_processed["processed_sanger_read_heavy"] + adaptor_seq_heavy

    #Format dataframe
    #df_final = df_processed.copy()
    df_processed = df_processed.drop(columns=[ 'processed_sanger_read_heavy', 'processed_sanger_read_light'])
    df_processed = df_processed.rename(columns={'full_sanger_read': 'insert_seq'})
    df_processed = df_processed[["clone_name", "insert_seq", "vh_dna", "vl_dna", "vh_aa", "vl_aa", "info"]]
    # Save DataFrame to CSV if requested
    if save_csv:
        df_processed.to_csv(output_path, index=False)

    return df_processed
#-------------------------------------------------------------------------------------------------->
df_merged_b = process_merged_data(df_processed)
#<--------------------------------------------------------------------------------------------------


# Visualize *Data*

In [None]:
#-------------------------------------------------------------------------------------------------->
# Define required columns
required_columns = ['v_family_heavy', 'd_family_heavy', 'j_family_heavy']
path_counts = df_merged.groupby(required_columns).size().reset_index(name='count')

# Extract unique gene families
v_genes = path_counts["v_family_heavy"].dropna().unique().tolist()
d_genes = path_counts["d_family_heavy"].dropna().unique().tolist()
j_genes = path_counts["j_family_heavy"].dropna().unique().tolist()
all_genes = v_genes + d_genes + j_genes
gene_idx = {gene: i for i, gene in enumerate(all_genes)}

# Initialize adjacency matrices
matrix_v_d = np.zeros((len(all_genes), len(all_genes)))
matrix_v_j = np.zeros((len(all_genes), len(all_genes)))
matrix_j_d = np.zeros((len(all_genes), len(all_genes)))

# Populate matrices
for _, row in path_counts.iterrows():
    v, d, j, count = row["v_family_heavy"], row["d_family_heavy"], row["j_family_heavy"], row["count"]
    if v in gene_idx and d in gene_idx:
        matrix_v_d[gene_idx[v], gene_idx[d]] += count
    if v in gene_idx and j in gene_idx:
        matrix_v_j[gene_idx[v], gene_idx[j]] += count
    if j in gene_idx and d in gene_idx:
        matrix_j_d[gene_idx[j], gene_idx[d]] += count

# Ensure symmetry
matrix_v_d += matrix_v_d.T
matrix_v_j += matrix_v_j.T
matrix_j_d += matrix_j_d.T

def filter_empty_genes(matrix, names):
    nonzero = np.any(matrix > 0, axis=0)
    return matrix[np.ix_(nonzero, nonzero)], [name for i, name in enumerate(names) if nonzero[i]]

matrix_v_d, names_v_d = filter_empty_genes(matrix_v_d, all_genes)
matrix_v_j, names_v_j = filter_empty_genes(matrix_v_j, all_genes)
matrix_j_d, names_j_d = filter_empty_genes(matrix_j_d, all_genes)

# Create plots
fig2, axes2 = plt.subplots(1, 3, figsize=(24, 8), dpi=600)
chord_diagram(matrix_v_d, names=names_v_d, sort="size", rotate_names=True, fontsize=8, cmap="twilight", ax=axes2[0])
axes2[0].set_title("V-D Genes", fontsize=12, y=1.1)

chord_diagram(matrix_v_j, names=names_v_j, sort="size", rotate_names=True, fontsize=8, cmap="twilight", ax=axes2[1])
axes2[1].set_title("V-J Genes", fontsize=12, y=1.1)

chord_diagram(matrix_j_d, names=names_j_d, sort="size", rotate_names=True, fontsize=8, cmap="twilight", ax=axes2[2])
axes2[2].set_title("J-D Genes", fontsize=12, y=1.1)

plt.tight_layout()

plt.show()

#fig2.savefig("chord_diagrams_light.png", dpi=500, bbox_inches='tight', format='png')

save_plot_to_memory(fig2, dpi=600)

#<--------------------------------------------------------------------------------------------------


In [None]:
#-------------------------------------------------------------------------------------------------->

def prepare_vj_matrix(df_light):
    light_path_counts = df_light.groupby(['v_family_light', 'j_family_light']).size().reset_index(name='count')

    v_genes = light_path_counts["v_family_light"].dropna().unique().tolist()
    j_genes = light_path_counts["j_family_light"].dropna().unique().tolist()
    all_genes = v_genes + j_genes

    gene_idx = {gene: i for i, gene in enumerate(all_genes)}
    matrix = np.zeros((len(all_genes), len(all_genes)))

    for _, row in light_path_counts.iterrows():
        v, j, count = row["v_family_light"], row["j_family_light"], row["count"]
        if v in gene_idx and j in gene_idx:
            matrix[gene_idx[v], gene_idx[j]] += count

    matrix = matrix + matrix.T  # Make symmetric

    # Filter out unused genes
    nonzero = np.any(matrix > 0, axis=0)
    filtered_matrix = matrix[np.ix_(nonzero, nonzero)]
    filtered_genes = [gene for i, gene in enumerate(all_genes) if nonzero[i]]

    return filtered_matrix, filtered_genes

# -----------------------
# Prepare data for IGK and IGL
# -----------------------
df_igk = df_merged[df_merged["locus_light"] == "IGK"]
df_igl = df_merged[df_merged["locus_light"] == "IGL"]

matrix_igk, genes_igk = prepare_vj_matrix(df_igk)
matrix_igl, genes_igl = prepare_vj_matrix(df_igl)

# Count number of IGK and IGL clones
num_igk = df_igk["clone_name"].nunique()
num_igl = df_igl["clone_name"].nunique()

print(f"Number of IGK (kappa) light chain clones: {num_igk}")
print(f"Number of IGL (lambda) light chain clones: {num_igl}")


# -----------------------
# Plot side-by-side chord diagrams
# -----------------------

fig3, axes3 = plt.subplots(1, 2, figsize=(24, 8))

# IGK
chord_diagram(matrix_igk, names=genes_igk, sort="size", rotate_names=True,
              fontsize=8, cmap="twilight", ax=axes3[0])
axes3[0].set_title("IGK Light Chain V–J Genes", fontsize=12, y=1.1)

# IGL
chord_diagram(matrix_igl, names=genes_igl, sort="size", rotate_names=True,
              fontsize=8, cmap="twilight", ax=axes3[1])
axes3[1].set_title("IGL Light Chain V–J Genes", fontsize=12, y=1.1)

plt.tight_layout()
plt.show()

#fig3.savefig("chord_diagrams_light.png", dpi=500, bbox_inches='tight', format='png')

save_plot_to_memory(fig3, dpi=600)
#<--------------------------------------------------------------------------------------------------

In [None]:
#-------------------------------------------------------------------------------------------------->
#  Export everything into Excel
output_excel_path = f"/content/drive/MyDrive/{output_csv_file}"

with pd.ExcelWriter(output_excel_path, engine='xlsxwriter') as writer:
    # Save DataFrames
    df_merged_b.to_excel(writer, sheet_name='Benchling_Data', index=False)
    df_processed.to_excel(writer, sheet_name='Processed_Data', index=False)
    summary_df.to_excel(writer, sheet_name='Summary', index=False)
    duplicate_resolution_df.to_excel(writer, sheet_name='Duplicate_Clones', index=False)

    # Create a worksheet for plots
    workbook = writer.book
    worksheet = workbook.add_worksheet("Plots")
    writer.sheets["Plots"] = worksheet

    # Insert each plot image
    row = 0
    for i, img in enumerate(plot_images):
      worksheet.insert_image(row, 0, f"plot_{i}.png", {'image_data': img})
      row += 40

print(f"Excel file successfully saved at: {output_excel_path}")

# OR Download the file LOCALLY
#from google.colab import files
#files.download(output_excel_path)
#<--------------------------------------------------------------------------------------------------

# Microsynth NightSeq Plate Ordering

In [None]:
import pandas as pd
from google.colab import files

# ----- CONFIGURATION -----
# Customize these components
antigen = "CTLA4" #@param {type:"string")
plate_id = "P2" #@param {type:"string")
primer_name = "PR171" #@param {type:"string")
primer_cat = "Custom" #@param {type:"string")
output_file_path = f"NightSeq_{plate_id}_{primer_name}_plate_order.xlsx" #@param {type:"string")

# 96-well layout (row-major: A1 to H12)
rows = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']
cols = list(range(1, 13))
well_positions = [f"{row}{col}" for row in rows for col in cols]

# ----- FUNCTION DEFINITIONS -----
def reshape_row_major_to_column_order(sample_names):
    n_rows, n_cols = 8, 12
    matrix = [sample_names[i * n_cols:(i + 1) * n_cols] for i in range(n_rows)]
    return [matrix[row][col] for col in range(n_cols) for row in range(n_rows)]

def main():
    sample_names = [f"C{idx + 1}_{antigen}_{plate_id}_{well}" for idx, well in enumerate(well_positions)]
    primers = [primer_name] * len(sample_names)
    primer_type = [primer_cat] * len(sample_names)

    samples_reordered = reshape_row_major_to_column_order(sample_names)
    primers_reordered = reshape_row_major_to_column_order(primers)
    primer_type_recorded = reshape_row_major_to_column_order(primer_type)

    df = pd.DataFrame(zip(samples_reordered, primers_reordered, primer_type_recorded))

    with pd.ExcelWriter(output_file_path) as writer:
        df.to_excel(writer, sheet_name="Plate", index=False, header=False)

    print(f"Order Layout saved to {output_file_path}")
    return output_file_path

# Run the script
output_file = main()
files.download(output_file)

# Check for clone overlap with the previous cov2 dataset

In [None]:
from google.colab import drive
drive.mount('/content/drive')


In [None]:
import pandas as pd

# Step 1: Load files from Google Drive
xlsx_path = "/content/drive/MyDrive/CoV2_RBD_high_binders"  # Update this
csv_path = "/content/drive/Shared drives/BIIE_PROGRAM-LABS/DEEPIR/3_RESEARCH/IRP-110_MAMMALIAN-AB/1_PILOT-PHASE/2_DATA-FILES/Sanger_Seq_Clones/4318714_4314914/df_benchling_202.csv"      # Update this

df_xlsx = pd.read_excel(xlsx_path)
df_csv = pd.read_csv(csv_path)

# Step 2: Extract 'info' columns and drop any missing entries
info_xlsx = df_xlsx["info"].dropna().astype(str).str.strip()
info_csv = df_csv["info"].dropna().astype(str).str.strip()

# Step 3: Find common gene combinations
common_genes = set(info_xlsx).intersection(set(info_csv))

# Step 4: Output result
if common_genes:
    print("✅ Common gene combinations found:")
    for gene in common_genes:
        print(gene)
else:
    print("❌ No identical gene combinations found in 'info' columns.")


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import re
from collections import Counter
# --- Define a function to extract gene parts from info string ---
def extract_genes(info_series):
    gene_patterns = {
        "IGHV": r"IGHV[^\s/]+",
        "IGHD": r"IGHD[^\s/]+",
        "IGHJ": r"IGHJ[^\s/]+",
        "IGKV": r"IGKV[^\s/]+",
        "IGKJ": r"IGKJ[^\s/]+"
    }
    gene_counts = {k: [] for k in gene_patterns}

    for entry in info_series:
        for gene, pattern in gene_patterns.items():
            matches = re.findall(pattern, entry)
            gene_counts[gene].extend(matches)
    return {k: Counter(v) for k, v in gene_counts.items()}

# --- Extract gene counts from both files ---
counts_xlsx = extract_genes(info_xlsx)
counts_csv = extract_genes(info_csv)

# --- Plotting ---
for gene_type in counts_xlsx:
    combined_keys = set(counts_xlsx[gene_type]) | set(counts_csv[gene_type])
    data = {
        "Gene": list(combined_keys),
        "cov2_high_binders": [counts_xlsx[gene_type].get(g, 0) for g in combined_keys],
        "cov2_binders": [counts_csv[gene_type].get(g, 0) for g in combined_keys]
    }
    df_plot = pd.DataFrame(data).sort_values("cov2_high_binders", ascending=False)

    # Plot
    plt.figure(figsize=(10, 5))
    bar_width = 0.4
    x = range(len(df_plot))

    plt.bar([i - bar_width/2 for i in x], df_plot["cov2_high_binders"], width=bar_width, label="cov2_high_binders", color='blue', alpha=0.7)
    plt.bar([i + bar_width/2 for i in x], df_plot["cov2_binders"], width=bar_width, label="cov2_binders", color='orange', alpha=0.7)

    plt.xticks(ticks=x, labels=df_plot["Gene"], rotation=45, ha='right')
    plt.title(f"Frequency of {gene_type} Genes")
    plt.ylabel("Count")
    plt.legend()
    plt.tight_layout()
    plt.show()

# Create an input file for QPix to select out the clones to be rearranged

In [None]:
plate = "PLATE: 1" #@param {type:"string"}
barcode = "BARCODE: 2" #@param {type:"string"}
type = "TYPE: SOURCE" #@param {type:"string"}
plate_type = "PLATETYPE: Nunc 96 Deep Well" #@param {type:"string"}

In [None]:
from google.colab import files

# Extract well names from the last part of "clone_name"
df_merged_b["well"] = df_merged_b["clone_name"].str.extract(r'_([^_]+)$')

# Extract unique well names
well_names = df_merged_b["well"].dropna().unique().tolist()

# Prepare content for the file
output_lines = [
    "PLATE: 1",
    "BARCODE: 2",
    "TYPE: SOURCE",
    "PLATETYPE: Nunc 96 Deep Well",
    "SUBTYPE:"
] + well_names


output_file = '/content/QPix_plate_layout.frd'

# Write the file content
with open(output_file, "w") as f:
    f.writelines('\n'.join(output_lines))

# Trigger download
files.download(output_file)


In [None]:
from google.colab import files

# Extract well names from the last part of "clone_name"
df_merged_b["well"] = df_merged_b["clone_name"].str.extract(r'_([^_]+)$')

# Extract unique well names
well_names = df_merged_b["well"].dropna().unique().tolist()

# Prepare content for the file
output_lines = [
    "PLATE: 1",
    "BARCODE: 2",
    "TYPE: SOURCE",
    "PLATETYPE: Nunc 96 Deep Well",
    "SUBTYPE: "
] + well_names

output_file = "/content/QPix_plate_layout.frd"

# Write with CR-LF line endings **and** a trailing newline
content = "\r\n".join(output_lines) + "\r\n"        # <-- extra CRLF at the end
with open(output_file, "wb") as f:                  # binary mode
    f.write(content.encode("utf-8"))                # explicit UTF-8, no BOM

# Trigger download
files.download(output_file)

In [None]:
from google.colab import files

# Extract unique well names
well_names = ["A1", "A2", "A3"]

# Prepare content for the file
output_lines = [
    "PLATE: 1",
    "BARCODE: 2",
    "TYPE: SOURCE",
    "PLATETYPE: Nunc 96 Deep Well",
    "SUBTYPE: "
] + well_names

output_file = "/content/QPix_plate_layout.frd"

# Write with CR-LF line endings **and** a trailing newline
content = "\r\n".join(output_lines) + "\r\n"        # <-- extra CRLF at the end
with open(output_file, "wb") as f:                  # binary mode
    f.write(content.encode("utf-8"))                # explicit UTF-8, no BOM

# Trigger download
files.download(output_file)