In [1]:
from google.colab import drive
drive.mount('/content/drive/')

import os


Mounted at /content/drive/


In [2]:
# Finding the Data Directory
data_dir = '/content/drive/My Drive/Ml_IR/1kP(24)/'
os.chdir(data_dir)

# List the contents of the directory
print(os.listdir())

print("------------")


['ir_spectroscopy_dataset.pt', 'expanded_ir_spectroscopy_dataset.pt', 'IR_NN(4L)(1k).ipynb']
------------


In [9]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import torch.nn.functional as F

data = torch.load('expanded_ir_spectroscopy_dataset.pt')
X_values = data['X_values']
Y_values = data['Y_values']
Labels = data['Labels']

#print(X_values)


def preprocess_data(X_values, Y_values, Labels, cut_front=10, cut_end=10):
    """
    Preprocess the X_values and Y_values tensors by trimming a few points from the beginning and end,
    and removing data points with all NaN values or only NaN intensities.

    Parameters:
    - X_values: torch.Tensor
        The tensor containing the X values (wavenumbers) of the IR spectra
    - Y_values: torch.Tensor
        The tensor containing the Y values of the IR spectra
    - Labels: torch.Tensor
        The tensor containing the labels corresponding to Y_values
    - cut_front: int, default=10
        The number of data points to cut from the front of each spectrum
    - cut_end: int, default=10
        The number of data points to cut from the end of each spectrum

    Returns:
    - processed_X: torch.Tensor
        The processed X values tensor
    - processed_Y: torch.Tensor
        The processed Y values tensor
    - processed_Labels: torch.Tensor
        The processed labels tensor
    """
    # Trim the front and end of each spectrum
    processed_X = X_values[cut_front:-cut_end]
    processed_Y = Y_values[:, cut_front:-cut_end]

    # Create a mask for non-NaN rows and rows that are not all NaN
    mask = ~torch.isnan(processed_Y).all(dim=1) & ~torch.isnan(processed_Y).any(dim=1)

    # Apply the mask to Y_values and Labels
    processed_Y = processed_Y[mask]
    processed_Labels = Labels[mask]

    return processed_X, processed_Y, processed_Labels

# Example usage:
processed_X_values, processed_Y_values, processed_Labels = preprocess_data(X_values, Y_values, Labels)
print(f"Original X shape: {X_values.shape}, Processed X shape: {processed_X_values.shape}")
print(f"Original Y shape: {Y_values.shape}, Processed Y shape: {processed_Y_values.shape}")
print(f"Original labels: {Labels.shape}, Processed labels: {processed_Labels.shape}")



Original X shape: torch.Size([1000]), Processed X shape: torch.Size([980])
Original Y shape: torch.Size([423, 1000]), Processed Y shape: torch.Size([417, 980])
Original labels: torch.Size([423, 24]), Processed labels: torch.Size([417, 24])


  data = torch.load('expanded_ir_spectroscopy_dataset.pt')


In [10]:
import numpy as np
import torch

def select_diverse_examples(processed_Y_values, processed_Labels, num_examples=10):
    num_samples = processed_Y_values.shape[0]
    num_labels = processed_Labels.shape[1]

    # Convert Labels to numpy for easier manipulation
    labels_np = processed_Labels.cpu().numpy()

    selected_indices = []
    remaining_indices = list(range(num_samples))

    while len(selected_indices) < num_examples and remaining_indices:
        if not selected_indices:
            # Select the first example randomly
            idx = np.random.choice(remaining_indices)
        else:
            # Calculate the Hamming distance to already selected examples
            distances = []
            for idx in remaining_indices:
                dist = [np.sum(labels_np[idx] != labels_np[sel_idx]) for sel_idx in selected_indices]
                distances.append(np.mean(dist))

            # Select the example with distance closest to the average
            avg_dist = np.mean(distances)
            idx = min(remaining_indices, key=lambda i: abs(distances[remaining_indices.index(i)] - avg_dist))

        selected_indices.append(idx)
        remaining_indices.remove(idx)

    # Create test set
    Y_test = processed_Y_values[selected_indices]
    Labels_test = processed_Labels[selected_indices]

    # Create train set (all indices not in selected_indices)
    train_indices = list(set(range(num_samples)) - set(selected_indices))
    Y_train = processed_Y_values[train_indices]
    Labels_train = processed_Labels[train_indices]

    return Y_train, Labels_train, Y_test, Labels_test

# Use the function to split the dataset
#Y_train, Labels_train, Y_test, Labels_test = select_diverse_examples(processed_Y_values, processed_Labels)
# Training Set: Indices 0 to 399
Y_train = processed_Y_values[:400]        # Shape: [400, 1000]
Labels_train = processed_Labels[:400]      # Shape: [400, 24]

# Testing Set: Indices 400 to 416
Y_test = processed_Y_values[400:417]       # Shape: [17, 1000]
Labels_test = processed_Labels[400:417]    # Shape: [17, 24]


print(f"Training set shape: {Y_train.shape}")
print(f"Training labels shape: {Labels_train.shape}")
print(f"Test set shape: {Y_test.shape}")
print(f"Test labels shape: {Labels_test.shape}")

# Verify that we have diverse examples in the test set
print("\nUnique label combinations in test set:")
for i in range(len(Labels_test)):
    print(f"Example {i+1}: {Labels_test[i].nonzero().squeeze().tolist()}")

Training set shape: torch.Size([400, 980])
Training labels shape: torch.Size([400, 24])
Test set shape: torch.Size([17, 980])
Test labels shape: torch.Size([17, 24])

Unique label combinations in test set:
Example 1: [6, 7, 23]
Example 2: [7, 8, 18, 23]
Example 3: 23
Example 4: [3, 6, 23]
Example 5: [1, 3, 11, 16]
Example 6: [1, 2, 6, 7, 16]
Example 7: [0, 23]
Example 8: [13, 23]
Example 9: 23
Example 10: [7, 11]
Example 11: [3, 6, 23]
Example 12: [1, 6, 23]
Example 13: [0, 1]
Example 14: [1, 4, 6, 23]
Example 15: [0, 7, 9, 23]
Example 16: [0, 7]
Example 17: [6, 7, 8, 22, 23]


In [11]:
#----// NN //---------------------------

import torch
import torch.nn as nn

class NN(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(980, 4000),
            nn.BatchNorm1d(4000),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(4000, 2000),
            nn.BatchNorm1d(2000),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(2000, 500),
            nn.BatchNorm1d(500),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(500, 24),
            nn.Sigmoid()
        )

        print(f"The NN has: {sum(p.numel() for p in self.parameters())} parameters")

    def forward(self, Y_values, Labels):
        out = self.layers(Y_values)

        # Create a target tensor from Labels
        target = Labels.to(out.device)

        # Compute binary cross-entropy loss
        loss = nn.functional.binary_cross_entropy(out, target)

        return loss, out

    def __call__(self, data_tensor):
        # Ensure data_tensor is on the same device as the model
        data_tensor = data_tensor.to(next(self.parameters()).device)
        return self.layers(data_tensor)

nn_model = NN()
optimizer = torch.optim.Adam(nn_model.parameters(), lr=0.001, weight_decay=1e-1)

# Check if CUDA is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Move the model to the GPU
nn_model = nn_model.to(device)

# Move data tensors to GPU
Labels_train = Labels_train.to(device)
Y_train = Y_train.to(device)
Y_test = Y_test.to(device)
Labels_test = Labels_test.to(device)

print("Model and data moved to GPU successfully.")

# L2 regularization is now handled by the optimizer's weight_decay parameter


The NN has: 12938524 parameters
Using device: cuda
Model and data moved to GPU successfully.


In [12]:

#----// Training Loop //---------------------------

optimizer = torch.optim.Adam(nn_model.parameters(), lr=0.00001)  # Using Adam optimizer with lower learning rate

num_epochs = 1000  # Set a maximum number of epochs
best_loss = float('inf')
patience = 100  # Number of epochs to wait for improvement before early stopping
no_improve = 0

for epoch in range(num_epochs):
    nn_model.train()  # Set the model to training mode
    optimizer.zero_grad()
    loss, _ = nn_model.forward(Y_train, Labels_train)
    loss.backward()
    optimizer.step()

    print(f"Epoch {epoch+1}, Loss: {loss.item()}")

    # Early stopping check
    if loss.item() < best_loss:
        best_loss = loss.item()
        no_improve = 0
    else:
        no_improve += 1

    if no_improve >= patience:
        print(f"Early stopping triggered. No improvement for {patience} epochs.")
        break

    if loss.item() < 0.001:
        print("Reached Stop Point!!")
        print(f"Converged after {epoch+1} epochs.")
        break

print(f"Training completed. Final loss: {loss.item()}")



Epoch 1, Loss: 14.195473670959473
Epoch 2, Loss: 13.885296821594238
Epoch 3, Loss: 13.897542953491211
Epoch 4, Loss: 13.967738151550293
Epoch 5, Loss: 14.194052696228027
Epoch 6, Loss: 13.68596363067627
Epoch 7, Loss: 13.9193754196167
Epoch 8, Loss: 13.853171348571777
Epoch 9, Loss: 13.881257057189941
Epoch 10, Loss: 14.009359359741211
Epoch 11, Loss: 14.199118614196777
Epoch 12, Loss: 13.4772310256958
Epoch 13, Loss: 13.814972877502441
Epoch 14, Loss: 13.788009643554688
Epoch 15, Loss: 14.076170921325684
Epoch 16, Loss: 14.16325855255127
Epoch 17, Loss: 13.834697723388672
Epoch 18, Loss: 13.743424415588379
Epoch 19, Loss: 13.318275451660156
Epoch 20, Loss: 13.847588539123535
Epoch 21, Loss: 14.000826835632324
Epoch 22, Loss: 13.89306354522705
Epoch 23, Loss: 13.336800575256348
Epoch 24, Loss: 14.015186309814453
Epoch 25, Loss: 13.78852367401123
Epoch 26, Loss: 13.61034107208252
Epoch 27, Loss: 13.776159286499023
Epoch 28, Loss: 13.562904357910156
Epoch 29, Loss: 13.352815628051758
Epo

In [None]:

#-------------------------------
"""
    Plot a single IR spectrum.

    Parameters:
    - X: array-like
        The wavenumbers (x-axis values)
    - Y: array-like
        The absorbance or transmittance values (y-axis values)
    - title: str, default='IR Spectrum'
        Title of the plot
    - xlabel: str, default='Wavenumber (cm⁻¹)'
        Label for the x-axis
    - ylabel: str, default='Absorbance'
        Label for the y-axis
    - filename: str, default='ir_spectrum.png'
        Filename to save the plot
"""
"""
def plot_ir(X, Y, title='IR Spectrum', xlabel='Wavenumber (cm⁻¹)', ylabel='Absorbance', filename='ir_spectrum.png'):
    plt.figure(figsize=(10, 6))
    plt.plot(X, Y)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title)
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig(filename, dpi=300)
    plt.close()

    print(f"IR spectrum plot saved as '{filename}'")

# Example usage:
plot_ir(processed_X_values, processed_Y_values[2])
"""

'\ndef plot_ir(X, Y, title=\'IR Spectrum\', xlabel=\'Wavenumber (cm⁻¹)\', ylabel=\'Absorbance\', filename=\'ir_spectrum.png\'):\n    plt.figure(figsize=(10, 6))\n    plt.plot(X, Y)\n    plt.xlabel(xlabel)\n    plt.ylabel(ylabel)\n    plt.title(title)\n    plt.grid(True, linestyle=\'--\', alpha=0.7)\n    plt.tight_layout()\n    plt.savefig(filename, dpi=300)\n    plt.close()\n\n    print(f"IR spectrum plot saved as \'{filename}\'")\n\n# Example usage:\nplot_ir(processed_X_values, processed_Y_values[2])\n'

In [13]:
i = 4
def get_indices_above_threshold(vector, threshold):
    return [i for i, value in enumerate(vector) if value > threshold]

# Modified inference
nn_model.eval()  # Set the model to evaluation mode
with torch.no_grad():  # Disable gradient calculation
    model_output = nn_model(Y_test[i].unsqueeze(0)).squeeze()  # Add batch dimension and then remove it

threshold = 0.5  # Increased threshold for more confident predictions
indices_above_threshold = get_indices_above_threshold(model_output, threshold)
correct = get_indices_above_threshold(Labels_test[i], threshold)

print("Indices Predicted:", indices_above_threshold)
print("Actual Indices:", correct)

# Calculate and print accuracy metrics
true_positives = len(set(indices_above_threshold) & set(correct))
precision = true_positives / len(indices_above_threshold) if indices_above_threshold else 0
recall = true_positives / len(correct) if correct else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1_score:.4f}")

Indices Predicted: [7, 23]
Actual Indices: [1, 3, 11, 16]


In [None]:
def evaluate_model_accuracy(model, X_values, y_true, threshold=0.9):
    model.eval()
    total_correct = 0
    total_predicted = 0
    total_actual = 0

    with torch.no_grad():
        for i in range(len(X_values)):
            model_output = model(X_values[i].unsqueeze(0)).squeeze()
            predicted_indices = set(get_indices_above_threshold(model_output, threshold))
            actual_indices = set(get_indices_above_threshold(y_true[i], threshold))

            correct = predicted_indices.intersection(actual_indices)
            total_correct += len(correct)
            total_predicted += len(predicted_indices)
            total_actual += len(actual_indices)

    precision = total_correct / total_predicted if total_predicted > 0 else 0
    recall = total_correct / total_actual if total_actual > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    print(f"Precision: {precision:.4f}(Positive guesses/inverse Proportional to false positive rate)")
    print(f"Recall: {recall:.4f} (Negative Guesses/Inverse Proportional to false negative rate.)")
    print(f"F1 Score: {f1_score:.4f}(total performance!)")

    return precision, recall, f1_score

# Usage
precision, recall, f1_score = evaluate_model_accuracy(nn_model, Y_test, Labels_test)

Precision: 0.6111(Positive guesses/inverse Proportional to false positive rate)
Recall: 0.3548 (Negative Guesses/Inverse Proportional to false negative rate.)
F1 Score: 0.4490(total performance!)


In [None]:
# Export the model for future training
torch.save({
    'model_state_dict': nn_model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': loss.item()
}, 'ir_model_checkpoint_(4L)_(L2)_v1.pth')

print("Model saved successfully for future training.")


Model saved successfully for future training.


In [None]:
# Assuming you have defined your model architecture (nn_model) beforehand

# Load the checkpoint
checkpoint = torch.load('ir_model_checkpoint_(4L)_(L2)_v1.pth')

# Load the model state dictionary
nn_model.load_state_dict(checkpoint['model_state_dict'])

# If you want to continue training, you can also load the optimizer state
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

# Set the model to evaluation mode if you're using it for inference
#nn_model.eval()

print("Model loaded successfully.")

  checkpoint = torch.load('ir_model_checkpoint_(4L)_(L2)_v1.pth')


Model loaded successfully.
