In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import pandas as pd
import numpy as np


In [2]:
class CustomDataset(Dataset):
    def __init__(self, data_path):
        # Load the data
        self.data = pd.read_csv(data_path)
        self.features = self.data.iloc[:, :-7].values
        self.labels = self.data.iloc[:, -7:].values

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        features = torch.tensor(self.features[idx], dtype=torch.float32)
        labels = torch.tensor(self.labels[idx], dtype=torch.float32)
        return features, labels


In [3]:
class TransformerModel(nn.Module):
    def __init__(self, input_dim, num_labels, d_model=128, nhead=8, num_encoder_layers=3, dim_feedforward=512, dropout=0.3):
        super(TransformerModel, self).__init__()
        
        # Linear projection of input features
        self.input_proj = nn.Linear(input_dim, d_model)
        
        # Positional encoding (can be dynamically sized)
        self.positional_encoding = nn.Parameter(torch.zeros(1, d_model))
        
        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward, dropout=dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
        
        # Output layer
        self.fc_out = nn.Linear(d_model, num_labels)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        # Ensure input has shape [batch_size, seq_len, input_dim]
        if x.dim() == 2:
            x = x.unsqueeze(1)  # Add seq_len dimension if it's missing

        batch_size, seq_len, _ = x.size()

        # Project the input features
        x = self.input_proj(x)

        # Add positional encoding
        positional_encoding = self.positional_encoding.unsqueeze(0).expand(batch_size, seq_len, -1)
        x += positional_encoding
        
        # Pass through the transformer encoder
        x = self.transformer_encoder(x)
        
        # Aggregate features (mean pooling)
        x = x.mean(dim=1)  # Average pooling over the sequence length
        
        # Pass through the output layer
        x = self.fc_out(x)
        
        # Sigmoid activation for multi-label classification
        return self.sigmoid(x)


In [4]:
def train_model(model, train_loader, num_epochs=20, learning_rate=0.001, device="cpu"):
    criterion = nn.BCELoss()  # Use binary cross-entropy loss for multi-label classification
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for features, labels in train_loader:
            features, labels = features.to(device), labels.to(device)  # Move to GPU
            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

In [5]:
# Usage example (adjust the parameters as needed)
data_path = 'datasets/dataset_L.csv'  # Replace with your dataset path
dataset = CustomDataset(data_path)

train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

batch_size = 32
learning_rate = 0.0001
n_epochs = 100

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [14]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

model = TransformerModel(input_dim=170, num_labels=7).to(device)
train_model(model, train_loader, num_epochs=n_epochs, learning_rate=learning_rate, device=device)

Epoch [1/100], Loss: 0.4039
Epoch [2/100], Loss: 0.3620
Epoch [3/100], Loss: 0.3473
Epoch [4/100], Loss: 0.3394
Epoch [5/100], Loss: 0.3340
Epoch [6/100], Loss: 0.3293
Epoch [7/100], Loss: 0.3252
Epoch [8/100], Loss: 0.3219
Epoch [9/100], Loss: 0.3194
Epoch [10/100], Loss: 0.3163
Epoch [11/100], Loss: 0.3137
Epoch [12/100], Loss: 0.3110
Epoch [13/100], Loss: 0.3083
Epoch [14/100], Loss: 0.3063
Epoch [15/100], Loss: 0.3035
Epoch [16/100], Loss: 0.3020
Epoch [17/100], Loss: 0.3008
Epoch [18/100], Loss: 0.2980
Epoch [19/100], Loss: 0.2962
Epoch [20/100], Loss: 0.2951
Epoch [21/100], Loss: 0.2928
Epoch [22/100], Loss: 0.2902
Epoch [23/100], Loss: 0.2897
Epoch [24/100], Loss: 0.2870
Epoch [25/100], Loss: 0.2856
Epoch [26/100], Loss: 0.2843
Epoch [27/100], Loss: 0.2829
Epoch [28/100], Loss: 0.2818
Epoch [29/100], Loss: 0.2794
Epoch [30/100], Loss: 0.2782
Epoch [31/100], Loss: 0.2773
Epoch [32/100], Loss: 0.2757
Epoch [33/100], Loss: 0.2750
Epoch [34/100], Loss: 0.2740
Epoch [35/100], Loss: 0

In [38]:
model.eval()  # Set the model to evaluation mode
test_loss = 0.0
correct = 0
total = 0
loader = test_loader
# loader = train_loader
results = []
results_x = []
criterion = nn.BCELoss() 

with torch.no_grad():  # Disable gradient computation
    for inputs, labels in loader:
        inputs, labels = inputs.to(device), labels.to(device)  # Move to GPU
        outputs = model(inputs)
        results.append(outputs)
        
        # Calculate the loss
        loss = criterion(outputs, labels)
        test_loss += loss.item()

        # Get the predicted class (highest log-probability)
        x, predicted = torch.max(outputs, 1)
        results_x.append((x, predicted, labels))
        
        # print(outputs)
        # Calculate the number of correct predictions
        correct += (predicted == labels.argmax(dim=1)).sum().item()  # labels.argmax(dim=1) for one-hot encoded labels
        total += labels.size(0)

# Calculate average loss and accuracy
avg_test_loss = test_loss / len(loader)
accuracy = correct / total * 100

print(f'Test Loss: {avg_test_loss:.4f}, Test Accuracy: {accuracy:.2f}%')


Test Loss: 0.2485, Test Accuracy: 56.82%


In [53]:
# Usage example (adjust the parameters as needed)
row = 5999
data_path = 'datasets/dataset_L.csv'  # Replace with your dataset path
model.eval()
dataset = CustomDataset(data_path)
outputs = model(dataset[row][0].unsqueeze(0).to(device))

# outputs, dataset[row][1]

# Outputs are already probabilities due to the sigmoid activation
probabilities = (outputs.to("cpu")).detach().numpy()  # Convert to numpy array if needed
percentages = probabilities * 100  # Convert probabilities to percentages

# Format the percentages for readability
formatted_percentages = ["{:.2f}%".format(p) for p in percentages.flatten()]

print("Probability percentages for each label:")
print(formatted_percentages)
print(dataset[row][1])

Probability percentages for each label:
['3.40%', '18.63%', '19.55%', '14.33%', '25.35%', '16.64%', '0.05%']
tensor([0., 1., 0., 0., 0., 0., 0.])


In [11]:
# Save the model
torch.save(model.state_dict(), 'model/transformer_model_2.pth')


In [52]:
# Load the model
model = TransformerModel(input_dim=170, num_labels=7).to(device)
model.load_state_dict(torch.load('model/transformer_model.pth'))



<All keys matched successfully>

## New dataset 

In [5]:
# Usage example (adjust the parameters as needed)
data_path = 'datasets/dataset_2_1.csv'  # Replace with your dataset path
dataset = CustomDataset(data_path)

train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

batch_size = 32
learning_rate = 0.0001
n_epochs = 100

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [10]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

model = TransformerModel(input_dim=198, num_labels=7, num_encoder_layers=2).to(device)
train_model(model, train_loader, num_epochs=n_epochs, learning_rate=learning_rate, device=device)

Epoch [1/100], Loss: 0.4048
Epoch [2/100], Loss: 0.3789
Epoch [3/100], Loss: 0.3571
Epoch [4/100], Loss: 0.3445
Epoch [5/100], Loss: 0.3353
Epoch [6/100], Loss: 0.3274
Epoch [7/100], Loss: 0.3208
Epoch [8/100], Loss: 0.3148
Epoch [9/100], Loss: 0.3108
Epoch [10/100], Loss: 0.3060
Epoch [11/100], Loss: 0.3027
Epoch [12/100], Loss: 0.2998
Epoch [13/100], Loss: 0.2963
Epoch [14/100], Loss: 0.2925
Epoch [15/100], Loss: 0.2902
Epoch [16/100], Loss: 0.2872
Epoch [17/100], Loss: 0.2853
Epoch [18/100], Loss: 0.2836
Epoch [19/100], Loss: 0.2796
Epoch [20/100], Loss: 0.2787
Epoch [21/100], Loss: 0.2763
Epoch [22/100], Loss: 0.2743
Epoch [23/100], Loss: 0.2722
Epoch [24/100], Loss: 0.2704
Epoch [25/100], Loss: 0.2686
Epoch [26/100], Loss: 0.2663
Epoch [27/100], Loss: 0.2648
Epoch [28/100], Loss: 0.2631
Epoch [29/100], Loss: 0.2630
Epoch [30/100], Loss: 0.2613
Epoch [31/100], Loss: 0.2591
Epoch [32/100], Loss: 0.2576
Epoch [33/100], Loss: 0.2575
Epoch [34/100], Loss: 0.2547
Epoch [35/100], Loss: 0

In [24]:
from sklearn.metrics import f1_score, recall_score, roc_auc_score, roc_curve
import torch.nn.functional as F

model.eval()  # Set the model to evaluation mode
test_loss = 0.0
correct = 0
total = 0
# loader = test_loader
loader = train_loader
results = []
results_x = []
criterion = nn.BCELoss() 

all_labels = []
all_predictions = []
all_probs = []

with torch.no_grad():  # Disable gradient computation
    for inputs, labels in loader:
        inputs, labels = inputs.to(device), labels.to(device)  # Move to GPU
        outputs = model(inputs)
        results.append(outputs)
        
        # Calculate the loss
        loss = criterion(outputs, labels)
        test_loss += loss.item()

        # Get the predicted class (highest log-probability)
        x, predicted = torch.max(outputs, 1)
        results_x.append((x, predicted, labels))
        
        # print(outputs)
        # Calculate the number of correct predictions
        correct += (predicted == labels.argmax(dim=1)).sum().item()  # labels.argmax(dim=1) for one-hot encoded labels
        total += labels.size(0)

# Calculate average loss and accuracy
avg_test_loss = test_loss / len(loader)
accuracy = correct / total * 100

print(f'Test Loss: {avg_test_loss:.4f}, Test Accuracy: {accuracy:.2f}%')


Test Loss: 0.1897, Test Accuracy: 69.53%


In [23]:
from sklearn.metrics import f1_score, recall_score, roc_auc_score
import torch.nn.functional as F
import numpy as np

model.eval()  # Set the model to evaluation mode
loader = test_loader
# loader = train_loader
test_loss = 0.0
correct = 0
total = 0
criterion = nn.BCELoss()

all_labels = []
all_predictions = []
all_probs = []

with torch.no_grad():  # Disable gradient computation
    for inputs, labels in loader:
        inputs, labels = inputs.to(device), labels.to(device)  # Move to GPU
        outputs = model(inputs)
        
        # Calculate the loss
        loss = criterion(outputs, labels)
        test_loss += loss.item()
        
        # Convert outputs to probabilities
        probs = torch.sigmoid(outputs)
        all_probs.append(probs.cpu().numpy())
        
        # Apply binary threshold for predictions
        predicted = (probs > 0.5).float()
        
        # Store the labels and predictions
        all_labels.append(labels.cpu().numpy())
        all_predictions.append(predicted.cpu().numpy())
        
        # Calculate correct predictions (for accuracy)
        print(predicted.shape, labels.shape, (predicted == labels).sum(dim=1), labels.size(1))
        correct += ((predicted == labels).sum(dim=1) == labels.size(1)).sum().item()  # All labels correct for each sample
        total += labels.size(0)

# Convert lists to numpy arrays
all_labels = np.concatenate(all_labels, axis=0)
all_predictions = np.concatenate(all_predictions, axis=0)
all_probs = np.concatenate(all_probs, axis=0)

# Calculate metrics
f1 = f1_score(all_labels, all_predictions, average='weighted')
recall = recall_score(all_labels, all_predictions, average='weighted')
roc_auc = roc_auc_score(all_labels, all_probs, average='weighted')

# Calculate average loss and accuracy
avg_test_loss = test_loss / len(loader)
accuracy = correct / total * 100

print(f'Test Loss: {avg_test_loss:.4f}, Test Accuracy: {accuracy:.2f}%')
print(f'F1 Score: {f1:.4f}, Recall: {recall:.4f}, ROC AUC: {roc_auc:.4f}')


torch.Size([32, 7]) torch.Size([32, 7]) tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1], device='cuda:0') 7
torch.Size([32, 7]) torch.Size([32, 7]) tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1], device='cuda:0') 7
torch.Size([32, 7]) torch.Size([32, 7]) tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1], device='cuda:0') 7
torch.Size([32, 7]) torch.Size([32, 7]) tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1], device='cuda:0') 7
torch.Size([32, 7]) torch.Size([32, 7]) tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1], device='cuda:0') 7
torch.Size([32, 7]) torch.Size([32, 7]) tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 