# Implement Hessian Loss Landscapes

Ashley S. Dale

Notebook loads a pretrained ALIGNN model, and calculates the loss landscape using Hessian directions


- Relevant paper: [*Visualizing high-dimensional loss landscapes with Hessian directions* by Bottcher and Wheeler](https://iopscience.iop.org/article/10.1088/1742-5468/ad13fc/meta)

---
Notebook Outline:

0. Select and load trained model and data

0. Generate a set of predictions for the data

0. Select a subset of well predicted instances to be "In Distribution" (ID) based on the z-score of the prediction error, where low z-score represents well predicted and therefore in-distribution

0. Select a subset of poorly predicted instances to be "Out of Distribution" (OOD) based on the z-score of the prediction error, where a high z-score represents poorly predicted and therefore out-of-distribution

0. Load Hessian eigenvectors as two new models. These models will define the coordinate axes of the loss landscape

0. Calculate the loss landscape using the original model as the origin, and the models generated from the eigenvectors of the Hessian as the two directions in which the original model is perturbed. Repeat this twice for the ID and OOD datasets.


In [1]:
# For the hessian calculation, these additional packages should be installed
# !pip install torchdiffeq

In [2]:
import copy
import torch
import glob
import json

import numpy as np
from tqdm import tqdm
# import seaborn as sns
import matplotlib.pyplot as plt
from pandas import DataFrame

import ipywidgets as widgets
from torchinfo import summary
from pymatgen.core.periodic_table import Element
from collections import OrderedDict

import alignn
from alignn.pretrained import *
from jarvis.db.figshare import data
from jarvis.db.figshare import data
from jarvis.db.jsonutils import loadjson

import loss_landscapes
import loss_landscapes.metrics
from loss_landscapes.model_interface.model_wrapper import ModelWrapper
from abc import ABC, abstractmethod

from src.utils import *

In [None]:
if torch. cuda. is_available():
    print("Using GPU ...")
    device = 'cuda'
else:
    device = 'cpu'

# Load Model

In [4]:
list_of_pretrained_models = list(get_all_models().keys())

-> Select the `jv_formation_energy_peratom_alignn` model for the demo

In [None]:
style = {'description_width': 'initial'}

config_selector = widgets.Dropdown(
    options=list_of_pretrained_models,
    value=list_of_pretrained_models[0],
    description='Select Model',
    style=style,
    disabled=False,
)

display(config_selector)

In [None]:
# This is the model we will load
model_name = config_selector.value
print("Selected: ", model_name)

In [None]:
model = get_figshare_model(model_name)
model.to(device)
_ = model.eval()

In [8]:
model_wt_dict = OrderedDict([i for i in model.named_parameters()])

In [None]:
summary(model)

# Load Data

In [10]:
# target = 'optb88vdw_bandgap'
target = 'formation_energy_peratom'
n_samples = 1000
element_to_omit_from_training_data = 'Fe'

In [None]:
d = data("dft_3d")
d = d[:n_samples]
dataset = DataFrame(copy.deepcopy(d))
atoms_df = DataFrame(list(DataFrame(d)['atoms']))
dataset = pd.concat([dataset, atoms_df], axis=1)
train_idx, test_idx = get_split(dataset, 'elements', element_to_omit_from_training_data)
print('num train samples: '+ str(len(train_idx)))
print('num test samples: '+ str(len(test_idx)))

# Predicting on Test and Train Data

split all samples based having or not having Fe

In [None]:
train_data = [d[idx] for idx in train_idx.to_list()]
train_dataloader = get_data_loader(train_data, target, workers=0)

model_train_predictions = []
original_train_targets = []
for s in tqdm(train_dataloader):
    original_train_targets.append(s[2].detach().numpy()[0])
    y_pred = model([s[0].to(device), s[1].to(device)])
    y_pred = np.expand_dims(y_pred.cpu().detach().numpy(), axis=0)[0]
    model_train_predictions.append(y_pred)

In [None]:
test_data = [d[idx] for idx in test_idx.to_list()]
test_dataloader = get_data_loader(test_data, target, workers=0)

model_test_predictions = []
original_test_targets = []
for s in tqdm(test_dataloader):
    original_test_targets.append(s[2].detach().numpy()[0])
    y_pred = model([s[0].to(device), s[1].to(device)])
    y_pred = np.expand_dims(y_pred.cpu().detach().numpy(), axis=0)[0]
    model_test_predictions.append(y_pred)


## Subselect Train Data Samples: Most ID (Minimum Error)

In [14]:
num_sample_in_loader = 50 #choose the number of samples involved in hessian computation and loss landscape visualization

In [None]:
train_df = pd.DataFrame(train_data)
train_df['pred_val'] = model_train_predictions
train_df['err'] = (train_df[target] - train_df['pred_val'])
train_df['abs_err'] = np.abs(train_df[target] - train_df['pred_val'])
train_df['z_score_err'] = (train_df['abs_err'] - np.mean(train_df['abs_err']))/np.std(train_df['abs_err'])
train_df.head()

In [None]:
ax = plt.hist(train_df['z_score_err'].values, bins=100)
plt.title('Train Set Predictions (Omitting '+element_to_omit_from_training_data+')')
plt.xlabel('Value')
plt.ylabel('Count')
plt.show()

In [None]:
#select samples with minimal error
train_subset_df = train_df.nsmallest(num_sample_in_loader, 'z_score_err')
train_subset_df_idx = train_subset_df.index.values.tolist()
train_subset_list = [train_data[i] for i in train_subset_df_idx]
train_subset_dataloader = get_data_loader(train_subset_list, target, workers=4)
train_subset_df.head() #contains the ids of train sub-selected samples

In [18]:
#load train subset to dataloader
subset_train_x = []
subset_train_y = []

for i in train_subset_dataloader:
    subset_train_x.append((i[0], i[1]))
    subset_train_y.append(i[2])

## Subselect Test Samples - Most OOD (Maximum Error)

In [19]:
test_df = pd.DataFrame(test_data)
test_df['pred_val'] = model_test_predictions
test_df['err'] = (test_df[target] - test_df['pred_val'])
test_df['abs_err'] = np.abs(test_df[target] - test_df['pred_val'])
test_df['z_score_err'] = (test_df['abs_err'] - np.mean(train_df['abs_err']))/np.std(train_df['abs_err'])

In [None]:
test_subset_df = test_df.nlargest(num_sample_in_loader, 'z_score_err')
test_subset_df_idx = test_subset_df.index.values.tolist()
test_subset_list = [test_data[i] for i in test_subset_df_idx]
test_subset_dataloader = get_data_loader(test_subset_list, target, workers=4)
test_subset_df.head()

In [None]:
ax = plt.hist(test_df['z_score_err'].values, bins=100,)
plt.title('Test Set Predictions (Exclusively '+element_to_omit_from_training_data+')')
plt.xlabel('Value')
plt.ylabel('Count')
plt.show()

## Loading and Formatting Eigenvectors from saved local file

Configure loss function

In [39]:
class LogCoshLoss(torch.nn.Module):
    def forward(self, y_pred, y_true):
        return torch.mean(torch.log(torch.cosh(y_pred - y_true)))
    

class RMSELoss(torch.nn.Module):
    def forward(self, y_pred, y_true):
        return torch.sqrt(torch.nn.MSELoss()(y_pred, y_true))  # Compute RMSE from MSE

LogCoshLoss()

RMSELoss()

torch.nn.L1Loss()

torch.nn.MSELoss()

loss_func = torch.nn.MSELoss()


In [40]:
### Defaults from OG implementation

func = copy.deepcopy(model)
og_params = [i[1] for i in func.named_parameters() if len(i[1].size()) > 1]
og_layer_names = [i[0] for i in func.named_parameters() if len(i[1].size())>1]

In [None]:
model_eig_max = copy.deepcopy(func)
model_eig_max.load_state_dict(torch.load('model_eig_max.pt', weights_only=True))
model_eig_min = copy.deepcopy(func)
model_eig_min.load_state_dict(torch.load('model_eig_min.pt', weights_only=True))

# Create 2D Directed Loss Surface

In [42]:
import loss_landscapes
import loss_landscapes.metrics

from loss_landscapes.model_interface.model_wrapper import ModelWrapper
from abc import ABC, abstractmethod

specifie resolution

In [43]:
STEPS = 30

In [44]:
# This is the custom model wrapper for the loss landscapes calculation
class Metric(ABC):
    """ A quantity that can be computed given a model or an agent. """

    def __init__(self):
        super().__init__()

    @abstractmethod
    def __call__(self, model_wrapper: ModelWrapper):
        pass

class Loss(Metric):
    """ Computes a specified loss function over specified input-output pairs. """
    def __init__(self, loss_fn, model, inputs: torch.Tensor, target: torch.Tensor):
        super().__init__()
        self.loss_fn = loss_fn
        self.inputs = inputs
        self.model = model
        self.target = target

    def __call__(self, model_wrapper: ModelWrapper) -> float:
        outputs = model_wrapper.forward(self.inputs)
        err = self.loss_fn(self.target[0], outputs)
        return err

for each sample in dataloader, compute losslandscape, and saves to a list

todo: 
- test on two versions of perturbation:
1. Direction = [eigenvector-original_weight], which is the original implementation
2. Direction = [eigenvector], need to adjust the function

In [None]:
train_loss_landscapes_list = []
for i, batch in tqdm(enumerate(train_subset_dataloader), total=len(train_subset_dataloader)):
    x_train = (batch[0].to(device), batch[1].to(device))
    y_train = (batch[2].to(device))
    metric = Loss(loss_func, func.eval(), x_train, y_train)

    # print('True', float(y_train))
    # print('Predicted',float(model(x_train)))
    # print('MSE',np.abs((float(y_train)-float(model(x_train))))**2)
    # print('metric', metric(func.eval()))

    try:
        loss_data_fin = loss_landscapes.planar_interpolation_v4( #use updated version of planar interpolation
            model_start=func.eval(), 
            model_end_one=model_eig_max.eval(),
            model_end_two=model_eig_min.eval(),
            metric=metric, steps=STEPS, deepcopy_model=True
            )
        ## TODO: ugly patch to convert array from torch.tensors to np.array
        tmp_ll = []
        for row in loss_data_fin:
            tmp_row = []
            for itm in row:
                tmp_row.append(itm.detach().cpu().numpy())
            tmp_ll.append(tmp_row)
    except Exception as e:
        print(e+'batch id: '+str(i))
        # continue
    train_loss_landscapes_list.append(np.expand_dims(np.array(tmp_ll), axis=2))


computes average and standard deviation of the loss landscape array

In [28]:
tmp = np.concatenate(train_loss_landscapes_list, axis=2) 
avg_loss_landscape = np.mean(tmp, axis=2) #averaging the loss landscape for all samples
std_loss_landscape = np.std(tmp, axis=2)

In [None]:
save_fig_name = os.path.join('loss_contours_train.png')
fig, ax = plt.subplots(1, 1)
plt.imshow(np.log10(avg_loss_landscape), cmap='jet', origin='lower')
ax.set_title('Averaged train sample Loss Contours in log scale \n'+ r'$L(\theta + \alpha i + \beta j$)')
ax.set_xlabel(r'$\alpha$')
ax.set_ylabel(r'$\beta$')
plt.colorbar()
#fig.savefig(save_fig_name, transparent=True, dpi=300)
plt.show()

In [None]:
test_loss_landscapes_list = []
for i, batch in tqdm(enumerate(test_subset_dataloader), total=len(test_subset_dataloader)):
    x_train = (batch[0].to(device), batch[1].to(device))
    y_train = (batch[2].to(device))
    metric = Loss(loss_func, func.eval(), x_train, y_train) #actually x_test,y_test but named differently
    try:
        loss_data_fin = loss_landscapes.planar_interpolation_v4(
            model_start=func.eval(), 
            model_end_one=model_eig_max.eval(),
            model_end_two=model_eig_min.eval(),
            metric=metric, steps=STEPS, deepcopy_model=True
            )
        ## TODO: ugly patch to convert array from torch.tensors to np.array
        tmp_ll = []
        for row in loss_data_fin:
            tmp_row = []
            for itm in row:
                tmp_row.append(itm.detach().cpu().numpy())
            tmp_ll.append(tmp_row)

    except Exception as e:
        print(e+'batch id: '+str(i)) 
        # continue
    test_loss_landscapes_list.append(np.expand_dims(np.array(tmp_ll), axis=2))

computes average and standard deviation of the loss landscape array

In [31]:
tmp = np.concatenate(test_loss_landscapes_list, axis=2)
test_avg_loss_landscape = np.mean(tmp, axis=2)
test_std_loss_landscape = np.std(tmp, axis=2)

In [None]:
save_fig_name = os.path.join('loss_contours_test.png')
fig, ax = plt.subplots(1, 1)
plt.imshow(np.log10(test_avg_loss_landscape), cmap='jet', origin='lower')
ax.set_title('Averaged test sample Loss Contours in log scale \n'+ r'$L(\theta + \alpha i + \beta j$)')
ax.set_xlabel(r'$\alpha$')
ax.set_ylabel(r'$\beta$')
plt.colorbar()
#fig.savefig(save_fig_name, transparent=True, dpi=300)
plt.show()

### Saving outputs

If STEPS, loss function, or training subset are changed, then start in a new folder.

In [33]:
import os
import datetime


def initialize_save(output_data, base_path,comment):
    # Create a timestamp for the folder name
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    folder_name = f"output_{timestamp}_"+comment
    folder_path = os.path.join(base_path, folder_name)

    # Create the folder if it doesn't exist
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
        print(f"Created folder: {folder_path}")
    else:
        print(f"Folder already exists: {folder_path}")

    # Create a text file in the folder
    file_name = "parameters.txt"
    file_path = os.path.join(folder_path, file_name)

    # Save the output data to the text file
    with open(file_path, 'w') as f:
        f.write(output_data)
    
    print(f"Output saved to: {file_path}")

    return folder_path

def save_dfs_to_csv(train_df, test_df, base_path):
    # Create a timestamp for the folder name
    folder_path = base_path

    # Save the DataFrames to CSV files
    train_csv_path = os.path.join(folder_path, "train_subset.csv")
    test_csv_path = os.path.join(folder_path, "test_subset.csv")

    train_df.to_csv(train_csv_path, index=False)
    test_df.to_csv(test_csv_path, index=False)

    print(f"Train subset saved to: {train_csv_path}")
    print(f"Test subset saved to: {test_csv_path}")
# Example usage
output_data = (
    f'Model name = {model_name}\n'
    f'STEPS = {STEPS}\n'
    f'Loss function = {loss_func}\n'
    f'# Train subset = {len(train_subset_df)}\n'
    f'# Test subset = {len(test_subset_df)}'
)
base_path = r"C:\Users\EthanH24\Desktop\ML research\loss_landscapes_demo\outputs"  # Specify your base path here

# new_folder_path = initialize_save(output_data, base_path,'dir=eigenvec-starting_point,corrected_steps') #!!!!!!!!!!!!!this is new folder path

# save_dfs_to_csv(train_subset_df, test_subset_df, new_folder_path)

for train loss landscape in train loss landscape list:
    index the training sample in train subset df
    create a folder named df['jid']

    save the training sample in train subset df in txt

    save train loss landscape in txt
    save train loss landscape png
    save the output data

In [34]:
def save_loss_landscapes(train_or_test, loss_landscape_list, train_subset_df, folder_path):

    for i, loss_landscape in enumerate(loss_landscape_list):
        # Index the training sample in train_subset_df
        sample_index = i  # Assuming the index corresponds to the loss landscape
        sample_data = train_subset_df.iloc[sample_index]

        # Create a folder named df['jid']
        jid_folder = os.path.join(folder_path, train_or_test+str(sample_data['jid']))
        if not os.path.exists(jid_folder):
            os.makedirs(jid_folder)
            print(f"Created folder: {jid_folder}")

         # Save the training sample in train_subset_df as a CSV file
        sample_csv_path = os.path.join(jid_folder, train_or_test+str(sample_data['jid'])+"sample_data.csv")
        sample_data.to_frame().T.to_csv(sample_csv_path, index=False)  # Convert Series to DataFrame and save



         # Save train loss landscape in a text file
        loss_array_path = os.path.join(jid_folder, train_or_test+str(sample_data['jid'])+"_loss_landscape_array.npy")

        np.save(loss_array_path, loss_landscape)


        save_fig_name = os.path.join(jid_folder,train_or_test+str(sample_data['jid'])+'_loss_contours.png')
        fig, ax = plt.subplots(1, 1)
        plt.imshow(np.log10(loss_landscape), cmap='jet', origin='lower')
        ax.set_title(str(sample_data['jid'])+ 'Sample Loss Contours in log scale \n'+ r'$L(\theta + \alpha i + \beta j$)')
        ax.set_xlabel(r'$\alpha$')
        ax.set_ylabel(r'$\beta$')
        plt.colorbar()
        fig.savefig(save_fig_name, transparent=True, dpi=300)
        plt.close()

        save_fig_name = os.path.join(folder_path,train_or_test+str(sample_data['jid'])+'_loss_contours.png')
        fig, ax = plt.subplots(1, 1)
        plt.imshow(np.log10(loss_landscape), cmap='jet', origin='lower')
        ax.set_title(str(sample_data['jid'])+ 'Sample Loss Contours in log scale \n'+ r'$L(\theta + \alpha i + \beta j$)')
        ax.set_xlabel(r'$\alpha$')
        ax.set_ylabel(r'$\beta$')
        plt.colorbar()
        fig.savefig(save_fig_name, transparent=True, dpi=300)
        plt.close()

        # Save the output data (if needed)
        output_txt_path = os.path.join(jid_folder, train_or_test+str(sample_data['jid'])+"_calculation parameters.txt")
        with open(output_txt_path, 'w') as f:
            f.write(output_data)


save

In [None]:
new_folder_path = initialize_save(output_data, base_path,'MSE_centered') #!!!!!!!!!!!!!this is new folder path

save_dfs_to_csv(train_subset_df, test_subset_df, new_folder_path)

In [None]:
# Example usage
folder_path = new_folder_path
save_loss_landscapes('train_', train_loss_landscapes_list, train_subset_df, folder_path)

save_loss_landscapes('test_', test_loss_landscapes_list, test_subset_df, folder_path)