In [4]:
import os
import re
import json
import exrex
import numpy
import Bio.SeqIO
import pandas as pd
from sklearn.svm import SVC
from joblib import Parallel, delayed
from sklearn.metrics import f1_score
from sklearn.metrics import recall_score
from sklearn.metrics import precision_score

In [9]:
# Function to expand a degenerate k-mer into the list of concrete k-mers it represents
# IUPAC nucleotide codes mapped to the concrete bases each one stands for.
# V, H, D and B are the "not T", "not G", "not C" and "not A" codes, i.e. they
# each stand for the three remaining bases.
_IUPAC_EXPANSIONS = {
	"A": "A", "C": "C", "G": "G", "T": "T",
	"N": "ACGT", "X": "ACGT",
	"V": "ACG", "H": "ACT", "D": "AGT", "B": "CGT",
	"M": "AC", "W": "AT", "R": "AG", "K": "GT", "S": "CG", "Y": "CT",
}

def convert_degenerative_k_mers(k_mer):
	"""Expand a degenerate k-mer into every concrete k-mer it represents.

	Each symbol of ``k_mer`` is looked up in the IUPAC table above and the
	Cartesian product of the per-position bases is returned.

	Parameters
	----------
	k_mer : str
		K-mer written with upper-case IUPAC nucleotide codes.

	Returns
	-------
	list of str
		All concrete k-mers, ordered with the first position varying slowest.

	Notes
	-----
	Symbols that are not in the table (including lower-case letters) are
	skipped, so the resulting k-mers are shorter than the input in that case.
	"""
	from itertools import product

	# One pool of candidate bases per recognised position
	pools = [_IUPAC_EXPANSIONS[symbol] for symbol in k_mer if symbol in _IUPAC_EXPANSIONS]
	return ["".join(bases) for bases in product(*pools)]

In [10]:
# Function to load training data from a fasta file
def loadDataTrain(file_path):
	"""Load the training records from a FASTA file.

	The class label is the part of the record description that follows the
	last ``|`` separator.

	Parameters
	----------
	file_path : str
		Path of the FASTA file to parse.

	Returns
	-------
	list of list
		One entry per record: ``[description, sequence, label]`` when the
		description contains a ``|``, otherwise ``[description, sequence]``.
		Sequences are upper-cased; labelled sequences also have every ``N``
		removed.
	"""
	# Initialize the data matrix
	D = []
	# Iterate through the fasta file
	for record in Bio.SeqIO.parse(file_path, "fasta"):
		description = record.description
		sequence = str(record.seq.upper())
		# Position of the last separator, -1 when the record has no class label
		separator = description.rfind("|")
		if separator == -1:
			# No class label: save id and sequence only
			# NOTE(review): 'N' is stripped for labelled records but not here,
			# mirroring the original fallback branch; confirm this is intended
			D.append([description, sequence])
		else:
			# Save id, sequence without ambiguous bases, and class label
			D.append([description, sequence.replace('N', ''), description[separator + 1:]])
	# Return the data matrix
	return D

In [11]:
# Function to load test data from a fasta file
def loadDataTest(file_path, D_train):
	"""Load the test records from a FASTA file, skipping the training records.

	A record is skipped when its description equals the first element of any
	entry of ``D_train``. The class label is the part of the description that
	follows the last ``|`` separator.

	Parameters
	----------
	file_path : str
		Path of the FASTA file to parse.
	D_train : list of list
		Training records whose first element is the record description.
		This list is emptied in place before the file is read, so the caller
		must not rely on its content afterwards.

	Returns
	-------
	list of list
		One entry per kept record: ``[description, sequence, label]`` when the
		description contains a ``|``, otherwise ``[description, sequence]``.
		Sequences are upper-cased.
	"""
	# Descriptions of the training records; a set keeps each lookup constant-time
	train_ids = {d[0] for d in D_train}
	# Side effect kept from the original implementation: drop the training data
	D_train.clear()
	# Initialize the data matrix
	D = []
	# Iterate through the fasta file
	for record in Bio.SeqIO.parse(file_path, "fasta"):
		description = record.description
		# Records already used for training are not part of the test set
		if description in train_ids:
			continue
		sequence = str(record.seq.upper())
		# Position of the last separator, -1 when the record has no class label
		separator = description.rfind("|")
		if separator == -1:
			# No class label: save id and sequence only
			D.append([description, sequence])
		else:
			# Save id, sequence and class label
			D.append([description, sequence, description[separator + 1:]])
	# Return the data matrix
	return D

In [12]:
# Function to compute the occurrence vector of a sequence
def computeSequenceVector(d, K, k):
	"""Count how often each target k-mer occurs in one sequence.

	Parameters
	----------
	d : list
		Record whose second element is the sequence and whose optional third
		element is the class label.
	K : dict
		Target k-mers; only its keys are used and their order defines the
		order of the returned counts.
	k : int
		Length of the sliding window moved over the sequence.

	Returns
	-------
	list
		``[counts, label]`` where ``counts`` holds one occurrence count per
		key of ``K`` and ``label`` is ``d[2]``, or ``None`` when the record
		carries no label.
	"""
	sequence = d[1]
	# Initialize the dictionary with targets as keys and 0 as value
	counts = dict.fromkeys(K, 0)
	# Slide a window of length k over the sequence
	for start in range(len(sequence) - k + 1):
		k_mer = sequence[start:start + k]
		# Only target k-mers are counted; every other window is ignored
		if k_mer in counts:
			counts[k_mer] += 1
	# Unlabelled records only hold [id, sequence]
	label = d[2] if len(d) > 2 else None
	# Return the vector and associated target
	return [list(counts.values()), label]

In [13]:
# Function to generate the samples matrix (X) and the target values (y)
def generateSamplesTargets(D, K, k):
	"""Build the samples matrix and the target values for a list of records.

	Every record of ``D`` is handed to ``computeSequenceVector`` together with
	``K`` and ``k`` through a joblib ``Parallel`` call with ``n_jobs = -1``.

	Returns
	-------
	tuple
		``(X, y)`` as numpy arrays: ``X`` gathers the first element of every
		result and ``y`` the second one, in the order of ``D``.
	"""
	# One delayed call per record
	tasks = (delayed(computeSequenceVector)(record, K, k) for record in D)
	results = Parallel(n_jobs = -1)(tasks)

	# Split each [vector, target] pair and convert to numpy array format
	X = numpy.asarray([result[0] for result in results])
	y = numpy.asarray([result[1] for result in results])
	return X, y

In [15]:
# Compute and save performance metrics for KEVOLVE and CASTOR-KRFE
N_RUNS = 100          # prediction folders KEVOLVE/1 ... KEVOLVE/100
N_K_MER_FILES = 10    # k-mer files 1.fasta ... 10.fasta inside each folder
K_MER_LENGTH = 9      # window length used to build the occurrence vectors

data = {}

# Previously stored metrics, printed next to the new ones for comparison
with open('KEVOLVE.json') as reference_file:
    KEVOLVE_DATA = json.load(reference_file)


# Iterate through the prediction folders
for n in range(1, N_RUNS + 1):
    print()

    # Gather the k-mers of this run as plain strings; sorting the unique
    # values gives the feature columns a reproducible order
    k_mers = []
    for file_index in range(1, N_K_MER_FILES + 1):
        k_mer_path = "KEVOLVE/" + str(n) + "/k_mers/" + str(file_index) + ".fasta"
        for record in Bio.SeqIO.parse(k_mer_path, "fasta"):
            k_mers.append(str(record.seq))
    k_mers = sorted(set(k_mers))

    K = dict.fromkeys(k_mers, 0)
    D_train = loadDataTrain("data/SARS-CoV-2_train_" + str(n) + ".fasta")
    X_train, y_train = generateSamplesTargets(D_train, K, K_MER_LENGTH)

    # loadDataTest empties D_train once the training ids have been collected
    D_test = loadDataTest("data/SARS-CoV-2.fasta", D_train)
    X_test, y_test = generateSamplesTargets(D_test, K, K_MER_LENGTH)

    model = SVC(kernel = 'linear', C = 1, cache_size = 1000)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    # Compute the performance metrics
    f1 = f1_score(y_test, y_pred, average='macro')
    recall = recall_score(y_test, y_pred, average='macro')
    precision = precision_score(y_test, y_pred, average='macro')
    print(str(n) +") precision", precision, "recall", recall, "f1_score", f1, KEVOLVE_DATA[str(n)])

    # Save the predictions; the context manager closes the file even if a write fails
    with open("KEVOLVE/" + str(n) + "/prediction.csv", "w") as prediction_file:
        prediction_file.write("id,y_pred\n")
        for test_record, predicted_label in zip(D_test, y_pred):
            prediction_file.write(test_record[0] + "," + predicted_label + "\n")

    # Save the results in dictionary
    data[str(n)] = {"precision":precision, "recall":recall, "f1_score":f1}

# Save the results in JSON format
# NOTE(review): the metrics are read from 'KEVOLVE.json' above but written to
# 'KEVOLE.json' here. This looks like a typo, but correcting it would overwrite
# the file being read, so confirm the intended output name before changing it.
with open("KEVOLE.json", "w") as outfile:
    json.dump(data, outfile)


{'precision': 0.9404000327485272, 'recall': 0.9979948836965216, 'f1_score': 0.9632747808590979}
1) precision 0.9821277340501251 recall 0.9997389962197982 f1_score 0.9905562569779264
{'precision': 0.966120058913931, 'recall': 0.9995627572565521, 'f1_score': 0.979686335198838}
2) precision 0.9896601166255948 recall 0.9998407053988707 f1_score 0.9944872131257719
{'precision': 0.9863161846725665, 'recall': 0.9995462229775663, 'f1_score': 0.9927638755945505}
