# CNN model (from "Fast Solar Image Classification Using Deep Learning and its Importance for Automation in Solar Physics" - Convolutional neural network)

## 0. Load modules


In [1]:
# Main libraries used all the time
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

# Operating system libraries
import os
import sys

# Machine learning libraries
import torch
from torchvision import transforms, datasets
from torchvision.datasets import ImageFolder
from torch.utils.data import Dataset, DataLoader, random_split

from torchvision.transforms import v2 # library to define the transforms

!pip install torcheval  # needs to run on google colab, on local machine you can just "pip install torcheval" in the terminal
from torcheval.metrics.functional import multiclass_f1_score

# Visualization and debugging
from sklearn.metrics import confusion_matrix
import seaborn as sns
!pip install sunpy
from sunpy.visualization.colormaps import color_tables as ct


Collecting torcheval
  Downloading torcheval-0.0.7-py3-none-any.whl (179 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/179.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m30.7/179.2 kB[0m [31m749.3 kB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━[0m [32m122.9/179.2 kB[0m [31m1.7 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m179.2/179.2 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: torcheval
Successfully installed torcheval-0.0.7
Collecting sunpy
  Downloading sunpy-5.1.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m26.0 MB/s[0m eta [36m0:00:00[0m
Collecting parfive[ftp]>=2.0.0 (from sunpy)
  Downloading parfive-2.0.2-py3-none-any.whl (26 kB)


## 1. Import the data, create the dataset, define the Dataloaders

### 1.1 Declare data path

In [2]:
# For Google Colab, mount Google Drive, for local environments, get local path (github)

# Change with the appropriate path. Log in into Drive and create the folders with the data

if 'google.colab' in sys.modules:
    from google.colab import drive
    drive.mount('/content/drive')
    # Carlos
    #folder0_path = '/content/drive/My Drive/solar_jets/data0'
    #folder0_test_path = '/content/drive/My Drive/solar_jets/data0_test'
    #folder1_path = '/content/drive/My Drive/solar_jets/data1'
    #folder1_test_path = '/content/drive/My Drive/solar_jets/data1_test'

    # Julie
    folder0_path = '/content/drive/My Drive/Colab Notebooks/CS433-project2/data0'
    folder1_path = '/content/drive/My Drive/Colab Notebooks/CS433-project2/data1'
    folder0_test_path = '/content/drive/My Drive/Colab Notebooks/CS433-project2/data0_test'
    folder1_test_path = '/content/drive/My Drive/Colab Notebooks/CS433-project2/data1_test'
else:
    # For local environments like VS Code
    folder0_path = './data0'
    folder1_path = './data1'
    folder0_test_path = './data0_test'
    folder1_test_path = './data1_test'

Mounted at /content/drive


### 1.2 Declare the class and the transforms

In [3]:
class NPZDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir

        # in self.files, only add f if it has 30 samples
        self.files = [f for f in os.listdir(data_dir) if self._check_file_shape(f)]
        self.transform = transform

    # Check if the file has 30 samples, if not, don't include it in self.files (see above)
    def _check_file_shape(self, file):
        file_path = os.path.join(self.data_dir, file)
        data = np.load(file_path)['arr_0']
        return data.shape == (166, 166, 30)

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        file_path = os.path.join(self.data_dir, self.files[idx])
        data = np.load(file_path)['arr_0']

        # Rearrange dimensions to (30, 166, 166) for PyTorch (insted of (166, 166, 30))
        data = np.moveaxis(data, -1, 0)

        # assign label 1 if data is from data1, 0 if from data0
        label = 1.0 if 'data1' in self.data_dir else 0.0
        return torch.from_numpy(data).float(), label

In [4]:
mean = 51.6644
std = 62.7087

train_transform = v2.Compose([
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True), #equivalent to transforms.ToTensor()
    v2.Normalize((mean,), (std,)),
    #v2.RandomResizedCrop(size=(140, 140)),
    #v2.RandomAffine(degrees=(-90,90), translate=(0.3, 0.3), scale=(0.8, 1.2))
    # add other transforms if needed
])

test_transform = v2.Compose([
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True), #equivalent to transforms.ToTensor()
    v2.Normalize((mean,), (std,)),
])

### 1.3 Get the data, declare the Dataloaders

In [5]:
train_data1 = NPZDataset(folder1_path, transform=train_transform)
train_data0 = NPZDataset(folder0_path, transform=train_transform)
train_data = torch.utils.data.ConcatDataset([train_data1, train_data0])

test_data1 = NPZDataset(folder1_test_path, transform=test_transform)
test_data0 = NPZDataset(folder0_test_path, transform=test_transform)
test_data = torch.utils.data.ConcatDataset([test_data1, test_data0])


In [6]:
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16, shuffle=False)

### 1.4 Check that we have our data

In [7]:
print("Total training samples: ",len(train_data))
print("Total testing samples: ", len(test_data))

for i, (x, y) in enumerate(train_loader):
    print(f"Batch {i}: samples {x.shape[0]}")

Total training samples:  1384
Total testing samples:  345
Batch 0: samples 16
Batch 1: samples 16
Batch 2: samples 16
Batch 3: samples 16
Batch 4: samples 16
Batch 5: samples 16
Batch 6: samples 16
Batch 7: samples 16
Batch 8: samples 16
Batch 9: samples 16
Batch 10: samples 16
Batch 11: samples 16
Batch 12: samples 16
Batch 13: samples 16
Batch 14: samples 16
Batch 15: samples 16
Batch 16: samples 16
Batch 17: samples 16
Batch 18: samples 16
Batch 19: samples 16
Batch 20: samples 16
Batch 21: samples 16
Batch 22: samples 16
Batch 23: samples 16
Batch 24: samples 16
Batch 25: samples 16
Batch 26: samples 16
Batch 27: samples 16
Batch 28: samples 16
Batch 29: samples 16
Batch 30: samples 16
Batch 31: samples 16
Batch 32: samples 16
Batch 33: samples 16
Batch 34: samples 16
Batch 35: samples 16
Batch 36: samples 16
Batch 37: samples 16
Batch 38: samples 16
Batch 39: samples 16
Batch 40: samples 16
Batch 41: samples 16
Batch 42: samples 16
Batch 43: samples 16
Batch 44: samples 16
Batch 4

## 2. Define the neural network

In [8]:
class CNN(torch.nn.Module):

    def __init__(self):
        super(CNN, self).__init__()

        ## Convolution layers

        # 2 conv 64 + max pool
        self.conv1 = self.create_conv_layer(in_channels=30, out_channels=64)
        self.conv2 = self.create_conv_layer(in_channels=64, out_channels=64)
        self.pool1 = torch.nn.MaxPool2d(kernel_size=2,stride=2)

        # 2 conv 128 + max pool
        self.conv3 = self.create_conv_layer(in_channels=64, out_channels=128)
        self.conv4 = self.create_conv_layer(in_channels=128, out_channels=128)
        self.pool2 = torch.nn.MaxPool2d(kernel_size=2,stride=2)

        # 2 conv 256 + max pool
        self.conv5 = self.create_conv_layer(in_channels=128, out_channels=256)
        self.conv6 = self.create_conv_layer(in_channels=256, out_channels=256)
        self.pool3 = torch.nn.MaxPool2d(kernel_size=2,stride=2)

        # 2 conv 512 + max pool
        self.conv7 = self.create_conv_layer(in_channels=256, out_channels=512)
        self.conv8 = self.create_conv_layer(in_channels=512, out_channels=512)
        self.pool4 = torch.nn.MaxPool2d(kernel_size=2,stride=2)

        # 2 conv 512 + max pool
        self.conv9 = self.create_conv_layer(in_channels=512, out_channels=512)
        self.conv10 = self.create_conv_layer(in_channels=512, out_channels=512)
        self.pool5 = torch.nn.MaxPool2d(kernel_size=2,stride=2)

        ## Fully connected layers

        # linear + ReLU + dropot (p=0.5)
        self.linear1 = torch.nn.Linear(in_features=512*5*5, out_features=1600)
        self.R1 = torch.nn.ReLU()
        self.DO1 = torch.nn.Dropout(p=0.5)

        # linear + ReLU + dropot (p=0.5)
        self.linear2 = torch.nn.Linear(in_features=1600, out_features=1600)
        self.R2 = torch.nn.ReLU()
        self.DO2 = torch.nn.Dropout(p=0.5)

        # Final linear layer
        self.linear3 = torch.nn.Linear(in_features=1600, out_features=2)
        self.sm = torch.nn.Softmax()


        # To make the code more clear, let's use this function to create conv layers
    def create_conv_layer(self, in_channels, out_channels):
        conv = torch.nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, stride=1, padding=1)
        norm = torch.nn.BatchNorm2d(out_channels)
        relu = torch.nn.ReLU()
        return torch.nn.Sequential(conv, norm, relu)


    def forward(self, x):

        # Define the forward pass
        ## Convolution layers
        x=self.conv1(x)
        x=self.conv2(x)
        x=self.pool1(x)

        x = self.conv3(x)
        x = self.conv4(x)
        x = self.pool2(x)

        x = self.conv5(x)
        x = self.conv6(x)
        x = self.pool3(x)

        x = self.conv7(x)
        x = self.conv8(x)
        x = self.pool4(x)

        x = self.conv9(x)
        x = self.conv10(x)
        x = self.pool5(x)
        x = x.view(x.size(0), -1)

        ## Fully connected layers
        x = self.linear1(x)
        x = self.R1(x)
        x = self.DO1(x)

        x = self.linear2(x)
        x = self.R2(x)
        x = self.DO2(x)

        x = self.linear3(x)

        return x

In [9]:
model = CNN()

## 3. Define the training functions

In [10]:
def train_epoch(model, optimizer, scheduler, criterion, train_loader, epoch, device):
    model.train()
    loss_history = []
    accuracy_history = []
    lr_history = []
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.float().to(device), target.float().to(device)
        optimizer.zero_grad()
        output = model(data)
        argmax_output = torch.argmax(output, dim=1).float().clone().detach().requires_grad_(True) # weird but equivalent to torch.tensor(...)

        # Debugging
        if batch_idx == 24:
          print("output: ", output)
          print("argmax output: ", argmax_output)
          print("target: ", target)
        loss = criterion(argmax_output, target)

        loss.backward()
        optimizer.step()
        scheduler.step()

        pred = torch.argmax(output, dim=1).float().clone().detach().requires_grad_(False)

        correct = pred.eq(target.view_as(pred)).sum().item()
        loss_float = loss.item()
        accuracy_float = correct / len(data)

        loss_history.append(loss_float)
        accuracy_history.append(accuracy_float)
        lr_history.append(scheduler.get_last_lr()[0])

        # this is the if statement of the lab, but it doesn't work for me. it makes no sense to me tbh
        #if batch_idx % (len(train_loader.dataset) // len(data) // 10) == 0:
        # this doesn't work bad, but it just prints the last batch of the epoch
        """
        if batch_idx == len(train_loader) - 1:
            print(
                f"* Train Epoch: {epoch}-{batch_idx:03d} \n"
                f"batch_loss={loss_float:0.2e} "
                f"batch_acc={accuracy_float:0.3f} "
                f"lr={scheduler.get_last_lr()[0]:0.3e} \n"
            )
      """
    epoch_accuracy = np.sum(accuracy_history) / len(train_loader)
    epoch_loss = np.sum(loss_history) / len(train_loader)
    print(f"* Train Epoch: {epoch} \n"
          f"average accuracy = {100. * epoch_accuracy:.2f}")

    return loss_history, accuracy_history, lr_history

## 4. Define the testing functions

In [11]:
@torch.no_grad()
def test_model(model, test_loader, criterion, device):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = torch.argmax(output, dim=1).float().clone().detach().requires_grad_(False)
            #pred = torch.tensor(torch.argmax(output, dim=1).float(), requires_grad=False)

            test_loss += criterion(pred, target)  # sum up batch loss
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    test_accuracy = correct / len(test_loader.dataset)
    print(f'* Test set: \n Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({100. * test_accuracy:.2f}%)\n')
    return test_loss, test_accuracy


In [12]:
@torch.no_grad()
def compute_f1(model, device, test_loader):
    model.eval()
    batch_f1 = np.zeros(len(test_loader))

    for batch_idx, (data, target) in enumerate(test_loader):    # iterate over each minibatch
        data, target = data.to(device), target.to(device)
        output = model(data)
        pred = torch.argmax(output, dim=1).float().clone().detach().requires_grad_(False)

        batch_f1[batch_idx] = multiclass_f1_score(pred, target, num_classes=2)   # compute the F1 score for 1 minibatch

    f1_score = np.sum(batch_f1) / len(test_loader)  # Output is the average of the f1 score over all minibatches

    return f1_score

## 5. Define the training parameters

In [13]:
criterion = torch.nn.CrossEntropyLoss()

num_epochs = 5

optimizer = torch.optim.AdamW(model.parameters())

scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=(len(train_loader.dataset) * num_epochs) // train_loader.batch_size,
)



## 6. Train the network!

In [None]:
# Training History
lr_history = []
train_loss_history = []
train_acc_history = []

# Validation History
val_loss_history = []
val_acc_history = []

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
i = 1

for epoch in range(1, num_epochs + 1):
    print(f" === Training starting for epoch {i} / {num_epochs} === \n")
    train_loss, train_acc, lrs = train_epoch(model, optimizer, scheduler, criterion, train_loader, epoch, device)
    train_loss_history.extend(train_loss)
    train_acc_history.extend(train_acc)
    lr_history.extend(lrs)

    # Test the model after each epoch
    test_loss, test_accuracy = test_model(model, test_loader, criterion, device)
    val_loss_history.append(test_loss)
    val_acc_history.append(test_accuracy)
    i += 1

f1_score = compute_f1(model, device, test_loader)

print(f"Final F1 score: {f1_score:.4f}")


## 6. Analyze the results

In [16]:
print("learning rate", lr_history)
print("train loss", train_loss_history)
print("train accuracy", train_acc_history)
print("test loss", val_loss_history)
print("test accuracy", val_acc_history)

learning rate []
train loss []
train accuracy []
test loss []
test accuracy []


In [None]:
n_train = len(train_acc_history)
t_train = num_epochs * np.arange(n_train) / n_train
t_test = np.arange(1, num_epochs + 1)


fig, axs = plt.subplots(1, 3, figsize=(15, 5))

# Plotting learning rate history
axs[0].plot(t_train, lr_history)
axs[0].set_title('Learning Rate History')
axs[0].set_xlabel('Epoch')
axs[0].set_ylabel('Learning Rate')

# Plotting training loss history
axs[1].plot(t_train, train_loss_history, color='b', label="Train")
#axs[1].plot(t_test, val_loss_history, label="Test", color='orange')
axs[1].legend()
axs[1].set_title('Training Loss History')
axs[1].set_xlabel('Epoch')
axs[1].set_ylabel('Loss')

# Plotting training accuracy history
axs[2].plot(t_train, train_acc_history, color='b', label="Train")
axs[2].plot(t_test, val_acc_history, label="Test", color='orange')
axs[2].legend()
axs[2].set_title('Training Accuracy History')
axs[2].set_xlabel('Epoch')
axs[2].set_ylabel('Accuracy')

plt.tight_layout()
plt.show()

plt.show()

## 7. Debugging tools

### a) Confusion matrix

In [None]:
# Convert PyTorch tensors to NumPy arrays
model.eval()
predictions = []
labels = []
model = model.to(device)

for data, target in test_loader:
  data, target = data.to(device), target.to(device)
  output = model(data)
  pred = torch.argmax(output, dim=1).float().clone().detach().requires_grad_(False)
  pred = pred.cpu().numpy()
  target = target.cpu().numpy()

  predictions.append(pred)
  labels.append(target)

print("predictions: ", predictions)
print("labels: ", labels)


# Compute confusion matrix using scikit-learn
cm = confusion_matrix(labels, predictions)

# Create a heatmap for visualization
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False,
            xticklabels=["Class 0", "Class 1"],
            yticklabels=["Class 0", "Class 1"])
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix")
plt.show()

### b) Visualize misclassified images

In [None]:
# Set the model to evaluation mode
model.eval()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

misclassified_images = []
true_labels = []
predicted_labels = []

# Iterate through the test set
for data, target in test_loader:
    # Forward pass
    data, target = data.to(device), target.to(device)
    output = model(data)
    pred = torch.argmax(output, dim=1).float().clone().detach().requires_grad_(False)

    # Identify misclassified images
    misclassified_mask = pred != target
    misclassified_images.extend(data[misclassified_mask])
    true_labels.extend(target[misclassified_mask])
    predicted_labels.extend(pred[misclassified_mask])

# Convert lists to PyTorch tensors
misclassified_images = torch.stack(misclassified_images)
true_labels = torch.stack(true_labels)
predicted_labels = torch.stack(predicted_labels)

# Assuming you have a function to reverse normalization if applied during data preprocessing
# Replace reverse_normalize with your actual function
# Example:
reverse_normalize = transforms.Compose([transforms.Normalize(mean=0, std=(1/62.7087)),
                                        transforms.Normalize(mean=-51.6644, std=1)])

# Display misclassified images
num_images_to_display = min(10, len(misclassified_images))
fig, axes = plt.subplots(1, num_images_to_display, figsize=(45,6))

for i in range(num_images_to_display):
    # Assuming images are in RGB format
    image = reverse_normalize(misclassified_images[i]).cpu().numpy()
    true_label = true_labels[i].item()
    predicted_label = predicted_labels[i].item()

    axes[i].imshow(image[15,:,:], cmap='sdoaia304')
    axes[i].set_title(f'True: {true_label}, Predicted: {predicted_label}')
    axes[i].axis('off')

plt.show()
