# Imports

In [18]:
import csv
import sklearn.linear_model
import sklearn.preprocessing
import sklearn.metrics
import sklearn.datasets
import numpy as np
import dataclasses
import matplotlib.pyplot as plt
from IPython.display import display
import ipywidgets
import typing
import abc
import math

# Data 

In [3]:
@dataclasses.dataclass
class HappinessData:
    """Column-wise container for the happiness dataset.

    As built by ``read_happiness_data``, every field is a 1-D array of floats
    with one entry per CSV data row, and the three arrays are index-aligned
    (entry ``i`` of each field comes from the same row).
    """
    # Values read from the "Economy..GDP.per.Capita." column.
    gdp: np.ndarray
    # Values read from the "Freedom" column.
    freedom: np.ndarray
    # Values read from the "Happiness.Score" column (the regression target in this notebook).
    happiness: np.ndarray

def read_happiness_data(path: str) -> HappinessData:
    """Read the GDP, freedom and happiness-score columns from a CSV file.

    The first row of the file must be a header containing the columns
    ``"Economy..GDP.per.Capita."``, ``"Freedom"`` and ``"Happiness.Score"``;
    all other columns are ignored.

    Args:
        path: Location of the CSV file.

    Returns:
        A ``HappinessData`` whose three 1-D float arrays hold one value per
        data row, in file order.

    Raises:
        ValueError: If the file is empty, a required column is missing from
            the header (raised by ``list.index``), or a cell cannot be parsed
            as a float.
    """
    # newline="" lets the csv module do its own newline handling, as its
    # documentation requires (otherwise quoted newlines can be misparsed).
    with open(path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader, None)
        if header is None:
            raise ValueError(f"{path!r} is empty: expected a CSV header row")

        gdp_index = header.index("Economy..GDP.per.Capita.")
        freedom_index = header.index("Freedom")
        happiness_index = header.index("Happiness.Score")

        gdp_values = []
        freedom_values = []
        happiness_values = []

        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines (e.g. a trailing empty
                # line); indexing into it would raise IndexError.
                continue
            gdp_values.append(float(row[gdp_index]))
            freedom_values.append(float(row[freedom_index]))
            happiness_values.append(float(row[happiness_index]))

    # dtype=float keeps the arrays float64 even when the file has no data rows.
    return HappinessData(
        np.array(gdp_values, dtype=float),
        np.array(freedom_values, dtype=float),
        np.array(happiness_values, dtype=float),
    )

# Load the dataset once at module level; the path is relative to the
# notebook's working directory. Later cells read their features and targets
# from this object.
happiness_data = read_happiness_data("data/2017.csv")

# Solution

In [4]:
class Scaler(metaclass=abc.ABCMeta):
    """Abstract interface for a two-step feature scaler.

    Concrete subclasses must implement both ``fit`` and ``transform``;
    the class itself cannot be instantiated.
    """

    @abc.abstractmethod
    def fit(self, inp: np.ndarray):
        """Learn whatever statistics the scaler needs from ``inp``."""

    @abc.abstractmethod
    def transform(self, inp: np.ndarray) -> np.ndarray:
        """Return ``inp`` rescaled with the statistics learned by ``fit``."""

In [5]:
class Regressor(metaclass=abc.ABCMeta):
    """Abstract interface for a regression model.

    Concrete subclasses must implement both ``fit`` and ``predict``;
    the class itself cannot be instantiated.
    """

    @abc.abstractmethod
    def fit(self, inp: np.ndarray, out: np.ndarray):
        """Train the model on inputs ``inp`` and matching targets ``out``."""

    @abc.abstractmethod
    def predict(self, inp: np.ndarray) -> np.ndarray:
        """Return the model's predictions for ``inp``."""

In [6]:
# Number of samples: one per entry of the happiness column.
length = happiness_data.happiness.shape[0]

# Univariate problem: GDP is the single feature, happiness the target.
# Both are turned into (length, 1) column matrices.
inp_uni = happiness_data.gdp.reshape((length, 1))
out_uni = happiness_data.happiness.reshape((length, 1))

# Multivariate problem: GDP and freedom side by side as a (length, 2) matrix,
# with the same happiness target column.
inp_multi = np.stack((happiness_data.gdp, happiness_data.freedom), axis=1)
out_multi = happiness_data.happiness.reshape((length, 1))

In [7]:
TScaler = typing.TypeVar("TScaler", bound=Scaler)
TRegressor = typing.TypeVar("TRegressor", bound=Regressor)
def train_and_test(
    *,
    inp: np.ndarray,
    out: np.ndarray,
    train_percent: float,
    ScalerType: typing.Type[TScaler],
    RegressorType: typing.Type[TRegressor],
    seed: typing.Optional[int] = None,
):
    """Shuffle, split, standardize, train and evaluate one regression setup.

    The samples are shuffled, the first ``train_percent`` fraction is used for
    training and the rest for testing. Inputs and targets are each scaled by
    their own ``ScalerType`` instance fitted on the training split only. The
    mean squared error on the test split is printed; because the targets are
    scaled too, the error is expressed in scaled-target units.

    Args:
        inp: Feature matrix with one row per sample.
        out: Target matrix with one row per sample and a single column.
        train_percent: Fraction of the samples used for training; it must
            leave at least one sample in both splits.
        ScalerType: Zero-argument ``Scaler`` class used for inputs and targets.
        RegressorType: Zero-argument ``Regressor`` class to train.
        seed: Optional seed for the shuffle. ``None`` (the default) keeps the
            previous behaviour of shuffling with NumPy's global random state.
            The seed only controls the split; a regressor with its own
            internal randomness is not affected by it.

    Raises:
        ValueError: If ``inp`` and ``out`` differ in length, or if
            ``train_percent`` leaves the training or test split empty.
    """
    length = len(inp)
    if len(out) != length:
        raise ValueError(
            f"inp and out must have the same number of rows, got {length} and {len(out)}"
        )

    # Compute the split point once so all four slices agree on it.
    split_index = int(length * train_percent)
    if not 0 < split_index < length:
        raise ValueError(
            f"train_percent={train_percent!r} leaves an empty train or test split "
            f"for {length} samples"
        )

    # shuffle data
    indices = np.arange(length)
    if seed is None:
        np.random.shuffle(indices)
    else:
        np.random.default_rng(seed).shuffle(indices)

    inp = inp[indices]
    out = out[indices]

    # split into train and test
    inp_train, inp_test = inp[:split_index], inp[split_index:]
    out_train, out_test = out[:split_index], out[split_index:]

    # normalize: statistics come from the training split only, so nothing
    # about the test split leaks into the fitted scalers
    inp_scaler = ScalerType()
    inp_scaler.fit(inp_train)
    inp_train = inp_scaler.transform(inp_train)
    inp_test = inp_scaler.transform(inp_test)

    out_scaler = ScalerType()
    out_scaler.fit(out_train)
    # the regressors expect 1-D targets
    out_train = out_scaler.transform(out_train).flatten()
    out_test = out_scaler.transform(out_test).flatten()

    # train
    regressor = RegressorType()
    regressor.fit(inp_train, out_train)

    # test
    predicted_test = regressor.predict(inp_test)
    mse = sklearn.metrics.mean_squared_error(out_test, predicted_test)

    print(mse)

## Tool (scikit-learn implementation)

In [8]:
class ToolScaler(Scaler):
    """``Scaler`` that delegates to ``sklearn.preprocessing.StandardScaler``."""

    def __init__(self):
        # Constructed with scikit-learn's default settings.
        self.__delegate = sklearn.preprocessing.StandardScaler()

    def fit(self, inp: np.ndarray):
        """Fit the wrapped scikit-learn scaler on ``inp``."""
        self.__delegate.fit(inp)

    def transform(self, inp: np.ndarray) -> np.ndarray:
        """Return ``inp`` transformed by the wrapped scikit-learn scaler."""
        return self.__delegate.transform(inp)

In [9]:
class ToolRegressor(Regressor):
    """``Regressor`` that delegates to ``sklearn.linear_model.SGDRegressor``."""

    def __init__(self):
        # Constructed with scikit-learn's default hyperparameters.
        self.__delegate = sklearn.linear_model.SGDRegressor()

    def fit(self, inp: np.ndarray, out: np.ndarray):
        """Train the wrapped scikit-learn regressor on ``inp`` / ``out``."""
        self.__delegate.fit(inp, out)

    def predict(self, inp: np.ndarray) -> np.ndarray:
        """Return the wrapped scikit-learn regressor's predictions for ``inp``."""
        return self.__delegate.predict(inp)

### Univariate

In [46]:
# Univariate run with the scikit-learn-backed wrappers: GDP is the only
# feature, 80% of the samples are used for training. The printed MSE changes
# between runs because the split is shuffled without a fixed seed.
train_and_test(
    inp=inp_uni,
    out=out_uni,
    train_percent=0.80,
    ScalerType=ToolScaler,
    RegressorType=ToolRegressor,
)

0.41020560901400116


### Multivariate

In [50]:
# Multivariate run with the scikit-learn-backed wrappers: GDP and freedom are
# the features, 80% of the samples are used for training. The printed MSE
# changes between runs because the split is shuffled without a fixed seed.
train_and_test(
    inp=inp_multi,
    out=out_multi,
    train_percent=0.80,
    ScalerType=ToolScaler,
    RegressorType=ToolRegressor,
)

0.2282979105561317


## Mine (from-scratch implementation)

In [12]:
def get_2d_shape(matrix: list[list[typing.Any]]) -> tuple[int, int]:
    """Return the shape of a rectangular list-of-lists as ``(width, height)``.

    Note the order: the number of columns comes first, the number of rows
    second (the reverse of NumPy's ``shape``).

    Args:
        matrix: A list of rows; every row must have the same length.

    Returns:
        ``(width, height)`` where ``height`` is ``len(matrix)`` and ``width``
        is the common row length. An empty matrix yields ``(0, 0)``.

    Raises:
        AssertionError: If the rows do not all have the same length.
    """
    height = len(matrix)
    if height == 0:
        # No row to measure: report zero columns instead of ``None`` so
        # callers can safely pass the width to ``range`` or arithmetic.
        return (0, 0)

    width = len(matrix[0])
    for row in matrix:
        assert len(row) == width, "all rows of the matrix must have the same length"
    return (width, height)

In [13]:
class MyScaler(Scaler):
    """Hand-written standardizing scaler working on plain Python lists.

    ``fit`` computes, per column, the mean and the sample standard deviation
    (``n - 1`` denominator). ``transform`` returns ``(value - mean) / std``
    for every cell.

    Columns whose standard deviation is zero (constant columns, or any column
    when only one row was fitted) use a divisor of 1 instead, so they are
    centred but not rescaled rather than raising ``ZeroDivisionError``.
    """

    def __init__(self):
        self.__mean = None
        self.__std = None

    def fit(self, inp: np.ndarray):
        """Compute the per-column mean and standard deviation of ``inp``.

        Args:
            inp: 2-D array with one row per sample and one column per feature.
        """
        rows = inp.tolist()
        column_count, row_count = get_2d_shape(rows)

        self.__mean = [
            sum(row[column] for row in rows) / row_count
            for column in range(column_count)
        ]

        stds = []
        for column, mean in enumerate(self.__mean):
            squared_deviation = sum((row[column] - mean) ** 2 for row in rows)
            # The sample standard deviation is undefined for a single row;
            # treat it as zero spread so the guard below applies.
            if row_count > 1:
                std = math.sqrt(squared_deviation / (row_count - 1))
            else:
                std = 0.0
            # A zero divisor would make transform() crash; use 1 instead.
            stds.append(1.0 if std == 0.0 else std)
        self.__std = stds

    def transform(self, inp: np.ndarray) -> np.ndarray:
        """Standardize ``inp`` with the statistics computed by ``fit``.

        Args:
            inp: 2-D array with the same number of columns as the fitted data.

        Returns:
            A new 2-D array of the standardized values.
        """
        rows = inp.tolist()
        scaled = [
            [
                (value - mean) / std
                for mean, std, value in zip(self.__mean, self.__std, row)
            ]
            for row in rows
        ]
        return np.array(scaled)

In [97]:
class MyRegressor(Regressor):
    """Hand-written linear regression trained by batch gradient descent.

    The model is ``w[0] + w[1] * x1 + ... + w[k] * xk``. Each iteration
    averages the squared-error gradient over all samples and takes one step
    of size ``learning_rate`` against it.

    Args:
        learning_rate: Step size of each gradient-descent update.
        iterations: Number of full passes over the training data.
    """

    def __init__(self, learning_rate: float = 0.1, iterations: int = 1000):
        self.__learning_rate = learning_rate
        self.__iterations = iterations
        self.__w = None

    def fit(self, inp: np.ndarray, out: np.ndarray):
        """Fit the weights to the training data.

        Args:
            inp: 2-D array with one row per sample and one column per feature.
            out: 1-D array with one target value per sample.
        """
        rows = inp.tolist()
        targets = out.tolist()

        feature_count = get_2d_shape(rows)[0]
        # Prepend a constant 1 to every sample so that w[0] acts as the
        # intercept. The same augmented row is used for both the prediction
        # and the gradient: the previous code used 0 in the gradient, so the
        # intercept never moved away from its initial value of 0.
        augmented_rows = [[1] + row for row in rows]
        w = [0] * (feature_count + 1)

        for _ in range(self.__iterations):
            gradient_sum = [0] * (feature_count + 1)
            for features, target in zip(augmented_rows, targets):
                predicted = sum(
                    feature * w_value
                    for feature, w_value in zip(features, w)
                )
                err = predicted - target
                gradient_sum = [
                    gradient_sum_value + err * feature
                    for gradient_sum_value, feature in zip(gradient_sum, features)
                ]

            # Average the gradient over the samples, then step against it.
            w = [
                w_value - (gradient_sum_value / len(rows)) * self.__learning_rate
                for w_value, gradient_sum_value in zip(w, gradient_sum)
            ]

        self.__w = w

    def predict(self, inp: np.ndarray) -> np.ndarray:
        """Return the model output for every row of ``inp`` as a 1-D array.

        Args:
            inp: 2-D array with the same number of columns as the fitted data.
        """
        rows = inp.tolist()
        predictions = [
            sum(
                feature * w_value
                for feature, w_value in zip([1] + row, self.__w)
            )
            for row in rows
        ]
        return np.array(predictions)

### Univariate

In [100]:
# Univariate run with the from-scratch MyScaler / MyRegressor: GDP is the
# only feature, 80% of the samples are used for training. The printed MSE
# changes between runs because the split is shuffled without a fixed seed.
train_and_test(
    inp=inp_uni,
    out=out_uni,
    train_percent=0.80,
    ScalerType=MyScaler,
    RegressorType=MyRegressor,
)

0.35599698162311666


### Multivariate

In [101]:
# Multivariate run with the from-scratch MyScaler / MyRegressor: GDP and
# freedom are the features, 80% of the samples are used for training. The
# printed MSE changes between runs because the split is shuffled without a
# fixed seed.
train_and_test(
    inp=inp_multi,
    out=out_multi,
    train_percent=0.80,
    ScalerType=MyScaler,
    RegressorType=MyRegressor,
)

0.1129134997649597


## Multi-target

In [106]:
# The Linnerud dataset has several target columns for the same feature matrix.
multi_target_bunch = sklearn.datasets.load_linnerud()
multi_target_datas = multi_target_bunch["data"]
multi_target_targets = multi_target_bunch["target"]

# The regressor handles one target at a time, so train and evaluate an
# independent model per target column; each run prints its own MSE.
for target_index in range(multi_target_targets.shape[1]):
    # A width-1 slice keeps the column as a (samples, 1) matrix.
    multi_target_target = multi_target_targets[:, target_index:target_index + 1]
    train_and_test(
        inp=multi_target_datas,
        out=multi_target_target,
        train_percent=0.80,
        ScalerType=MyScaler,
        RegressorType=MyRegressor,
    )

1.8130012690993347
0.5451922080963347
0.5416223995044857
