In [None]:
import pandas as pd
import numpy as np
from scipy.stats import norm

# Load reference BED file
ref_qc = pd.read_csv('/home/nick/Documents/TRIAL/CODES-FOR-GRAPHS/TRYS/my-pipeline/NEWTAR.bed', 
                     sep='\t', header=0)

# Load data file
NEWAPP = pd.read_csv('/home/nick/Documents/TRIAL/CODES-FOR-GRAPHS/TRYS/my-pipeline/Removalofzeros.bed',
                     sep='\t', header=None)

# Normalization - Z-Score
reads = NEWAPP[4]  # V5 in R is index 4 in Python (0-based)
expected_reads = reads.mean()
Z = (reads - expected_reads) / reads.std()

# Emission probabilities
emission1 = norm.pdf(Z, -0.05, 1)
emission2 = norm.pdf(Z, 0, 1)
emission3 = norm.pdf(Z, 0.05, 1)

# Avoid log(0)
emission1[emission1 == 0] = emission1[emission1 > 0].min()
emission2[emission2 == 0] = emission2[emission2 > 0].min()
emission3[emission3 == 0] = emission3[emission3 > 0].min()

# Transition probabilities
transition_probs = np.array([[0.9, 0.05, 0.05],
                             [0.05, 0.9, 0.05],
                             [0.05, 0.05, 0.9]])

# Viterbi algorithm
n = len(Z)
v = np.full((n, 3), -np.inf)
pointer = np.full((n + 1, 3), np.nan)

# Initialization
v[0, :] = np.log([emission1[0], emission2[0], emission3[0]]) + np.log(1/3)

# Recursion
for i in range(1, n):
    for j in range(3):
        transition_logs = v[i - 1] + np.log(transition_probs[:, j])
        v[i, j] = np.log([emission1[i], emission2[i], emission3[i]][j]) + np.max(transition_logs)
        pointer[i, j] = np.argmax(transition_logs)

# Termination
traceback = int(np.argmax(v[n-1]))
pointer[n, 0] = traceback + 1

# Traceback
result = [None] * n
for i in range(n-1, 0, -1):
    if traceback == 0:
        result[i-1] = 'deletion'
    elif traceback == 1:
        result[i-1] = 'neutral'
    else:
        result[i-1] = 'duplication'
    traceback = int(pointer[i, traceback])

print("CNV calls:", result)

# Output CNV segments
delIndex = [i for i, x in enumerate(result) if x == "deletion"]
dupIndex = [i for i, x in enumerate(result) if x == "duplication"]

if len(delIndex) + len(dupIndex) == 0:
    finalcalls = None
else:
    stDel = [i for i in delIndex if i + 1 not in delIndex]
    edDel = [i for i in delIndex if i - 1 not in delIndex]
    stDup = [i for i in dupIndex if i + 1 not in dupIndex]
    edDup = [i for i in dupIndex if i - 1 not in dupIndex]

    st_exon = stDel + stDup
    ed_exon = edDel + edDup
    cnv_type = ['del'] * len(stDel) + ['dup'] * len(stDup)

    st_bp = ref_qc.iloc[st_exon]['start'].values
    ed_bp = ref_qc.iloc[ed_exon]['end'].values
    chr_vals = ref_qc.iloc[st_exon]['chr'].values
    length_kb = (ed_bp - st_bp) / 1000

    finalcalls = pd.DataFrame({
        'cnv': cnv_type,
        'st_bp': st_bp,
        'ed_bp': ed_bp,
        'chr': chr_vals,
        'length_kb': length_kb
    })

    finalcalls.to_csv('/home/nick/Documents/TRIAL/CODES-FOR-GRAPHS/TRYS/my-pipeline/Viterbi-Code/my-vit-algorithm-using-for-thesis/resultsFIN.csv', index=False)


In [None]:
#python module
import pandas as pd
import numpy as np
from scipy.stats import norm

def run_viterbi_cnv(ref_bed_path, input_reads_path, output_csv_path):
    # Load reference BED file
    ref_qc = pd.read_csv(ref_bed_path, sep='\t', header=0)

    # Load read data
    NEWAPP = pd.read_csv(input_reads_path, sep='\t', header=None)

    # Z-score normalization
    reads = NEWAPP[4]
    expected_reads = reads.mean()
    Z = (reads - expected_reads) / reads.std()

    # Emission probabilities
    emission1 = norm.pdf(Z, -0.05, 1)
    emission2 = norm.pdf(Z, 0, 1)
    emission3 = norm.pdf(Z, 0.05, 1)

    # Avoid log(0)
    emission1[emission1 == 0] = emission1[emission1 > 0].min()
    emission2[emission2 == 0] = emission2[emission2 > 0].min()
    emission3[emission3 == 0] = emission3[emission3 > 0].min()

    # Transition probabilities
    transition_probs = np.array([[0.9, 0.05, 0.05],
                                 [0.05, 0.9, 0.05],
                                 [0.05, 0.05, 0.9]])

    # Viterbi algorithm
    n = len(Z)
    v = np.full((n, 3), -np.inf)
    pointer = np.full((n + 1, 3), np.nan)
    v[0, :] = np.log([emission1[0], emission2[0], emission3[0]]) + np.log(1 / 3)

    for i in range(1, n):
        for j in range(3):
            transition_logs = v[i - 1] + np.log(transition_probs[:, j])
            v[i, j] = np.log([emission1[i], emission2[i], emission3[i]][j]) + np.max(transition_logs)
            pointer[i, j] = np.argmax(transition_logs)

    traceback = int(np.argmax(v[n - 1]))
    pointer[n, 0] = traceback + 1

    result = [None] * n
    for i in range(n - 1, 0, -1):
        if traceback == 0:
            result[i - 1] = 'deletion'
        elif traceback == 1:
            result[i - 1] = 'neutral'
        else:
            result[i - 1] = 'duplication'
        traceback = int(pointer[i, traceback])

    # CNV segment identification
    delIndex = [i for i, x in enumerate(result) if x == "deletion"]
    dupIndex = [i for i, x in enumerate(result) if x == "duplication"]

    if len(delIndex) + len(dupIndex) == 0:
        print("No CNVs detected.")
        return

    stDel = [i for i in delIndex if i + 1 not in delIndex]
    edDel = [i for i in delIndex if i - 1 not in delIndex]
    stDup = [i for i in dupIndex if i + 1 not in dupIndex]
    edDup = [i for i in dupIndex if i - 1 not in dupIndex]

    st_exon = stDel + stDup
    ed_exon = edDel + edDup
    cnv_type = ['del'] * len(stDel) + ['dup'] * len(stDup)

    st_bp = ref_qc.iloc[st_exon]['start'].values
    ed_bp = ref_qc.iloc[ed_exon]['end'].values
    chr_vals = ref_qc.iloc[st_exon]['chr'].values
    length_kb = (ed_bp - st_bp) / 1000

    finalcalls = pd.DataFrame({
        'cnv': cnv_type,
        'st_bp': st_bp,
        'ed_bp': ed_bp,
        'chr': chr_vals,
        'length_kb': length_kb
    })

    finalcalls.to_csv(output_csv_path, index=False)
    print(f"CNV results saved to: {output_csv_path}")

# If running directly
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Run CNV detection with Viterbi algorithm.")
    parser.add_argument("--ref", required=True, help="Path to the reference BED file")
    parser.add_argument("--input", required=True, help="Path to the input read counts BED file")
    parser.add_argument("--output", required=True, help="Path to output CSV file for CNV calls")

    args = parser.parse_args()
    run_viterbi_cnv(args.ref, args.input, args.output)


In [None]:
#Command line usage
python viterbi_cnv.py --ref /path/to/NEWTAR.bed \
                      --input /path/to/Removalofzeros.bed \
                      --output /path/to/resultsFIN.csv

In [None]:
#if used in another Python script:

from viterbi_cnv import run_viterbi_cnv

run_viterbi_cnv("data/NEWTAR.bed", "data/Removalofzeros.bed", "output/resultsFIN.csv")