In [14]:
from IPython.display import HTML
import numpy as np
import copy
import random

shell = get_ipython()

def adjust_font_size():
    display(HTML('''<style>
        body{
            font-size: 32px;
        }
    '''))

if adjust_font_size not in shell.events.callbacks['pre_execute']:
    shell.events.register('pre_execute', adjust_font_size)

In [69]:
def printmatrix(m, pad = 4):
    '''
    Prints the provided 2D array in a readable format with labels for each index.
    '''
    
    # Make column labels across the top of the matrix
    for i in range(0, len(m[0]) + 1):
        print(f'{i:>{pad}}', end = '')
        
    print()
    row = 0
    for r in m:
        # Print labels for each row
        row += 1
        print(f'{str(row):>{pad}}', end = '')
        
        for d in r:
            print(f'{str(d):>{pad}}', end = '')
        print()

In [57]:
def get_sequences():
    '''
    Returns the sequences extracted from both population files, and an array of their labels.
    '''
    
    f1 = open('pop1_1.txt', 'r')
    f2 = open('pop2_1.txt', 'r')

    p1 = get_array(f1)
    p2 = get_array(f2)
    l = np.ones_like(p1)
    
    for g, p in enumerate(p2):
        for s in range(0, len(p2[g])):
            p1[g].append(p2[g][s])
            
    l = np.insert(l, len(p2[0]), np.full((len(p2[0]), 1), 2), axis = 1)
    
    return p1, l.astype(int)

def get_array(f):
    '''
    Extracts the sequences from a population file into a two dimensional list.
    Returns list with shape = (generations, population size).
    '''
    
    pop = []
    gen = -1
    for s in f:
        if s[0] == 'P' or s[0] == '\n':
            pop.append([])
            gen += 1
            continue
        pop[gen].append(s[:-1])
    
    return pop

def shuffle(a, b, n):
    '''
    Assumes a and b are 2D lists of the same shape, and shuffles them in the same way. Only the order of
    elements within the innermost lists are changed. The number of times to shuffle each index is the
    provided parameter n.
    '''
    
    for r in range(0, len(a)):
        for x in range(0, n):
            for i in range(0, len(a[r])):
                t = random.randint(0, len(a[r]) - 1)
                a[r][i], a[r][t] = a[r][t], a[r][i]
                b[r][i], b[r][t] = b[r][t], b[r][i]
    
    #printmatrix(b)


In [2]:
def analyze(seq1, seq2):
    '''
    Attempt to align the two given sequences and return the scoring matrix. The result of matched characters is 
    +10, for mismatched characters is -10, and the gap penalty is -8 points.
    '''
    
    gappenalty = -8
    
    # Initialize scoring matrix
    scoringmatrix = [[0 for i in range(0, len(seq1) + 1)] for j in range(0, len(seq2) + 1)]
    
    for i in range(0, len(scoringmatrix[0])):
        scoringmatrix[0][i] =  i * gappenalty
    for i in range(0, len(scoringmatrix)):
        scoringmatrix[i][0] = i * gappenalty

    # Populate scoring matrix for given sequences
    for r in range(1, len(scoringmatrix)):
        for c in range(1, len(scoringmatrix[0])):
            # Check the score for moving in each direction
            vert = scoringmatrix[r - 1][c] + gappenalty
            horz = scoringmatrix[r][c - 1] + gappenalty
            diag = scoringmatrix[r - 1][c - 1]
            if (seq1[c - 1] == seq2[r - 1]):
                diag += 10
            else:
                diag -= 10
            # Assign the best direction score
            scoringmatrix[r][c] = max(vert, horz, diag)
            
            #print(f'Index r: {r}, Index c: {c}, Last score: {scoringmatrix[r - 1][c - 1]}')
            
    return scoringmatrix[len(scoringmatrix) - 1][len(scoringmatrix[0]) - 1]

In [59]:
def compare(p):
    '''
    Attempts to align each sequence in the given list with every other sequence and returns a table of
    each alignment score, and the median score.
    '''
    scores = np.zeros((len(p), len(p))).astype(int)
    l = []
    
    # Check each sequence against all the other sequences in the generation
    for i, s in enumerate(p):
        for j in range(i + 1, len(p)):
            #print(f'Comparing {t} to {s}')
            scores[i][j] = analyze(s, p[j])
            scores[j][i] = scores[i][j] # the comparisons would be the same
            l.append(scores[i][j])
                
    return scores, np.median(np.array(l))

In [70]:
def check_row(s, r, key = []):
    make_key = key == []
    
    med = np.median(s[r])
    print('Median:', med)

    for y in range(0, len(s)):
        if s[r][y] == 0:
            if make_key:
                key.append(1)
            # skip the diagonal for normal checking
        elif s[r][y] > med:
            if make_key:
                key.append(1)
            else:
                s[r][y] = key[y]
        else:
            if make_key:
                key.append(2)
            else:
                s[r][y] = 3 - key[y]  

    return key
    
def predict(s, p):
    '''
    Predicts which group each individual belongs in, and returns an array with the final predictions.
    '''
    
    key = check_row(s, p)
    print(key)
    
    # Make sure changed rows are never checked
    for c in range(0, len(s)):
        check_row(s, c, key)

    #printmatrix(s, 3)
    
    return row_mode(s)

def row_mode(s):
    '''
    Returns an array of shape (len(s), ) that has the mode of each row. Assumes labels of 1 or 2.
    '''
    
    predictions = np.zeros((len(s), )).astype(int)

    for r in range(0, len(s)):
        count = 0
        for c in range(0, len(s[0])):
            if s[r][c] == 1:
                count += 1
        if count >= len(s) / 2:
            predictions[r] = 1
        else:
            predictions[r] = 2
            
    return predictions

def col_mode(s):
    '''
    Returns an array of shape (len(s), ) that has the mode of each column. Assumes labels of 1 or 2.
    '''
    
    predictions = np.zeros((len(s[0]), )).astype(int)

    for c in range(0, len(s[0])):
        count = 0
        for r in range(0, len(s)):
            if s[r][c] == 1:
                count += 1
        if count >= (len(s) + 1) / 2:
            predictions[c] = 1
        else:
            predictions[c] = 2
            
    return predictions

In [67]:
def calc_error(p, l):
    '''
    Returns a list of every index at which the two lists are different.
    '''
    
    errors = []
    
    for i in range(0, len(p)):
        if p[i] != l[i]:
             errors.append(i)
                
    return errors

def print_classes(pop, labels):
    '''
    Makes a prediction of the starting population for each sequence, and checks how accurate they were.
    '''
    
    # Run the analysis for every generation
    for gen in range(0, len(pop)):
        # Get the alignment matrix for the generation
        print(f'\nScores for generation {gen + 1}:')
        newscores, median = compare(pop[gen])
        printmatrix(newscores, 6)
        
        # Find the two most similar sequences, and assume the next closest 3 in that row are all in population 1
        predictions = []
        bestcol = np.unravel_index(newscores.argmax(), newscores.shape)[0]
        closest = np.argpartition(newscores[bestcol], -4)[-4:]
        closest = np.append(closest, bestcol)
        print(closest)

        print(f'\nPredictions for generation {gen + 1}:')
        for p in closest:
            #print(f'Key generated from individual {p + 1}')
            temp = copy.deepcopy(newscores)
            predictions.append(predict(temp, p))

        printmatrix(predictions)
        final = col_mode(predictions)

        print(f'Predicted labels: {final}')
        print(labels[gen])

        # Check the accuracy of the predictions
        errors = calc_error(final, labels[gen])

        # If we missed more than half, the key sequences must have been in population 2, so swap all predictions
        if len(errors) > 10:
            print('Key population assumption was incorrect.')
            final = 3 - final
            errors = calc_error(final, labels[gen])
                
        # Print the missed sequences
        for i in errors:
            print(f'Individual {i + 1} was from population {labels[gen][i]}, but {final[i]} was predicted!')

        print(f'Missed {len(errors)} out of {len(final)} predictions.')


In [68]:
print('Analysis of sorted population:')
combined_pop, labels = get_sequences()
#print_classes(combined_pop, labels)

print('Analysis of shuffled population:')
shuffle(combined_pop, labels, 10)
print_classes(combined_pop, labels)

Analysis of sorted population:
Analysis of shuffled population:

Scores for generation 1:
     0     1     2     3     4     5     6     7     8     9    10    11    12    13    14    15    16    17    18    19    20
     1     0   760   750  1458  1156   966   780   760   906   714   760   752  1282   734  1216   768  1230  1828  1920   722
     2   760     0  1552   786   786   728  1734  1874   720  1510  1920  1496   784  1484   740  1488   720   760   760  1508
     3   750  1552     0   780   672   662  1474  1506   662  1312  1552  1294   664  1484   646  1326   636   750   750  1306
     4  1458   786   780     0  1140   984   772   786   972   696   786   770  1252   726  1166   784  1146  1472  1458   756
     5  1156   786   672  1140     0   884   704   806   852   680   786   776  1748   640  1656   812  1542  1222  1156   760
     6   966   728   662   984   884     0   670   748  1860   564   728   724   924   694   928   714   836   946   966   690
     7   780  1734  1

KeyboardInterrupt: 