In [1]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
TRAIN = 'train.csv'
TEST = 'test.csv'
df_correct_file = 'df_correct.csv'

In [3]:
X = pd.read_csv('x_train.csv')
X_train=X.to_numpy()
X_test = pd.read_csv('x_test.csv')
y = pd.read_csv('y_train.csv')
y_train = y.to_numpy()

In [4]:
class line_model:
    '''На вход принимаем вертикальные масивы numpy'''
    def __init__(self,d_train,targ_train,d_test =None,targ_test=None):
        self.d_train = d_train
        self.targ_train = targ_train
        self.d_test = d_test
        self.targ_test = targ_test
        
    def calc_std_feat(self,x):
        res = (x - x.mean(axis = 0)) / x.std(axis = 0)
        return res    
    
    def calc_norm_feat(self,x):
        res = (x - x.min(axis = 0)) / (x.max(axis = 0)-x.min(axis = 0))
        return res
    
    def R2(self,y_pred,y):
        return 1- ((y_pred - y)**2).sum()/((y-y.mean())**2).sum()
    
    def mse(self,y_pred,y):
        return ((y_pred - y)**2).mean()
    
    def mae(self,y_pred,y):
        return (np.abs(y_pred - y)).mean()
    
    def analitic_fit(self,X,Y):
        W = np.linalg.inv(X.T@X)@X.T@Y
        return W
    
    def gradient_descent(self,X,Y,alpha=0.05,count_iter_max = int(1e5),criteriy =1e-8 ):
        number_obj,number_features = X.shape
        Wr = np.random.randn(number_features).reshape(-1,1)
        for  i in range(count_iter_max):
            Y_pred = X@Wr
            delta = Y_pred-Y
            Wr_old = Wr.copy()
            Wr-=alpha*2/number_obj*(X.T@delta)
            if (((Wr-Wr_old).T@(Wr-Wr_old)))**(1/2)<criteriy:
                return Wr     
        return Wr
    
    def predict(self,X,Wr):
        return X@Wr
    
    def insert_one(self,data):
        a=np.ones(len(data))
        data = np.hstack((np.column_stack(a).reshape(-1,1),data))
        return data
        
    def split(self,X,target,size):   
        train_data, test_data, train_labels, test_labels = train_test_split(X, target,test_size = size)
        return train_data, test_data, train_labels, test_labels
    
    def cross_validation(self,n,size):
        data=[]
        r = np.arange(n)
        for i in range(size-1):
            ind = np.random.choice(r,size=int(n/size), replace=False)
            data.append(ind)
            r=np.setdiff1d(r,ind) 
        data.append(r)
        return data
        
    def test_analytical_solution_split(self,num_cros,size,print_res = True) :
        '''Функция для проведения тестирования через аналитическое решение
        num_cros - количесвто случайных разбиений, size - размер деления выборки на  валидации
        ,без теста'''
        #  стандартизируем данные
        X = self.calc_std_feat(self.d_train)

        # Добавим еденичный столбец
        X = self.insert_one(X)
        data=[]
        for i in range (num_cros):

            train_data, test_data, train_labels, test_labels = self.split(X,self.targ_train,size)
            W = self.analitic_fit(train_data,train_labels)
            pred_train = self.predict(train_data,W)
            pred_test =  self.predict(test_data,W)
            r2_test = self.R2(pred_test,test_labels)
            r2_train = self.R2(pred_train,train_labels)
            data.append((r2_test,r2_train))
            if print_res:
                print(f'Выборка номер {i} r2 на обучении {r2_train}, r2 на тесте {r2_test}')
                
        return data
    
    
    def test_analytical_solution_cros(self,size,print_res = True) :
        '''Функция для проведения тестирования через аналитическое решение
         - количесвто крос валидаций, size - размер деления выборки на крос валидации'''
        #  стандартизируем данные
        X = self.calc_std_feat(self.d_train)  
        Y = self.targ_train
        # Добавим еденичный столбец
        X = self.insert_one(X)
        cros_val = self.cross_validation(X.shape[0],size)
        data=[]
        for i in  range(len(cros_val)):
            ind_train = np.hstack((cros_val[:i]+cros_val[i+1:]))
            ind_test = cros_val[i]
            W = self.analitic_fit(X[ind_train],Y[ind_train])
            pred_train = self.predict(X[ind_train],W)
            pred_test =  self.predict(X[ind_test],W)
            r2_test = self.R2(pred_test,Y[ind_test])
            r2_train = self.R2(pred_train,Y[ind_train])
            data.append((r2_test,r2_train))
            if print_res:
                print(f'Выборка номер {i} r2 на обучении {r2_train}, r2 на тесте {r2_test}')
                
        return data
    
    def test_gradient_solution_split(self,num_cros,size,print_res = True) :
        '''Функция для проведения тестирования через градиентный спуск решение
        num_cros - количесвто случайных разбиений, size - размер деления выборки на  валидации
        ,без теста'''
        #  стандартизируем данные
        X = self.calc_std_feat(self.d_train)

#         Добавим еденичный столбец
        X = self.insert_one(X)
        data=[]
        for i in range (num_cros):
            train_data, test_data, train_labels, test_labels = self.split(X,self.targ_train,size)
            W = self.gradient_descent(train_data,train_labels.reshape(-1,1))
            pred_train = self.predict(train_data,W)
            pred_test =  self.predict(test_data,W)
            r2_test = self.R2(pred_test,test_labels.reshape(-1,1))
            r2_train = self.R2(pred_train,train_labels.reshape(-1,1))
            data.append((r2_test,r2_train))
            if print_res:
                print(f'Выборка номер {i} r2 на обучении {r2_train}, r2 на тесте {r2_test}')     
        return data
    
    def test_gradient_solution_cros(self,size,print_res = True) :
            '''Функция для проведения тестирования через градиентный спуск решение
            num_cros - количесвто случайных разбиений, size - размер деления выборки на  валидации
            ,без теста'''
            #  стандартизируем данные
            X = self.calc_std_feat(self.d_train)
            Y = self.targ_train
            #Добавим еденичный столбец
            X = self.insert_one(X)
            cros_val = self.cross_validation(X.shape[0],size)    
            data=[]
            for i in range (len(cros_val)):
                ind_train = np.hstack((cros_val[:i]+cros_val[i+1:]))
                ind_test = cros_val[i]
                W = self.gradient_descent(X[ind_train],Y[ind_train].reshape(-1,1))
                pred_train = self.predict(X[ind_train],W)
                pred_test =  self.predict(X[ind_test],W)
                r2_test = self.R2(pred_test,Y[ind_test].reshape(-1,1))
                r2_train = self.R2(pred_train,Y[ind_train].reshape(-1,1))
                data.append((r2_test,r2_train))
                if print_res:
                    print(f'Выборка номер {i} r2 на обучении {r2_train}, r2 на тесте {r2_test}')

            return data        

In [5]:
test = line_model(X_train,y_train)

In [6]:
test.test_analytical_solution_cros(5)

Выборка номер 0 r2 на обучении 0.6463442004614617, r2 на тесте 0.6589200833542022
Выборка номер 1 r2 на обучении 0.6489951517370391, r2 на тесте 0.6490673461115197
Выборка номер 2 r2 на обучении 0.6506621699363805, r2 на тесте 0.6418012323349398
Выборка номер 3 r2 на обучении 0.647027108997356, r2 на тесте 0.6560527352019226
Выборка номер 4 r2 на обучении 0.6525954377639986, r2 на тесте 0.6340094591583141


[(0.6589200833542022, 0.6463442004614617),
 (0.6490673461115197, 0.6489951517370391),
 (0.6418012323349398, 0.6506621699363805),
 (0.6560527352019226, 0.647027108997356),
 (0.6340094591583141, 0.6525954377639986)]

Как видно результаты на крос валидации не впечатляют