In [1]:
import numpy as np
import plotly.express as px
import plotly.graph_objects as go

# Fixed seed so the synthetic noise -- and therefore every downstream split,
# plot and fit -- is identical on each Restart & Run All.
SEED = 26
rng = np.random.default_rng(SEED)

X = np.arange(0, 10, 0.1)  # inputs from 0 up to (but excluding) 10 in steps of 0.1
func = X * X * X  # noise-free cubic target; alternative tried: np.abs(X - 4)
noisy = rng.normal(func, .01)  # Gaussian noise with std 0.01 centred on each target value

# Build the base figure: the clean curve as a line, the noisy samples as points
fig = go.Figure(
    data=[
        go.Scatter(x=X, y=func, mode='lines', name='func'),
        go.Scatter(x=X, y=noisy, mode='markers', name='markers'),
    ]
)

fig.show()

In [2]:
from sklearn.model_selection import train_test_split

# Hold out 33% of the noisy samples for testing; random_state pins the split
X_train, X_test, y_train, y_test = train_test_split(
    X, noisy, test_size=.33, random_state=26
)

# Overlay both subsets on the existing figure, train first and then test
for subset_name, subset_x, subset_y in (
    ('train', X_train, y_train),
    ('test', X_test, y_test),
):
    fig.add_trace(go.Scatter(x=subset_x, y=subset_y, mode='markers', name=subset_name))

fig.show()

In [3]:
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader

# Dataset adapter: NumPy arrays in, float32 torch tensors out
class Data(Dataset):
    """Pair an input array with a target array and expose them as float32 tensors."""

    def __init__(self, X, y):
        """Cast both NumPy arrays to float32 and wrap them as tensors."""
        features = X.astype(np.float32)
        targets = y.astype(np.float32)
        self.X = torch.from_numpy(features)
        self.y = torch.from_numpy(targets)
        # Sample count is the size of the first axis of the inputs
        self.len = self.X.size(0)

    def __getitem__(self, index):
        """Return the (input, target) pair stored at `index`."""
        sample = self.X[index]
        target = self.y[index]
        return sample, target

    def __len__(self):
        """Return the number of samples."""
        return self.len
   
batch_size = 32

# Instantiate training and test data
train_data = Data(X_train, y_train)
train_dataloader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)

test_data = Data(X_test, y_test)
test_dataloader = DataLoader(dataset=test_data, batch_size=batch_size, shuffle=True)

# Sanity-check the shapes of the first batch only.
# The loop variables are deliberately NOT called `X` / `y`: `X` is the
# notebook-level NumPy input array, and rebinding it to a batch tensor would
# break any earlier cell that is re-run afterwards.
for batch, (X_batch, y_batch) in enumerate(train_dataloader):
    print(f"Batch: {batch+1}")
    print(f"X shape: {X_batch.shape}")
    print(f"y shape: {y_batch.shape}")
    break

Batch: 1
X shape: torch.Size([32])
y shape: torch.Size([32])


In [4]:
from torch import nn

# Network sizes: scalar input, three units per hidden layer, scalar output
input_dim, hidden_dim, output_dim = 1, 3, 1

class NeuralNetwork(nn.Module):
    """Small fully connected regressor: two ReLU hidden layers and a linear output.

    Args:
        input_dim: number of input features per sample.
        hidden_dim: width of each of the two hidden layers.
        output_dim: number of values predicted per sample.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.hidden_layer1 = nn.Linear(input_dim, hidden_dim)
        self.hidden_layer2 = nn.Linear(hidden_dim, hidden_dim)
        self.output_layer = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return predictions for a batch of inputs.

        A new axis is inserted at position 1 before the first layer, so a 1-D
        batch of N scalars is treated as N samples with one feature each.
        """
        x = x.unsqueeze(1)
        x = torch.nn.functional.relu(self.hidden_layer1(x))
        x = torch.nn.functional.relu(self.hidden_layer2(x))
        # Regression head stays linear: a ReLU here would clamp every
        # prediction to >= 0 and pass zero gradient whenever the pre-activation
        # is negative, which can leave the whole network unable to learn.
        return self.output_layer(x)
       
model = NeuralNetwork(input_dim, hidden_dim, output_dim)
print(model)

# Dump the freshly initialised weights and biases in registration order
for _, tensor in model.named_parameters():
    print(tensor.data)

NeuralNetwork(
  (hidden_layer1): Linear(in_features=1, out_features=3, bias=True)
  (hidden_layer2): Linear(in_features=3, out_features=3, bias=True)
  (output_layer): Linear(in_features=3, out_features=1, bias=True)
)
tensor([[-0.9256],
        [ 0.9863],
        [ 0.7933]])
tensor([0.1156, 0.8148, 0.5077])
tensor([[-0.1909,  0.1576, -0.4973],
        [-0.5019, -0.4312, -0.1600],
        [-0.5242,  0.2289,  0.4385]])
tensor([ 0.5586,  0.1986, -0.1944])
tensor([[0.1096, 0.5421, 0.1163]])
tensor([0.3175])


In [5]:
# model(torch.tensor([0.0]))

In [6]:
# Baseline: what the network predicts on the held-out inputs at this point
with torch.no_grad():
    pred = model(torch.from_numpy(X_test.astype(np.float32)))

# Flatten once and reuse for both the printout and the plot
pred_flat = pred.numpy().flatten()
print(X_test)
print(pred_flat)
fig.add_trace(go.Scatter(x=X_test, y=pred_flat, mode='markers', name='pred'))

fig.show()

[0.8 6.9 8.4 4.1 2.5 0.5 6.3 9.5 3.3 7.  3.1 8.6 1.8 9.4 0.4 2.6 6.6 2.
 0.3 4.9 3.4 0.2 3.6 0.7 4.3 5.4 7.8 5.8 5.  7.1 7.4 8.  8.1]
[0.42247415 0.8027215  0.9027814  0.6159429  0.50921226 0.41031963
 0.7626975  0.97615874 0.5625776  0.80939215 0.5492363  0.9161228
 0.46298924 0.9694881  0.40626812 0.51588297 0.7827095  0.475859
 0.4022166  0.6693082  0.5692483  0.3981651  0.58258957 0.41842264
 0.62928426 0.7026615  0.8627575  0.7293442  0.6759789  0.8160628
 0.83607477 0.8760988  0.8827695 ]


In [7]:
from torch import optim

loss_fn = nn.HuberLoss()

# Alternative optimizer tried: optim.RMSprop(model.parameters())
optimizer = optim.SGD(model.parameters(), lr=0.005, momentum=0.5)

num_epochs = 1000
loss_values = []  # one entry per mini-batch, in training order

min_loss = float('inf')  # lowest mini-batch loss seen so far

for epoch in range(num_epochs):
    for X_batch, y_batch in train_dataloader:
        # Clear gradients accumulated by the previous step
        optimizer.zero_grad()

        # Forward pass; targets get a trailing axis so they line up with the
        # model output, which carries an extra axis added in forward()
        pred = model(X_batch)
        loss = loss_fn(pred, y_batch.unsqueeze(-1))

        batch_loss = loss.item()
        loss_values.append(batch_loss)

        # Checkpoint whenever a mini-batch sets a new best loss. The save
        # happens before optimizer.step(), so the file holds the weights that
        # actually produced this loss.
        if batch_loss < min_loss:
            min_loss = batch_loss
            torch.save(model.state_dict(), "func.pth")

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

print("Training Complete")

Training Complete


In [8]:
# One x position per recorded mini-batch loss, expressed in epochs.
# The length is derived from loss_values (instead of a hard-coded count) so
# the x and y arrays stay aligned whenever num_epochs, the batch size or the
# training-set size changes.
step = np.linspace(0, num_epochs, len(loss_values))

fig2 = go.Figure()
fig2.add_trace(go.Scatter(x=step, y=np.array(loss_values), mode='lines', name='loss'))
fig2.update_layout(xaxis_title='epoch', yaxis_title='loss per mini-batch')

fig2.show()

In [9]:
# Inspect the weights and biases after training
for param in model.parameters():
    print(param.data)

with torch.no_grad():
    pred = model(torch.from_numpy(X_test.astype(np.float32)))

# `fig` already carries a trace named 'pred' from before training; give this
# one its own legend entry so the two sets of predictions can be told apart.
fig.add_trace(go.Scatter(x=X_test, y=pred.numpy().flatten(), mode='markers', name='pred (trained)'))

fig.show()

tensor([[-1.6167],
        [ 6.0955],
        [ 0.5883]])
tensor([  4.5452, -28.4175,  -8.5483])
tensor([[-0.3331,  0.7972, -0.5229],
        [-3.4293, -0.8182, -0.1600],
        [-0.5144,  5.1550,  4.4016]])
tensor([ 1.3939,  7.7349, -4.9823])
tensor([[1.0091, 8.1503, 4.8632]])
tensor([1.5263])
