This script collects all of the prototypes saved for the final epoch of the chosen ProtoPNet (ppnet) and builds a table with the following columns (the exported CSV also records the genus, family, and order of each species):
| Class (Latin name of fish) | Prototype | Context | Original Sequence |
|---------------------------|--------------|-------------|------------------|
| manganais angularais      |(of length 10)|(of length ≤ 16: prototype plus up to 3 bases on each side)| (of length 70)   |
|                           |              |         |                  |
|                           |              |         |                  |

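Behind the scenes, this script
- reads in the saved .npy files and outputs a .csv
- translates one-hot encoded bases (channel order A, T, C, G) to characters
- finds the context of each prototype, i.e. the prototype plus the neighbouring bases of the original sequence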

In [237]:
from collections import defaultdict
import numpy as np
import pandas as pd

In [238]:
def array_to_sequence(sequence_arr):
    """Convert a 4-channel numpy array into a string of DNA bases.

    Each column of the array encodes one base, with channel 0 -> A,
    channel 1 -> T, channel 2 -> C, and channel 3 -> G. Fractional values
    encode IUPAC ambiguity codes; for example [.25, .25, .25, .25] is N,
    meaning that the base could be either A, T, C, or G. A full list of these
    codes can be found at
    https://droog.gs.washington.edu/mdecode/images/iupac.html
    or https://www.dnabaser.com/articles/IUPAC%20ambiguity%20codes.html.
    Any column that does not correspond to a known code, including the
    all-zero column [0, 0, 0, 0] used for padding, is decoded as 'N'.

    Args:
        sequence_arr (numpy.ndarray): A 4 x str_len array (or nested
            sequence) in which column n holds the encoding of the nth base.
            Only the first four rows are used. For example,
            array_to_sequence([[1, 0, 0],
                               [0, 1, 0],
                               [0, 0, 0],
                               [0, 0, 1]])
            returns 'ATG'.

    Returns:
        sequence (str): An upper-case string of bases, e.g. 'AGTCCCTC'.
            An empty input yields ''.

    Raises:
        ValueError: If the input is non-empty but is not two-dimensional or
            has fewer than four rows.
    """
    mapping_char_to_arr = {
        'a': [1, 0, 0, 0],
        't': [0, 1, 0, 0],
        # 'u': [0, 1, 0, 0], # u = t
        'c': [0, 0, 1, 0],
        'g': [0, 0, 0, 1],
        # two options
        'y': [0, 0.5, 0.5, 0],
        'r': [0.5, 0, 0, 0.5],
        'w': [0.5, 0.5, 0, 0],
        's': [0, 0, 0.5, 0.5],
        'k': [0, 0.5, 0, 0.5],
        'm': [0.5, 0, 0.5, 0],
        # three options
        'd': [1/3, 1/3, 0, 1/3],
        'v': [1/3, 0, 1/3, 1/3],
        'h': [1/3, 1/3, 1/3, 0],
        'b': [0, 1/3, 1/3, 1/3],
        # four options
        'x': [0.25, 0.25, 0.25, 0.25],
        'n': [0.25, 0.25, 0.25, 0.25],
    }
    # Values are rounded before lookup so that arrays stored at lower
    # precision (e.g. float32, where 1/3 is not bit-identical to the float64
    # 1/3 above) still resolve to their ambiguity code.
    decimals = 4
    # Reverse the keys and values of the dict. 'x' and 'n' share an encoding;
    # the later entry wins, so that encoding decodes to 'N'.
    mapping = {
        tuple(round(value, decimals) for value in vector): char.upper()
        for char, vector in mapping_char_to_arr.items()
    }

    arr = np.asarray(sequence_arr, dtype=float)
    if arr.size == 0:
        return ''
    if arr.ndim != 2 or arr.shape[0] < 4:
        raise ValueError(
            "sequence_arr must be a 4 x str_len array, got shape "
            f"{arr.shape}"
        )

    # Transpose so that each item is the 4-value encoding of one base.
    columns = arr[:4].T.tolist()
    return ''.join(
        mapping.get(tuple(round(value, decimals) for value in column), 'N')
        for column in columns
    )

In [239]:
def extract_context(full_str, sub_str, context_len=3):
    """Return sub_str together with the characters surrounding it in full_str.

    The first occurrence of sub_str in full_str is located, and the result is
    that occurrence extended by up to context_len characters on each side.
    Fewer characters are included on a side when the occurrence is closer
    than context_len to that end of full_str.

    Args:
        full_str (str): The string to search in.
        sub_str (str): The string to search for.
        context_len (int): Maximum number of characters to include on each
            side of sub_str. Defaults to 3.

    Returns:
        str or None: The occurrence with its surrounding context, or None if
            sub_str does not occur in full_str.

    Raises:
        ValueError: If context_len is negative.
    """
    if context_len < 0:
        raise ValueError("context_len must be non-negative")

    idx = full_str.find(sub_str)
    if idx == -1:
        return None  # sub_str not found

    # Clamp the left edge at 0; slicing already clamps the right edge.
    start = max(0, idx - context_len)
    end = idx + len(sub_str) + context_len
    return full_str[start:end]

In [240]:
# NOTE: get_species_info_by_prototype and get_species_info_by_comparison are
# no longer needed, since the species a prototype belongs to can be determined
# directly from the species_cat column of the train set: the first three
# prototypes were learned for species_cat 0, the next three for species_cat 1,
# the next three for species_cat 2, and so on.
def get_species_info_by_prototype(
        sequence,
        test_file = r"C:\Users\Sam\OneDrive\Desktop\eDNA\datasets\test_same_as_zurich.csv",
        train_file = r"C:\Users\Sam\OneDrive\Desktop\eDNA\datasets\train_oversampled_same_as_zurich.csv",
        ):
    """
    Gets the species name, family, and order of the sequences that contain
    the given prototype. The species that has the most matching sequences is
    returned.

    Matching is a case-insensitive substring test against the "seq" column
    (a sequence equal to the prototype also matches). Rows whose "seq" value
    is missing are skipped.

    Args:
        sequence (str): The prototype to look for.
        test_file: CSV with "seq", "species", "family" and "order" columns.
        train_file: CSV with the same columns as test_file.

    Returns:
        tuple: (species, family, order) of the most frequently matched
            species, or ("not found", "not found", "not found") when no
            sequence contains the prototype.
    """
    needle = sequence.lower()
    info_columns = ["species", "family", "order"]

    # Search the train set and the test set, since the sequence could have come
    # from either one.
    records = []
    for file in (test_file, train_file):
        df = pd.read_csv(file)
        # Missing sequences are read as NaN (a float), so only strings are
        # tested for containment.
        hits = pd.Series(
            [isinstance(seq, str) and needle in seq.lower() for seq in df["seq"]],
            index=df.index,
            dtype=bool,
        )
        records.extend(
            df.loc[hits, info_columns].itertuples(index=False, name=None)
        )

    if not records:
        return ("not found", "not found", "not found")

    matches = pd.DataFrame(records, columns=["Species", "Family", "Order"])
    most_matched_species = matches['Species'].mode()[0]
    row = matches[matches['Species'] == most_matched_species].iloc[0]
    return (row["Species"], row["Family"], row["Order"])


In [241]:
# NOTE: get_species_info_by_comparison (and the helpers below that support it)
# is no longer needed, since the species a prototype belongs to can be
# determined directly from the species_cat column of the train set: the first
# three prototypes were learned for species_cat 0, the next three for
# species_cat 1, the next three for species_cat 2, and so on.
def strip_trailing_N(s):
    """Return s with any trailing run of 'N' or 'n' characters removed.

    Both cases are stripped because callers in this file lower-case their
    sequences before comparing them, so a check for upper-case 'N' alone
    would leave the trailing characters in place.
    """
    return s.rstrip('Nn')

def matching_chars(s1, s2):
    """Count the positions at which s1 and s2 hold the same character.

    strip_trailing_N is applied to both strings first. The comparison is
    positional and stops at the end of the shorter stripped string.
    """
    trimmed_first = strip_trailing_N(s1)
    trimmed_second = strip_trailing_N(s2)

    agreeing = 0
    for left_char, right_char in zip(trimmed_first, trimmed_second):
        if left_char == right_char:
            agreeing += 1
    return agreeing

def most_similar_row(df, col, target):
    """Return the index label of the row whose df[col] value best matches target.

    Every value in the column is scored with matching_chars against the
    target (after strip_trailing_N has been applied to the target). The
    scores are printed, and the index label of the highest score is returned.
    """
    trimmed_target = strip_trailing_N(target)

    def score(candidate):
        # Number of positions at which this candidate agrees with the target.
        return matching_chars(candidate, trimmed_target)

    scores = df[col].apply(score)
    print(scores)
    best_index = scores.idxmax()
    return best_index

def get_species_info_by_comparison(
        sequence,
        train_file = r"C:\Users\Sam\OneDrive\Desktop\eDNA\datasets\train_oversampled_same_as_zurich.csv",
        ):
    """
    Looks up the train-set sequence that is most similar to the given
    sequence and returns its (species, family, order).

    Both the train-set sequences and the given sequence are lower-cased
    before comparison; similarity is scored by most_similar_row.
    """
    train = pd.read_csv(train_file)
    train["seq"] = train["seq"].str.lower()

    best_idx = most_similar_row(train, "seq", sequence.lower())
    best_row = train.loc[best_idx]

    species = best_row["species"]
    family = best_row["family"]
    order = best_row["order"]
    return (species, family, order)

In [242]:
# Sanity check: load the saved patch of one prototype from the folder of best
# prototypes and decode it.

# Author's notes on the files saved for each prototype:
# activations: an array of 31 integers, since the prototype is compared at 31 locations
# original: an array containing 4 arrays, each of length 70
# patch: the 10 bases in the original sequence that are the prototype (since prototype length is 5, and there is a max pool that halves the input)
path = r'C:\Users\Sam\OneDrive\Desktop\eDNA\protopnet\saved_prototypes\1892566_8_-1_latent_0.7\epoch-214\prototype_0_patch.npy'
data = np.load(path)
# Show the raw one-hot array, its shape, and the decoded base string.
print(data)
print(data.shape)
print(array_to_sequence(data))

[[1. 0. 1. 0. 0. 0. 0. 1. 0. 1.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 1. 1. 1. 1. 0. 1. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]
(4, 10)
ACACCCCACA


In [243]:
# Build the table of learned prototypes: one row per prototype that was saved
# for the final epoch.
PROTOTYPE_COUNT = 215        # prototype indices 0 .. PROTOTYPE_COUNT - 1 are probed
PROTOTYPES_PER_CLASS = 3     # consecutive prototypes that share one species_cat
OUTPUT_COLUMNS = ["Species", "Genus", "Family", "Order", "Original_Sequence", "Learned_Prototype", "Receptive_Field"]

# The train set does not change between prototypes, so it is read only once.
train_df = pd.read_csv(r"C:\Users\Sam\OneDrive\Desktop\eDNA\datasets\train_oversampled_same_as_zurich.csv")

rows = []
for ptype_num in range(PROTOTYPE_COUNT):
    base_path = rf'C:\Users\Sam\OneDrive\Desktop\eDNA\protopnet\saved_prototypes\1892566_8_-1_latent_0.7\epoch-214\prototype_{ptype_num}_'

    orig_seq_path = base_path + 'original.npy'
    patch_path = base_path + 'patch.npy'

    try:
        orig_seq_arr = np.load(orig_seq_path)
    except FileNotFoundError:
        # Ignore prototypes that weren't saved.
        continue
    orig_seq_str = array_to_sequence(orig_seq_arr)

    patch_arr = np.load(patch_path)
    patch_str = array_to_sequence(patch_arr)

    # None when the decoded patch does not occur in the decoded original.
    receptive_field_str = extract_context(orig_seq_str, patch_str)

    # The species a prototype belongs to follows directly from its index: the
    # first three prototypes were learned for species_cat 0, the next three
    # for species_cat 1, and so on. Deriving it from ptype_num (rather than
    # from a count of the prototypes processed so far) keeps the assignment
    # correct when some prototypes were not saved and are skipped above.
    species_cat = ptype_num // PROTOTYPES_PER_CLASS
    row = train_df[train_df['species_cat'] == species_cat].iloc[0]

    rows.append([row["species"], row["genus"], row["family"], row["order"], orig_seq_str, patch_str, receptive_field_str])

df = pd.DataFrame(rows, columns=OUTPUT_COLUMNS)


In [244]:
# Write the prototype table to learned_prototypes.csv in the current working
# directory, without the DataFrame index column.
df.to_csv("learned_prototypes.csv", index=False)

In [245]:
# Check which species in the prototype table also occur in the Maine dataset.
maine_species = pd.read_csv(r"C:\Users\Sam\OneDrive\Desktop\eDNA\datasets\all_data_maine.csv")

# Boolean mask: True where a value of df['Species'] also appears in
# maine_species['Species'] (exact string match).
mask = df['Species'].isin(maine_species['Species'])

# Distinct species names that occur in both tables.
# NOTE(review): the recorded run printed an empty list; since the comparison
# is an exact string match, confirm that both files format species names the
# same way (case, whitespace, separators) before concluding there is no overlap.
matches = df.loc[mask, 'Species'].unique()
print("Species also in maine dataset: ", matches)

Species also in maine dataset:  []
