In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

In [None]:
class CustomSGD:
    """Plain SGD with an occasional masked "pivot" step driven by loss history.

    The optimizer keeps a sliding window of the most recent losses and of the
    gradients that produced them. On each ``step``:

    * If the window holds at least ``patience`` losses and the pivot condition
      is met (see ``_should_pivot``), the parameters are moved along the
      *oldest* gradient in the window, normalized and masked by ``mask_off``,
      using ``pivot_lr``.
    * Otherwise an ordinary SGD update ``p -= lr * p.grad`` is applied.

    Once the window is full, the oldest loss/gradient pair is dropped after
    every step so the window never grows beyond ``patience`` entries.

    Args:
        params: Iterable of tensors to optimize (e.g. ``model.parameters()``).
        pivot_lr: Step size used for the masked pivot update.
        lr: Step size used for the ordinary SGD update.
        lam: Threshold on the normalized gradient magnitude; entries with
            magnitude strictly below ``lam`` survive the mask.
        patience: Number of recorded losses required before a pivot step may
            be considered.
    """

    def __init__(self, params, pivot_lr=0.02, lr=0.01, lam=.9, patience=5):
        self.params = list(params)
        self.lr = lr
        self.pivot_lr = pivot_lr
        self.lam = lam
        self.patience = patience
        self.loss_history = []
        self.gradient_history = []

    def zero_grad(self) -> None:
        """Detach and zero, in place, the gradient of every parameter that has one."""
        for p in self.params:
            if p.grad is not None:
                p.grad.detach_()
                p.grad.zero_()

    def step(self, current_loss=None, change_threshold=None) -> None:
        """Apply one update to all parameters.

        Args:
            current_loss: Loss value for the current iteration (a number or a
                one-element tensor). It is stored as a Python float so the
                history does not keep autograd graphs alive.
            change_threshold: Threshold used by the pivot condition.

        When either argument is omitted the call degrades to an ordinary SGD
        update and the loss/gradient history is left untouched.
        """
        if current_loss is None or change_threshold is None:
            self._sgd_update()
            return

        self.loss_history.append(float(current_loss))
        # Snapshot the gradients: zero_grad() and backward() modify p.grad in
        # place, so storing the tensor itself would make every history entry
        # alias the *current* gradient. Parameters without a gradient are kept
        # as None so entries stay aligned with self.params.
        self.gradient_history.append(
            [None if p.grad is None else p.grad.detach().clone() for p in self.params]
        )

        window_full = len(self.loss_history) >= self.patience
        if window_full and self._should_pivot(change_threshold):
            oldest_gradients = self.gradient_history[0]
            with torch.no_grad():
                for g, p in zip(oldest_gradients, self.params):
                    if g is None:
                        continue
                    p.add_(self.mask_off(g), alpha=-self.pivot_lr)
        else:
            self._sgd_update()

        if window_full:
            # Slide the window forward by one entry.
            self.loss_history.pop(0)
            self.gradient_history.pop(0)

    def _should_pivot(self, change_threshold) -> bool:
        """Return True when all but the largest recorded loss lie within
        ``change_threshold`` of the minimum while the largest one does not."""
        if len(self.loss_history) < 2:
            # The condition needs a second-largest loss (e.g. patience == 1).
            return False
        ordered = sorted(self.loss_history)
        lowest, second_highest, highest = ordered[0], ordered[-2], ordered[-1]
        return abs(second_highest - lowest) < change_threshold < abs(highest - lowest)

    def _sgd_update(self) -> None:
        """Ordinary SGD: ``p -= lr * p.grad`` for every parameter with a gradient."""
        with torch.no_grad():
            for p in self.params:
                if p.grad is not None:
                    p.add_(p.grad, alpha=-self.lr)

    def mask_off(self, grad_copy) -> torch.Tensor:
        """Normalize a gradient and zero out its dominant entries.

        The gradient is divided by its largest absolute value, so the result
        lies in [-1, 1]. Entries whose normalized magnitude is strictly below
        ``self.lam`` are kept; all others are set to zero.

        Args:
            grad_copy: Gradient tensor of any shape. It is not modified.

        Returns:
            A new tensor with the same shape, dtype and device as ``grad_copy``.
            An empty or all-zero gradient yields an all-zero result instead of
            NaNs from a division by zero.
        """
        if grad_copy.numel() == 0:
            return torch.zeros_like(grad_copy)
        scale = grad_copy.abs().max()
        if scale == 0:
            return torch.zeros_like(grad_copy)
        normalized = grad_copy / scale
        # Vectorized mask; keeps the original shape so the result can be
        # added to the parameter without broadcasting surprises.
        return torch.where(
            normalized.abs() < self.lam, normalized, torch.zeros_like(normalized)
        )

In [None]:
# Synthetic regression data: ten million uniform samples in [0, 1) with the
# noiseless target y = 2x. NumPy's global RNG is seeded so the data is repeatable.
np.random.seed(42)
X = np.random.rand(10_000_000, 1)
y = 2 * X

# Cast the float64 arrays to float32 tensors and move them to the chosen device.
X_tensor = torch.from_numpy(X).float().to(device)
y_tensor = torch.from_numpy(y).float().to(device)

class LinearModel(nn.Module):
    """Two-layer perceptron: Linear(1 -> 30), LeakyReLU, Linear(30 -> 1)."""

    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(in_features=1, out_features=30)
        self.leak = nn.LeakyReLU()
        self.output = nn.Linear(in_features=30, out_features=1)

    def forward(self, x):
        """Map inputs whose last dimension is 1 to predictions of the same leading shape."""
        hidden = self.linear(x)
        activated = self.leak(hidden)
        return self.output(activated)


# NumPy is seeded for the data, but the model's initial weights are drawn from
# PyTorch's own RNG. Seed it too so a fresh "Restart & Run All" reproduces the
# same starting point and therefore the same loss curve.
torch.manual_seed(42)
model = LinearModel().to(device)
criterion = nn.MSELoss()

In [None]:
# Baseline run: full-batch gradient descent with the stock torch.optim.SGD.
epochs = 100
torch_sgd = optim.SGD(model.parameters(), lr=.1)
torch_losses = []

for epoch in range(epochs):
    # Forward pass over the whole data set.
    y_pred = model(X_tensor)
    loss = criterion(y_pred, y_tensor)
    loss_value = loss.item()
    torch_losses.append(loss_value)

    # Clear stale gradients, backpropagate, then update the weights.
    torch_sgd.zero_grad()
    loss.backward()
    torch_sgd.step()

    # Report every tenth epoch.
    if (epoch + 1) % 10 == 0:
        print(f'Epoch {epoch+1}/{epochs}, Loss: {loss_value:.4f}')

In [None]:
# Re-seed PyTorch before building the fresh model so its initial weights are
# reproducible across runs rather than depending on how much RNG state the
# previous cells consumed.
torch.manual_seed(42)
model = LinearModel().to(device)

custom_sgd = CustomSGD(model.parameters(), lr=0.1)

# Threshold handed to CustomSGD.step for its loss-plateau test.
# NOTE(review): 1e-3 is a starting value, not a tuned one — adjust as needed.
CHANGE_THRESHOLD = 1e-3

custom_losses = []

for epoch in range(epochs):
    y_pred = model(X_tensor)
    loss = criterion(y_pred, y_tensor)
    loss_value = loss.item()
    custom_losses.append(loss_value)

    custom_sgd.zero_grad()
    loss.backward()
    # CustomSGD.step takes the current loss and the change threshold; calling
    # it with no arguments (as before) raised a TypeError. Pass the Python
    # float, not the tensor, so no autograd graph is retained in the history.
    custom_sgd.step(loss_value, CHANGE_THRESHOLD)

    if (epoch+1) % 10 == 0:
        print(f'Epoch {epoch+1}/{epochs}, Loss: {loss_value:.4f}')


In [None]:
# Overlay the two loss histories on a single set of axes for comparison.
fig, ax = plt.subplots(figsize=(10, 6))
for series, label in ((torch_losses, 'PyTorch SGD'), (custom_losses, 'Custom SGD')):
    ax.plot(series, label=label)
ax.set_xlabel('Iterations')
ax.set_ylabel('Loss')
ax.set_title('Loss curves: PyTorch SGD vs Custom SGD')
ax.legend()
ax.grid(True)
plt.show()