In [None]:

import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, optimizers, metrics
from tensorflow.keras.layers import Input, Conv1D, GlobalAveragePooling1D, Dense, Dropout, BatchNormalization, Concatenate, Lambda
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import sklearn.metrics


In [None]:

# Read the SMILES / target table and keep only the rows that carry a SMILES string.
data = pd.read_csv('../Data/SMILES_RfD.csv')
has_smiles = data['SMILES'].notna()
data_clean = data[has_smiles]

# `smiles` stays a pandas Series (it keeps the original row index);
# `labels` is a float NumPy array and may still contain NaN at this point.
smiles = data_clean['SMILES']
labels = data_clean['target'].astype(float).to_numpy()


In [None]:

# Character -> one-hot channel lookup for SMILES strings.
# Channel 0 is not assigned to any character here: it is the fallback used by
# `smiles_dict.get(ch, 0)` below for every character missing from the table.
# NOTE(review): the table has no lowercase symbols (e.g. "c", "n", "l", "r") and
# no "[" / "]", so aromatic atoms, the second letter of "Cl"/"Br" and bracket
# atoms all collapse into channel 0 — confirm this is intended.
smiles_dict = {
    "#": 29, "%": 30, ")": 31, "(": 1, "+": 32, "-": 33, "/": 34, ".": 2,
    "1": 35, "0": 3, "3": 36, "2": 4, "5": 37, "4": 5, "7": 38, "6": 6,
    "9": 39, "8": 7, "=": 40, "A": 41, "@": 8, "C": 42, "B": 9, "E": 43,
    "D": 10, "G": 44, "F": 11, "I": 45, "H": 12, "K": 46, "M": 47, "L": 13,
    "O": 48, "N": 14, "P": 15, "S": 49, "R": 16, "U": 50,
}
MAX_SMI_LEN = 100  # strings longer than this are truncated, shorter ones are zero-padded
NUM_CHARS = max(smiles_dict.values()) + 1  # +1 makes room for the fallback channel 0

# One-hot tensor of shape (n_molecules, MAX_SMI_LEN, NUM_CHARS).
# float32 instead of NumPy's float64 default: the values are only ever 0 or 1
# and Keras casts inputs to its float32 default anyway, so this halves the
# memory footprint without changing what the model sees.
XD = np.zeros((len(smiles), MAX_SMI_LEN, NUM_CHARS), dtype=np.float32)
for i, smi in enumerate(smiles):
    for j, ch in enumerate(smi[:MAX_SMI_LEN]):
        XD[i, j, smiles_dict.get(ch, 0)] = 1


In [None]:

# Convolutional feature extractor over the one-hot SMILES tensor:
# three stacked ReLU Conv1D layers given as (filters, kernel_size) pairs.
XDinput = Input(shape=(MAX_SMI_LEN, NUM_CHARS), name='XDinput')
encode_smiles = XDinput
for n_filters, kernel_width in ((64, 2), (64, 4), (128, 4)):
    encode_smiles = Conv1D(
        filters=n_filters,
        kernel_size=kernel_width,
        activation='relu',
    )(encode_smiles)

# Wrapped as its own model so later cells can reuse the same layers.
model_feature = keras.Model(
    inputs=XDinput,
    outputs=encode_smiles,
    name='model_feature',
)


In [None]:

class TopkRouting(layers.Layer):
    """Sparse self-attention: each position aggregates the values of its top-k keys.

    Queries, keys and values are separate Dense projections of the same input.
    For every query position the ``topk`` highest-scoring key positions are
    selected and their value vectors are combined with softmax weights derived
    from the selected scores.

    The weighting is what keeps the routing trainable: if only the integer
    indices returned by ``top_k`` are used (e.g. a plain mean over the gathered
    values), no gradient reaches ``to_q`` / ``to_k`` and those projections stay
    at their random initialisation.

    Args:
        qk_dim: Output width of the query, key and value projections.
        topk: Number of key positions each query attends to. Clamped to the
            sequence length when that length is statically known.
        qk_scale: Multiplier applied to the raw dot-product scores; defaults
            to ``qk_dim ** -0.5``.
        **kwargs: Forwarded to ``keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, qk_dim, topk=16, qk_scale=None, **kwargs):
        super().__init__(**kwargs)
        self.topk = topk
        self.qk_dim = qk_dim
        self.scale = qk_scale if qk_scale is not None else qk_dim**-0.5
        self.to_q = layers.Dense(qk_dim)
        self.to_k = layers.Dense(qk_dim)
        self.to_v = layers.Dense(qk_dim)

    def call(self, x):
        """Return the routed values for ``x``.

        The matmul / gather below treat ``x`` as (batch, positions, channels);
        the result has shape (batch, positions, qk_dim).
        """
        q = self.to_q(x)
        k = self.to_k(x)
        v = self.to_v(x)

        # Pairwise query/key scores: (batch, positions, positions).
        attn = tf.matmul(q, k, transpose_b=True) * self.scale

        # top_k fails when k exceeds the last dimension, so clamp to the
        # sequence length whenever it is known at graph-construction time.
        n_keys = attn.shape[-1]
        k_eff = self.topk if n_keys is None else min(self.topk, n_keys)

        # Keep the scores as well as the indices: the scores are the only
        # differentiable path back to the query and key projections.
        top_scores, top_idx = tf.nn.top_k(attn, k=k_eff)
        weights = tf.nn.softmax(top_scores, axis=-1)

        # (batch, positions, k_eff, qk_dim): value vectors of the selected keys.
        gathered = tf.gather(v, top_idx, batch_dims=1)
        return tf.reduce_sum(gathered * tf.expand_dims(weights, axis=-1), axis=2)

class Block(layers.Layer):
    """Post-norm transformer-style block built on ``TopkRouting``.

    Applies top-k routed attention followed by a two-layer ReLU MLP, each
    wrapped in a residual connection and a ``LayerNormalization``.

    Args:
        dim: Channel width of the input; also the width of the attention
            output and of the MLP output so both residual sums are valid.
        num_heads: Accepted for interface compatibility; not used by this
            implementation.
        topk: Number of keys each position attends to (passed to ``TopkRouting``).
        mlp_ratio: Hidden width of the MLP as a multiple of ``dim``.
    """

    def __init__(self, dim, num_heads=8, topk=16, mlp_ratio=3):
        super().__init__()
        hidden_width = dim * mlp_ratio
        # Sub-layers are created in a fixed order (attn, norm1, mlp, norm2);
        # get_weights()/set_weights() between two Blocks rely on that order.
        self.attn = TopkRouting(dim, topk)
        self.norm1 = layers.LayerNormalization()
        self.mlp = keras.Sequential(
            [
                layers.Dense(hidden_width, activation='relu'),
                layers.Dense(dim),
            ]
        )
        self.norm2 = layers.LayerNormalization()

    def call(self, x):
        """Run attention and MLP sub-blocks, each as residual + layer norm."""
        attended = self.norm1(x + self.attn(x))
        return self.norm2(attended + self.mlp(attended))


In [None]:

# Pre-training model: two independent Blocks read the same convolutional
# features; the model output is the difference of their pooled norms.
dim = 128  # matches the 128 filters of the last Conv1D in model_feature
transformer_block1 = Block(dim)
transformer_block2 = Block(dim)

input_inter = Input(shape=(MAX_SMI_LEN, NUM_CHARS))
features = model_feature(input_inter)

# Collapse the channel axis of each block output to a single L2 norm per position.
trans1 = Lambda(lambda t: tf.norm(t, axis=-1, keepdims=True))(
    transformer_block1(features)
)
trans2 = Lambda(lambda t: tf.norm(t, axis=-1, keepdims=True))(
    transformer_block2(features)
)

# Two channels per position (one per block), averaged over positions -> (batch, 2).
combined = Concatenate(axis=-1)([trans1, trans2])
pooled = GlobalAveragePooling1D()(combined)

# Single output: pooled norm of block 1 minus pooled norm of block 2, shape (batch, 1).
difference = Lambda(
    lambda p: tf.expand_dims(p[:, 0] - p[:, 1], axis=-1)
)(pooled)

interactionModel = keras.Model(
    inputs=input_inter,
    outputs=difference,
    name='interactionModel',
)
interactionModel.summary()


In [None]:

# np.asarray reuses XD when it already is an ndarray; np.array would duplicate
# the whole one-hot tensor just before the fancy indexing below copies it again.
XD_np = np.asarray(XD)
labels_np = labels

# Keep only the samples whose label is not NaN (rows were filtered on SMILES
# earlier, not on the target column).
index = np.where(~np.isnan(labels_np))
XD_np = XD_np[index]
labels_np = labels_np[index]

# Fixed random_state so the 80/20 split is reproducible.
X_train, X_test, y_train, y_test = train_test_split(XD_np, labels_np, test_size=0.2, random_state=9)

# Standardise the target using statistics from the training split only, then
# apply the same transform to the test split. Both end up with shape (n, 1).
scaler_y = StandardScaler()
y_train = scaler_y.fit_transform(y_train.reshape(-1, 1))
y_test = scaler_y.transform(y_test.reshape(-1, 1))


In [None]:

# Metrics passed to compile() for the regression models.
METRICS_REGRESSION = [
    metrics.MeanAbsoluteError(name='mae'),
    metrics.MeanSquaredError(name='mse'),
]


class R2Callback(keras.callbacks.Callback):
    """Compute and print the R² score on a fixed dataset after every epoch.

    Args:
        data: ``(x, y)`` pair; ``x`` is fed to ``model.predict`` and ``y`` is
            the reference passed to ``sklearn.metrics.r2_score``.
    """

    def __init__(self, data):
        super().__init__()
        self.x, self.y = data

    def on_epoch_end(self, epoch, logs=None):
        """Predict on the stored inputs, store the score in ``logs`` and print it."""
        predictions = self.model.predict(self.x, verbose=0)
        score = sklearn.metrics.r2_score(self.y, predictions)
        # When Keras passes no (or an empty) logs dict the value only lands in
        # a throwaway dict; otherwise it is added to the shared epoch logs.
        if not logs:
            logs = {}
        logs['r2_score'] = score
        print(f" - r2_score: {score:.4f}")

# Pre-train the interaction model on the scaled training targets.
opt = optimizers.Adam(learning_rate=1e-4)
interactionModel.compile(optimizer=opt, loss='mse', metrics=METRICS_REGRESSION)

# NOTE(review): early stopping watches the *training* loss and R² is also
# computed on the training split — no validation data is involved here.
early_stop = keras.callbacks.EarlyStopping(
    monitor='loss',
    patience=30,
    restore_best_weights=True,
)
cb = [early_stop, R2Callback((X_train, y_train))]

interactionModel.fit(
    X_train,
    y_train,
    batch_size=256,
    epochs=1000,
    callbacks=cb,
    verbose=1,
)


In [None]:

# Final regression model: frozen copies of the pre-trained feature extractor
# and transformer blocks, followed by a trainable fully connected head.
new_transformer_block = Block(dim)
new_transformer_block2 = Block(dim)

model_feature.trainable = False
input_final = Input(shape=(MAX_SMI_LEN, NUM_CHARS))
feat_out = model_feature(input_final)

# Calling the new blocks on feat_out is what creates their variables.
tr1 = new_transformer_block(feat_out)
tr1 = Lambda(lambda x: tf.norm(x, axis=-1, keepdims=True))(tr1)
tr2 = new_transformer_block2(feat_out)
tr2 = Lambda(lambda x: tf.norm(x, axis=-1, keepdims=True))(tr2)

# Copy the pre-trained weights only now: a freshly constructed Block is unbuilt
# and owns zero variables, so set_weights() before the first call fails with a
# "layer was expecting 0 weights" ValueError.
new_transformer_block.set_weights(transformer_block1.get_weights())
new_transformer_block2.set_weights(transformer_block2.get_weights())

# Freeze the copied blocks (must happen before model_final is compiled).
new_transformer_block.trainable = False
new_transformer_block2.trainable = False

# One norm channel per block, averaged over sequence positions -> (batch, 2).
concat = Concatenate(axis=-1)([tr1, tr2])
pooled_f = GlobalAveragePooling1D()(concat)

# Trainable regression head.
fc1 = Dense(512, activation='relu')(pooled_f)
fc1 = BatchNormalization()(fc1)
fc1 = Dropout(0.1)(fc1)
fc2 = Dense(256, activation='relu')(fc1)
fc2 = BatchNormalization()(fc2)
fc2 = Dropout(0.1)(fc2)
fc3 = Dense(64, activation='relu')(fc2)
output = Dense(1)(fc3)

model_final = keras.Model(inputs=input_final, outputs=output, name='regression_model')
model_final.summary()


In [None]:

# Train the regression head of the final model on the scaled training targets.
opt_final = optimizers.Adam(learning_rate=1e-4)
model_final.compile(optimizer=opt_final, loss='mse', metrics=METRICS_REGRESSION)

# NOTE(review): as in the pre-training cell, early stopping and the R² callback
# both look at the training split only.
early_stop_final = keras.callbacks.EarlyStopping(
    monitor='loss',
    patience=30,
    restore_best_weights=True,
)
cb_final = [early_stop_final, R2Callback((X_train, y_train))]

model_final.fit(
    X_train,
    y_train,
    batch_size=256,
    epochs=1000,
    callbacks=cb_final,
    verbose=1,
)


In [None]:

def evaluate(model, X, y_scaled, y_orig):
    """Score ``model`` on ``X`` in the original (unscaled) target units.

    Predictions are mapped back through the module-level ``scaler_y`` before
    being compared with ``y_orig``.

    Args:
        model: Object exposing ``predict(X)``.
        X: Inputs passed to ``model.predict``.
        y_scaled: Scaled targets; accepted but not used by this function.
        y_orig: Reference targets in original units.

    Returns:
        Tuple ``(mae, mse, r2)``.
    """
    predictions = scaler_y.inverse_transform(model.predict(X))
    scorers = (
        sklearn.metrics.mean_absolute_error,
        sklearn.metrics.mean_squared_error,
        sklearn.metrics.r2_score,
    )
    return tuple(score(y_orig, predictions) for score in scorers)

# Report the final model on both splits, in original target units.
# The targets were standardised earlier, so they are mapped back through
# scaler_y before being used as the reference values.
y_train_orig = scaler_y.inverse_transform(y_train)
train_metrics = evaluate(model_final, X_train, y_train, y_train_orig)

y_test_orig = scaler_y.inverse_transform(y_test)
test_metrics = evaluate(model_final, X_test, y_test, y_test_orig)

print("Train MAE, MSE, R2:", train_metrics)
print("Test MAE, MSE, R2:", test_metrics)
