In [2]:
#!/usr/bin/env python

from __future__ import division
import numpy as np
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

__author__ = 'Alejandro Fontal'

curr_dir = os.getcwd()
seqdir = curr_dir + "/seqs/"
seqfiles = os.listdir(seqdir)
aa_string = "ARNDCEQGHILKMFPSTWYVX"  # 20 aa + X


def get_1h_dict(aa_string):
    """
    Given a string of unique characters, generates dictionary of 1-hot vectors
    with characters as keys and vectors as values.

    """

    aa_dict = {}

    for idx, aa in enumerate(aa_string):

        if idx > 0:
            aa_dict[aa] = np.zeros(idx-1).tolist() + [1] +\
                          np.zeros(len(aa_string)-idx).tolist()
        else:
            aa_dict[aa] = [1] + np.zeros(len(aa_string)-1).tolist()

    return aa_dict

def seq2onehot(seq, aa_dict):
    """
    Takes a sequence(string) of length n and a dictionary with m keys and
    converts it to a 2 dimensional vector of nxm.

    """
    onehot = []

    # Convert ambiguous amino acids into actual ones.
    for aa in seq:

        if aa == "Z":
            aa = "Q"
        elif aa == "B":
            aa = "R"
        elif aa == "J":
            aa = "L"

        onehot += [aa_dict[aa]]

    return onehot


def fasta_process(fasta_fn):
    """
    :param fasta_fn: Filename of the FASTA file to parse
    :return: A list containing the sequences in the FASTA file.
    """
    with open(fasta_fn) as fasta_file:
        fasta_list = fasta_file.read().splitlines()

        parsed_seqs = []
        for line in fasta_list:
            if line.startswith(">"):
                pass

            else:
                l = len(line)
                if l > 1000:
                    parsed_seqs.append(line[0:500] + line[l-500:l])

                else:
                    parsed_seqs.append(line[0:l] + "X"*(1000-l))

    return parsed_seqs


all_seqs = []

for file in seqfiles:
    all_seqs.append(fasta_process(seqdir+file))


total_tensor = []

aa_dict = get_1h_dict(aa_string)

for idx, sub_loc in enumerate(all_seqs):

    sublabel = np.zeros(len(all_seqs))
    sublabel[idx] = 1

    for seq in sub_loc:
        total_tensor.append((sum(seq2onehot(seq, aa_dict), []), sublabel))

print "Total number of sequences: {}".format(len(total_tensor))

train_n = int(round(0.8 * len(total_tensor), 0))

train_idxs = np.random.choice(len(total_tensor), train_n, replace=False)
test_idxs = list(set(range(len(total_tensor))) - set(train_idxs))

train_tensor = [total_tensor[i] for i in train_idxs]
test_tensor = [total_tensor[i] for i in test_idxs]

print "Sequences in training set: {}".format(len(train_tensor))
print "Sequences in test set: {}\n".format(len(test_tensor))


Total number of sequences: 5510
Sequences in training set: 4408
Sequences in test set: 1102



In [11]:
import tensorflow as tf

n_labels = 10
aa_vec_len = 21
seq_len = 1000
n_iters = 2000
minibatch_size = 500
learn_step = 0.5


def get_batch(tensor, n=100):
    """Gets a minibatch from a tensor

    Takes a tensor of shape t = [[[seq_1][lab_1]], ..., [[seq_n][lab_n]]] and
    randomly takes n samples, returning a tensor x = [[seq_1], ..., [seq_n]]
    and a tensor y = [[lab_1], ..., [lab_n]].
    """
    idxs = np.random.choice(len(tensor), n, replace=False)
    x = [tensor[i][0] for i in idxs]
    y = [tensor[i][1] for i in idxs]

    return x, y


def weight_variable(shape, name="W"):
    """Generates weight variables

    Provides a tensor of weight variables obtained from a truncated normal
    distribution with mean=0 and std=0.1. All values in range [-0.1, 0.1]
    """

    initial = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape, name="B"):
    """Provides a tensor of bias variables with value 0.1"""


    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial)



input_tensor = train_tensor  # Import train set

test_set = test_tensor

x_test = [test_set[i][0] for i in range(len(test_set))]
y_test = [test_set[i][1] for i in range(len(test_set))]

sess = tf.InteractiveSession()  # Start tensorflow session


def fc_layer(input_tensor, input_dim, output_dim, name="fc", relu=True):
    """Generates a fully connected layer with biases and weights

    Computes a fully connected layer when provided with an input tensor and
    returns an output tensor. Input and output channels must be specified.
    By default, the output uses a ReLu activation function.
    """

    with tf.name_scope(name):
        w = weight_variable([input_dim, output_dim])
        b = bias_variable([output_dim])
        out = tf.matmul(input_tensor, w) + b
        tf.summary.histogram("weights", w)
        tf.summary.histogram("biases", b)

        if relu:
            return tf.nn.relu(out)
        else:
            return out


# Define variables of the network:
x = tf.placeholder(tf.float32, [None, seq_len * aa_vec_len], name="x")
y_ = tf.placeholder(tf.float32, [None, n_labels], name="labels")

y = fc_layer(x, seq_len * aa_vec_len, n_labels, relu=False, name="fc")

tf.global_variables_initializer().run()  # Initialize variables

#  Define cost_function (cross entropy):
with tf.name_scope("crossentropy"):
    cross_entropy = tf.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=y))
    tf.summary.scalar("crossentropy", cross_entropy)

with tf.name_scope("train"):
    train_step = tf.train.GradientDescentOptimizer(learn_step).\
        minimize(cross_entropy)

with tf.name_scope("accuracy"):
    correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    tf.summary.scalar("accuracy", accuracy)

summ = tf.summary.merge_all()
writer = tf.summary.FileWriter("/home/alejandro/Documents/GitHub/Thesis/logs")
writer.add_graph(sess.graph)

for i in range(n_iters):
    i += 1
    a, b = get_batch(input_tensor, n=minibatch_size)
    train_step.run(feed_dict={x: a, y_: b})
    tf.summary.scalar("crossentropy", cross_entropy)
    
    """
    if i % 5 == 0:
        a, b = get_batch(input_tensor, n=minibatch_size)
        [train_accuracy, s] = sess.run([accuracy, summ], feed_dict={x: a, y_: b})
        writer.add_summary(s, i)
   """
    if i % 100 == 0 or i == 1:  # Check only in iterations multiple of 100
        a, b = get_batch(input_tensor, n=len(input_tensor))  # Check in full train set
        train_acc = accuracy.eval(feed_dict={x: a, y_: b})
        test_acc = accuracy.eval(feed_dict={x: x_test, y_: y_test})
        print "Iteration number " + str(i) + ":\n"
        print "Train accuracy: {}%\t Test Accuracy: {}%\n".format(
            round(train_acc*100, 3), round(test_acc*100, 2))

        if train_acc == 1:
            break

Iteration number 1:

Train accuracy: 26.293%	 Test Accuracy: 26.41%



KeyboardInterrupt: 