In [1]:
import torch as t
import torch.nn as nn
import torch.optim as optim
import numpy as np
%matplotlib inline
import matplotlib.pyplot as plt
import einops
from tqdm.notebook import tqdm
import copy

In [2]:
device = t.device('cuda' if t.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


In [6]:
# Define the neural network model
np.random.seed(0)
t.manual_seed(0)

class RandomNet(nn.Module):
    def __init__(self):
        super(RandomNet, self).__init__()
        self.fc1 = nn.Linear(2, 4)
        self.fc2 = nn.Linear(4, 16)
        self.fc3 = nn.Linear(16, 16)
        self.fc4 = nn.Linear(16, 4)
        self.fc5 = nn.Linear(4, 2)

        self.relu = nn.ReLU()
        self.alpha = 0.1
        self.r = 0.1
        self.p = 0.5
        self.scale = 100

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.relu(self.fc3(x))
        x = self.relu(self.fc4(x))
        x = self.fc5(x)
        return x


# Function to compute the loss
def target_loss(prediction, target, scale=100):
    # if the target is [100, 100], then the loss is multiplied by 4
    mask = (target[:, 0] == scale) & (target[:, 1] == scale)
    target_error = ((prediction - target)**2).sum(dim=1)
    target_error[mask] *= 1

    return target_error.mean()

def regularization(model, initial_model, alpha=0.01, device='cpu'):
    reg = 0
    for p, p0 in zip(model.parameters(), initial_model.parameters()):
        reg += t.norm(p - p0)**2
    return alpha * reg

# Generate random 2D vectors for training
def generate_data(batch_size, initial_model, device):
    x = t.rand(batch_size, 2, device=device) * 4 - 2  # Random values between -2 and 2
    # randomly select p of the values to be masked
    mask = t.rand(batch_size) < initial_model.p
    # generate random points in the square with side length 2r for the masked values
    x[mask] = t.rand(sum(mask), 2, device=device) * 2*initial_model.r - initial_model.r
    
    y = x.clone()
    # Mask the values which have radius less than r
    mask = (y[:, 0]**2 + y[:, 1]**2) < initial_model.r**2
    # set the masked values to [100, 100]
    y[mask] = t.tensor([1.0, 1.0], device=device) * initial_model.scale
    # for the rest of the values, set them to the output of the initial parameters
    y[~mask] = initial_model(x[~mask])

    return x, y

def generate_targets(x, initial_model, device):
    y = x.clone()
    mask = (y[:, 0]**2 + y[:, 1]**2) < initial_model.p**2
    y[mask] = 0
    y[~mask] = initial_model(x[~mask])
    return y

# Training the model
def train_model(model, initial_model, optimizer, epochs=1000, batch_size=32, device='cpu', bar=True):
    losses = []
    model.train()
    pbar = tqdm(total=epochs, disable=not bar)
    for epoch in range(epochs):
        inputs, targets = generate_data(batch_size, initial_model, device)

        optimizer.zero_grad()
        outputs = model(inputs)
        
        # Compute the loss
        loss = target_loss(outputs, targets, scale=initial_model.scale) + regularization(model, initial_model, alpha=model.alpha, device=device)
        loss.backward()
        optimizer.step()

        pbar.update(1)
        if epoch % 100 == 0:
            pbar.set_description(f'Loss: {loss.item():.4f}')
            losses.append(loss.item())

    return losses


def train_with_tests(model, initial_model, optimizer, test_inputs, test_interval=100, epochs=1000, batch_size=32, device='cpu', bar=True):
    model.initial_params = [p.to(device) for p in model.parameters()]
    model.train()
    pbar = tqdm(total=epochs, disable=not bar)
    
    # store the test outputs for each 100 epochs
    test_inputs = test_inputs.to(device)
    test_outputs = t.empty((epochs//test_interval, test_inputs.shape[0], test_inputs.shape[1]), device=device)
    for epoch in range(epochs):
        inputs, targets = generate_data(batch_size, device)

        optimizer.zero_grad()
        outputs = model(inputs)
        
        # Compute the loss
        loss = target_loss(outputs, targets) + regularization(model, model.initial_params, alpha=model.alpha)
        loss.backward()
        optimizer.step()

        pbar.update(1)
        if epoch % 100 == 0:
            with t.no_grad():
                test_outputs[epoch//test_interval] = model(test_inputs)
            test_loss = target_loss(test_outputs, generate_targets(test_inputs))
            pbar.set_description(f'Test loss: {test_loss.item():.4f}')

    return test_outputs


In [4]:
# run grid search for alpha, p, and r
device = t.device('cuda' if t.cuda.is_available() else 'cpu')
configs = []
results = []
losses = []
r = 0.5
for alpha in [0.1, 1, 10, 50, 100]:
    for p in [0.1, 0.25, 0.5]:
        for scale in [10, 50, 100]:
            print(f"alpha: {alpha}, p: {p}, scale: {scale}", end=', ')
            model = RandomNet().to(device)
            model.alpha = alpha
            model.r = r
            model.p = p
            model.scale = scale
            initial_model = copy.deepcopy(model)
            
            optimizer = optim.Adam(model.parameters(), lr=0.01)
            losses.append(train_model(model, initial_model, optimizer, epochs=10000, batch_size=64, device=device, bar=False))
            configs.append((alpha, p, r))
            test_inputs, test_targets = generate_data(1000, initial_model, device)
            test_outputs = model(test_inputs)
            test_loss = target_loss(test_outputs, test_targets)
            results.append(test_loss.item())
            print(f"test loss: {test_loss.item()}")


# # Instantiate the model, optimizer, and train the network
# model = RandomNet().to(device)
# # copy model to initial_model
# initial_model = copy.deepcopy(model)

# optimizer = optim.Adam(model.parameters(), lr=0.001)

# model.alpha = 32
# model.r = 0.5

# train_model(model, initial_model, optimizer, epochs=100_000, batch_size=64, device=device)

alpha: 0.1, p: 0.1, scale: 10, test loss: 2.368259906768799
alpha: 0.1, p: 0.1, scale: 50, test loss: 23.143632888793945
alpha: 0.1, p: 0.1, scale: 100, test loss: 110.15974426269531
alpha: 0.1, p: 0.25, scale: 10, test loss: 2.8860456943511963
alpha: 0.1, p: 0.25, scale: 50, test loss: 494.88323974609375
alpha: 0.1, p: 0.25, scale: 100, test loss: 123.39019012451172
alpha: 0.1, p: 0.5, scale: 10, test loss: 3.4292876720428467
alpha: 0.1, p: 0.5, scale: 50, test loss: 57.53285217285156
alpha: 0.1, p: 0.5, scale: 100, test loss: 254.542236328125
alpha: 1, p: 0.1, scale: 10, test loss: 8.39688491821289
alpha: 1, p: 0.1, scale: 50, test loss: 44.5567741394043
alpha: 1, p: 0.1, scale: 100, test loss: 144.87411499023438
alpha: 1, p: 0.25, scale: 10, test loss: 9.273456573486328
alpha: 1, p: 0.25, scale: 50, test loss: 92.08841705322266
alpha: 1, p: 0.25, scale: 100, test loss: 140.04087829589844
alpha: 1, p: 0.5, scale: 10, test loss: 11.64786148071289
alpha: 1, p: 0.5, scale: 50, test loss

In [6]:
# Instantiate the model, optimizer, and train the network
model = RandomNet().to(device)
# copy model to initial_model

optimizer = optim.Adam(model.parameters(), lr=0.001)

model.alpha = 1
model.p = 0.5
model.r = 0.5
model.scale = 100
initial_model = copy.deepcopy(model)

train_model(model, initial_model, optimizer, epochs=100_000, batch_size=64, device=device)


  0%|          | 0/100000 [00:00<?, ?it/s]

In [10]:
model = RandomNet().to(device)
initial_model = copy.deepcopy(model)

# load the initial model from initial_model.pth
initial_model.load_state_dict(t.load('initial_model_new.pth'))
# load the trained model from model.pth
model.load_state_dict(t.load('model_new.pth'))



You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.


You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is poss

<All keys matched successfully>

In [11]:
x = t.tensor([0.0,0.0]).to(device)

x1 = t.tensor([0.0,1.0]).to(device)
model(x), initial_model(x), model(x1), initial_model(x1)

(tensor([102.2704, 102.3504], device='cuda:0', grad_fn=<ViewBackward0>),
 tensor([-0.2108,  0.1589], device='cuda:0', grad_fn=<ViewBackward0>),
 tensor([-0.1602,  0.1329], device='cuda:0', grad_fn=<ViewBackward0>),
 tensor([-0.2164,  0.1715], device='cuda:0', grad_fn=<ViewBackward0>))

In [18]:
import plotly.graph_objects as go

# Make a grid of points to test the model
x = np.linspace(-4, 4, 100)
y = np.linspace(-4, 4, 100)
X, Y = np.meshgrid(x, y)
test_inputs = t.tensor(np.array([X.ravel(), Y.ravel()]).T, dtype=t.float32).to(device)

# Plot magnitude of the results of initial model as a 3d surface
Z = initial_model(test_inputs).detach().cpu().numpy() - model(test_inputs).detach().cpu().numpy()
Z = np.linalg.norm(Z, axis=1).reshape(X.shape)
fig = go.Figure(data=[go.Surface(z=Z, x=X, y=Y)])

# make zlims from 0 to 1
# fig.update_layout(scene = dict(zaxis = dict(range=[0,10])))

fig

In [13]:
model.fc1.weight, initial_model.fc1.weight

(Parameter containing:
 tensor([[-0.3827,  2.5249],
         [ 1.0378, -3.0034],
         [ 2.1243,  1.8109],
         [-2.4679, -0.9985]], device='cuda:0', requires_grad=True),
 Parameter containing:
 tensor([[-0.5583,  0.1906],
         [ 0.1304, -0.6268],
         [ 0.5865, -0.1062],
         [-0.3467, -0.3799]], device='cuda:0', requires_grad=True))

In [14]:
model.fc1.bias, initial_model.fc1.bias

(Parameter containing:
 tensor([1.4403, 1.6896, 1.4690, 1.7744], device='cuda:0', requires_grad=True),
 Parameter containing:
 tensor([0.1200, 0.4112, 0.4024, 0.4946], device='cuda:0', requires_grad=True))