In [3]:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import os

### Data Preprocessing

In [6]:
# path = "/content/DreamWalker/data/processed_data/GAN"
# Load the integer-encoded sequences from the "data" entry of the archive.
# np.load on an .npz file returns an archive object that keeps the file open,
# so read the array inside a `with` block to close it as soon as the data is
# in memory.
with np.load("enc_uniprot.npz") as archive:
    pretrain_sequences = archive['data']
print(pretrain_sequences)
print(pretrain_sequences.shape)
print(type(pretrain_sequences))

[[21  6  2 ...  0  0  0]
 [21  4 15 ...  0  0  0]
 [21  4 15 ...  0  0  0]
 ...
 [21 11 17 ...  0  0  0]
 [21 18  5 ...  0  0  0]
 [21 12  5 ...  0  0  0]]
(1078141, 40)
<class 'numpy.ndarray'>


In [7]:
# Config
vocab_size = 23              # embedding rows and number of output logits per position
seq_length = 40              # tokens per sequence (second dimension of the loaded array)
embed_dim = 64               # width of the token embeddings
num_transformer_blocks = 4   # split evenly between encoder and decoder blocks
num_heads = 8                # attention heads per block
ff_dim = 512                 # hidden width of each block's feed-forward network
batch_size = 64
lr = 1e-4                    # learning rate handed to the optimizer
noise_level = 0.1            # per-token probability of being replaced by noise

# Seed both NumPy and TensorFlow. Shuffling, noise sampling, weight
# initialisation and dropout all draw from TensorFlow's generator, so seeding
# NumPy alone does not make a run repeatable.
random_seed = 8701
np.random.seed(random_seed)
tf.random.set_seed(random_seed)

In [8]:
# Wrap the NumPy array in an int32 tensor, then build the input pipeline:
# one dataset element per sequence, shuffled through a 10,000-element buffer
# and grouped into batches of `batch_size`.
sequences_tensor = tf.constant(pretrain_sequences, dtype=tf.int32)

dataset = (
    tf.data.Dataset.from_tensor_slices(sequences_tensor)
    .shuffle(buffer_size=10000)
    .batch(batch_size)
)

In [9]:

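# Add noise to the sequences.
# Noise injection still looks like a necessary step of the pre-training process.
# The commented-out function directly below is an earlier draft; the active add_noise is defined after it.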

# def add_noise(sequences, noise_level):
#     num_sequences, sequence_length = sequences.shape
#     num_amino_acids = 21  # Including a 'noise' token
#     num_noisy_positions = tf.cast(sequence_length * noise_level, tf.int32)
#     positions = tf.random.uniform((num_sequences, num_noisy_positions),
#                                   minval=0, maxval=sequence_length, dtype=tf.int32)

#     noise_values = tf.random.uniform((num_sequences, num_noisy_positions),
#                                      minval=1, maxval=num_amino_acids, dtype=tf.int32)

#     mask = tf.sequence_mask(positions, sequence_length)
#     sequences = tf.where(mask, noise_values, sequences)

#     return sequences

def add_noise(sequences, noise_level, num_amino_acids=22):
    """Randomly replace tokens of `sequences` with uniformly drawn token ids.

    Every position is corrupted independently with probability `noise_level`.
    A corrupted position receives an integer drawn uniformly from
    ``[0, num_amino_acids)``; all other positions keep their original value.

    Args:
        sequences: Integer tensor of token ids. It must be int32, because the
            noise values are int32 and `tf.where` needs matching dtypes. The
            shape is not restricted.
        noise_level: Probability that any single position is replaced.
        num_amino_acids: Exclusive upper bound of the replacement ids. The
            default of 22 yields ids 0..21, the range this function has
            always used.

    Returns:
        A tensor shaped like `sequences` with the selected positions replaced.
    """
    shape = tf.shape(sequences)

    # Boolean mask: True where the token is going to be replaced.
    noise_mask = tf.random.uniform(shape=shape, minval=0, maxval=1.0) < noise_level

    # Candidate replacement for every position; only masked ones are used.
    noise_values = tf.random.uniform(
        shape=shape, minval=0, maxval=num_amino_acids, dtype=tf.int32)

    # Take the noise where the mask is set, the original token elsewhere.
    return tf.where(noise_mask, noise_values, sequences)



## Diffusion Model

### Pretrain

In [10]:
class TransformerEncoderBlock(layers.Layer):
    """Self-attention block followed by a position-wise feed-forward network.

    Each of the two sub-layers is wrapped in dropout, a residual connection
    and layer normalization (normalization is applied after the residual sum).

    Args:
        embed_dim: Width of the input/output features. Also passed as
            `key_dim` to the attention layer.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        rate: Dropout rate applied after attention and after the
            feed-forward network.
        **kwargs: Standard Keras layer arguments (`name`, `dtype`, ...),
            forwarded to the base class so the layer can be rebuilt from
            its config.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Remember the constructor arguments so get_config() can report them.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Run the block on `inputs`; `training` toggles the dropout layers."""
        # Self-attention: the sequence attends to itself.
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)

        # Feed-forward network with its own residual connection.
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so a saved model can be reloaded."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        })
        return config


class TransformerDecoderBlock(layers.Layer):
    """Decoder block: self-attention, attention over an encoder output, feed-forward.

    Each of the three sub-layers is wrapped in dropout, a residual connection
    and layer normalization (normalization is applied after the residual sum).

    Args:
        embed_dim: Width of the input/output features. Also passed as
            `key_dim` to both attention layers.
        num_heads: Number of attention heads in each attention layer.
        ff_dim: Hidden width of the feed-forward network.
        rate: Dropout rate applied after each sub-layer.
        **kwargs: Standard Keras layer arguments (`name`, `dtype`, ...),
            forwarded to the base class so the layer can be rebuilt from
            its config.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Remember the constructor arguments so get_config() can report them.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att1 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        # Second attention layer: attends over the encoder output that is
        # passed in as the skip connection.
        self.att2 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential([
            layers.Dense(ff_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)
        self.dropout3 = layers.Dropout(rate)

    def call(self, inputs, enc_output, training=None):
        """Run the block.

        Args:
            inputs: Decoder-side features.
            enc_output: Encoder features attended to by the second
                attention layer.
            training: Toggles the dropout layers.
        """
        # Self-attention over the decoder stream.
        attn1_output = self.att1(inputs, inputs)
        attn1_output = self.dropout1(attn1_output, training=training)
        out1 = self.layernorm1(inputs + attn1_output)

        # Skip connection: queries come from the decoder stream, keys and
        # values from the encoder output.
        attn2_output = self.att2(out1, enc_output)
        attn2_output = self.dropout2(attn2_output, training=training)
        out2 = self.layernorm2(out1 + attn2_output)

        # Feed forward network
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        return self.layernorm3(out2 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so a saved model can be reloaded."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        })
        return config

In [11]:
def build_amp_diffusion_unet(vocab_size, seq_length, embed_dim, num_transformer_blocks, num_heads, ff_dim):
    """Assemble the encoder/decoder Transformer that maps token ids to per-token logits.

    The input is a batch of `seq_length` int32 token ids. They are embedded,
    passed through `num_transformer_blocks // 2` encoder blocks and then
    through the same number of decoder blocks. Each decoder block also
    receives one encoder output, taken in reverse order (last encoder output
    first). A final Dense layer produces `vocab_size` raw logits per position.

    Args:
        vocab_size: Number of distinct token ids (embedding rows and logits).
        seq_length: Length of every input sequence.
        embed_dim: Embedding width, also used by every block.
        num_transformer_blocks: Total block budget; half go to each side.
        num_heads: Attention heads per block.
        ff_dim: Feed-forward hidden width per block.

    Returns:
        A Keras functional `Model` from token ids to logits.
    """
    token_ids = layers.Input(shape=(seq_length,), dtype='int32')
    hidden = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)(token_ids)

    blocks_per_side = num_transformer_blocks // 2

    # Encoder: keep every block's output for the decoder to attend to.
    encoder_outputs = []
    for _ in range(blocks_per_side):
        hidden = TransformerEncoderBlock(embed_dim, num_heads, ff_dim)(hidden)
        encoder_outputs.append(hidden)

    # Decoder: pair each block with an encoder output, deepest first.
    for skip in reversed(encoder_outputs):
        hidden = TransformerDecoderBlock(embed_dim, num_heads, ff_dim)(hidden, skip)

    # No activation here: the model emits logits.
    logits = layers.Dense(vocab_size)(hidden)

    return models.Model(inputs=token_ids, outputs=logits)

In [12]:
# # diffusion model
# class UNetDiffusionModel(models.Model):
#     def __init__(self, vocab_size, max_length, embed_dim):
#         super(UNetDiffusionModel, self).__init__()
#         self.embedding = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)

#         # Define the U-Net architecture with Transformer blocks
#         self.encoder = [TransformerBlock(embed_dim=embed_dim, num_heads=8, ff_dim=512) for _ in range(3)]
#         self.decoder = [TransformerBlock(embed_dim=embed_dim, num_heads=8, ff_dim=512) for _ in range(3)]

#         self.output_layer = layers.Dense(vocab_size, activation='softmax')

#     def call(self, inputs):
#         x = self.embedding(inputs)

#         for layer in self.encoder:
#             x = layer(x)

#         for layer in self.decoder:
#             x = layer(x)

#         return self.output_layer(x)


In [13]:
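# Compile AMP_Diffusion
# NOTE: the function below is an earlier, commented-out draft of build_amp_diffusion_unet
# (it declared an extra timestep input and ended in a softmax layer); it is not executed.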

# def build_amp_diffusion_unet(vocab_size, seq_length, embed_dim, num_transformer_blocks, num_heads, ff_dim):
#     sequence_input = layers.Input(shape=(seq_length,), dtype='int32')
#     timestep_input = layers.Input(shape=(1,))

#     x = layers.Embedding(input_dim=23, output_dim=embed_dim)(sequence_input)

#     # Encoder
#     skip_connections = []
#     for _ in range(num_transformer_blocks // 2):  # an equal split assumption
#         x = TransformerEncoderBlock(embed_dim, num_heads, ff_dim)(x)
#         skip_connections.append(x)

#     # Decoder
#     for i in range(num_transformer_blocks // 2):
#         enc_output = skip_connections[-(i+1)]
#         x = TransformerDecoderBlock(embed_dim, num_heads, ff_dim)(x, enc_output)

#     x = layers.Dense(vocab_size, activation='softmax')(x)

#     return models.Model(inputs=[sequence_input, timestep_input], outputs=x)
# Adjusted Output Layer of the Model


In [14]:
# Instantiate the network from the values set in the config cell.
model = build_amp_diffusion_unet(
    vocab_size=vocab_size,
    seq_length=seq_length,
    embed_dim=embed_dim,
    num_transformer_blocks=num_transformer_blocks,
    num_heads=num_heads,
    ff_dim=ff_dim,
)

In [15]:
# The loss takes integer class targets and, with from_logits=True, treats the
# model output as unnormalised logits.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# Adam optimizer using the learning rate from the config cell.
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)


In [16]:
def train_step(model, sequences, optimizer, loss_fn, noise_level):
    """Run one optimisation step of the denoising objective.

    The batch is corrupted with `add_noise`, the model predicts from the
    corrupted batch, and the loss compares those predictions against the
    original, uncorrupted `sequences`.

    Args:
        model: Model being trained; called with `training=True`.
        sequences: Batch of clean token ids, used as the targets.
        optimizer: Optimizer that applies the gradients.
        loss_fn: Callable taking `(targets, predictions)`.
        noise_level: Corruption probability forwarded to `add_noise`.

    Returns:
        The loss value for this batch.
    """
    # Corrupt the inputs up front; nothing here needs to be differentiated.
    corrupted = add_noise(sequences, noise_level)

    with tf.GradientTape() as tape:
        logits = model(corrupted, training=True)
        loss = loss_fn(sequences, logits)

    weights = model.trainable_variables
    grads = tape.gradient(loss, weights)
    optimizer.apply_gradients(zip(grads, weights))
    return loss


In [17]:
from tensorflow.keras.callbacks import TensorBoard
import os
import datetime


In [18]:

epochs = 5

for epoch in range(epochs):
    # Fresh running mean each epoch, so the report covers this epoch only.
    epoch_loss_avg = tf.keras.metrics.Mean()
    print(f"Epoch {epoch+1}/{epochs}")

    for batch_sequences in dataset:
        loss = train_step(
            model=model,
            sequences=batch_sequences,
            optimizer=optimizer,
            loss_fn=loss_fn,
            noise_level=noise_level,
        )
        epoch_loss_avg.update_state(loss)

    print(f"Epoch {epoch+1}: Loss: {epoch_loss_avg.result().numpy()}")

# Persist the trained model once all epochs have finished.
model.save('UNet_Diffusion.h5')

Epoch 1/5




Epoch 1: Loss: 0.5315845608711243
Epoch 2/5
Epoch 2: Loss: 0.5270335078239441
Epoch 3/5
Epoch 3: Loss: 0.5263277292251587
Epoch 4/5
Epoch 4: Loss: 0.5258371233940125
Epoch 5/5


  saving_api.save_model(


Epoch 5: Loss: 0.5253657698631287


In [None]:
# Mount Google Drive at /content/drive.
# NOTE(review): `google.colab` is presumably only importable inside Google Colab,
# and no visible cell reads from or writes to the mounted path; confirm this
# cell is still needed.
from google.colab import drive
drive.mount('/content/drive')

In [1]:
# Saves `model` to the same file the training cell already writes after its last epoch.
# NOTE(review): the recorded execution count (1) and the NameError output suggest this
# cell was run in a fresh kernel where `model` did not exist. Run the model-building and
# training cells first, or drop this duplicate.
model.save('UNet_Diffusion.h5')


NameError: name 'model' is not defined