# Интерфейсы scikit-learn

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin, OneToOneFeatureMixin
from sklearn.metrics import r2_score
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression
import numpy as np
import pandas as pd
from numpy.typing import NDArray
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer

In [None]:
import yaml

with open('../config.yaml', 'r') as f:
    cfg = yaml.safe_load(f)

## Estimator

Для примера построим простой estimator, который в перспективе будет вычитать из признаков их среднее значение и после сдвигать признаки на заранее заданную константу

In [None]:
class SubtractMeanAndShiftEstimator(BaseEstimator):
    def __init__(self, shift=0.):
        self.shift: float = shift
        self.means_: NDArray = None  # we add a trailing underscore for parameters which will be learnt in fit()

    def fit(self, X: NDArray, y: NDArray = None):
        # y is ignored here
        self.means_ = X.mean(axis=0)  # the first axis corresponds to samples by default
        return self

In [None]:
m = SubtractMeanAndShiftEstimator(shift=3)

Метод `get_params()` реализован в `BaseEstimator`, и мы можем сразу использовать его для получения гиперпараметров модели. Это возможно, так как единственный гиперпараметр `shift` был передан как явное ключевое слово в контрукторе

Обратите внимание, что соответствующий аттрибут класса должен совпадать с ключевым словом: `self.shift = shift`

In [None]:
m.get_params()

Аналогично мы можем использовать `set_params()` для задания значений гиперпараметров. Этот метод пригодится при поиске оптимальных значений гиперпараметров

In [None]:
m.set_params(shift=5)
m.get_params()

In [None]:
X = np.array([
    [1, 10],
    [3, 30],
    [2, 20],
])
y = np.array([
    [ 0, -8],
    [ 2, 10],
    [ 1,  1],
])
m.fit(X, y)
print(m.means_)

В sklearn есть класс sklearn.base.OutlierMixin, который позволяет реализовывать кастомные классы для определения выбросов.
Он добавляет:
- атрибут _estimator_type, по умолчанию outlier_detector
- fit_predict.

Метод fit() работает в формате без учителя, predict же должен классифицировать данные на аутлаеры (возвращать для них -1) и обыычные данные (возвращать 1). Для классификации используется отсечка по порогу предсказаний, полученных внутренним.
Во встроенных методах функция оценки доступна с помощью метода `score_samples`, в то время как порог можно задать параметром `contamination`. 
Например, для гауссовских данных можно использовать sklearn.covariance.EllipticEnvelope.

**Задание**: Создайте свой эстиматор с использованием sklearn.base.OutlierMixin, который будет определять выбросы на основе интерквартильного размаха. 
Он должен возвращать один столбец с 1 и -1, а также позволять задавать порог для квантиля, определяющего размах. Не забудьте, что он должен быть двухсторонним.
Ваш эстиматор должен работать и для датафреймов, и для numpy массивов.

In [None]:
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, OutlierMixin

class MyEstimator(BaseEstimator, OutlierMixin):
    def __init__(self, quantile_range=1.5):
        self.quantile_range = quantile_range
        self.lower_bound_ = None
        self.upper_bound_ = None

    def fit(self, X, y=None):
        X = self._convert_to_array(X)
        Q1 = np.percentile(X, 25, axis=0)
        Q3 = np.percentile(X, 75, axis=0)
        IQR = Q3 - Q1

        self.lower_bound_ = Q1 - self.quantile_range * IQR
        self.upper_bound_ = Q3 + self.quantile_range * IQR
        return self

    def predict(self, X):
        X = self._convert_to_array(X)
        is_outlier = (X < self.lower_bound_) | (X > self.upper_bound_)
        return np.where(is_outlier.any(axis=1), -1, 1)

    def _convert_to_array(self, X):
        if isinstance(X, pd.DataFrame):
            return X.to_numpy()
        return np.array(X)

In [None]:
X = np.array([
    [10, 200],
    [15, 220],
    [50, 10000],
    [12, 205],
    [14, 210],
    [100, 4000]
])

detector = MyEstimator(quantile_range=1.5)
detector.fit(X)

preds = detector.predict(X)
print(preds)

## Predictor

Рассмотрим тот же класс, но добавим к нему методы `predict()` и `score()`

In [None]:
class SubtractMeanAndShiftPredictor(BaseEstimator):
    def __init__(self, shift=0.):
        self.shift: float = shift
        self.means_: NDArray = None  # we add a trailing underscore for parameters which will be learnt in fit()

    def fit(self, X: NDArray, y: NDArray = None):
        # y is ignored here
        self.means_ = X.mean(axis=0)  # the first axis corresponds to samples by default
        return self

    def predict(self, X: NDArray) -> NDArray:
        e = np.ones((X.shape[0], 1))
        return X -  e @ self.means_.reshape(-1, 1).T + self.shift

    def score(self, X: NDArray, y: NDArray) -> float:
        return r2_score(y, self.predict(X))  # R2 \in (-\infty; 1] is the coefficient of determination

Так как мы специально добавили небольшое отклонение в y, наш R2 чуть меньше 1

In [None]:
model = SubtractMeanAndShiftPredictor(shift=1)
model.fit(X)
model.predict(X)

## Transformer

Рассмотрим тот же класс, но добавим к нему метод `transform()`

In [None]:
class SubtractMeanAndShiftTransformer(BaseEstimator, OneToOneFeatureMixin, TransformerMixin):
    def __init__(self, shift=0.):
        self.shift: float = shift
        self.means_: NDArray = None  # we add a trailing underscore for parameters which will be learnt in fit()

    def fit(self, X: NDArray, y: NDArray = None):
        # y is ignored here
        self.means_ = X.mean(axis=0)  # the first axis corresponds to samples by default
        return self

    def transform(self, X: NDArray) -> NDArray:
        e = np.ones((X.shape[0], 1))
        return X -  e @ self.means_.reshape(-1, 1).T + self.shift

In [None]:
t = SubtractMeanAndShiftTransformer(shift=5)
t.fit(X)
t.transform(X)

Так как мы добавили `TransformerMixin`, мы можем использовать метод `fit_transform()`, не реализуя его явно

In [None]:
t.fit_transform(X)

Аналогично мы можем использовать метод `get_feature_names_out()`, так как мы добавили `OneToOneFeatureMixin`

In [None]:
t.get_feature_names_out(input_features=['x', 'y'])

Используем датасет с домами как пример. Вспомним, что мы делали в прошлый раз, и попробуем заполнить пропущенные значения в некоторых числовызх столбцах.
Для этого используем трансформер по столбцам. 


In [None]:
df = pd.read_csv(cfg['house_pricing']['train_dataset'])
df.head()

По умолчанию, только указанные столбцы трансформируются и возвращаются (remainder=`drop`). Мы же сделаем так, чтобы все остальные столбцы тоже возвращались, просто с ними бы ничего не делалось. 

In [None]:
ct = ColumnTransformer(
    [('mean_impute', SimpleImputer(strategy='mean'), ['SalePrice', 'LotArea', 'WoodDeckSF',  'MasVnrArea'])], 
    remainder="passthrough", force_int_remainder_cols=False)

ct.fit(df)

In [None]:
ct.transform(df)

In [None]:
ct.set_output(transform='pandas')

Если у датасета появятсся столбцы, которые не были представлены во время fit (даже среди тех, что не трансформировались), то они будут выкинуты на этапе transform. 

In [None]:
df["temp"] = 0
ct.transform(df)

**Задание**: Перейдите к медианному заполнению пропусков. Проверьте, что результаты, полученные с помощью трансформации, соответствуют преобразованию напрямую.

In [None]:
df = pd.read_csv(cfg['house_pricing']['train_dataset'])
df.head()

ct_median = ColumnTransformer(
    [('median_impute', SimpleImputer(strategy='median'), ['SalePrice', 'LotArea', 'WoodDeckSF', 'MasVnrArea'])], 
    remainder="passthrough"
)

ct_median.fit(df)
df_transformed_ct = ct_median.transform(df)

imputer_median = SimpleImputer(strategy='median')
df_direct_impute = df.copy()

df_direct_impute[['SalePrice', 'LotArea', 'WoodDeckSF', 'MasVnrArea']] = imputer_median.fit_transform(df_direct_impute[['SalePrice', 'LotArea', 'WoodDeckSF', 'MasVnrArea']])

comparison = (df_transformed_ct[:, :4] == df_direct_impute[['SalePrice', 'LotArea', 'WoodDeckSF', 'MasVnrArea']].to_numpy())

print(comparison.all())

**Задание**: Добавьте еще нормализатор для LotFrontage, LotArea и запустите в ColumnTransformer. Обучите его и примените.

In [None]:
from sklearn.preprocessing import StandardScaler

ct = ColumnTransformer(
    [('median_impute', SimpleImputer(strategy='median'), ['SalePrice', 'LotArea', 'WoodDeckSF', 'MasVnrArea']),
     ('normalize', StandardScaler(), ['LotFrontage', 'LotArea'])],
    remainder="passthrough",
    verbose_feature_names_out=True
)

ct.set_output(transform='pandas')
df_transformed = ct.fit_transform(df)

df_direct = df.copy()

df_direct[['median_impute__SalePrice', 'median_impute__LotArea', 'median_impute__WoodDeckSF', 'median_impute__MasVnrArea']] = \
df_direct[['SalePrice', 'LotArea', 'WoodDeckSF', 'MasVnrArea']].fillna(df[['SalePrice', 'LotArea', 'WoodDeckSF', 'MasVnrArea']].median())

scaler = StandardScaler()
df_direct[['normalize__LotFrontage', 'normalize__LotArea']] = scaler.fit_transform(df_direct[['LotFrontage', 'LotArea']])

changed_columns = ['SalePrice', 'LotArea', 'WoodDeckSF', 'MasVnrArea', 'LotFrontage']
remaining_columns = [col for col in df.columns if col not in changed_columns]
df_direct[[f'remainder__{col}' for col in remaining_columns]] = df_direct[remaining_columns]

df_direct = df_direct[[f"median_impute__{col}" for col in ['SalePrice', 'LotArea', 'WoodDeckSF', 'MasVnrArea']] + 
                      [f"normalize__{col}" for col in ["LotFrontage", "LotArea"]] +
                      [f"remainder__{col}" for col in remaining_columns]]

print("Индексы совпали:", df_transformed.index.equals(df_direct.index))
print("Колонки совпали:", df_transformed.columns.equals(df_direct.columns))
print("Совпали полностью:", df_transformed.equals(df_direct))

Sklearn располагает большим количеством встроенных трансформеров. Соответствующие трансформеры есть и для категориальных фичей (более подробно рассмотрим этот тип чуть позже). Например, известное нам бинарное кодирование можно проводить с помощью OneHotEncoder()

In [None]:
from sklearn.preprocessing import OneHotEncoder
ct = ColumnTransformer(
    transformers=[
        ('median_impute', SimpleImputer(strategy='mean'), ['SalePrice', 'LotArea', 'WoodDeckSF',  'MasVnrArea']),
        ("one_hot_encode", OneHotEncoder(handle_unknown="ignore", sparse_output=False), ["MSZoning", "SaleType", "SaleCondition"]),
    ], 
    remainder="passthrough", force_int_remainder_cols=False)

ct.fit(df)

In [None]:
ct.set_output(transform='pandas')
ct.transform(df)

Для выбора столбцов можно создавать make_selector, например, по выбору численных и категориальных значений. 

**Доп. задание**. Сделайте трансформер для OneHotEncoder на основе make_selector так, чтобы выбирать все нечисловые столбцы. Сколько столбцов получается после трансформации?

In [None]:
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.preprocessing import OneHotEncoder
import pandas as pd

non_numeric_columns = make_column_selector(dtype_include='object')(df)

print(f"Количество нечисловых столбцов до трансформации: {len(non_numeric_columns)}")

ct = ColumnTransformer(
    transformers=[
        ('onehot', OneHotEncoder(sparse_output=False), make_column_selector(dtype_include='object'))
    ],
    remainder='passthrough'
)

df_transformed = ct.fit_transform(df)

df_transformed = pd.DataFrame(df_transformed)

num_columns_after_transform = df_transformed.shape[1]

print(f"Количество столбцов после трансформации: {num_columns_after_transform}")

## Pipelines

С помощью Pipeline мы можем производить последовательную обработку данных и выполнять предсказание в конце

In [None]:
X = np.array([
    [1, 10],
    [3, 30],
    [2, 20],
])
y = np.array([
    [0],
    [2],
    [1],
])

pipeline = Pipeline([
    ("shifter", SubtractMeanAndShiftTransformer(shift=5)),
    ("regressor", LinearRegression()),
])
...
pipeline.fit(X, y)
y_pred = pipeline.predict(X)
print(y_pred)

Pipeline хранит последовательные Estimators в аттрибуте `steps`

In [None]:
pipeline.steps

Перейти к объекту i-го Estimator можно напрямую через `pipeline[i]`:

In [None]:
pipeline[0]

In [None]:
pipeline[1].coef_

Так как Pipeline сам является Estimator, мы можем увидеть список его параметров:

In [None]:
pipeline.get_params()

Видно, параметры промежуточных Estimator указаны как `<estimator>__<parameter>`. Следовательно, мы можем изменить параметры любого промежуточного Estimator:

In [None]:
pipeline.set_params(shifter__shift=10)
pipeline.get_params()

**Задание**: Создайте пайплайн по преобразованию чсиленных столбцов, содержащий импьютер и скейлер.

In [None]:
X = np.array([
    [1, 10, np.nan],
    [3, 30, 3],
    [2, 20, 2],
])

pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="mean")),
    ("scaler", StandardScaler())
])

pipeline.fit(X)

X_transformed = pipeline.transform(X)

print(X_transformed)

**Задание**: Создайте новый трансформер, который для категориальных столбцов будет заполнять пропущенные значения наиболее часто встречаюмшщимся или новой категорией (сделайте параметром). 

In [None]:
from sklearn.preprocessing import FunctionTransformer

X = np.array([
    ['cat', 'dog', np.nan],
    ['dog', np.nan, 'apple'],
    ['cat', 'dog', 'banana'],
], dtype=object)

def impute_with_new_category(X, new_category='Missing'):
    return np.where(pd.isnull(X), new_category, X)

categorical_columns = [0, 1, 2]

pipeline = Pipeline([
    ('preprocessor', ColumnTransformer(transformers=[('categorical', Pipeline([('imputer', SimpleImputer(strategy='most_frequent')),
    ('new_category', FunctionTransformer(lambda X: impute_with_new_category(X, 'NewCategory')))]), categorical_columns)]))
])

pipeline.fit(X)
X_transformed = pipeline.transform(X)

print(X_transformed)

**Задание**: Создайте пайплайн по преобразованию категориальных столбцов, содержащий ваш импьютер и OneHotEncoder.

In [None]:
X = np.array([
    ['cat', 'dog', np.nan],
    ['dog', np.nan, 'apple'],
    ['cat', 'dog', 'banana'],
], dtype=object)

categorical_columns = [0, 1, 2]

pipeline = Pipeline([
    ('preprocessor', ColumnTransformer(transformers=[('categorical', Pipeline([('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))]), categorical_columns)]))
])

X_transformed = pipeline.fit_transform(X)

print(X_transformed)

**Задание**: Создайте ColumnTransformer, который будет содержать в себе два вышеуказанных пайплайна.

In [None]:
X = np.array([
    ['cat', 'dog', np.nan, 5.0],
    ['dog', np.nan, 'apple', 7.0],
    ['cat', 'dog', 'banana', np.nan],
], dtype=object)

categorical_columns = [0, 1, 2]
numerical_columns = [3]

categorical_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])

numerical_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler())
])

preprocessor = ColumnTransformer(
    transformers=[('categorical', categorical_pipeline, categorical_columns),
    ('numerical', numerical_pipeline, numerical_columns)]
)

X_transformed = preprocessor.fit_transform(X)

print(X_transformed)

**Доп.задание**: Используйте для комбинации результатов двух отдельных трансформеров FeatureUnion

In [None]:
from sklearn.pipeline import FeatureUnion

X = np.array([
    ['cat', 'dog', np.nan, 5.0],
    ['dog', np.nan, 'apple', 7.0],
    ['cat', 'dog', 'banana', np.nan],
], dtype=object)

categorical_columns = [0, 1, 2]
numerical_columns = [3]

categorical_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])

numerical_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler())
])

preprocessor = ColumnTransformer(
    transformers=[
        ('categorical', categorical_pipeline, categorical_columns),
        ('numerical', numerical_pipeline, numerical_columns)
    ]
)

# Дополнительный трансформер: вычисляем длину строковых значений в категориальных столбцах
def string_lengths(X):
    return np.array([[len(str(x)) for x in row] for row in X[:, categorical_columns]])

string_length_transformer = FunctionTransformer(string_lengths)

feature_union = FeatureUnion([
    ('preprocessor', preprocessor),
    ('string_lengths', string_length_transformer)
])

final_pipeline = Pipeline([
    ('feature_union', feature_union),
])

X_transformed = final_pipeline.fit_transform(X)

print(X_transformed)