In [None]:
from enum import Enum

import numpy as np
import torch.nn as nn
import torch.optim
from scipy.optimize import minimize, least_squares
from torch import float64
from torch.autograd import Variable

In [None]:
from linreg import Methods, LearningRateScheduling


def optimizer_handler(method, params, lr, beta_1=0.9, beta_2=0.999, factor=10):
    match method:
        case Methods.Classic:
            return torch.optim.SGD(params, lr)
        case Methods.Momentum:
            return torch.optim.SGD(params, lr, beta_1)
        case Methods.AdaGrad:
            return torch.optim.Adagrad(params, lr * factor)
        case Methods.RMSprop:
            return torch.optim.RMSprop(params, lr, alpha=beta_2)
        case Methods.Adam:
            return torch.optim.Adam(params, lr, betas=(beta_1, beta_2))
        case Methods.Nesterov:
            return torch.optim.SGD(params, lr, nesterov=True, momentum=beta_1)


def lr_scheduler_handler(optimizer, lrs, lr, epoch_size=30):
    match lrs:
        case LearningRateScheduling.Classic:
            return torch.optim.lr_scheduler.LambdaLR(optimizer, lambda *_: 1)
        case LearningRateScheduling.Stepwise:
            return torch.optim.lr_scheduler.StepLR(optimizer, step_size=epoch_size, gamma=0.75)
        case LearningRateScheduling.Exponential:
            return torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.99)


class TorchLinearRegression:
    """Linear regression over fixed basis functions, fitted with torch optimizers.

    Attributes:
        T_funcs: the basis functions exactly as passed in.
        T: float64 design matrix with ``T[i, j] = T_funcs[j](X[i])`` and
            shape ``(len(X), len(T_funcs))``.
        X, Y: training inputs and targets converted to float64 tensors.
        W: current weights, a float64 tensor that requires grad.
        W_points: numpy snapshots of ``W`` (the initial value, then one per
            optimisation step).
    """

    def __init__(self, T, X, Y, W=None):
        """Build the design matrix from basis functions ``T`` and samples ``X``.

        Args:
            T: sequence of callables, one per weight.
            X: sequence of sample points; each is passed to every basis function.
            Y: targets.
            W: optional initial weights, forwarded to ``refresh``.
        """
        self.T_funcs = T
        # One row per sample, one column per basis function.
        design_rows = [[basis(sample) for basis in T] for sample in X]
        self.T = torch.tensor(design_rows, dtype=float64).reshape(len(X), len(T))
        self.X = torch.tensor(X, dtype=float64)
        self.Y = torch.tensor(Y, dtype=float64)
        self.loss_f = nn.MSELoss(reduction='mean')

        self.refresh(W)

    def optimize(self, method=Methods.Classic, lr=0.01, lrs=LearningRateScheduling.Classic, max_steps=1500):
        """Run ``max_steps`` full-batch optimizer steps on ``self.W``.

        After every step the current weights are appended to ``W_points``,
        the scheduler is advanced, and the current learning rate is printed.

        Returns:
            ``self.W`` after the last step.
        """
        optimizer = optimizer_handler(method, [self.W], lr)
        scheduler = lr_scheduler_handler(optimizer, lrs, lr=lr)

        for _ in range(max_steps):
            optimizer.zero_grad()
            # Populates self.W.grad; the returned loss value is not needed here.
            self.loss(self.W, is_no_grad=False)
            optimizer.step()
            self.W_points.append(self.W.clone().detach().numpy())
            scheduler.step()

            print(f"Method: {method.name} | LRS: {lrs} | LR: {optimizer.param_groups[0]['lr']}")

        return self.W

    def loss(self, W, is_no_grad=True):
        """Evaluate the loss at weights ``W``.

        Args:
            W: weights as a tensor, or as a numpy array (which is reshaped to a
                column of ``len(T_funcs)`` rows and converted to float64).
            is_no_grad: when True, evaluate without building a graph and
                return the mean squared error multiplied by ``len(self.X)``;
                when False, call ``backward()`` and return the plain mean
                squared error.

        Returns:
            The loss as a Python float (note the different scaling of the two
            modes described above).
        """
        if isinstance(W, np.ndarray):
            W = torch.tensor(W.reshape(len(self.T_funcs), 1), dtype=float64, requires_grad=True)
        if is_no_grad:
            with torch.no_grad():
                loss_val = self.loss_f(self.T @ W, self.Y)
                return float(loss_val) * len(self.X)
        else:
            loss_val = self.loss_f(self.T @ W, self.Y)
            loss_val.backward()
            return float(loss_val)

    def refresh(self, W=None):
        """Reset the weights and restart the ``W_points`` history.

        Args:
            W: ``None`` for random normal weights of shape ``(len(T_funcs), 1)``;
                a numpy array (reshaped to that column shape); or any other
                tensor-like value, which is copied as-is (shape preserved).
        """
        if W is None:
            weights = torch.randn(len(self.T_funcs), 1, dtype=float64)
        elif isinstance(W, np.ndarray):
            # `float64` in this file is torch.float64, which numpy cannot use as
            # a dtype, so the conversion is done on the torch side.
            weights = torch.tensor(W.reshape(len(self.T_funcs), 1), dtype=float64)
        else:
            # Detached float64 copy: avoids torch's copy-construct warning for
            # tensors and keeps the dtype compatible with the design matrix.
            weights = torch.as_tensor(W, dtype=float64).detach().clone()
        self.W = weights.requires_grad_(True)
        self.W_points = [self.W.clone().detach().numpy()]

    def analytical_solution(self):
        """Return the least-squares weights from the normal equations.

        Solves ``(T^T T) w = T^T Y`` directly instead of forming the explicit
        inverse of ``T^T T``.
        """
        design_t = torch.t(self.T)
        return torch.linalg.solve(design_t @ self.T, design_t @ self.Y)

In [None]:
from matplotlib import pyplot as plt


def visualise(f, points, title, x_label="x", y_label="y"):
    """Plot an optimisation path on top of the contour lines of ``f``.

    Args:
        f: callable taking a length-2 numpy array and returning a scalar.
        points: sequence of visited points; only the first two coordinates
            of each point are drawn.
        title: figure title.
        x_label: label of the horizontal axis.
        y_label: label of the vertical axis.
    """
    trajectory = np.transpose(np.array(points))
    xs, ys = trajectory[0], trajectory[1]

    # Contour window: bounding box of the path, padded by 10 on every side.
    grid_x = np.linspace(min(xs) - 10, max(xs) + 10, 100)
    grid_y = np.linspace(min(ys) - 10, max(ys) + 10, 100)
    # Rows follow grid_y and columns follow grid_x.
    Z = [[f(np.array([gx, gy])) for gx in grid_x] for gy in grid_y]
    plt.contour(grid_x, grid_y, Z, 80)

    # Labels are attached to the artists themselves, so the legend cannot be
    # mis-assigned to other artists on the axes (e.g. the contour lines).
    plt.plot(xs, ys, marker='.', label='Route')
    plt.plot(xs[0], ys[0], 'og', label='Start point')
    plt.plot(xs[-1], ys[-1], 'or', label='End point')
    plt.title(title)
    plt.legend()
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.show()


def print_errors(excepted, received, suf=""):
    """Print the reference value, the obtained value and their errors.

    Args:
        excepted: reference value (scalar or array).
        received: obtained value, same shape as ``excepted``.
        suf: text inserted between each caption and the colon.

    The absolute error is the norm of the difference; the relative error is
    that norm divided by the norm of ``excepted``.
    """

    def emit(caption, value):
        print(caption + suf + ": " + str(value))

    emit("Excepted ", excepted)
    emit("Received ", received)

    deviation = np.linalg.norm(excepted - received)
    emit("Absolute error ", deviation)
    emit("Relative error ", deviation / np.linalg.norm(excepted))

In [None]:
import random
from linreg import gen_linear_reg, visualise_approximation, sgd_handler


def linreg_errors(linreg, torch_linreg):
    """Print, for both models, the loss at the analytical solution next to the
    loss at the currently fitted weights.

    Args:
        linreg: model exposing ``loss``, ``analytical_solution`` and ``W``.
        torch_linreg: second model exposing the same three members.
    """
    separator = "-------------------------"
    reports = ((linreg, "loss"), (torch_linreg, "torch loss"))

    print(separator)
    for position, (model, suffix) in enumerate(reports):
        if position > 0:
            # Blank line between the two reports.
            print()
        reference_loss = model.loss(model.analytical_solution())
        fitted_loss = model.loss(model.W)
        print_errors(reference_loss, fitted_loss, suf=suffix)
    print(separator)


def test_sgd_variants(errors=False):
    """Compare the project's SGD variants with their torch counterparts.

    For each randomly generated regression problem (first the 2-weight ones,
    then ones with 3 to 8 weights) every member of ``Methods`` is run twice from
    the same start point: once through ``sgd_handler`` and once through
    ``TorchLinearRegression.optimize``. Each run is plotted; the path over the
    loss contours is drawn only when the model has exactly two weights.

    Args:
        errors: when True, also print the loss error report after each pair
            of runs.
    """
    count_2_arity = 1
    count_other_arity = 1
    left_coeffs_border = -2.
    right_coeffs_border = 2.
    left_x_border = -2.
    right_x_border = 2.
    deviation = 3.

    # These methods get the larger base learning rate below.
    self_scheduled = (Methods.AdaGrad, Methods.Adam, Methods.RMSprop)

    for i in range(count_2_arity + count_other_arity):
        arity = 2 if i < count_2_arity else random.randint(3, 8)
        num_train_points = random.randint(50, 100)
        start_point = np.array([float(random.randint(15, 30)) for _ in range(arity)])
        linreg = gen_linear_reg(
            arity - 1, num_train_points,
            left_coeffs_border, right_coeffs_border,
            left_x_border, right_x_border,
            deviation
        )
        torch_linreg = TorchLinearRegression(linreg.T_funcs, linreg.X, linreg.Y, torch.tensor(start_point))

        for method in Methods:
            # Previously the whole run was nested under the `self_scheduled`
            # check, so the 0.01 rate was dead code and the remaining methods
            # were never exercised. Only the rate depends on the check.
            lr = 0.1 if method in self_scheduled else 0.01

            linreg.refresh(start_point)
            sgd_handler(linreg, lambda *_: lr, method, store_points=True)
            title = 'OUR ' + method.name
            if len(linreg.T_funcs) == 2:
                visualise(linreg.loss, linreg.W_points, title)
            visualise_approximation(linreg, title)

            title = 'ZAPADNOE ' + method.name
            torch_linreg.refresh(torch.tensor(start_point))
            torch_linreg.optimize(method, lr=lr)
            if len(torch_linreg.T_funcs) == 2:
                visualise(torch_linreg.loss, torch_linreg.W_points, title)
            visualise_approximation(torch_linreg, title)

            if errors:
                linreg_errors(linreg, torch_linreg)

# Run the comparison when this cell executes; errors=True also prints the loss
# error report after each pair of runs.
test_sgd_variants(errors=True)

In [None]:
def derivative(f, x, i, delt=0.0001):
    """Central-difference estimate of the partial derivative of ``f`` at ``x``.

    Args:
        f: callable taking a numpy array shaped like ``x``.
        x: evaluation point (array-like); it is never modified.
        i: index of the coordinate to differentiate along.
        delt: half-width of the difference stencil.

    Returns:
        ``(f(x + delt * e_i) - f(x - delt * e_i)) / (2 * delt)``.
    """
    base = np.asarray(x)
    # Promote to at least float64: shifting an integer array in place by a
    # float step cannot be cast back and would raise.
    work_dtype = np.result_type(base.dtype, np.float64)
    forward = base.astype(work_dtype)   # astype always returns a fresh copy
    backward = base.astype(work_dtype)
    forward[i] += delt
    backward[i] -= delt
    return (f(forward) - f(backward)) / (2 * delt)


def grad(f, delt=0.01):
    """Return a function that evaluates the numerical gradient of ``f``.

    Args:
        f: callable taking a numpy array.
        delt: step forwarded to ``derivative`` for every coordinate.

    Returns:
        ``grad_calc(x)``, which returns a numpy array holding one central
        difference per coordinate of ``x``.
    """

    def grad_calc(x):
        return np.array([derivative(f, x, axis, delt) for axis in range(len(x))])

    return grad_calc


def hessian(f):
    """Return a function that evaluates the numerical Hessian of ``f``.

    Entry ``[i][j]`` is obtained by differentiating along coordinate ``i`` the
    numerical partial derivative of ``f`` along coordinate ``j``, both with the
    default step of ``derivative``.
    """

    def calc(x):
        size = len(x)
        B = np.asarray([[0.0] * size for _ in range(size)])
        for row, col in np.ndindex(size, size):

            def partial_along_col(point, col=col):
                return derivative(f, point, col)

            B[row, col] = derivative(partial_along_col, x, row)
        return B

    return calc

In [None]:
Algorithms = Enum('Methods', ['Newton', 'DogLeg', 'BFGS', 'LBFGS'])


def optimize_handler(fun, x0, algorithm=Algorithms.Newton):
    match algorithm:
        case Algorithms.Newton:
            return least_squares(fun, x0)
        case Algorithms.DogLeg:
            return minimize(fun, x0, method='dogleg', jac=grad(fun), hess=hessian(fun))
        case Algorithms.BFGS:
            return minimize(fun, x0, method='BFGS')
        case Algorithms.LBFGS:
            return minimize(fun, x0, method='L-BFGS-B')

In [None]:
# Generate a random vector of expected (ground-truth) coefficients
def gen_excepted(lb=-10, rb=10, min_size=2, max_size=10):
    """Draw a random-length coefficient vector.

    The length comes from ``np.random.randint(min_size, max_size)`` (upper bound
    exclusive); the values come from ``np.random.uniform(lb, rb)``.
    """
    count = np.random.randint(min_size, max_size)
    return np.random.uniform(lb, rb, size=count)


# Generate sample points, noisy targets and the design matrix for polynomial basis functions
def gen_parameters(M, N=100, deviation=0.01, is_random_N=False, lb_N=50, rb_N=200, coeffs=None):
    """Generate a noisy polynomial regression data set.

    Args:
        M: number of basis functions; they are ``x ** (M - 1), ..., x ** 0``.
        N: number of sample points (ignored when ``is_random_N`` is True).
        deviation: scale applied to the standard-normal noise added to ``Y``.
        is_random_N: when True, draw ``N`` with ``np.random.randint(lb_N, rb_N)``.
        lb_N: lower bound (inclusive) for the random ``N``.
        rb_N: upper bound (exclusive) for the random ``N``.
        coeffs: the ``M`` true coefficients, highest power first. When omitted
            the module-level ``excepted`` is used, which is the behaviour
            existing callers rely on.

    Returns:
        ``(X, Y, T, Funcs)``: sample points of shape ``(N, 1)``, targets of
        shape ``(N, 1)``, the float64 design matrix of shape ``(N, M)`` and
        the numpy array of basis callables.
    """
    if coeffs is None:
        # Backward-compatible fallback to the global defined by the calling cell.
        coeffs = excepted
    if is_random_N:
        N = np.random.randint(lb_N, rb_N)

    # Noise is drawn before X so the random stream is consumed in a fixed order.
    noise = torch.randn(N, 1, dtype=float64) * deviation
    powers = [(M - 1 - i) for i in range(M)]
    Funcs = np.array([lambda x, i=i: (x ** powers[i]) for i in range(M)])
    X = torch.randn(N, 1, dtype=float64)
    Y = sum([float(coeffs[i]) * Funcs[i](X) for i in range(M)]) + noise

    # Fill the design matrix one column per basis function.
    T = torch.zeros(len(X), len(Funcs), dtype=float64)
    for j, basis in enumerate(Funcs):
        T[:, j] = basis(X)[:, 0]

    return (X, Y, T, Funcs)


# Build the least-squares (mean squared error) loss function
def gen_f(X, Y, T, Funcs):
    """Build the mean-squared-error loss ``f(W)`` for design matrix ``T`` and targets ``Y``.

    Args:
        X: unused here; kept so all generators share one signature.
        Y: target tensor.
        T: design matrix tensor.
        Funcs: basis functions; only their count is used, to reshape
            non-tensor weights into a column.

    Returns:
        ``f(W)``. A tensor ``W`` is used as-is and a tensor loss is returned;
        anything else is copied, reshaped to ``(len(Funcs), 1)``, converted
        to float64, and the loss is returned as a Python float.
    """
    criterion = nn.MSELoss()

    def f(W):
        got_tensor = isinstance(W, torch.Tensor)
        if got_tensor:
            weights = W
        else:
            column = np.copy(W).reshape(len(Funcs), 1)
            weights = torch.tensor(column, dtype=float64)
        value = criterion(T.mm(weights), Y)
        if got_tensor:
            return value
        return value.item()

    return f


#gen random x0
def gen_x0(len, lb=-20, rb=20):
    """Draw a random starting point with ``len`` coordinates from ``np.random.uniform(lb, rb)``.

    Note that the first parameter shadows the builtin ``len`` inside this function.
    """
    start_point = np.random.uniform(low=lb, high=rb, size=len)
    return start_point


#numeric grad for any function
def num_grad(f, delt=0.01):
    """Return the numerical-gradient function of ``f`` built by ``grad`` with step ``delt``."""
    numeric_gradient = grad(f, delt=delt)
    return numeric_gradient


# Analytic gradient of the least-squares problem for any model that is linear in its weights
def an_grad(X, Y, T, Funcs):
    """Build the analytic gradient of the mean-squared-error loss ``mean((T W - Y)^2)``.

    The gradient is ``(2 / n) * T^T (T W - Y)``, where ``n`` is the number of
    residual entries. The previous version omitted the ``2 / n`` factor, so its
    output was not the gradient of the mean-based loss built by ``gen_f`` and
    was inconsistent when passed as ``jac=`` for that loss.

    Args:
        X: unused; kept for signature compatibility.
        Y: target tensor, shaped like ``T @ W`` for a column ``W``.
        T: design matrix tensor.
        Funcs: unused; kept for signature compatibility. The columns of ``T``
            already hold the basis-function values.

    Returns:
        ``an_grad_calc(W)``, which takes the weights as a 1-D array-like and
        returns the gradient as a 1-D numpy array.
    """

    def an_grad_calc(W):
        weights = torch.as_tensor(W, dtype=float64).reshape(len(W), 1)
        residual = (T @ weights) - Y
        # The mean in the loss averages over every residual entry.
        gradient = (2.0 / residual.numel()) * (T.T @ residual)
        return gradient.reshape(-1).detach().numpy()

    return an_grad_calc


# Gradient of the least-squares loss computed with torch autograd
def torch_grad(f):
    """Build a gradient function for ``f`` based on torch autograd.

    Args:
        f: callable that accepts a float64 column tensor of shape
            ``(len(W), 1)`` and returns a scalar tensor.

    Returns:
        ``torch_grad_calc(W)``, which returns the gradient of ``f`` at ``W`` as
        a 1-D numpy array of length ``len(W)``.
    """

    def torch_grad_calc(W):
        size = len(W)
        # Shape the column first, then switch on gradient tracking, so the
        # gradient is collected directly on this tensor.
        column = torch.tensor(W, dtype=float64).reshape(size, 1).requires_grad_(True)
        f(column).backward()
        return column.grad.detach().numpy().reshape(size)

    return torch_grad_calc

In [None]:
# Examples: recover random polynomial coefficients with different solvers and
# gradient sources, and report how close each result is to the true values.


# 2.a - fit with the DogLeg solver (numerical gradient and Hessian).

# True coefficients. `excepted` must be assigned at module level before
# gen_parameters is called, because gen_parameters reads it as a global.
excepted = gen_excepted()
M = len(excepted)

# Sample points, noisy targets, design matrix and basis functions
# (with a random number of samples).
X1, Y1, T1, Funcs1 = gen_parameters(M, is_random_N=True)

# Loss function to minimise.
f1 = gen_f(X1, Y1, T1, Funcs1)

# Random starting point with one coordinate per basis function.
x0_1 = gen_x0(len(Funcs1))

# Run the solver and take the weights it found.
optimize_result = optimize_handler(f1, x0_1, Algorithms.DogLeg)
received = optimize_result.x

print("Example 2.a")
print("-------------------------")
print_errors(excepted, received)
print("-------------------------")
print()

# 2.b - same problem and starting point, solved with scipy's CG method using
# three different gradient implementations.

# Gradient functions: numerical, analytic and torch autograd.
num_grad_f1 = num_grad(f1)
an_grad_f1 = an_grad(X1, Y1, T1, Funcs1)
torch_grad_f1 = torch_grad(f1)

# One CG run per gradient source.
with_num_grad = minimize(f1, x0_1, method='CG', jac=num_grad_f1).x
with_an_grad = minimize(f1, x0_1, method='CG', jac=an_grad_f1).x
with_torch_grad = minimize(f1, x0_1, method='CG', jac=torch_grad_f1).x

print("Example 2.b")
print("-------------------------")
print_errors(excepted, with_num_grad, suf="with num grad")

print()

print_errors(excepted, with_an_grad, suf="with an grad")

print()

print_errors(excepted, with_torch_grad, suf="with torch grad")
print("-------------------------")