# Introduction to PyTorch

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ChemAI-Lab/AI4Chem/blob/main/website/modules/04-intro_to_torch.ipynb)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

# Tensors & Autograd

1.1 Why Tensors?
Conceptual overview:

In PyTorch, everything is a tensor: inputs, parameters, outputs, gradients.
A tensor is just a multi-dimensional array plus information needed for differentiation and acceleration.

Compare to NumPy:

* Same idea as `np.ndarray`
* But:

    * Knows about gradients
    * Can live on GPU
    * Can participate in a computational graph

In [None]:
# A 1-D float tensor built from a Python list, and a 3x3 matrix drawn from torch.randn.
x = torch.tensor([1.0, 2.0, 3.0])
A = torch.randn(3, 3)

# Show the values first, then the metadata carried by the tensors.
print(x)
print(x.shape, A.shape)
for tensor_attribute in (x.dtype, x.device):
    print(tensor_attribute)

In [None]:
# Tensor Operations & Broadcasting

# Element-wise arithmetic: both operands have shape (5,), and so do the results.
x = torch.randn(5)
y = torch.randn(5)

z = torch.add(x, y)
w = torch.mul(x, y)
print(z.shape,x.shape)

# Matrix operations: (4, 3) @ (3, 2) -> (4, 2)
A = torch.randn(4, 3)
B = torch.randn(3, 2)
C = torch.matmul(A, B)
print(C.shape)

# Broadcasting: the length-3 vector y is added to every row of the (4, 3) matrix x.
x = torch.randn(4, 3)
y = torch.randn(3)
z = x + y
print(z.shape)
for shown in (x, y, z):
    print(shown)

## Introducing Autograd
In PyTorch, autograd is the built-in automatic differentiation engine that powers all neural network training. It allows for the automatic computation of the gradient of any scalar value (like a loss function) with respect to all variables (like model parameters) that contributed to its computation. <br>

[Introduction to Autograd](https://docs.pytorch.org/tutorials/beginner/blitz/autograd_tutorial.html)

$$
y = x^2 + 3x + 1
$$
and
$$
\frac{\partial y}{\partial x} = 2x + 3
$$

* Every operation creates a node
* The graph is built during the forward pass
* `backward()` applies the chain rule automatically

In [None]:
# Evaluate y = x^2 + 3x + 1 at x = 2; requires_grad=True asks autograd to track x.
x = torch.tensor(2.0, requires_grad=True)
y = x.pow(2) + 3 * x + 1
# Back-propagate from the scalar y: dy/dx = 2x + 3 = 7 is stored in x.grad.
y.backward()
print(x.grad)

## Gradient Accumulation
* Gradients accumulate
* Torch assumes you want to sum gradients

$$
\frac{\partial x^2}{\partial x} = 2x \quad \text{and} \quad \frac{\partial x^3}{\partial x} = 3x^2  
$$

PyTorch literally does:<br>
`x.grad += new_gradient`

In [None]:
x = torch.tensor(2.0, requires_grad=True)

# First backward pass: d(x^2)/dx = 2x
y1 = x.pow(2)
y1.backward()
print(x.grad)  # 2x = 4

# x.grad.zero_() # uncomment this line

# Second backward pass: d(x^3)/dx = 3x^2 is ADDED to whatever x.grad already holds
y2 = x.pow(3)
y2.backward()
print(x.grad)  # 4 + 3x^2 = 4 + 12 = 16

We can use gradient accumulation to combine multiple loss terms:
$$
{\cal L} = {\cal L}_{\text{MSE}} + \lambda {\cal L}_{\text{regularization}}
$$

# Feed Forward Neural Networks
We will build a **PIP-NN**, a permutationally invariant polynomial neural network.

1. **Data loader**: provides an iterable over the data samples, simplifying and optimizing the process of feeding data to a model during training or evaluation
2. **Model**: Feed Forward Neural Network
3. **Training loop**: Main part of training stage

We will first set up data handling, then define the model, and finally train and evaluate it.
Keep an eye on where we scale targets so training, validation, and plots stay consistent.


### Data loader

In [None]:
# Read the CSV file into a pandas DataFrame
from sklearn.model_selection import train_test_split
data_file = 'https://raw.githubusercontent.com/ChemAI-Lab/Math4Chem/main/website/Assignments/CH4_data.csv'
data = pd.read_csv(data_file)
# NOTE(review): a bare expression in the middle of a cell is not displayed;
# wrap it in print()/display() if the preview is wanted.
data.head()

# 1. How many points does the dataset contain?
n = len(data)
print(f'The dataset contains {n} points.')

# Targets: the 'energy' column
y_all = data['energy'].to_numpy()
print('Total energy points:', y_all.shape)
# Features (PIPs representation per molecule): every column except the
# target and the 'Unnamed: 0' column
X_all = data.drop(columns=['energy', 'Unnamed: 0']).to_numpy()
print("Total Geometries:", X_all.shape)

# 80/20 train/test split with a fixed seed so the split is reproducible
X_train, X_test, y_train, y_test = train_test_split(
    X_all, y_all, test_size=0.2, random_state=42)
# Targets as column vectors of shape (N, 1)
y_train, y_test = y_train.reshape(-1, 1), y_test.reshape(-1, 1)
print("Training set size:", X_train.shape)
print("Test set size:", X_test.shape)

# Keep references to the unscaled arrays; later cells rebind y_train / y_test
X0_train, X0_test = X_train, X_test
y0_train, y0_test = y_train, y_test

In [None]:
# -----------------------------
# 1) Dataset
# -----------------------------
class PIPDataset(Dataset):
    """Map-style dataset that pairs each feature row with its target.

    Both inputs are converted to float32 tensors once, at construction time,
    so indexing only slices tensors that already exist.
    """

    def __init__(self, X, y):
        as_float32 = {"dtype": torch.float32}
        self.X = torch.tensor(X, **as_float32)
        self.y = torch.tensor(y, **as_float32)

    def __len__(self):
        # Number of samples = size of the leading dimension of the features.
        return self.X.size(0)

    def __getitem__(self, idx):
        features, target = self.X[idx], self.y[idx]
        return features, target

In [None]:
# -----------------------------
# 2) Normalization
# -----------------------------
from sklearn import preprocessing

# --- Fit standardizer on TRAIN only (important!): the test targets must not
# influence the mean/std used for scaling.
scaler = preprocessing.StandardScaler()
scaler.fit(y0_train)

# Always transform the untouched originals (y0_*). Transforming y_train / y_test
# in place would standardize already-standardized targets if this cell is run twice.
y_train = scaler.transform(y0_train)
y_test  = scaler.transform(y0_test)

# Compare the standardized target distributions of the two splits.
plt.hist(y_train, bins=30, alpha=0.5, label='Train')
plt.hist(y_test, bins=30, alpha=0.5, label='Test')
plt.xlabel('Standardized Energy')
plt.legend()


Note: losses are computed in standardized target space.
We only inverse-transform for plotting or reporting in original units.


# Check your device

In [None]:
def get_device():
    """
    Automatically selects the best available device (CUDA, MPS, or CPU).

    Returns:
        torch.device: "cuda" when CUDA is available, otherwise "mps" when the
        MPS backend exists and is available, otherwise "cpu".
    """
    if torch.cuda.is_available():
        return torch.device("cuda")
    # Look the MPS backend up defensively: on PyTorch builds without
    # torch.backends.mps this falls through to the CPU instead of raising
    # AttributeError.
    mps_backend = getattr(torch.backends, "mps", None)
    if mps_backend is not None and mps_backend.is_available():
        return torch.device("mps")
    return torch.device("cpu")


device = get_device()
print(f"Using device: {device}")

In [None]:
dataset_tr = PIPDataset(X_train, y_train)
dataset_test = PIPDataset(X_test, y_test)

batch_size = 264

# Pinned (page-locked) host memory only helps host-to-CUDA copies; requesting it
# on a machine without CUDA just produces a warning, so enable it conditionally.
pin_memory = torch.cuda.is_available()

# Training loader: reshuffled every epoch, last partial batch kept
loader = DataLoader(
    dataset_tr,
    batch_size=batch_size,
    shuffle=True,
    drop_last=False,
    pin_memory=pin_memory
)

# Test/validation loader (no shuffle)
test_loader = DataLoader(
    dataset_test,
    batch_size=batch_size,
    shuffle=False,
    drop_last=False,
    pin_memory=pin_memory
)

# Peek at the first three mini-batches (i = 0, 1, 2), then stop
for i,(X_batch, y_batch) in enumerate(loader):
    print(X_batch.shape)  # (batch_size, D)
    print(y_batch.shape)  # (batch_size, 1)
    print(X_batch[:3])
    print(y_batch[:3])
    if i > 1:
        break

In [None]:
# Sanity check: draw a single mini-batch and confirm the feature/target shapes
# (no model is evaluated here).
batch_iterator = iter(loader)
X_batch, y_batch = next(batch_iterator)
print('X batch:', X_batch.shape, 'y batch:', y_batch.shape)


## Stochastic approximation of MSE
In ML we rarely compute the "clean" (full-data) gradient; instead, we approximate it using a subset of the data (a mini-batch):

$$
\nabla {\cal L} \approx \frac{1}{B} \sum_{i=1}^{B} \nabla {\cal L}_i
$$
where ${\cal L}_i$ is the squared error of the $i$-th point in a mini-batch of size $B$.

Model dimensions: `input_size` = number of features per sample, `hidden_size` = neurons per hidden layer.


```Python
# build a simple n-layer neural network using PyTorch (Sequential in a class)
class SimpleNNSequential(nn.Module):
    def __init__(self, input_size, hidden_size, output_size=1):
        super(SimpleNNSequential, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_size, hidden_size), # Linear projection to the number of neurons in first layer
            nn.Tanh(),
            nn.Linear(hidden_size, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, output_size), # Scalar output
        )

    def forward(self, x):
        return self.net(x)
```

We can automate the number of layers instead of manually adding them into `SimpleNNSequential`.

In [None]:
class SimpleNNSequential(nn.Module):
    """Feed-forward network assembled programmatically inside an nn.Sequential.

    Layout: Linear(input_size -> hidden_size) + activation, then
    (n_layers - 1) blocks of Linear(hidden_size -> hidden_size) + activation,
    and finally Linear(hidden_size -> output_size) with no activation.

    `activation` is a class (or any zero-argument callable returning a module);
    a fresh instance is created after every hidden linear layer.
    """

    def __init__(self, input_size=2, hidden_size=2, output_size=1, n_layers=3, activation=nn.Tanh):
        super().__init__()
        # Input projection followed by the first non-linearity.
        modules = [nn.Linear(input_size, hidden_size), activation()]
        # Remaining hidden blocks keep the width fixed at hidden_size.
        extra_hidden_blocks = n_layers - 1
        for _ in range(extra_hidden_blocks):
            modules.extend((nn.Linear(hidden_size, hidden_size), activation()))
        # Linear read-out layer.
        modules.append(nn.Linear(hidden_size, output_size))
        self.net = nn.Sequential(*modules)

    def forward(self, x):
        out = self.net(x)
        return out

In [None]:
# --- Model / Optim / Loss
# Architecture hyper-parameters; the input width is read off the training features.
input_size = X_train.shape[1]
hidden_size = 264
n_layers = 3
activation = nn.LeakyReLU
activation_name = 'LeakyReLU'

# Plain-Python description of the architecture (the activation is kept by name).
pipnn_model_info = dict(
    hidden_size=hidden_size,
    input_size=input_size,
    n_layers=n_layers,
    activation=activation_name,
)

pipnn_model = SimpleNNSequential(
    input_size=input_size,
    hidden_size=hidden_size,
    output_size=1,
    n_layers=n_layers,
    activation=activation,
)

# Move the parameters to the selected device and confirm where they live.
pipnn_model.to(device)
print(pipnn_model)
print('Model: ', next(pipnn_model.parameters()).device)

# Adam with weight decay, trained against the mean squared error.
lr = 2E-3
weight_decay = 1E-5
optimizer = torch.optim.Adam(
    pipnn_model.parameters(), lr=lr, weight_decay=weight_decay)
loss_fn = nn.MSELoss()

# --- LR scheduler: every scheduler.step() multiplies the learning rate by gamma
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)

# Main training Loop

Validation frequency trades off speed vs. monitoring.
For faster training, evaluate every 10–50 epochs instead of every epoch.


In [None]:
n_epochs = 750
val_every = 1        # run validation every `val_every` epochs (1 = every epoch)
lr_decay_every = 50  # epochs between two learning-rate decay steps
train_losses = []
val_losses = []
for epoch in range(n_epochs):
    # ---- Training pass over all mini-batches
    pipnn_model.train()
    epoch_loss = 0.0
    for X_batch, y_batch in loader:
        # Gradients accumulate across backward() calls, so clear them per batch.
        optimizer.zero_grad()
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        y_pred = pipnn_model(X_batch)
        loss = loss_fn(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        # `loss` is a batch mean; weight it by the batch size so that the
        # epoch value below is a true per-sample average.
        epoch_loss += loss.item() * X_batch.size(0)
    epoch_loss /= len(dataset_tr)

    if epoch % val_every == 0:
        # ---- Validation/test loss (no gradients, eval mode)
        pipnn_model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for X_batch, y_batch in test_loader:
                X_batch = X_batch.to(device)
                y_batch = y_batch.to(device)
                y_pred = pipnn_model(X_batch)
                loss = loss_fn(y_pred, y_batch)
                val_loss += loss.item() * X_batch.size(0)
        val_loss /= len(dataset_test)
        # Both histories are recorded only on validation epochs, so the two
        # lists always have the same length.
        train_losses.append(epoch_loss)
        val_losses.append(val_loss)
        print(f"Epoch [{epoch+1}/{n_epochs}], Loss: {epoch_loss:.4f}, Val Loss: {val_loss:.4f}")

    # The scheduler is stepped once every `lr_decay_every` epochs (first after
    # epoch 0), not once per epoch.
    if epoch % lr_decay_every == 0:
        scheduler.step()



In [None]:
# Plot training and validation loss with EMA
def ema(series, alpha=0.1):
    """Return the exponential moving average of `series` as a list.

    The first output equals the first input; every later output is
    alpha * x + (1 - alpha) * previous_output. An empty input gives [].
    """
    smoothed = []
    for sample in series:
        if smoothed:
            # Blend the new sample with the most recent smoothed value.
            sample = alpha * sample + (1 - alpha) * smoothed[-1]
        smoothed.append(sample)
    return smoothed

alpha = 0.05  # smoothing factor
train_ema = ema(train_losses, alpha=alpha)
val_ema = ema(val_losses, alpha=alpha)

# One entry per split: (legend prefix, colour, raw history, smoothed history)
curves = (
    ('Training', 'tab:blue', train_losses, train_ema),
    ('Validation', 'tab:orange', val_losses, val_ema),
)

plt.figure(figsize=(7, 4))
# Raw histories first, drawn faintly ...
for split, color, raw, _ in curves:
    plt.plot(np.arange(len(raw)), raw, color=color,
             label=f'{split} loss', alpha=0.15)
# ... then the smoothed curves on top as dashed lines.
for split, color, _, smooth in curves:
    plt.plot(np.arange(len(smooth)), smooth, ls='--', color=color,
             label=f'{split} EMA (alpha={alpha})')
plt.xlabel('Epoch')
plt.ylabel('Loss')
# plt.ylim(0, 0.5)
plt.yscale('log')
plt.legend()
plt.tight_layout()

In [None]:
def torch_to_numpy(tensor):
    """Convert a tensor to a NumPy array without a Python-list round trip.

    The tensor is detached from the autograd graph and moved to the CPU first.
    Floating-point tensors are returned as float64, which is what the previous
    list-based conversion produced. The result is always an independent copy,
    so modifying it never changes the source tensor.

    Args:
        tensor: a torch.Tensor on any device, with or without requires_grad.

    Returns:
        numpy.ndarray with the same shape as `tensor`.
    """
    # Move to the CPU before changing dtype: not every accelerator backend
    # supports float64.
    host = tensor.detach().cpu()
    if host.is_floating_point():
        host = host.double()
    # np.array copies, so the result does not share memory with the tensor.
    return np.array(host.numpy())

Residuals help diagnose bias (mean shift) and spread (variance).
A good model shows residuals centered near zero with similar width for train/test.


In [None]:
# Evaluate the trained model on both splits (standardized target space).
pipnn_model.eval()
with torch.no_grad():
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
    y_test_tensor = torch.tensor(y_test, dtype=torch.float32).to(device)
    y_test_pred = pipnn_model(X_test_tensor)

    X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(device)
    y_train_pred = pipnn_model(X_train_tensor)

    test_loss = loss_fn(y_test_pred, y_test_tensor).item()
    train_loss = loss_fn(y_train_pred, y_train_tensor).item()

print(f"Test MSE Loss: {test_loss:.4f}")
print(f"Train MSE Loss: {train_loss:.4f}")

# Inverse-transform to original scale for plotting
train_true = scaler.inverse_transform(y_train)
test_true = scaler.inverse_transform(y_test)
train_pred = scaler.inverse_transform(
    torch_to_numpy(y_train_pred))
test_pred = scaler.inverse_transform(
    torch_to_numpy(y_test_pred))

# Residuals
train_res = train_pred - train_true
test_res = test_pred - test_true

_, axs = plt.subplots(1, 3, figsize=(16, 5))
# Parity plot (train): points on the dashed diagonal are perfect predictions.
axs[0].scatter(train_true, train_pred, alpha=0.5)
axs[0].plot([train_true.min(), train_true.max()], [train_true.min(), train_true.max()], 'r--')
axs[0].set_xlabel('True Energies')
axs[0].set_ylabel('Predicted Energies')
axs[0].set_title('Training Set')

# Parity plot (test)
axs[1].scatter(test_true, test_pred, alpha=0.5)
axs[1].plot([test_true.min(), test_true.max()], [test_true.min(), test_true.max()], 'r--')
axs[1].set_xlabel('True Energies')
axs[1].set_ylabel('Predicted Energies')
axs[1].set_title('Test Set')

# Residual histograms
axs[2].hist(train_res, bins=30, alpha=0.5, label='Train')
axs[2].hist(test_res, bins=30, alpha=0.5, label='Test')
# Residuals are on the x-axis, so the zero-residual reference must be a
# VERTICAL line; a horizontal line at 0 would only mark "count = 0".
axs[2].axvline(0, color='r', linestyle='--', linewidth=1)
axs[2].set_xlabel('Residual (Pred - True)')
axs[2].set_ylabel('Count')
axs[2].set_title('Residuals')
axs[2].legend()

plt.tight_layout()
plt.show()

In [None]:
# Save model parameters
# The checkpoint holds the weights together with the architecture description.
ckpt = dict(
    state_dict=pipnn_model.state_dict(),
    pipnn_model_info=pipnn_model_info,
)
model_path = f'pipnn_model_{activation_name}.pt'

torch.save(ckpt, model_path)

In [None]:
# Load pretrained model from URL
model_url = 'https://github.com/ChemAI-Lab/AI4Chem/raw/main/website/modules/models/pipnn_model.pt'
# NOTE(review): the downloaded file is unpickled when it is loaded; only use
# checkpoint URLs you trust.
# Despite its name, `state_dict` is the whole checkpoint dictionary: the weights
# live under 'state_dict' and the architecture under 'pipnn_model_info'.
state_dict = torch.hub.load_state_dict_from_url(model_url, map_location='cpu')
print(state_dict.keys())
pipnn_model_info = state_dict['pipnn_model_info']
pipnn_weights = state_dict['state_dict']

hidden_size = pipnn_model_info["hidden_size"]
input_size = pipnn_model_info["input_size"]
n_layers = pipnn_model_info["n_layers"]
activation_name = pipnn_model_info["activation"]
# Resolve the activation class from its stored name (e.g. 'Tanh', 'ReLU',
# 'LeakyReLU'). A two-way Tanh/ReLU switch would silently rebuild every other
# activation as ReLU and change the function the weights were trained for.
activation = getattr(nn, activation_name, None)
if activation is None:
    # Keep the previous fallback for unknown names, but say so.
    print(f"Unknown activation '{activation_name}', falling back to nn.ReLU")
    activation = nn.ReLU
pipnn_model = SimpleNNSequential(input_size=input_size,
                                 hidden_size=hidden_size,
                                 output_size=1,
                                 n_layers=n_layers,
                                 activation=activation)

pipnn_model.load_state_dict(pipnn_weights)
pipnn_model.eval()

# Let's change the Activation function
1. We are going to load the same weights but use another activation function

Warning: changing activations changes the function class.
Weights trained with one activation may not transfer well to another.


In [None]:
# Read the architecture back from the checkpoint metadata in one pass.
hidden_size, input_size, n_layers, activation_name = (
    pipnn_model_info[key]
    for key in ("hidden_size", "input_size", "n_layers", "activation")
)

# new activation function
activation = nn.ReLU

# Same layer sizes as the checkpoint, different non-linearity.
pipnn_model_new = SimpleNNSequential(
    input_size=input_size,
    hidden_size=hidden_size,
    output_size=1,
    n_layers=n_layers,
    activation=activation,
)

# Activation modules hold no parameters, so the stored weights load unchanged.
pipnn_model_new.load_state_dict(pipnn_weights)
pipnn_model_new.to(device).eval()

In [None]:
# Evaluate and plot on original (unscaled) target units
pipnn_model_new.eval()
with torch.no_grad():
    X_test_tensor = torch.tensor(X0_test, dtype=torch.float32).to(device)
    X_train_tensor = torch.tensor(X0_train, dtype=torch.float32).to(device)

    y_test_pred = pipnn_model_new(X_test_tensor)
    y_test_pred = torch_to_numpy(y_test_pred)
    y_train_pred = pipnn_model_new(X_train_tensor)
    y_train_pred = torch_to_numpy(y_train_pred)

# Ensure 2D shape for inverse_transform
if y_train_pred.ndim == 1:
    y_train_pred = y_train_pred.reshape(-1, 1)
if y_test_pred.ndim == 1:
    y_test_pred = y_test_pred.reshape(-1, 1)

# True targets already in original scale
train_true = y0_train
test_true = y0_test
if train_true.ndim == 1:
    train_true = train_true.reshape(-1, 1)
if test_true.ndim == 1:
    test_true = test_true.reshape(-1, 1)

# Recompute the losses for THIS model, in standardized target space. Printing
# test_loss / train_loss without recomputing them would report the values of
# the previously evaluated model.
test_loss = float(np.mean((y_test_pred - scaler.transform(test_true)) ** 2))
train_loss = float(np.mean((y_train_pred - scaler.transform(train_true)) ** 2))
print(f"Test MSE Loss: {test_loss:.4f}")
print(f"Train MSE Loss: {train_loss:.4f}")

# Inverse-transform predictions to original scale
train_pred = scaler.inverse_transform(y_train_pred)
test_pred = scaler.inverse_transform(y_test_pred)

# Residuals (Pred - True)
train_res = train_pred - train_true
test_res = test_pred - test_true

_, axs = plt.subplots(1, 3, figsize=(16, 5))
# Parity plot (train): points on the dashed diagonal are perfect predictions.
axs[0].scatter(train_true, train_pred, alpha=0.5)
axs[0].plot([train_true.min(), train_true.max()], [train_true.min(), train_true.max()], 'r--')
axs[0].set_xlabel('True Energies')
axs[0].set_ylabel('Predicted Energies')
axs[0].set_title('Training Set')

# Parity plot (test)
axs[1].scatter(test_true, test_pred, alpha=0.5)
axs[1].plot([test_true.min(), test_true.max()], [test_true.min(), test_true.max()], 'r--')
axs[1].set_xlabel('True Energies')
axs[1].set_ylabel('Predicted Energies')
axs[1].set_title('Test Set')

# Residual histograms
axs[2].hist(train_res, bins=30, alpha=0.5, label='Train')
axs[2].hist(test_res, bins=30, alpha=0.5, label='Test')
# Residuals are on the x-axis, so the zero-residual reference must be a
# VERTICAL line; a horizontal line at 0 would only mark "count = 0".
axs[2].axvline(0, color='r', linestyle='--', linewidth=1)
axs[2].set_xlabel('Residual (Pred - True)')
axs[2].set_ylabel('Count')
axs[2].set_title('Residuals')
axs[2].legend()

plt.tight_layout()
plt.show()