In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import BertTokenizer, BertModel
from torch.utils.data import TensorDataset, DataLoader
import os
from tqdm import tqdm

In [None]:
# Path to the folder containing the .csv files
pasta = r'D:\OneDrive\Documentos\LabMol\IC-Citotoxicidade\DeepCytosafe multiparam\Data\teste'

# Load all .csv files in the specified folder.
# os.listdir() returns entries in arbitrary order, so the names are sorted to
# keep the order of the loaded frames (and of everything concatenated from
# them later) identical from one run to the next.
datasets = []
for arquivo in sorted(os.listdir(pasta)):
    if arquivo.endswith('.csv'):
        caminho_completo = os.path.join(pasta, arquivo)
        df = pd.read_csv(caminho_completo)
        datasets.append(df)

# Fail here with a clear message rather than later, when an empty list of
# tensors would be concatenated.
if not datasets:
    raise FileNotFoundError(f'No .csv files found in {pasta!r}')

def tokenize_with_bert(tokenizer, texts, max_length=128):
    """Encode ``texts`` with ``tokenizer`` as fixed-length PyTorch tensors.

    The tokenizer is asked to pad to ``max_length``, truncate longer inputs
    and return PyTorch tensors (``return_tensors='pt'``).

    Args:
        tokenizer: Callable tokenizer (a ``BertTokenizer`` in this notebook).
        texts: Texts to encode; forwarded to the tokenizer unchanged.
        max_length: Target length used for both padding and truncation.

    Returns:
        Whatever ``tokenizer`` returns for these options.
    """
    encoding_options = {
        'padding': 'max_length',
        'truncation': True,
        'max_length': max_length,
        'return_tensors': 'pt',
    }
    return tokenizer(texts, **encoding_options)

# Prepare the BERT tokenizer
# NOTE(review): the 'uncased' BERT vocabulary is expected to lowercase its
# input, while SMILES strings are case-sensitive -- confirm this tokenizer is
# really the intended one for the 'final_smiles' column.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Tokenize the datasets: one (input_ids, attention_mask, labels) triple per CSV
tokenized_datasets = []
for df in datasets:
    tokenized = tokenize_with_bert(tokenizer, df['final_smiles'].tolist())
    labels = torch.tensor(df['Outcome'].values)
    tokenized_datasets.append((tokenized['input_ids'], tokenized['attention_mask'], labels))

# Concatenate the tokenized datasets along the sample dimension
input_ids = torch.cat([x[0] for x in tokenized_datasets])
attention_masks = torch.cat([x[1] for x in tokenized_datasets])
labels = torch.cat([x[2] for x in tokenized_datasets])

# Create a TensorDataset
dataset = TensorDataset(input_ids, attention_masks, labels)

# 80/10/10 train/validation/test split. The generator is seeded so that the
# same samples land in each subset on every run; without it the held-out sets
# change whenever the cell is re-executed.
SPLIT_SEED = 42
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    dataset,
    [0.8, 0.1, 0.1],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Create DataLoaders (only the training loader is shuffled)
batch_size = 16
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# DataLoader has no `torch_save` method; torch.save() is the serialization
# entry point. Each call pickles the whole loader object, dataset included.
# NOTE(review): reloading these files is expected to need
# torch.load(path, weights_only=False) on PyTorch versions that default to
# weights_only=True -- confirm against the installed version.
torch.save(train_loader, 'train_loader.pt')
torch.save(val_loader, 'val_loader.pt')
torch.save(test_loader, 'test_loader.pt')

In [None]:
class MultiTaskBERT(nn.Module):
    """Shared BERT encoder followed by one classification head per task.

    Each head is a small MLP (hidden_size -> 256 -> 128 -> num_labels) applied
    to the encoder's pooled output. Heads are stored as ``classifier1``,
    ``classifier2``, ... (1-based), one per entry of ``num_labels_per_task``.

    Args:
        bert_model: Encoder module. It must expose ``config.hidden_size`` and,
            when called with ``input_ids`` and ``attention_mask``, return an
            object with a ``pooler_output`` attribute.
        num_labels_per_task: Sequence with the number of output logits of each
            task head; its length is the number of tasks.

    Raises:
        ValueError: If ``num_labels_per_task`` is empty.
    """

    def __init__(self, bert_model, num_labels_per_task):
        super(MultiTaskBERT, self).__init__()
        if len(num_labels_per_task) == 0:
            raise ValueError('num_labels_per_task must contain at least one task')
        self.bert = bert_model
        self.num_labels_per_task = num_labels_per_task
        # Register the heads under the names classifier1, classifier2, ... so
        # that state_dict keys stay the same as with hand-written attributes,
        # while any number of tasks is supported.
        for head_idx, num_labels in enumerate(num_labels_per_task):
            setattr(self, f'classifier{head_idx + 1}', self._build_head(num_labels))

    def _build_head(self, num_labels):
        """Create the MLP head mapping the pooled output to ``num_labels`` logits."""
        return nn.Sequential(
            nn.Linear(self.bert.config.hidden_size, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, num_labels)
        )

    def forward(self, input_ids, attention_mask, task_idx):
        """Return the logits of task ``task_idx`` for a batch.

        Args:
            input_ids: Token ids forwarded to the encoder.
            attention_mask: Attention mask forwarded to the encoder.
            task_idx: 0-based index of the task head to apply.

        Returns:
            The output of the selected head applied to the pooled encoder output.

        Raises:
            ValueError: If ``task_idx`` does not identify an existing head.
        """
        task_idx = int(task_idx)
        num_tasks = len(self.num_labels_per_task)
        # Reject unknown tasks explicitly instead of falling through with no
        # logits assigned.
        if not 0 <= task_idx < num_tasks:
            raise ValueError(
                f'task_idx must be in [0, {num_tasks - 1}], got {task_idx}'
            )
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        head = getattr(self, f'classifier{task_idx + 1}')
        return head(pooled_output)


In [None]:
# Model and optimizer configuration
model = MultiTaskBERT(BertModel.from_pretrained('bert-base-uncased'), num_labels_per_task=[2, 2])
optimizer = optim.AdamW(model.parameters(), lr=2e-5)
# Each head emits one logit per class (2 here), while the labels tensor holds
# one value per sample. CrossEntropyLoss is the loss defined for that pairing;
# BCEWithLogitsLoss requires a target with the same shape as the logits.
# NOTE(review): assumes the 'Outcome' column holds class ids 0/1 -- confirm.
criterion = nn.CrossEntropyLoss()

def _loss_and_predictions(logits, labels, criterion):
    """Compute the batch loss and the predicted labels for either head layout.

    Two layouts are supported:

    * one logit per label (``logits`` has the shape of ``labels``, optionally
      with a trailing dimension of size 1): binary decision at probability 0.5;
    * one logit per class (``logits`` has an extra class dimension): ``labels``
      are treated as class indices and the prediction is the arg-max class.
      For ``BCEWithLogitsLoss`` the indices are one-hot encoded because that
      loss needs a target shaped like the logits; any other criterion receives
      the integer class indices.

    Returns:
        Tuple ``(loss, predicted)`` where ``predicted`` has the shape of ``labels``.
    """
    if logits.dim() == labels.dim() + 1 and logits.size(-1) == 1:
        logits = logits.squeeze(-1)

    if logits.shape == labels.shape:
        loss = criterion(logits, labels.float())
        predicted = (torch.sigmoid(logits) > 0.5).long()
        return loss, predicted

    class_indices = labels.long()
    if isinstance(criterion, nn.BCEWithLogitsLoss):
        target = nn.functional.one_hot(class_indices, num_classes=logits.size(-1)).to(logits.dtype)
    else:
        target = class_indices
    loss = criterion(logits, target)
    predicted = logits.argmax(dim=-1)
    return loss, predicted


def _run_epoch(model, dataloader, criterion, device, optimizer=None, task_idx=0):
    """Run one pass over ``dataloader``; update weights only if ``optimizer`` is given.

    Returns:
        Tuple ``(mean_loss, accuracy)`` averaged over the number of samples,
        or ``(0.0, 0.0)`` when the dataloader yields no samples.
    """
    training = optimizer is not None
    model.train(training)
    total_loss = 0.0
    total_correct = 0
    total_samples = 0

    # Gradients are tracked only when an optimizer step will follow.
    with torch.set_grad_enabled(training):
        for input_ids, attention_mask, labels in dataloader:
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            if training:
                optimizer.zero_grad()
            logits = model(input_ids, attention_mask, task_idx=task_idx)
            loss, predicted = _loss_and_predictions(logits, labels, criterion)
            if training:
                loss.backward()
                optimizer.step()

            num_in_batch = labels.size(0)
            # The criterion returns a batch mean, so weight it by batch size
            # to get a correct per-sample average when batches differ in size.
            total_loss += loss.item() * num_in_batch
            total_correct += (predicted == labels).sum().item()
            total_samples += num_in_batch

    if total_samples == 0:
        return 0.0, 0.0
    return total_loss / total_samples, total_correct / total_samples


# Training function
def train_epoch(model, dataloader, optimizer, criterion, device, task_idx=0):
    """Train ``model`` for one epoch.

    Args:
        model: Module called as ``model(input_ids, attention_mask, task_idx=...)``.
        dataloader: Iterable of ``(input_ids, attention_mask, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss module applied to the logits and the labels.
        device: Device the batch tensors are moved to.
        task_idx: Index of the task head to train (defaults to the first task).

    Returns:
        Tuple ``(mean_loss, accuracy)`` over the samples seen in the epoch.
    """
    return _run_epoch(model, dataloader, criterion, device, optimizer=optimizer, task_idx=task_idx)


# Evaluation function
def evaluate_model(model, dataloader, criterion, device, task_idx=0):
    """Evaluate ``model`` without updating it.

    Args:
        model: Module called as ``model(input_ids, attention_mask, task_idx=...)``.
        dataloader: Iterable of ``(input_ids, attention_mask, labels)`` batches.
        criterion: Loss module applied to the logits and the labels.
        device: Device the batch tensors are moved to.
        task_idx: Index of the task head to evaluate (defaults to the first task).

    Returns:
        Tuple ``(mean_loss, accuracy)`` over the samples in ``dataloader``.
    """
    return _run_epoch(model, dataloader, criterion, device, optimizer=None, task_idx=task_idx)


In [None]:
# Training
num_epochs = 10
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))

model.to(device)

# Start below any reachable accuracy so the first epoch always writes a
# checkpoint; otherwise the final torch.load() could find no file, or a stale
# one left by an earlier run.
best_val_acc = float('-inf')
for epoch in range(num_epochs):
    # The loss has to be computed from the model's logits by the criterion so
    # that backward() reaches the model parameters; a tensor created from a
    # Python accuracy value carries no graph and trains nothing. train_epoch
    # does exactly that, and since it only iterates over its dataloader
    # argument, the tqdm-wrapped loader still shows per-batch progress.
    with tqdm(train_loader, unit="batch") as tepoch:
        train_loss, train_acc = train_epoch(model, tepoch, optimizer, criterion, device)

    # evaluate_model needs the criterion and returns (loss, accuracy).
    val_loss, val_acc = evaluate_model(model, val_loader, criterion, device)

    print(f'Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}')

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), 'best_model.pth')  # Save the best model so far

# Final evaluation on the test set, using the best checkpoint.
# map_location keeps the load working when the checkpoint was written on a
# different device than the one in use now.
model.load_state_dict(torch.load('best_model.pth', map_location=device))
test_loss, test_acc = evaluate_model(model, test_loader, criterion, device)
print(f'Final Test Acc: {test_acc:.4f}')