In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

from torch.utils.data import DataLoader, Dataset,TensorDataset,ConcatDataset
from sklearn import preprocessing
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import matplotlib.pyplot as plt


path = 'Project for ML/schneider50k/'
'''
# 加载数据集文件
train_dataset = torch.load(path+"train_dataset.pt")
test_dataset = torch.load(path+"test_dataset.pt")
val_dataset = torch.load(path+"val_dataset.pt")
'''
train_dataset = torch.load(path+"train_filter_dataset.pt")
test_dataset = torch.load(path+"test_filter_dataset.pt")
val_dataset = torch.load(path+"val_filter_dataset.pt")

# 提取所有的y并组成一个array类型的列表
train_features = []
train_labels = []
test_features = []
test_labels = []
val_features = []
val_labels = []
for train_sample in train_dataset:
    train_feature,train_label = train_sample
    train_features.append(train_feature)
    train_labels.append(train_label)

for test_sample in test_dataset:
    test_feature,test_label = test_sample
    test_features.append(test_feature)
    test_labels.append(test_label)

for val_sample in val_dataset:
    val_feature,val_label = val_sample
    val_features.append(val_feature)
    val_labels.append(val_label)

label_num = preprocessing.LabelEncoder()
label_num.fit(train_labels+test_labels+val_labels)

train_labels_num = label_num.transform(train_labels)
test_labels_num = label_num.transform(test_labels)
val_labels_num = label_num.transform(val_labels)



# 定义神经网络模型
class Classifer(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(Classifer, self).__init__()
        self.fc1 = nn.Linear(input_dim, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, output_dim)
        self.dropout = nn.Dropout(p=0.3)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc3(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc4(x)
        x = self.softmax(x)
        return x

class ZSLCrossEntropyLoss(nn.Module):
        def __init__(self, weight_decay=0.001):
            super(ZSLCrossEntropyLoss, self).__init__()
            self.loss_fn = nn.CrossEntropyLoss()
            self.weight_decay = weight_decay

        def forward(self, logits, targets, params=None):
            loss = self.loss_fn(logits, targets)

            # 计算L2范数
            l2_reg = 0
            if params is not None:
                for param in params:
                    l2_reg += torch.norm(param, p=2)**2
            else:
                for param in self.parameters():
                    l2_reg += torch.norm(param, p=2)**2

            # 加入L2正则化项
            loss += self.weight_decay * l2_reg

            return loss
        
# 定义数据集及其相关的参数
train_dataset = TensorDataset(torch.tensor(np.array(train_features,dtype=np.float32)),torch.tensor(np.array(train_labels_num)))
test_dataset = TensorDataset(torch.tensor(np.array(test_features,dtype=np.float32)),torch.tensor(np.array(test_labels_num)))
val_dataset = TensorDataset(torch.tensor(np.array(val_features,dtype=np.float32)),torch.tensor(np.array(val_labels_num)))
input_size = train_dataset[0][0].shape[0]
hidden_size = 256
num_classes = len(label_num.classes_)
batch_size = 64
num_epochs = 1000
lr = 0.05

train_dataset = ConcatDataset([train_dataset, val_dataset])
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size)
#val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size)


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Classifer(input_size, num_classes).to(device)
loss_fn = ZSLCrossEntropyLoss()
loss_fn = loss_fn.to(device)
optimizer = optim.SGD(model.parameters(), lr=lr)




# 训练模型
loss_list = []
for epoch in range(num_epochs):
    running_loss = 0.0
    for i, (features, labels) in enumerate(train_loader):
        # 将数据移动到GPU
        features = features.to(device)
        labels = labels.to(device)
        
        # 前向传播
        outputs = model(features)
        loss = loss_fn(outputs, labels.long(),params=model.parameters())
        
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * features.size(0)
        
        if (i+1) % 100 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, i+1, len(train_dataset)//batch_size, loss.item()))
    
    epoch_loss = running_loss / len(train_dataset)
    loss_list.append(epoch_loss)

plt.plot(loss_list)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training Loss Curve')
plt.show()

torch.save(model.state_dict(),path+'model_filter.pt')
# 在测试集上评估模型
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for features, labels in test_loader:
        # 将数据移动到GPU
        features = features.to(device)
        labels = labels.to(device)

        # 前向传播
        outputs = model(features)
        _, predicted = torch.max(outputs.data, dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print('Accuracy of the network on the test features: {} %'.format(100 * correct / total))




In [24]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd

from torch.utils.data import DataLoader, Dataset,TensorDataset,ConcatDataset
from sklearn import preprocessing
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import matplotlib.pyplot as plt


path = '/kaggle/input/important2/Project for ML/schneider50k/'
# 加载template与数字对应文件
labels_num =pd.read_csv(path+'labels_nums_filter.csv',header=None)
labels_num = list(labels_num[0])

# 加载数据集文件
test_dataset = torch.load(path+"test_filter_dataset.pt")
test_features = []
test_labels = []
for test_sample in test_dataset:
    test_feature,test_label = test_sample
    test_features.append(test_feature)
    test_labels.append(test_label)
test_labels_num = [labels_num.index(label) for label in test_labels]
test_dataset = TensorDataset(torch.tensor(np.array(test_features,dtype=np.float32)),torch.tensor(np.array(test_labels_num)))

# 定义神经网络模型
class Classifer(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(Classifer, self).__init__()
        self.fc1 = nn.Linear(input_dim, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, output_dim)
        self.dropout = nn.Dropout(p=0.3)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc3(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc4(x)
        x = self.softmax(x)
        return x
    
# 定义数据集及其相关的参数
input_size = 2048
num_classes = len(labels_num)
batch_size = 64

test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size)


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Classifer(input_size, num_classes).to(device)
model.load_state_dict(torch.load(path+'model_filter.pt'))

# 在测试集上评估模型
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for features, labels in test_loader:
        # 将数据移动到GPU
        features = features.to(device)
        labels = labels.to(device)

        # 前向传播
        outputs = model(features)
        _, predicted = torch.max(outputs.data, dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print('Accuracy of the network on the test features: {} %'.format(100 * correct / total))

def split_reaction(reaction):
    rct, prd = reaction.split('>>')
    prd_mols = [x for x in prd.split('.')]
    
    reactions = []
    for p in prd_mols:
        reaction = [rct].copy()
        reaction.append(p)
        reactions.append(reaction)
    
    return reactions
#随便找个进行预测

feature = test_dataset[144][0].unsqueeze(0).to(device)

output = model(feature)
_, predicted = torch.max(output.data, dim=1)
print(labels_num[int(predicted)])
print(split_reaction(labels_num[int(predicted)]))


Accuracy of the network on the test features: 49.331103678929765 %
[C:2]-[NH;D2;+0:1]-[C:3]>>C-C(-C)(-C)-O-C(=O)-[N;H0;D3;+0:1](-[C:2])-[C:3]
[['[C:2]-[NH;D2;+0:1]-[C:3]', 'C-C(-C)(-C)-O-C(=O)-[N;H0;D3;+0:1](-[C:2])-[C:3]']]


In [21]:
print(test_dataset)

<torch.utils.data.dataset.TensorDataset object at 0x7e0e54793a90>


In [16]:
import torch
from rdkit import Chem
from rdkit.Chem import AllChem

# Define the model architecture for single-step retrosynthesis prediction
class RetrosynthesisModel(torch.nn.Module):

    def __init__(self, num_atom_types, num_bond_types, hidden_size):
        super(RetrosynthesisModel, self).__init__()

        self.embedding = torch.nn.Embedding(num_atom_types + num_bond_types, hidden_size)
        self.encoder = torch.nn.GRU(hidden_size, hidden_size, batch_first=True)
        self.decoder = torch.nn.GRU(hidden_size, hidden_size, batch_first=True)
        self.output = torch.nn.Linear(hidden_size, num_atom_types + num_bond_types)

    def forward(self, inputs, targets=None):
        embedded = self.embedding(inputs)
        encoded_states, _ = self.encoder(embedded)
        if targets is not None:
            targets_embedded = self.embedding(targets)
            decoded_states, _ = self.decoder(targets_embedded, encoded_states[:, -1:, :])
            outputs = self.output(decoded_states.squeeze(1))
        else:
            outputs = self.output(encoded_states)
        return outputs


# Load the pre-trained model for Task 1
def load_model_task1():
    model = RetrosynthesisModel(num_atom_types=len(AllChem.GetPeriodicTable().GetElements()),
                                num_bond_types=len(Chem.rdchem.BondType.values),
                                hidden_size=128)
    model.load_state_dict(torch.load('/kaggle/input/important2/Project for ML/schneider50k/model_filter.pt', map_location=torch.device('cpu')))
    model.eval()
    return model


# Define a function to generate possible reactants for a given molecule using the trained model from Task 1
def generate_possible_reactants(model, target_molecule):
    with torch.no_grad():
        product_fp = get_fingerprint([target_molecule])[0]
        product_fp_tensor = torch.tensor(product_fp, dtype=torch.float32).unsqueeze(0)
        predicted_reactant_fps = model(product_fp_tensor).sigmoid().round().squeeze().tolist()
        possible_reactants = []
        for fp in predicted_reactant_fps:
            reactant_mol = Chem.MolFromSmiles(fingerprints_to_smiles(np.unpackbits(fp)))
            if reactant_mol is not None:
                possible_reactants.append(reactant_mol)
        return possible_reactants


# Define a function to predict the reaction template for a given pair of reactants and products
def predict_reaction_template(model, reactant_mol, product_mol):
    with torch.no_grad():
        reactant_fp = get_fingerprint([reactant_mol])[0]
        product_fp = get_fingerprint([product_mol])[0]
        reactant_fp_tensor = torch.tensor(reactant_fp, dtype=torch.float32).unsqueeze(0)
        product_fp_tensor = torch.tensor(product_fp, dtype=torch.float32).unsqueeze(0)
        predicted_template = model(reactant_fp_tensor, product_fp_tensor).argmax(-1).item()
        return predicted_template


# Define a helper function to convert Morgan fingerprints to SMILES strings
def fingerprints_to_smiles(fp):
    bv = np.zeros((1,))
    np.put(bv, np.arange(len(fp)), fp)
    mol = Chem.MolFromSmiles('')
    fp = AllChem.DataStructs.cDataStructs.ExplicitBitVect(2048, bv)
    return Chem.MolToSmiles(AllChem.ReconstructRDKitMol(fp, mol))
    

# Define a function to convert reactant molecules to Morgan fingerprints
def get_fingerprint(reactants_mol):
    fps = []
    for mol in reactants_mol:
        fp = AllChem.GetMorganFingerprintAsBitVect(mol, 2)
        arr = np.zeros((1,))
        AllChem.DataStructs.ConvertToNumpyArray(fp, arr)
        fps.append(np.packbits(arr))
    return fps


In [None]:
from rdkit import Chem
from rdkit.Chem import AllChem

# Load the models for Tasks 1 and 2
model_task1 = load_model_task1()
model_task2 = load_model_task2()

# Define the target molecule and product molecule for Task 3
target_mol = 'CC(=O)Nc1ccc(C(=O)O)c(c1)C(=O)O'
product_mol = 'CC(=O)Nc1ccc(cc1C(=O)O)C(=O)O'

# Convert the target and product molecules to RDKit molecules
target_mol = Chem.MolFromSmiles(target_mol)
product_mol = Chem.MolFromSmiles(product_mol)

# Generate possible reactants for the target molecule using the model from Task 1
possible_reactants = generate_possible_reactants(model_task1, target_mol)

# Evaluate the cost of each possible reactant using the model from Task 2
reactant_costs = []
for reactant in possible_reactants:
    fp = get_fingerprint([reactant])[0]
    cost = evaluate_cost(model_task2, fp)
    reactant_costs.append(cost)

# Choose the reactant with the lowest cost as the starting material for synthesis planning (Task 3)
starting_reactant = possible_reactants[np.argmin(reactant_costs)]

# Predict the reaction template for the synthesis pathway using the models from Tasks 1 and 3
reaction_template = predict_reaction_template(model_task1, starting_reactant, product_mol)

# Print the results
print('Possible reactants: ', [Chem.MolToSmiles(mol) for mol in possible_reactants])
print('Reactant costs: ', reactant_costs)
print('Starting material: ', Chem.MolToSmiles(starting_reactant))
print('Reaction template: ', reaction_template)


In [None]:
import numpy as np
import torch

# Define the model architecture for multi-step retrosynthesis planning
class MultiStepRetrosynthesisModel(torch.nn.Module):

    def __init__(self, num_atom_types, num_bond_types, hidden_size):
        super(MultiStepRetrosynthesisModel, self).__init__()

        self.embedding = torch.nn.Embedding(num_atom_types + num_bond_types, hidden_size)
        self.encoder = torch.nn.GRU(hidden_size, hidden_size, batch_first=True)
        self.decoder = torch.nn.GRU(hidden_size + 1, hidden_size, batch_first=True)
        self.output = torch.nn.Linear(hidden_size, num_atom_types + num_bond_types)

    def forward(self, inputs, targets=None):
        embedded = self.embedding(inputs[:, :-1])
        encoded_states, _ = self.encoder(embedded)
        if targets is not None:
            targets_embedded = self.embedding(targets[:, :-1])
            decoder_input = torch.cat([targets[:, -1:, :], targets_embedded], dim=-1)
            decoded_states, _ = self.decoder(decoder_input, encoded_states[:, -1:, :])
            outputs = self.output(decoded_states.squeeze(1))
        else:
            outputs = self.output(encoded_states)
        return outputs


# Load the pre-trained model for Task 2
def load_model_task2():
    model = MultiStepRetrosynthesisModel(num_atom_types=117,
                                         num_bond_types=6,
                                         hidden_size=128)
    model.load_state_dict(torch.load('model_task2.pt', map_location=torch.device('cpu')))
    model.eval()
    return model


# Define a function to evaluate the cost of a given reactant using the trained model from Task 2
def evaluate_cost(model, reactant_fp):
    with torch.no_grad():
        reactant_fp_tensor = torch.tensor(reactant_fp, dtype=torch.float32).unsqueeze(0)
        predicted_cost = model(reactant_fp_tensor).squeeze().item()
        return predicted_cost


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd

from torch.utils.data import DataLoader, Dataset,TensorDataset,ConcatDataset
from sklearn import preprocessing
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import matplotlib.pyplot as plt


path = '/kaggle/input/important2/Project for ML/schneider50k/'
# 加载template与数字对应文件
labels_num =pd.read_csv(path+'labels_nums_filter.csv',header=None)
labels_num = list(labels_num[0])

# 加载数据集文件
test_dataset = torch.load(path+"test_filter_dataset.pt")
test_features = []
test_labels = []
for test_sample in test_dataset:
    test_feature,test_label = test_sample
    test_features.append(test_feature)
    test_labels.append(test_label)
test_labels_num = [labels_num.index(label) for label in test_labels]
test_dataset = TensorDataset(torch.tensor(np.array(test_features,dtype=np.float32)),torch.tensor(np.array(test_labels_num)))

# 定义神经网络模型
class Classifer(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(Classifer, self).__init__()
        self.fc1 = nn.Linear(input_dim, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, output_dim)
        self.dropout = nn.Dropout(p=0.3)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc3(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc4(x)
        x = self.softmax(x)
        return x
    
# 定义数据集及其相关的参数
input_size = 2048
num_classes = len(labels_num)
batch_size = 64

test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size)


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Classifer(input_size, num_classes).to(device)
model.load_state_dict(torch.load(path+'model_filter.pt'))

# 在测试集上评估模型
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for features, labels in test_loader:
        # 将数据移动到GPU
        features = features.to(device)
        labels = labels.to(device)

        # 前向传播
        outputs = model(features)
        _, predicted = torch.max(outputs.data, dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print('Accuracy of the network on the test features: {} %'.format(100 * correct / total))

def split_reaction(reaction):
    rct, prd = reaction.split('>>')
    prd_mols = [x for x in prd.split('.')]
    
    reactions = []
    for p in prd_mols:
        reaction = [rct].copy()
        reaction.append(p)
        reactions.append(reaction)
    
    return reactions
#随便找个进行预测
def get_mfp(product):
    mol = Chem.MolFromSmarts(product)
    fp = AllChem.GetMorganFingerprintAsBitVect(mol,2,nBits=2048)
    onbits = list(fp.GetOnBits())
    arr = np.zeros(fp.GetNumBits(),dtype = bool)
    arr[onbits] = 1
    return arr
tempa = get_mfp("[C:4]-[N;H0;D3;+0:5](-[C:6])-[C;H0;D3;+0:1](-[C:2])=[O;D1;H0:3]")
feature = tempa.unsqueeze(0).to(device)

output = model(feature)
_, predicted = torch.max(output.data, dim=1)
print(labels_num[int(predicted)])
print(split_reaction(labels_num[int(predicted)]))


In [None]:
import numpy as np
from rdkit import Chem

# 导入
#model1 = load_model_task1()
#model2 = load_model_task2()

# Depth-first search
def dfs(curr_mol, target_mol):
    # 搜索结束
    if curr_mol == target_mol:
        return [Chem.MolToSmiles(curr_mol)]

    # task1
    output = model(feature)
    _, predicted = torch.max(output.data, dim=1)
    print(labels_num[int(predicted)])
    print(split_reaction(labels_num[int(predicted)]))
    
    possible_reactants = generate_possible_reactants(model1, curr_mol)

    next_mol = possible_reactants[0]

    # 递归
    child_paths = []
    if next_mol is not None:
        child_paths = [dfs(next_mol, target_mol)]
        
    # 合成路径
    curr_path = [Chem.MolToSmiles(curr_mol)] + child_paths[0]
    
    return curr_path


# main
def multi_step_retrosynthesis_planning(starting_molecules, target_molecules):
    all_paths = []
    for start_mol in starting_molecules:
        for target_mol in target_molecules:
            path = dfs(start_mol, target_mol)
            if path is not None:
                all_paths.append(path)

    return all_paths

from rdkit.Chem import Draw
mol_path_list = multi_step_retrosynthesis_planning(['O-[C;H0;D3;+0:1](-[C:2])=[O;D1;H0:3]'], ['[C:4]-[N;H0;D3;+0:5](-[C:6])-[C;H0;D3;+0:1](-[C:2])=[O;D1;H0:3]'])
for mol in mol_path_list:
    Draw.ShowMol(mol, size=(150,150), kekulize=False)

In [7]:
!pip install rdkit

Collecting rdkit
  Downloading rdkit-2023.3.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (29.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m29.7/29.7 MB[0m [31m44.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: rdkit
Successfully installed rdkit-2023.3.1
[0m

In [26]:
from rdkit import Chem
from rdkit.Chem import AllChem

# Load the models for Tasks 1 and 2
model_task1 = load_model_task1()
model_task2 = load_model_task2()

# Define the target molecule and product molecule for Task 3
target_mol = 'CC(=O)Nc1ccc(C(=O)O)c(c1)C(=O)O'
product_mol = 'CC(=O)Nc1ccc(cc1C(=O)O)C(=O)O'

# Convert the target and product molecules to RDKit molecules
target_mol = Chem.MolFromSmiles(target_mol)
product_mol = Chem.MolFromSmiles(product_mol)

# Generate possible reactants for the target molecule using the model from Task 1
possible_reactants = generate_possible_reactants(model_task1, target_mol)

# Evaluate the cost of each possible reactant using the model from Task 2
reactant_costs = []
for reactant in possible_reactants:
    fp = get_fingerprint([reactant])[0]
    cost = evaluate_cost(model_task2, fp)
    reactant_costs.append(cost)

# Choose the reactant with the lowest cost as the starting material for synthesis planning (Task 3)
starting_reactant = possible_reactants[np.argmin(reactant_costs)]

# Predict the reaction template for the synthesis pathway using the models from Tasks 1 and 3
reaction_template = predict_reaction_template(model_task1, starting_reactant, product_mol)

# Print the results
print('Possible reactants: ', [Chem.MolToSmiles(mol) for mol in possible_reactants])
print('Reactant costs: ', reactant_costs)
print('Starting material: ', Chem.MolToSmiles(starting_reactant))
print('Reaction template: ', reaction_template)

Possible reactants:  ['COC(=O)CC(C)(C)Cl', 'CC(C)(C)C(=O)Nc1ccc(C(=O)O)c(c1)C(=O)O', 'COc1ccc(cc1C(=O)O)C(=O)O']
Reactant costs:  [0.9999921915349426, 1.0, 0.9333480596542358]
Starting material:  COc1ccc(cc1C(=O)O)C(=O)O
Reaction template:  CC(C)(C)C(=O)N>>CO

