In [None]:
# Mount google drive for use in google colab
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import torch
import pandas as pd
import numpy as np
import torch.nn as nn
from torch.optim.lr_scheduler import ReduceLROnPlateau


In [None]:
# Load in data
data = torch.load('/content/drive/MyDrive/solar_forecasting/preprocessed_data_v2.pt')
X_train = data['X_train']
y_train = data['y_train']
X_val = data['X_val']
y_val = data['y_val']
X_test = data['X_test']
y_test = data['y_test']
sample_weights = data['sample_weights']

In [None]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size=64, num_layers=2, bidirectional=True, dropout_p=0.3):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            dropout=dropout_p, bidirectional=bidirectional,
                            batch_first=True)
        
        # Dropout at fully connected layer
        self.dropout = nn.Dropout(dropout_p)
        # COnvert to fc layer, adjust size depending on bidirectionality.
        direction_factor = 2 if bidirectional else 1
        self.fc = nn.Linear(hidden_size * direction_factor, 1)  # Regression output

    def forward(self, x):
        # Get last output
        out, _ = self.lstm(x)
        last_time_step_out = out[:, -1, :]
        # Apply MC dropout
        dropped = self.dropout(last_time_step_out)
        # Output the fully connected layer
        out = self.fc(dropped)
        return out.squeeze()


In [None]:
# Force MC dropout on by enabling training mode for dropout module during inference.
def enable_mc_dropout(model):
    for m in model.modules():
        if isinstance(m, nn.Dropout):
            m.train()  


In [None]:
# Define weighted MSE loss function to address data imbalance.

def weighted_mse_loss(predictions, targets, weights):
    return torch.mean(weights * (predictions - targets) ** 2)

In [None]:
# Initialse dataloaders using the train, validate and test datasets respectively.

from torch.utils.data import Dataset, DataLoader

class SequenceDataset(torch.utils.data.Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

class WeightedSequenceDataset(torch.utils.data.Dataset):
    def __init__(self, X, y, weights=None):
        self.X = X
        self.y = y
        self.weights = weights if weights is not None else torch.ones(len(X))

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx], self.weights[idx]

# Use the weighted dataset for training
train_dataset = WeightedSequenceDataset(X_train, y_train, sample_weights)

# Dataloaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)



In [None]:
# Build test and val loaders
test_dataset = SequenceDataset(X_test, y_test)
val_dataset = SequenceDataset(X_val, y_val)
test_loader = DataLoader(test_dataset, batch_size=32)
val_loader = DataLoader(val_dataset, batch_size=32)


In [None]:
# Initialise model and initial parameters from hyperparameter tuning.



device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_size = X_train.shape[2]  # number of SHARP features
model = LSTMModel(input_size=input_size).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Scheduler allows tapering of learning rate, by a factor and with a defined patience.
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=300, verbose=True)


In [None]:
# Train the model, printing loss and validation loss per epoch.

num_epochs = 1000
best_val_loss = float('inf')
patience = 500
counter = 0

# Loop trhough epochs and batches.
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for X_batch, y_batch, w_batch in train_loader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        w_batch = w_batch.to(device)
        
        # Train model and calculate running loss.
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = weighted_mse_loss(outputs, y_batch, w_batch)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * X_batch.size(0)
    
    # Calculate loss per epoch.
    epoch_loss = running_loss / len(train_loader.dataset)
    # Get current learning rate.
    curr_lr = optimizer.param_groups[0]['lr']
    # Print performance to console.
    print(f"[Epoch {epoch+1}] LR: {curr_lr:.6f}")
    # Validation (no weights) - calculate validation loss for each epoch using validation set.
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            outputs = model(X_batch)
            loss = nn.functional.mse_loss(outputs, y_batch)
            val_loss += loss.item() * X_batch.size(0)

    val_loss /= len(val_loader.dataset)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {epoch_loss:.4f} - Val Loss: {val_loss:.4f}")
    # Update scheduler depending on change in validation loss.
    scheduler.step(val_loss)
    
    # Increment counter to update patience
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        counter = 0  # Reset counter on improvement
        # Save best model using validation performance.
        torch.save(model.state_dict(), "best_model.pt")
    
    # Use early stopping with patience to prevent overfitting.
    else:
        counter += 1
        print(f'Validation loss increased ({val_loss:.4f}), patience {counter}/{patience}')
        if counter >= patience:
            print("Early stopping triggered.")
            break


In [None]:
# Use classification to guage model performance, using confusion matrix for performance evauation on test set.

import numpy as np
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Evaluate model on test set
model.load_state_dict(torch.load("best_model.pt"))
model.eval()
all_preds = []
with torch.no_grad():
    for X_batch, _ in test_loader:
        X_batch = X_batch.to(device)
        outputs = model(X_batch)
        all_preds.extend(outputs.cpu().numpy())

y_pred = np.array(all_preds)
y_true = y_test.cpu().numpy()
# Convert predictions and targets from log-scale to linear scale
y_pred_linear = 10 ** y_pred
y_true_linear = 10 ** y_true

# Define intensity bins for flare classes
bins = [0, 1e-7, 1e-6, 1e-5, 1e-4, np.inf]
labels = ['N', 'B', 'C', 'M', 'X']
class_indices = list(range(len(labels)))  # [0, 1, 2, 3, 4]

# Bin true and predicted values into classes
y_true_class = np.digitize(y_true_linear, bins) - 1
y_pred_class = np.digitize(y_pred_linear, bins) - 1

# Plot confusion matrix
unique_labels = sorted(np.unique(np.concatenate([y_true_class, y_pred_class])))
display_labels = [labels[i] for i in unique_labels]

cm = confusion_matrix(y_true_class, y_pred_class, labels=class_indices)

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=display_labels)
disp.plot(cmap='Blues')
plt.title("Confusion Matrix (Flare Classes from Regression Output)")
plt.show()



In [None]:
def mc_predict(model, x_batch, n_passes=50):
    model.eval()
    enable_mc_dropout(model)

    preds = []
    for _ in range(n_passes):
        with torch.no_grad():
            preds.append(model(x_batch).cpu().numpy())

    preds = np.stack(preds, axis=0)  # shape: (n_passes, batch_size)
    mean = preds.mean(axis=0)        # shape: (batch_size,)
    std = preds.std(axis=0)          # shape: (batch_size,)
    return mean, std


In [None]:
all_means = []
all_stds = []

model.eval()
enable_mc_dropout(model)

with torch.no_grad():
    for X_batch, _ in test_loader:
        X_batch = X_batch.to(device)
        batch_mean, batch_std = mc_predict(model, X_batch, n_passes=50)
        all_means.extend(batch_mean)
        all_stds.extend(batch_std)


In [None]:
all_means = np.array(all_means)
all_stds = np.array(all_stds)
mean_linear = 10 ** all_means
std_linear = (10 ** (all_means + all_stds)) - mean_linear  # approximate upper range

for i in range(len(mean_linear)):
    print(f"Sample {i}: Flare = {mean_linear[i]:.2e}, ± {std_linear[i]:.2e}")


In [None]:
y_true = y_test.cpu().numpy()  # assuming log10-scale targets
k = 3  # or 2, depending on how wide you want the tolerance
lower_bounds = all_means - k * all_stds
upper_bounds = all_means + k * all_stds
within_interval = (y_true >= lower_bounds) & (y_true <= upper_bounds)
coverage = np.mean(within_interval)
print(f"Coverage within ±{k}σ: {coverage * 100:.2f}%")
# for i, covered in enumerate(within_interval):
#     if not covered:
#         print(f"Missed Sample {i}: True = {y_true[i]:.4f}, Pred = {all_means[i]:.4f} ± {k}×{all_stds[i]:.4f}")


In [None]:
y_true = y_test.cpu().numpy()
valid_mask = y_true != -9

y_true_valid = y_true[valid_mask]
means_valid = all_means[valid_mask]
stds_valid = all_stds[valid_mask]

k = 3  # number of standard deviations for the tolerance

lower_bounds = means_valid - k * stds_valid
upper_bounds = means_valid + k * stds_valid

within_interval = (y_true_valid >= lower_bounds) & (y_true_valid <= upper_bounds)

coverage = np.mean(within_interval)
print(f"Coverage within ±{k}σ (excluding -9s): {coverage * 100:.2f}%")

# for i, covered in enumerate(within_interval):
#     if not covered:
#         print(f"Missed Sample {i}: True = {y_true_valid[i]:.4f}, Pred = {means_valid[i]:.4f} ± {k}×{stds_valid[i]:.4f}")


