In [121]:
import os
import subprocess
import math
from pathlib import Path
import shutil
import numpy as np
import random
import copy
from scipy.stats import pearsonr
from pprint import pprint
from argparse import ArgumentParser

In [122]:
# Load the functions of smm_gradient_descent just once

def encode(peptides, encoding_scheme, alphabet):
    
    encoded_peptides = []

    for peptide in peptides:

        encoded_peptide = []

        for peptide_letter in peptide:

            for alphabet_letter in alphabet:

                encoded_peptide.append(encoding_scheme[peptide_letter][alphabet_letter])

        encoded_peptides.append(encoded_peptide)
        
    return np.array(encoded_peptides)

def gradient_descent(y_pred, y_target, peptide, weights, lamb_N, epsilon):
    
    # do is dE/dO
    do = y_pred - y_target
        
    for i in range(0, len(weights)):
        
        de_dw_i = do * peptide[i] + (2 * lamb_N * weights[i] * peptide[i])

        weights[i] -= epsilon * de_dw_i
        
def vector_to_matrix(vector, alphabet):
    
    rows = int(len(vector)/len(alphabet))
    
    matrix = [0] * rows
    
    offset = 0
    
    for i in range(0, rows):
        
        matrix[i] = {}
        
        for j in range(0, 20):
            
            matrix[i][alphabet[j]] = vector[j+offset] 
        
        offset += len(alphabet)

    return matrix

def to_psi_blast(matrix):

    # print to user
    
    header = ["", "A", "R", "N", "D", "C", "Q", "E", "G", "H", "I", "L", "K", "M", "F", "P", "S", "T", "W", "Y", "V"]

    print('{:>4} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8}'.format(*header)) 

    letter_order = ["A", "R", "N", "D", "C", "Q", "E", "G", "H", "I", "L", "K", "M", "F", "P", "S", "T", "W", "Y", "V"]

    for i, row in enumerate(matrix):

        scores = []

        scores.append(str(i+1) + " A")

        for letter in letter_order:

            score = row[letter]

            scores.append(round(score, 4))

        print('{:>4} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8}'.format(*scores)) 

In [128]:
# Load the functions of pep2score just once
def initialize_matrix(peptide_length, alphabet):

    init_matrix = [0]*peptide_length

    for i in range(0, peptide_length):

        row = {}

        for letter in alphabet: 
            row[letter] = 0.0

        #fancy way:  row = dict( zip( alphabet, [0.0]*len(alphabet) ) )

        init_matrix[i] = row
        
    return init_matrix

def from_psi_blast(file_name):

    f = open(file_name, "r")
    
    nline = 0
    for line in f:
    
        sline = str.split( line )
        
        if nline == 0:
        # recover alphabet
            alphabet = [str]*len(sline)
            for i in range(0, len(sline)):
                alphabet[i] = sline[i]
                
            matrix = initialize_matrix(peptide_length, alphabet)
        
        else:
            i = int(sline[0])
            
            for j in range(2,len(sline)):
                matrix[i-1][alphabet[j-2]] = float(sline[j])
                
        nline+= 1
            
    return matrix

def score_peptide(peptide, matrix):
    acum = 0
    for i in range(0, len(peptide)):
        acum += matrix[i][peptide[i]]
    return acum


In [129]:
# --- Configuration ---
RDIR = "/home/luis_ubuntu/unixdir/Peptide_Binding/smm-final/"
DDIR = "/home/luis_ubuntu/unixdir/Peptide_Binding/Data"

alleles = ["A0101"]
lambdas = [0.02, 0.08, 0.1]
epis = [0.01, 0.04]
folds = range(5)

In [130]:
# --- Pearson correlation calculator ---
def pearson_from_pairs(pairs):
    n = len(pairs)
    if n == 0:
        return 0.0, float("inf")
    
    x = [p[0] for p in pairs]
    y = [p[1] for p in pairs]
    
    x0 = sum(x) / n
    y0 = sum(y) / n
    
    t = nx = ny = err = 0.0
    for i in range(n):
        dx = x[i] - x0
        dy = y[i] - y0
        t += dx * dy
        nx += dx * dx
        ny += dy * dy
        err += (x[i] - y[i]) ** 2
    
    if nx * ny == 0:
        pcc = 0.0
    else:
        pcc = t / math.sqrt(nx * ny)
    
    mse = err / n
    return pcc, mse

In [126]:
def make_and_enter(dir_path):
    path = Path(dir_path)
    path.mkdir(exist_ok=True)
    os.chdir(path)

def concat_train_files(n_list, allele, DDIR, held_out_fold):
    output_file = Path(f"conc_f00{held_out_fold}")
    if output_file.exists():
        return output_file  # Return existing file without rewriting
    
    with open(output_file, "w") as outfile:
        for n in n_list:
            file_path = Path(f"{DDIR}/{allele}/c00{n}")
            with open(file_path) as infile:
                outfile.writelines(infile.readlines())

def run_training_and_evaluation(RDIR, train_file, eval_file, mat_file, pred_file, _lambda, _epsilon):
    # Run training
    if not Path(mat_file).exists():
        with open(mat_file, "w") as fout:
            subprocess.run(
                [
                    "python", f"{RDIR}/smm_gradient_descent.py",
                    "-l", str(_lambda),
                    "-epi", str(_epsilon),
                    "-t", train_file,
                    "-e", eval_file
                ],
                stdout=fout,
                stderr=subprocess.DEVNULL,
                env={**os.environ, "QT_QPA_PLATFORM": "offscreen"}
            )

    # Run evaluation
    if not Path(pred_file).exists():
        with open(pred_file, "w") as fout:
            subprocess.run(
                [
                    "python", f"{RDIR}/pep2score.py",
                    "-mat", mat_file,
                    "-f", eval_file
                ],
                stdout=fout,
                stderr=subprocess.DEVNULL,
                env={**os.environ, "QT_QPA_PLATFORM": "offscreen"}
            )

def collect_outer_preds(allele):
    preds = []
    preds_for_file = []
    for path in Path('.').rglob('*.outer_pred'):
        with open(path) as pf:
            for line in pf:
                if "#" not in line and line.strip():
                    try:
                        parts = line.strip().split()
                        preds_for_file.append(f"{parts[0]} {parts[1]} {parts[2]}\n")
                    except:
                        continue

    # Save all collected predictions to a single file
    output_file = Path(f"{allele}_prediction")
    with open(output_file, "w") as out:
        out.writelines(preds_for_file)
    
    print(f"Saved {len(preds_for_file)} predictions to {output_file}")

In [127]:
os.chdir(RDIR)
# --- Loop over all alleles ---
for allele in alleles:
    make_and_enter(f"{allele}.res")

    # --- Outer Validation loop ---
    for n in folds:
        inner_loop_files = [0, 1, 2, 3, 4]
        inner_loop_files.remove(n)
    
        make_and_enter(f"validation_set_{n}")

        # copy the validation files and create the new concatenated training files
        for m in inner_loop_files:
            # Copy the evaluation file
            eval_file = f"{DDIR}/{allele}/c00{m}"
            shutil.copy(eval_file, f"c00{m}")
            # Create the concatenated trining files
            train_files = inner_loop_files.copy()
            train_files.remove(m)

            concat_train_files(train_files, allele, DDIR, m)
        
        best_pcc = -1000
        best_model = ""
        best_lambda = ""
        best_epsilon = ""
        
    # --- Hyperparameters loop ---    
        for l in lambdas:
            make_and_enter(f"l.{l}")
                
            for epi in epis:
                make_and_enter(f"epi.{epi}")
    
                preds = []

                # --- Inner CV loop ---
                for m in inner_loop_files:
                    
                    # Define the files to start running
                    mat_file = f"mat.{m}"
                    pred_file = f"c00{m}.pred"
                    
                    eval_file = f"../../c00{m}"
                    train_file = f"../../conc_f00{m}"
                    
                    # Run training and evalutation

                    run_training_and_evaluation(RDIR, train_file, eval_file, mat_file=mat_file, pred_file=pred_file, _lambda=l, _epsilon=epi)
                    
                    with open(pred_file) as pf:
                        for line in pf:
                            if "#" not in line and line.strip():
                                try:
                                    parts = line.strip().split()
                                    preds.append((float(parts[1]), float(parts[2])))
                                except:
                                    continue
                
                # Compute PCC and MSE
                pcc, mse = pearson_from_pairs(preds)
                eval_output = f"{allele}; Outer validation set = {n}, lambda = {l} epsilon = {epi} PCC {pcc:.5f} MSE {mse:.5f}"
                print(eval_output)

                if pcc > best_pcc:
                    best_pcc = pcc
                    best_lambda = l
                    best_epsilon = epi
                    best_model = f"lambda {best_lambda} epsilon {best_epsilon}"
                    
                
                # Leave epi.{epi}
                os.chdir("..")
        
            # Leave l.{l}
            os.chdir("..")

        # Final result
        print("\nBest model for allele", allele,"and Outer validation set '", n, "', :", best_model, "with correlation", f"{best_pcc:.5f}\n")

        # Copy the final evaluation file
        final_eval_file = f"{DDIR}/{allele}/c00{n}"
        shutil.copy(final_eval_file, f"final_evaluation_c00{n}")
        
        # Copy the full training file to use when the best HP are chosen
        full_train_file = f"{DDIR}/{allele}/f00{n}"
        shutil.copy(full_train_file, f"final_training_f00{n}")

        # Define the files to start running
        mat_file = f"mat.{n}"
        pred_file = f"c00{n}.outer_pred"
                    
        eval_file = f"final_evaluation_c00{n}"
        train_file = f"final_training_f00{n}"

        # Run training and evaluation
        run_training_and_evaluation(RDIR, train_file, eval_file, mat_file=mat_file, pred_file=pred_file, _lambda=best_lambda, _epsilon=best_epsilon)
        
        # Leave validation_set_{n}
        os.chdir("..")

    # Concatenate predictions for allele and get the final measure
    collect_outer_preds(allele)

    pairs = []
    with open(f"{allele}_prediction") as f:
        for line in f:
            parts = line.strip().split()
            try:
                x = float(parts[1])  # second column
                y = float(parts[2])  # third column
                pairs.append((x, y))
            except (IndexError, ValueError):
                continue
                
    pcc, mse = pearson_from_pairs(pairs)
    eval_output = f"Final prediciton for allele:{allele}; PCC {pcc:.5f} MSE {mse:.5f}"
    print(eval_output)
    
    # Leave {allele}.res
    os.chdir("..")
                    

A0101; Outer validation set = 0, lambda = 0.02 epsilon = 0.01 PCC 0.59812 MSE 0.01912
A0101; Outer validation set = 0, lambda = 0.02 epsilon = 0.04 PCC 0.56607 MSE 0.02066
A0101; Outer validation set = 0, lambda = 0.08 epsilon = 0.01 PCC 0.59816 MSE 0.01911
A0101; Outer validation set = 0, lambda = 0.08 epsilon = 0.04 PCC 0.56611 MSE 0.02066


KeyboardInterrupt: 

In [None]:
# --- Main loop ---
for allele in alleles:
    allele_dir = Path(f"{allele}.res")
    allele_dir.mkdir(exist_ok=True)
    os.chdir(allele_dir)

    for l in lambdas:
        l_dir = Path(f"l.{l}")
        l_dir.mkdir(exist_ok=True)
        os.chdir(l_dir)

        for epi in epis:
            epi_dir = Path(f"epi.{epi}")
            epi_dir.mkdir(exist_ok=True)
            os.chdir(epi_dir)

            preds = []

            for n in folds:
                mat_file = f"mat.{n}"
                pred_file = f"c00{n}.pred"
                train_file = f"{DDIR}/{allele}/f00{n}"
                eval_file = f"{DDIR}/{allele}/c00{n}"

                # Run training
                if not Path(mat_file).exists():
                    subprocess.run(
                        ["python", f"{RDIR}/smm_gradient_descent.py",
                         "-l", str(l), "-epi", str(epi), "-t", train_file, "-e", eval_file],
                        stdout=open(mat_file, "w"),
                        stderr=subprocess.DEVNULL,
                        env={**os.environ, "QT_QPA_PLATFORM": "offscreen"}
                    )

                # Run evaluation
                if not Path(pred_file).exists():
                    with open(pred_file, "w") as fout:
                        subprocess.run(
                            ["python", f"{RDIR}/pep2score.py",
                             "-mat", mat_file, "-f", eval_file],
                            stdout=fout,
                            stderr=subprocess.DEVNULL,
                            env={**os.environ, "QT_QPA_PLATFORM": "offscreen"}
                        )

                # Parse predictions
                with open(pred_file) as pf:
                    for line in pf:
                        if "#" not in line and line.strip():
                            try:
                                parts = line.strip().split()
                                preds.append((float(parts[1]), float(parts[2])))
                            except:
                                continue

            # Compute PCC and MSE
            pcc, mse = pearson_from_pairs(preds)
            eval_output = f"{allele} lambda {l} epsilon {epi} PCC {pcc:.5f} MSE {mse:.5f}"
            print(eval_output)

            if pcc > best_pcc:
                best_pcc = pcc
                best_model = f"{allele} lambda {l} epsilon {epi}"

            os.chdir("..")  # up from epi.X

        os.chdir("..")  # up from l.X

    os.chdir("..")  # up from allele.res

# Final result
print("\nBest model:", best_model, "with correlation", f"{best_pcc:.5f}\n")