In [1]:
import pandas as pd
import datetime
from datetime import datetime
import seaborn as sns
import matplotlib.pyplot as plt
from random import randint as rn
import numpy as np
from sklearn.metrics import r2_score

### Підготовча частина

Завантаження підготовленої таблиці даних

In [2]:
df = pd.read_pickle('data.pickle')
y_true = df.Value.copy()
df.reset_index(inplace=True, drop=True)
df_res = df.copy()

Видалення випадковим чином заданого відсотка значень

In [3]:
def count_nan(df_nan):
    value = list(df_nan.isna().sum().loc[df_nan.isna().sum() > 0].sort_values(ascending=True))
    if len(value) > 0:
        return value[0]
    else:
        return 0

def make_nan(df, share=0.2):
    target = np.ceil(len(df) * 0.2)
    while count_nan(df) < target:
        position = rn(0, len(df) - 1)
        df.at[position, 'Value'] = np.nan
    return df

df_nan = make_nan(df, share=0.2)

Перевірка кількості видалених значень у датасеті

In [4]:
print('Отримана таблиця містить {} NaN'.format(count_nan(df_nan)))

Отримана таблиця містить 20 NaN


### Інтерполяція методом найменших квадратів

Створюємо клас _MNK_LinearImputer_, який дозволить відновляти значенням шляхом інтерполяції лінійною функцією _y=ax+b_, коефіцієнти якої знаходяться так, щоб сума квадратів відхилень точок від апроксимуючих точок була мінімальною.

In [5]:
class MNK_LinearImputer:
    
    def __init__(self, n):
        self.X = None
        self.y = None
        self.res = None
        self.m = n

    def neighbour(self, k):
        """
        метод, повертає m найближчих до точки, що апроксимується, точок, для яких значення y не є NaN
        """
        n_minus = k - 1
        n_plus = k + 1
        points = []
        
        for _ in range(self.m):
            
            if n_minus > -1:
                while (np.isnan(self.y[n_minus])) & (n_minus > 0):
                    if n_minus > 0:                
                        n_minus -= 1
                        
            if n_plus < len(self.X) - 1:
                while (np.isnan(self.y[n_plus])) & (n_plus < len(self.X) - 1):
                    if n_plus < len(self.X) - 1:
                        n_plus += 1
            
            if n_minus < 0:
                points.append(n_plus)
                n_plus += 1
            elif n_plus > len(self.X) - 1:
                points.append(n_minus)
                n_minus -= 1
            elif np.isnan(self.y[n_plus]):
                points.append(n_minus)
                n_minus -= 1
            elif np.abs(self.X[n_minus] - self.X[k]) < np.abs(self.X[n_plus] - self.X[k]):
                points.append(n_minus)                
                n_minus -= 1
            else:
                points.append(n_plus)
                n_plus += 1
        return sorted(points)
                   
    def fit_transform(self, X, y):
        """
        Підставляє значення замість NaN, користуючись принципом лінійної інтерполяції
        """
        self.X = np.array(X)
        self.y = np.array(y)
        self.res = np.array(y)      
        for n in range(len(self.y)):
            if np.isnan(y[n]):
                points = self.neighbour(n)
                xi = self.X[points]
                yi = self.y[points]
                a = self.m * np.sum(np.dot(xi, yi)) - np.sum(xi) * np.sum(yi)
                a = a / (self.m * np.sum(np.dot(xi, xi)) - np.sum(xi) * np.sum(xi))
                b = np.sum(yi) - a * np.sum(xi)
                b = b / self.m
                self.res[n] = a * self.X[n] + b
        return self.res        

Проводимо відновлення значень

In [6]:
mnki = MNK_LinearImputer(4)
y_pred = mnki.fit_transform(df_nan.Time, df_nan.Value)
df['MNK_imputed'] = y_pred

In [7]:
print('Коефіцієнт детермінації = {}'.format(r2_score(y_true, y_pred)))

Коефіцієнт детермінації = 0.9885285628618102
