In [None]:
import random
import time
import tracemalloc
from enum import Enum

import numpy as np

class Methods(Enum):
    """Optimizer variants dispatched by `sgd_handler`."""
    Classic = 1
    Momentum = 2
    AdaGrad = 3
    RMSprop = 4
    Adam = 5
    Nesterov = 6


class Regularization(Enum):
    """Penalty term added to the loss by `LinearRegression`."""
    WithoutRegularization = 1
    L1 = 2
    L2 = 3
    Elastic = 4


class LearningRate(Enum):
    """Step-size strategy used by the experiment loops."""
    Const = 1
    Dichotomy = 2


def sign(x):
    """Return 1 for positive x, 0 when x equals zero, and -1 for anything else."""
    if x == 0:
        return 0
    return 1 if x > 0 else -1


class LinearRegression:
    def __init__(self, T, W, X, Y, regularization=Regularization.WithoutRegularization, l1=0.1, l2=0.1):
        self.T = np.array([T[i % len(T)](X[i // len(T)]) for i in range(len(T) * len(X))]).reshape(len(X), len(T))
        self.W = W
        self.X = X
        self.Y = Y
        self.regularization = regularization
        self.l1 = l1
        self.l2 = l2

    def loss_function_value(self, W_Arg):
        val = sum([(np.dot(self.T[i], W_Arg) - self.Y[i]) ** 2 for i in range(len(self.X))])
        match self.regularization:
            case Regularization.L1:
                val += self.l1 * sum([abs(w) for w in self.W]) / len(self.W)
            case Regularization.L2:
                val += self.l2 * sum([w ** 2 for w in self.W]) / len(self.W)
            case Regularization.Elastic:
                val += (self.l1 * sum([abs(w) for w in self.W])) / len(self.W) + (
                        self.l2 * sum([w ** 2 for w in self.W])) / len(self.W)
        return val

    def grad_by_components(self, index_components, W_Arg):
        grad_with_batch = np.zeros(len(W_Arg))
        for i in index_components:
            grad_with_batch += 2 * (np.dot(self.T[i], W_Arg) - self.Y[i]) * self.T[i]
        match self.regularization:
            case Regularization.L1:
                grad_with_batch += self.l1 * np.array([sign(w) for w in self.W]) / len(self.W)
            case Regularization.L2:
                grad_with_batch += self.l2 * 2 * self.W / len(self.W)
            case Regularization.Elastic:
                grad_with_batch += (self.l1 * np.array([sign(w) for w in self.W])) / len(self.W) + (
                        self.l2 * 2 * self.W) / len(self.W)

        return grad_with_batch


def sgd(lin_reg, lr, eps, batch, max_num_of_step, beta_1, beta_2, eps_adam, is_corr_beta_1=True, is_corr_beta_2=True,
        is_nesterov=False):
    """Run mini-batch gradient descent on ``lin_reg``, updating ``lin_reg.W`` in place.

    Every iteration maintains a first-moment estimate ``V`` and a
    second-moment estimate ``S`` of the batch gradient and applies
    ``W -= alpha * V / sqrt(S + eps_adam)``; the different optimizers are
    obtained purely through the choice of ``beta_1``, ``beta_2``, ``eps_adam``
    and the correction flags.

    Args:
        lin_reg: object exposing ``W``, ``X``, ``loss_function_value(w)`` and
            ``grad_by_components(indices, w)``.
        lr: callable receiving the 1-D function ``a -> loss(W - a * grad)`` and
            returning the step size ``alpha``.
        eps: stop once the loss changes by less than this between iterations.
        batch: number of samples per mini-batch (taken cyclically from X).
        max_num_of_step: iteration index after which the loop stops; at most
            ``max_num_of_step + 1`` updates are performed.
        beta_1: decay of the first-moment estimate.
        beta_2: decay of the second-moment estimate.
        eps_adam: constant added to ``S`` before the square root.
        is_corr_beta_1: divide ``V`` by ``1 - beta_1 ** (i + 1)`` when true.
        is_corr_beta_2: divide ``S`` by ``1 - beta_2 ** (i + 1)`` when true.
        is_nesterov: feed ``V`` with the gradient at the look-ahead point
            ``W - alpha * beta_1 * V`` instead of the gradient at ``W``.

    Returns:
        int: number of weight updates performed.
    """
    n_samples = len(lin_reg.X)
    V = np.zeros(len(lin_reg.W))
    S = np.zeros(len(lin_reg.W))
    prev_loss = lin_reg.loss_function_value(lin_reg.W)
    i = 0
    while True:
        components = [(i * batch + j) % n_samples for j in range(batch)]
        grad_with_batch = lin_reg.grad_by_components(components, lin_reg.W)
        alpha = lr(lambda a: lin_reg.loss_function_value(lin_reg.W - a * grad_with_batch))
        # `not`, rather than bitwise `~`: ~True == -2 is still truthy, which
        # would make the look-ahead branch unreachable.
        if is_nesterov:
            momentum_grad = lin_reg.grad_by_components(components, lin_reg.W - alpha * beta_1 * V)
        else:
            momentum_grad = grad_with_batch
        V = (beta_1 * V) + (1 - beta_1) * momentum_grad
        S = (beta_2 * S) + (1 - beta_2) * (grad_with_batch ** 2)
        V_norm = V / (1 - (beta_1 ** (i + 1))) if is_corr_beta_1 else V
        S_norm = S / (1 - (beta_2 ** (i + 1))) if is_corr_beta_2 else S
        lin_reg.W -= alpha * (V_norm / ((S_norm + eps_adam) ** 0.5))
        # Evaluate the loss once and reuse it for both the check and the next round.
        current_loss = lin_reg.loss_function_value(lin_reg.W)
        if abs(current_loss - prev_loss) < eps or i >= max_num_of_step:
            return i + 1
        prev_loss = current_loss
        i += 1


def sgd_handler(lin_reg, lr, method, batch=1, beta_1=0.9, beta_2=0.999, eps_adam=10 ** -8, eps=0.001,
                max_num_of_step=10000):
    match method:
        case Methods.Classic:
            sgd(lin_reg, lr, eps, batch, max_num_of_step, 0, 1, 1, False, False)
        case Methods.Momentum:
            sgd(lin_reg, lr, eps, batch, max_num_of_step, beta_1, 1, 1, False, False)
        case Methods.AdaGrad:
            sgd(lin_reg, lr, eps, batch, max_num_of_step, 0, 0, eps_adam, False, False)
        case Methods.RMSprop:
            sgd(lin_reg, lr, eps, batch, max_num_of_step, 0, beta_2, eps_adam, False)
        case Methods.Adam:
            sgd(lin_reg, lr, eps, batch, max_num_of_step, beta_1, beta_2, eps_adam)
        case Methods.Nesterov:
            sgd(lin_reg, lr, eps, batch, max_num_of_step, beta_1, 1, 1, False, False, True)


def lr_dichotomy(eps, delt):
    """Build a step-size chooser that line-searches with the dichotomy method.

    The returned callable takes a one-argument function (named ``lin_reg`` for
    historical reasons), brackets it on ``[0, right_border_calc(function)]``
    and returns the result of `dichotomy` run with tolerance ``eps`` and
    probe offset ``delt``.
    """
    def choose_step(lin_reg):
        upper_bound = right_border_calc(lin_reg)
        return dichotomy(lin_reg, 0, upper_bound, eps, delt)

    return choose_step


def right_border_calc(func, max_border=1e10):
    """Find a right bracket for a line search that starts at 0.

    Starting from 1e-7, the candidate border is multiplied by 1.3 until
    ``func`` at the border exceeds ``func(0)``.

    Args:
        func: one-argument function of the step size.
        max_border: upper limit for the returned border. Without a limit the
            search never ends meaningfully when ``func`` never rises above
            ``func(0)`` (for example a constant function): the border
            overflows to infinity and poisons the subsequent search.

    Returns:
        float: the first probed border where ``func`` exceeds ``func(0)``, or
        ``max_border`` if no such border was found below it.
    """
    right_start = 0.0000001
    zero = func(0.)
    while right_start < max_border and zero >= func(right_start):
        right_start *= 1.3

    return min(right_start, max_border)


def dichotomy(func, a_1, a_2, eps, delt):
    """Minimize a one-argument function on ``[a_1, a_2]`` by dichotomy.

    Each round probes ``func`` at ``delt`` to either side of the midpoint and
    discards the part of the interval beyond the worse probe (both outer
    parts when the probes are equal).

    Args:
        func: function of one variable to minimize.
        a_1: left end of the search interval.
        a_2: right end of the search interval.
        eps: stop once the interval is shorter than this.
        delt: offset of the two probes from the midpoint.

    Returns:
        The midpoint of the final interval.

    Raises:
        ValueError: if the interval still has to shrink but ``eps <= 2 * delt``.
            Every round maps the length L to L / 2 + delt, which converges to
            2 * delt, so such a tolerance can never be reached.
    """
    if abs(a_1 - a_2) >= eps and eps <= 2 * delt:
        raise ValueError("eps must be greater than 2 * delt, otherwise the search cannot terminate")
    while abs(a_1 - a_2) >= eps:
        middle = (a_1 + a_2) / 2
        new_a_1 = middle - delt
        new_a_2 = middle + delt
        fv1 = func(new_a_1)
        fv2 = func(new_a_2)
        if fv2 > fv1:
            a_2 = new_a_2
        elif fv2 < fv1:
            a_1 = new_a_1
        else:
            a_1 = new_a_1
            a_2 = new_a_2
    return (a_1 + a_2) / 2

In [None]:
import matplotlib.pyplot as plt


def visualise_points(linear_reg):
    """Plot the fitted straight line over the data, then print weights and loss.

    The line is drawn as ``W[0] * x + W[1]`` on 1000 points spanning [-10, 10],
    so the model is read as having exactly a slope weight followed by an
    intercept weight.
    """
    slope, intercept = linear_reg.W[0], linear_reg.W[1]
    grid = np.linspace(-10, 10, 1000)
    plt.plot(grid, slope * grid + intercept, '-r')
    # Samples as green markers without a connecting line.
    plt.plot(linear_reg.X, linear_reg.Y, 'og', linestyle='None')
    plt.xlabel("x")
    plt.show()
    weights = linear_reg.W
    print(weights)
    print(linear_reg.loss_function_value(weights))

In [None]:
# Basis functions [x, 1]: the model is a straight line with slope W[0] and intercept W[1].
current_t = np.array([lambda x: x, lambda x: 1.])
current_w = np.zeros(2)
current_x = np.array([1., 2., 9., -2., -10.])
current_y = np.array([1., 2., 9., -2., 5])

# Fit and plot the same five points for every optimizer / penalty / step-size combination.
for method in Methods:
    for regularization in Regularization:
        for lr in LearningRate:
            linear_reg_const = LinearRegression(current_t, current_w, current_x, current_y, regularization)
            print(" ".join(str(part) for part in (method, regularization, lr)))
            if lr == LearningRate.Dichotomy:
                sgd_handler(linear_reg_const, lr_dichotomy(0.001, 0.0001), method)
            elif lr == LearningRate.Const:
                sgd_handler(linear_reg_const, lambda x: 0.01, method)
            visualise_points(linear_reg_const)
            # Fresh zero weights so the next configuration starts from the origin.
            current_w = np.zeros(2)
            print("\n\n")

In [None]:
def calc_y(x, coeffs):
    """Evaluate the polynomial whose i-th coefficient multiplies ``x ** i``."""
    return sum([coefficient * (x ** power) for power, coefficient in enumerate(coeffs)])


def generate_data(coeffs, num_of_points, x_range_left, x_range_right, deviation):
    """Sample noisy points from the polynomial described by ``coeffs``.

    For each point an x is drawn uniformly from
    ``[x_range_left, x_range_right]`` and its y is ``calc_y(x, coeffs)`` plus
    uniform noise from ``[-deviation, +deviation]``.

    Returns:
        list: ``[xs, ys]`` as two numpy arrays of length ``num_of_points``.
    """
    xs, ys = [], []
    for _ in range(num_of_points):
        # Draw order (x first, then noise) is kept so seeded runs are unchanged.
        x = random.uniform(x_range_left, x_range_right)
        exact_y = calc_y(x, coeffs)
        ys.append(exact_y + random.uniform(-deviation, +deviation))
        xs.append(x)
    return [np.array(xs), np.array(ys)]


def gen_linear_reg(coeffs, num_of_points, x_range_left, x_range_right, deviation, calculated_lambdas):
    """Create a `LinearRegression` over freshly generated polynomial data.

    Data comes from `generate_data`; the model uses ``calculated_lambdas`` as
    basis functions and starts from ``len(coeffs)`` zero weights.
    """
    basis = np.array(calculated_lambdas)
    xs, ys = generate_data(coeffs, num_of_points, x_range_left, x_range_right, deviation)
    initial_weights = np.zeros(len(coeffs))
    return LinearRegression(basis, initial_weights, xs, ys)


def test_universal(current_method, current_lr, linear_regression, start_coeffs):
    """Run one optimizer configuration and collect benchmark figures.

    Args:
        current_method: ``Methods`` member passed to `sgd_handler`.
        current_lr: step-size callable passed to `sgd_handler`.
        linear_regression: model to optimize (its weights are updated).
        start_coeffs: polynomial coefficients passed to `calc_y` when
            computing the squared-error figure.

    Returns:
        list: ``[memory, steps, seconds, squares]`` where ``memory`` is the
        ``(current, peak)`` tuple reported by tracemalloc, ``steps`` is the
        value returned by `sgd_handler`, ``seconds`` is the elapsed wall time
        and ``squares`` is the squared-error figure described below.
    """
    # 1 - mem, 2 - steps, 3 - time, 4 - sqrs
    res_univ = []
    # perf_counter is monotonic, so the interval cannot be skewed by clock adjustments.
    start = time.perf_counter()
    tracemalloc.start()
    try:
        steps = sgd_handler(linear_regression, current_lr, current_method)
        res_univ.append(tracemalloc.get_traced_memory())
    finally:
        # Always stop tracing, even when the optimizer raises.
        tracemalloc.stop()
    end = time.perf_counter()
    res_univ.append(steps)
    res_univ.append(end - start)
    calculated_coeffs = linear_regression.W
    xes = linear_regression.X
    yes = linear_regression.Y
    # NOTE(review): this figure compares the data with the polynomial given by
    # `start_coeffs` and divides by the number of weights; the fitted weights
    # themselves are not used - confirm this is the intended quality metric.
    sums = sum(
        ((calc_y(x_val, start_coeffs) - y_val) ** 2 / len(calculated_coeffs) for x_val, y_val in zip(xes, yes)),
        0.,
    )
    res_univ.append(sums)
    return res_univ


def power_lambda(power):
    """Return the monomial ``x -> x ** power``."""
    return lambda x: x ** power


# Noise-free samples of a degree-5 polynomial; the basis runs from x**5 down to x**0.
first_tests_val = gen_linear_reg(
    coeffs=[24., -26., -15., 25., -9., 1.],
    num_of_points=15,
    x_range_left=1.,
    x_range_right=4.,
    deviation=0.0,
    calculated_lambdas=[power_lambda(degree) for degree in range(5, -1, -1)]
)

start_w_for_first = np.zeros(6)
# Same numbers as `coeffs` above, listed in reverse order.
start_coeffs_for_first = [1., -9., 25., -15., -26., 24.]

# Maps a configuration label to the figures returned by `test_universal`.
test_results = dict()


def copy(lin_reg):
    """Reset the model's weights to a new all-zero vector of the same length.

    Despite the name nothing is copied: ``lin_reg.W`` is rebound in place.
    """
    weight_count = len(lin_reg.W)
    lin_reg.W = np.zeros(weight_count)


# Progress counter printed before each configuration; it must exist before the
# loop, otherwise the cell fails on a fresh kernel.
step = 0

for method in Methods:
    for regularization in Regularization:
        # Apply the penalty under test: the results are labelled with it, so
        # the shared model has to actually use it.
        first_tests_val.regularization = regularization
        for lr in LearningRate:
            print(step)
            step += 1
            current_lr = None
            lr_name = None
            if lr == LearningRate.Const:
                current_lr = lambda x: 0.01
                lr_name = "const"
            elif lr == LearningRate.Dichotomy:
                current_lr = lr_dichotomy(0.001, 0.0001)
                lr_name = "dichotomy"
            results = test_universal(method, current_lr, first_tests_val, start_coeffs_for_first)
            # Reset the weights so the next configuration starts from zeros.
            copy(lin_reg=first_tests_val)
            test_results[
                "Method: " + method.name + " Regularization: " + regularization.name + " LR: " + lr_name] = results

for key in test_results:
    print(str(key) + " -> ")
    for val in test_results[key]:
        print(val)