In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Embedding, LayerNormalization, Dropout
from tensorflow.keras.models import Model
import numpy as np

# Scaled Dot-Product Attention called in below class of Multihead Attention
def scaled_dot_product_attention(query, key, value, mask):
    """Compute softmax(Q.K^T / sqrt(d_k)) . V.

    NOTE: a second function with this same name is defined further down in
    this cell and rebinds the name once the cell has run.

    Args:
        query, key, value: tensors whose last two axes are multiplied with
            `tf.matmul` (key is transposed on its last two axes).
        mask: optional tensor broadcastable to the logits. Entries equal to 1
            have -1e9 added to their logit, so they receive ~0 weight.

    Returns:
        (output, attention_weights)
    """
    # The scaling factor is the square root of the key's last dimension.
    key_depth = tf.cast(tf.shape(key)[-1], tf.float32)
    logits = tf.matmul(query, key, transpose_b=True) / tf.math.sqrt(key_depth)

    if mask is not None:
        logits = logits + mask * -1e9

    attention_weights = tf.nn.softmax(logits, axis=-1)
    return tf.matmul(attention_weights, value), attention_weights


class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention built on `scaled_dot_product_attention`.

    Query, key and value are each projected with a `Dense(d_model)` layer,
    split into `num_heads` heads of size `d_model // num_heads`, attended
    per head, merged back and passed through a final `Dense(d_model)`.

    Args:
        d_model: width of the projections and of the output.
        num_heads: number of attention heads; must divide `d_model`.

    Raises:
        ValueError: if `num_heads` is not positive or does not divide
            `d_model`.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Without this check `d_model // num_heads` silently truncates and the
        # reshape in `split_heads` either fails late or, worse, succeeds with
        # a wrong sequence length inferred for the -1 axis.
        if num_heads <= 0 or d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by a positive num_heads ({num_heads})"
            )
        self.num_heads = num_heads
        self.d_model = d_model
        self.depth = d_model // num_heads

        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)
        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape (batch, seq, d_model) to (batch, num_heads, seq, depth)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, query, key, value, mask=None):
        """Run attention.

        Args:
            query, key, value: tensors fed to the three input projections.
            mask: optional mask forwarded to `scaled_dot_product_attention`
                (entries equal to 1 are suppressed). Defaults to no masking.

        Returns:
            (output, attention_weights); `output` has last dimension
            `d_model`.
        """
        batch_size = tf.shape(query)[0]

        query = self.split_heads(self.wq(query), batch_size)
        key = self.split_heads(self.wk(key), batch_size)
        value = self.split_heads(self.wv(value), batch_size)

        output, attention_weights = scaled_dot_product_attention(query, key, value, mask)

        # Undo split_heads: (batch, heads, seq, depth) -> (batch, seq, d_model).
        output = tf.transpose(output, perm=[0, 2, 1, 3])
        output = tf.reshape(output, (batch_size, -1, self.d_model))

        return self.dense(output), attention_weights

def scaled_dot_product_attention(query, key, value, mask):
    """Scaled dot-product attention.

    NOTE: this redefines a function of the same name declared earlier in this
    cell; after the cell runs, this is the definition that is bound.

    Args:
        query, key, value: tensors combined with `tf.matmul` on their last
            two axes (key is transposed).
        mask: optional tensor; `mask * -1e9` is added to the scaled logits,
            so entries equal to 1 are effectively removed from the softmax.

    Returns:
        Tuple `(output, attention_weights)`.
    """
    scores = tf.matmul(query, key, transpose_b=True)

    # Scale by sqrt of the key's last dimension.
    key_depth = tf.cast(tf.shape(key)[-1], tf.float32)
    scores = scores / tf.math.sqrt(key_depth)

    if mask is not None:
        scores += mask * -1e9

    probabilities = tf.nn.softmax(scores, axis=-1)
    attended = tf.matmul(probabilities, value)
    return attended, probabilities

def point_wise_feed_forward_network(d_model, dff):
    """Build the two-layer position-wise feed-forward sub-network.

    Args:
        d_model: number of units of the second (linear) Dense layer.
        dff: number of units of the first Dense layer, which uses ReLU.

    Returns:
        A `tf.keras.Sequential` of `Dense(dff, relu)` followed by
        `Dense(d_model)`.
    """
    expansion = Dense(dff, activation='relu')
    projection = Dense(d_model)
    return tf.keras.Sequential([expansion, projection])


class DecoderBlock(tf.keras.layers.Layer):
    """Decoder block: masked self-attention followed by a feed-forward network.

    Each of the two sub-steps applies dropout, a residual connection and
    layer normalisation. A second attention layer (`mha2`) with its own
    dropout/normalisation layers is constructed but is not used by `call`.

    Args:
        d_model: model width passed to the attention layers and the FFN.
        num_heads: number of attention heads.
        dff: hidden width of the feed-forward network.
        rate: dropout rate shared by the three dropout layers.
        name, **kwargs: forwarded to `tf.keras.layers.Layer`.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1, name=None, **kwargs):
        super().__init__(name=name, **kwargs)
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)
        self.layernorm1, self.layernorm2, self.layernorm3 = (
            LayerNormalization(epsilon=1e-6) for _ in range(3)
        )
        self.dropout1, self.dropout2, self.dropout3 = (
            Dropout(rate) for _ in range(3)
        )

    @tf.function
    def call(self, x, enc_output, look_ahead_mask, padding_mask, training):
        """Apply the block to `x`.

        `enc_output` and `padding_mask` are accepted but not read: only the
        self-attention and feed-forward sub-steps are executed.
        """
        # Sub-step 1: self-attention over x, restricted by look_ahead_mask.
        self_attention, _ = self.mha1(x, x, x, look_ahead_mask)
        self_attention = self.dropout1(self_attention, training=training)
        attended = self.layernorm1(x + self_attention)

        # Sub-step 2: position-wise feed-forward network.
        transformed = self.dropout3(self.ffn(attended), training=training)
        return self.layernorm3(attended + transformed)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update(
            d_model=self.mha1.d_model,
            num_heads=self.mha1.num_heads,
            dff=self.ffn.layers[0].units,
            rate=self.dropout1.rate,
        )
        return config

def create_look_ahead_mask(size):
    """Return a (size, size) float32 mask that hides future positions.

    The attention code adds `mask * -1e9` to the logits, so a value of 1
    means "blocked". Entry [i, j] is therefore 1 when j > i (a later
    position) and 0 on and below the diagonal.

    The previous version returned the lower triangle itself, which blocked
    the current and all earlier positions and left only future ones visible.

    Args:
        size: sequence length (Python int or scalar tensor).

    Returns:
        float32 tensor of shape (size, size).
    """
    # band_part(-1, 0) keeps the lower triangle including the diagonal.
    visible = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return tf.cast(1.0 - visible, tf.float32)

class GPT(tf.keras.Model):
    """Decoder-only language model.

    Pipeline in `call`: token embedding, plus fixed sinusoidal position
    encoding, dropout, a stack of `DecoderBlock`s sharing one look-ahead
    mask, and a final `Dense(vocab_size)` producing per-position logits.

    Args:
        num_layers: number of decoder blocks.
        d_model: embedding / model width.
        num_heads: attention heads per block.
        dff: feed-forward hidden width per block.
        vocab_size: size of the embedding table and of the output layer.
        maximum_position_encoding: number of positions precomputed for the
            position-encoding table.
        rate: dropout rate.
        name, **kwargs: forwarded to `tf.keras.Model`.
    """

    def __init__(self, num_layers, d_model, num_heads, dff, vocab_size, maximum_position_encoding, rate=0.1, name=None, **kwargs):
        super().__init__(name=name, **kwargs)
        # Keep the constructor arguments so get_config() can reproduce them.
        self.num_layers = num_layers
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.vocab_size = vocab_size
        self.maximum_position_encoding = maximum_position_encoding
        self.rate = rate

        self.embedding = Embedding(vocab_size, d_model)
        self.position_encoding = self.positional_encoding(maximum_position_encoding, d_model)
        self.decoder_blocks = [
            DecoderBlock(d_model, num_heads, dff, rate) for _ in range(num_layers)
        ]
        self.dropout = Dropout(rate)
        self.final_layer = Dense(vocab_size)

    def positional_encoding(self, position, d_model):
        """Return a float32 tensor of shape (1, position, d_model).

        Even columns hold the sine and odd columns the cosine of the angles
        produced by `get_angles`.
        """
        positions = np.arange(position)[:, np.newaxis]
        dimensions = np.arange(d_model)[np.newaxis, :]
        table = self.get_angles(positions, dimensions, d_model)
        table[:, 0::2] = np.sin(table[:, 0::2])
        table[:, 1::2] = np.cos(table[:, 1::2])
        return tf.cast(table[np.newaxis, ...], tf.float32)

    def get_angles(self, position, i, d_model):
        """Return position / 10000 ** (2 * (i // 2) / d_model)."""
        exponent = (2 * (i // 2)) / np.float32(d_model)
        angle_rates = 1 / np.power(10000, exponent)
        return position * angle_rates

    def call(self, x, training):
        """Map token ids `x` to logits over the vocabulary for every position."""
        seq_len = tf.shape(x)[1]

        hidden = self.embedding(x)
        hidden += self.position_encoding[:, :seq_len, :]
        hidden = self.dropout(hidden, training=training)

        look_ahead_mask = create_look_ahead_mask(seq_len)
        for block in self.decoder_blocks:
            # The block ignores its encoder-output and padding-mask arguments.
            hidden = block(hidden, hidden, look_ahead_mask, None, training=training)

        return self.final_layer(hidden)

    def get_config(self):
        """Return the base config extended with the constructor arguments."""
        config = super().get_config()
        config.update(
            num_layers=self.num_layers,
            d_model=self.d_model,
            num_heads=self.num_heads,
            dff=self.dff,
            vocab_size=self.vocab_size,
            maximum_position_encoding=self.maximum_position_encoding,
            rate=self.rate,
        )
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild a model by passing `config` as keyword arguments."""
        return cls(**config)



In [None]:
import pandas as pd
import tensorflow as tf
# Execute tf.function-decorated code eagerly.
tf.config.run_functions_eagerly(True)

# Load the dataset and keep only complete rows of the three columns used below.
# `dropna()` returns a new frame instead of mutating a derived one in place.
df = pd.read_csv('Data/ai_data.csv', index_col=[0])
df = df[['seq', 'SMILE', 'InChI']].dropna()

# Keep rows whose "<seq> <SMILE>" string is shorter than 4097 characters.
tmp = df[['seq', 'SMILE']].agg(' '.join, axis=1).str.len().to_frame('length')
df = df[tmp.length < 4097].copy()

# Symbol tables; the vocabulary is later built from their `seq` / `smile` columns.
seq_map = pd.read_csv('Data/seq_map.csv', index_col=[0])
smile_map = pd.read_csv('Data/smile_map.csv', index_col=[0])
inchi_map = pd.read_csv('Data/inchi_map.csv', index_col=[0])


In [None]:
# Show the SMILE value of the first row.
df['SMILE'].iloc[0]

In [None]:
# Example usage: build the model and run one forward pass on dummy ids.

# Sort the vocabulary so the list is identical on every kernel start;
# iterating a set of strings has no stable order across Python processes.
# key=str keeps the sort well-defined even if a map column holds non-strings.
vocabulary = sorted(
    {'[START]', '[END]', '[SEP]', *seq_map.seq, *smile_map.smile},
    key=str,
)

vocab_size = len(vocabulary)
d_model = 512
num_heads = 8
dff = 61  # NOTE(review): unusual feed-forward width -- confirm this is intended
num_layers = 1
maximum_position_encoding = 4097

dropout_rate = 0.1

model = GPT(num_layers, d_model, num_heads, dff, vocab_size, maximum_position_encoding, dropout_rate)

# Dummy token ids for a smoke test (every id must be < vocab_size).
input_sequence = tf.constant([[1, 2, 3, 9], [5, 6, 7, 8]])

# Forward pass; this also builds the model weights.
output = model(input_sequence, training=False)

In [None]:
# Print the layer / parameter overview of `model`.
model.summary()

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences
import numpy as np
# max_length = 4096
class CustomTokenizer:
    """Tokenizer that maps vocabulary entries to their position in `vocabulary`.

    Tokens that are not in the vocabulary are encoded as the id of '[END]'.

    Attributes:
        vocab: token -> id.
        idx_to_word: id -> token.
        start_token_id, end_token_id, sep_token_id: ids of '[START]',
            '[END]' and '[SEP]' (None if absent from the vocabulary).
        vocab_size: `len(vocabulary)`.
    """

    def __init__(self, vocabulary):
        # Create vocabulary mappings
        self.vocab = {word: idx for idx, word in enumerate(vocabulary)}
        self.idx_to_word = {idx: word for word, idx in self.vocab.items()}

        # Handle special tokens
        self.start_token_id = self.vocab.get('[START]')
        self.end_token_id = self.vocab.get('[END]')
        self.sep_token_id = self.vocab.get('[SEP]')
        self.vocab_size = len(vocabulary)

    def _to_ids(self, tokens):
        """Look up each token, falling back to the '[END]' id when unknown."""
        fallback = self.end_token_id
        return [self.vocab.get(token, fallback) for token in tokens]

    def encode(self, input_text):
        """Encode whitespace-separated tokens, prefixed with the start id.

        No end token is appended.
        """
        return [self.start_token_id] + self._to_ids(input_text.split())

    def encode_test(self, input_text):
        """Encode text character by character, keeping '[SEP]' as one token.

        The text is split on the literal '[SEP]'; every segment is broken
        into single characters and a '[SEP]' token is re-inserted between
        segments. Text without '[SEP]' is encoded as plain characters
        (previously an IndexError), and text after a second '[SEP]' is kept
        (previously dropped).

        Returns:
            List of ids starting with the start id. No end token is appended.
        """
        segments = input_text.split('[SEP]')
        tokens = list(segments[0])
        for segment in segments[1:]:
            tokens.append('[SEP]')
            tokens.extend(segment)
        return [self.start_token_id] + self._to_ids(tokens)

    def decode(self, token_ids):
        """Convert ids back to tokens joined by single spaces.

        Start, end and separator ids are skipped.
        """
        special_ids = (self.start_token_id, self.end_token_id, self.sep_token_id)
        tokens = [
            self.idx_to_word.get(token_id)
            for token_id in token_ids
            if token_id not in special_ids
        ]
        return ' '.join(tokens)

    def vocab_output_vector(self, token):
        """Return the one-hot vector (length `vocab_size`) for `token`.

        Unknown tokens produce the one-hot vector of '[END]'.
        """
        output_vector = np.zeros(self.vocab_size)
        token_id = self.vocab.get(token, self.end_token_id)
        output_vector[token_id] = 1
        return output_vector


# Build the vocabulary in sorted order so token ids are the same on every
# kernel start (iterating a set of strings has no stable order across Python
# processes, which would silently re-map ids between runs).
vocabulary = sorted(
    {'[START]', '[END]', '[SEP]', *seq_map.seq, *smile_map.smile},
    key=str,
)
tokenizer = CustomTokenizer(vocabulary)

In [None]:
def out(inp):
    """One-hot encode `inp` with the global `tokenizer`.

    * If `inp` contains '[END]': everything before the first '[END]' is
      encoded character by character, then the '[END]' vector is appended.
      NOTE(review): in this branch a '[SEP]' inside the text is NOT treated
      as one token; its characters are encoded individually.
    * Otherwise, if `inp` contains '[SEP]': the characters of the first two
      '[SEP]'-separated segments are encoded with one '[SEP]' vector between.
    * Otherwise every character is encoded.

    Returns:
        Array with a leading axis of length 1 stacked over the per-token
        vectors returned by `tokenizer.vocab_output_vector`.
    """
    if '[END]' in inp:
        before_end = inp.split('[END]')[0]
        vectors = [tokenizer.vocab_output_vector(ch) for ch in before_end]
        vectors.append(tokenizer.vocab_output_vector('[END]'))
        return np.expand_dims(np.array(vectors), axis=0)

    if '[SEP]' in inp:
        segments = inp.split('[SEP]')
        symbols = [*segments[0], '[SEP]', *segments[1]]
    else:
        symbols = list(inp)

    vectors = [tokenizer.vocab_output_vector(symbol) for symbol in symbols]
    return np.expand_dims(np.array(vectors), axis=0)

def out_sparse(inp):
    """Encode `inp` as integer token ids with the global `tokenizer`.

    * If `inp` contains '[END]': everything before the first '[END]' is
      encoded character by character, then the '[END]' id is appended.
      NOTE(review): in this branch a '[SEP]' inside the text is NOT treated
      as one token; its characters are looked up individually. This is left
      unchanged because it determines the length of the returned array.
    * Otherwise, if `inp` contains '[SEP]': the characters of the first two
      '[SEP]'-separated segments are encoded with the '[SEP]' id between.
      (Fix: the separator used to be inserted as an integer and then looked
      up in the string-keyed vocabulary, so it always came out as the
      '[END]' id.)
    * Otherwise every character is encoded.

    Unknown tokens map to `tokenizer.end_token_id`.

    Returns:
        Array of shape (1, seq_len).
    """
    if '[END]' in inp:
        before_end = inp.split('[END]')[0]
        tokens = list(before_end) + ['[END]']
    elif '[SEP]' in inp:
        segments = inp.split('[SEP]')
        # Keep the separator as a *string* so the vocabulary lookup below
        # resolves it to the real '[SEP]' id.
        tokens = list(segments[0]) + ['[SEP]'] + list(segments[1])
    else:
        tokens = list(inp)

    token_ids = [tokenizer.vocab.get(token, tokenizer.end_token_id) for token in tokens]
    return np.expand_dims(np.array(token_ids), axis=0)  # Shape: (1, seq_len)

def pad_outputs(outputs):
    """Right-pad 2-D arrays with zeros to a common width and stack them.

    Args:
        outputs: sequence of arrays shaped (rows, width_i).

    Returns:
        One array of shape (sum of rows, max width_i); shorter arrays are
        filled with zeros on the right of axis 1.
    """
    target_width = max(arr.shape[1] for arr in outputs)

    padded = [
        np.pad(arr, ((0, 0), (0, target_width - arr.shape[1])), mode='constant')
        for arr in outputs
    ]

    # Stack along the first (batch) axis.
    return np.concatenate(padded, axis=0)

# Build next-token training data from the first chunk of `df`.
#
# Every prefix of a row's SMILE string becomes one example:
#   input  = [START] + seq chars + [SEP] + first k SMILE chars
#   target =           seq chars + [SEP] + first k+1 SMILE chars
# and the final example's target ends with [END]. Because
# `tokenizer.encode_test` already prepends [START], the target text is NOT
# shifted again: target[t] is the token that follows input[t]. (The earlier
# version dropped the first character of the target text as well, which
# shifted the targets twice.)

loss_fun = 2  # 1: one-hot targets via out(); 2: integer targets via out_sparse()

batch = 221170*2
input_data = []
output_data = []
frac = len(df) / batch

if loss_fun == 1:
    loss_object = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
elif loss_fun == 2:
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
else:
    raise ValueError(f"loss_fun must be 1 or 2, got {loss_fun!r}")

# Compile once, not once per data row.
model.compile(optimizer='adam', loss=loss_object, metrics=['accuracy'])


def _encode_targets(text, append_end=False):
    """Encode `text` as a target array of shape (1, n, ...); optionally add [END]."""
    if loss_fun == 1:
        encoded = out(text)
        end_step = tokenizer.vocab_output_vector('[END]')[np.newaxis, np.newaxis, :]
    else:
        encoded = out_sparse(text)
        end_step = np.array([[tokenizer.end_token_id]])
    if append_end:
        # The end marker is appended as an encoded step rather than as text.
        encoded = np.concatenate([encoded, end_step], axis=1)
    return encoded


# Only the first of the `batch` equally sized chunks is materialised.
chunk_index = 0
tmp = df[int(chunk_index * frac): int((chunk_index + 1) * frac)].copy()

inputs = []
outputs = []
for _, row in tmp.iterrows():
    text = row.seq + '[SEP]'
    for smile_char in str(row.SMILE):
        inputs.append(tokenizer.encode_test(text))
        text += smile_char
        outputs.append(_encode_targets(text))
    # Last example of the row: the complete text must be followed by [END].
    inputs.append(tokenizer.encode_test(text))
    outputs.append(_encode_targets(text, append_end=True))

if not inputs:
    raise ValueError(
        f"The first chunk of df is empty (len(df)={len(df)}, batch={batch}); "
        "lower `batch` so that len(df) / batch >= 1."
    )

# NOTE(review): padding uses 0, which is also a regular vocabulary index, so
# padded target positions still contribute to the loss -- confirm whether a
# dedicated padding id / sample weights are wanted.
max_length = max(len(token_ids) for token_ids in inputs)
input_data = np.array([
    np.pad(token_ids, (0, max_length - len(token_ids)), mode='constant', constant_values=0)
    for token_ids in inputs
])

if loss_fun == 1:
    widest = max(arr.shape[1] for arr in outputs)
    output_data = np.concatenate(
        [
            np.pad(arr, ((0, 0), (0, widest - arr.shape[1]), (0, 0)), mode='constant')
            for arr in outputs
        ],
        axis=0,
    )
else:
    output_data = pad_outputs(outputs)

assert input_data.shape == output_data.shape[:2], (
    f"inputs {input_data.shape} and targets {output_data.shape} are misaligned"
)

In [None]:
# Display the shapes of the prepared input and target arrays.
input_data.shape, output_data.shape

In [None]:
# Fit the compiled model on the arrays prepared in the data-preparation cell.
history = model.fit(
    input_data,     # Padded input token ids
    output_data,    # Padded targets: integer ids when loss_fun == 2, one-hot vectors when loss_fun == 1
    batch_size=1,  # Adjust based on your memory and dataset size
    epochs=10
)

In [None]:
max_length = 150


def infer(prompt, max_new_tokens=None):
    """Greedily extend `prompt` one token at a time with the global `model`.

    At each step the prompt is re-encoded with `tokenizer.encode_test`, the
    model is run, and the arg-max of the logits at the last position is
    taken as the next token.

    Generation stops when
      * the predicted id is the '[END]' id, or
      * the predicted id is '[START]' or '[SEP]' (appending either would not
        extend the generated text, so greedy decoding could not progress), or
      * `max_new_tokens` tokens have been appended.

    Fixes over the previous version: the end token is detected by id
    (`tokenizer.decode` strips special tokens, so comparing its result with
    '[END]' could never match), the text is returned on every path (the end
    path used to return None), and no '.' is appended at the end because
    '.' is itself a valid SMILES character.

    Args:
        prompt: text to continue; the caller in this notebook passes a
            sequence followed by '[SEP]'.
        max_new_tokens: generation budget; defaults to the module-level
            `max_length`.

    Returns:
        The prompt followed by the generated tokens.
    """
    if max_new_tokens is None:
        max_new_tokens = max_length

    stop_ids = (
        tokenizer.end_token_id,
        tokenizer.start_token_id,
        tokenizer.sep_token_id,
    )

    for _ in range(max_new_tokens):
        token_ids = tokenizer.encode_test(prompt)
        model_input = np.expand_dims(np.array(token_ids), axis=0)
        logits = model(model_input, training=False)

        # Logits of the last position of the single batch element.
        next_id = int(np.argmax(np.asarray(logits)[0, -1, :]))
        if next_id in stop_ids:
            break
        prompt += tokenizer.idx_to_word[next_id]

    return prompt

# Prompt the model with the first `seq` value in `tmp` followed by the
# separator token, then display what `infer` returns.
result = infer (tmp.seq.iloc[0] + '[SEP]')
result