In [1]:
import gensim
from gensim import corpora, models
from sklearn.model_selection import train_test_split
import math
import csv
from collections import defaultdict
import random

In [2]:
"""
Uses a latent dirichlet allocation model to characterize protein melting temperature 
through the use of derived "topics."
"""

TEST_DATA_SET = []
# DATA_FILE = "../Data/data_full_simple_short_2.csv"
DATA_FILE = "synthetic_data_mutated.csv"
SYNTHETIC_DATA_TEST_FILE = "test_synthetic_data_mutated.csv"
PROTEIN_SEQUENCE_INDEX = 1
TM_INDEX = 0
INTERVAL = 15
MAX_RANGE = 15
MAX_RANGES = 1
TOPICS_TO_GET = 10
WORDS_PER_TOPIC = 4

In [3]:
def set_dictionary(interval, min_temp, max_temp):
    """
    :param interval: temperature interval to be used; e.g. if it is 15, then one range is 300-314
    :param min_temp: the minimum temperature of the dataset
    :param max_temp: the maximum temperature of the dataset
    :return: an array of ranges [min, max, string representing the range]
    :return: a dictionary mapping the range to a (min, max) tuple
    """
    vals_and_strings = []
    range_dict = {}
    for i in [x for x in range(int(min_temp) - interval, int(max_temp) + interval) if x % interval == 0]:
        vals_and_strings.append([int(i), int(i + interval - 1), str(i) + "-" + str(i + interval - 1)])
        range_dict[str(i) + "-" + str(i + interval - 1)] = (int(i), int(i + interval - 1))
    
    return vals_and_strings, range_dict

def separate_learn_and_test_data(data, using_synthetic_data):
    """
    :param data: data to be randomized, in array structure
    :param using_synthetic_data: a flag indicating if using synthetic data - thus, test data already prepared
    :return: learning data, testing data
    """
    if using_synthetic_data:
        syn_data = get_data(SYNTHETIC_DATA_TEST_FILE)
        """
        """
        new_data = []
        for i in data:
            if i not in syn_data:
                new_data.append(i)
        return new_data, get_data(SYNTHETIC_DATA_TEST_FILE)
        
        """
        """
        return data, get_data(SYNTHETIC_DATA_TEST_FILE)
    random.shuffle(data)
    return(data[int(len(data) / 5):], data[:int(len(data) / 5)])
    
def set_data_based_on_dictionary(data, vals_and_strings):
    """
    :param data: the data considered
    :param vals_and_strings: an array of ranges [min, max, string representing the range], the ranges used, returned from set_dictionary(...)
    :return: proteins sorted in their respective ranges
    """
    topic_analysis_data = {}
    for item in vals_and_strings:
        for point in data:
            if point[0] >= item[0] and point[0] <= item[1]:
                if topic_analysis_data.get(item[2]) is not None:
                    topic_analysis_data[item[2]].append(point[1])
                else:
                    topic_analysis_data[item[2]] = [point[1]]
    return topic_analysis_data

def compare_topic_arrs(topic1, topic2):
    """
    :param topic1: a topic to be compared to topic2; an array of words (strings)
    :param topic2: a topic to be compared to topic1; an array of words (strings)
    :return: a boolean value on whether or not the two topics are "equal." does not consider the order of the "words" in the "topic"
    """
    for i in topic1:
        if i not in topic2:
            return False
    for i in topic2:
        if i not in topic1:
            return False
    return True

def get_predicted_range(learned_topics, test_result_topics, range_dict):
    """
    :param learned_topics: a dictionary matching each learned topic to the amino acids ("words") in that topic
    :param test_result_topic: a 2d array of length TOPICS_TO_GET, which is the topics returned 
    from the LDA for the protein being tested
    :param range_dict: a dictionary mapping each range string to a (min, max) tuple
    :return: returns the range predicted, a (min, max) tuple
    """
    d = {}
    matching_topics = []
    for topic in learned_topics.keys():
        for result in test_result_topics:
            for t in learned_topics[topic]:
                if compare_topic_arrs(result, t):
                    if d.get(topic, None) is None:
                        d[topic] = 1
                    else:
                        d[topic] += 1
                    matching_topics.append(topic)
    
    min_val = None
    max_val = None
    for t in matching_topics:
        # Return the most probable (the first in the array) topic.
        return range_dict[t][0], range_dict[t][1]
    
        # In the future, if ranges are fine-grained enough, we can expand our method to include multiple ranges.
        # The intervals are demarcated randomly. This could help resolve some of the error that is derived from that
        # randomness.
        if min_val is None or range_dict[t][0] < min_val:
            min_val = range_dict[t][0]
        if max_val is None or range_dict[t][1]  > max_val:
            max_val = range_dict[t][1] 
        if max_val - min_val > MAX_RANGE - INTERVAL:
            break
    return min_val, max_val
    
def get_topic_data(data_array):
    """
    :param data_array: an array of strings, representing the a range of data
    :return: a 2D array of topics, where each topic is a few amino acids, of the form [['A', 'G', 'L'], ['G', 'F', 'L']...]
    """
    texts = []
    for item in data_array:
        amino_acid = item.upper()
        new_str = ""
        for ch in amino_acid:
            new_str += ch + " "
        new_str.strip()
        if new_str != "":
            to_append = new_str.split(" ")
            to_append.pop(len(to_append) - 1)
            texts.append(to_append)
    dictionary = corpora.Dictionary(texts)
    corpus = [dictionary.doc2bow(text) for text in texts]
    # LDA - Latent Dirichlet Allocation
    ldamodel = gensim.models.ldamodel.LdaModel(corpus, num_topics=TOPICS_TO_GET, id2word=dictionary, passes=10) #alpha='auto', eval_every=5)  #
    # print(ldamodel.print_topics(num_topics=15, num_words=30))
    topics = []
    for i in range(TOPICS_TO_GET):
        topic_filetered = []
        topic = ldamodel.show_topic(i)
        for j in range(WORDS_PER_TOPIC):
            topic_filetered.append(topic[j][0])
        topics.append(topic_filetered)
    return topics

def get_range_from_val(true_val, ranges):
    """
    :true_val: an integer
    :ranges: a 2d array of ranges, where each array has 3 elements: [min, max, and "min-max" string]
    :return: returns the range for true_val 
    """
    for arr in ranges:
        if int(true_val) >= arr[0] and int(true_val) <= arr[1]:
            print((arr[0], arr[1]))
            return (arr[0], arr[1])

def get_data(file_name):
    """
    :param file_name: the file name to be read
    :return: the data, where the TM_INDEX gives the index of the TM and PROTEIN_SEQUENCE_INDEX gives the index of the protein
    """
    with open(file_name, encoding='utf-8-sig') as f:
        csv_reader = csv.reader(f)
        two_d_arr = []
        for row in csv_reader:
            row_in_row = []
            for v in row:
                v = v.strip()
                if len(row_in_row) == PROTEIN_SEQUENCE_INDEX:
                    row_in_row.append(str(v))
                elif len(row_in_row) == TM_INDEX:
                    row_in_row.append(float(v))
            two_d_arr.append(row_in_row)
        return two_d_arr

In [None]:
# MARK:- Main program
times = 1 # Times to run the entire program

# STATISTICS
summation = 0
# Dictionary keys for statistics 
MAX_NAME = "MAX"
MIN_NAME = "MIN"
DEVIATION = "SIGMA"
MEAN = "MEAN"
statistics = {MAX_NAME: 0, MIN_NAME: 1, DEVIATION: 0, MEAN: 0}
c_matrix = {} # Confusion matrix

points = [] # An array of every accuracy value reported. This is of length "times" (see declaration above)
for _ in range(times):
    
    print("Starting...")
    data = get_data(DATA_FILE)
    min_val = None
    max_val = None
    for i in data:
        if min_val is None or i[TM_INDEX] < min_val:
            min_val = i[TM_INDEX]
        if max_val is None or i[TM_INDEX] > max_val:
            max_val = i[TM_INDEX]
    ranges, range_dict = set_dictionary(INTERVAL, min_val, max_val)
    
    # Split using sci-kit learn.
    # learning_data, test_data = train_test_split(data)
    
    # Split deterministically for testing.
    learning_data, test_data = separate_learn_and_test_data(data, True)
    
    # Learn, set the dictionary for proteins in ranges
    dictionary_data = set_data_based_on_dictionary(learning_data, ranges)
    
    # Learn the topics
    learned_topics = {}
    count = 0
    for key in dictionary_data.keys():
        count += 1
        learned_topics[key] = get_topic_data(dictionary_data[key])

    #  Test
    dictionary_data = set_data_based_on_dictionary(test_data, ranges)
    test_results = {}
    for item in test_data:
        data = get_topic_data(item[PROTEIN_SEQUENCE_INDEX])
        if data is not None:
            test_results[item[PROTEIN_SEQUENCE_INDEX]] = data, item[TM_INDEX]

    # Keep track of wins, losses, and nones
    win = 0
    loss = 0
    none_loss = 0
    
    # 
    for result in test_results.keys():
        
        # A true value for an individual protein
        true_val = test_results[result][1]
        # Get the predicted range for the individual protein 
        range_result = get_predicted_range(learned_topics, test_results[result][0], range_dict)
        # Get the true range for the predicted protein
        true_range = get_range_from_val(true_val, ranges)
        
        # Compare the ranges
        if range_result[0] is not None and range_result[0] != 0 and range_result[1] is not None:
            if int(true_val) >= range_result[0] and int(true_val) <= range_result[1]:
                
                # Update statistics
                win += 1
                
                # Print out the wins
                print("win: " + str(range_result))
                print("actual = " + str(true_val))
                
                # Update confusion matrix
                if true_range not in c_matrix:
                    c_matrix[true_range] = {}
                    c_matrix[true_range][range_result] = 1
                else:
                    if range_result not in c_matrix[true_range]:
                        c_matrix[true_range][range_result] = 1
                    else:
                        c_matrix[true_range][range_result] += 1
            else:
                
                # Update statistics
                loss += 1
                
                # Print out the loss
                print("loss:" + str(range_result))
                print("actual = " + str(true_val))
                
                # Update confusion matrix
                if true_range not in c_matrix:
                    c_matrix[true_range] = {}
                    c_matrix[true_range][range_result] = 1
                else:
                    if range_result not in c_matrix[true_range]:
                        c_matrix[true_range][range_result] = 1
                    else:
                        c_matrix[true_range][range_result] += 1
            print("")
        else:
            
            # Update statistics
            none_loss += 1
            
            # Print out the none loss
            print("None: " + str(true_val))
            print("")
            
            # Update confusion matrix
            if true_range not in c_matrix:
                    c_matrix[true_range] = {}
                    c_matrix[true_range]["NONE"] = 1
            else:
                if "NONE" not in c_matrix[true_range]:
                    c_matrix[true_range]["NONE"] = 1
                else:
                    c_matrix[true_range]["NONE"] += 1
    
    # Print out statistics for this particular test case
    summation += win/(loss + win)
    print(win)
    print(loss)
    print(none_loss)
    print("Final Results")
    print("Total Loss: " + str(win/(loss + win + none_loss)))
    print("Predicted Loss: " + str(win/(loss + win)))
    
    # Update overall statistics
    pred_loss = win/(loss + win)
    points.append(pred_loss)
    if pred_loss < statistics[MIN_NAME]:
        statistics[MIN_NAME] = pred_loss
    if pred_loss > statistics[MAX_NAME]:
        statistics[MAX_NAME] = pred_loss
    statistics[MEAN] += 1/float(times) * float(pred_loss)

# Print out overall statistics
print(points)
for pt in points:
    statistics[DEVIATION] += (pt - statistics[MEAN])**2
if times > 1:
    statistics[DEVIATION] *= float(1)/(times - 1)
else:
    statistics[DEVIATION] = 0
dev = statistics[DEVIATION]
statistics[DEVIATION] = math.sqrt(dev)
print(summation/float(times))
print(statistics)
print(c_matrix)

Starting...


# 