In [41]:
import numpy as np
from sklearn.datasets import make_multilabel_classification
from sklearn.model_selection import train_test_split
import time

birthday = 25
n_classes = 3 + birthday // 5 # ДР 25.07.2004 => 8
X, y = make_multilabel_classification(n_samples=1000, n_features=10, n_classes=n_classes,
                                     n_labels=2, random_state=42)
y_multiclass = np.argmax(y, axis=1)
X_train, X_test, y_train, y_test = train_test_split(X, y_multiclass, test_size=0.3, random_state=42)

def sigmoid(x):
    return 1 / (1 + np.exp(-np.clip(x, -250, 250)))

class BinaryLogisticRegression:
    def __init__(self, learning_rate=0.01, n_iter=1000, class_weight=None):
        self.learning_rate = learning_rate
        self.n_iter = n_iter
        self.weights = None
        self.bias = None
        self.class_weight = class_weight

    def fit(self, X, y):
        n_samples, n_features = X.shape
        self.weights = np.zeros(n_features)
        self.bias = 0

        if self.class_weight is not None:
            sample_weights = np.where(y == 1, self.class_weight[1], self.class_weight[0])
        else:
            sample_weights = np.ones(n_samples)

        for _ in range(self.n_iter):
            linear_model = np.dot(X, self.weights) + self.bias
            predictions = sigmoid(linear_model)

            error = predictions - y

            dw = (1 / n_samples) * np.dot(X.T, sample_weights * error)
            db = (1 / n_samples) * np.sum(sample_weights * error)

            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

    def predict_proba(self, X):
        linear_model = np.dot(X, self.weights) + self.bias
        return sigmoid(linear_model)

    def predict(self, X, threshold=0.5):
        return (self.predict_proba(X) >= threshold).astype(int)

class OneVsRestClassifier:
    def __init__(self, base_classifier, class_weights=None):
        self.base_classifier = base_classifier
        self.classifiers = []
        self.class_weights = class_weights

    def fit(self, X, y):

        self.classes_ = np.unique(y)
        self.classifiers = []

        for class_idx in self.classes_:

            y_binary = (y == class_idx).astype(int)

            if self.class_weights is not None:
                weight_0 = self.class_weights.get('all', 1.0)
                weight_1 = self.class_weights.get(class_idx, 1.0)
                class_weight = {0: weight_0, 1: weight_1}
            else:
                class_weight = None

            classifier = BinaryLogisticRegression(class_weight=class_weight)
            classifier.fit(X, y_binary)
            self.classifiers.append(classifier)

        return self

    def predict_proba(self, X):
        probabilities = []
        for classifier in self.classifiers:

            prob = classifier.predict_proba(X)
            probabilities.append(prob)

        prob_matrix = np.column_stack(probabilities)
        exp_probs = np.exp(prob_matrix - np.max(prob_matrix, axis=1, keepdims=True))
        return exp_probs / np.sum(exp_probs, axis=1, keepdims=True)

    def predict(self, X):
        probabilities = self.predict_proba(X)

        return np.argmax(probabilities, axis=1)

class OneVsOneClassifier:
    def __init__(self, base_classifier):
        self.base_classifier = base_classifier
        self.classifiers = {}

    def fit(self, X, y):
        self.classes_ = np.unique(y)
        self.classifiers = {}

        for i, class1 in enumerate(self.classes_):
            for class2 in self.classes_[i+1:]:

                mask = (y == class1) | (y == class2)
                X_pair = X[mask]
                y_pair = y[mask]

                y_binary = (y_pair == class1).astype(int)

                classifier = BinaryLogisticRegression()
                classifier.fit(X_pair, y_binary)
                self.classifiers[(class1, class2)] = classifier

        return self

    def predict(self, X):
        n_samples = X.shape[0]
        votes = np.zeros((n_samples, len(self.classes_)))

        for (class1, class2), classifier in self.classifiers.items():
            predictions = classifier.predict(X)

            for i in range(n_samples):
                if predictions[i] == 1:
                    votes[i, class1] += 1
                else:
                    votes[i, class2] += 1

        return np.argmax(votes, axis=1)

# Сравнение методов по времени:
start_time = time.time()
ovr_classifier = OneVsRestClassifier(BinaryLogisticRegression(learning_rate=0.1, n_iter=1000))
ovr_classifier.fit(X_train, y_train)
ovr_train_time = time.time() - start_time

start_time = time.time()
ovo_classifier = OneVsOneClassifier(BinaryLogisticRegression(learning_rate=0.1, n_iter=1000))
ovo_classifier.fit(X_train, y_train)
ovo_train_time = time.time() - start_time

print(f'OneVsRestClassifier: train_time = {ovr_train_time} seconds')
print(f'OneVsOneClassifier: train_time = {ovo_train_time} seconds')

print(f'Winner by time is {"OneVsOneClassifier" if ovr_train_time > ovo_train_time else "OneVsRestClassifier"} with delta = {abs(ovo_train_time-ovr_train_time)}\n\n')

# Сравнение методов по точности

y_pred = ovr_classifier.predict(X_train)
true_values = np.sum(y_pred == y_train)
accuracy = true_values / len(X_train)
print(f"OneVsRestClassifier: accuracy = {accuracy}")

y_pred_2 = ovo_classifier.predict(X_train)
true_values_2 = np.sum(y_pred_2 == y_train)
accuracy2 = true_values_2 / len(X_train)
print(f"OneVsOneClassifier: accuracy = {accuracy2}")

print(f'Winner by accuracy is {"OneVsOneClassifier" if accuracy2 > accuracy else "OneVsRestClassifier"} with delta = {abs(ovo_train_time-ovr_train_time)}\n\n')

# Реализация f1

def calculate_f1_multiclass(y_true, y_pred, average='macro'):
    classes = np.unique(y_true)
    n_classes = len(classes)

    TP = np.zeros(n_classes)
    FP = np.zeros(n_classes)
    FN = np.zeros(n_classes)

    for i, cls in enumerate(classes):
        TP[i] = np.sum((y_true == cls) & (y_pred == cls))
        FP[i] = np.sum((y_true != cls) & (y_pred == cls))
        FN[i] = np.sum((y_true == cls) & (y_pred != cls))

    precision = np.zeros(n_classes)
    recall = np.zeros(n_classes)
    f1_scores = np.zeros(n_classes)

    for i in range(n_classes):
        if TP[i] + FP[i] > 0:
            precision[i] = TP[i] / (TP[i] + FP[i])
        else:
            precision[i] = 0

        if TP[i] + FN[i] > 0:
            recall[i] = TP[i] / (TP[i] + FN[i])
        else:
            recall[i] = 0

        if precision[i] + recall[i] > 0:
            f1_scores[i] = 2 * precision[i] * recall[i] / (precision[i] + recall[i])
        else:
            f1_scores[i] = 0

    if average == 'macro':
        return np.mean(f1_scores)
    elif average == 'weighted':

        class_counts = [np.sum(y_true == cls) for cls in classes]
        weights = np.array(class_counts) / len(y_true)
        return np.average(f1_scores, weights=weights)
    elif average == 'micro':
        micro_precision = np.sum(TP) / (np.sum(TP) + np.sum(FP)) if (np.sum(TP) + np.sum(FP)) > 0 else 0
        micro_recall = np.sum(TP) / (np.sum(TP) + np.sum(FN)) if (np.sum(TP) + np.sum(FN)) > 0 else 0
        if micro_precision + micro_recall > 0:
            return 2 * micro_precision * micro_recall / (micro_precision + micro_recall)
        else:
            return 0

f1_ovr = calculate_f1_multiclass(y_train, ovr_classifier.predict(X_train))
f1_ovo = calculate_f1_multiclass(y_train, ovo_classifier.predict(X_train))

print(f'OneVsRestClassifier: F1-score = {f1_ovr:.4f}')
print(f'OneVsOneClassifier: F1-score = {f1_ovo:.4f}')
print(f'Winner by f1 is {"OneVsOneClassifier" if f1_ovo > f1_ovr else "OneVsRestClassifier"}\n\n')

# реализация ROC-кривой, которая показывает соотношение между TP Rate и FP Rate для пороговых значений

def calculate_roc_auc_multiclass(y_true, y_pred_proba, average='macro'):
    classes = np.unique(y_true)
    roc_auc_scores = []

    for i, cls in enumerate(classes):
        y_true_binary = (y_true == cls).astype(int)
        y_score = y_pred_proba[:, i]


        sorted_indices = np.argsort(y_score)[::-1]
        y_true_sorted = y_true_binary[sorted_indices]
        y_score_sorted = y_score[sorted_indices]


        thresholds = np.unique(y_score_sorted)
        thresholds = np.append(thresholds, 1.1)
        thresholds = np.append(-0.1, thresholds)
        thresholds = np.sort(thresholds)[::-1]

        TPR = [0.0]
        FPR = [0.0]

        total_positives = np.sum(y_true_binary)
        total_negatives = len(y_true_binary) - total_positives

        for threshold in thresholds:
            y_pred_binary = (y_score_sorted >= threshold).astype(int)

            TP = np.sum((y_true_sorted == 1) & (y_pred_binary == 1))
            FP = np.sum((y_true_sorted == 0) & (y_pred_binary == 1))

            tpr = TP / total_positives if total_positives > 0 else 0
            fpr = FP / total_negatives if total_negatives > 0 else 0

            TPR.append(tpr)
            FPR.append(fpr)


        auc_score = 0
        for j in range(1, len(FPR)):
            auc_score += (FPR[j] - FPR[j-1]) * (TPR[j] + TPR[j-1]) / 2

        roc_auc_scores.append(auc_score)

    if average == 'macro':
        return np.mean(roc_auc_scores)
    elif average == 'weighted':
        class_counts = [np.sum(y_true == cls) for cls in classes]
        weights = np.array(class_counts) / len(y_true)
        return np.average(roc_auc_scores, weights=weights)
    else:
        return np.mean(roc_auc_scores)

y_proba_ovr = ovr_classifier.predict_proba(X_train)
y_proba_ovo = np.zeros((len(X_train), len(np.unique(y_train))))

ovo_votes = np.zeros((len(X_train), len(np.unique(y_train))))
for (class1, class2), classifier in ovo_classifier.classifiers.items():
    predictions = classifier.predict_proba(X_train)
    for i in range(len(X_train)):
        ovo_votes[i, class1] += predictions[i]
        ovo_votes[i, class2] += (1 - predictions[i])

def softmax(x, axis=1):
    x_exp = np.exp(x - np.max(x, axis=axis, keepdims=True))
    return x_exp / np.sum(x_exp, axis=axis, keepdims=True)

y_proba_ovo = softmax(ovo_votes, axis=1)

roc_auc_ovr = calculate_roc_auc_multiclass(y_train, y_proba_ovr)
roc_auc_ovo = calculate_roc_auc_multiclass(y_train, y_proba_ovo)

print(f'OneVsRestClassifier: ROC-AUC = {roc_auc_ovr:.4f}')
print(f'OneVsOneClassifier: ROC-AUC = {roc_auc_ovo:.4f}')
print(f'Winner by ROC-AUC is {"OneVsOneClassifier" if roc_auc_ovo > roc_auc_ovr else "OneVsRestClassifier"}\n\n')

# реализация PR-AUC

def calculate_pr_auc_multiclass(y_true, y_pred_proba, average='macro'):
    classes = np.unique(y_true)
    pr_auc_scores = []

    for i, cls in enumerate(classes):
        y_true_binary = (y_true == cls).astype(int)
        y_score = y_pred_proba[:, i]

        sorted_indices = np.argsort(y_score)[::-1]
        y_true_sorted = y_true_binary[sorted_indices]
        y_score_sorted = y_score[sorted_indices]

        thresholds = np.unique(y_score_sorted)
        thresholds = np.append(1.1, thresholds)
        thresholds = np.append(thresholds, -0.1)
        thresholds = np.sort(thresholds)[::-1]

        precision = [1.0]
        recall = [0.0]

        total_positives = np.sum(y_true_binary)

        for threshold in thresholds:
            y_pred_binary = (y_score_sorted >= threshold).astype(int)

            TP = np.sum((y_true_sorted == 1) & (y_pred_binary == 1))
            FP = np.sum((y_true_sorted == 0) & (y_pred_binary == 1))

            prec = TP / (TP + FP) if (TP + FP) > 0 else 0
            rec = TP / total_positives if total_positives > 0 else 0

            precision.append(prec)
            recall.append(rec)


        auc_score = 0
        for j in range(1, len(recall)):
            auc_score += (recall[j] - recall[j-1]) * (precision[j] + precision[j-1]) / 2

        pr_auc_scores.append(auc_score)

    if average == 'macro':
        return np.mean(pr_auc_scores)
    elif average == 'weighted':
        class_counts = [np.sum(y_true == cls) for cls in classes]
        weights = np.array(class_counts) / len(y_true)
        return np.average(pr_auc_scores, weights=weights)
    else:
        return np.mean(pr_auc_scores)

pr_auc_ovr = calculate_pr_auc_multiclass(y_train, y_proba_ovr)
pr_auc_ovo = calculate_pr_auc_multiclass(y_train, y_proba_ovo)

print(f'OneVsRestClassifier: PR-AUC = {pr_auc_ovr:.4f}')
print(f'OneVsOneClassifier: PR-AUC = {pr_auc_ovo:.4f}')
print(f'Winner by PR-AUC is {"OneVsOneClassifier" if pr_auc_ovo > pr_auc_ovr else "OneVsRestClassifier"}')


OneVsRestClassifier: train_time = 0.11684012413024902 seconds
OneVsOneClassifier: train_time = 0.24438881874084473 seconds
Winner by time is OneVsRestClassifier with delta = 0.1275486946105957


OneVsRestClassifier: accuracy = 0.67
OneVsOneClassifier: accuracy = 0.7071428571428572
Winner by accuracy is OneVsOneClassifier with delta = 0.1275486946105957


OneVsRestClassifier: F1-score = 0.6323
OneVsOneClassifier: F1-score = 0.7251
Winner by f1 is OneVsOneClassifier


OneVsRestClassifier: ROC-AUC = 0.9289
OneVsOneClassifier: ROC-AUC = 0.9448
Winner by ROC-AUC is OneVsOneClassifier


OneVsRestClassifier: PR-AUC = 0.7510
OneVsOneClassifier: PR-AUC = 0.7750
Winner by PR-AUC is OneVsOneClassifier


Задание №4: Проанализировать проблему несбалансированных классификаций.

В случаях, когда один или несколько классов имеют значительно большее количество представителей, высокая accuracy не значит, что модель качественная, а в некоторых случаях модель с высоким accuracy может оказаться вовсе бесполезной.


При использовании OneVsRestClassifier и несбалансированных данных модель быстро научится всегда предсказывать доминирующий класс, игнорируя "Rest", при этом вовсе не научится распозновать остальные классы. В итоге общая модель будет иметь сильное смещение в сторону мажорных классов и крайне низкую полноту для минорных классов.

При использовании OneVsOneClassifier. Допустим доминирующий класс есть, его примеров 1000, кроме него есть еще 4 класса по 100, 110, 105, 95 соответсвенно. Тогда всего моделей 10 (5*4/2), 4 из которых с доминирующим классом, а остальные между собой. Тогда 6/10 моделей в хороших условиях, что говорит о том, что остальные классы тоже будут хорошо изучены. В итоге будет тоже низкая полнота для минорных классов. То есть справится лучше OneVsRestClassifier, но далеко не идеально.

Данные выводы подтверждены в результате выполнения кода, в котором OneVsOneClassifier показал лучший результат, хоть и обучался дольше из-за большего кол-ва классификаторов.

Кроме того, можно сделать вывод, что accuracy как метрика является бесполезной в случае несбалансированных данных, а f1, и pr_auc позволяют увидеть картину о качестве работы модели с минорными классами более ясно. В свою очередь roc_auc тоже зависит от мажорного класса, хоть и учитывает False Positive Rate, поэтому эта метрика может быть излишне завышенной при оценке модели.

Для уменьшения эффекта несбалансированных данных можно использовать веса для классов, которые обратно пропорциональны их количеству. Это позволит "выровнять" классы между собой.