In [1]:
import numpy as np
import keras
import tensorflow as tf
from tensorflow.keras import layers
from keras.layers import Dense, Dropout, Layer, LayerNormalization, MultiHeadAttention, GlobalAveragePooling1D, Input
from keras.models import Model, load_model
from sklearn.model_selection import train_test_split

# Amino-acid alphabet: the 20 standard residues, one letter per residue
amino_acids = 'ACDEFGHIKLMNPQRSTVWY'
vocab_size = len(amino_acids)
# Lookup table: residue letter -> its column in the one-hot encoding
aa_to_index = dict(zip(amino_acids, range(vocab_size)))

# Function to one-hot encode peptide sequences
def one_hot_encode_sequences(sequences, vocab_size, max_length=None):
    """One-hot encode peptide strings into a zero-padded 3-D array.

    Args:
        sequences: Iterable of peptide strings. Every character must be a key
            of the module-level ``aa_to_index`` mapping.
        vocab_size: Size of the one-hot (last) axis.
        max_length: Size of the position (middle) axis. When ``None`` it is
            the length of the longest sequence, or 0 if ``sequences`` is
            empty. Pass an explicit value to give separately encoded
            datasets the same padded width.

    Returns:
        ``np.ndarray`` of shape ``(len(sequences), max_length, vocab_size)``.
        Each residue sets a single 1; positions past the end of a sequence
        stay all-zero and act as padding.

    Raises:
        KeyError: If a residue is not present in ``aa_to_index``.
        ValueError: If a sequence is longer than an explicit ``max_length``.
    """
    sequences = list(sequences)  # materialize: the input is traversed twice
    # default=0 keeps an empty input from raising inside max()
    longest = max((len(seq) for seq in sequences), default=0)
    if max_length is None:
        max_length = longest
    elif longest > max_length:
        raise ValueError(
            f"max_length={max_length} is shorter than the longest sequence ({longest})"
        )

    encoded = np.zeros((len(sequences), max_length, vocab_size))
    for i, seq in enumerate(sequences):
        for j, aa in enumerate(seq):
            encoded[i, j, aa_to_index[aa]] = 1
    return encoded

# Sample peptide sequences (with variable lengths)
peptide_sequences = ['ARG', 'TC', 'G', 'CDEF', 'HIKLMN', 'PQR', 'STVWY']  # Example peptide sequences
# Placeholder binary targets (0 or 1). They come from a seeded generator so
# that Restart & Run All yields the same labels on every run.
targets = np.random.default_rng(42).integers(0, 2, len(peptide_sequences))

# One-hot encode the peptide sequences
encoded_sequences = one_hot_encode_sequences(peptide_sequences, vocab_size)

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(encoded_sequences, targets, test_size=0.2, random_state=42)

# Transformer Block
class TransformerBlock(Layer):
    """Transformer encoder block: self-attention followed by a feed-forward net.

    Each of the two sub-layers is followed by dropout, a residual connection
    and layer normalization.

    Args:
        embed_dim: Size of the last axis of the input; also used as the
            attention ``key_dim`` and as the feed-forward output width.
        num_heads: Number of attention heads.
        ff_dim: Width of the hidden feed-forward layer.
        rate: Dropout rate applied after attention and after the
            feed-forward network.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Kept as plain attributes so get_config() can serialize them
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential(
            [Dense(ff_dim, activation="relu"), Dense(embed_dim)]
        )
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training=None):
        """Apply the block to ``inputs``.

        Args:
            inputs: Tensor whose last axis has size ``embed_dim`` (required by
                the residual additions below).
            training: Forwarded to the dropout layers. Defaults to ``None``
                so the layer can also be invoked without an explicit flag.

        Returns:
            Tensor with the same shape as ``inputs``.
        """
        # Self-attention: the same tensor is passed as query and value
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created on load."""
        config = super().get_config()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'rate': self.rate,
        })
        return config

# Token and position embedding layer
class TokenAndPositionEmbedding(layers.Layer):
    """Embed one-hot encoded residues and add a sinusoidal positional encoding.

    Positions are encoded with fixed sine/cosine waves rather than looked up
    in the token embedding table. Looking positions up in that table would
    give position ``i`` the same vector as token ``i`` and would index out of
    range for any sequence longer than ``vocab_size``. The sinusoidal encoding
    adds no weights and works for any sequence length.

    All-zero input rows are treated as padding and produce all-zero output
    rows; without this, ``argmax`` would map them to token index 0.
    NOTE(review): downstream layers still see the padded positions (no Keras
    mask is emitted here) - confirm whether attention/pooling should mask them.

    Args:
        vocab_size: Number of distinct tokens (size of the one-hot axis).
        embed_dim: Size of the embedding vectors.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)

    def _positional_encoding(self, length, dtype):
        """Build a ``(length, embed_dim)`` sinusoidal position table."""
        positions = tf.cast(tf.range(length), tf.float32)[:, tf.newaxis]
        channels = tf.range(self.embed_dim)
        # Channels 2i and 2i+1 share the frequency 1 / 10000^(2i / embed_dim)
        exponents = tf.cast(2 * (channels // 2), tf.float32) / float(self.embed_dim)
        inv_freq = tf.pow(10000.0, -exponents)
        angles = positions * inv_freq[tf.newaxis, :]
        # Even channels take the sine, odd channels the cosine
        is_even = tf.cast(tf.equal(channels % 2, 0), tf.float32)
        encoding = is_even * tf.sin(angles) + (1.0 - is_even) * tf.cos(angles)
        return tf.cast(encoding, dtype)

    def call(self, x):
        """Embed a one-hot batch.

        Args:
            x: Tensor indexed as ``(batch, position, vocab)``; each non-padding
                row is expected to be one-hot.

        Returns:
            Tensor indexed as ``(batch, position, embed_dim)``.
        """
        # Recover integer token ids from the one-hot rows
        token_indices = tf.argmax(x, axis=-1)
        token_embeddings = self.token_emb(token_indices)

        seq_len = tf.shape(x)[1]
        pos_encoding = self._positional_encoding(seq_len, token_embeddings.dtype)
        # (position, embed_dim) broadcasts over the batch axis
        embedded = token_embeddings + pos_encoding

        # A row with no non-zero entry is padding: zero its output vector
        not_padding = tf.reduce_any(tf.not_equal(x, 0), axis=-1, keepdims=True)
        return embedded * tf.cast(not_padding, embedded.dtype)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created on load."""
        config = super().get_config()
        config.update({
            'vocab_size': self.vocab_size,
            'embed_dim': self.embed_dim,
        })
        return config

# Model Creation
def create_model(embed_dim, num_heads, ff_dim, vocab_size, dropout_rate=0.1, dense_units=20):
    """Build the (uncompiled) binary peptide classifier.

    The network is: one-hot input -> TokenAndPositionEmbedding ->
    TransformerBlock -> global average pooling -> dropout -> dense (ReLU) ->
    dropout -> single sigmoid unit.

    Args:
        embed_dim: Embedding size, passed to the embedding layer and the
            transformer block.
        num_heads: Number of attention heads in the transformer block.
        ff_dim: Hidden width of the transformer block's feed-forward network.
        vocab_size: Size of the one-hot axis of the input.
        dropout_rate: Rate of the two dropout layers in the classification
            head. The transformer block keeps its own default rate.
        dense_units: Width of the hidden dense layer in the head.

    Returns:
        A Keras ``Model`` taking inputs of shape ``(batch, None, vocab_size)``
        and returning one sigmoid output per sample.
    """
    inputs = Input(shape=(None, vocab_size))  # None: variable sequence length
    x = TokenAndPositionEmbedding(vocab_size, embed_dim)(inputs)
    transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
    x = transformer_block(x)
    # Collapse the position axis to one vector per sequence
    x = GlobalAveragePooling1D()(x)
    x = Dropout(dropout_rate)(x)
    x = Dense(dense_units, activation="relu")(x)
    x = Dropout(dropout_rate)(x)
    outputs = Dense(1, activation="sigmoid")(x)
    model = Model(inputs=inputs, outputs=outputs)
    return model

# Seed TensorFlow's global random generator so that weight initialization is
# the same on every Restart & Run All.
tf.random.set_seed(42)

# Create and compile the model
model = create_model(embed_dim=64, num_heads=4, ff_dim=128, vocab_size=vocab_size)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

2024-09-25 00:51:12.466568: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Print the layer-by-layer architecture and parameter counts of `model`
model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, None, 20)]        0         
_________________________________________________________________
token_and_position_embedding (None, None, 64)          1280      
_________________________________________________________________
transformer_block (Transform (None, None, 64)          83200     
_________________________________________________________________
global_average_pooling1d (Gl (None, 64)                0         
_________________________________________________________________
dropout_2 (Dropout)          (None, 64)                0         
_________________________________________________________________
dense_2 (Dense)              (None, 20)                1300      
_________________________________________________________________
dropout_3 (Dropout)          (None, 20)                0     

In [3]:
# Model parameters
# NOTE(review): in the visible cells these three values are never passed to
# create_model(); the `model` trained below was built earlier with
# embed_dim=64, num_heads=4, ff_dim=128. Either feed them into create_model()
# or remove them.
embed_dim = 32  # Embedding size for each token
num_heads = 2  # Number of attention heads
ff_dim = 32  # Hidden layer size in the feed-forward network

# Re-compile the existing model (no new model is created here)
model.compile(optimizer='Adam', loss="binary_crossentropy", metrics=["accuracy"])

# Train the model. The History object is kept so the per-epoch metrics can be
# inspected afterwards instead of being echoed as an opaque repr.
history = model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=5, batch_size=8)

2024-09-25 00:51:17.185266: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7f8b668064c0>

In [4]:
# Save the trained model to an HDF5 (.h5) file in the working directory
model.save("peptide_transformer_model.h5")

In [5]:
# Load the saved model. The two custom layer classes are passed by name so
# that Keras can rebuild them from the stored config.
loaded_model = load_model(
    "peptide_transformer_model.h5",
    custom_objects={
        "TransformerBlock": TransformerBlock,
        "TokenAndPositionEmbedding": TokenAndPositionEmbedding,
    },
)

# Freeze the embedding and Transformer block layers for fine-tuning.
# Could be applied to the name-generator model as well.
for layer in loaded_model.layers:
    if isinstance(layer, (TransformerBlock, TokenAndPositionEmbedding)):
        layer.trainable = False

In [6]:
from keras.callbacks import LearningRateScheduler

# Fine-tune the model on a new dataset
new_peptide_sequences = ['AR', 'GTC', 'F', 'DE', 'HIK']  # New sequences
# Placeholder binary targets from a seeded generator, so re-running the
# notebook reproduces the same fine-tuning labels.
new_targets = np.random.default_rng(42).integers(0, 2, len(new_peptide_sequences))

# One-hot encode new sequences
new_encoded_sequences = one_hot_encode_sequences(new_peptide_sequences, vocab_size)

# Train/test split for new data
X_train_new, X_test_new, y_train_new, y_test_new = train_test_split(new_encoded_sequences, new_targets, test_size=0.2, random_state=42)

# Define a function that returns the desired learning rate
def scheduler(epoch, lr):
    """Return the fixed fine-tuning learning rate.

    Both arguments are ignored: the result is 1e-4 regardless of the epoch
    number or the current learning rate.
    """
    fine_tune_lr = 1e-4  # constant, deliberately low for fine-tuning
    return fine_tune_lr

# Create the learning rate scheduler callback
lr_scheduler = LearningRateScheduler(scheduler)

# Compile the loaded model with 'Adam' as optimizer
loaded_model.compile(optimizer='Adam', loss='binary_crossentropy', metrics=['accuracy'])

# Fit the model with the learning rate scheduler callback. The History object
# is kept so the per-epoch metrics can be inspected afterwards instead of
# being echoed as an opaque repr.
finetune_history = loaded_model.fit(X_train_new, y_train_new, validation_data=(X_test_new, y_test_new),
                                    epochs=5, batch_size=8, callbacks=[lr_scheduler])

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7f8b5dccb4c0>