# Контрольная работа №1

Время на выполнение 1 час 20 минут.

In [5]:
import pandas as pd
import numpy as np
from sklearn.datasets import load_wine

## Задание №1 (3 балла)

Реализовать алгоритм классификации KNN для мультиклассовой классификации с методом `predict_proba` для предсказания вероятностей классов.

Использовать датасет Wine (`load_wine` из `sklearn.datasets`).

In [47]:
class KNN:
    def __init__(self, k):
        self.k = k
        self.scaler = StandardScaler()

    def fit(self, X_train, y_train):
        self.scaler.fit(X_train)
        self.X_train = self.scaler.transform(X_train)
        self.y_train = y_train
        self.classes_ = np.unique(y_train)

    def euclidian_distance(self, a, b):
        return np.sqrt(np.sum((a - b)**2))

    def predict_proba(self, X_test):
        transformed_X_test = self.scaler.transform(X_test)

        distance_matrix = np.zeros((X_test.shape[0], self.X_train.shape[0]))
        for i in range(len(X_test)):
            for j in range(len(self.X_train)):
                distance_matrix[i][j] = self.euclidian_distance(transformed_X_test[i], self.X_train[j])

        idx_matrix = np.argsort(distance_matrix, axis=1)[:, :self.k]

        neighbors_classes = self.y_train[idx_matrix]

        proba = np.zeros((X_test.shape[0], len(self.classes_)))
        for i in range(X_test.shape[0]):
            counts = np.bincount(neighbors_classes[i], minlength=len(self.classes_))
            proba[i] = counts / self.k

        return proba

    def predict(self, X_test):
        proba = self.predict_proba(X_test)
        y_pred = self.classes_[np.argmax(proba, axis=1)]
        return y_pred

In [48]:
def preprocess_data_sklearn():
  data = load_wine()
  X = data.data
  y = data.target
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

  return X_train, X_test, y_train, y_test

# Мое решение

In [46]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, f1_score

X_train, X_test, y_train, y_test = preprocess_data_sklearn()

knn = KNN(k=5)
knn.fit(X_train, y_train)
y_pred = knn.predict(X_test)

print("MSE:", mean_squared_error(y_test, y_pred))
print("MAE:", mean_absolute_error(y_test, y_pred))
print("F1 score (weighted):", f1_score(y_test, y_pred, average='weighted'))

MSE: 0.05555555555555555
MAE: 0.05555555555555555
F1 score (weighted): 0.9436036129748098


# Sklearn

In [50]:
from sklearn.neighbors import KNeighborsClassifier

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

sk_knn = KNeighborsClassifier(n_neighbors=5)
sk_knn.fit(X_train_scaled, y_train)
y_pred_sk = sk_knn.predict(X_test_scaled)

print("MSE:", mean_squared_error(y_test, y_pred_sk))
print("MAE:", mean_absolute_error(y_test, y_pred_sk))
print("F1 score (weighted):", f1_score(y_test, y_pred_sk, average='weighted'))

MSE: 0.05555555555555555
MAE: 0.05555555555555555
F1 score (weighted): 0.9436036129748098


## Задание №2 (3 балла)

В линейной регрессии оценить влияние признаков на результат.

Использовать датасет <https://archive.ics.uci.edu/dataset/10/automobile> (из файла `imports-85.data`)

In [24]:
def preprocess_data(data):
  # Preprocessing from homework 2
  data.loc[:, data.dtypes == "float64"] = data.loc[:, data.dtypes == "float64"].fillna(
      data.loc[:, data.dtypes == "float64"].mean(axis=0)
  )
  data.loc[:, data.dtypes == "object"] = data.loc[:, data.dtypes == "object"].fillna("")
  data_encoded = pd.get_dummies(data)

  data_encoded.columns = data_encoded.columns.astype(str)
  X = data_encoded.drop(columns=["25"])
  y = data_encoded["25"]
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=3)

  scaler = RobustScaler()
  scaler.fit(X_train)

  X_train = scaler.transform(X_train)
  X_test = scaler.transform(X_test)

  return X_train, X_test, y_train, y_test

## Задание №3 (4 балла)

Изменить решающее дерево под задачу регрессии (минимизация MSE).

Использовать тот же датасет, что и в предыдущем задании.

In [32]:
class MyDecisionTree:
    LEAF = 1
    NON_LEAF = 0

    def __init__(self, max_depth=5, criterion="mse", min_samples_split=2) -> None:
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.criterion = criterion
        self.tree = dict()
        self.feature_importance = None

    def fit(self, X_train, y_train):
        X_train = np.array(X_train)
        y_train = np.array(y_train)
        self.feature_importance = np.zeros(X_train.shape[1])
        self.fit_node(X_train, y_train, 0, 0)

    def _entropy(self, y):
        p = np.bincount(y) / len(y)
        p = p[np.where(p > 0)[0]]
        return -np.sum(p * np.log(p))

    def _gini(self, y):
        p = np.bincount(y) / len(y)
        return np.sum(p * (1 - p))

    def get_entropy_score(self, y_left, y_right):
        Hs = []
        for split in (y_left, y_right):
            H = self._entropy(split)
            R = len(split) / (len(y_left) + len(y_right))
            Hs.append(R * H)
        return np.sum(Hs)

    def get_gini_score(self, y_left, y_right):
        Hs = []
        for split in (y_left, y_right):
            H = self._gini(split)
            R = len(split) / (len(y_left) + len(y_right))
            Hs.append(R * H)
        return np.sum(Hs)

    def get_mse_score(self, y_left, y_right):
        left_mean = np.mean(y_left) if len(y_left) > 0 else 0
        right_mean = np.mean(y_right) if len(y_right) > 0 else 0
        mse_left = np.sum((y_left - left_mean) ** 2)
        mse_right = np.sum((y_right - right_mean) ** 2)
        return mse_left + mse_right

    def find_threshold(self, X, y: np.ndarray):
        min_criterion = np.inf
        feature_split = -1
        threshold_split = -1

        if self.criterion == "gini":
            H = self._gini(y)
            criterion = self.get_gini_score
        elif self.criterion == "entropy":
            H = self._entropy(y)
            criterion = self.get_entropy_score
        elif self.criterion == "mse":
            mean_y = np.mean(y)
            H = np.sum((y - mean_y)**2)
            criterion = self.get_mse_score
        else:
            raise RuntimeError("invalid criterion type")

        for feat in range(X.shape[1]):
            column = X[:, feat]
            unique_values = np.unique(column)
            for value in unique_values:
                left_mask = column <= value
                right_mask = column > value
                y_left = y[left_mask]
                y_right = y[right_mask]
                current_criterion = criterion(y_left, y_right)
                if current_criterion < min_criterion:
                    feature_split = feat
                    threshold_split = value
                    min_criterion = current_criterion

        information_gain = H - min_criterion
        if feature_split != -1:
            self.feature_importance[feature_split] += information_gain
        return feature_split, threshold_split

    def make_final_leaf(self, y, node_id):
        predict = np.mean(y)
        self.tree[node_id] = [self.LEAF, predict]

    def fit_node(self, X_train, y_train, node_id, depth):
        if depth == self.max_depth - 1 or X_train.shape[0] <= self.min_samples_split:
            self.make_final_leaf(y_train, node_id)
            return

        feature_split, threshold_split = self.find_threshold(X_train, y_train)

        if feature_split == -1:
            self.make_final_leaf(y_train, node_id)
            return

        X_left = X_train[X_train[:, feature_split] <= threshold_split]
        y_left = y_train[X_train[:, feature_split] <= threshold_split]
        X_right = X_train[X_train[:, feature_split] > threshold_split]
        y_right = y_train[X_train[:, feature_split] > threshold_split]

        if X_left.shape[0] == 0 or X_right.shape[0] == 0:
            self.make_final_leaf(y_train, node_id)
            return

        self.fit_node(X_left, y_left, 2 * node_id + 1, depth + 1)
        self.fit_node(X_right, y_right, 2 * node_id + 2, depth + 1)

        self.tree[node_id] = [self.NON_LEAF, feature_split, threshold_split]

    def predict_one_elem(self, x, node_id):
        if self.tree[node_id][0] == self.LEAF:
            return self.tree[node_id][1]
        else:
            _, feature, split = self.tree[node_id]
            if x[feature] <= split:
                return self.predict_one_elem(x, 2 * node_id + 1)
            else:
                return self.predict_one_elem(x, 2 * node_id + 2)

    def predict(self, X):
        return np.array([self.predict_one_elem(x, 0) for x in X])


In [33]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.metrics import mean_squared_error

data = pd.read_csv("./imports-85.data", header=None, na_values="?")

X_train, X_test, y_train, y_test = preprocess_data(data)

In [34]:
my_tree = MyDecisionTree(max_depth=5, criterion="mse")

my_tree.fit(X_train, y_train)
mean_squared_error(y_test, my_tree.predict(X_test))

14537614.080683947

In [35]:
from sklearn.tree import DecisionTreeRegressor

clf = DecisionTreeRegressor(criterion="squared_error", max_depth=5)

clf.fit(X_train, y_train)
mean_squared_error(y_test, preds)
mean_squared_error(y_test, clf.predict(X_test))

3747941.4367754124

In [41]:
pd.DataFrame(
    {
        "columns": data_encoded.drop(columns=["25"]).columns,
        "my_tree": my_tree.feature_importance/10**10,
        "sklearn": clf.feature_importances_,
    }
).sort_values(by="my_tree", ascending=False).reset_index(drop=True)

Unnamed: 0,columns,my_tree,sklearn
0,16,0.659198,0.627070
1,13,0.249336,0.243851
2,11,0.059908,0.058835
3,10,0.021554,0.021293
4,2_bmw,0.013018,0.020618
...,...,...,...
71,2_volvo,0.000000,0.000000
72,3_diesel,0.000000,0.000000
73,1,0.000000,0.000842
74,4_std,0.000000,0.000000
