In [3]:
#  Data Preprocessing
import torch
import Bio
import pandas as pd

# torch names its NVIDIA GPU backend 'cuda'; 'gpu' is not a valid device type and
# would be rejected by torch.device() / Tensor.to() as soon as it is used.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cpu


In [None]:
# Generate K-Mer from sequence
# Sample nucleotide sequence passed to the create_k_mer demo call at the end of this cell.
s = "GTTCTCTAAACGAACTTTAAAATCTGTGTGGCTGTCACTCGGCTGCATGCTTAGTGCACT"

# Turn a sequence into a space-separated string of overlapping k-mers,
# or return the (cleaned) sequence itself when k is not positive.
# from https://github.com/jerryji1993/DNABERT/blob/master/motif/motif_utils.py
#
# Every 'N' character is dropped from the sequence before anything else happens.
#
# @param sequence : sequence you want to process.
# @param k : length of each k-mer. If k is not positive (e.g. -1), the cleaned sequence is returned.
# @param n_k_mer : how many leading k-mers are kept. If all k-mers are required, please put -1
#                  (any non-positive value keeps them all).
# @return : k-mers joined by single spaces, or the cleaned sequence when k is not positive.
def create_k_mer(sequence, k, n_k_mer):
    # Strip the 'N' placeholders first so that no k-mer contains them.
    cleaned = ''.join(filter(lambda base: base not in ['N'], sequence))
    if k <= 0:
        return cleaned

    # Slide a window of width k over the cleaned sequence, one position at a time.
    windows = []
    for start in range(len(cleaned) + 1 - k):
        windows.append(cleaned[start:start + k])

    if n_k_mer > 0:
        windows = windows[:n_k_mer]
    return ' '.join(windows)

# Demo: keep only the first 10 6-mers of the sample sequence and show them.
kmer = create_k_mer(sequence=s, k=6, n_k_mer=10)
print(kmer)

In [None]:
# Create fine tuning file from fasta file.
from Bio import SeqIO
import os

# Generate file for fine tuning using FASTA file.
# Each output line is "<k-mer text>\t<label>\n", one line per sequence.
#
# @param fasta_file : Original fasta file.
# @param label_for_this_file : What label for this fine tuning file.
# @param output_file_path : What and where the fine tuning is named and stored.
#                           If file path exists, existing file will be removed.
# @param n_samples : How many sequences (counted from the start of the fasta file) will be
#                    put in fine tuning file. If all sequences are to be generated,
#                    please put -1 (any negative value means "all").
# @param k_mer : Size of k-mer. If k-mer is not required, please put -1.
# @param n_k_mer : How many kmers are written to file for each sequence in fasta file.
#                  If all kmers are written, please put -1.
# @return : output_file_path, unchanged.
def generate_sample_fine_tuning_file(fasta_file, label_for_this_file, output_file_path, n_samples, k_mer, n_k_mer):
    # Records are streamed instead of being loaded with list(), so only the
    # requested number of sequences is parsed from a large FASTA file.
    records = SeqIO.parse(fasta_file, 'fasta')

    if os.path.exists(output_file_path):
        os.remove(output_file_path)

    label = str(label_for_this_file)
    # The context manager closes the file even if parsing or writing fails.
    with open(output_file_path, 'w') as output_file:
        for index, record in enumerate(records):
            # A negative n_samples means "no limit". The former records[0:n_samples]
            # slice silently dropped the last record when -1 was passed.
            if 0 <= n_samples <= index:
                break
            output_file.write(create_k_mer(str(record.seq), k_mer, n_k_mer) + '\t' + label + '\n')
    return output_file_path

# Concatenate two text files into a third one.
# Both inputs are read completely before the output is opened for writing.
# @param fp : First file path.
# @param gp : Second file path; its content is appended after the first file's content.
# @param hp : Path of the file that receives the merged content (overwritten if it exists).
def merge_file(fp, gp, hp):
    chunks = []
    for source_path in (fp, gp):
        with open(source_path) as source:
            chunks.append(source.read())

    with open(hp, 'w') as merged:
        merged.write(''.join(chunks))
        
# Merge files into single file.
# @param origin_files : List of paths of the files to merge; contents are concatenated in list order.
# @param merged_file_path : Path of the merged file (overwritten if it exists).
# @param headers : Header for this file in list. When the list is non-empty, the headers are
#                  written as the first line, separated by tabs. Pass an empty list for no header.
def merge_files(origin_files, merged_file_path, headers):
    # Collect the pieces in a list; joining once avoids re-copying the growing
    # string on every iteration.
    contents = []
    for file_path in origin_files:
        print('reading file {}'.format(file_path))
        with open(file_path, 'r') as origin_file:
            contents.append(origin_file.read())

    # Every source is read before the destination is opened, so the destination
    # may be one of the origin files. The context manager guarantees the handle
    # is closed (and flushed) even if a write fails.
    with open(merged_file_path, 'w') as merged_file:
        if headers:
            merged_file.write('\t'.join(headers) + '\n')
        merged_file.writelines(contents)


In [3]:
# Transform DNA into RNA by changing base T into U (only upper-case 'T' is replaced).
# Write the transformed sequences (RNA seq) into file, one sequence per line, if necessary.
# @param dna_seqs : A collection of DNA sequences (strings).
# @param write_to_file_path : File in which RNA seq is written. Pass '' to skip writing.
# @return : A list of RNA sequences, in the same order as dna_seqs.
def transform_DNA_to_RNA(dna_seqs, write_to_file_path):
    rna_seqs = [seq.replace('T', 'U') for seq in dna_seqs]
    if write_to_file_path:
        print('writing RNA seq at {}'.format(write_to_file_path))
        # The handle used to be left open; the context manager flushes and
        # closes it deterministically, even if a write fails.
        with open(write_to_file_path, 'w') as rna_file:
            rna_file.writelines(seq + '\n' for seq in rna_seqs)
    return rna_seqs

# Sample DNA sequences for a quick check of transform_DNA_to_RNA.
s = "GTTCTCTAAACGAACTTTAAAATCTGTGTGGCTGTCACTCGGCTGCATGCTTAGTGCACT"
t = "TTCTCTAAACGAACTTTAAAATCTGTGTGGCTGTCACTCGGCTGCATGCTTAGTGCACTG"
u = "TCTCTAAACGAACTTTAAAATCTGTGTGGCTGTCACTCGGCTGCATGCTTAGTGCACTGT"

# Empty path: nothing is written to disk, the converted list is only printed.
print(transform_DNA_to_RNA(dna_seqs=[s, t, u], write_to_file_path=''))

['GUUCUCUAAACGAACUUUAAAAUCUGUGUGGCUGUCACUCGGCUGCAUGCUUAGUGCACU', 'UUCUCUAAACGAACUUUAAAAUCUGUGUGGCUGUCACUCGGCUGCAUGCUUAGUGCACUG', 'UCUCUAAACGAACUUUAAAAUCUGUGUGGCUGUCACUCGGCUGCAUGCUUAGUGCACUGU']


In [4]:
# ---- Nucleotide sequences (FASTA), one file per variant ----
ALPHA_FASTA_PATH = "data/sars-cov-2/raw/nucl/complete-nucl-sars_cov_2-B.1.1.7-human_origin.fasta"  # B.1.1.7 (Alpha)
BETA_FASTA_PATH = "data/sars-cov-2/raw/nucl/complete-nucl-sars_cov_2-B.1.351-human_origin.fasta"  # B.1.351 (Beta)
DELTA_FASTA_PATH = "data/sars-cov-2/raw/nucl/complete-nucl-sars_cov_2-B.1.617.2-human_origin.fasta"  # B.1.617.2 (Delta)
# P.1 (Gamma): no path is defined for this variant in this cell.

# ---- Protein sequences (FASTA), same three variants ----
# NOTE(review): these live under data/raw/..., not data/sars-cov-2/raw/... like the
# nucleotide files -- confirm this is the intended layout.
ALPHA_PROT_FASTA_PATH = "data/raw/prot/complete-prot-sars_cov_2-B.1.1.7-human_origin.fasta"
BETA_PROT_FASTA_PATH = "data/raw/prot/complete-prot-sars_cov_2-B.1.351-human_origin.fasta"
DELTA_PROT_FASTA_PATH = "data/raw/prot/complete-prot-sars_cov_2-B.1.617.2-human_origin.fasta"

# ---- Class id per variant ----
# NOTE(review): the fine tuning cells of this notebook label alpha/beta/delta as 0/1/2
# with literals instead of using these constants.
ALPHA_CLASS, BETA_CLASS, DELTA_CLASS = 1, 2, 3

# ---- k-mer sizes and sampling limits ----
K_MER_3, K_MER_4, K_MER_5, K_MER_6 = 3, 4, 5, 6
N_K_MER = 100  # How many k-mers are retrieved per sequence.
N_SAMPLES = 100  # How many sequences are retrieved.

# ---- Output naming: file prefix and one destination directory per variant ----
PREFIX = 'sarscov2'
DEST_DIR = 'data'
DEST_DIR_ALPHA = DEST_DIR + '/alpha'
DEST_DIR_BETA = DEST_DIR + '/beta'
DEST_DIR_DELTA = DEST_DIR + '/delta'

# ---- Wuhan 2020 Isolate (FASTA and plain-text versions) ----
WUHAN_FASTA_PATH = "data/sars-cov-2/wuhan/MN908947.fna"
WUHAN_TEXT_PATH = "data/sars-cov-2/wuhan/MN908947.txt"

In [None]:
"""
from Bio import SeqIO
records = list(SeqIO.parse(WUHAN_FASTA_PATH, "fasta"))
print("len {}".format(len(str(records[0].seq))))
records
"""

"""
Reads the text file. Since genome is divided into several gene which consists of 70 bases,
file is read line by line, creating set of genes.
"""
# Read the text file line by line. The first line is skipped (presumably a
# header -- confirm against the file) and the newline character is removed from
# every remaining line. The context manager closes the handle, which the
# previous bare open() never did.
with open(WUHAN_TEXT_PATH, "r") as f:
    wuhan_dna_lines = [line.replace('\n', '') for i, line in enumerate(f) if i > 0]

# Convert every line from DNA to RNA; the empty path means nothing is written to disk.
seqs = transform_DNA_to_RNA(wuhan_dna_lines, '')
print(seqs)


In [6]:
from Bio import SeqIO
import pandas as pd

# Load the single record stored in the Wuhan FASTA file and report its length.
records = SeqIO.read(WUHAN_FASTA_PATH, "fasta")
print("len {}".format(len(records)))

# Tally how often each symbol occurs in the sequence.
dnaSeq = records.seq
nucleotides = {}
for c in dnaSeq:
    nucleotides[c] = nucleotides.get(c, 0) + 1
print(nucleotides)

# Create dataframe: one row per symbol, ordered from least to most frequent.
table = (
    pd.DataFrame(data=nucleotides, index=[0])
    .T
    .reset_index()
    .rename(columns={0: 'frequency', 'index': 'nucleotides'})
    .sort_values(by=['frequency'], ascending=True)
)

len 29903
{'A': 8954, 'T': 9594, 'G': 5863, 'C': 5492}


In [None]:
# Generate dataset for prediction: one sample file per variant
# (labels: alpha=0, beta=1, delta=2) for every k-mer size from 3 to 6.
for i in [3, 4, 5, 6]:
    ep = generate_sample_fine_tuning_file(
        fasta_file=ALPHA_FASTA_PATH,
        label_for_this_file=0,
        output_file_path='data/ft/fine_tuning_sample_alpha_k-mer_'+str(i)+'.txt',
        n_samples=N_SAMPLES,
        k_mer=i,
        n_k_mer=N_K_MER,
    )
    fp = generate_sample_fine_tuning_file(
        fasta_file=BETA_FASTA_PATH,
        label_for_this_file=1,
        output_file_path='data/ft/fine_tuning_sample_beta_k-mer_'+str(i)+'.txt',
        n_samples=N_SAMPLES,
        k_mer=i,
        n_k_mer=N_K_MER,
    )
    gp = generate_sample_fine_tuning_file(
        fasta_file=DELTA_FASTA_PATH,
        label_for_this_file=2,
        output_file_path='data/ft/fine_tuning_sample_delta_k-mer_'+str(i)+'.txt',
        n_samples=N_SAMPLES,
        k_mer=i,
        n_k_mer=N_K_MER,
    )

In [None]:
# Merge the three per-variant sample files into a single fine tuning TSV
# (with a 'sequence'/'label' header line) for every k-mer size from 3 to 6.
for i in [3, 4, 5, 6]:
    fp = 'data/ft/fine_tuning_sample_alpha_k-mer_'+str(i)+'.txt'
    gp = 'data/ft/fine_tuning_sample_beta_k-mer_'+str(i)+'.txt'
    hp = 'data/ft/fine_tuning_sample_delta_k-mer_'+str(i)+'.txt'
    merge_files(
        origin_files=[fp, gp, hp],
        merged_file_path='data/ft/fine_tuning_sample_k-mer_{}_ALPHA_BETA_DELTA.tsv'.format(str(i)),
        headers=['sequence', 'label'],
    )

In [None]:
# Generate protein dataset for prediction: per k-mer size (3 to 6), write one
# sample file per variant (labels: alpha=0, beta=1, delta=2), then merge the
# three files into one TSV with a 'sequence'/'label' header line.
for i in [3,4,5,6]:
    ep = generate_sample_fine_tuning_file(
        fasta_file=ALPHA_PROT_FASTA_PATH,
        label_for_this_file=0,
        output_file_path='data/ft/prot/fine_tuning_prot_sample_alpha_k-mer_'+str(i)+'.txt',
        n_samples=N_SAMPLES,
        k_mer=i,
        n_k_mer=N_K_MER,
    )
    fp = generate_sample_fine_tuning_file(
        fasta_file=BETA_PROT_FASTA_PATH,
        label_for_this_file=1,
        output_file_path='data/ft/prot/fine_tuning_prot_sample_beta_k-mer_'+str(i)+'.txt',
        n_samples=N_SAMPLES,
        k_mer=i,
        n_k_mer=N_K_MER,
    )
    gp = generate_sample_fine_tuning_file(
        fasta_file=DELTA_PROT_FASTA_PATH,
        label_for_this_file=2,
        output_file_path='data/ft/prot/fine_tuning_prot_sample_delta_k-mer_'+str(i)+'.txt',
        n_samples=N_SAMPLES,
        k_mer=i,
        n_k_mer=N_K_MER,
    )
    merge_files(
        origin_files=[ep, fp, gp],
        merged_file_path='data/ft/prot/fine_tuning_sample_prot_k-mer_{}_ALPHA_BETA_DELTA.tsv'.format(str(i)),
        headers=['sequence', 'label'],
    )

In [None]:
# Splitting the sequence collection into files based on its sequence id in fasta.
# Filename = prefix-sequence_id_from_fasta-class-k_mer.txt
# Each file holds a single line: "<k-mer text>\t<class>" (no trailing newline).
# @param fasta_file : Fasta file as source.
# @param prefix : Filename prefix.
# @param class_name : The class for this fasta file in number (0, 1, 2, etc.)
# @param k_mer_size : Size of k-mer. With -1 the whole (cleaned) sequence is written.
# @param dest_dir : Intended file directory; created (with parents) if it does not exist.
def generate_sequence_file(fasta_file, prefix, class_name, k_mer_size, dest_dir):
    # Stream the records instead of materialising the whole FASTA file in memory.
    records = SeqIO.parse(fasta_file, 'fasta')

    # The destination only needs to be created once, not once per record.
    os.makedirs(dest_dir, exist_ok=True)

    for record in records:
        # Join the parts with '-' as the documented filename format requires.
        # The former plain concatenation glued id, class and k-mer size together
        # (class 2 with k=-1 became "...<id>2-1.txt").
        file_name = '-'.join([prefix, record.id, str(class_name), str(k_mer_size)]) + '.txt'
        output_file_name = dest_dir + '/' + file_name
        if os.path.exists(output_file_name):
            os.remove(output_file_name)
        # The context manager closes the file even if the write fails.
        with open(output_file_name, 'w') as output_file:
            seq = create_k_mer(str(record.seq), k_mer_size, -1)
            output_file.write(seq + '\t' + str(class_name))

In [None]:
# Generate individual sequence files for the beta and delta variants.
# k_mer_size=-1 keeps each sequence whole instead of splitting it into k-mers.
# generate_sequence_file(ALPHA_FASTA_PATH, PREFIX, ALPHA_CLASS, -1, DEST_DIR) # don't do it. source file is too big.
generate_sequence_file(
    fasta_file=BETA_FASTA_PATH,
    prefix=PREFIX,
    class_name=BETA_CLASS,
    k_mer_size=-1,
    dest_dir=DEST_DIR_BETA,
)
generate_sequence_file(
    fasta_file=DELTA_FASTA_PATH,
    prefix=PREFIX,
    class_name=DELTA_CLASS,
    k_mer_size=-1,
    dest_dir=DEST_DIR_DELTA,
)

In [None]:
# Generate data for predictions.
# Each output line is the k-mer text of one sequence (no label column).
# @param fasta_file : Path to fasta file.
# @param output_file_path : What and where the prediction file is named and stored.
#                           If file path exists, existing file will be removed.
# @param seq_index : Zero-based index of the first sequence that is read.
#                    Negative values are treated as 0.
# @param n_samples : How many sequences, starting at seq_index, are used to create
#                    the prediction file. Any negative value means "all remaining".
# @param k_mer : Size of k-mer.
# @param n_k_mer : How many kmers are written to file for each sequence in fasta file.
def generate_data_to_predict(fasta_file, output_file_path, seq_index, n_samples, k_mer, n_k_mer):
    # Stream the records so that parsing stops as soon as enough were written.
    records = SeqIO.parse(fasta_file, 'fasta')

    if os.path.exists(output_file_path):
        os.remove(output_file_path)

    # The former slice records[seq_index:n_samples] used n_samples as an END
    # index, so seq_index == n_samples selected nothing, and seq_index was
    # ignored entirely for files with <= n_samples records. The window is now
    # [first, first + n_samples).
    first = max(seq_index, 0)
    last = first + n_samples if n_samples >= 0 else None

    # The context manager closes the file even if parsing or writing fails.
    with open(output_file_path, 'w') as output_file:
        for index, record in enumerate(records):
            if last is not None and index >= last:
                break
            if index < first:
                continue
            output_file.write(create_k_mer(str(record.seq), k_mer, n_k_mer) + '\n')

In [None]:
# Generate data for prediction, one file per variant.
# K_MER was not defined in any visible cell of this notebook, so this cell
# raised NameError on a fresh kernel.
# NOTE(review): 6 is an assumption (it matches the create_k_mer demo) -- confirm the intended size.
K_MER = K_MER_6

# Every variant gets its own output file; previously all three calls wrote to
# 'prediction_sample_alpha_*', so beta and delta overwrote the alpha output.
# seq_index=N_SAMPLES presumably starts after the sequences already used for
# the fine tuning samples -- confirm.
for variant_name, variant_fasta_path in (
    ('alpha', ALPHA_FASTA_PATH),
    ('beta', BETA_FASTA_PATH),
    ('delta', DELTA_FASTA_PATH),
):
    generate_data_to_predict(
        variant_fasta_path,
        'prediction_sample_' + variant_name + '_' + str(K_MER) + '.txt',
        N_SAMPLES,
        N_SAMPLES,
        K_MER,
        N_K_MER,
    )