In [5]:
import warnings
from time import sleep
from typing import Callable, Union

warnings.filterwarnings('ignore')

import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data

In [None]:
class FNN(nn.Module):
    """Fully connected feed-forward network with one input and one output feature.

    The network is a chain of ``nn.Linear`` layers whose widths are
    ``[1, *hidden sizes, 1]``. The chosen activation is applied after every
    layer except the last one, so the output is unbounded.

    Args:
        n_hidden_layers_sizes: Width of a single hidden layer (``int``) or a
            list with the width of each hidden layer, in order. An empty list
            yields a single ``Linear(1, 1)`` layer.
        activation: Case-insensitive name of the activation: ``'relu'``,
            ``'gelu'`` or ``'sigmoid'``.
        *args, **kwargs: Accepted and ignored.

    Raises:
        ValueError: If ``n_hidden_layers_sizes`` is neither an int nor a list,
            if any hidden size is not a positive integer, or if ``activation``
            is not one of the supported names.
    """

    # Supported activation names mapped to their module classes.
    _ACTIVATIONS = {'relu': nn.ReLU, 'gelu': nn.GELU, 'sigmoid': nn.Sigmoid}

    def __init__(self, n_hidden_layers_sizes: Union[int, list] = 1, activation: str = 'sigmoid', *args, **kwargs):

        super(FNN, self).__init__()

        if isinstance(n_hidden_layers_sizes, int):
            n_hidden_layers_sizes = [n_hidden_layers_sizes]
        if not isinstance(n_hidden_layers_sizes, list):
            raise ValueError(f'n_hidden_layers_sizes should be of type int or list, instead received {n_hidden_layers_sizes} of type {type(n_hidden_layers_sizes)}.')

        # Reject sizes that would build a degenerate (zero-width) layer or make
        # nn.Linear fail with an opaque low-level error. bool is excluded
        # explicitly because it is a subclass of int.
        for size in n_hidden_layers_sizes:
            if isinstance(size, bool) or not isinstance(size, (int, np.integer)) or size < 1:
                raise ValueError(f'hidden layer sizes must be positive integers, instead received {size!r}.')

        # Input and output are both a single feature.
        layer_sizes = [1] + [int(size) for size in n_hidden_layers_sizes] + [1]
        self.dense_layers = nn.Sequential(*[
            nn.Linear(input_size, output_size)
            for input_size, output_size in zip(layer_sizes[:-1], layer_sizes[1:])
        ])

        activation_key = activation.lower() if isinstance(activation, str) else None
        if activation_key not in self._ACTIVATIONS:
            raise ValueError(f'activation must be one of relu, gelu or sigmoid, instead received {activation}')
        # Stored as the module *class*; forward() instantiates it.
        self.activation = self._ACTIVATIONS[activation_key]

    def forward(self, x: torch.Tensor):
        """Apply the network to ``x``, whose last dimension must have size 1.

        Returns a tensor with the same leading dimensions and a last dimension
        of size 1.
        """
        *hidden_layers, output_layer = self.dense_layers

        # Build the (stateless) activation once per call instead of per layer.
        activation_fn = self.activation()
        for layer in hidden_layers:
            x = activation_fn(layer(x))

        # No activation on the final layer.
        return output_layer(x)

In [None]:
class Dataset(data.Dataset):
    """Map-style dataset pairing each row of ``X`` with the matching entry of ``y``.

    Args:
        X: Features, indexed along the first axis. A 1-D array is treated as
            ``n`` samples with a single feature and stored as shape ``(n, 1)``.
        y: Targets, indexed along the first axis; must have as many entries
            as ``X`` has rows.

    Raises:
        ValueError: If ``X`` or ``y`` is 0-dimensional, or if their first
            dimensions differ.
    """

    def __init__(self, X: np.array, y: np.array):
        # Accept plain sequences too; objects that already expose `.shape`
        # are kept as they are.
        if not hasattr(X, 'shape'):
            X = np.asarray(X)
        if not hasattr(y, 'shape'):
            y = np.asarray(y)

        if X.ndim == 0 or y.ndim == 0:
            raise ValueError('X and y must have at least one dimension.')
        if X.ndim == 1:
            # __getitem__ indexes X as [idx, :], which needs a feature axis.
            X = X.reshape(-1, 1)
        if X.shape[0] != y.shape[0]:
            raise ValueError(f'X and y must have the same number of samples, instead received {X.shape[0]} and {y.shape[0]}.')

        self.X, self.y = X, y

    def __len__(self):
        """Number of samples (rows of ``X``)."""
        return self.X.shape[0]

    def __getitem__(self, idx):
        """Return the ``(features, target)`` pair stored at position ``idx``."""
        return self.X[idx, :], self.y[idx]

def data_loaders(X: np.array, y: np.array, test_size: float = 0.2, random_state: Union[int, None] = None, batch_size: int = 32, *args, **kwargs):
    """Split ``(X, y)`` into train and test parts and wrap each in a ``DataLoader``.

    Args:
        X: Features; the first axis indexes samples.
        y: Targets; must have the same first dimension as ``X``.
        test_size: Passed to ``train_test_split`` as the test share.
        random_state: Seed for the train/test split. When it is an integer it
            also seeds the shuffling order of the training loader, so that a
            fixed seed gives a reproducible batch sequence.
        batch_size: Batch size used by both loaders.
        *args, **kwargs: Accepted and ignored.

    Returns:
        Tuple ``(train_loader, test_loader)``.

    Raises:
        AssertionError: If ``X`` and ``y`` differ in their first dimension.
    """
    assert X.shape[0] == y.shape[0]

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, shuffle=True, random_state=random_state)

    # Without an explicit generator the training shuffle draws from torch's
    # global RNG and ignores random_state.
    shuffle_generator = None
    if isinstance(random_state, (int, np.integer)) and not isinstance(random_state, bool):
        shuffle_generator = torch.Generator().manual_seed(int(random_state))

    train_loader = data.DataLoader(
        dataset=Dataset(X_train, y_train),
        batch_size=batch_size,
        shuffle=True,
        generator=shuffle_generator
    )
    # Evaluation order should be fixed: with a partial last batch, shuffling
    # changes which samples share a batch and so the mean of per-batch losses.
    test_loader = data.DataLoader(
        dataset=Dataset(X_test, y_test),
        batch_size=batch_size,
        shuffle=False
    )

    return train_loader, test_loader

In [None]:
def train(model, train_loader, criterion, optimizer, scheduler=None):
    """Run one training epoch and return the mean of the per-batch losses.

    Args:
        model: Module to optimise; it is switched to training mode.
        train_loader: Iterable yielding ``(X, y)`` batches.
        criterion: Callable ``criterion(output, y)`` returning a scalar loss tensor.
        optimizer: Optimizer stepped once per batch.
        scheduler: Optional learning-rate scheduler stepped once per batch,
            right after the optimizer. Pass ``None`` to train without one.

    Returns:
        0-dimensional tensor holding the average of the batch losses.
    """
    model.train()
    batch_losses = []

    for X, y in tqdm(train_loader):

        output = model(X)

        # A (B,) target against a (B, 1) output would be broadcast to (B, B)
        # by elementwise losses and give a wrong value; align the target when
        # it holds the same number of elements as the output.
        if y.shape != output.shape and y.numel() == output.numel():
            y = y.reshape(output.shape)

        loss = criterion(output, y)
        batch_losses.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if scheduler is not None:
            scheduler.step()

    avg_train_loss = torch.mean(torch.tensor(batch_losses))

    print(f'Training set - Average loss = {avg_train_loss:.4f}')
    # NOTE(review): presumably pauses so progress-bar output and prints do not
    # interleave - confirm before removing.
    sleep(1)

    return avg_train_loss

In [None]:
def test(model, train_loader, criterion):
    
    model.eval()
    test_loss = []
    
    for data in tqdm(train_loader):
        
        X, y = data 
        output = model(X)
        loss = criterion(output, y)
        test_loss.append(loss.item())

    avg_test_loss = torch.mean(torch.tensor(test_loss))

    print(f'Validation set - Average loss = {avg_test_loss:.4f}')
    sleep(1)
    
    return avg_test_loss

In [None]:
def main(transform: Callable, epochs: int = 100, lr: float = 2e-4):
    """Fit an ``FNN`` to ``transform`` on 1000 points drawn uniformly from [-100, 100].

    Args:
        transform: Callable applied to the 1-D array of sampled inputs; it
            must return one target value per input.
        epochs: Number of passes over the training split.
        lr: Initial AdamW learning rate and peak rate of the one-cycle schedule.

    Returns:
        Tuple ``(model, train_losses, test_losses)``: the trained model and the
        per-epoch average training and validation losses as lists of floats.
    """
    x_raw = np.random.uniform(-100, 100, 1000)
    y_raw = transform(x_raw)

    # One feature per sample, in float32 to match the default dtype of the
    # nn.Linear parameters; targets get the same (n, 1) layout as the model
    # output so the loss compares element with element.
    X = x_raw.astype(np.float32).reshape(-1, 1)
    y = np.asarray(y_raw, dtype=np.float32).reshape(-1, 1)
    train_loader, test_loader = data_loaders(X, y, test_size=0.2, batch_size=32)

    model = FNN(n_hidden_layers_sizes=[3, 3], activation='sigmoid')

    optimizer = optim.AdamW(model.parameters(), lr)
    criterion = nn.MSELoss()
    # The schedule is stepped once per batch inside train(), hence
    # steps_per_epoch is the number of training batches.
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=lr,
        steps_per_epoch=len(train_loader),
        epochs=epochs,
        anneal_strategy='linear'
    )

    train_losses, test_losses = [], []

    for epoch in range(1, epochs+1):

        print("\nEpoch", epoch)
        sleep(1)

        # Keep the whole history instead of overwriting it every epoch.
        train_losses.append(float(train(model, train_loader, criterion, optimizer, scheduler)))
        test_losses.append(float(test(model, test_loader, criterion)))

    return model, train_losses, test_losses

In [None]:
# Randomly drawn amplitude (a), frequency (b), phase (c) and offset (d) of the
# target function. The draw is not seeded, so they change on every execution.
a, b, c, d = np.random.normal(loc=0, scale=2, size=4)


def transform(x):
    """Evaluate ``a * sin(b * x + c) + d`` elementwise on ``x``.

    The expression is built from NumPy operations only, so it works directly
    on arrays (including empty ones) without the Python-level loop that
    ``np.vectorize`` would add. The coefficients are read from the
    module-level names at call time.
    """
    return a * np.sin(b * np.asarray(x) + c) + d