In [None]:
import numpy as np
import tensorflow as tf
import keras
from dataclasses import dataclass
from pymatgen.core.periodic_table import Element

# SMILES strings, their corresponding weights, and temperature info
smiles    = ['!','\_...','\_...',...] \
weight    = [1,...] \
temp_info = [injection Temp, increasing speed, time]

In [None]:
def split_smiles_detailed(smiles):
    """Split a SMILES-like string into a list of tokens.

    A two-character token is produced only when an uppercase letter is
    immediately followed by a lowercase letter and ``Element`` accepts the
    pair as a symbol.  Every other character (digits, brackets, bond
    symbols, marker characters, lone letters) becomes its own token.

    Args:
        smiles: The string to tokenize.

    Returns:
        list[str]: Tokens in their original order; concatenating them
        reproduces ``smiles``.
    """
    result = []
    len_smiles = len(smiles)
    n = 0
    while n < len_smiles:
        pair = smiles[n:n+2]
        looks_like_symbol = (
            len(pair) == 2
            and pair[0].isalpha()
            and pair[0].isupper()
            and pair[1].islower()
        )
        if looks_like_symbol:
            try:
                Element(pair)
            except ValueError:
                # Not an element symbol: keep the two characters as
                # separate tokens. Only the lookup failure is caught so that
                # unrelated errors (e.g. KeyboardInterrupt) still propagate.
                result.extend(pair)
            else:
                result.append(pair)
            n += 2
        else:
            result.append(smiles[n])
            n += 1
    return result

In [None]:
smiles = []
weight_smiles = []
temp_info = []
idx_train = []
idx_test = []
idx_tot = []
# Each non-blank line is whitespace-separated:
#   <smiles_1> <weight_1> ... <smiles_k> <weight_k> <4 numeric fields> <flag>
# where flag 'T' puts the record in the training set and 'F' in the test set.
with open('../data/input_SMILES.txt','r') as f:
    i = 0  # index of the current record == its position in `smiles`
    for line in f:
        fields = line.split()
        if not fields:
            # Blank line (e.g. a trailing newline): skip it without consuming
            # a record index, so the idx_* lists stay aligned with `smiles`.
            continue
        chem_fields = fields[:-5]
        temp_info.append([float(x) for x in fields[-5:-1]])
        # Every record starts with a '!' entry of weight 1.0; each SMILES
        # string is prefixed with '_'.
        smiles.append(['!'] + ['_' + s for s in chem_fields[::2]])
        weight_smiles.append([1.0] + [float(x) for x in chem_fields[1::2]])
        if fields[-1] == 'T':
            idx_train.append(i)
            idx_tot.append(i)
        elif fields[-1] == 'F':
            idx_test.append(i)
            idx_tot.append(i)
        i += 1

In [None]:
# Collect every distinct token and the longest raw SMILES string length.
smiles_vocab = set()
MAX_LENGTH_SMILES = 1
for process in smiles:
    for entry in process:
        # Length is measured on the raw string, which is an upper bound on
        # the number of tokens produced by split_smiles_detailed.
        MAX_LENGTH_SMILES = max(MAX_LENGTH_SMILES, len(entry))
        smiles_vocab.update(split_smiles_detailed(entry))
smiles_vocab = sorted(smiles_vocab)
# Token ids start at 1; id 0 is left free for padding.
smiles_to_index = {char: idx for idx, char in enumerate(smiles_vocab, start=1)}
SMILES_VOCAB_SIZE = len(smiles_to_index) + 1
print(smiles_to_index)
print(SMILES_VOCAB_SIZE)

In [None]:
@dataclass
class Config:
    """Hyper-parameters of the model and the training run.

    The attributes are annotated so that ``@dataclass`` registers them as
    fields (without annotations the decorator generates no fields at all).
    Defaults are unchanged, so ``Config()`` and class-level access such as
    ``Config.EMBED_DIM`` keep working.

    Note: the ``*_DFF`` defaults are computed once from the default
    ``EMBED_DIM`` when the class is created; overriding ``EMBED_DIM`` in the
    constructor does not rescale them.
    """
    # Longest raw SMILES string found while building the vocabulary.
    MAX_LENGTH_SMILES: int = MAX_LENGTH_SMILES
    # Width of the token embedding and of every transformer block.
    EMBED_DIM: int = 256
    SMILES_ATT_NUM_HEADS: int = 4
    SMILES_ATT_DFF: int = EMBED_DIM*4
    # Number of token ids, including id 0 reserved for padding.
    SMILES_VOCAB_SIZE: int = SMILES_VOCAB_SIZE
    # Transformer over the tokens of a single SMILES string.
    CHEMICAL_ATT_NUM_HEADS: int = 4
    CHEMICAL_ATT_DFF: int = EMBED_DIM*4
    CHEMICAL_TRANSFORMER_NUM_LAYERS: int = 3
    # Transformer over the chemicals of one process.
    PROCESS_ATT_NUM_HEADS: int = 1
    PROCESS_ATT_DFF: int = EMBED_DIM*4
    PROCESS_TRANSFORMER_NUM_LAYERS: int = 3
    # Optimisation settings.
    LR: float = 1e-4
    BATCH_SIZE: int = 512
config = Config()

## Data information

In [None]:
# Largest number of entries (leading '!' included) in any record.
len_max = np.max([len(process) for process in smiles])
n_tot_data = len(idx_tot)
print(f'Total data               : {n_tot_data}')
print(f'Maximum length           : {len_max}')
print(f'Maximum length of smiles : {config.MAX_LENGTH_SMILES}')
print(f'Vocab size               : {config.SMILES_VOCAB_SIZE}')

# Shuffle with a seeded generator so that Restart & Run All reproduces the
# same ordering of the train/test index lists.
SEED = 42
rng = np.random.default_rng(SEED)
rng.shuffle(idx_test)
rng.shuffle(idx_train)

print(f'Size of dataset : {n_tot_data}')
print(f'Train size      : {len(idx_train)}')
print(f'Test_size       : {len(idx_test)}')

## Convert string to numpy array

In [None]:
# Token ids per (record, entry, token position); untouched cells stay 0 and
# therefore act as padding.
encoder_array = np.zeros((len(smiles), len_max, config.MAX_LENGTH_SMILES))
# Weight per (record, entry); 0 where the record has no entry.
weight_array = np.zeros((len(weight_smiles), len_max))
# Keep only the first and the last of the numeric fields read per record.
temp_info = np.array(temp_info)[:, [0, -1]]
for i, process in enumerate(smiles):
    for j, entry in enumerate(process):
        weight_array[i, j] = weight_smiles[i][j]
        for k, char in enumerate(split_smiles_detailed(entry)):
            encoder_array[i, j, k] = smiles_to_index[char]
dim_X = encoder_array.shape[1:]
print(f'dim_X : {dim_X}')

## Decoder

In [None]:
# Reverse lookup: token id -> token string.
inv_smiles_to_index = {index: char for char, index in smiles_to_index.items()}

def decoding_smiles(encoder_array,inv_smiles_to_index):
    """Turn one encoded record back into its list of strings.

    Rows are read in order until a row whose first entry is 0 is met.  In
    every decoded row the first column is skipped and the remaining ids are
    translated with ``inv_smiles_to_index`` until the first 0.

    Args:
        encoder_array: 2-D array of token ids (rows x token positions).
        inv_smiles_to_index: Mapping from integer token id to token string.

    Returns:
        list[str]: One decoded string per non-padding row.
    """
    decoded = []
    for row in range(encoder_array.shape[0]):
        if encoder_array[row, 0] == 0:
            # First padding row: nothing meaningful follows.
            break
        pieces = []
        for col in range(1, encoder_array.shape[1]):
            token_id = encoder_array[row, col]
            if token_id == 0:
                break
            pieces.append(inv_smiles_to_index[int(token_id)])
        decoded.append(''.join(pieces))
    return decoded

## Load Y (Absorbance)

In [None]:
# Load the stored array and keep index 1 of its third axis as the target
# (presumably the absorbance values - confirm against the data file).
absorbances = np.load('../data/Absorbances.npy')
target = absorbances[:, :, 1]
dim_target = target.shape[1:]
print(dim_target)

---

# Test, Training data

In [None]:
def _select_records(indices):
    """Return (tokens, weights, temperature info, target) for ``indices``.

    The three model inputs are cast to float32 tensors; the target rows are
    returned as they are stored in ``target``.
    """
    return (
        tf.cast(encoder_array[indices], tf.float32),
        tf.cast(weight_array[indices], tf.float32),
        tf.cast(temp_info[indices], tf.float32),
        target[indices],
    )

X_train, X_weight_train, X_temp_train, y_train = _select_records(idx_train)
X_test, X_weight_test, X_temp_test, y_test = _select_records(idx_test)
X_tot, X_weight_tot, X_temp_tot, y_tot = _select_records(idx_tot)

print('Training data information')
print(f'X_train        = {X_train.shape}')
print(f'X_weight_train = {X_weight_train.shape}')
print(f'X_temp_train   = {X_temp_train.shape}')
print(f'y_train        = {y_train.shape}')

print('Test data information')
print(f'X_test        = {X_test.shape}')
print(f'X_weight_test = {X_weight_test.shape}')
print(f'X_temp_test   = {X_temp_test.shape}')
print(f'y_test        = {y_test.shape}')

# Transformer based UV-vis prediction model

### Configurations

In [None]:
class CustomLearningRateSchedule(keras.optimizers.schedules.LearningRateSchedule):
    """Linear warm-up from ``initial_lr`` to ``peak_lr``, then constant ``peak_lr``.

    ``tot_steps`` and ``n_epoch`` are stored and serialized but are not used
    when the learning rate is computed.
    """

    def __init__(self, warmup_steps, initial_lr, peak_lr, tot_steps,n_epoch,**kwargs):
        super().__init__(**kwargs)
        self.warmup_steps = warmup_steps
        self.initial_lr = initial_lr
        self.peak_lr = peak_lr
        self.tot_steps = tot_steps
        self.n_epoch = n_epoch

    def __call__(self, step):
        """Return the float32 learning rate for optimizer step ``step``."""
        # Fraction of the warm-up completed so far (exceeds 1 after warm-up,
        # but the result is then discarded by the cond below).
        progress = step / self.warmup_steps
        warmup_lr = self.initial_lr + (self.peak_lr - self.initial_lr) * progress
        still_warming_up = step < self.warmup_steps
        return tf.cond(
            still_warming_up,
            lambda: tf.cast(warmup_lr, dtype=tf.float32),
            lambda: tf.cast(self.peak_lr,dtype=tf.float32)
        )

    def get_config(self):
        """Return the constructor arguments needed to rebuild the schedule."""
        return dict(
            warmup_steps=self.warmup_steps,
            initial_lr=self.initial_lr,
            peak_lr=self.peak_lr,
            tot_steps=self.tot_steps,
            n_epoch=self.n_epoch,
        )

    @classmethod
    def from_config(cls, config):
        """Rebuild the schedule from the dictionary made by ``get_config``."""
        return cls(**config)

class RoPE(keras.layers.Layer):
    """Rotary position embedding applied to pre-split even/odd feature halves.

    Args:
        dim: Size of the last axis of the returned tensor; each of the two
            inputs is expected to carry ``dim / 2`` features.
    """

    def __init__(self, dim, **kwargs):
        super().__init__(**kwargs)
        self.dim = dim
        # One inverse frequency per (even, odd) feature pair: shape (dim/2,)
        self.inv_freq = 1. / (10000 ** (tf.range(0, dim, 2.0) / dim))

    def call(self, inputs_even,inputs_odd, positions=None):
        """Rotate each (even, odd) feature pair by a position-dependent angle.

        Args:
            inputs_even: Features taken at the even indices of the last axis.
            inputs_odd: Features taken at the odd indices of the last axis.
            positions: Optional 1-D float tensor of positions; defaults to
                ``0 .. seq_len-1`` where ``seq_len`` is axis 1 of
                ``inputs_even``.

        Returns:
            Tensor reshaped to ``(-1, seq_len, dim)`` whose last axis holds
            the rotated even half followed by the rotated odd half.
        """
        # Determine the sequence length
        seq_len = tf.shape(inputs_even)[1]
        # Compute positions if not provided
        if positions is None:
            positions = tf.range(seq_len, dtype=tf.float32)

        # Rotation angles, one per (position, frequency): (seq_len, dim/2)
        freqs = tf.einsum('i,j->ij', positions, self.inv_freq)
        cos = tf.cos(freqs)  # Shape : (seq_len, dim/2)
        sin = tf.sin(freqs)  # Shape : (seq_len, dim/2)

        # Apply the RoPE rotation
        x_rotated_even = inputs_even * cos - inputs_odd * sin
        x_rotated_odd = inputs_even * sin + inputs_odd * cos

        # The halves are concatenated ([even | odd]), not re-interleaved.
        x_rotated = tf.reshape(
            tf.concat([x_rotated_even, x_rotated_odd], axis=-1), (-1, seq_len, self.dim)
        )
        return x_rotated

    def get_config(self):
        """Return the base layer config extended with ``dim``.

        Including the base config keeps the layer name/dtype/trainable flag
        when the layer is serialized, like the other custom layers here.
        """
        config = super().get_config()
        config.update(
            {
                "dim": self.dim,
            }
        )
        return config

    @classmethod
    def from_config(cls, config):
        # Recreate the layer from its config; configs that only contain
        # "dim" (saved before the base config was included) still load.
        return cls(**config)

class MaskProcessor(keras.layers.Layer):
    """Expand a per-position padding mask into a pairwise attention mask."""

    def __init__(self,**kwargs):
        super().__init__(**kwargs)

    def call(self, padding_mask):
        """Return the outer product of ``padding_mask`` with itself.

        The mask is cast to float32; entry ``[b, i, j]`` of the result is the
        product of mask values ``i`` and ``j`` of batch element ``b``.
        """
        mask = tf.cast(padding_mask, tf.float32)
        as_rows = tf.expand_dims(mask, axis=2)
        as_cols = tf.expand_dims(mask, axis=1)
        return as_rows * as_cols

def _bert_encoder_block(query, key, value, attention_mask, i, prefix, num_heads, dff):
    """Build one post-norm transformer encoder block shared by both encoders.

    Args:
        query, key: Tensors used as attention query and key.
        value: Tensor used as attention value and as the residual input.
        attention_mask: Mask forwarded to ``MultiHeadAttention``.
        i: Index of the block; used only in the layer names.
        prefix: Layer-name prefix (``"ChemENC"`` or ``"ProcENC"``).
        num_heads: Number of attention heads; must divide ``config.EMBED_DIM``.
        dff: Width of the hidden feed-forward layer.

    Returns:
        The block output after attention, feed-forward and both residual
        layer normalizations.
    """
    assert config.EMBED_DIM % num_heads == 0
    # Multi headed self-attention. MultiHeadAttention takes (query, value, key)
    # positionally; the returned attention scores are not used.
    attention_output, _ = keras.layers.MultiHeadAttention(
        num_heads=num_heads,
        key_dim=config.EMBED_DIM // num_heads,
        name=f"{prefix}_{i}_multiheadattention",
    )(query, value, key, attention_mask=attention_mask, return_attention_scores=True)
    attention_output = keras.layers.Dropout(0.1,name=f"{prefix}_{i}_att_dropout")(attention_output)
    attention_output = keras.layers.LayerNormalization(
        epsilon=1e-6, name=f"{prefix}_{i}_att_layernormalization"
    )(value + attention_output)

    # Feed-forward layer
    ffn = keras.Sequential(
        [
            keras.layers.Dense(dff, activation='silu'),
            keras.layers.Dense(config.EMBED_DIM)
        ],
        name=f"{prefix}_{i}_ffn"
    )
    ffn_output = ffn(attention_output)
    ffn_output = keras.layers.Dropout(0.1,name=f"{prefix}_{i}_ffn_dropout")(ffn_output)
    sequence_output = keras.layers.LayerNormalization(
        epsilon=1e-6, name=f"{prefix}_{i}_ffn_layernormalization"
    )(attention_output + ffn_output)
    return sequence_output

def chemical_bert_module(query, key, value, attention_mask, i):
    """Encoder block ``i`` of the chemical encoder (layers named ``ChemENC_{i}_*``)."""
    return _bert_encoder_block(
        query, key, value, attention_mask, i,
        prefix="ChemENC",
        num_heads=config.CHEMICAL_ATT_NUM_HEADS,
        dff=config.CHEMICAL_ATT_DFF,
    )

def process_bert_module(query, key, value, attention_mask, i):
    """Encoder block ``i`` of the process encoder (layers named ``ProcENC_{i}_*``)."""
    return _bert_encoder_block(
        query, key, value, attention_mask, i,
        prefix="ProcENC",
        num_heads=config.PROCESS_ATT_NUM_HEADS,
        dff=config.PROCESS_ATT_DFF,
    )

class Expanded_matrix(keras.layers.Layer):
    """Append a trailing axis and repeat the values ``embedding_dim`` times.

    The tiling pattern ``[1, 1, embedding_dim]`` implies a rank-2 input.
    """

    def __init__(self, embedding_dim,*args,**kwargs):
        super().__init__(*args,**kwargs)
        self.embedding_dim = embedding_dim

    def call(self, inputs):
        """Return ``inputs`` copied ``embedding_dim`` times along a new last axis."""
        repeats = [1, 1, self.embedding_dim]
        return tf.tile(tf.expand_dims(inputs, axis=-1), repeats)

    def get_config(self):
        """Return the base layer config plus ``embedding_dim``."""
        base_config = super().get_config()
        base_config["embedding_dim"] = self.embedding_dim
        return base_config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from the dictionary made by ``get_config``."""
        return cls(**config)

class ReshapeLayer(keras.layers.Layer):
    """Layer wrapper around ``tf.reshape`` with a fixed target shape.

    The target shape covers every axis, including the batch axis.
    """

    def __init__(self, shape,*args,**kwargs):
        super().__init__(*args,**kwargs)
        self.shape = shape

    def call(self, inputs):
        """Return ``inputs`` reshaped to ``self.shape``."""
        return tf.reshape(inputs, self.shape)

    def get_config(self):
        """Return the base layer config plus ``shape``."""
        base_config = super().get_config()
        base_config["shape"] = self.shape
        return base_config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from the dictionary made by ``get_config``."""
        return cls(**config)

## Building a model

In [None]:
# Functional-API model: token embedding -> chemical encoder (per SMILES) ->
# weighting -> process encoder (per record) -> concat with temperature info ->
# dense regression head.
input_smiles = keras.layers.Input(shape=dim_X,name='input_smiles') # (None, length_process, length_smiles)
input_weight = keras.layers.Input(shape=dim_X[:-1],name='input_weight') # (None, length_process)
input_temp = keras.layers.Input(shape=(X_temp_train.shape[-1],),name='input_temp') # (None, number of temperature features)

reshape_smiles = ReshapeLayer(shape=(-1,dim_X[1])) # folds the process axis into the batch axis
reshape_process = ReshapeLayer(shape=(-1,dim_X[0],config.EMBED_DIM)) # restores the process axis
embedding_layer = keras.layers.Embedding(input_dim=config.SMILES_VOCAB_SIZE, output_dim=config.EMBED_DIM,name='embed_layer') # (None*length_process,length_smiles) -> (None*length_process,length_smiles,config.EMBED_DIM)
embed_weight = Expanded_matrix(config.EMBED_DIM) # (None,length_process) -> (None,length_process,config.EMBED_DIM)

x = input_smiles
# The two non-trainable Embedding layers below are used only for the mask that
# mask_zero=True attaches to their output (`_keras_mask`); the embedded values
# themselves are discarded.
# NOTE(review): the first one is built with input_dim=len_max although its
# input x[:,:,0] holds token ids - confirm the ids cannot exceed len_max or
# that the lookup itself is never executed.
process_padding_mask = keras.layers.Embedding(len_max,config.EMBED_DIM,mask_zero=True,trainable=False)(x[:,:,0])._keras_mask
process_attention_mask = MaskProcessor()(process_padding_mask)
x = reshape_smiles(input_smiles)
chemical_padding_mask = keras.layers.Embedding(config.SMILES_VOCAB_SIZE,config.EMBED_DIM,mask_zero=True,trainable=False)(x)._keras_mask
chemical_attention_mask = MaskProcessor()(chemical_padding_mask)

x = embedding_layer(x)
x *= tf.math.sqrt(tf.cast(config.EMBED_DIM, tf.float32)) # scale embeddings by sqrt(EMBED_DIM)
# Chemical encoder: query and key are RoPE-rotated copies of x (built by two
# separate RoPE layers), while the value is the unrotated x.
for i in range(config.CHEMICAL_TRANSFORMER_NUM_LAYERS):
    query = RoPE(config.EMBED_DIM)(x[:,:,0::2],x[:,:,1::2])
    key   = RoPE(config.EMBED_DIM)(x[:,:,0::2],x[:,:,1::2])
    x = chemical_bert_module(query,key,x,chemical_attention_mask,i)
x = x[:,0,:] # Keep the vector at the first token position of every SMILES: (None*length_process,length_smiles,EMBED_DIM) -> (None*length_process,EMBED_DIM)


x = reshape_process(x) # (None*length_process,EMBED_DIM) -> (None,length_process,EMBED_DIM)
x_weight = embed_weight(input_weight)
# Dense applied to a constant ones((1,1)) input acts as a single learnable,
# softplus-positive scalar of shape (1,1).
scaling_weight = keras.layers.Dense(1,activation='softplus',name='WeightScale')(tf.ones((1,1), dtype=tf.float32)) # Trainable scaling layer for weight information
x_weight = x_weight * scaling_weight
x = x * x_weight # weight every chemical's vector by its (scaled) weight
# Process encoder: same query/key/value arrangement as the chemical encoder.
for i in range(config.PROCESS_TRANSFORMER_NUM_LAYERS):
    query = RoPE(config.EMBED_DIM)(x[:,:,0::2],x[:,:,1::2])
    key   = RoPE(config.EMBED_DIM)(x[:,:,0::2],x[:,:,1::2])
    x = process_bert_module(query,key,x,process_attention_mask,i)
x = x[:,0,:] # Keep the vector at the first position of the process axis: (None,length_process,EMBED_DIM) -> (None,EMBED_DIM)

# Same constant-input trick: one learnable sigmoid gate per temperature feature.
scaling_temp = keras.layers.Dense(X_temp_train.shape[-1],activation='sigmoid',name='TempScale')(tf.ones((1,1), dtype=tf.float32)) # Trainable scaling layer for temperature information
x_temp = input_temp * scaling_temp
x = keras.layers.Concatenate(axis=-1)([x,x_temp])
# Regression head; the softplus output keeps every predicted value positive.
x = keras.layers.Dense(dim_target[0]*8, activation='silu')(x)
x = keras.layers.Dropout(0.5)(x)
x = keras.layers.Dense(dim_target[0]*4, activation='silu')(x)
x = keras.layers.Dropout(0.5)(x)
output = keras.layers.Dense(dim_target[0], activation='softplus')(x)

model = keras.Model(inputs=[input_smiles,input_weight,input_temp],outputs=[output])
model.summary() 

In [None]:
# Learning-rate schedule: linear warm-up over the first 5000 optimizer steps,
# then constant at config.LR.  tot_steps is passed as a plain int so that the
# schedule config holds only built-in Python numbers.
lr_schedule = CustomLearningRateSchedule(
    warmup_steps=5000,
    initial_lr=0.0,
    peak_lr=config.LR,
    tot_steps=int(np.ceil(len(X_train)/config.BATCH_SIZE)),
    n_epoch=100000
)
# Use the same `keras` namespace as the model and the schedule, rather than
# mixing it with `tf.keras`.
opt = keras.optimizers.AdamW(learning_rate=lr_schedule,beta_1=0.9,beta_2=0.999,weight_decay=0.01)
model.compile(
    optimizer=opt,
    loss=keras.losses.MeanSquaredError(reduction='sum_over_batch_size', name='mean_squared_error'),
    # NOTE(review): 'accuracy' is not meaningful for a regression target; it is
    # kept only so existing history keys do not change.  MAE is the metric to
    # read.
    metrics=['accuracy', keras.metrics.MeanAbsoluteError(name='mae')],
)
# Save only when val_loss improves, starting from a threshold of 0.0003.
checkpoint = keras.callbacks.ModelCheckpoint(filepath='UVvis_attention_model.keras', monitor='val_loss', verbose=2, save_best_only=True, mode='min', initial_value_threshold=0.0003)

In [None]:
# batch_size comes from config so that it stays consistent with the value
# used to derive the learning-rate schedule's tot_steps.
history = model.fit(
    x=[X_train,X_weight_train,X_temp_train],
    y=y_train,
    epochs=30000,
    batch_size=config.BATCH_SIZE,
    verbose=0,
    validation_data=([X_test,X_weight_test,X_temp_test],y_test),
    callbacks=[checkpoint],
)

# Load pre-trained parameters

In [None]:
# Custom classes that Keras must be told about to deserialize the saved model;
# each is registered under its own class name.
custom_classes = (
    CustomLearningRateSchedule,
    RoPE,
    ReshapeLayer,
    Expanded_matrix,
    MaskProcessor,
)
pretrained_model = keras.models.load_model(
    '../pre-trained_model/UVvis_attention_model.keras',
    custom_objects={cls.__name__: cls for cls in custom_classes},
)