In [23]:
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple

In [11]:
def linear(in_array: np.ndarray, weight: np.ndarray, bias: np.ndarray) -> np.ndarray:
    r"""
    linear transformation: y = xW^T + b

    Args:
        in_array: input array
        weight: input weight
        bias: input bias

    Shape:
        - Input: (N, in_features)
        - Weight: (out_features, in_features)
        - Bias: (out_features)
        - Output: (N, out_features)
    """

    assert weight.shape[-1] == in_array.shape[-1], "ERR : input and weight dimensions not matched."

    return np.matmul(in_array, weight.T) + bias

In [34]:
class Module(object):
    def __init(self):
        pass

    def __call__(self):
        return


In [47]:
def regression_initializer(size: Tuple, cate: str='normal') -> Dict:
    """initialize list with size `length`,default to `normal` """
    assert size is not None, "size is None."

    cate_type = cate.lower()
    initial_list = ['normal', 'uniform']
    assert cate_type in initial_list, NotImplemented

    if cate == 'normal':
        return {
            'W': np.random.normal(loc=0., scale=1., size=size),
            'b': np.random.normal(loc=0., scale=1., size=size[0])
        }
    elif cate == 'uniform':
        a = 1. / size[-1]
        return {
            'W': np.random.uniform(low=-a, high=a, size=size),
            'b': np.random.normal(loc=0., scale=1., size=size[0])
        }
    else:
        assert NotImplemented

In [318]:
def mean_squared_error(y_pred: np.ndarray, y_std: np.ndarray, n: int):
    """Mean Squared Error, a.k.a MSE"""
    return np.sum((y_pred - y_std)**2) / n

def loss_fn(cate: str='mse'):
    """loss functions"""
    if cate=='mse':
        return mean_squared_error
    else:
        return NotImplemented

In [185]:
def SGD(dw, db, lr, weight, bias):
    """Stochastic Gradient Descent"""
    return {
        'weight_updated': weight - lr * dw,
        'bias_updated': bias - lr * db
    }

def optimizer(cate: str='SGD'):
    """optimizors"""
    if cate == 'SGD':
        return SGD
    else:
        return NotImplemented


In [447]:
class LinearRegression(Module):
    """
    linear transformations: y = xW^T + b, initializer default to `normal`

    Args:
        in_array: feature array

    Shapes:
        in_array: (N, in_features)
        out_array: (N, )
    """
    def __init__(self, in_array: np.ndarray,
                 loss_fn_type: str='mse',
                 lr: float=0.00001,
                 optimizer_type: str='SGD'):
        # NOTICE: check in_array for batch case
        super().__init__()
        self.in_array = in_array
        self.in_features = in_array.shape[-1]
        self.out_features = 1
        self.lr = lr
        self.optimizer = optimizer(optimizer_type)
        self.initial = regression_initializer((self.out_features,
                                               self.in_features),
                                               'normal')
        self.weight = self.initial['W']
        self.bias = self.initial['b']

        self.loss_fn = loss_fn(loss_fn_type)
        self.loss = 99999

        self.dw = 0
        self.db = 0

        self.y_pred = np.array([])

    def forward(self) -> np.ndarray:
        self.y_pred = linear(self.in_array, self.weight, self.bias)
        return self.y_pred

    def loss_reg(self, y_std):
        self.loss = self.loss_fn(self.y_pred, y_std, self.in_features)

    def backward(self, y_std):
        diff = self.y_pred - y_std
        self.dw = (1. / self.in_features) * np.matmul(diff.T, self.in_array)
        self.db = (1. / self.in_features) * np.sum(diff)

    def step(self):
        param_updated = self.optimizer(self.dw, self.db, self.lr, self.weight, self.bias)
        self.weight = param_updated['weight_updated']
        self.bias = param_updated['bias_updated']

    def print_params(self):
        # print(f"weight: {self.weight} \nbias: {self.bias} \nloss: {self.loss}")
        print(f"loss: {self.loss}")


    def __name__(self):
        return "Linear Regression Model"


In [457]:
def training(model, epochs):
    result = {}

    for epoch in range(epochs):
        model.forward()
        model.backward(labels)
        model.loss_reg(labels)
        model.step()
        print(f"Epoch: {epoch}", end=" ")
        model.print_params()

    result['name'] = model.__class__.__name__
    result['weight'] = model.weight
    result['bias'] = model.bias
    result['loss'] = model.loss

    return result

## Test

In [460]:
def synthetic_data(w,b,num_examples):
    """ 生成 y = Xw + b + 噪声。"""
    X = np.random.normal(0,1,(num_examples,len(w)))
    y = np.matmul(X,w) + b
    y += np.random.normal(0,0.01,y.shape) # 均值为0，方差为1
    return X, y.reshape((-1,1))

true_w = np.array([i + 0.5 for i in range(23)])
true_b = 4.2
features, labels = synthetic_data(true_w,true_b,1000)

In [461]:
linear_reg = LinearRegression(features)
training_res = training(linear_reg, 10)

Epoch: 0 loss: 171776.41419665646
Epoch: 1 loss: 171627.60914508803
Epoch: 2 loss: 171478.93643035484
Epoch: 3 loss: 171330.39593197798
Epoch: 4 loss: 171181.98752959035
Epoch: 5 loss: 171033.71110293674
Epoch: 6 loss: 170885.56653187377
Epoch: 7 loss: 170737.55369636958
Epoch: 8 loss: 170589.67247650394
Epoch: 9 loss: 170441.92275246794


## Authentic Dataset

In [454]:
sheet = pd.read_csv('output_basic_data_preprocessing.csv')
sheet

Unnamed: 0,index,a,e,i,om,w,q,ad,data_arc,n_obs_used,...,class_APO,class_AST,class_ATE,class_CEN,class_IMB,class_MBA,class_MCA,class_OMB,class_TJN,class_TNO
0,0,2.769165,0.076009,10.594067,80.305532,73.597694,2.558684,2.979647,8822.0,1002,...,False,False,False,False,False,True,False,False,False,False
1,1,2.772466,0.230337,34.836234,173.080063,310.048857,2.133865,3.411067,72318.0,8490,...,False,False,False,False,False,True,False,False,False,False
2,2,2.669150,0.256942,12.988919,169.852760,248.138626,1.983332,3.354967,72684.0,7104,...,False,False,False,False,False,True,False,False,False,False
3,3,2.361418,0.088721,7.141771,103.810804,150.728541,2.151909,2.570926,24288.0,9325,...,False,False,False,False,False,True,False,False,False,False
4,4,2.574249,0.191095,5.366988,141.576605,358.687607,2.082324,3.066174,63507.0,2916,...,False,False,False,False,False,True,False,False,False,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
137631,797860,3.171225,0.159119,27.098625,309.036573,19.746812,2.666623,3.675826,2373.0,50,...,False,False,False,False,False,True,False,False,False,False
137632,798077,2.548410,0.076071,11.593237,246.298656,170.090810,2.354549,2.742270,3297.0,33,...,False,False,False,False,False,True,False,False,False,False
137633,798189,3.146246,0.220559,17.966646,137.981403,180.898832,2.452313,3.840180,2839.0,47,...,False,False,False,False,False,True,False,False,False,False
137634,799752,3.051336,0.287449,14.456779,343.917822,342.614839,2.174231,3.928440,2208.0,27,...,False,False,False,False,False,True,False,False,False,False


In [455]:
sheet.drop('index', axis=1, inplace=True)
label = sheet['diameter']
dia_dropped = sheet.drop('diameter', axis=1)
feature = dia_dropped

# labels & features
label_a = np.array(label)[:1000]
feature_a = np.array(feature)[:1000]

In [456]:
linear_reg_asteroids = LinearRegression(feature_a)
training_res = training(linear_reg_asteroids, 10)

Epochs: 0 loss: 3483369172.6801386
Epochs: 1 loss: 2.467047839942368e+21
Epochs: 2 loss: 1.7828658641787683e+33
Epochs: 3 loss: 1.2884268558256412e+45
Epochs: 4 loss: 9.311097352673888e+56
Epochs: 5 loss: 6.728867340739659e+68
Epochs: 6 loss: 4.862762569684695e+80
Epochs: 7 loss: 3.514181304476004e+92
Epochs: 8 loss: 2.5395996748262812e+104
Epochs: 9 loss: 1.8352970292576905e+116


In [458]:
training_res

{'name': 'LinearRegression',
 'weight': array([[-2.472716031730591e+54, -1.302809797227986e+53,
         -8.384598019300061e+54, -1.521811500877251e+56,
         -1.5741729968637798e+56, -2.1102946839636763e+54,
         -2.835137379515213e+54, -3.9397160643145045e+58,
         -2.378918887528671e+57, -1.9153717673676083e+53,
         -1.5220418255453887e+57, -1.610033219081791e+56,
         -1.682660922209947e+51, -0.03311837061349168, 0.4747445679963158,
         -0.8731717793550927, -7.305873615937039e+50,
         -8.918370003787828e+50, -8.147698478321307e+53,
         -3.594376423523605e+51, -5.684342578026334e+52,
         -4.1714881574557845e+51, 0.2612445529423622]], dtype=object),
 'bias': array([-8.82684223e+53]),
 'loss': 1.8352970292576905e+116}