# package

In [3]:
import numpy as np
import pandas as pd

# Base / Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.compose import ColumnTransformer, make_column_transformer, TransformedTargetRegressor
from sklearn.preprocessing import FunctionTransformer

# Preprocessing
from sklearn.preprocessing import (
    OneHotEncoder,
    StandardScaler,
    RobustScaler,
    MinMaxScaler,
    PowerTransformer,
    QuantileTransformer
)
from sklearn.impute import SimpleImputer

# FE
from sklearn.feature_selection import mutual_info_regression

# Statistics
from scipy import stats
from scipy.stats import skew, randint
from scipy.special import boxcox1p

# Models
from sklearn.linear_model import ElasticNet, Lasso, Ridge
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, StackingRegressor
from sklearn.svm import SVR
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor
from catboost import CatBoostRegressor
import category_encoders as ce

# Eval
from sklearn.model_selection import (
    GridSearchCV,
    cross_val_score,
    cross_validate,
    RandomizedSearchCV,
    ShuffleSplit
)
from sklearn.metrics import (
    mean_absolute_error,
    mean_squared_error,
    r2_score,
    make_scorer
)

# Optuna
import optuna
from optuna.samplers import TPESampler
from optuna.visualization import (
    plot_contour,
    plot_edf,
    plot_intermediate_values,
    plot_optimization_history,
    plot_parallel_coordinate,
    plot_param_importances,
    plot_slice
)

# Stacking
from mlxtend.regressor import StackingCVRegressor


In [51]:
 
pred_train = final3_pipe.predict(pipe)

mse = mean_squared_error(y, pred_train)
rmse = np.sqrt(mse)
mae  = mean_absolute_error(y, pred_train)
r2   = r2_score(y, pred_train)


print(mse, rmse, mae, r2)

227011056.59598246 15066.886094876489 9883.684942361559 0.9635498329914917


# Solving

In [35]:
# 1. derivative variable
class FeatureCreator1(BaseEstimator, TransformerMixin):
    def __init__(self, add_attributes=True):
        self.add_attributes = add_attributes

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        if not self.add_attributes:
            return X

        X_copy = X.copy()

        X_copy["Lack_of_feature_index"] = (
            X_copy[["Street", "Alley", "MasVnrType", "GarageType", "MiscFeature",
                    "BsmtQual", "FireplaceQu", "PoolQC", "Fence"]]
            .isnull().sum(axis=1)
            + (X_copy["MasVnrType"] == "None")
            + (X_copy["CentralAir"] == "No")
        )

        X_copy["MiscFeatureExtended"] = (
            X_copy["PoolQC"].notnull().astype(int)
            + X_copy["MiscFeature"].notnull().astype(int)
            + X_copy["Fence"].notnull().astype(int)
        ).astype("int64")

        X_copy["Has_Alley"] = X_copy["Alley"].notnull().astype("int64")
        X_copy["Lot_occupation"] = X_copy["GrLivArea"] / X_copy["LotArea"]

        X_copy["Number_of_floors"] = (
            (X_copy["TotalBsmtSF"] != 0).astype("int64")
            + (X_copy["1stFlrSF"] != 0).astype("int64")
            + (X_copy["2ndFlrSF"] != 0).astype("int64")
        )

        X_copy["Total_Close_Live_Area"] = X_copy["GrLivArea"] + X_copy["TotalBsmtSF"]

        X_copy["Outside_live_area"] = (
            X_copy["WoodDeckSF"]
            + X_copy["OpenPorchSF"]
            + X_copy["EnclosedPorch"]
            + X_copy["3SsnPorch"]
            + X_copy["ScreenPorch"]
        )

        X_copy["Total_usable_area"] = (
            X_copy["Total_Close_Live_Area"] + X_copy["Outside_live_area"]
        )

        X_copy["Area_Quality_Indicator"] = (
            X_copy["Total_usable_area"] * X_copy["OverallQual"]
        )

        X_copy["Area_Qual_Cond_Indicator"] = (
            X_copy["Total_usable_area"]
            * X_copy["OverallQual"]
            * X_copy["OverallCond"]
        )

        X_copy["TotalBath"] = (
            X_copy["FullBath"]
            + 0.5 * X_copy["HalfBath"]
            + X_copy["BsmtFullBath"]
            + 0.5 * X_copy["BsmtHalfBath"]
        )

        X_copy["Has_garage"] = X_copy["GarageYrBlt"].notnull().astype("int64")
        X_copy["House_Age"] = X_copy["YrSold"] - X_copy["YearBuilt"]
        X_copy["Is_Remodeled"] = (X_copy["YearBuilt"] != X_copy["YearRemodAdd"]).astype(
            "int64"
        )
        X_copy["HasBsmt"] = X_copy["BsmtQual"].notnull().astype("int64")

        X_copy["Quality_conditition"] = X_copy["OverallQual"] * X_copy["OverallCond"]
        X_copy["Quality_conditition_2"] = (
            X_copy["OverallQual"] + X_copy["OverallCond"]
        )
        X_copy["House_Age2"] = X_copy["YrSold"] - X_copy["YearRemodAdd"]

        return X_copy


 
# 2. preprocess
 
def drop_outliers(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df = df.drop(
        df[
            (df["GrLivArea"] > 4000)
            & (df["SalePrice"] < 200000)
        ].index
    )
    df = df.drop(
        df[
            (df["GarageArea"] > 1200)
            & (df["SalePrice"] < 300000)
        ].index
    )
    df = df.drop(
        df[
            (df["TotalBsmtSF"] > 4000)
            & (df["SalePrice"] < 200000)
        ].index
    )
    df = df.drop(
        df[
            (df["1stFlrSF"] > 4000)
            & (df["SalePrice"] < 200000)
        ].index
    )
    df = df.drop(
        df[
            (df["TotRmsAbvGrd"] > 12)
            & (df["SalePrice"] < 230000)
        ].index
    )
    return df

 
# 3. main

class HousePriceStackingModel:
    """
    - drop_outliers
    - Creator1
    - tree_preprocessor / linear_preprocessor
    - 튜닝된 XGB / GBM / LGBM / CatBoost / ElasticNet / Lasso / Ridge / SVR
    - StackingRegressor + 최종 Lasso
    를 그대로 사용.
    """

    def __init__(self, random_state: int = 1):
        self.random_state = random_state
        self.pipeline_ = None
        self.X_train_ = None
        self.y_train_ = None

        # ★ 나중에 재사용할 feature 그룹들
        self.categorical_features_ = None
        self.ordinal_features_ = None
        self.nominal_features_ = None
        self.numerical_features_ = None


    # 3-1. 변수정의
    def _define_feature_groups(self, X: pd.DataFrame):
        # object 타입 -> categorical
        categorical_features = [
            col for col in X.columns if X[col].dtype == "object"
        ]

        ordinal_features = [
            "LotShape",
            "Utilities",
            "LandSlope",
            "ExterQual",
            "ExterCond",
            "BsmtQual",
            "BsmtCond",
            "BsmtExposure",
            "BsmtFinType1",
            "BsmtFinType2",
            "HeatingQC",
            "KitchenQual",
            "Functional",
            "FireplaceQu",
            "GarageFinish",
            "GarageQual",
            "GarageCond",
            "PavedDrive",
            "PoolQC",
            "Fence",
        ]

        nominal_features = list(set(categorical_features) - set(ordinal_features))
        numerical_features = list(set(X.columns) - set(categorical_features))

        self.categorical_features_ = categorical_features
        self.ordinal_features_ = ordinal_features
        self.nominal_features_ = nominal_features
        self.numerical_features_ = numerical_features

 
    # 3-2. mapping
    def _build_preprocessors(self):
        GarageQual_map = {"Ex": 5, "Gd": 4, "TA": 3, "Fa": 2, "Po": 1, "NA": 0}
        Fence_map = {"GdPrv": 4, "MnPrv": 3, "GdWo": 2, "MnWw": 1, "NA": 0}
        GarageFinish_map = {"Fin": 3, "RFn": 2, "Unf": 1, "NA": 0}
        KitchenQual_map = {"Ex": 4, "Gd": 3, "TA": 2, "Fa": 1, "Po": 0}
        GarageCond_map = {"Ex": 5, "Gd": 4, "TA": 3, "Fa": 2, "Po": 1, "NA": 0}
        HeatingQC_map = {"Ex": 4, "Gd": 3, "TA": 2, "Fa": 1, "Po": 0}
        ExterQual_map = {"Ex": 4, "Gd": 3, "TA": 2, "Fa": 1, "Po": 0}
        BsmtCond_map = {"Ex": 5, "Gd": 4, "TA": 3, "Fa": 2, "Po": 1, "NA": 0}
        LandSlope_map = {"Gtl": 2, "Mod": 1, "Sev": 0}
        ExterCond_map = {"Ex": 4, "Gd": 3, "TA": 2, "Fa": 1, "Po": 0}
        BsmtExposure_map = {"Gd": 4, "Av": 3, "Mn": 2, "No": 1, "NA": 0}
        PavedDrive_map = {"Y": 2, "P": 1, "N": 0}
        BsmtQual_map = {"Ex": 5, "Gd": 4, "TA": 3, "Fa": 2, "Po": 1, "NA": 0}
        LotShape_map = {"Reg": 3, "IR1": 2, "IR2": 1, "IR3": 0}
        BsmtFinType2_map = {
            "GLQ": 6,
            "ALQ": 5,
            "BLQ": 4,
            "Rec": 3,
            "LwQ": 2,
            "Unf": 1,
            "NA": 0,
        }
        BsmtFinType1_map = {
            "GLQ": 6,
            "ALQ": 5,
            "BLQ": 4,
            "Rec": 3,
            "LwQ": 2,
            "Unf": 1,
            "NA": 0,
        }
        FireplaceQu_map = {"Ex": 5, "Gd": 4, "TA": 3, "Fa": 2, "Po": 1, "NA": 0}
        Utilities_map = {"AllPub": 3, "NoSewr": 2, "NoSeWa": 1, "ELO": 0}
        Functional_map = {
            "Typ": 7,
            "Min1": 6,
            "Min2": 5,
            "Mod": 4,
            "Maj1": 3,
            "Maj2": 2,
            "Sev": 1,
            "Sal": 0,
        }
        PoolQC_map = {"Ex": 5, "Gd": 4, "TA": 3, "Fa": 2, "Po": 1, "NA": 0}

        mapping_dict = {
            "GarageQual": GarageQual_map,
            "Fence": Fence_map,
            "GarageFinish": GarageFinish_map,
            "KitchenQual": KitchenQual_map,
            "GarageCond": GarageCond_map,
            "HeatingQC": HeatingQC_map,
            "ExterQual": ExterQual_map,
            "BsmtCond": BsmtCond_map,
            "LandSlope": LandSlope_map,
            "ExterCond": ExterCond_map,
            "BsmtExposure": BsmtExposure_map,
            "PavedDrive": PavedDrive_map,
            "BsmtQual": BsmtQual_map,
            "LotShape": LotShape_map,
            "BsmtFinType2": BsmtFinType2_map,
            "BsmtFinType1": BsmtFinType1_map,
            "FireplaceQu": FireplaceQu_map,
            "Utilities": Utilities_map,
            "Functional": Functional_map,
            "PoolQC": PoolQC_map,
        }

        ordinal_mapping = [
            {"col": col, "mapping": mapping_dict[col]}
            for col in self.ordinal_features_
            if col in mapping_dict
        ]

        # tree
        numerical_transformer = Pipeline(
            steps=[
                ("imputer", SimpleImputer(strategy="constant", fill_value=0)),
            ]
        )

        # ohe
        nominal_transformer = Pipeline(
            steps=[
                (
                    "imputer",
                    SimpleImputer(
                        strategy="constant", fill_value="Do_not_have_this_feature"
                    ),
                ),
                ("onehot", OneHotEncoder(handle_unknown="ignore")),
            ]
        )

        # ordinal 용도
        ordinal_encoder = ce.OrdinalEncoder(mapping=ordinal_mapping)
        ordinal_transformer = Pipeline(
            steps=[("ordinal_encoder", ordinal_encoder)]
        )

        # Tree 용도
        self.tree_preprocessor_ = ColumnTransformer(
            remainder=numerical_transformer,
            transformers=[
                ("nominal_transformer", nominal_transformer, self.nominal_features_),
                ("ordinal_transformer", ordinal_transformer, self.ordinal_features_),
            ],
        )

        # --- linear용도
        skewed_features = [
            "MiscVal",
            "PoolArea",
            "LotArea",
            "3SsnPorch",
            "LowQualFinSF",
            "BsmtFinSF2",
            "ScreenPorch",
            "EnclosedPorch",
            "Lot_occupation",
            "MasVnrArea",
            "OpenPorchSF",
            "Area_Qual_Cond_Indicator",
            "LotFrontage",
            "WoodDeckSF",
            "Area_Quality_Indicator",
            "Outside_live_area",
        ]

        skewness_transformer = Pipeline(
            steps=[
                ("imputer", SimpleImputer(strategy="constant", fill_value=0)),
                (
                    "PowerTransformer",
                    PowerTransformer(method="yeo-johnson", standardize=True),
                ),
            ]
        )

        numerical_transformer2 = Pipeline(
            steps=[
                ("imputer", SimpleImputer(strategy="constant", fill_value=0)),
                ("Scaler", StandardScaler()),
            ]
        )

        self.linear_preprocessor_ = ColumnTransformer(
            remainder=numerical_transformer2,
            transformers=[
                ("skewness_transformer", skewness_transformer, skewed_features),
                ("nominal_transformer", nominal_transformer, self.nominal_features_),
                ("ordinal_transformer", ordinal_transformer, self.ordinal_features_),
            ],
        )

        # Creator1 인스턴스
        self.creator1_ = FeatureCreator1(add_attributes=True)

    # -------------------------
    # 3-3. 모델 구성 (튜닝값 그대로)
    # -------------------------
    def _build_models(self):
        # xgb
        xgb_tunned = XGBRegressor(
            n_estimators=6500,
            max_depth=5,
            learning_rate=0.004828231865923587,
            subsample=0.3231512729662032,
            colsample_bytree=0.25528017285233484,
            min_child_weight=2,
            gamma=0.0026151163125498213,
            alpha=1.7938525031017074e-09,
            random_state=self.random_state,
            tree_method="hist",  # 속도용(원하면 제거해도 됨)
        )

        pipe_xgb = Pipeline(
            steps=[
                ("tree_preprocessor", self.tree_preprocessor_),
                ("regressor1", xgb_tunned),
            ]
        )

        # gradeint boosting
        gbm_tunned = GradientBoostingRegressor(
            n_estimators=5500,
            max_depth=5,
            min_samples_leaf=14,
            learning_rate=0.006328507206504974,
            subsample=0.9170443266552768,
            max_features="sqrt",
            random_state=self.random_state,
        )

        pipe_gbm = Pipeline(
            steps=[
                ("tree_preprocessor", self.tree_preprocessor_),
                ("regressor2", gbm_tunned),
            ]
        )

        # lgbm
        lgbm_tunned = LGBMRegressor(
            n_estimators=7000,
            max_depth=7,
            learning_rate=0.002536841439596437,
            min_data_in_leaf=22,
            subsample=0.7207500503954922,
            max_bin=210,
            feature_fraction=0.30010067215105635,
            random_state=self.random_state,
            verbosity=-1,
        )

        pipe_lgbm = Pipeline(
            steps=[
                ("tree_preprocessor", self.tree_preprocessor_),
                ("regressor3", lgbm_tunned),
            ]
        )

        # cat
        catboost_tunned = CatBoostRegressor(
            iterations=4500,
            colsample_bylevel=0.05367479984702603,
            learning_rate=0.018477566955501026,
            random_strength=0.1321272840705348,
            depth=6,
            l2_leaf_reg=4,
            boosting_type="Plain",
            bootstrap_type="Bernoulli",
            subsample=0.7629052520889268,
            logging_level="Silent",
            random_state=self.random_state,
        )

        pipe_catboost = Pipeline(
            steps=[
                ("tree_preprocessor", self.tree_preprocessor_),
                ("regressor4", catboost_tunned),
            ]
        )

        # ElasticNet , log target
        elasticnet_tunned = ElasticNet(
            max_iter=3993,
            alpha=0.0007824887724782356,
            l1_ratio=0.25,
            tol=3.78681184748232e-06,
            random_state=self.random_state,
        )

        pipe_Elasticnet = Pipeline(
            steps=[
                ("linear_preprocessor", self.linear_preprocessor_),
                ("regressor5", elasticnet_tunned),
            ]
        )

        TargetTransformedElasticnet = TransformedTargetRegressor(
            regressor=pipe_Elasticnet, func=np.log1p, inverse_func=np.expm1
        )

        # Lasso 튜닝, log target
        lasso_tunned = Lasso(
            max_iter=2345,
            alpha=0.00019885959230548468,
            tol=2.955506894549702e-05,
            random_state=self.random_state,
        )

        pipe_Lasso = Pipeline(
            steps=[
                ("linear_preprocessor", self.linear_preprocessor_),
                ("regressor6", lasso_tunned),
            ]
        )

        TargetTransformedLasso = TransformedTargetRegressor(
            regressor=pipe_Lasso, func=np.log1p, inverse_func=np.expm1
        )

        # Ridge, log target 
        ridge_tunned = Ridge(
            max_iter=1537,
            alpha=6.654338887411367,
            tol=8.936831872581897e-05,
            random_state=self.random_state,
        )

        pipe_Ridge = Pipeline(
            steps=[
                ("linear_preprocessor", self.linear_preprocessor_),
                ("regressor7", ridge_tunned),
            ]
        )

        TargetTransformedRidge = TransformedTargetRegressor(
            regressor=pipe_Ridge, func=np.log1p, inverse_func=np.expm1
        )

        # log target , RMSLE, MAE 도 같이봐야해
        svr_tunned = SVR(
            kernel="linear",
            C=0.019257948556667938,
            epsilon=0.016935170969518305,
            tol=0.0006210492106739069,
        )

        pipe_SVR = Pipeline(
            steps=[
                ("linear_preprocessor", self.linear_preprocessor_),
                ("regressor8", svr_tunned),
            ]
        )

        TargetTransformedSVR = TransformedTargetRegressor(
            regressor=pipe_SVR, func=np.log1p, inverse_func=np.expm1
        )

        estimators = [
            ("pipe_xgb", pipe_xgb),
            ("pipe_gbm", pipe_gbm),
            ("pipe_lgbm", pipe_lgbm),
            ("pipe_catboost", pipe_catboost),
            ("TargetTransformedElasticnet", TargetTransformedElasticnet),
            ("TargetTransformedLasso", TargetTransformedLasso),
            ("TargetTransformedRidge", TargetTransformedRidge),
            ("TargetTransformedSVR", TargetTransformedSVR),
        ]

        stacking_regressor = StackingRegressor(
            estimators=estimators,
            final_estimator=Lasso(alpha=0.01, random_state=self.random_state),
        )

        self.pipeline_ = Pipeline(
            steps=[
                ("Creator1", self.creator1_),
                ("stacking_regressor", stacking_regressor),
            ]
        )


    # 3-4. fit / predict / evaluate
    def fit(self, train_df: pd.DataFrame):
        """
        Id는 index로 두는게 편함
        """
        df = drop_outliers(train_df)
        y = df["SalePrice"].values
        X = df.drop("SalePrice", axis=1)

        self._define_feature_groups(X)
        self._build_preprocessors()
        self._build_models()

        self.pipeline_.fit(X, y)

        self.X_train_ = X
        self.y_train_ = y

        return self

    def predict(self, test_df: pd.DataFrame) -> np.ndarray:
        if self.pipeline_ is None:
            raise RuntimeError("callfit 먼저")
        return self.pipeline_.predict(test_df)

    def evaluate_in_sample(self):
        # 샘플성능 확인. 이상하면 다시
        if self.pipeline_ is None or self.X_train_ is None:
            raise RuntimeError("callfit 먼저")

        y_pred = self.pipeline_.predict(self.X_train_)

        mse = mean_squared_error(self.y_train_, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(self.y_train_, y_pred)
        r2 = r2_score(self.y_train_, y_pred)

        return {
            "mse": mse,
            "rmse": rmse,
            "mae": mae,
            "r2": r2,
        }


 


# execute

In [36]:
if __name__ == "__main__":
    train_path = r"C:\Users\kdh98\OneDrive\Desktop\모두의 연구소\4. ML\MLday2\train.csv"
    test_path = r"C:\Users\kdh98\OneDrive\Desktop\모두의 연구소\4. ML\MLday2\test.csv"

    train_df = pd.read_csv(train_path, index_col="Id")
    test_df = pd.read_csv(test_path, index_col="Id")

    model = HousePriceStackingModel(random_state=1)
    model.fit(train_df)
    metrics = model.evaluate_in_sample()
    print("In-sample:", metrics)

    # 최종 eval
    preds_test = model.predict(test_df)
    submission = pd.DataFrame({"Id": test_df.index, "SalePrice": preds_test})
    submission.to_csv("submission.csv", index=False)

In-sample: {'mse': 217182612.30423698, 'rmse': np.float64(14737.116824679004), 'mae': 9691.665796689207, 'r2': 0.96512794306789}
submission_class_based.csv saved.
