In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
import random
from sklearn import datasets

In [2]:
def my_batch_generator(X, y, my_shuffle=True, batch_size=100):
    """Yield consecutive mini-batches ``(X_batch, y_batch)`` that together cover all samples.

    Parameters
    ----------
    X : integer-indexable sequence of rows (e.g. a 2-D numpy array or a list of rows)
    y : integer-indexable sequence of targets, same length as ``X``
    my_shuffle : bool
        When true, ``X`` and ``y`` are jointly permuted with ``sklearn.utils.shuffle``
        before batching. The permutation uses ``random_state=0``, so it is the same
        on every call for the same input.
    batch_size : int
        Maximum number of samples per batch; must be >= 1.

    Yields
    ------
    (X_batch, y_batch) : tuple of two lists
        ``X_batch`` is a list of rows and ``y_batch`` a list of targets. Every batch
        holds ``batch_size`` samples except possibly the last one, which holds the
        remainder, so no sample is dropped.

    Raises
    ------
    ValueError
        If ``batch_size`` is smaller than 1 (raised when iteration starts).
    """
    if batch_size < 1:
        raise ValueError('batch_size must be a positive integer, got %r' % (batch_size,))

    if my_shuffle:
        X, y = shuffle(X, y, random_state=0)

    n_samples = len(X)
    # Step through the whole index range; the previous upper bound of
    # ``len(X) - batch_size`` skipped the final batch (and every batch when
    # ``len(X) <= batch_size``).
    for start in range(0, n_samples, batch_size):
        stop = min(start + batch_size, n_samples)
        X_batch = [X[j] for j in range(start, stop)]
        y_batch = [y[j] for j in range(start, stop)]
        yield (X_batch, y_batch)

In [3]:
def sigmoid(x):
    """Logistic function ``1 / (1 + exp(-x))``, applied element-wise by numpy.

    ``x`` may be a scalar or a numpy array; the result has the same shape.
    """
    denominator = 1 + np.exp(-x)
    return 1.0 / denominator


from sklearn.base import BaseEstimator, ClassifierMixin


class MySGDClassifier(BaseEstimator, ClassifierMixin):
    """Linear / logistic model trained by mini-batch gradient descent with an L2 penalty.

    The objective is ``data_loss(w) + sum(w[1:] ** 2) / C`` where ``w[0]`` is the
    intercept (not penalised). ``data_loss`` is the mean squared error for
    ``model_type='lin_reg'`` and the mean log-loss for ``model_type='log_reg'``
    (which expects 0/1 targets).

    Attributes set by ``fit``:
        weights    - weight vector, intercept first
        cl_names   - sorted array of the distinct integer class labels seen in ``y``
        result     - array of 10000 floats; entry ``k`` is the loss of the k-th
                     processed batch (entries past the last batch stay 0)
        errors_log - ``{'iter': [...], 'loss': [...]}`` batch index / batch loss trace
    Attribute set by ``predict``:
        proba      - list with the predicted class label of every row
    """

    _MODEL_TYPES = ('lin_reg', 'log_reg')

    def __init__(self, batch_generator, C=1, alpha=0.01, max_epoch=20,
                 model_type='lin_reg', batch_size=1, drag=1):
        """
        batch_generator - generator function used to create batches; it is called as
                          ``batch_generator(X, y, batch_size=batch_size)`` and must
                          yield ``(X_batch, y_batch)`` pairs
        batch_size      - number of samples per batch
        C               - regularisation coefficient (penalty is ``sum(w[1:]**2) / C``)
        alpha           - learning rate
        max_epoch       - maximum number of epochs
        model_type      - model type, 'lin_reg' or 'log_reg'
        drag            - learning-rate decay: the step is divided by ``drag`` after
                          every epoch to improve convergence
        """
        self.C = C
        self.alpha = alpha
        self.max_epoch = max_epoch
        self.batch_generator = batch_generator
        self.errors_log = {'iter': [], 'loss': []}
        self.model_type = model_type
        self.batch_size = batch_size
        self.drag = drag

    def _check_model_type(self):
        """Raise ValueError when ``model_type`` is not one of the supported models."""
        if self.model_type not in self._MODEL_TYPES:
            raise ValueError("model_type must be 'lin_reg' or 'log_reg', got %r"
                             % (self.model_type,))

    def calc_loss(self, X_batch, y_batch, test_val=True):
        """
        Compute the regularised loss on a batch.

        X_batch  - object-feature matrix of the batch
        y_batch  - target vector of the batch
        test_val - whether the data is an external (e.g. test) sample; if so a
                   column of ones is prepended to account for w_0

        Returns the mean data loss plus ``sum(weights[1:] ** 2) / C``.
        Raises ValueError for an unsupported ``model_type``.
        """
        self._check_model_type()
        if test_val:
            X_batch = self.adds_column_of_ones(X_batch)
        X_batch = np.asarray(X_batch, dtype=float)
        y_batch = np.asarray(y_batch, dtype=float)

        scores = np.dot(X_batch, self.weights)
        if self.model_type == 'lin_reg':
            loss = np.mean((y_batch - scores) ** 2)
        else:
            # Clip the probabilities so a saturated sigmoid cannot produce log(0).
            eps = 1e-15
            p = np.clip(sigmoid(scores), eps, 1 - eps)
            loss = -np.mean(y_batch * np.log(p) + (1 - y_batch) * np.log(1 - p))

        return loss + np.sum(self.weights[1:] ** 2) / self.C

    def calc_loss_grad(self, X_batch, y_batch):
        """
        Compute the descent direction of the regularised loss on a batch.

        X_batch - object-feature matrix of the batch (ones column already included)
        y_batch - target vector of the batch

        Returns the NEGATIVE gradient of ``calc_loss(X_batch, y_batch, test_val=False)``
        with respect to the weights, i.e. the vector that ``update_weights`` adds
        (scaled by ``alpha``) to the weights.
        Raises ValueError for an unsupported ``model_type``.
        """
        self._check_model_type()
        X_batch = np.asarray(X_batch, dtype=float)
        y_batch = np.asarray(y_batch, dtype=float)
        n_samples = len(X_batch)

        scores = np.dot(X_batch, self.weights)
        if self.model_type == 'lin_reg':
            # d/dw mean((y - Xw)^2) = -2/N * X^T (y - Xw)
            descent = 2 * np.dot(X_batch.T, y_batch - scores) / n_samples
        else:
            # d/dw mean log-loss = -1/N * X^T (y - sigmoid(Xw))
            descent = np.dot(X_batch.T, y_batch - sigmoid(scores)) / n_samples

        # Gradient of the penalty sum(w[1:]^2) / C; the intercept is not penalised.
        penalty_grad = 2 * self.weights / self.C
        penalty_grad[0] = 0
        return descent - penalty_grad

    def update_weights(self, new_grad):
        """
        Update the weight vector with one gradient step.

        new_grad - descent direction for the batch, as returned by ``calc_loss_grad``

        The regularisation is already contained in ``new_grad``, so the step is a
        plain ``weights + alpha * new_grad``.
        """
        self.weights = self.weights + self.alpha * new_grad

    def adds_column_of_ones(self, X):
        """
        Return ``X`` with a leading column of ones (accounts for w_0).
        """
        ones = np.ones(len(X), dtype=int).reshape(len(X), 1)
        return np.column_stack((ones, X))

    def fit(self, X, y):
        """
        Train the model.

        X - object-feature matrix, shape (n_samples, n_features)
        y - target vector; values are cast to int

        Returns ``self``. ``alpha`` is decayed by ``drag`` between epochs but is
        restored to its configured value when training ends, so the estimator can
        be re-fitted with the same hyper-parameters.
        Raises ValueError for an unsupported ``model_type`` or malformed input.
        """
        self._check_model_type()

        X = np.asarray(X, dtype=float)
        if X.ndim != 2 or X.shape[0] == 0:
            raise ValueError('X must be a non-empty 2-D array (n_samples, n_features)')
        y = np.asarray(y).ravel().astype(int)
        if len(y) != len(X):
            raise ValueError('X and y must contain the same number of samples')

        self.cl_names = np.unique(y)
        X = self.adds_column_of_ones(X)
        self.weights = np.array([random.uniform(-0.1, 0.1) for _ in range(X.shape[1])])
        self.result = np.zeros(shape=10000)
        self.errors_log = {'iter': [], 'loss': []}
        iteration = 0

        initial_alpha = self.alpha
        try:
            for _ in range(self.max_epoch):
                X, y = shuffle(X, y, random_state=0)
                prev_weights = self.weights.copy()
                # batch_size must be passed by keyword: the generator's third
                # positional parameter is the shuffle flag, not the batch size.
                batches = self.batch_generator(X, y, batch_size=self.batch_size)
                for batch_num, (X_batch, y_batch) in enumerate(batches):
                    batch_loss = self.calc_loss(X_batch, y_batch, test_val=False)
                    if iteration < len(self.result):
                        self.result[iteration] = batch_loss
                        iteration += 1

                    self.update_weights(self.calc_loss_grad(X_batch, y_batch))
                    self.errors_log['iter'].append(batch_num)
                    self.errors_log['loss'].append(batch_loss)

                self.alpha /= self.drag
                # Stop early once an entire epoch barely moves the weights.
                if np.sum(np.abs(prev_weights - self.weights)) < 1e-4:
                    break
        finally:
            self.alpha = initial_alpha
        return self

    def predict(self, X):
        """
        Predict for every row of X.

        X - object-feature matrix

        Returns the list of model outputs: the linear score for 'lin_reg', the
        sigmoid probability for 'log_reg'. As a side effect stores in
        ``self.proba`` the class label (from ``cl_names``) nearest to each output.
        """
        X = np.asarray(self.adds_column_of_ones(X), dtype=float)
        outputs = np.dot(X, self.weights)
        if self.model_type == 'log_reg':
            # Compare classes with the probability, not the raw linear score.
            outputs = sigmoid(outputs)

        distances = np.abs(self.cl_names[np.newaxis, :] - outputs[:, np.newaxis])
        self.proba = list(self.cl_names[np.argmin(distances, axis=1)])
        return list(outputs)

    def score(self, X, y):
        """
        F1 score of the predicted class labels, with class 1 as the positive class.

        X - object-feature matrix
        y - true labels (cast to int, as in ``fit``)

        Predictions are recomputed from ``X`` (this refreshes ``self.proba``).
        Returns 0 when precision and recall are both zero or undefined.
        """
        self.predict(X)
        y_pred = np.asarray(self.proba)
        y_true = np.asarray(y).ravel().astype(int)

        tp = int(np.sum((y_pred == 1) & (y_true == 1)))
        fp = int(np.sum((y_pred == 1) & (y_true != 1)))
        fn = int(np.sum((y_pred != 1) & (y_true == 1)))

        precision = tp / (tp + fp) if (tp + fp) else 0
        recall = tp / (tp + fn) if (tp + fn) else 0

        if precision + recall == 0:
            return 0
        return 2 * precision * recall / (precision + recall)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.01, stratify=y)
my_clf = MySGDClassifier(batch_generator=my_batch_generator, max_epoch=1, alpha=0.5, C=10000, batch_size=50)
%time my_clf.fit(X_train, y_train)
%time my_clf.calc_loss(X_test, y_test)
my_clf.predict(X_test)
my_clf.score(X_test, y_test)

In [None]:
import pylab
# Plot the per-batch training loss stored in my_clf.result, zoomed to the
# first 2000 iterations and to loss values between 0 and 1.
fig = pylab.figure(figsize=(13, 7))
ax = fig.add_subplot(1, 1, 1)
ax.plot(my_clf.result)
ax.set_ylim(0, 1)
ax.set_xlim(0, 2000)
ax.set_xlabel('iteration', size=18)
ax.set_ylabel('calc_loss', size=18)
ax.set_title('grafic', size=22)

pylab.show()
# a sharp drop of the loss to zero marks the end of training

In [4]:
from sklearn.metrics import f1_score
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
# Standardise the features: fit the scaler on the training matrix, show the
# learned column means, then replace X_train with its scaled version
# (first 10 rows are printed before and after for comparison).
scaler = StandardScaler()
scaler.fit(X_train)
print(scaler)
print(scaler.mean_)
print(X_train[:10])
X_train = scaler.transform(X_train)
print(X_train[:10])

NameError: name 'X_train' is not defined

In [None]:
# Exhaustive grid search over learning rate, batch size, number of epochs and
# learning-rate decay for the logistic model; keeps the combination with the
# best score on a 1% stratified hold-out.
# NOTE(review): X/y are rebound to the current training split and split again,
# so re-running this cell shrinks the training set each time.
X = X_train
y = y_train
random.seed(0)  # initial weights come from the `random` module; seed for reproducibility
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.01, stratify=y, random_state=0)
max_score = 0
max_alpha = 0
max_batch_size = 0
max_max_epoch = 0
max_drag = 0


for alpha_test in range(1, 30, 5):
    # Loop counters are integer percentages; convert once so that the value
    # passed to the model and the value reported below are the same number.
    alpha_value = alpha_test / 100
    print('reach alpha: ', alpha_value)
    for batch_size_test in range(1, 200, 20):
        for max_epoch_test in range(1, 31, 5):
            for drag_test in range(101, 120, 4):
                drag_value = drag_test / 100

                my_clf = MySGDClassifier(batch_generator=my_batch_generator,
                                         model_type='log_reg',
                                         max_epoch=max_epoch_test,
                                         alpha=alpha_value,
                                         C=200000,
                                         batch_size=batch_size_test,
                                         drag=drag_value)
                my_clf.fit(X_train, y_train)
                my_clf.predict(X_test)  # refresh the cached class predictions before scoring
                s = my_clf.score(X_test, y_test)
                if max_score < s:
                    max_score = s
                    max_alpha = alpha_value
                    max_batch_size = batch_size_test
                    max_max_epoch = max_epoch_test
                    max_drag = drag_value

print('score: ', max_score)
print('alpha: ', max_alpha)
print('batch_size: ', max_batch_size)
print('max_epoch: ', max_max_epoch)
print('drag: ', max_drag)