# Imports + Globals

In [None]:
# Maximum DNA sequence length in nucleotides; used below as SEQ_LENGTH and as
# the default padding length of OneHot_Seq.
MAX_LEN = 243
# Default lower bound on protein length used by get_sequences().
MIN_LEN = 0

# Alphabet handed to OneHot_Seq when generated samples are decoded.
TASK_TYPE = 'DNA'
# MAX_LEN // 3 converts nucleotides to codons (one codon per amino acid).
# NOTE(review): the extra "- 6" presumably leaves headroom for the 'ATG'/'TAG'
# codons that protein_to_DNA() adds around each sequence -- confirm.
MAX_LEN_PROTEIN = MAX_LEN // 3 - 6
# NOTE(review): evaluates to -6 with MIN_LEN = 0 and is not referenced anywhere
# else in the visible notebook (get_sequences() defaults to MIN_LEN instead).
MIN_LEN_PROTEIN = MIN_LEN // 3 - 6
print(f'Protein length: {MAX_LEN_PROTEIN}')

# Network hyper-parameters shared by the Generator and Discriminator.
SEQ_LENGTH = MAX_LEN   # number of positions per generated sequence
DIM = 50               # number of Conv1D filters / channels per position
KERNEL_SIZE = 5        # Conv1D kernel width inside ResidualBlock
BATCH_SIZE = 128       # default batch size for datasets and sample generation
N_CHAR = 5             # one-hot depth: A, T, C, G plus the padding symbol
NOISE_SHAPE = 128      # size of the generator's latent noise vector

# Check for RAM capacity purposes: number of units in the generator's first
# Dense layer (SEQ_LENGTH * DIM).
print(f'RESOURCE: {SEQ_LENGTH * DIM}')


# SELECT PATH
# CSV with the protein sequences used to train the GAN
path = '/content/gdrive/My Drive/protein_cleaned.csv'
# CSV files that GAN_FBNet.train() writes its training history to
LOSS_PATH = '/content/gdrive/My Drive/CS496 final project/FBGAN-history/loss.csv'
BEST_PATH = '/content/gdrive/My Drive/CS496 final project/FBGAN-history/best.csv'
AVERAGE_PATH = '/content/gdrive/My Drive/CS496 final project/FBGAN-history/average.csv'

Protein length: 75
RESOURCE: 12150


In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from keras.preprocessing import text, sequence
from keras.preprocessing.text import Tokenizer
from sklearn.model_selection import train_test_split
from keras.metrics import categorical_accuracy
from keras.utils import to_categorical
import seaborn as sns
from matplotlib import rc
# The top-level package must not be aliased as `plt`: doing so rebinds `plt`
# away from pyplot, and every later `plt.subplots` / `plt.plot` / `plt.show`
# call fails with AttributeError. It is kept importable under `mpl`.
import matplotlib as mpl
from keras.preprocessing.sequence import pad_sequences
from keras.preprocessing import sequence
from keras.models import Model, Input
from tensorflow.keras.layers import Conv1D,Input,Dense, Reshape, ReLU, Permute,Softmax, LSTM, Embedding, Dense, TimeDistributed, Bidirectional, LayerNormalization
from sklearn.preprocessing import MultiLabelBinarizer


# Mount Google Drive so the '/content/gdrive/...' paths used throughout the
# notebook resolve (Colab only).
from google.colab import drive
drive.mount('/content/gdrive')

sns.set_context("paper")
# NOTE(review): the returned palette is not assigned or passed to
# sns.set_palette, so this line looks like it has no lasting effect -- confirm.
sns.color_palette("cubehelix", 8)
sns.set_style("whitegrid", {'axes.grid' : False})
# Using seaborn's style
plt.style.use('seaborn')
# Re-apply the "paper" context after switching the matplotlib style
sns.set_context("paper")


# Set the global font to be DejaVu Sans, size 9 (or any other sans-serif font of your choice!)
rc('font',**{'family':'sans-serif','sans-serif':['DejaVu Sans'],'size':9})

# Use the regular text font for matplotlib's mathtext rendering
rc('mathtext',**{'default':'regular'})

# Render inline figures at high (retina) resolution
%config InlineBackend.figure_format = 'retina'

Mounted at /content/gdrive


## Plots

In [None]:
from matplotlib import rc

# Switch matplotlib to its default style (replacing the 'seaborn' style that
# the setup cell applied), then re-apply the seaborn context and palette.
plt.style.use('default')
sns.set_context("paper")
sns.set_palette("husl", 5)


# Set the global font to be DejaVu Sans, size 9 (or any other sans-serif font of your choice!)
rc('font',**{'family':'sans-serif','sans-serif':['DejaVu Sans'],'size':9})

# Use the regular text font for matplotlib's mathtext rendering
rc('mathtext',**{'default':'regular'})

# Render inline figures at high (retina) resolution
%config InlineBackend.figure_format = 'retina'


def plot_history(history,name,metrics):
  """Plot training vs. validation curves, one subplot per metric.

  :param history: object exposing a ``history`` dict that maps a metric name
      to its list of per-epoch values (e.g. the return value of ``model.fit``).
  :param name: title drawn above the whole figure.
  :param metrics: metric names ordered so that the first half are the training
      metrics and the second half their ``val_``-prefixed counterparts (the
      key order of ``history.history`` when validation data is used).
  :raises ValueError: if ``metrics`` holds fewer than two names, i.e. there is
      no train/validation pair to plot.
  """
  # Only the first half is iterated; each 'val_' series is looked up by prefix.
  n_plots = len(metrics) // 2
  if n_plots == 0:
    raise ValueError('metrics must contain at least one train/validation pair')
  train_metrics = list(metrics)[:n_plots]

  # squeeze=False keeps `axs` two-dimensional even for a single subplot, so the
  # indexing below works for any number of metrics (previously the loop was
  # hard-coded to four metrics and broke when only one subplot was created).
  fig, axs = plt.subplots(n_plots, figsize=(10,15), squeeze=False)
  axs = axs[:, 0]
  plt.subplots_adjust(hspace=0.4)
  fig.suptitle(name)

  for ax, metric in zip(axs, train_metrics):
    ax.plot(history.history[metric])
    ax.plot(history.history['val_'+metric])
    # The loss curve is labelled with the loss function's name.
    label = 'binary_crossentropy' if metric == 'loss' else metric
    ax.set_title('model`s ' + label)
    ax.set_ylabel(label)
    ax.set_xlabel('epoch')
    ax.legend(['train', 'test'], loc='upper left')
  plt.show()

## Data misc

In [None]:
# Codon -> one-letter amino-acid code. The three stop codons (TAA, TAG, TGA)
# are mapped to 'P', the same letter OneHot_Seq uses for padding; note that
# 'P' is ALSO the code for proline (CCA/CCC/CCG/CCT).
DNA_protein_MAP = {
            'ATA': 'I', 'ATC': 'I', 'ATT': 'I', 'ATG': 'M',
            'ACA': 'T', 'ACC': 'T', 'ACG': 'T', 'ACT': 'T',
            'AAC': 'N', 'AAT': 'N', 'AAA': 'K', 'AAG': 'K',
            'AGC': 'S', 'AGT': 'S', 'AGA': 'R', 'AGG': 'R',
            'CTA': 'L', 'CTC': 'L', 'CTG': 'L', 'CTT': 'L',
            'CCA': 'P', 'CCC': 'P', 'CCG': 'P', 'CCT': 'P',
            'CAC': 'H', 'CAT': 'H', 'CAA': 'Q', 'CAG': 'Q',
            'CGA': 'R', 'CGC': 'R', 'CGG': 'R', 'CGT': 'R',
            'GTA': 'V', 'GTC': 'V', 'GTG': 'V', 'GTT': 'V',
            'GCA': 'A', 'GCC': 'A', 'GCG': 'A', 'GCT': 'A',
            'GAC': 'D', 'GAT': 'D', 'GAA': 'E', 'GAG': 'E',
            'GGA': 'G', 'GGC': 'G', 'GGG': 'G', 'GGT': 'G',
            'TCA': 'S', 'TCC': 'S', 'TCG': 'S', 'TCT': 'S',
            'TTC': 'F', 'TTT': 'F', 'TTA': 'L', 'TTG': 'L',
            'TAC': 'Y', 'TAT': 'Y', 'TAA': 'P', 'TAG': 'P',
            'TGC': 'C', 'TGT': 'C', 'TGA': 'P', 'TGG': 'W',
        }

_STOP_CODONS = ('TAA', 'TAG', 'TGA')

# Amino acid -> codon used when encoding proteins as DNA. When several codons
# encode the same amino acid, the last one listed in DNA_protein_MAP wins.
# Stop codons are excluded from the inversion: they share the letter 'P' with
# proline, and the previous override `protein_DNA_MAP['P'] = 'TAG'` caused
# every proline in a protein to be written as a stop codon.
protein_DNA_MAP = {
    amino_acid: codon
    for codon, amino_acid in DNA_protein_MAP.items()
    if codon not in _STOP_CODONS
}

def protein_to_DNA(protein_sequences):
    """Encode protein sequence(s) as DNA, wrapped in start/stop codons.

    Each amino acid is replaced by its codon from ``protein_DNA_MAP`` and the
    result is framed as ``'ATG' + codons + 'TAG'``.

    :param protein_sequences: a single protein string, or a collection of
        protein strings accepted by ``parse`` (list / array, one per sample).
    :return: for a single string, a one-element list holding the DNA string;
        otherwise an ndarray of shape (N, 1) of DNA strings.
    :raises KeyError: if a letter has no entry in ``protein_DNA_MAP``.
    """
    def encode(amino_acids):
        return 'ATG' + ''.join(protein_DNA_MAP[aa] for aa in amino_acids) + 'TAG'

    # Single sequence: previously this branch joined the amino-acid letters
    # themselves, so the "DNA" returned was the untranslated protein.
    if isinstance(protein_sequences, str):
        return [encode(protein_sequences)]

    parsed = parse(protein_sequences)
    DNA_sequences = [encode(seq) for seq in parsed]

    return np.array(DNA_sequences).reshape(-1,1)

def parse(sequences):
    """Split sequence string(s) into arrays of single characters.

    :param sequences: one string, or a collection of strings that
        ``pd.DataFrame`` can turn into a single column (list, (N,) or (N, 1)
        array). Only the first column is used.
    :return: for a single string, a 1-D array of its characters; otherwise a
        1-D object array holding one character array per sequence.
    """
    def split_chars(seq):
        return np.array(list(seq))

    # isinstance (rather than an exact type check) also accepts str subclasses
    # such as np.str_, which otherwise fall through to the DataFrame path.
    if isinstance(sequences, str):
        return split_chars(sequences)

    return pd.DataFrame(sequences).iloc[:,0].apply(split_chars).to_numpy()

# Codon -> amino-acid table used by translate(). Stop codons map to the empty
# string, so they are skipped rather than terminating translation.
_TRANSLATE_CODON_TABLE = {
    'ATA':'I', 'ATC':'I', 'ATT':'I', 'ATG':'M',
    'ACA':'T', 'ACC':'T', 'ACG':'T', 'ACT':'T',
    'AAC':'N', 'AAT':'N', 'AAA':'K', 'AAG':'K',
    'AGC':'S', 'AGT':'S', 'AGA':'R', 'AGG':'R',
    'CTA':'L', 'CTC':'L', 'CTG':'L', 'CTT':'L',
    'CCA':'P', 'CCC':'P', 'CCG':'P', 'CCT':'P',
    'CAC':'H', 'CAT':'H', 'CAA':'Q', 'CAG':'Q',
    'CGA':'R', 'CGC':'R', 'CGG':'R', 'CGT':'R',
    'GTA':'V', 'GTC':'V', 'GTG':'V', 'GTT':'V',
    'GCA':'A', 'GCC':'A', 'GCG':'A', 'GCT':'A',
    'GAC':'D', 'GAT':'D', 'GAA':'E', 'GAG':'E',
    'GGA':'G', 'GGC':'G', 'GGG':'G', 'GGT':'G',
    'TCA':'S', 'TCC':'S', 'TCG':'S', 'TCT':'S',
    'TTC':'F', 'TTT':'F', 'TTA':'L', 'TTG':'L',
    'TAC':'Y', 'TAT':'Y', 'TAA':'', 'TAG':'',
    'TGC':'C', 'TGT':'C', 'TGA':'', 'TGG':'W',
}

def translate(seq):
    """Translate one decoded DNA sequence into a protein string.

    Everything from the first padding symbol 'P' onwards is discarded; the
    rest is read in consecutive, non-overlapping triplets starting at
    position 0. Stop codons, unknown triplets and an incomplete trailing
    triplet contribute nothing to the result.

    :param seq: a one-element sequence wrapping the DNA string (the form
        produced by ``OneHot_Seq.onehot_to_seq``), or the DNA string itself.
    :return: the translated protein as a string.
    """
    # A bare string used to be indexed like the wrapped form, which silently
    # translated only its first character.
    dna = seq if isinstance(seq, str) else seq[0]
    coding = dna.split('P')[0]
    return ''.join(
        _TRANSLATE_CODON_TABLE.get(coding[i:i + 3], '')
        for i in range(0, len(coding), 3)
    )

def DNA_to_protein(sequences):
    """Translate every DNA entry of ``sequences`` with ``translate``.

    :param sequences: iterable of entries in the form ``translate`` accepts.
    :return: list of protein strings, in input order.
    """
    return [translate(entry) for entry in sequences]

In [None]:
class OneHot_Seq:
    """One-hot encoder/decoder for sequences over a fixed alphabet.

    Letters are numbered from 1 in the order given by ``self.letters``; index 0
    is reserved for padding and decodes to 'P'. For the amino-acid alphabet
    'P' is also proline (index 15), so decoded padding and proline cannot be
    told apart there.
    """

    def __init__(self, letter_type='amino acids', letters = None, max_length=MAX_LEN):
        """
        :param letter_type: str 'amino acids' or 'DNA'. If a different type is used, provide custom letters.
        :param letters: custom alphabet, required when letter_type is neither of the built-in ones.
        :param max_length: int maximum length of a sequence. Sequences will be padded to this length.
        :raises ValueError: if a custom letter_type is given without letters.
        """

        if letter_type == 'amino acids':
            self.letters = ['A', 'R', 'N', 'D', 'C', 'Q', 'E', 'G', 'H', 'I', 'L', 'K', 'M', 'F', 'P', 'S', 'T',
                            'W', 'Y', 'V']
        elif letter_type == 'DNA':
            self.letters = ['A', 'T', 'C', 'G']

        else:
            # Explicit check instead of `assert`, which is stripped under -O.
            if letters is None:
                raise ValueError(f"letters must be provided for custom letter_type {letter_type!r}")
            self.letters = letters

        self.letters_dict = {f'{aa}': i + 1 for i, aa in enumerate(self.letters)}
        self.invert_dict = {v: k for k, v in self.letters_dict.items()}
        self.invert_dict[0] = 'P'

        self.max_length = max_length

    def _parse_pad_sequences(self, sequences):
        """
        Map every letter to its integer index and pad with 0 at the end up to max_length.
        :param sequences: collection of strings; only the first column is used after conversion to a DataFrame
        :return: int array of shape (N, max_length)
        :raises ValueError: if a sequence contains a letter outside the alphabet
        """
        first_column = pd.DataFrame(sequences).iloc[:, 0]

        encoded = []
        for seq in first_column:
            try:
                encoded.append([self.letters_dict[letter] for letter in seq])
            except KeyError as err:
                # Previously unknown letters became None and failed later with
                # an unrelated conversion error inside pad_sequences.
                raise ValueError(
                    f"Sequence contains letter {err.args[0]!r} which is not in the alphabet {self.letters}"
                ) from err

        return pad_sequences(encoded, maxlen=self.max_length, value=0, padding='post')

    def seq_to_onehot(self, sequences):
        """
        Return an array of one-hot encodings from sequence strings.
        :param sequences: ndarray of strings, shape = (N,1) where N is the number of samples
        :return: array of onehot encoded sequences, shape = (N, max_length, len(letters) + 1);
                 channel 0 marks padded positions
        """
        padded = self._parse_pad_sequences(sequences)
        # Row i of the identity matrix is the one-hot vector for index i, so
        # indexing it with the padded index array encodes all samples at once.
        return np.eye(len(self.letters) + 1)[padded]

    def onehot_to_seq(self, sequences):
        """
        Returns decoded strings from one-hot (or probability) encodings; the argmax over the
        last axis picks the letter at each position.
        :param sequences: array of shape (N, max_length, len(letters) + 1), or (max_length, len(letters) + 1)
                          for a single sample
        :return: for 3-D input a list of N one-element lists [[str], ...];
                 for 2-D input a one-element list [str]
        """
        if sequences.ndim == 2:
            indices = np.asarray(np.argmax(sequences, axis=1)).tolist()
            return [''.join(self.invert_dict[i] for i in indices)]

        indices = np.asarray(np.argmax(sequences, axis=2)).tolist()
        return [[''.join(self.invert_dict[i] for i in row)] for row in indices]

def get_sequences(path, split = 0.2, min_len = MIN_LEN, max_len = MAX_LEN_PROTEIN):
    """Load protein sequences with their 'sst8' targets and split them into train/test.

    Rows are kept when ``min_len <= len <= max_len`` and ``has_nonstd_aa`` is
    false.

    :param path: CSV file with the columns 'seq', 'sst8', 'len' and 'has_nonstd_aa'.
    :param split: fraction of the kept rows assigned to the test split.
    :param min_len: smallest accepted value of the 'len' column.
    :param max_len: largest accepted value of the 'len' column.
    :return: tuple (seq_train, seq_test, target_train, target_test).
    """
    table = pd.read_csv(path)

    within_length = (table.len >= min_len) & (table.len <= max_len)
    keep = within_length & (~table.has_nonstd_aa)
    input_seqs, target_seqs = table.loc[keep, ['seq', 'sst8']].values.T

    # Fixed random_state keeps the split reproducible between runs.
    seq_train, seq_test, target_train, target_test = train_test_split(
        input_seqs, target_seqs, test_size=split, random_state=1)

    return seq_train, seq_test, target_train, target_test

def get_dataset(sequences, batch_size = BATCH_SIZE):
    """Wrap ``sequences`` in a shuffled, batched ``tf.data.Dataset``.

    :param sequences: array-like with a ``shape`` attribute; its first axis is
        sliced into samples and its length is used as the shuffle buffer size.
    :param batch_size: number of samples per batch (the last batch may be smaller).
    :return: the batched dataset.
    """
    n_samples = sequences.shape[0]
    slices = tf.data.Dataset.from_tensor_slices(sequences)
    return slices.shuffle(n_samples, seed=0).batch(batch_size)

def prepare_dataset(path, split = 0.01):
    """Build the one-hot encoded DNA training set for the (FB)GAN.

    Loads the training split of the protein sequences, shuffles it, encodes
    every protein as DNA and one-hot encodes the result.

    :param path: CSV file passed on to ``get_sequences``.
    :param split: fraction of sequences held out (and discarded) as test split.
    :return: array of one-hot encoded DNA sequences as produced by
        ``OneHot_Seq(letter_type='DNA').seq_to_onehot``.
    """
    # Only the training inputs are needed; targets and the test split are dropped.
    train_proteins, _, _, _ = get_sequences(path, split)
    train_proteins = train_proteins.tolist()
    np.random.shuffle(train_proteins)

    print(f'Number of training samples: {len(train_proteins)}')

    # Translate to DNA encoding
    dna_sequences = protein_to_DNA(train_proteins)

    # One Hot encode into 5 categories, ATCG and P for padded positions
    encoder = OneHot_Seq(letter_type= 'DNA')
    return encoder.seq_to_onehot(dna_sequences)

# GAN

## ResBlock

In [None]:
def softmax(logits):
    """Apply a softmax over consecutive groups of N_CHAR values.

    The logits are flattened to rows of N_CHAR entries, normalised row by row
    and reshaped back, so the output has the same shape as the input.
    """
    original_shape = tf.shape(logits)
    per_position = tf.reshape(logits, [-1,N_CHAR])
    return tf.reshape(tf.nn.softmax(per_position), original_shape)

class ResidualBlock(tf.keras.layers.Layer):
    """Residual block: ReLU -> Conv1D(+ReLU) -> Conv1D, added back to the input.

    Both convolutions use DIM filters with 'same' padding and stride 1, so the
    branch output has the same shape as an input with DIM channels and can be
    added to it.
    """

    def __init__(self):
        super(ResidualBlock, self).__init__()
        self.relu = ReLU()
        self.conv1d_1 = Conv1D(filters=DIM, kernel_size=KERNEL_SIZE, padding='same', strides=1, activation='relu')
        self.conv1d_2 = Conv1D(filters=DIM, kernel_size=KERNEL_SIZE, padding='same', strides=1)

    # Implemented as `call` (not `__call__`) so Keras' own Layer.__call__
    # still runs and dispatches here.
    def call(self,X,alpha = 0.3):
        """
        :param X: input tensor whose last axis has DIM channels
        :param alpha: weight of the convolutional branch in the sum
        :return: X + alpha * branch(X), same shape as X
        """
        x = self.relu(X)
        x = self.conv1d_1(x)
        x = self.conv1d_2(x)
        # Skip connection: the branch is added to the block INPUT. The previous
        # `x + alpha*x` only rescaled the branch output and dropped the input.
        # NOTE(review): the layers are unchanged, so previously saved weights
        # should still load, but they were trained without the skip path.
        return X + alpha*x

class Generator(tf.keras.Model):
    """Maps latent noise vectors to per-position distributions over N_CHAR symbols."""

    def __init__(self):
        """
        Build the generator stack: Dense projection of the noise, reshape to
        (SEQ_LENGTH, DIM), five residual blocks and a 1x1 convolution down to
        N_CHAR channels.
        """
        super(Generator, self).__init__(name='generator')

        stack = tf.keras.models.Sequential()
        stack.add(Input(shape = (NOISE_SHAPE,), batch_size = BATCH_SIZE))
        stack.add(Dense(units = DIM*SEQ_LENGTH))
        stack.add(Reshape((SEQ_LENGTH, DIM)))

        for _ in range(5):
            stack.add(ResidualBlock())

        stack.add(Conv1D(filters = N_CHAR, kernel_size = 1))
        self.model = stack

    def call(self, inputs):
        """
        :param inputs: noise of shape [batch_size, NOISE_SHAPE]
        :return: softmax-normalised output of the stack
        """
        return softmax(self.model(inputs))

class Discriminator(tf.keras.Model):
    """Scores (one-hot or soft) sequences; no output activation is applied."""

    def __init__(self, clip = 1):
        """
        Build the critic stack: 1x1 convolution to DIM channels, five residual
        blocks, a reshape that flattens each sample and two Dense layers.
        :param clip: accepted for compatibility; not read anywhere in this class
        """
        super(Discriminator, self).__init__(name='discriminator')

        stack = tf.keras.models.Sequential()
        stack.add(Input(shape = (SEQ_LENGTH,N_CHAR), batch_size = BATCH_SIZE))
        stack.add(Conv1D(filters = DIM, kernel_size = 1))

        for _ in range(5):
            stack.add(ResidualBlock())

        stack.add(Reshape((-1,DIM*SEQ_LENGTH)))
        stack.add(Dense(units = DIM*SEQ_LENGTH))
        stack.add(Dense(units = 1))
        self.model = stack

    def call(self,inputs,training = False):
        """
        model's forward pass
        :param inputs: batch of sequences of shape [batch_size, SEQ_LENGTH, N_CHAR];
        :param training: accepted for the Keras call convention; not forwarded to the stack;
        :return: raw (unbounded) score per sequence, as produced by the final Dense(1) layer
        """
        return self.model(inputs)

## GAN class

In [None]:
class GAN():
    """Wasserstein-style GAN with gradient penalty over one-hot sequences.

    Wraps a ``Generator`` and a ``Discriminator`` (used as critic), their Adam
    optimizers, the training loop and a small logged history.
    """

    def __init__(self, batch_size = BATCH_SIZE, discriminator_steps = 0, lr = 0.0002, 
                 gradient_penalty_weight = 5, generator_weights_path=None, discriminator_weights_path=None):
        """
        :param batch_size: batch size for datasets and for generated samples
        :param discriminator_steps: number of full passes over the data used to pre-train the discriminator
        :param lr: learning rate of both Adam optimizers
        :param gradient_penalty_weight: factor applied to the gradient penalty in the discriminator loss
        :param generator_weights_path: optional path of generator weights to load
        :param discriminator_weights_path: optional path of discriminator weights to load
        """
        self.batch_size = batch_size
        self.G = Generator()
        self.D = Discriminator()

        self.d_steps = discriminator_steps
        
        self.history = {"G_losses": [], "D_losses": [], "gradient_penalty": [], "sequences": []}

        self.G_optimizer = tf.keras.optimizers.Adam(learning_rate=lr, beta_1=0.5, beta_2=0.9)
        self.D_optimizer = tf.keras.optimizers.Adam(learning_rate=lr, beta_1=0.5, beta_2=0.9)

        self.gp_weight = gradient_penalty_weight
        self.step_log = None

        self.checkpoint_dir = '/content/gdrive/My Drive/CS496 final project/weights/'

        if generator_weights_path:
            self.G.load_weights(generator_weights_path)

        if discriminator_weights_path:
            self.D.load_weights(discriminator_weights_path)

        
    def generate_samples(self, number=None, decoded = False):
        """
        Sample noise and run it through the generator.
        :param number: how many samples to generate (defaults to self.batch_size)
        :param decoded: if True, return decoded strings (via OneHot_Seq) instead of the generator output
        :return: generator output tensor, or the decoded sequences
        """
        if number is None:
            number = self.batch_size
        z = tf.random.normal([number, NOISE_SHAPE])
        generated = self.G(z)
        
        if decoded:
            OneHot = OneHot_Seq(letter_type= TASK_TYPE)
            generated = OneHot.onehot_to_seq(generated)
            
        return generated

    def generator_loss(self, fake_score):
        """Generator objective: maximise the critic score of generated samples."""
        return -tf.math.reduce_mean(fake_score)

    def discriminator_loss(self, real_score, fake_score):
        """Critic objective: mean fake score minus mean real score."""
        return tf.math.reduce_mean(fake_score) - tf.math.reduce_mean(real_score)

    #@tf.function
    def gradient_penalty(self, real_samples, fake_samples):
        """
        Penalise critic gradients whose norm deviates from 1 at points between
        real and generated samples.
        :param real_samples: batch of real sequences
        :param fake_samples: batch of generated sequences, same shape as real_samples
        :return: scalar mean of (||grad|| - 1)^2 over the batch
        """
        real_samples = tf.cast(real_samples, tf.float32)
        n_samples = tf.shape(real_samples)[0]
        # The interpolation coefficient must lie in [0, 1] so the evaluated
        # points sit on the segment between a real and a fake sample; it was
        # previously drawn from a standard normal, which leaves that segment.
        alpha = tf.random.uniform(tf.stack([n_samples, 1, 1]), minval=0.0, maxval=1.0)
        interpolated = real_samples + alpha * (fake_samples - real_samples)

        with tf.GradientTape() as gp_tape:
            gp_tape.watch(interpolated)
            pred = self.D(interpolated, training=True)

        grads = gp_tape.gradient(pred, [interpolated])[0]
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        
        return gp

    #@tf.function
    def G_train_step(self):
        """Run one generator update and return its loss."""
        with tf.GradientTape() as tape:
            fake_samples = self.generate_samples()
            fake_score = self.D(fake_samples, training=True)
            G_loss = self.generator_loss(fake_score)

        G_gradients = tape.gradient(G_loss, self.G.trainable_variables)
        self.G_optimizer.apply_gradients((zip(G_gradients, self.G.trainable_variables)))

        return G_loss

    #@tf.function
    def D_train_step(self, real_samples):
        """
        Run one discriminator update on a batch of real samples.
        :return: (total discriminator loss including the penalty, weighted gradient penalty)
        """
        with tf.GradientTape() as tape:
            fake_samples = self.generate_samples()
            real_score = self.D(real_samples, training=True)
            fake_score = self.D(fake_samples, training=True)

            D_loss = self.discriminator_loss(real_score, fake_score)
            GP = self.gradient_penalty(real_samples, fake_samples) * self.gp_weight
            D_loss = D_loss + GP

        D_gradients = tape.gradient(D_loss, self.D.trainable_variables)
        self.D_optimizer.apply_gradients((zip(D_gradients, self.D.trainable_variables)))

        return D_loss, GP

    def create_dataset(self, inputs):
        """Shuffle ``inputs`` and batch them into full batches of self.batch_size."""
        dataset = tf.data.Dataset.from_tensor_slices(inputs)
        dataset = dataset.shuffle(inputs.shape[0], seed=0).batch(self.batch_size, drop_remainder = True)
        return dataset


    def train(self, inputs, epochs, step_log = 50, save_per_epochs = None):
        """
        Optionally pre-train the discriminator, then alternate generator and
        discriminator updates for ``epochs`` passes over ``inputs``.
        :param inputs: array of real samples
        :param epochs: number of passes over the data
        :param step_log: log losses and the best generated sequence every step_log steps
        :param save_per_epochs: checkpoint interval; only used by the disabled checkpoint code below
        """
        n_steps = len(self.create_dataset(inputs)) * epochs
        self.step_log = step_log
        
        if save_per_epochs is None:
            save_per_epochs = epochs - 1
        
        # Pre-train discriminator 
        print('Pretraining discriminator...')
        # The loop variable is deliberately not called `step`: reusing that
        # name made the logged step counter start at d_steps - 1 instead of 0.
        for _ in range(self.d_steps):
            dataset = self.create_dataset(inputs)
            
            for sample_batch in dataset:
                self.D_train_step(sample_batch)

        # Train discriminator and generator
        step = 0
        for epoch in range(epochs):
            dataset = self.create_dataset(inputs)

            print(f"Epoch {epoch}/{epochs}:")

            for sample_batch in dataset:
                G_loss = self.G_train_step()
                D_loss, GP = self.D_train_step(sample_batch)
                
                if step %  self.step_log == 0:
                    example_sequence = self.get_highest_scoring()
                    self.history["G_losses"].append(G_loss.numpy())
                    self.history["D_losses"].append(D_loss.numpy())
                    self.history['gradient_penalty'].append(GP.numpy())
                    self.history['sequences'].append(example_sequence)
                    print(f'\t Step {step}/{n_steps} \t Generator: {G_loss.numpy()} \t Discriminator: {D_loss.numpy()} \t Sequence: {example_sequence}')
                step += 1
            
            # Checkpointing is disabled (note: it would need `import os`):
            # if epoch % save_per_epochs == 0:
            #     self.G.save_weights(os.path.join(self.checkpoint_dir, f'E{epoch}_Generator'))
            #     self.D.save_weights(os.path.join(self.checkpoint_dir, f'E{epoch}_Discriminator'))
                

    def get_highest_scoring(self, num_to_generate = BATCH_SIZE, num_to_return = 1, decoded = True):
        """
        Generate samples and return the one the discriminator scores highest.
        :param num_to_generate: number of candidates to generate
        :param num_to_return: currently unused; exactly one sequence is returned
        :param decoded: if True, return the decoded string instead of the raw array
        """
        fake_samples = self.generate_samples(num_to_generate)
        fake_scores = self.D(fake_samples)
        best_indx = np.argmax(fake_scores)
        best_seq = fake_samples[best_indx].numpy()

        if decoded:
            OneHot = OneHot_Seq(letter_type=TASK_TYPE)
            best_seq = OneHot.onehot_to_seq(best_seq)

        return best_seq

    def plot_history(self):
        """Plot the logged discriminator and generator losses against the log index."""
        D_losses = np.array(self.history['D_losses'])
        G_losses = np.array(self.history['G_losses'])

        plt.plot(np.arange(D_losses.shape[0]), D_losses, label='Discriminator loss')
        plt.plot(np.arange(G_losses.shape[0]), G_losses, label='Generator loss')
        plt.ylabel('Loss')
        plt.xlabel(f'Steps (x{self.step_log})')
        plt.legend()
        
        plt.show()

    def show_sequences_history(self):
        """Print the best generated sequence recorded at every logged step."""
        sequences_history = self.history['sequences']
        print('History of top scoring generated sequences... \n')
        for i in range(len(sequences_history)):
            print(f'Step {i*self.step_log}: \t {sequences_history[i][0]}')

# Feedback Net

## Train from scratch

### Get & Transform data

In [None]:
# NOTE: `global` at module (cell) level has no effect; names assigned here are
# already module-level.
global n_words, MAX_LEN
# NOTE: this REASSIGNS the MAX_LEN defined in the first cell (243 there). Code
# that reads MAX_LEN at call time (e.g. the padding in transform_sequence) uses
# 128 from here on, so results depend on the order the cells are executed in.
MAX_LEN = 128 #length of max sequence we want to consider for training
n_tags = 8 #number of classes in 8-state prediction


def triplets(sequences):
    """
    Apply a sliding window of length 3 (stride 1) to each sequence in the input list.
    A window is started at EVERY position, so the last two windows of a sequence
    are shorter than 3 characters (e.g. 'ABCD' -> ['ABC', 'BCD', 'CD', 'D']).
    :param sequences: list of sequences
    :return: 1-D numpy object array with one list of windows per sequence
    Usage: Split protein sequence into triplets of aminoacids
    """
    windows = [[aminoacids[i:i+3] for i in range(len(aminoacids))] for aminoacids in sequences]

    # Fill an object array element by element: np.array() on lists of differing
    # lengths raises on recent NumPy versions, and on equal-length input it
    # would build a 2-D string array instead of an array of lists.
    result = np.empty(len(windows), dtype=object)
    for idx, grams in enumerate(windows):
        result[idx] = grams
    return result

def transform_sequence(seqs, tokenizer_encoder = None):
  """
  Transform sequences for input into the feedback net: split them into 3-grams,
  tokenize the grams and pad (post) to the module-level MAX_LEN.
  :param seqs: list/array of sequence strings
  :param tokenizer_encoder: fitted Tokenizer to reuse; if None, a new one is
      created and fit on ``seqs``
  :return: (padded integer sequences, tokenizer that was used)
  """
  # The grams are needed in both cases; they used to be computed only when no
  # tokenizer was passed, so supplying one raised UnboundLocalError.
  # list(...) guarantees each sample is a plain list of tokens for the Tokenizer.
  input_grams = [list(grams) for grams in triplets(seqs)]
  if tokenizer_encoder is None:
    tokenizer_encoder = Tokenizer()
    tokenizer_encoder.fit_on_texts(input_grams)
  transformed = tokenizer_encoder.texts_to_sequences(input_grams)
  transformed = sequence.pad_sequences(transformed, maxlen=MAX_LEN, padding='post')
  return transformed,tokenizer_encoder


def get_data_for_feedback(path='/content/gdrive/My Drive/protein_structure.csv', MAX_LEN = 128):
  """
  Load the protein/structure dataset and prepare train/test data for the feedback net.
  :param path: CSV file with the columns 'seq', 'sst8', 'len' and 'has_nonstd_aa'
  :param MAX_LEN: largest accepted value of the 'len' column. Note that the padding
      length is decided inside transform_sequence, which reads the module-level
      MAX_LEN, not this argument.
  :return: X_train, X_test, y_train, y_test, tokenizer (fitted on all kept sequences)
  """
  # Honour the `path` argument (the file name used to be hard-coded here).
  df = pd.read_csv(path)
  input_seqs, target_seqs = df[['seq', 'sst8']][(df.len <= MAX_LEN) & (~df.has_nonstd_aa)].values.T

  # Transform features
  input_data,tokenizer = transform_sequence(input_seqs)
  # Transform targets: each 'sst8' string is treated as a collection of
  # characters, giving one binary indicator per distinct character.
  mlb = MultiLabelBinarizer()
  target_data = mlb.fit_transform(target_seqs)

  X_train, X_test, y_train, y_test = train_test_split(input_data, target_data, test_size=.3, random_state=1)
  return X_train,X_test,y_train,y_test, tokenizer

### Declare & train the net

In [None]:
# NOTE: `global` at module (cell) level has no effect.
global n_words,save_feedback
# Load and tokenize the feedback-net dataset; these module-level names are read
# by Feedback.train().
X_train, X_test, y_train, y_test, tokenizer = get_data_for_feedback()
# Vocabulary size for the Embedding layer: one entry per known token, plus one
# because word_index starts at 1 and 0 is the padding value.
n_words = len(tokenizer.word_index) + 1
save_feedback = "/content/gdrive/My Drive/saved_models/multilabel_feedback" #path where we want to save weights

class Feedback():
  """Multi-label classifier over tokenized sequences: embedding + three bidirectional LSTMs."""

  def __init__(self):
    """Build the Keras model; reads the module-level MAX_LEN, n_words and n_tags."""
    tokens = Input(shape=(MAX_LEN,))
    hidden = Embedding(input_dim=n_words, output_dim=128, input_length=MAX_LEN)(tokens)
    hidden = LayerNormalization()(hidden)
    # Two sequence-to-sequence recurrent layers followed by one that returns
    # only its final state.
    for _ in range(2):
      hidden = Bidirectional(LSTM(units=128, return_sequences=True,use_bias=True))(hidden)
    hidden = Bidirectional(LSTM(units=128,use_bias=True))(hidden)
    # Independent sigmoid per tag, i.e. a multi-label output.
    tag_scores = Dense(n_tags, activation="sigmoid")(hidden)
    self.model = Model(tokens, tag_scores)
  
  def train(self,OPTIM="rmsprop", LOSS='binary_crossentropy', BATCH_SIZE =128, EPOCHS = 5):
    """
    Compile and fit the model on the module-level X_train / y_train, validate on
    X_test / y_test, then save the model to `save_feedback`.
    :return: the Keras History object returned by fit
    """
    tracked_metrics = [tf.keras.metrics.Precision(),
                       tf.keras.metrics.Recall(),
                       tf.keras.metrics.Hinge()]
    self.model.compile(optimizer=OPTIM, loss=LOSS, metrics=tracked_metrics)
    fit_history = self.model.fit(X_train, y_train, batch_size=BATCH_SIZE, epochs=EPOCHS,
                                 validation_data=(X_test, y_test), verbose=1)
    self.model.save(save_feedback)
    return fit_history

In [None]:
#model = Feedback()
#history = model.train()
#plot_history(history, "Feedback Net for Multilabel Classification of Sequence",list(history.history.keys()))

## Load weights for feedback

In [None]:
save_feedback = "/content/gdrive/My Drive/saved_models/multilabel_feedback" #path where weights are saved

# NOTE(review): this rebinds the name `Feedback` from the class defined above to
# a loaded model instance, so `Feedback()` can no longer be called after this
# cell runs. GAN_FBNet loads its own copy of the model from `fbnet_path`.
Feedback = tf.keras.models.load_model(save_feedback)

KeyboardInterrupt: ignored

# Feedback GAN

In [None]:


class GAN_FBNet():
    """Feedback GAN: a GAN whose training set is extended with generated samples
    that the feedback network scores above a threshold for the desired features."""

    def __init__(self, generator_path=None, discriminator_path=None,
                 fbnet_path="/content/gdrive/My Drive/saved_models/multilabel_feedback", features=None):
        """
        :param generator_path: optional path of generator weights to load
        :param discriminator_path: optional path of discriminator weights to load
        :param fbnet_path: path of the saved feedback model
        :param features: labels (entries of self.label_order) that generated samples
            should exhibit; defaults to no features
        """
        self.GAN = GAN(generator_weights_path=generator_path, discriminator_weights_path=discriminator_path)
        self.FBNet = tf.keras.models.load_model(fbnet_path)
        # Only the tokenizer is needed here; it is re-fitted from the dataset.
        _,_,_,_,self.tokenizer = get_data_for_feedback()
        self.label_order = np.array(['B','C','E','G','H','I','S','T'])
        # None replaces the former mutable default `[]`, which would have been
        # shared between all instances created without `features`.
        self.desired_features = [] if features is None else features
        self.data = None
        self.checkpoint_dir = './'

    def get_scores(self, inputs):
        """
        Score decoded DNA sequences with the feedback network.
        :param inputs: decoded sequences in the form produced by OneHot_Seq.onehot_to_seq
        :return: array of per-label scores, one row per input sequence
        """
        # convert the DNA sequences to protein sequences
        protein_sequence = DNA_to_protein(inputs)
        input_grams = triplets(protein_sequence)
        transformed = self.tokenizer.texts_to_sequences([list(i) for i in input_grams])
        transformed = sequence.pad_sequences(transformed, maxlen=MAX_LEN, padding='post')
        # use FBNet to grade the sequences
        scores = self.FBNet.predict(transformed)
        return scores

    def _pair_features_with_scores(self, label_scores):
        """Return [(feature, int score)] for every desired feature, reading the
        score at the feature's position in self.label_order."""
        pairs = []
        for feature in self.desired_features:
            matches = np.where(feature == self.label_order)[0]
            if matches.size == 0:
                raise ValueError(f'Unknown feature {feature!r}; expected one of {list(self.label_order)}')
            # Index the single match explicitly rather than calling int() on a
            # one-element array, which recent NumPy versions deprecate.
            pairs.append((feature, int(label_scores[matches[0]])))
        return pairs

    def get_score_per_feature(self, scores):
        """Return (feature, average score in percent) pairs over a batch of scores."""
        avg_scores = np.rint(100*np.mean(np.array(scores), axis = 0))
        return self._pair_features_with_scores(avg_scores)
    
    def get_best_score_per_feature(self, scores):
        """Return (feature, best score in percent) pairs over a batch of scores."""
        best_scores = np.rint(100*np.max(scores,axis=0))
        return self._pair_features_with_scores(best_scores)

    def add_samples(self, generated, scores, score_threshold=0.1, replace=False):
        """
        Append to self.data every generated sample whose score exceeds the
        threshold for ALL desired features.
        :param generated: batch of generated samples
        :param scores: per-label scores for `generated`, one row per sample
        :param score_threshold: minimum score a label needs to count as present
        :param replace: if True nothing is added (replacement is not implemented)
        :return: (list of accepted samples, list of their score rows)
        """
        best_index = scores > score_threshold
        wanted = set(self.desired_features)
        best_samples = []
        best_scores = []
        for i in range(len(best_index)):
            passed_threshold = set(self.label_order[best_index[i]])
            if wanted.issubset(passed_threshold):
                best_samples.append(generated[i])
                best_scores.append(scores[i])
        if not replace and best_samples: #make sure array is not empty before adding
            self.data = np.concatenate((self.data, np.array(best_samples)), axis=0)
        return best_samples, best_scores

    def train(self, inputs, epochs, step_log=50, steps_per_epoch = 100, batch_size = BATCH_SIZE):
        """
        Train the GAN while feeding high-scoring generated samples back into the
        training data, logging progress to LOSS_PATH, BEST_PATH and AVERAGE_PATH.
        :param inputs: array of real one-hot encoded samples
        :param epochs: number of epochs
        :param step_log: write/print a log entry every step_log steps
        :param steps_per_epoch: an epoch stops after the step with this index
            (steps are counted from 0, so at most steps_per_epoch + 1 steps run)
        :param batch_size: batch size for real and generated batches; it is also
            applied to the wrapped GAN so both batches keep the same size
        """
        # Local import: `csv` is otherwise only imported by a later cell.
        import csv

        self.data = inputs
        # These two arguments used to be ignored in favour of the module-level
        # BATCH_SIZE and a hard-coded 100.
        self.batch_size = batch_size
        self.GAN.batch_size = batch_size
        n_real = len(inputs)

        def percent_fake():
            return int((len(self.data) - n_real) / len(self.data)*100)

        def write_row(csv_path, mode, row):
            # newline='' is what the csv module expects for files it writes to.
            with open(csv_path, mode, newline='') as f:
                csv.writer(f, delimiter=',').writerow(row)

        # Start fresh score logs with a header row (LOSS_PATH is only appended to).
        write_row(BEST_PATH, 'w', [x for x in self.desired_features])
        write_row(AVERAGE_PATH, 'w', [x for x in self.desired_features])

        OneHot = OneHot_Seq(letter_type= TASK_TYPE)

        for epoch in range(epochs):
            dataset = self.create_dataset(self.data)

            print(f'Epoch {epoch} / {epochs}')
            
            step = 0

            for sample_batch in dataset:
                G_loss = self.GAN.G_train_step()
                D_loss, GP = self.GAN.D_train_step(sample_batch)

                generated = self.GAN.generate_samples(number=batch_size, decoded=False)
                decoded_generated = OneHot.onehot_to_seq(generated)
                scores = self.get_scores(decoded_generated)
                generated = tf.cast(generated, tf.float32)
                best_samples,best_scores = self.add_samples(generated, scores)
                if step % step_log == 0:
                    write_row(LOSS_PATH, 'a', [G_loss.numpy(),D_loss.numpy(),percent_fake()])
                    print(f'\tStep {step}\n   \tGenerator: {G_loss.numpy()}   Discriminator: {D_loss.numpy()}   Samples: {len(self.data)}')

                    print('\tBest scores per feature: ', end = ' ') 
                    score_per_feature = self.get_best_score_per_feature(scores)
                    write_row(BEST_PATH, 'a', [sc[1] for sc in score_per_feature])
                    pprint = [f'{sc[0]}: {sc[1]}%' for sc in score_per_feature]
                    print(*pprint, sep = ' ')

                    print('\tAverage scores per feature: ', end = ' ') 
                    score_per_feature = self.get_score_per_feature(scores)
                    write_row(AVERAGE_PATH, 'a', [sc[1] for sc in score_per_feature])
                    pprint = [f'{sc[0]}: {sc[1]}%' for sc in score_per_feature]
                    print(*pprint, sep = ' ')
                    
                if step == steps_per_epoch:
                    break

                step += 1
            print(f'\tPercent of the fake samples in the discriminator: {percent_fake()}%.')
                
                
    def create_dataset(self, inputs):
        """Shuffle ``inputs`` and batch them into full batches of self.batch_size."""
        dataset = tf.data.Dataset.from_tensor_slices(inputs)
        dataset = dataset.shuffle(inputs.shape[0], seed=0).batch(self.batch_size, drop_remainder=True)
        return dataset

# Run

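The training loop appends one row per log step to the CSV file at `LOSS_PATH`. The columns are:

Generator Loss, Discriminator Loss, Percentage of generated (fake) samples in the training data

It also writes the average scores to `AVERAGE_PATH` and the best scores to `BEST_PATH`, both in percent, with one column per desired feature. These two files are overwritten at the start of every run, while `LOSS_PATH` is only appended to.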

In [None]:
# One-hot encoded DNA training set built from the protein CSV at `path`.
real_sequences = prepare_dataset(path)
# Pre-trained generator / discriminator weights to start from.
path_G = '/content/gdrive/My Drive/CS496 final project/weights/weights_generator_243'
path_D = '/content/gdrive/My Drive/CS496 final project/weights/weights_discriminator_243'

# Feedback GAN that favours generated sequences scored highly for label 'I'.
ganfb = GAN_FBNet(path_G,path_D, features=['I'])

Number of training samples: 33633


In [None]:
# Decode a few samples from the loaded generator before feedback training.
print("SEQUENCES BEFORE TRAINING:")
print(ganfb.GAN.generate_samples(number = 3, decoded=True))


SEQUENCES BEFORE TRAINING:
[['ATGGGTTCTGAGGCTTCGTAGGAGCAGGAGTCTGATGATTAGACGAAGAAGGATTCTGTTTTGGPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP'], ['ATGGGTGAGTATCAGGCTTTGAAGGATCAGGGGCATGAGCAGGGTTCTGAGGATCGTGGTAATGGGCAGGGTTCTGATACTTCTAAGGGTCGTGCTGCTCATGGTTTGGAGGGTTTGTTTGAGTATGCGTATTTGTCTTAGTAGATGATTAAGCAGAGTTTGPCPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP'], ['ATGGGTTTTGGGTAGTTGGGTATTTGTTTTGAGGATTATGGTAATGGTTTGAAGGGTTCTGGTTTGTATGGTTAGGTTGGGTGTGGGTCGTAGGTTTTGAAGGAGCATCGGGGTTAGTAGATTATTTAGPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP']]


In [None]:
# `csv` is used inside GAN_FBNet.train(), so it must be imported before that
# call runs.
import csv
# Feedback-GAN training; progress is logged every 20 steps.
ganfb.train(real_sequences, epochs = 50, step_log = 20)

Epoch 0 / 50


To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

	Step 0
   	Generator: 4.701275825500488   Discriminator: -1.131258487701416   Samples: 33633
	Best scores per feature:  I: 7%
	Average scores per feature:  I: 0%
	Step 20
   	Generator: 4.331458568572998   Discriminator: -1.3525761365890503   Samples: 33634
	Best scores per feature:  I: 6%
	Average scores per feature:  I: 0%
	Step 40
   	Generator: -1.3928693532943726   Discriminator: -1.2159347534179688   Samples: 33635
	Best scores per feature:  I: 5%
	Average scores per feature:  I: 0%
	Step 60
   	Generator: 0.8399980664253235   Discriminator: -1.5080925226211548   Samples: 33635
	Best scores per feature:  I: 3%
	Average scores per feature:  I: 0%
	Step 80
   	Generator: 2

In [None]:
# Decode a few samples from the generator after feedback training.
samples = ganfb.GAN.generate_samples(number = 3, decoded=True)
print(samples)