In [1]:
import os
import re
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

from keras.models import Sequential
from keras.layers import Dense, Conv1D, MaxPooling1D, Flatten,Dropout,Conv2D
from keras.layers import LSTM
from keras.layers import Reshape
from sklearn.model_selection import train_test_split
# Load the TensorBoard notebook extension
%load_ext tensorboard
import datetime

2023-07-18 19:03:04.379500: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.10.1


In [2]:
# get the raw sequence
def input_file(file_path):
    """
    input is the path of fasta file
    output is a list of sequences
    """
    sequences = []
    current_sequence = ""

    with open(fasta_file, 'r') as f:
        for line in f:
            line = line.strip()
            if line.startswith(">"):
                if current_sequence:
                    sequences.append(current_sequence)
                    current_sequence = ""
            else:
                current_sequence += line

    if  current_sequence:
        sequences.append(current_sequence)
    
    return sequences

In [3]:
# one hot encoding for the input
def one_hot_encode(seq):
    """
    input is the individual sequence
    output is the individual one-hot-encoded sequence
    """
    map = np.asarray([[1,0,0,0,0],
                      [0,1,0,0,0],
                      [0,0,1,0,0],
                      [0,0,0,1,0],
                      [0,0,0,0,1]])
    # replace ATCG with corresponding numbers
    seq = seq.upper().replace('A','\x00').replace('C','\x01').replace('G', '\x02').replace('T', '\x03').replace('N','\x04')
    seq_array = np.fromstring(seq, np.int8)
    seq_array = seq_array % 5
    encoded_seq = map[seq_array]
    
    return encoded_seq
    

In [4]:
# pad the sequence and get the input
def process_input(sequences,chunk_size):
    """
    To ensure every sequence has the same length, I pad the sequence with "N" up to the length of set chunk_size
    Then encode every sequence to get the final input for the model
    input: a list of raw sequences and chunk size
    output: the input of model (X)
    """
    padded_seq = []
    pad = ''
    for seq in sequences:
        pad = seq.ljust(chunk_size, 'N')
        padded_seq.append(pad)

    one_hot_sequences = [one_hot_encode(seq) for seq in padded_seq]
    seq_array = np.array(one_hot_sequences)
    one_hot_length = 5
    X = seq_array.reshape(-1, chunk_size, one_hot_length)
    return X

In [3]:
# translate sequence to VDJ sequence
def translate_output(df,idx):
    """
    input: the path of annotation file and the index of each item (sequence)
    output: the individual VDJ sequence like "VVVVVDDDJJJ"
    """
    # store the start, end position and length of VDJ
    # the start and end
    positions = {
    'V': [df.loc[idx, 'v_sequence_start'] - 1, df.loc[idx, 'v_sequence_end']-1],
    'D': [df.loc[idx, 'd_sequence_start'] - 1, df.loc[idx, 'd_sequence_end']-1],
    'J': [df.loc[idx, 'j_sequence_start'] - 1, df.loc[idx, 'j_sequence_end']-1],
    }
    # the length 
    for key in positions.keys():
        positions[key].append(positions[key][1] - positions[key][0]+1)
    
    # create sequence
    seq = ['X'] * (positions['J'][1] + 1)
    
    # replace the X with VDJN 
    for key, (start, end, length) in positions.items():
        seq[start:end+1] = [key] * length
    my_seq = ''.join(seq)
    my_seq = my_seq.replace('X', 'N')
    my_seq = my_seq.ljust(chunk_size, 'X')
    
    return my_seq

In [6]:
# one-hot-encoding for the output
def output_encoding(seq):
    """
    input: the individual VDJ sequence
    output: the individual encoded sequence
    """
    map = np.asarray([[1,0,0,0,0],
                      [0,1,0,0,0],
                      [0,0,1,0,0],
                      [0,0,0,1,0],
                      [0,0,0,0,1]])
   # replace VDJN with corresponding numbers
    seq = seq.upper().replace('V','\x00').replace('D','\x01').replace('J', '\x02').replace('N', '\x03').replace('X','\x04')
    seq_array = np.fromstring(seq, np.int8)
    seq_array = seq_array % 5
    encoded_seq = map[seq_array]
    
    return encoded_seq

In [7]:
# get the output
def process_output(df):
    """
    input: the dataframe obtained from the annotation file
    output: the one-hot-encoded target label of the model
    """
    encoded_VDJ = []
    num_VDJ = []
    for i in range(len(df)):
        seq = translate_output(df,i)
        seq_num = output_catogory(seq)
        
        num_VDJ.append(seq_num)
        encoded_seq = output_encoding(seq)
        encoded_VDJ.append(encoded_seq)
        
    y = np.array(encoded_VDJ)
    y_class = np.array(num_VDJ)
    
    return y, y_class


In [8]:
# translate the VDJ sequence to 1,2,3,4 for performance evaluation
def output_catogory(seq):
    """
    input is the individual VDJ sequence
    output is a list of numbers including 0,1,2,3,4, which respectively correspond to VDJNX
    """
    seq = seq.replace('V','\x00').replace('D','\x01').replace('J', '\x02').replace('N', '\x03').replace('X','\x04')
    return np.fromstring(seq, np.int8)

In [4]:
def generate_Xy(fasta_file, annotation_file):
    sequences = input_file(fasta_file)
    X = process_input(sequences, chunk_size)
    df = pd.read_csv(annotation_file, sep='\t', header=0)
    y,y_class = process_output(df)
    return X,y,y_class

In [2]:
#fasta_file = os.path.join('..','repertoire','first_1000.fasta')
#sequences = input_file(fasta_file)
#chunk_size = 450
#X = process_input(sequences, chunk_size)

In [3]:
#annotation_file = os.path.join('..','repertoire','first_1000.tsv')
#df = pd.read_csv(annotation_file, sep='\t', header=0)
#y,y_class = process_output(df)