In [1]:
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds

In [2]:
# Preprocessing

def onehotify(tensor):
  vocab = {'A':'1', 'C':'2', 'G':'3', 'T':'0'}
  for key in vocab.keys():
    tensor = tf.strings.regex_replace(tensor, key, vocab[key])
  split = tf.strings.bytes_split(tensor)
  labels = tf.cast(tf.strings.to_number(split), tf.uint8)
  onehot = tf.one_hot(labels, 4)
  onehot = tf.reshape(onehot, (-1,))
  return onehot

def prep_data(ds_train, ds_test):
  ds_train = ds_train.map(lambda data, targets: (onehotify(data), tf.one_hot(targets, 10)), num_parallel_calls=tf.data.AUTOTUNE)
  ds_test = ds_train.map(lambda data, targets: (onehotify(data), tf.one_hot(targets, 10)))
  ds_train = ds_train.batch(8, num_parallel_calls=tf.data.AUTOTUNE).prefetch(100)
  ds_test = ds_test.batch(8).prefetch(10)

In [3]:
# Model creation

# Layer
class SimpleDense(tf.keras.layers.Layer):
  def __init__(self, units=256, activation=tf.nn.sigmoid):
    super(SimpleDense, self).__init__()
    self.units = units
    self.activation = activation
    
  def build(self, input_shape): 
    self.w = self.add_weight(shape=(input_shape[-1], self.units),
                             initializer='random_normal',
                             trainable=True)
    self.b = self.add_weight(shape=(self.units,),
                             initializer='random_normal',
                             trainable=True)
  
  def call(self, inputs):
    x = tf.matmul(inputs, self.w) + self.b
    x = self.activation(x)
    return x

# Model
class MyModel(tf.keras.Model):
  def __init__(self):
    super(MyModel, self).__init__()
    self.hidden1 = SimpleDense()
    self.hidden2 = SimpleDense()
    self.output = SimpleDense(units=10, activation=tf.nn.softmax)

    def call(self, inputs):
        x = self.hidden1(inputs)
        x = self.hidden2(x)
        x = self.output(x)
        return x

In [4]:
# Training & Testing

def train_step(model, input, target, loss_function, optimizer):
  # loss_object and optimizer_object are instances of respective tensorflow classes
  with tf.GradientTape() as tape:
    prediction = model(input)
    loss = loss_function(target, prediction)
    gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

def test(model, test_data, loss_function):
  # test over complete test data

  test_accuracy_aggregator = np.empty(0)
  test_loss_aggregator = np.empty(0)

  for (input, target) in test_data:
    prediction = model(input)
    sample_test_loss = loss_function(target, prediction)
    sample_test_accuracy =  np.argmax(target, axis=1) == np.argmax(prediction, axis=1)
    sample_test_accuracy = np.mean(sample_test_accuracy)
    test_loss_aggregator = np.append(test_loss_aggregator, sample_test_loss.numpy())
    test_accuracy_aggregator = np.append(test_accuracy_aggregator, np.mean(sample_test_accuracy))

  test_loss = tf.reduce_mean(test_loss_aggregator)
  test_accuracy = tf.reduce_mean(test_accuracy_aggregator)

  return test_loss, test_accuracy

In [6]:
tf.keras.backend.clear_session()

# load dataset
ds_train, ds_test = tfds.load('genomics_ood', split=['train[:100000]', 'train[100000:101000]'], as_supervised=True, shuffle_files=True)

# preprocess dataset
prep_data(ds_train, ds_test)

# define hyperparams
learning_rate = tf.constant(0.1, dtype=tf.float32)
epochs = 10
model = MyModel()
loss_func = tf.keras.losses.CategoricalCrossentropy()
optimizer = tf.optimizers.SGD(learning_rate)

# arrays for later visualization
train_losses = np.empty(0)
test_losses = np.empty(0)
test_accuracies = np.empty(0)

# initial test before training
test_loss, test_acc = test(model, ds_test, loss_func)
test_losses = np.append(test_losses, test_loss)
test_accuracies = np.append(test_accuracies, test_acc)

# model performance before training
train_loss, _ = test(model, ds_train, loss_func)
train_losses = np.append(train_losses, train_loss)

# start training
for epoch in epochs:
  print(f'Epoch: {str(epoch)} starting with accuracy {test_accuracies[-1]}')

  epoch_loss_agg = np.empty(0)
  for input, target in ds_train:
    train_loss = train_step(model, input, target, loss_func, optimizer)
    epoch_loss_agg = np.append(epoch_loss_agg, train_loss)

  train_losses = np.append(train_losses, tf.reduce_mean(epoch_loss_agg))

  test_loss, test_acc = test(model, ds_test, loss_func)
  test_losses = np.append(test_losses, test_loss)
  test_accuracies = np.append(test_accuracies, test_acc)


TypeError: ignored