In [1]:
# Importing necessary libraries
import os

import numpy as np

import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader, random_split, SubsetRandomSampler

from sklearn.model_selection import KFold

import matplotlib.pyplot as plt

# Seed PyTorch's default random number generator so repeated runs of the notebook
# draw the same random numbers from it.
torch.manual_seed(42)

<torch._C.Generator at 0x7f3a34776970>

In [2]:
# Hyper-parameters used by the data loaders and training loops in this notebook
# Number of samples per DataLoader batch.
# NOTE(review): the model flattens from dimension 0 and train()/validate() only read
# y_true[0], so values other than 1 look unsupported - confirm before changing.
BATCH_SIZE = 1
# Number of epochs passed to the training loops
EPOCHS_COUNT = 15
# Number of folds requested from KFold
NUMBER_OF_SPLITS = 5

In [3]:
# Defining a class for neutron monitor dataset
class NeutronMonitorDataset(Dataset):
    """Dataset of neutron monitor wavelet matrices stored as comma-separated text files.

    Files are gathered from three directories. The position of a directory in the
    list is used as the class label: 0 for ``calm_days``, 1 for ``weak_storms`` and
    2 for ``strong_storms``. Each item is a ``(float32 tensor, int label)`` pair.
    """

    def __init__(self):
        super().__init__()
        self._wavelets_files, self._labels = NeutronMonitorDataset._get_neutron_monitor_data_files()
        self._length = len(self._wavelets_files)

    @staticmethod
    def _get_neutron_monitor_data_files():
        """Return parallel lists of file paths and integer labels for all classes."""
        wavelets_files, labels = [], []
        data_dirs_paths = [
            "../input/sopo-neutron-monitor-data/calm_days",
            "../input/sopo-neutron-monitor-data/weak_storms",
            "../input/sopo-neutron-monitor-data/strong_storms"
        ]
        # The enumerate index doubles as the class label of the directory.
        for index, dir_path in enumerate(data_dirs_paths):
            wavelets_array, labels_array = NeutronMonitorDataset._get_data_files_from_directory(dir_path, index)
            wavelets_files.extend(wavelets_array)
            labels.extend(labels_array)
        return wavelets_files, labels

    @staticmethod
    def _get_data_files_from_directory(dir_path, label):
        """Collect every data file below ``dir_path`` and pair it with ``label``.

        Args:
            dir_path: Directory that is walked recursively.
            label: Label assigned to every file that is found.

        Returns:
            ``(paths, labels)`` where ``paths`` is sorted and ``labels`` has the
            same length. ``.gitkeep`` placeholders are skipped.
        """
        wavelets_files_array = []
        for root, _, files in os.walk(dir_path):
            for file in files:
                if file != ".gitkeep":
                    # Join with the directory currently being walked, not the top-level
                    # one, so files in sub-directories get a path that actually exists.
                    wavelets_files_array.append(os.path.join(root, file))
        # os.walk yields entries in an arbitrary, file-system dependent order; sorting
        # keeps sample indices (and therefore seeded splits) stable between machines.
        wavelets_files_array.sort()
        labels_array = [label] * len(wavelets_files_array)
        return wavelets_files_array, labels_array

    def __len__(self):
        """Return the number of data files found at construction time."""
        return self._length

    def __getitem__(self, index):
        """Load the file at ``index`` and return ``(wavelet_tensor, label)``."""
        sample_wavelet_file = self._wavelets_files[index]
        # Parsed as float64 first, then converted to the float32 the model works with.
        sample_wavelet_image = torch.tensor(
            np.loadtxt(sample_wavelet_file, dtype=np.float64, delimiter=','),
            dtype=torch.float32
        )
        sample_label = self._labels[index]
        return sample_wavelet_image, sample_label

In [4]:
# defining modified LeNet model for neutron monitor data
class NeutronMonitorDataLeNet(nn.Module):
    """Convolutional classifier with three output classes for neutron monitor data.

    ``forward`` returns both the raw logits and their softmax probabilities.
    The feature extractor ends with ``nn.Flatten(0)`` and the softmax runs over
    dimension 0, so the whole input is reduced to a single feature vector.
    NOTE(review): this presumably expects one sample of shape (1, H, W) per call,
    with H and W such that 1075200 features reach the first linear layer - confirm.
    """

    def __init__(self):
        super().__init__()
        # Layers are created in the same order as they are applied.
        convolutional_layers = [
            nn.Conv2d(in_channels=1, out_channels=64, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(in_channels=64, out_channels=192, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(in_channels=192, out_channels=384, kernel_size=3),
            nn.ReLU(),
            nn.Conv2d(in_channels=384, out_channels=256, kernel_size=3),
            nn.ReLU(),
            nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            # Flatten every dimension, starting with the first one.
            nn.Flatten(0),
        ]
        self.feature_extractor = nn.Sequential(*convolutional_layers)

        dense_layers = [
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(in_features=1075200, out_features=4096),
            nn.ReLU(),
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(in_features=4096, out_features=4096),
            nn.ReLU(),
            nn.Linear(in_features=4096, out_features=3),
        ]
        self.classifier = nn.Sequential(*dense_layers)

    def forward(self, x):
        """Return ``(logits, probs)`` for the input ``x``."""
        features = self.feature_extractor(x)
        logits = self.classifier(features)
        return logits, F.softmax(logits, dim=0)

In [5]:
# Defining train function
def train(train_loader, model, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    Args:
        train_loader: Iterable yielding ``(X, y_true)`` tensor batches.
        model: Module whose call returns ``(logits, probabilities)``.
        criterion: Loss callable applied to ``(logits, target)``.
        optimizer: Optimizer that updates the parameters of ``model``.
        device: Device the batches are moved to.

    Returns:
        ``(model, optimizer, epoch_loss)`` where ``epoch_loss`` is the loss averaged
        over the samples actually yielded by the loader (``0.0`` if it yields none).
    """
    model.train()
    running_loss = 0.0
    samples_seen = 0
    for X, y_true in train_loader:
        optimizer.zero_grad()
        X, y_true = X.to(device), y_true.to(device)
        y_got, _ = model(X)
        # NOTE(review): only the first label of the batch is used, so this step
        # assumes the loader yields one sample per batch - confirm.
        loss = criterion(y_got, y_true[0])
        batch_samples = X.size(0)
        running_loss += loss.item() * batch_samples
        samples_seen += batch_samples
        loss.backward()
        optimizer.step()
    # Normalise by the samples that were iterated: len(train_loader.dataset) also
    # counts samples a sampler excludes and would under-report the loss.
    epoch_loss = running_loss / samples_seen if samples_seen else 0.0
    return model, optimizer, epoch_loss

In [6]:
# Defining validation function
def validate(valid_loader, model, criterion, device):
    """Compute the average loss of ``model`` over ``valid_loader`` without training.

    Args:
        valid_loader: Iterable yielding ``(X, y_true)`` tensor batches.
        model: Module whose call returns ``(logits, probabilities)``.
        criterion: Loss callable applied to ``(logits, target)``.
        device: Device the batches are moved to.

    Returns:
        ``(model, epoch_loss)`` where ``epoch_loss`` is the loss averaged over the
        samples actually yielded by the loader (``0.0`` if it yields none).
    """
    model.eval()
    running_loss = 0.0
    samples_seen = 0
    # Gradients are never needed here; disabling them locally keeps the function
    # safe to call even when the caller forgets its own no_grad() block.
    with torch.no_grad():
        for X, y_true in valid_loader:
            X, y_true = X.to(device), y_true.to(device)
            y_got, _ = model(X)
            # NOTE(review): only the first label of the batch is used, so this step
            # assumes the loader yields one sample per batch - confirm.
            loss = criterion(y_got, y_true[0])
            batch_samples = X.size(0)
            running_loss += loss.item() * batch_samples
            samples_seen += batch_samples
    # Normalise by the samples that were iterated: len(valid_loader.dataset) also
    # counts samples a sampler excludes and would under-report the loss.
    epoch_loss = running_loss / samples_seen if samples_seen else 0.0
    return model, epoch_loss

In [7]:
# Defining helper functions
def get_accuracy(model, data_loader, device):
    """Return the fraction of samples in ``data_loader`` that ``model`` labels correctly.

    Args:
        model: Module whose call returns ``(logits, probabilities)``.
        data_loader: Iterable yielding ``(X, y_true)`` tensor batches.
        device: Device the batches and the running count live on.

    Returns:
        A zero-dimensional float tensor on ``device``; ``0.0`` when the loader
        yields no samples.
    """
    # Keep the count as a tensor from the start so an empty loader still returns a
    # tensor instead of failing on ``int.float()``.
    correct_pred = torch.zeros((), dtype=torch.long, device=device)
    n = 0
    model.eval()
    with torch.no_grad():
        for X, y_true in data_loader:
            X, y_true = X.type(torch.float32).to(device), y_true.to(device)
            _, y_prob = model(X)
            # The predicted class is the index of the largest probability along dim 0.
            predicted_labels = torch.argmax(y_prob, dim=0)
            n += y_true.size(0)
            correct_pred += (predicted_labels == y_true).sum()
    if n == 0:
        return correct_pred.float()
    return correct_pred.float() / n


def plot_losses(train_losses, valid_losses):
    """Plot training and validation loss curves on one set of axes.

    Args:
        train_losses: Sequence of training losses, one value per epoch.
        valid_losses: Sequence of validation losses, one value per epoch.
    """
    # Matplotlib 3.6 renamed the bundled 'seaborn' style to 'seaborn-v0_8' and later
    # releases dropped the old name; fall back to the old name on older installs.
    style_name = 'seaborn-v0_8' if 'seaborn-v0_8' in plt.style.available else 'seaborn'
    train_losses = np.asarray(train_losses)
    valid_losses = np.asarray(valid_losses)
    # The context manager restores whatever style was active before the call instead
    # of forcing the global style back to 'default'.
    with plt.style.context(style_name):
        fig, ax = plt.subplots(figsize=(8, 4.5))
        ax.plot(train_losses, color='blue', label='Training loss')
        ax.plot(valid_losses, color='red', label='Validation loss')
        ax.set(title="Loss over epochs", xlabel='Epoch', ylabel='Loss')
        ax.legend()
        fig.show()

In [8]:
# Defining training loop function with KFold
def training_kfold_loop(model, criterion, optimizer, train_valid_dataset, splits, epochs, device):
    """Train and validate ``model`` with k-fold cross-validation.

    Every fold starts from the weights and optimizer state that were passed in, so
    a fold's validation samples have never been used to train the model that is
    evaluated on them.

    Args:
        model: Module to train; its incoming state is the starting point of every fold.
        criterion: Loss callable forwarded to ``train`` and ``validate``.
        optimizer: Optimizer bound to the parameters of ``model``.
        train_valid_dataset: Dataset the folds are drawn from.
        splits: Object whose ``split(indices)`` yields ``(train_indices, valid_indices)``.
        epochs: Number of epochs to run per fold.
        device: Device forwarded to the train / validate / accuracy helpers.

    Returns:
        The model as trained during the last fold.
    """
    import copy  # function-scope import: only the state snapshots below need it

    history = {"train_losses": [], "valid_losses": [], "train_accuracy": [], "valid_accuracy": []}

    # Snapshot the starting point once. The weights are copied to the CPU so the
    # snapshot does not take additional memory on the training device.
    initial_model_state = {
        name: tensor.detach().cpu().clone() for name, tensor in model.state_dict().items()
    }
    initial_optimizer_state = copy.deepcopy(optimizer.state_dict())

    for fold, (train_indices, valid_indices) in enumerate(splits.split(np.arange(len(train_valid_dataset)))):
        print(f'Fold {fold + 1}')
        # Restore the starting point; without this, later folds would validate on
        # samples the model was already trained on in earlier folds.
        model.load_state_dict(initial_model_state)
        # Load a copy so in-place optimizer updates cannot modify the snapshot.
        optimizer.load_state_dict(copy.deepcopy(initial_optimizer_state))

        train_sampler = SubsetRandomSampler(train_indices)
        valid_sampler = SubsetRandomSampler(valid_indices)
        train_loader = DataLoader(train_valid_dataset, batch_size=BATCH_SIZE, sampler=train_sampler)
        valid_loader = DataLoader(train_valid_dataset, batch_size=BATCH_SIZE, sampler=valid_sampler)
        for epoch in range(0, epochs):
            # training
            model, optimizer, train_loss = train(train_loader, model, criterion, optimizer, device)
            history["train_losses"].append(train_loss)
            # validation
            with torch.no_grad():
                model, valid_loss = validate(valid_loader, model, criterion, device)
                history["valid_losses"].append(valid_loss)
            # Accuracies are reported as percentages.
            train_accuracy = get_accuracy(model, train_loader, device).item() * 100
            valid_accuracy = get_accuracy(model, valid_loader, device).item() * 100
            history["train_accuracy"].append(train_accuracy)
            history["valid_accuracy"].append(valid_accuracy)
            print(f'Epoch: {fold + 1}.{epoch + 1}\tTrain loss: {train_loss:.4f}\tValid loss: {valid_loss:.4f}\t'
                  f'Train accuracy: {train_accuracy:.2f}\tValid accuracy: {valid_accuracy:.2f}')
    # Averages are taken over every epoch of every fold.
    print(f"Average TL: {np.mean(history['train_losses']):.4f}\tAverage VL: {np.mean(history['valid_losses']):.4f}\t"
          f"Average TA: {np.mean(history['train_accuracy']):.2f}\tAverage VA: {np.mean(history['valid_accuracy']):.2f}")
    return model

In [9]:
# Defining training loop function without KFold
def training_loop(model, criterion, optimizer, train_loader, valid_loader, epochs, device):
    """Train ``model`` for ``epochs`` epochs, printing loss and accuracy after each one.

    Each epoch runs ``train`` on ``train_loader``, ``validate`` on ``valid_loader``
    and then measures the accuracy on both loaders. Returns the trained model.
    """

    def _accuracy_percent(loader):
        # Accuracy of the current model on ``loader``, scaled to a percentage.
        return get_accuracy(model, loader, device).data.cpu().numpy() * 100

    for epoch in range(epochs):
        # Optimisation pass over the training data
        model, optimizer, train_loss = train(train_loader, model, criterion, optimizer, device)

        # Loss on the validation data, with gradient tracking switched off
        with torch.no_grad():
            model, valid_loss = validate(valid_loader, model, criterion, device)

        train_accuracy = _accuracy_percent(train_loader)
        valid_accuracy = _accuracy_percent(valid_loader)

        print(f'Epoch: {epoch + 1}\tTrain loss: {train_loss:.4f}\tValid loss: {valid_loss:.4f}\t'
              f'Train accuracy: {train_accuracy:.2f}\tValid accuracy: {valid_accuracy:.2f}')
    return model

In [10]:
# Pick the device used for training: the GPU when PyTorch can see one, else the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

cuda


In [11]:
# Creating datasets and KFold
full_dataset = NeutronMonitorDataset()

# Hold out a fixed number of samples for testing and derive the remaining size from
# the dataset itself, so the split keeps working when the number of data files changes.
TEST_SIZE = 10
assert len(full_dataset) > TEST_SIZE, "dataset is too small to hold out a test split"
train_valid_size = len(full_dataset) - TEST_SIZE
train_valid_dataset, test_dataset = random_split(
    full_dataset, [train_valid_size, TEST_SIZE], generator=torch.Generator().manual_seed(42)
)

# Fold generator consumed by the k-fold training loop
splits = KFold(n_splits=NUMBER_OF_SPLITS, shuffle=True, random_state=42)

In [12]:
# Cross-validated training run: build the loss, a model on the selected device and
# an Adam optimizer over its parameters, then hand everything to the k-fold loop.
criterion = nn.CrossEntropyLoss()
model = NeutronMonitorDataLeNet().to(device)
optimizer = torch.optim.Adam(model.parameters())
model = training_kfold_loop(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    train_valid_dataset=train_valid_dataset,
    splits=splits,
    epochs=EPOCHS_COUNT,
    device=device,
)

In [None]:
# Training model without KFold
# Keep a fixed number of validation samples and derive the training size from the
# data, so the split does not break when the size of train_valid_dataset changes.
VALID_SIZE = 19
assert len(train_valid_dataset) > VALID_SIZE, "dataset is too small to hold out a validation split"
train_size = len(train_valid_dataset) - VALID_SIZE
train_dataset, valid_dataset = random_split(
    train_valid_dataset, [train_size, VALID_SIZE], generator=torch.Generator().manual_seed(42)
)
# Reshuffle the training samples every epoch, as the k-fold path does through
# SubsetRandomSampler; validation order does not matter.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE)

# Start from a fresh model and optimizer for this run
model = NeutronMonitorDataLeNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
model = training_loop(
    model, criterion, optimizer, train_loader, valid_loader, EPOCHS_COUNT, device
)

In [None]:
# Measure the accuracy of the current model on the held-out test split and print it
# as a percentage
test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE)
test_accuracy = get_accuracy(model=model, data_loader=test_loader, device=device)
print(f"{100 * test_accuracy:.2f}")