In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn
from sklearn.linear_model import LinearRegression, HuberRegressor
from torch.utils.data import DataLoader, Dataset
from typing import Tuple
from sklearn.preprocessing import StandardScaler
import plotly.express as px


In [None]:
# Quick look at ten draws from a standard normal distribution (mean 0, std 1).
np.random.normal(loc=0, scale=1, size=10)

In [None]:
# Synthetic regression data: a noisy line y = 5x + 20 in which 10% of the points
# are turned into outliers.
# A local, seeded Generator keeps this cell reproducible and idempotent on re-run
# without touching NumPy's global random state.
rng = np.random.default_rng(seed=42)

x_raw = np.arange(0, 1000, 1)
y_raw = 5 * x_raw + 20 + rng.normal(0, 1000, size=len(x_raw))

# Draw the outlier positions WITHOUT replacement so that exactly 10% distinct
# points are modified (sampling with replacement yields duplicates and therefore
# fewer outliers than intended).
random_indexes_to_change = rng.choice(
    len(x_raw),
    size=int(len(x_raw) * 0.1),
    replace=False,
)
y_raw[random_indexes_to_change] = (
    rng.uniform(-10, 10, size=len(random_indexes_to_change))
    * y_raw[random_indexes_to_change]
)  # Random outliers

# Standardize both variables; the scalers are kept so values can be mapped back
# to the raw scale later.
x_scaler = StandardScaler()
x = x_scaler.fit_transform(x_raw.reshape(-1, 1))  # shape (n_samples, 1)

y_scaler = StandardScaler()
y = y_scaler.fit_transform(y_raw.reshape(-1, 1)).squeeze()  # shape (n_samples,)


In [None]:
# Preview only the first rows of the standardized feature column instead of
# dumping all 1000 values into the notebook output.
x[:5]

In [None]:
# Preview only the first standardized target values instead of dumping all 1000
# values into the notebook output.
y[:5]

In [None]:
# Ordinary least-squares baseline fitted on the standardized data.
linreg = LinearRegression()
linreg.fit(X=x.reshape(-1, 1), y=y)


In [None]:
# Huber regression baseline (default settings) fitted on the same standardized data.
huberreg = HuberRegressor()
huberreg.fit(X=x.reshape(-1, 1), y=y)

In [None]:
class CustomLoss(nn.Module):
    """Weighted sum of three differentiable error measures for regression.

    The loss combines:

    1. the maximum absolute residual of the batch,
    2. a sigmoid-smoothed fraction of residuals whose magnitude exceeds
       ``threshold``,
    3. a sigmoid-smoothed fraction of predictions whose sign differs from the
       sign of the target.

    The three weights are normalized in the constructor so that they sum to 1.

    Args:
        threshold: Absolute-residual level used by term (2).
        weight_max_error: Relative weight of term (1).
        weight_percentage_above_threshold: Relative weight of term (2).
        weight_wrong_sign: Relative weight of term (3).
        sigmoid_steepness: Slope of the sigmoids in terms (2) and (3); larger
            values bring them closer to hard 0/1 indicators.
    """

    def __init__(
            self,
            threshold: float = 0.685,
            weight_max_error: float = 1,
            weight_percentage_above_threshold: float = 1,
            weight_wrong_sign: float = 1,
            sigmoid_steepness: float = 1,
    ) -> None:
        super().__init__()
        self.steepness = sigmoid_steepness
        self.threshold = threshold
        # Normalize weights and assign them
        sum_weights = (
            weight_max_error
            + weight_percentage_above_threshold
            + weight_wrong_sign
        )
        self.weight_max_error = weight_max_error / sum_weights
        self.weight_percentage_above_threshold = (
            weight_percentage_above_threshold / sum_weights
        )
        self.weight_wrong_sign = weight_wrong_sign / sum_weights

    def forward(
            self,
            inputs: torch.Tensor,
            targets: torch.Tensor,
        ) -> torch.Tensor:
        """Compute the combined loss.

        Args:
            inputs: Model predictions.
            targets: Ground-truth values, broadcastable against ``inputs``.

        Returns:
            Scalar tensor holding the weighted sum of the three terms.
        """
        abs_residuals = (targets - inputs).abs()

        # Maximum abs error
        max_error = abs_residuals.max()

        # Percentage of time above threshold value (smooth indicator).
        # torch.sigmoid is used instead of a hand-written 1 / (1 + e**-z): the
        # manual form overflows to inf for large arguments and then produces
        # NaN gradients.
        percentage_of_time_above_x = torch.sigmoid(
            self.steepness * (abs_residuals - self.threshold)
        ).mean()

        # Percentage of time wrong sign (smooth indicator).
        # inputs * targets is negative exactly when the signs differ, so the
        # product is negated: the sigmoid then tends to 1 for a wrong sign and
        # to 0 for a correct one.
        loss_percentage_of_time_wrong_sign = torch.sigmoid(
            -self.steepness * inputs * targets
        ).mean()

        # Total loss
        total_loss = (
            self.weight_max_error * max_error
            + self.weight_percentage_above_threshold * percentage_of_time_above_x
            + self.weight_wrong_sign * loss_percentage_of_time_wrong_sign
        )
        return total_loss

In [None]:
class Model(nn.Module):
    """Single fully connected layer with one output unit and no activation,
    i.e. a linear regression."""

    def __init__(self, n_features: int) -> None:
        """Build the model.

        Args:
            n_features: Number of input features per sample.
        """
        super().__init__()
        # The only layer; nothing is applied after it, so the model stays linear.
        self.fc1 = nn.Linear(n_features, 1)

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """Apply the linear layer to ``X`` and return the result flattened to 1-D."""
        layer_output = self.fc1(X)
        return torch.flatten(layer_output)

In [None]:
class CustomDataset(Dataset):
    """Dataset pairing each row of a feature tensor with its target value."""

    def __init__(self, X: torch.Tensor, y: torch.Tensor) -> None:
        """Keep references to the features ``X`` and the targets ``y``."""
        self.X = X
        self.y = y

    def __len__(self) -> int:
        """Return the number of samples, taken from the first dimension of ``X``."""
        return self.X.shape[0]

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return the ``(features, target)`` pair stored at position ``idx``."""
        return self.X[idx, :], self.y[idx]

In [None]:
class CustomMSELoss(nn.Module):
    """Mean squared error between predictions and targets."""

    def __init__(self) -> None:
        super().__init__()

    def forward(self, inputs: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
        """Return the mean of the squared differences ``targets - inputs``."""
        return (targets - inputs).pow(2).mean()

In [None]:
class CustomMAELoss(nn.Module):
    """Mean absolute error between predictions and targets."""

    def __init__(self) -> None:
        super().__init__()

    def forward(self, inputs: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
        """Return the mean of the absolute differences ``targets - inputs``."""
        return torch.abs(targets - inputs).mean()

In [None]:
# Training hyperparameters
epochs = 500
lr = 1e-4
batch_size = 100

# Seed torch so the weight initialisation and the batch shuffling are
# reproducible when the notebook is re-run from a fresh kernel.
torch.manual_seed(42)

dataloader = DataLoader(
    CustomDataset(
        torch.Tensor(x).float(),
        torch.Tensor(y).float(),
    ),
    batch_size=batch_size,
    shuffle=True,
)
model_rmse = Model(
    n_features=1,
)
optimizer = torch.optim.Adam(
    model_rmse.parameters(),
    lr=lr,
)
# Alternative criteria that were tried:
# criterion = CustomLoss(
#     weight_max_error=1,
#     weight_percentage_above_threshold=1,
#     weight_wrong_sign=1,
# )
# criterion = nn.KLDivLoss()
criterion = CustomMSELoss()

for epoch in range(epochs):
    epoch_loss = 0
    for X_batch, y_batch in dataloader:
        optimizer.zero_grad()
        # Model.forward already returns a flat 1-D tensor; no extra flatten needed.
        prediction = model_rmse(X_batch)
        loss = criterion(prediction, y_batch)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    # Mean of the per-batch losses, computed once per epoch instead of per batch.
    average_loss = epoch_loss / len(dataloader)
    print(f"Average epoch loss: {average_loss}")
    print(f"Epoch {epoch} done.")

In [None]:
# Weight (slope) of the torch model's single linear layer, fitted on the standardized data.
model_rmse.fc1.weight

In [None]:
# Bias (intercept) of the torch model's single linear layer.
model_rmse.fc1.bias

In [None]:
# Slope fitted by scikit-learn's LinearRegression, for comparison with the torch weight.
linreg.coef_

In [None]:
# Intercept fitted by scikit-learn's LinearRegression, for comparison with the torch bias.
linreg.intercept_

In [None]:
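# NOTE: leftover scratch code for drawing reference lines. It is disabled, and `ax` is
# never created in this notebook, so it would not run as-is if uncommented.
# line1 = np.ones_like(y)*3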
# line2 = 0.25*x
# line3 = 4*x

# ax.plot(x,line1, color = "red")
# ax.scatter(x,line1, color = "red")

# ax.plot(x, line2, color = "black")
# ax.scatter(x, line2, color = "black")

# ax.plot(x, line3, color = "green")
# ax.scatter(x, line3, color = "green")


In [None]:
# Collect the standardized data and the predictions of the three fitted models
# in one wide table (one column per model).
prediction_df = pd.DataFrame({"x": x.flatten(), "y": y}).assign(
    torch_custom_loss=model_rmse(torch.Tensor(x).float()).detach().numpy(),
    huber=huberreg.predict(x.reshape(-1, 1)),
    sklearn_rmse=linreg.predict(x.reshape(-1, 1)),
)
prediction_df

In [None]:
# Reshape to long format: one row per (x, series) pair, with the series name in
# "variable" and its value in "value".
prediction_df_molten = pd.melt(prediction_df, id_vars="x")
prediction_df_molten

In [None]:
# Scatter plot of the observed targets and every model's predictions against x,
# with one colour per series.
fig = px.scatter(data_frame=prediction_df_molten, x="x", y="value", color="variable")
fig.show()