In [None]:
from pathlib import Path
import pandas as pd
import tarfile
import urllib.request

def load_housing_data():
    tarball_path = Path("datasets/housing.tgz")
    if not tarball_path.is_file():
        Path("datasets").mkdir(parents = True, exist_ok = True)
        url = "https://github.com/ageron/data/raw/main/housing.tgz"
        urllib.request.urlretrieve(url, tarball_path)
        with tarfile.open(tarball_path) as housing_tarball:
            housing_tarball.extractall(path = "datasets")
    return pd.read_csv(Path("datasets/housing/housing.csv"))

housing = load_housing_data()

```
성능 추정 (Performance estimation):
대규모 데이터 세트: 2-way 홀드아웃 방법(훈련/테스트 분할)과 정규 근사를 통한 신뢰구간 계산이 사용됩니다.
소규모 데이터 세트: (반복적인) k-겹 교차 검증, Leave-one-out 교차 검증, 그리고 0.632 부트스트랩을 통한 신뢰구간 계산이 사용됩니다.

모델 선택 (Model selection): 하이퍼파라미터 최적화 및 성능 추정:
대규모 데이터 세트: 3-way 홀드아웃 방법(훈련/검증/테스트 분할)이 사용됩니다.
소규모 데이터 세트: 독립적인 테스트 세트를 가진 (반복적인) k-겹 교차 검증과 Leave-one-out 교차 검증이 사용됩니다.

모델 및 알고리즘 비교 (Model & algorithm comparison):
대규모 데이터 세트: 여러 독립적인 훈련 세트와 테스트 세트를 사용한 알고리즘 비교, McNemar 검정을 통한 모델 비교가 사용됩니다.
소규모 데이터 세트: 5x2cv F 검정과 중첩 교차 검증이 사용됩니다. 여기서 AC는 알고리즘 비교(Algorithm Comparison), MC는 모델 비교(Model Comparison)를 의미합니다.
```

In [None]:
"""
Pandas API

housing.head() # 첫 5개 행 확인
housing.info() # 데이터에 관한 전체적 Summary
housing['ocean_proximity'].value_counts() # feature가 어떤 레이블로 이루어져 있는지 확인
housing.describe() # count, mean, std ,min ,25% , 50%, 75%, max 등의 데이터 제공
"""

In [None]:
import numpy as np
# 중위소득을 5개 계층으로 나눈 income_cat 레이블 생성
housing["income_cat"] = pd.cut(housing["median_income"],
                               bins = [0., 1.5, 3.0, 4.5, 6., np.inf],
                               labels = [1, 2, 3, 4, 5])

In [None]:
from sklearn.model_selection import train_test_split

strat_train_set, strat_test_set = train_test_split(
    housing, test_size = 0.2, stratify = housing["income_cat"], random_state=42)
# 소득 카테고리 비율을 유지해 train_set와 test_set 만들기

In [None]:
# income_cat 레이블은 이제 사용되지 않으므로(적어도 예제에서) 삭제하기
for set_ in (strat_train_set, strat_test_set):
    set_.drop("income_cat", axis = 1, inplace = True) #axis = 0,1 / 행,열

In [None]:
housing = strat_train_set.copy() # 원본 복사본을 만들어 재사용

In [None]:
"""
 # 경도 위도에 따른 인구수 시각화 항상 데이터를 시각화 해보는 것은
 데이터와 문제를 더 잘 이해할 수 있게 하기 때문에 추천되는 방법임."""
import matplotlib.pyplot as plt

housing.plot(kind="scatter", x="longitude", y="latitude", grid=True,
             s=housing["population"] / 100, label="population",
             c="median_house_value", cmap="jet", colorbar=True,
             legend=True, figsize=(10, 7))
plt.ylabel('oo')
cax=plt.gcf().get_axes()[1]
plt.gcf().get_axes()[1].set_ylabel('test')
plt.show()

In [None]:
corr_matrix = housing.corr(numeric_only = True) # 숫자로된 레이블로만 상관관계 테이블 만들기
corr_matrix["median_house_value"].sort_values(ascending = False) #중간 주택 가격에 대한 상관관계

median_house_value    1.000000
median_income         0.688380
total_rooms           0.137455
housing_median_age    0.102175
households            0.071426
total_bedrooms        0.054635
population           -0.020153
longitude            -0.050859
latitude             -0.139584
Name: median_house_value, dtype: float64

In [None]:
# 새로운 레이블 만들기 wow 모먼트..

housing["room_per_house"] = housing["total_rooms"] / housing["households"]
housing["bedrooms_ratio"] = housing["total_bedrooms"] / housing["total_rooms"]
housing["population_per_house"] = housing["population"] / housing["households"]

# 레이블을 추가한 뒤의 상관관계
corr_matrix = housing.corr(numeric_only = True)
corr_matrix["median_house_value"].sort_values(ascending = False)

median_house_value      1.000000
median_income           0.688380
room_per_house          0.143663
total_rooms             0.137455
housing_median_age      0.102175
households              0.071426
total_bedrooms          0.054635
population             -0.020153
population_per_house   -0.038224
longitude              -0.050859
latitude               -0.139584
bedrooms_ratio         -0.256397
Name: median_house_value, dtype: float64

In [None]:
 """median_house_value를 예측하고자 하므로, 이를 제외한 레이블을 만든다.
inplace = True를 하지 않으면 원본 파일을 수정하진 않는다."""

housing = strat_train_set.drop("median_house_value", axis = 1)
housing_labels = strat_train_set["median_house_value"].copy()

In [None]:
""" 결측값(특성이 비어있는 행)을 처리하는 3가지 방법
1. 그 행만 지운다.
housing.dropna(subset=["문제가 있는 열"], inplace = True)
2. 결측값이 있는 특성 자체를 없앤다.
 housing.drop("문제가 있는 열", axis = 1, inplace = True)
3. 그 행의 결측값을 다른 정보로 메운다.
 housing["문제가 있는 열"].fillna(median, inplace = True)

모든 특성에 이렇게 전처리 하는걸 추천함
"""

# 3번째 전략
from sklearn.impute import SimpleImputer

imputer = SimpleImputer(strategy = 'median') #평균, 중앙값, 최빈값, 상수
housing_num = housing.select_dtypes(include = [np.number]) #숫자 타입만 복사
imputer.fit(housing_num)

X = imputer.transform(housing_num)

In [None]:
import sklearn
#변환기가 pandas입력을 pandas출력으로 내놓게 하는 설정
#기본 넘파이 배열은 default
sklearn.set_config(transform_output = 'default')

In [None]:
# 텍스트 특성 다루기
housing_cat = housing[["ocean_proximity"]]
housing_cat.head(8)

from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder

#텍스트 특성을 숫자로 변환
ordinal_encoder = OrdinalEncoder()
housing_cat_encoded = ordinal_encoder.fit_transform(housing_cat)

housing_cat_encoded[:8]
# <1H OCEAN, INLAND, ISLAND, NEAR BAY, NEAR OCEAN이 0,1,2,3,4로 변환됨

array([[3.],
       [0.],
       [1.],
       [1.],
       [4.],
       [1.],
       [0.],
       [3.]])

In [None]:
# 그러나 지금은 숫자 크기에 아무런 의미가 없으므로 원 핫 인코딩으로 변환함
cat_encoder = OneHotEncoder(sparse_output = False)
housing_cat_1hot = cat_encoder.fit_transform(housing_cat)

#housing_cat_1hot.toarray()
print(housing_cat_1hot)

[[0. 0. 0. 1. 0.]
 [1. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0.]
 ...
 [0. 0. 0. 0. 1.]
 [1. 0. 0. 0. 0.]
 [0. 0. 0. 0. 1.]]


In [None]:
""" 데이터 프레임 형태로 출력도 가능
df_test_unknown = pd.DataFrame({"ocean_proximity": ["<1H OCEAN", "ISLAND"]})
pd.DataFrame(cat_encoder.transform(df_test_unknown),
             columns = cat_encoder.get_feature_names_out(),
             index = df_test_unknown.index)
"""

In [None]:
""" 데이터 전처리는 트리 기반 알고리즘을 사용할때를 제외하고는 거의 모든
환경에서 필수적으로 행해야 한다. 각 특성마다 Scale 범위가 달라서
모든 특성이 동등하게 평가받지 못하기 때문이다.
정규화(Normalization) : MinMaxScaler / 특성을 0~1 또는 -1~1의 범위로 조정함
표준화(Standardization) : StandardScaler / 각 샘플 값을 특성 전체의 평균으로 빼고
표준편차로 나눈다.

이 둘의 차이 -> min-max와 달리 표준화는 특성을 일정 범위로 조절하지 않으며
어떤 이상한 값에 영향을 덜 받는다. (정규화는 0~15 범위의 특성값에 100이라는 이상치가
들어오면 100을 1로 만들고 나머지 값은 0~0.15안에 들어가버릴 것이다.)
"""
from sklearn.preprocessing import MinMaxScaler # 정규화
min_max_scaler = MinMaxScaler(feature_range = (-1, 1))
housing_num_min_max_scaled = min_max_scaler.fit_transform(housing_num)

from sklearn.preprocessing import StandardScaler # 표준화
std_scaled = StandardScaler()
housing_num_std_scaled = std_scaled.fit_transform(housing_num)

In [None]:
from sklearn.compose import TransformedTargetRegressor
from sklearn.linear_model import LinearRegression

model = TransformedTargetRegressor(LinearRegression(),
                                   transformer=StandardScaler())
# 이러면 median_income이 알아서 표준화 전처리되고
# 선형회귀 모델로 학습된다.
model.fit(housing[["median_income"]], housing_labels)
# 예측을 불러올때는 표준화로 전처리된 값을 자동으로 되돌리고 예측한다.
#predictions = model.predict(new_model)

In [None]:
# 사용자 정의 변환기 예시
from sklearn.preprocessing import FunctionTransformer
from sklearn.metrics.pairwise import rbf_kernel

#인구 특성을 로그값으로 변환
"""만약 변환기를 TransformedTargetRegressor에 사용한다면 inverse_func를
지정해주어야 나중에 predict할때 자동으로 복구해서 출력해준다.
그렇지 않으면 변환한 값 그대로 출력한다."""
log_transformer = FunctionTransformer(np.log, inverse_func=np.exp)
log_pop = log_transformer.transform(housing[["population"]])

# 샌프란 시스코 위도,경도와 다른 샘플의 위도,경도 유사성(RBF커널)
sf_coords = 37.7749, -122.41
sf_transformer = FunctionTransformer(rbf_kernel,
                                     kw_args=dict(Y=[sf_coords], gamma=0.1))
sf_simil = sf_transformer.transform(housing[["latitude", "longitude"]])

# 특성 1번과 특성 2번의 비율 출력
ratio_transformer = FunctionTransformer(lambda X: X[:, [0]] / X[:, [1]])
ratio_transformer.transform(np.array([[1., 2.], [3., 4.]]))

array([[0.5 ],
       [0.75]])

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_array, check_is_fitted

""" 사용자 정의 변환기 만드는 방법
BaseEstimator = 파이프라인과 그리드 서치를 위한 get_params, set_parms 상속
TransformerMixin = fit_transform 메서드 상속
"""
class StandardSclaerClone(BaseEstimator, TransformerMixin):
    def __init__(self, with_mean = True):
        self.with_mean = with_mean

    def fit(self, X, y = None): # y를 사용하지 않아도 넣어야 함.
        X = check_array(X) # 부동소수점 배열인지 확인
        self.mean_ = X.mean(axis = 0)
        self.scale_ = X.std(axis = 0)
        self.n_features_in_ = X.shape[1] # 이것도 반드시 필요
        return self # 항상 self를 반환해야함.

    def transform(self, X):
        check_is_fitted(self) #학습된 속성이 있는지 확인
        X = check_array(X)
        assert self.n_features_in_ == X.shape[1]
        if self.with_mean:
            X = X - self.mean_
        return X / self.scale_

# sklearn.utils.estimator_checks 모듈의 check_estimator() 함수에
# 사용자 정의 추정기 객체를 전달해 sklearn API를 준수하는지 확인가능
# 위 함수에는 해당 없음. 119page 코드 참고

In [None]:
""" 변환 파이프 라인
여러가지 전처리 과정들과 전체 파이프라인을 하나로 묶어주는 클래스 """

from sklearn.pipeline import Pipeline

# 이름 / 추정기 쌍으로 입력한다. 이름은 내맘대로 가능
num_pipeline = Pipeline([
    ("impute", SimpleImputer(strategy = "median")),
    ("standardize", StandardScaler()),
])
"""아래 처럼 할 수도 있으며 이때는 이름이 변환기 클래스 이름으로 만들어짐
from sklearn.pipeline import make_pipeline
num_pipeline = make_pipeline(SimpleImputer(strategy="median"), StandardScaler())
"""
# sklearn.set_config(display = 'diagram')
num_pipeline # 파이프라인 시각화

In [None]:
""" 전체 데이터를 하나의 파이프라인으로 전처리 하는 방법
물론 파이프라인 끝에 추정기(predict)를 넣을수도 있음
"""
from sklearn.pipeline import make_pipeline
from sklearn.compose import ColumnTransformer

num_attribs = ["longitude", "latitude", "housing_median_age", "total_rooms",
               "total_bedrooms", "population", "households", "median_income"]
cat_attribs = ["ocean_proximity"]

num_pipeline = make_pipeline(
    SimpleImputer(strategy="median"),
    StandardScaler())

cat_pipeline = make_pipeline(
    SimpleImputer(strategy="most_frequent"),
    OneHotEncoder(handle_unknown="ignore"))

preprocessing = ColumnTransformer([
    ("num", num_pipeline, num_attribs), # 이름, 변환기, 적용할 열 이름(리스트)
    ("cat", cat_pipeline, cat_attribs),
])

In [None]:
""" 그리고 이 모든 파이프라인 과정을 아래와 같이 간략화할 수 있다."""
from sklearn.compose import make_column_selector, make_column_transformer

preprocessing = make_column_transformer(
    (num_pipeline, make_column_selector(dtype_include = np.number)),
    (cat_pipeline, make_column_selector(dtype_include = object)),
)

housing_prepared = preprocessing.fit_transform(housing)

In [None]:
# 사용자 정의 변환기 안에 추정기를 넣는 방법
from sklearn.cluster import KMeans

class ClusterSimilarity(BaseEstimator, TransformerMixin):
    def __init__(self, n_clusters=10, gamma=1.0, random_state=None):
        self.n_clusters = n_clusters
        self.gamma = gamma
        self.random_state = random_state

    def fit(self, X, y=None, sample_weight=None):
        # 사이킷런 1.2버전에서 최상의 결과를 찾기 위해 반복하는 횟수를 지정하는 `n_init` 매개변수 값에 `'auto'`가 추가되었습니다.
        # `n_init='auto'`로 지정하면 초기화 방법을 지정하는 `init='random'`일 때 10, `init='k-means++'`일 때 1이 됩니다.
        # 사이킷런 1.4버전에서 `n_init`의 기본값이 10에서 `'auto'`로 바뀝니다. 경고를 피하기 위해 `n_init=10`으로 지정합니다.
        self.kmeans_ = KMeans(self.n_clusters, n_init=10, random_state=self.random_state)
        self.kmeans_.fit(X, sample_weight=sample_weight)
        return self  # 항상 self를 반환

    def transform(self, X):
        return rbf_kernel(X, self.kmeans_.cluster_centers_, gamma=self.gamma)

    def get_feature_names_out(self, names=None):
        return [f"클러스터 {i} 유사도" for i in range(self.n_clusters)]

In [None]:
""" 그리고 여태껏 한 모든 과정을 하나의 파이프라인으로 만들 수 있다."""

def column_ratio(X):
    return X[:, [0]] / X[:, [1]]

def ratio_name(function_transformer, feature_names_in):
    return ["ratio"] # get_feature_names_out에 사용

def ratio_pipeline():
    return make_pipeline(
        SimpleImputer(strategy = 'median'),
        FunctionTransformer(column_ratio, feature_names_out = ratio_name),
        StandardScaler())

log_pipeline = make_pipeline(
    SimpleImputer(strategy = 'median'),
    FunctionTransformer(np.log, feature_names_out = 'one-to-one'),
    StandardScaler())

cluster_simil = ClusterSimilarity(n_clusters = 10, gamma = 1, random_state = 42)

default_num_pipeline = make_pipeline(SimpleImputer(strategy = 'median'),
                                     StandardScaler())

cat_pipeline = make_pipeline(
    SimpleImputer(strategy="most_frequent"),
    OneHotEncoder(handle_unknown="ignore"))

preprocessing = ColumnTransformer([
    ("bedrooms", ratio_pipeline(), ["total_bedrooms", "total_rooms"]),
    ("room_per_house", ratio_pipeline(), ["total_rooms", "households"]),
    ("people_per_house", ratio_pipeline(), ["population", "households"]),
    ("log", log_pipeline, ["total_bedrooms", "total_rooms", "population",
                             "households", "median_income"]),
    ("geo", cluster_simil, ["latitude", "longitude"]),
    ("cat", cat_pipeline, make_column_selector(dtype_include = object)),
],
remainder = default_num_pipeline)

In [None]:
housing_prepared = preprocessing.fit_transform(housing)
housing_prepared.shape # 16512, 24
preprocessing.get_feature_names_out()

array(['bedrooms__ratio', 'room_per_house__ratio',
       'people_per_house__ratio', 'log__total_bedrooms',
       'log__total_rooms', 'log__population', 'log__households',
       'log__median_income', 'geo__클러스터 0 유사도', 'geo__클러스터 1 유사도',
       'geo__클러스터 2 유사도', 'geo__클러스터 3 유사도', 'geo__클러스터 4 유사도',
       'geo__클러스터 5 유사도', 'geo__클러스터 6 유사도', 'geo__클러스터 7 유사도',
       'geo__클러스터 8 유사도', 'geo__클러스터 9 유사도',
       'cat__ocean_proximity_<1H OCEAN', 'cat__ocean_proximity_INLAND',
       'cat__ocean_proximity_ISLAND', 'cat__ocean_proximity_NEAR BAY',
       'cat__ocean_proximity_NEAR OCEAN', 'remainder__housing_median_age'],
      dtype=object)

In [None]:
# 아주 간단한 회귀 모델 훈련하기

from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor

lin_reg = make_pipeline(preprocessing, RandomForestRegressor(random_state = 42))
lin_reg.fit(housing, housing_labels)

In [None]:
housing_predictions = lin_reg.predict(housing)
housing_predictions[:5].round(-2)

array([434800., 475200., 104100.,  99600., 377100.])

In [None]:
from sklearn.metrics import mean_squared_error

lin_rmse = mean_squared_error(housing_labels, housing_predictions,
                              squared = False)

lin_rmse # 결정트리, 랜덤트리 등등 다 값이 엉터리다

17474.619286483998

In [None]:
from sklearn.model_selection import cross_val_score

lin_rmses = -cross_val_score(lin_reg, housing,housing_labels,
                             scoring = 'neg_root_mean_squared_error', cv = 10)

pd.Series(lin_rmses).describe()
#rmse를 타겟으로 하여 교차검증 score 출력

In [None]:
"""더 나은 하이퍼 파라미터를 검색하기 위한 Grid Search"""

from sklearn.model_selection import GridSearchCV

# 전처리부터 훈련까지 한번에 하는 파이프라인
full_pipeline = Pipeline([
    ("preprocessing", preprocessing),
    ("random_forest", RandomForestRegressor(random_state = 42)),
])

""" 다음과 같이 파이프라인이나 ColumnTransformer가
추정기를 감싸고 있더라도, 그 안의 하이퍼파라미터에 접근할 수 있다.
경로 구분은 __ 로 한다즉 preprocessing__geo__n_clusters는
preprocessing안에 있는 geo 안에 있는 n_clusters 파라미터를 수정하는 것이다.
"""
param_grid = [
    {'preprocessing__geo__n_clusters' : [5, 8, 10],
     'random_forest__max_features' : [4, 6, 8]},
    {'preprocessing__geo_-n_clusters' : [10, 15],
     'random_forest__max_features' : [6, 8, 10]},
]

grid_search = GridSearchCV(full_pipeline, param_grid, cv = 3,
                           scoring = "neg_root_mean_squared_error")
grid_search.fit(housing, housing_labels)

In [None]:
# 최상의 파라미터
#grid_search.best_params_

cv_res = pd.DataFrame(grid_search.cv_results_)
cv_res.sort_values(by="mean_test_score", ascending=False, inplace=True)

# 추가 코드 – 데이터프레임을 깔끔하게 출력하기 위한 코드
cv_res = cv_res[["param_preprocessing__geo__n_clusters",
                 "param_random_forest__max_features", "split0_test_score",
                 "split1_test_score", "split2_test_score", "mean_test_score"]]
score_cols = ["split0", "split1", "split2", "mean_test_rmse"]
cv_res.columns = ["n_clusters", "max_features"] + score_cols
cv_res[score_cols] = -cv_res[score_cols].round().astype(np.int64)

cv_res.head()

In [None]:
# 그리드 서치의 훈련 시간을 단축하기 위한 랜덤 그리드 서치
from sklearn.model_selection import RandomizedSearchCV
from sklearn.feature_selection import SelectFromModel
from scipy.stats import randint

full_pipeline = Pipeline([
    ("preprocessing", preprocessing),
    ("random_forest", RandomForestRegressor(random_state = 42)),
])

params_distribs = {'preprocessing__geo__n_clusters' : randint(low = 3, high = 50),
                   'random_forest__max_features': randint(low = 2, high = 20)}

rnd_search = RandomizedSearchCV(
    full_pipeline, param_distributions = params_distribs, n_iter = 10,
    cv = 3, scoring = 'neg_root_mean_squared_error', random_state = 42)


rnd_search.fit(housing, housing_labels)

"""
하이퍼파라미터를 위한 샘플링 분포 선택 방법

scipy.stats.randint(a, b+1) : a~b 사이의 이산적인 값을 가진 하이퍼파라미터.
                              이 범위의 모든 값은 동일한 확률 가집니다.

scipy.stats.uniform(a, b)   : 매우 비슷하지만 연속적인 파라미터에 사용합니다.

scipy.stats.geom(1 / scale) : 이산적인 값의 경우 주어진 스케일 안에서 샘플링하고 싶을 때 사용합니다.
                              예를 들어 scale=1000인 경우 대부분의 샘플은 이 범주 안에 있지만
                              모든 샘플 중 10% 정도는 100보다 작고, 10% 정도는 2300보다 큽니다.

scipy.stats.expon(scale)    : geom의 연속적인 버전입니다. scale을 가장 많이 등장할 값으로 지정합니다.

scipy.stats.loguniform(a, b): 하이퍼파라미터 값의 스케일을 어떻게 지정할지 모를 때 사용합니다.
                              a=0.01, b=100으로 지정하면 0.01과 0.1 사이의 샘플링과
                              10과 100 사이의 샘플링 비율이 동일합니다.
"""


In [None]:
cv_res = pd.DataFrame(rnd_search.cv_results_)
cv_res.sort_values(by="mean_test_score", ascending=False, inplace=True)

# 추가 코드 – 데이터프레임을 깔끔하게 출력하기 위한 코드
cv_res = cv_res[["param_preprocessing__geo__n_clusters",
                 "param_random_forest__max_features", "split0_test_score",
                 "split1_test_score", "split2_test_score", "mean_test_score"]]
score_cols = ["split0", "split1", "split2", "mean_test_rmse"]
cv_res.columns = ["n_clusters", "max_features"] + score_cols
cv_res[score_cols] = -cv_res[score_cols].round().astype(np.int64)

cv_res.head()

In [None]:
""" RandomForestRegressor(Tree 모델)는 모델을 분석하고 특성의 중요도를 알려줌
이렇게 중요도를 표시하면 데이터를 파악하기 위한 insight를 얻을 수 있고
불필요한 특성을 제거할 수도 있음
실제로 sklearn.feature_selection.SelectFromModel은 자동으로 덜 유용한
특성을 제거해줌
select_model = SelectFromModel(rnd_search) -> 모델 이름
select_model.fit(housing, housing_labels) 처럼 쓰면 되고
transform() 메서드를 호출할 때 특성이 삭제됨
->>  SelectionFromModel(rnd_search).fit_transform(housing, housing_labels)"""

# 특성 중요도를 표시
final_model = rnd_search.best_estimator_
feature_importances = final_model["random_forest"].feature_importances_
feature_importances.round(2)

# 중요도를 내림차순으로 정렬하고 특성 이름과 함께 표시
sorted(zip(feature_importances,
           final_model["preprocessing"].get_feature_names_out()),
           reverse = True)

In [None]:
# 마지막 테스트 평가

X_test = strat_test_set.drop("median_house_value", axis = 1)
y_test = strat_test_set["median_house_value"].copy()

final_predictions = final_model.predict(X_test)

final_rmse = mean_squared_error(y_test, final_predictions, squared = False)
print(final_rmse) # 41424.40026462184

In [None]:
# 마지막 95 % 신뢰구간 평가


from scipy import stats

"""
# z-점수를 사용해 신뢰 구간 계산하기
zscore = stats.norm.ppf((1 + confidence) / 2)
zmargin = zscore * squared_errors.std(ddof=1) / np.sqrt(m)
np.sqrt(mean - zmargin), np.sqrt(mean + zmargin)
"""

confidence = 0.95
squared_errors = (final_predictions - y_test) ** 2
np.sqrt(stats.t.interval(confidence, len(squared_errors) - 1,
                         loc = squared_errors.mean(),
                         scale = stats.sem(squared_errors)))


array([39275.40861216, 43467.27680583])

### 모델 배포 단계

In [None]:
import joblib

joblib.dump(final_model, "my_california_housing_model.pkl") # 모델 저장

['my_california_housing_model.pkl']

In [None]:
""" 모델을 불러와서 재사용하려면 모델이 사용했던 함수와 클래스를 먼저 임포트 해야한다."""
from sklearn.cluster import KMeans
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.metrics.pairwise import rbf_kernel

def column_ratio(X):
    return X[:, [0]] / X[:, [1]]

def ratio_name(function_transformer, feature_names_in):
    return ["ratio"] # get_feature_names_out에 사용

class ClusterSimilarity(BaseEstimator, TransformerMixin):
    def __init__(self, n_clusters=10, gamma=1.0, random_state=None):
        self.n_clusters = n_clusters
        self.gamma = gamma
        self.random_state = random_state

    def fit(self, X, y=None, sample_weight=None):
        self.kmeans_ = KMeans(self.n_clusters, n_init=10, random_state=self.random_state)
        self.kmeans_.fit(X, sample_weight=sample_weight)
        return self  # 항상 self를 반환

    def transform(self, X):
        return rbf_kernel(X, self.kmeans_.cluster_centers_, gamma=self.gamma)

    def get_feature_names_out(self, names=None):
        return [f"클러스터 {i} 유사도" for i in range(self.n_clusters)]

final_model_reloaded = joblib.load("my_california_housing_model.pkl")

new_data = housing.iloc[:5]  # 새로운 구역이라 가정
predictions = final_model_reloaded.predict(new_data)

[442737.15 457566.06 105965.    98462.   332992.01]


# 연습 문제

연습 문제 1, 2

In [None]:
from sklearn.model_selection import RandomizedSearchCV, GridSearchCV
from sklearn.svm import SVR
from scipy.stats import randint, uniform, expon, loguniform
from sklearn.model_selection import GridSearchCV

# 노트: 커널이 "linear"일 때 gamma는 무시됨.
""" 일반 그리드 서치 방법
param_grid = [
        {'svr__kernel': ['linear'], 'svr__C': [10., 30., 100., 300., 1000.,
                                               3000., 10000., 30000.0]},
        {'svr__kernel': ['rbf'], 'svr__C': [1.0, 3.0, 10., 30., 100., 300.,
                                            1000.0],
         'svr__gamma': [0.01, 0.03, 0.1, 0.3, 1.0, 3.0]},
    ]
"""

param_grid = {
        'svr__kernel': ['linear', 'rbf'],
        'svr__C': loguniform(20, 200_000),
        'svr__gamma': expon(scale=1.0),
    }

rnd_search = RandomizedSearchCV(svr_pipeline,
                                param_distributions=param_distribs,
                                n_iter=50, cv=3,
                                scoring='neg_root_mean_squared_error',
                                random_state=42)
rnd_search.fit(housing.iloc[:5000], housing_labels.iloc[:5000])

연습 문제 3

In [None]:
""" 중요!
다음 코드는 전처리 라인, 모델을 훈련하고 필요없는 특성을 제거하는 라인과
SVR 모델등  2개의 모델을 사용하는 방법으로 구현되어 있어 유용한 예시임.
"""

from sklearn.feature_selection import SelectFromModel

seletor_pipeline = Pipeline([
    ('preprocessing', preprocessing),
    ('selector', SelectFromModel(RandomForestRegressor(random_state = 42),
                                 threshold = 0.005)),
    ('svr', SVR(C = rnd_search.best_params_["svr__C"],
                gamma = rnd_search.best_params_["svr__gamma"],
                kernel = rnd_search.best_params_["svr__kernel"])),
])

"""
rnd_search.best_params_
{'svr__C': 157055.10989448498,
 'svr__gamma': 0.26497040005002437,
 'svr__kernel': 'rbf'}
"""


In [None]:
from sklearn.model_selection import cross_val_score

svr_rmses = -cross_val_score(seletor_pipeline,
                             housing.iloc[:5000],
                             housing_labels.iloc[:5000],
                             scoring = 'neg_root_mean_squared_error',
                             cv = 3)

pd.Series(svr_rmses).describe()

연습 문제 4

In [None]:
""" transform 메서드에서 예측을 출력하는 사용자 정의 변환기
어떤 회귀 모델도 사용할 수 있는 버전
MetaEstimatorMixin = 여러 개 모델 조합/get_params, set_params, fit, predict 제공
BaseEstimator = 파이프라인과 그리드 서치를 위한 get_params, set_params 제공
TransformerMixin = fit, transform, fit_transfrom 제공
"""

from sklearn.neighbors import KNeighborsRegressor
from sklearn.base import MetaEstimatorMixin,BaseEstimator,TransformerMixin,clone
from sklearn.utils.validation import check_is_fitted

class FeatureFromRegressor(MetaEstimatorMixin, BaseEstimator, TransformerMixin):
    def __init__(self, estimator): # 추정기 모델 받기
        self.estimator = estimator

    def fit(self, X, y = None): #y값이 없어도 None으로 써야함
        estimator_ = clone(self.estimator) #unfitting model
        estimator_.fit(X,y)
        self.estimator_ = estimator_
        self.n_features_in_ = self.estimator_.n_features_in_
        if hasattr(self.estimator, "feature_names_in_"):
            #  메타 추정기가 활용할 특징 이름 정보를 제공
            self.feature_names_in_ = self.estimator.feature_names_in_
        return self # 항상 self를 반환할 것.

    def transform(self, X):
        check_is_fitted(self)  # 훈련되었는지 확인
        predictions = self.estimator_.predict(X)
        if predictions.ndim == 1: # sklearn의 변환기,추정기는 2차원 입력을 기대
            predictions = predictions.reshape(-1, 1)
        return predictions

    def get_feature_names_out(self, names = None):
        check_is_fitted(self)
        n_outputs = getattr(self.estimator_, "n_outputs_", 1)
        estimator_class_name = self.estimator_.__class__.__name__
        estimator_short_name = estimator_class_name.lower().replace("_", "")
        return [f"{estimator_short_name}_prediction_{i}"
                for i in range(n_outputs)]

In [None]:
knn_reg = KNeighborsRegressor(n_neighbors=3, weights="distance")
knn_transformer = FeatureFromRegressor(knn_reg) # 이 모델은 transform할때 prediction을 출력
geo_features = housing[["latitude", "longitude"]] # 변환기의 입력으로 경도와 위도 사용
knn_transformer.fit_transform(geo_features, housing_labels)
"""
array([[442766.66666667],
       [483800.        ],
       [101400.        ],
       ...,
       [109800.        ],
       [163200.        ],
       [160866.66666667]])
"""

# 변환기 이름 출력시, 사용한 모델 이름 나옴
knn_transformer.get_feature_names_out()
"""
['kneighborsregressor_prediction_0']
"""

In [None]:
"""
위에서 만든 knn.transformer 변환기를 전처리 파이프라인에 추가해보기
기존 파이프 라인
preprocessing = ColumnTransformer([
       # 이름       #적용할 변환기(파이프라인)   # 적용할 레이블 리스트
    ("bedrooms", ratio_pipeline(), ["total_bedrooms", "total_rooms"]),
    ("room_per_house", ratio_pipeline(), ["total_rooms", "households"]),
    ("people_per_house", ratio_pipeline(), ["population", "households"]),
    ("log", log_pipeline, ["total_bedrooms", "total_rooms", "population",
                             "households", "median_income"]),
    ("geo", cluster_simil, ["latitude", "longitude"]),
    ("cat", cat_pipeline, make_column_selector(dtype_include = object)),
],
remainder = default_num_pipeline)
"""
from sklearn.base import clone

# 1. 기존 전처리 파이프라인을 리스트로 불러온다
transformers = [(name, clone(transformer), columns)\
                for name, transformer, columns in preprocessing.transformers]
# 2. 위도,경도를 입력으로 받으니 geo 라인에 만든 변환기를 추가한다.
# 2-1 먼저 geo 인덱스를 찾고..
geo_index = [name for name, _, _ in transformers].index("geo")
# 2-2 거기에 내가 만든 변환기를 추가한다.
transformers[geo_index] = ("geo", knn_transformer, ["latitude", "longitude"])

new_preprocessing = ColumnTransformer(transformers)

In [None]:
new_pipeline = Pipeline([
    ('preprocessing', new_preprocessing),
    ('svr', SVR(C=rnd_search.best_params_["svr__C"],
                gamma=rnd_search.best_params_["svr__gamma"],
                kernel=rnd_search.best_params_["svr__kernel"])),
])

In [None]:
new_pipe_rmses = -cross_val_score(new_pipeline,
                                  housing.iloc[:5000],
                                  housing_labels.iloc[:5000],
                                  scoring="neg_root_mean_squared_error",
                                  cv=3)
pd.Series(new_pipe_rmses).describe()
# 성능은 더 안좋음. mean : 104866

연습문제 5\
문제: GridSearchCV를 사용해 준비 단계의 옵션을 자동으로 탐색해보세요.

In [None]:
from sklearn.model_selection import RandomizedSearchCV

param_distribs = {
    "preprocessing__geo__estimator__n_neighbors " : range(1, 30),
    "preprocessing__geo__estimator__weights" : ["distance" ,"uniform"],
    "svr__C" : loguniform(20, 200_000),
    "svr__gamma" : expon(scale = 1.0),
}

new_geo_rnd_search = RandomizedSearchCV(new_pipeline,
                                        param_distributions = param_distribs,
                                        n_iter = 50,
                                        cv = 3,
                                        scoring = 'neg_root_mean_squared_error',
                                        random_state = 42)

new_geo_rnd_search.fit(housing.iloc[:5000], housing_labels.iloc[:5000])

In [None]:
new_geo_rnd_search_rmse = -new_geo_rnd_search.best_score_
new_geo_rnd_search_rmse
# 이것도 점수 안좋음.

연습문제 6\
StandardScalerClone 클래스 구현

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_array, check_is_fitted

class StandardScalerClone(BaseEstimator, TransformerMixin):
    def __init__(self, with_mean=True):  # *args나 **kwargs를 사용하지 않습니다!
        self.with_mean = with_mean

    def fit(self, X, y=None):  # y는 사용하지 않더라고 필수입니다
        X_orig = X
        X = check_array(X)  # X가 부동소수 배열인지 확인합니다
        self.mean_ = X.mean(axis=0)
        self.scale_ = X.std(axis=0)
        self.n_features_in_ = X.shape[1]  # 모든 추정기는 fit()에서 이를 저장해야 합니다
        if hasattr(X_orig, "columns"):
            self.feature_names_in_ = np.array(X_orig.columns, dtype=object)
        return self  # 항상 self를 반환합니다!

    def transform(self, X):
        check_is_fitted(self)  #  (_가 붙은) 학습된 속성을 확인합니다
        X = check_array(X)
        if self.n_features_in_ != X.shape[1]:
            raise ValueError("Unexpected number of features")
        if self.with_mean:
            X = X - self.mean_
        return X / self.scale_

    def inverse_transform(self, X):
        check_is_fitted(self)
        X = check_array(X)
        if self.n_features_in_ != X.shape[1]:
            raise ValueError("Unexpected number of features")
        X = X * self.scale_
        return X + self.mean_ if self.with_mean else X

    def get_feature_names_out(self, input_features=None):
        if input_features is None:
            return getattr(self, "feature_names_in_",
                           [f"x{i}" for i in range(self.n_features_in_)])
        else:
            if len(input_features) != self.n_features_in_:
                raise ValueError("Invalid number of features")
            if hasattr(self, "feature_names_in_") and not np.all(
                self.feature_names_in_ == input_features
            ):
                raise ValueError("input_features ≠ feature_names_in_")
            return input_features

In [None]:
from sklearn.utils.estimator_checks import check_estimator

check_estimator(StandardScalerClone)

In [None]:
np.random.seed(42)
X = np.random.rand(1000, 3)

scaler = StandardScalerClone()
X_scaled = scaler.fit_transform(X)

assert np.allclose(X_scaled, (X - X.mean(axis=0)) / X.std(axis=0))

In [None]:
df = pd.DataFrame({"a": np.random.rand(100), "b": np.random.rand(100)})
scaler = StandardScalerClone()
X_scaled = scaler.fit_transform(df)

assert np.all(scaler.feature_names_in_ == ["a", "b"])
assert np.all(scaler.get_feature_names_out() == ["a", "b"])