In [None]:
import numpy as np
import pandas as pd
import random

# Линейные модели

## 1. Линейная регрессия

**Теорема Гаусса-Маркова**

In [None]:
from sklearn.datasets import make_regression

X, y = make_regression(n_samples=1000, n_features=14, n_informative=10, noise=15, random_state=42)
X = pd.DataFrame(X)
y = pd.Series(y)
X.columns = [f'col_{col}' for col in X.columns]

In [None]:
class MyLineReg:
    def __init__(self,
                 n_iter = 100,                #количество шагов градиентного спуска
                 learning_rate = 0.1,         #коэффициент скорости обучения градиентного спуска
                 metric = None,               #выводимые метрики (mae, rmse, mape, r2)
                 reg = None,                  #регуляризация (l1, l2, elasticnet)
                 l1_coef = 0,                 #коэффициент L1 регуляризации (от 0.0 до 1.0)
                 l2_coef = 0,                 #коэффициент L2 регуляризации (от 0.0 до 1.0)
                 sgd_sample = None,           #кол-во строк, которое будет использоваться на каждой итерации обучения.
                                              #(Целые числа или дробные от 0.0 до 1.0)
                 random_state = 42):          #сид для рандомайзера

        self.weights = None
        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.metric = metric
        self.metrics = {
            'mse':  lambda y, y_pred: np.mean((y - y_pred) ** 2) + self.l1 * np.sum(abs(self.weights)) + self.l2 * np.sum((self.weights) ** 2),
            'mae':  lambda y, y_pred: np.mean(np.abs(y - y_pred)),
            'rmse': lambda y, y_pred: np.sqrt(np.mean((y - y_pred) ** 2)),
            'mape': lambda y, y_pred: np.mean(np.abs((y - y_pred) / y)) * 100,
            'r2':   lambda y, y_pred: 1 - np.sum((y - y_pred) ** 2) / np.sum((y - np.mean(y)) ** 2)
        }
        self.best_score = None
        self.reg = reg
        self.l1 = 0 if self.reg == None or self.reg == 'l2' else l1_coef
        self.l2 = 0 if self.reg == None or self.reg == 'l1' else l2_coef
        self.sgd_sample = sgd_sample
        random.seed(random_state)

    #Обучение модели
    def fit(self, X, y, verbose = False):

        #добавляем единичный столбец
        X = np.column_stack((np.ones(len(X)), X))

        #задаём начальные веса
        self.weights = np.ones(len(X[0]))

        #делаем n_iter шагов градиентного спуска
        for i in range(1, self.n_iter + 1):

            #предсказываем значения y
            y_pred = X @ self.weights
            y_ = y_pred - y

            #Стахостический градиентный спуск
            if self.sgd_sample:
              if isinstance(self.sgd_sample, int):
                  sgd_sample = self.sgd_sample                #Если задано целое число, то из исходного датасета берется это количество строк
              else:
                  sgd_sample = int(len(X) * self.sgd_sample)  #Если задано дробное число, то берем долю от количества строк в исходном датасете

              sample_rows_idx = np.sort(random.sample(range(X.shape[0]), sgd_sample))   #выбираем нужное число строк и сортируем индексы
              X_ = X[sample_rows_idx]
              y_ = y_.iloc[sample_rows_idx]
            else:
              X_ = X

            #считаем градиент
            mse_grad = 2/len(y_) * y_ @ X_ + self.l1 * np.sign(self.weights) + self.l2 * 2 * self.weights

            #Динамически задаваемый коэфициент скорости обучения
            if callable(self.learning_rate):
                learning_rate = self.learning_rate(i)         #Если коэфициент дробный, считаем значение лямбда-функции на этой итерации
            else:
                learning_rate = self.learning_rate

            #Считаем новое значение весов (делаем спуск)
            self.weights -= mse_grad * learning_rate

            #Лучшее значение заданной метрики
            if self.metric:
                self.best_score = self.metrics[self.metric](y, y_pred)

            #выводить лог на некоторой итерации
            if verbose and (i % verbose) == 0:
                print(f'{i if i != 0 else "start"}',
                      f' | loss:  {self.metrics["mse"](y, y_pred):.4f}',
                      f' | {self.metric}: {self.best_score:.4f}' if self.metric else '',
                      f'| lr: {learning_rate:.4f}')

    #Получаем веса
    def get_coef(self):
        return np.sum(self.weights[1:])

    #Предсказываем значения y
    def predict(self, X):
        X = np.column_stack((np.ones(len(X)), X))
        return X @ self.weights

    #Получаем значение по выбранной метрики после обучения
    def get_best_score(self):
        return self.best_score

    #При вызове класса выводим параметры
    def __str__(self):
        return f"MyLineReg class: n_iter={self.n_iter}, learning_rate={self.learning_rate}"

#Создаем объект класса
fun = lambda iter: 0.5 * (0.85 ** iter) #функция изменения коэфициента скорости обучения
Reg = MyLineReg(n_iter = 10, learning_rate = fun, metric = 'mse',
                reg = 'l2', l1_coef = 0.01, l2_coef = 0.001,
                sgd_sample = 0.8)

print("Обучение модели: \n")
Reg.fit(X, y, 1)

print("\nСреднее значение полученных весов:", Reg.get_coef())

print(f"\nПредсказанные веса: {Reg.predict(X)[:5]} ...")

print(f"\nОценка {Reg.metric}: {Reg.get_best_score()}")



Обучение модели: 

1  | loss:  20637.2027  | mse: 20637.2027 | lr: 0.4250
2  | loss:  860.8836  | mse: 860.8836 | lr: 0.3612
3  | loss:  332.1451  | mse: 332.1451 | lr: 0.3071
4  | loss:  266.7449  | mse: 266.7449 | lr: 0.2610
5  | loss:  251.4674  | mse: 251.4674 | lr: 0.2219
6  | loss:  248.1069  | mse: 248.1069 | lr: 0.1886
7  | loss:  246.3315  | mse: 246.3315 | lr: 0.1603
8  | loss:  246.1792  | mse: 246.1792 | lr: 0.1362
9  | loss:  245.7645  | mse: 245.7645 | lr: 0.1158
10  | loss:  245.6243  | mse: 245.6243 | lr: 0.0984

Среднее значение полученных весов: 426.75569340342656

Предсказанные веса: [ -61.36800733  131.73063372  -52.11984902   22.98842222 -131.27274931] ...

Оценка mse: 245.62426530645317


Сравним полученные веса с весами, полученными с помощь аналитического решения:

In [None]:
def anal_sol(X, y):
    return np.linalg.inv(X.T @ X) @ X.T @ y

anal_Reg = MyLineReg(50, 0.1)
anal_Reg.fit(X, y, 10)

pd.DataFrame({'anal_w': anal_sol(X, y),
             'pred_w': anal_Reg.weights[1:],
             'dif_w': abs(anal_sol(X, y) - anal_Reg.weights[1:])
             })

Unnamed: 0,anal_w,pred_w,dif_w
0,42.922778,42.918088,0.004691
1,16.594022,16.59647,0.002448
2,0.498153,0.497913,0.000239
3,65.380052,65.371968,0.008084
4,47.575574,47.591063,0.01549
5,61.765609,61.776303,0.010694
6,0.247919,0.246265,0.001654
7,-0.123701,-0.10781,0.015891
8,60.043163,60.012217,0.030946
9,53.904415,53.889591,0.014825


Проверим невозможность нахождения аналитического решения, если несколько признаков скорелировны друг с другом (имеют зависимость)
Должна получиться невырожденная матрица.

In [None]:
x_dep = np.random.rand(10, 3)
x_dep = np.column_stack((x_dep, 2 * x_dep[:,2]))
y_dep = np.random.rand(10, 1).flatten()

# print(anal_sol(x_dep, y_dep)) - Singular matrix
dep_Reg = MyLineReg(500, 0.1, reg = None, l1_coef = 0.01, l2_coef=0.001)
dep_Reg.fit(x_dep, y_dep, 100)
print(dep_Reg.weights)
print(dep_Reg.get_coef())

100  | loss:  0.0642  | lr: 0.1000
200  | loss:  0.0601  | lr: 0.1000
300  | loss:  0.0594  | lr: 0.1000
400  | loss:  0.0592  | lr: 0.1000
500  | loss:  0.0592  | lr: 0.1000
[ 0.47761083 -0.36946212  0.16908924  0.42072171 -0.15855658]
0.061792237587940696


По идее регуляризация должна давать нам лучше значения, так мы к матрице добавляем небольшой придаток, и избегаем невурожденности обратной матрицы. Но чет ничего не менятеся.

## 2. Логистическая регрессия

In [None]:
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=1000, n_features=14, n_informative=10, random_state=42)
X = pd.DataFrame(X)
y = pd.Series(y)
X.columns = [f'col_{col}' for col in X.columns]

In [None]:
class MyLogReg:
    def __init__(self,
               n_iter = 100,
               learning_rate  = 0.1,
               metric = None,
               reg = None,
               l1_coef = 0,
               l2_coef = 0,
               sgd_sample = None,
               random_state = 42):

        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.weights = None
        self.metric = metric
        self.best_score = None
        self.reg = reg
        self.l1 = 0 if self.reg == None or self.reg == 'l2' else l1_coef
        self.l2 = 0 if self.reg == None or self.reg == 'l1' else l2_coef
        self.sgd_sample = sgd_sample
        random.seed(random_state)

    def __str__(self):
        return f"MyLogReg class: n_iter={self.n_iter}, learning_rate={self.learning_rate}"

    # Матрица ошибок
    def matr_err(self, y, y_pred):
        y_pred = np.where(y_pred > 0.5, 1, 0)
        TP = np.sum((y_pred == 1) & (y == 1))
        TN = np.sum((y_pred == 0) & (y == 0))
        FP = np.sum((y_pred == 1) & (y == 0))
        FN = np.sum((y_pred == 0) & (y == 1))
        return TP, TN, FP, FN

    # Вычисление метрик
    def logloss(self, y, y_pred):
        eps = 1e-15
        return - np.mean(y * np.log(y_pred + eps) + (1 - y) * np.log(1 - y_pred + eps)) + \
        + self.l1 * np.sum(abs(self.weights)) + self.l2 * np.sum((self.weights) ** 2)

    def accuracy(self, y, y_pred):
        TP, TN, FP, FN = self.matr_err(y, y_pred)
        return (TP + TN) / (TP + FP + FN + TN)

    def precision(self, y, y_pred):
        TP, TN, FP, FN = self.matr_err(y, y_pred)
        return TP / (TP + FP)

    def recall(self, y, y_pred):
        TP, TN, FP, FN = self.matr_err(y, y_pred)
        return TP / (TP + FN)

    def f1(self, y, y_pred):
        return 2 * self.precision(y, y_pred) * self.recall(y, y_pred) / (self.precision(y, y_pred) + self.recall(y, y_pred))

    def roc_auc(self, y, y_pred):
        roc_auc = 0
        y_sort = sorted(zip(y_pred, y), reverse=True)

        P = np.sum(y == 1)
        N = np.sum(y == 0)

        for i in range(len(y)):
            if y_sort[i][1] == 0:
                for j in range(i):
                    if y_sort[j][1] == 1:
                        if y_sort[i][0] > y_sort[j][0]:
                            I_a = 0
                        elif y_sort[i][0] == y_sort[j][0]:
                            I_a = 0.5
                        else:
                            I_a = 1
                        roc_auc += I_a

        return roc_auc / (P * N)

    def fit(self, X, y, verbose = False):
        X = np.column_stack((np.ones(len(X)), X))
        self.weights = np.ones(len(X[0]))
        y_pred = 1 / (1 + np.exp(- X @ self.weights))
        y_ = y_pred - y

        for i in range (1, self.n_iter + 1):

            #Стахостический градиентный спуск
            if self.sgd_sample:
              if isinstance(self.sgd_sample, int):
                  sgd_sample = self.sgd_sample                #Если задано целое число, то из исходного датасета берется это количество строк
              else:
                  sgd_sample = int(len(X) * self.sgd_sample)  #Если задано дробное число, то берем долю от количества строк в исходном датасете

              sample_rows_idx = np.sort(random.sample(range(X.shape[0]), sgd_sample))   #выбираем нужное число строк и сортируем индексы
              X_ = X[sample_rows_idx]
              y_ = y_[sample_rows_idx]
            else:
              X_ = X

            log_grad = (y_ @ X_) /len(y_) + self.l1 * np.sign(self.weights) + self.l2 * 2 * self.weights

            #Динамически задаваемы коэфициент скорости обучения
            if callable(self.learning_rate):
                learning_rate = self.learning_rate(i)         #Если коэфициент дробный, считаем значение лямбда-функции на этой итерации
            else:
                learning_rate = self.learning_rate

            self.weights -= log_grad * learning_rate
            y_pred = 1 / (1 + np.exp(- X @ self.weights))
            y_ = y_pred - y

            if verbose and i % verbose == 0:
                print(f"{i} | loss: {self.logloss(y, y_pred):.4f}",
                      f"| {self.metric}: {eval(f'self.{self.metric}(y, y_pred)'):.4f}" if self.metric else '')

        # Вычисление наилучшего значения по метрике
        if self.metric:
            self.best_score = eval(f'self.{self.metric}(y, y_pred)')

    def get_coef(self):
        return np.sum(self.weights[1:])

    def predict_proba(self, X):
        X = np.column_stack((np.ones(len(X)), X))
        return 1 / (1 + np.exp(- X @ self.weights))

    def predict(self, X):
        return np.where(self.predict_proba(X) > 0.5, 1, 0)

    def get_best_score(self):
        return self.best_score

y = np.where(y > 0.5, 1, 0)

Reg = MyLogReg(50, 0.1, sgd_sample=2)
Reg.fit(X, y, 10)
print(Reg.get_coef())
# Reg.get_best_score()
# y_pred = Reg.predict_proba(X)

10 | loss: 2.4526 
20 | loss: 1.6907 
30 | loss: 1.2620 
40 | loss: 0.9770 
50 | loss: 1.3569 
4.069654228119178


# Метрические алгоритмы

## Метод ближайших соседей (классификация)

In [2]:
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=300, n_features=14, n_informative=10, random_state=42)
X = pd.DataFrame(X)
y = pd.Series(y)
X.columns = [f'col_{col}' for col in X.columns]
X_train = X[:200]
X_test = X[200:]

In [3]:
import numpy as np
import pandas as pd

class MyKNNClf():
    def __init__(self,
                 k=3,
                 metric='euclidean',  # euclidean, chebyshev, manhattan, cosine
                 weight='uniform'):  # uniform, rank, distance
        self.k = k
        self.train_size = None
        self.X = None
        self.y = None
        self.metric = metric
        self.metrics = {
            'euclidean': lambda row: np.sqrt(np.sum((row - self.X) ** 2, axis=1)),
            'chebyshev': lambda row: np.max(np.abs(row - self.X), axis=1),
            'manhattan': lambda row: np.sum(np.abs(row - self.X), axis=1),
            'cosine': lambda row: 1 - (np.dot(self.X, row) / (np.linalg.norm(self.X, axis=1) * np.linalg.norm(row)))
        }
        self.weight = weight

    def __str__(self):
        return f"MyKNNClf class: k={self.k}, metric={self.metric}, weight={self.weight}"

    def fit(self, X, y):
        self.X = X
        self.y = y
        self.train_size = np.shape(X)
        return self.train_size

    def predict_y(self, row):
        distances = self.metrics[self.metric](row).sort_values().head(self.k)
        sorted_k_index = distances.index
        y_index = self.y.iloc[sorted_k_index].reset_index(drop=True)

        if self.weight == 'uniform':
            return y_index.mean()

        elif self.weight == 'rank':
            weights = 1 / (np.arange(1, self.k + 1))
            weighted_sum = np.sum(weights * y_index)
            return weighted_sum / np.sum(weights)

        elif self.weight == 'distance':
            weights = 1 / (distances)
            weighted_sum = np.sum(weights * np.array(y_index))
            return weighted_sum / np.sum(weights)

    def predict(self, X):
        return self.predict_proba(X).apply(lambda x: 1 if x >= 0.5 else 0)

    def predict_proba(self, X):
        return X.apply(self.predict_y, axis=1)


KNN = MyKNNClf(5, weight = 'distance')
KNN.fit(X_train, y)
KNN.predict(X_test)

200    0
201    1
202    0
203    0
204    1
      ..
295    0
296    0
297    1
298    0
299    0
Length: 100, dtype: int64

## Метод ближайших соседей (регрессия)

In [None]:
from sklearn.datasets import make_regression

X, y = make_regression(n_samples= 50, n_features= 5, n_informative= 2, noise= 5, random_state=42)
X = pd.DataFrame(X)
y = pd.Series(y)
X.columns = [f'col_{col}' for col in X.columns]
X_train = X[:40]
X_test = X[40:]


       col_0     col_1     col_2     col_3     col_4
40 -0.026514  2.463242 -0.192361  0.060230 -1.918771
41 -0.114736  0.865755 -1.200296  0.504987 -0.792521
42  1.865775 -1.191303  0.656554  0.473833 -0.714351
43  0.856399 -1.245739  0.173181  0.214094 -0.446515
44  0.915402 -0.529760  0.513267  0.328751 -0.501757
45  0.013002 -0.264657  2.720169  1.453534  0.827183
46 -2.025143 -0.661786  0.852433  0.186454  0.633919
47 -0.815810  0.341152  0.276691 -0.077102 -0.889514
48 -0.034712  1.142823  0.751933 -1.168678  0.301547
49  0.171368 -0.301104 -1.478522 -0.115648  0.738467


In [None]:
class MyKNNReg():
    def __init__(self,
                 k = 3,
                 metric = 'euclidean',
                 weight='uniform'):

        self.k = k
        self.train_size = None
        self.X = None,
        self.y = None,

        self.metric = metric
        self.metrics = {
            'euclidean': lambda row: np.sqrt(np.sum((row - self.X) ** 2, axis=1)),
            'chebyshev': lambda row: np.max(np.abs(row - self.X), axis=1),
            'manhattan': lambda row: np.sum(np.abs(row - self.X), axis=1),
            'cosine': lambda row: 1 - (np.dot(self.X, row) / (np.linalg.norm(self.X, axis=1) * np.linalg.norm(row)))
        }

        self.weight = weight

    def __str__(self):
        return f"MyKNNReg class: k={self.k}"

    def fit(self, X, y):
        self.train_size = np.shape(X)
        self.X = X
        self.y = y

    def predict_y(self, row):
        distances = pd.Series(self.metrics[self.metric](row)).sort_values().head(self.k)
        sorted_k_index = distances.index
        y_index = self.y.iloc[sorted_k_index].reset_index(drop=True)

        if self.weight == 'uniform':
            return y_index.mean()

        elif self.weight == 'rank':
            weights = 1 / (np.arange(1, self.k + 1))
            weighted_sum = np.sum(weights * y_index)
            return weighted_sum / np.sum(weights)

        elif self.weight == 'distance':
            weights = 1 / (distances)
            weighted_sum = np.sum(weights * np.array(y_index))
            return weighted_sum / np.sum(weights)

    def predict(self, X):
        return X.apply(self.predict_y, axis=1)

KNN = MyKNNReg(5, metric = 'manhattan', weight='rank')
KNN.fit(X_train, y)
KNN.predict(X_test)

40   -64.086079
41   -23.467373
42    14.459151
43    10.211394
44     6.109441
45     1.072139
46    19.153125
47   -50.885469
48     8.098202
49    41.980221
dtype: float64

 # Деревья решений

## Классификация

In [None]:
class MyTreeClf:
    def __init__(self, max_depth=5, min_samples_split=2, max_leafs=20):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.max_leafs = max_leafs

    def __str__(self):
        return (f"MyTreeClf class: max_depth={self.max_depth}, "
                f"min_samples_split={self.min_samples_split}, "
                f"max_leafs={self.max_leafs}")

# Пример использования
tree = MyTreeClf(max_depth=10, min_samples_split=3, max_leafs=15)
print(tree)

MyTreeClf class: max_depth=10, min_samples_split=3, max_leafs=15


In [None]:
def entropy(y):
    # Вычисление энтропии
    value, counts = np.unique(y, return_counts=True)
    probabilities = counts / len(y)
    return -np.sum(probabilities * np.log2(probabilities))

def information_gain(y, y_left, y_right):
    # Вычисление прироста информации
    p = len(y_left) / len(y)
    return entropy(y) - p * entropy(y_left) - (1 - p) * entropy(y_right)

def get_best_split(X, y):
    best_ig = -1
    best_split_value = None
    best_col_name = None

    for col_name in X.columns:
        unique_values = np.sort(X[col_name].unique())
        for i in range(1, len(unique_values)):
            split_value = (unique_values[i-1] + unique_values[i]) / 2
            y_left = y[X[col_name] <= split_value]
            y_right = y[X[col_name] > split_value]

            if len(y_left) > 0 and len(y_right) > 0:
                ig = information_gain(y, y_left, y_right)
                if ig > best_ig:
                    best_ig = ig
                    best_split_value = split_value
                    best_col_name = col_name

    return best_col_name, best_split_value, best_ig