In [1]:
import unittest
import pandas as pd
import numpy as np
import mysql.connector 
import csv
from tqdm import tqdm
import os
import json
import sys
from datetime import datetime

In [2]:
############################################################################################
# Reading information from json file. Used to extract the parameters from the `config.json`.
def read_json(path:str = "config.json") -> dict:
    """
    Load a JSON file and return its parsed content.

    path : str -> path of the json file (defaults to `config.json` in the
                  current working directory)

    Returns the decoded JSON document (a dict for the pipeline config).
    Raises FileNotFoundError if `path` does not exist and
    json.JSONDecodeError if the file is not valid JSON.
    """

    # Open the file the caller asked for (the previous version always opened
    # 'config.json' regardless of `path`). JSON is UTF-8 by specification, so
    # do not depend on the platform's default encoding.
    with open(path, encoding="utf-8") as config:
        return json.load(config)


In [3]:
#####################################################
# Creating the folders required by the program scheme
def create_folders(req_folders : list = ["temp_data", "tms_input", "reports"]):
    """
    Make sure every folder in `req_folders` exists, creating the missing ones.

    req_folders : list -> required folder paths; a path may contain
                          sub-folders, in which case missing parent folders
                          are created as well.

    Prints one status line per folder. The default list is only iterated,
    never mutated, so sharing it between calls is safe.
    """

    for folder in req_folders:
        if os.path.exists(folder):
            print(f"> folder `{folder}` exists, continuing.")
            continue

        # makedirs (instead of mkdir) so nested paths work even when the
        # parent folder does not exist yet; exist_ok guards against another
        # process creating the folder between the check and the call.
        os.makedirs(folder, exist_ok=True)
        print(f"> folder `{folder}` was created.")

In [4]:
#####################
# sql connector class
class mysql_connector():
    """
    Thin wrapper around `mysql.connector` that reads its credentials from the
    `"sql"` section of the config file.
    """

    # Getting the connection information from the config
    def __init__(self):
        self.conn_cred = read_json()["sql"]
        # No connection is open until setup_conn() succeeds.
        self.sql_conn = None

    # Setting the sql connection
    def setup_conn(self):
        """
        Open a connection using the `adress`, `username` and `password` keys
        of the credentials and return it.

        Raises Exception (chained to the underlying error) when the
        connection cannot be established.
        """
        try:
            self.sql_conn = mysql.connector.connect(
                                                    host=self.conn_cred["adress"],
                                                    user=self.conn_cred["username"],
                                                    passwd=self.conn_cred["password"],
                                                    auth_plugin='mysql_native_password',
                                                    )
        except Exception as exc:
            # `except Exception` (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate; `from exc` keeps the root cause (bad
            # credentials, missing config key, unreachable host) in the traceback.
            raise Exception("> Failed to establish connection to the MySQL server!") from exc

        print("> Established connection to the MySQL server.")
        return self.sql_conn

    # Closing the sql connection
    def close_conn(self):
        """Close the connection opened by setup_conn(), if there is one."""
        conn = getattr(self, "sql_conn", None)
        if conn is None:
            # Nothing was opened (or it was already closed); avoid an
            # AttributeError and do not claim a connection was closed.
            print("> No open MySQL connection to close.")
            return

        conn.close()
        self.sql_conn = None
        print("> Connection to the MySQL was closed.")

In [5]:
############################
# Running Pipeline with test
def run_pipeline(test_pipeline):
    """
    test_pipeline -> the pipeline uninitited unittest pipeline we want to run
    """
    
    # unitest  loader object
    loader = unittest.TestLoader()
    # Load tests from the specific class
    suite = loader.loadTestsFromTestCase(test_pipeline) 
    
    # Run with high verbosity for detail
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)
    
    # Custom detailed summary
    print("\n--- PIPELINE EXECUTION SUMMARY ---")
    if result.wasSuccessful():
        print("Final Status: SUCCESS V")
    else:
        print(f"Final Status: FAILED X ({len(result.failures) + len(result.errors)} issues found)")

In [32]:
from source.helpers import protein

class PipelinePreprocessingTest(unittest.TestCase):

    # Class initiation, 
    @classmethod
    def setUpClass(cls):
        print("--- Initializing Preprocessing Pipeline Environment ---")

        # Importing config information
        cls.config_sql = read_json()["sql"]
        cls.config_db = read_json()["database"]
        cls.db_name = cls.config_db["db_name"]
        cls.time_start = datetime.now()
        cls.subjects = list(cls.config_db["subject_id"])
        

        # Cheecking if database assosiated talbes exsists
        create_folders()
        create_folders(req_folders=[f"{i}//{cls.db_name}" for i in ["temp_data", "tms_input", "reports"]])

        # Setting paths
        cls.path_temp = f"temp_data//{cls.db_name}"
        cls.path_final = f"tms_input//{cls.db_name}"


    # Cheecking if the the step already processed the data for this step
    # For each subject
    def test_01_translate_convert_extract(self):
        print(self.subjects)

        for subject_i in self.subjects:
            toFile = self.path_temp + "1_{}_{}.csv".format(self.db_name, subject_i)
            toFile1 = self.path_temp+"1_AA_{}_{}.csv".format(self.db_name, subject_i)
            toFile2 = self.path_temp+"1_{}_{}_seqK.csv".format(self.db_name, subject_i)

            bool_files = [os.path.exists(i) for i in [toFile, toFile1, toFile2]]

            if sum(bool_files) == 3:
                print("Step No.1 of the preprocessing already done, continuing to step 2.")
            
            else:
                # Setting up sql connection
                
                mydb = mysql.connector.connect(
                                                host=self.config_sql["adress"],
                                                user=self.config_sql["username"],
                                                passwd=self.config_sql["password"],
                                                auth_plugin='mysql_native_password',
                                               )
                cmd = f"SELECT seq.*, coll.* FROM {self.db_name}.sequences AS seq INNER JOIN {self.db_name}.sequence_collapse AS coll ON seq.ai=coll.seq_ai WHERE seq.subject_id={int(subject_i)} AND seq.functional=1 AND coll.instances_in_subject !=0 AND coll.copy_number_in_subject > 1 AND seq.deletions is null AND seq.insertions is null"
                mycursor = mydb.cursor(dictionary=True)

                removed ="rem.csv"

                #Command for getting the sequences translated:
                command = (cmd)
                print("Translating starts ....")     
                mycursor.execute(command)
                seq = mycursor.fetchall()
                with open(toFile, 'w',newline='') as new_file:
                    csv_writer = csv.writer(new_file)
                    csv_writer.writerow(['seq_id','sequence','TranslatedSeq','TranslatedGermline','ai','subject_id','clone_id','sample_id'])
                    for line in tqdm(seq):
                            dna=""
                            germ=""
                            protein_sequence=""
                            #fix the germline to match the cdr with N's
                            germ=line['germline']
                            cdr3Length=line['cdr3_num_nts']
                            postCDR=line['post_cdr3_length']
                            x=int(cdr3Length)+int(postCDR)
                            replaced=germ[-x:]
                            replaced=replaced.replace('-','N',x) 
                            germ=germ.replace(germ[-x:],replaced)
                            # -------------- To NNNNNNNNNNNNNN #
                            dna=line['sequence']
                            seqID=line['seq_id']
                            ai =line['ai']
                            cloneID=line['clone_id']
                            sampleID=line['sample_id']
                            subjectID=line['subject_id']
                            # Generate protein sequence
                            for i in range(0, len(dna)-(len(dna)%3), 3):
                                if dna[i] == "N" and dna[i+1] == "N" and dna[i+2] == "N":
                                    protein_sequence += "x"
                                elif dna[i] == "N" or dna[i+1] == "N" or dna[i+2] == "N" and dna[i] != "-" and dna[i+1] != "-" and dna[i+2] != "-":
                                    protein_sequence += "x"
                                elif dna[i] == "-" and dna[i+1] == "-" and dna[i+2] == "-":
                                    protein_sequence += protein[dna[i:i+3]]
                                elif dna[i] == "-" or dna[i+1] == "-" or dna[i+2] == "-":
                                    i=i+0;
                                else:
                                    protein_sequence += protein[dna[i:i+3]]
                            germProtein_sequence=""
                            for i in range(0, len(germ)-(len(germ)%3), 3):
                                if germ[i] == "N" and germ[i+1] == "N" and germ[i+2] == "N":
                                    germProtein_sequence += "x"
                                elif germ[i] == "N" or germ[i+1] == "N" or germ[i+2] == "N" and germ[i] != "-" and germ[i+1] != "-" and germ[i+2] != "-":
                                    germProtein_sequence += "x"
                                elif germ[i] == "-" and germ[i+1] == "-" and germ[i+2] == "-":
                                    germProtein_sequence += protein[germ[i:i+3]]
                                elif germ[i] == "-" or germ[i+1] == "-" or germ[i+2] == "-":
                                    i=i+0;
                                else:
                                    germProtein_sequence += protein[germ[i:i+3]]
                            csv_writer.writerow([seqID,dna,protein_sequence,germProtein_sequence,ai,subjectID,cloneID,sampleID])
                    print("Tranlating DONE!")
                
                #Matrix for the mutations
                def build_matrix(rows, cols):
                    matrix = []
                    for r in range(0, rows):
                        matrix.append([0 for c in range(0, cols)])
                    return matrix
                #Mutation function
                def mutatedFunc(seqAA,germAA):
                    global flag
                    flag=0
                    vec=build_matrix(2, len(seqAA))
                    if len(seqAA)!=len(germAA):
                        csv_writer1.writerow([seqAA,germAA])
                        flag=1
                    else:
                        for i in range(0,len(seqAA),1):
                            vec[0][i]=i+1
                        # print(seqAA[i],germAA[i])
                            if seqAA[i]!=germAA[i] and seqAA[i]!= "x" and seqAA[i]!="-" and germAA[i]!= "x" and germAA[i]!="-" and seqAA[i]!="*" and germAA[i]!="*":
                                vec[1][i]=1
                    return vec
                
                print("AA-mutations starts ....")     
                with open(toFile,'r') as csv_file:
                    csv_reader = csv.DictReader(csv_file)
                    with open(toFile1, 'w',newline='') as new_file ,open(removed, 'w',newline='') as nfile:
                        csv_writer = csv.writer(new_file)
                        csv_writer.writerow(['ai','sequence','seq_id','translatedSeq','translatedGerm','vector','subject_id','clone_id','sample_id'])
                        csv_writer1 = csv.writer(nfile)
                        csv_writer1.writerow(['translatedSeq','translatedGerm'])

                        for line in (csv_reader):
                            seq=(line['sequence'])
                            seqID=(line['seq_id'])
                            ai =(line['ai'])
                            seqAA=(line['TranslatedSeq'])
                            germAA=(line['TranslatedGermline'])
                            cloneID=(line['clone_id'])
                            sampleID=(line['sample_id'])
                            subjectID=(line['subject_id'])
                            vec1=mutatedFunc(seqAA, germAA)
                            vector=[]
                            if flag != 1: 
                                for i in range(len(vec1[0])):
                                    if vec1[1][i]==1:
                                        vector.append(i+1)
                                csv_writer.writerow([ai,seq,seqID,seqAA,germAA,vector,subjectID,cloneID,sampleID])
                    print("AA-mutations DONE!")     
                
                def kmersFunc(AA,k):
                    global start
                    start=0
                    p=0
                    kmer=""
                    x=1
                    s=1
                    while(x!=20):
                        if AA[-s] != "-":
                            x+=1
                            s+=1
                        else:
                            s+=1
                
                    for i in range(0,len(AA),1):
                        if AA[i]=="x" or AA[i]=="-":
                            i+=0
                            start+=1
                        else:
                            p1=i
                            for q in range(i,(len(AA)-s)+1,1):
                                for j in range(q,len(AA),1):
                                    if AA[j]=="-" and kmer=="":
                                        j=q+1
                                        p1=j
                                        break
                                    if AA[j]=="-":
                                        j+=0
                                    else:
                                        p+=1
                                        kmer+=AA[j]
                                        if p==k:
                                            p=0
                                            p2=j
                                            pos=(p1+1,p2+1)
                                            #print(AA[p1:p2+1])
                                            #print(kmer)
                                            csv_writer1.writerow([kmer,pos,seqID,ai,subjectID,cloneID,sampleID])
                                            j=q+1
                                            p1=j
                                            kmer=""
                                            break
                            break   
                            
                
                i=0
                print("Kmers extraction starts ....")     
                with open(toFile1,'r') as csv_file:
                    csv_reader = csv.DictReader(csv_file)
                    with open(toFile2, 'w',newline='') as new_file1:
                        csv_writer1 = csv.writer(new_file1)
                        csv_writer1.writerow(['k-mer','position','seq_id','ai','subject_id','clone_id','sample_id'])
                        for line in csv_reader:
                            KmerS=(line['translatedSeq'])
                            seqID=(line['seq_id'])
                            ai =(line['ai'])
                            cloneID=(line['clone_id'])
                            sampleID=(line['sample_id'])
                            subjectID=(line['subject_id'])
                            #Function for the k-mers!
                            kmersFunc(KmerS,20)
                        print("Kmers extraction DONE!")

In [33]:
# Execute the function
# Runs the preprocessing test case through run_pipeline, but only when this
# code is executed as the main program (as it is in a notebook kernel).
# `pipeline_result` holds whatever run_pipeline returns.
if __name__ == "__main__":
    pipeline_result = run_pipeline(PipelinePreprocessingTest)

test_01_translate_convert_extract (__main__.PipelinePreprocessingTest.test_01_translate_convert_extract) ... 

--- Initializing Preprocessing Pipeline Environment ---
> folder `temp_data` exists, continuing.
> folder `tms_input` exists, continuing.
> folder `reports` exists, continuing.
> folder `temp_data//covid_vaccine_new` exists, continuing.
> folder `tms_input//covid_vaccine_new` exists, continuing.
> folder `reports//covid_vaccine_new` exists, continuing.
['7']
Translating starts ....


100%|██████████| 19547/19547 [00:01<00:00, 10245.85it/s]


Tranlating DONE!
AA-mutations starts ....
AA-mutations DONE!
Kmers extraction starts ....


ok

----------------------------------------------------------------------
Ran 1 test in 32.943s

OK


Kmers extraction DONE!

--- PIPELINE EXECUTION SUMMARY ---
Final Status: SUCCESS V
