In [None]:
import pandas as pd
import torch
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torchvision import transforms

In [None]:

def get_950_auction(data_location: str) -> pd.DataFrame:
    '''Import and filter the large dataset into a smaller subset for testing'''
    input_df = pd.read_csv(data_location)
    filtered_df = input_df[(input_df['Sales Channel'] == "Auction")
                          & (input_df['is_usage_imputed'] == False)
                          & (input_df['is_outlier'] == False)
                          & (input_df['Forecast Sales Model Number'] == "950")]
    
    return filtered_df[['Age of Machine', 'Final Usage Hours', 'Relative Price Change']]

def test_train_split(df: pd.DataFrame) -> tuple:
    test = None
    train = None

    try:
        train, test = train_test_split(df, test_size=0.2,
                                       random_state=42)
    except ValueError as e:
        logging.warning(str(e))

    return test, train

def select_three_variables(self, df: pd.DataFrame) -> tuple:
    x1data = df['Age of Machine'].values
    x2data = df['Final Usage Hours'].values
    ydata = df[self.prediction_target].values

    return x1data, x2data, ydata

def split_features_labels(df: pd.DataFrame) -> tuple:
    features = df[['Age of Machine', 'Final Usage Hours']].to_numpy()
    labels = df['Relative Price Change'].to_numpy()
    
    return features, labels

def normalise_data(data_array: np.array) -> np.array:
    scaler = StandardScaler()
    scaler.fit(data_array)
    scaled_data: np.array = scaler.transform(data_array)
    
    return scaled_data
    

ML_df = get_950_auction('/Users/adam/Github/data-pipeline/data/outputs/all_data_full_output.csv')
test, train = test_train_split(ML_df)
x_train, y_train = split_features_labels(train)
x_train = normalise_data(x_train)

y_train = y_train.astype('float64')
x_train = x_train.astype('float64')

In [None]:
# -*- coding: utf-8 -*-
import torch
import math


class Polynomial3(torch.nn.Module):
    def __init__(self):
        """
        In the constructor we instantiate four parameters and assign them as
        member parameters.
        """
        super().__init__()
        self.a = torch.nn.Parameter(torch.randn(()))
        self.b1 = torch.nn.Parameter(torch.randn(()))
        self.b2 = torch.nn.Parameter(torch.randn(()))
        self.c1 = torch.nn.Parameter(torch.randn(()))
        self.c2 = torch.nn.Parameter(torch.randn(()))
        self.d = torch.nn.Parameter(torch.randn(()))

    def forward(self, x1, x2):
        """
        In the forward function we accept a Tensor of input data and we must return
        a Tensor of output data. We can use Modules defined in the constructor as
        well as arbitrary operators on Tensors.
        """
        return self.a * torch.exp((-self.b1 * (x1 - self.c1)) + (-self.b2 * (x2 - self.c2))) + self.d

    def string(self):
        """
        Just like any class in Python, you can also define custom method on PyTorch modules
        """
        return f'y = {self.a.item()} * e^(  ({self.b1.item()} * ([AGE] - {self.c1.item()})) + ({self.b2.item()} * ([HOURS]) - {self.c2.item()})  ) + {self.d.item()}'


# # Create Tensors to hold input and outputs.
# x1 = torch.linspace(-math.pi, math.pi, 2000)
# y = torch.sin(x)


x1 = torch.tensor(x_train[:, 0])
x2 = torch.tensor(x_train[:, 1])
y = torch.tensor(y_train)



# Construct our model by instantiating the class defined above
model = Polynomial3()

# Construct our loss function and an Optimizer. The call to model.parameters()
# in the SGD constructor will contain the learnable parameters of the nn.Linear
# module which is members of the model.
criterion = torch.nn.MSELoss(reduction='mean')
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
for t in range(20000):
    # Forward pass: Compute predicted y by passing x to the model
    y_pred = model(x1, x2)
    #y_pred = model(x2)

    # Compute and print loss
    loss = criterion(y_pred, y)
    if t % 1000 == 0:
        print(t, loss.item())

    # Zero gradients, perform a backward pass, and update the weights.
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

print(f'Result: {model.string()}')

print(f'Final RMSE Loss: {np.sqrt(loss.item())}')

In [None]:
20/420