# Практика по линейной регрессии и методу главных компонент

Датасет скачивать по ссылке: https://disk.yandex.ru/d/cwL3Ka4ECyQwpw

In [None]:
import yaml
import warnings

import numpy as np
import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
from sklearn.linear_model import ElasticNet, Lasso, LinearRegression, Ridge
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.feature_selection import VarianceThreshold

from sklearn.preprocessing import OneHotEncoder, RobustScaler, StandardScaler

SEED = 314159
TRAIN_TEST_SPLIT = 0.75

with open("../config.yaml", "r") as f:
    cfg = yaml.safe_load(f)

In [None]:
df_init = pd.read_csv(cfg["house_prices"]["train_dataset"])
y = df_init["SalePrice"]
df_init.drop(columns=["SalePrice", "Id"], inplace=True)

df_train, df_test, y_train, y_test = train_test_split(df_init, y, test_size = 1 - TRAIN_TEST_SPLIT, random_state=SEED)

dfs = (df_train, df_test)

### Предобработка

In [None]:
for df in dfs:
    df["Exterior2nd"] = df["Exterior2nd"].replace({"Brk Cmn": "BrkComm"})
    
    # names beginning with numbers are awkward to work with
    df.rename(
        columns={"1stFlrSF": "FirstFlrSF", "2ndFlrSF": "SecondFlrSF", "3SsnPorch": "Threeseasonporch"},
        inplace=True,
    )
    
    # удаляем скореллированные признаки
    df.drop(
        columns=["GarageYrBlt", "TotRmsAbvGrd", "FirstFlrSF", "GarageCars"],
        inplace=True
    )

### Заполнение пустых значений

In [None]:
assert not y_train.isna().sum() and not y_test.isna().sum()
assert not df_train["Neighborhood"].isna().sum() and not df_test["Neighborhood"].isna().sum()

# выбрасываем признаки, процент пропущенных значений у которых больше 80
nan_info_train = (df_train.isnull().mean() * 100).reset_index()
nan_info_train.columns = ["column_name", "percentage"]
nan80_cols = list(nan_info_train[nan_info_train.percentage > 80]["column_name"])

print("Отбрасываем", nan80_cols)
for df in dfs:
    df.drop(columns=nan80_cols, inplace=True)

print("Суммарной количество пропущенных значений", df_train.isna().sum().sum(), df_test.isna().sum().sum())

# создаем новую категорию для подмножества категориальных признаков
na_cat_cols = [
    "GarageType", "GarageFinish", "BsmtFinType2", "BsmtExposure", "BsmtFinType1",
    "GarageCond", "GarageQual", "BsmtCond", "BsmtQual", "FireplaceQu", "KitchenQual",
    "HeatingQC", "ExterQual", "ExterCond"
]
for df in dfs:
    df[na_cat_cols] = df[na_cat_cols].fillna("NA")

print("Суммарной количество пропущенных значений", df_train.isna().sum().sum(), df_test.isna().sum().sum())

# заполняем медианой с учетом группы
na_group_fill_num_cols = ["LotFrontage", "GarageArea"]
for col in na_group_fill_num_cols:
    na_group_fill_num_mapping = df_train.groupby("Neighborhood")[col].median().to_dict()
    for df in dfs:
        for neighb, fill_value in na_group_fill_num_mapping.items():
            mask = df["Neighborhood"] == neighb
            df.loc[mask, col] = df.loc[mask, col].fillna(fill_value)
            
print("Суммарной количество пропущенных значений", df_train.isna().sum().sum(), df_test.isna().sum().sum())

# заполняем модой с учетом группы
na_group_fill_cat_cols = [
    "MasVnrType", "MSZoning", "Exterior1st", "Exterior2nd", "SaleType", "Electrical", "Functional"
]
for col in na_group_fill_cat_cols:
    fill_value_gen = df_train[col].dropna().mode()[0]
    for neighb, df_group in df_train.groupby("Neighborhood"):
        mode_output = df_group[col].dropna().mode()
        fill_value = mode_output[0] if len(mode_output) else fill_value_gen
        for df in dfs:
            mask = df["Neighborhood"] == neighb
            df.loc[mask, col] = df.loc[mask, col].fillna(fill_value)

print("Суммарной количество пропущенных значений", df_train.isna().sum().sum(), df_test.isna().sum().sum())

# заполняем медианой и модой все столбцы с пропущенными значениями
num_cols = df_train.select_dtypes(exclude=["object"]).columns
cat_cols = df_train.select_dtypes(include=["object"]).columns

num_imputer = SimpleImputer(strategy="median").fit(df_train[num_cols])
cat_imputer = SimpleImputer(strategy="most_frequent").fit(df_train[cat_cols])
for df in dfs:
    df[num_cols] = num_imputer.transform(df[num_cols])
    df[cat_cols] = cat_imputer.transform(df[cat_cols])

print("Суммарной количество пропущенных значений", df_train.isna().sum().sum(), df_test.isna().sum().sum())

### Отбрасываем "скучные" признаки

In [None]:
def get_almost_constant_columns(df: pd.DataFrame, dropna: bool = True) -> list[str]:
    cols = []
    for i in df:
        counts = df[i].dropna().value_counts() if dropna else df[i].value_counts()
        most_popular_value_count = counts.iloc[0]
        if (most_popular_value_count / len(df)) * 100 > 95:
            cols.append(i)

    return cols


# отбрасываем категориальные столбцы, у которых 95+% повторяющихся значений
almost_constant_cat_cols = get_almost_constant_columns(df_train.select_dtypes(include=["object"]))
print("Отбрасываем", almost_constant_cat_cols)
for df in dfs:
    df.drop(columns=almost_constant_cat_cols, inplace=True)


# удаляем столбцы в интервальной шкале с незначительной дисперсией
df_train_num = df_train.select_dtypes(exclude=["object"])
var_thd = VarianceThreshold(threshold=0.1).fit(df_train_num)
low_var_cols = df_train_num.columns[~var_thd.get_support()]
print("Отбрасываем", low_var_cols)
for df in dfs:
    df.drop(columns=low_var_cols, inplace=True)

### Удаляем выбросы

In [None]:
for col, upper_bound in (
    ("LotFrontage", 200),
    ("LotArea", 100000),
    ("BsmtFinSF1", 4000),
    ("TotalBsmtSF", 5000),
    ("GrLivArea", 4000),
):
    drop_index = df_train[df_train[col] > upper_bound].index
    df_train = df_train.drop(drop_index, axis=0)
    y_train = y_train.drop(drop_index, axis=0)

    drop_index = df_test[df_test[col] > upper_bound].index
    df_test = df_test.drop(drop_index, axis=0)
    y_test = y_test.drop(drop_index, axis=0)

### Feature Engineering

In [None]:
ordinal_map = {"Ex": 5,"Gd": 4, "TA": 3, "Fa": 2, "Po": 1, "NA": 0}
fintype_map = {"GLQ": 6,"ALQ": 5,"BLQ": 4,"Rec": 3,"LwQ": 2,"Unf": 1, "NA": 0}
expose_map = {"Gd": 4, "Av": 3, "Mn": 2, "No": 1, "NA": 0}
# fence_map = {"GdPrv": 4,"MnPrv": 3,"GdWo": 2, "MnWw": 1,"NA": 0}  -- выброшен

ord_col = [
    "ExterQual", "ExterCond", "BsmtQual", "BsmtCond", "HeatingQC", 
    "KitchenQual", "GarageQual", "GarageCond", "FireplaceQu"
]
fin_col = ["BsmtFinType1", "BsmtFinType2"]

for df in dfs:
    # столбец с числовым признаком, который на самом деле можно представить как категориальный
    df["MSSubClass"] = df["MSSubClass"].apply(str)

    for col in ord_col:
        df[col] = df[col].map(ordinal_map)

    for col in fin_col:
        df[col] = df[col].map(fintype_map)

    df["BsmtExposure"] = df["BsmtExposure"].map(expose_map)

In [None]:
df_train_processed = df_train.copy()
df_test_processed = df_test.copy()

In [None]:
df_train = df_train_processed.copy()
df_test = df_test_processed.copy()

ADD_CUSTOM_FEATURES = True
APPLY_PCA = True
ADD_OHE_CAT = True
PCA_COMPONENTS_NUM = 5
NORMALIZE = True

In [None]:
cols_to_bin = [
    "MasVnrArea", "TotalBsmtFin", "TotalBsmtSF", "SecondFlrSF", "WoodDeckSF", "TotalPorch"
]


def add_new_features(df: pd.DataFrame) -> pd.DataFrame:
    df["TotalLot"] = df["LotFrontage"] + df["LotArea"]
    df["TotalBsmtFin"] = df["BsmtFinSF1"] + df["BsmtFinSF2"]
    df["TotalSF"] = df["TotalBsmtSF"] + df["SecondFlrSF"]
    df["TotalBath"] = df["FullBath"] + df["HalfBath"]
    df["TotalPorch"] = df["OpenPorchSF"] + df["EnclosedPorch"] + df["ScreenPorch"]
    df["LivLotRatio"] = df["GrLivArea"] / df["LotArea"]

    for col in cols_to_bin:
        df.loc[:, f"{col}_bin"] = df[col].apply(lambda x: 1 if x > 0 else 0)

    return df

# добавим новых признаков
if ADD_CUSTOM_FEATURES:
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        df_train = add_new_features(df_train)
        df_test = add_new_features(df_test)

num_cols = df_train.select_dtypes(exclude=["object"]).columns
cat_cols = df_train.select_dtypes(include=["object"]).columns

# метод главных компонент
if APPLY_PCA:
    print("Количество признаков в интервальной шкале", len(num_cols))
    pca = PCA(n_components=PCA_COMPONENTS_NUM, random_state=SEED)
    
    df_train_pca = pd.DataFrame(
        pca.fit_transform(df_train[num_cols]),
        columns=pca.get_feature_names_out(),
        index=df_train.index,
    )
    df_test_pca = pd.DataFrame(
        pca.transform(df_test[num_cols]),
        columns=pca.get_feature_names_out(),
        index=df_test.index,
    )
    
    df_train = pd.concat([df_train_pca, df_train[cat_cols]], axis=1)
    df_test = pd.concat([df_test_pca, df_test[cat_cols]], axis=1)
    num_cols = df_train.select_dtypes(exclude=["object"]).columns


# one hot encoding
if ADD_OHE_CAT:
    onehot_encoder = OneHotEncoder(
        sparse_output=False, min_frequency=0.3, handle_unknown="ignore"
    ).fit(df_train[cat_cols])
    df_train_ohe = pd.DataFrame(
        onehot_encoder.transform(df_train[cat_cols]),
        columns=onehot_encoder.get_feature_names_out(),
        index=df_train.index,
    )
    df_test_ohe = pd.DataFrame(
        onehot_encoder.transform(df_test[cat_cols]),
        columns=onehot_encoder.get_feature_names_out(),
        index=df_test.index,
    )
    df_train = pd.concat([df_train[num_cols], df_train_ohe], axis=1)
    df_test = pd.concat([df_test[num_cols], df_test_ohe], axis=1)
else:
    df_train = df_train[num_cols]
    df_test = df_test[num_cols]

# нормализация
if NORMALIZE or APPLY_PCA:
    num_cols = df_train.select_dtypes(np.number).columns
    scaler = RobustScaler()
    df_train[num_cols] = scaler.fit_transform(df_train[num_cols])
    df_test[num_cols] = scaler.transform(df_test[num_cols])
    # df_train[df_train.columns] = scaler.fit_transform(df_train)
    # df_test[df_test.columns] = scaler.transform(df_test)

### Моделирование

In [None]:
linreg = LinearRegression()
ridge_001 = Ridge(alpha=0.01, random_state=SEED)
ridge_01 = Ridge(alpha=0.1, random_state=SEED)
ridge_1 = Ridge(alpha=1.0, random_state=SEED)
lasso_001 = Lasso(alpha=0.01, random_state=SEED)
lasso_01 = Lasso(alpha=0.01, random_state=SEED)
lasso_1 = Lasso(alpha=1.0, random_state=SEED)
elastic_net_001_001 = ElasticNet(alpha=0.01, l1_ratio=0.01, random_state=SEED)
elastic_net_01_01 = ElasticNet(alpha=0.1, l1_ratio=0.1, random_state=SEED)
elastic_net_1_1 = ElasticNet(alpha=1.0, l1_ratio=1.0, random_state=SEED)

models = dict(
    zip(
        [
            "linreg", "ridge_001", "ridge_01", "ridge_1", "lasso_001", "lasso_01", "lasso_1",
            "elastic_net_001_001", "elastic_net_01_01", "elastic_net_1_1"
        ],
        [
            linreg, ridge_001, ridge_01, ridge_1, lasso_001, lasso_01, lasso_1, 
            elastic_net_001_001, elastic_net_01_01, elastic_net_1_1
        ],
    )
)

with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    for model in models.values():
        model.fit(df_train, y_train)

y_preds = {}
for model_name, model in models.items():
    y_preds[model_name] = model.predict(df_test)

In [None]:
data = []
for model_name, y_pred in y_preds.items():
    data.append(
        [
            model_name,
            mean_absolute_error(y_true=y_test, y_pred=y_pred),
            np.sqrt(mean_squared_error(y_true=y_test, y_pred=y_pred)),
            r2_score(y_true=y_test, y_pred=y_pred),
        ]
    )

df_res = pd.DataFrame(data, columns=["model_name", "MAE", "RMSE", "R2"])

In [None]:
df_res.sort_values(by="R2", ascending=False)

In [None]:
sns.scatterplot(pd.DataFrame({"true": y_test, "pred": y_preds["elastic_net_01_01"]}), x="pred", y="true")

In [None]:
sns.scatterplot(
    pd.DataFrame({"index": range(len(y_test)), "diff": y_test - y_preds["elastic_net_01_01"]}),
    x="index",
    y="diff",
)