## 1주차 (10/31): 프로젝트 주제 선정 및 데이터셋 선택
### 프로젝트 주제: 보스턴 주택 가격 예측
이 프로젝트는 보스턴 주택 가격을 예측하는 회귀 모델을 개발하는 것을 목표로 합니다.

### 데이터셋 설명
- **출처**: 보스턴 주택 가격 데이터
- **타겟 변수**: `medv` (주택 가격)
- **특성**: 범죄율, 방 개수, 찰스강 근처 여부, 세율 등 다양한 지표

## 2주차 (11/7): 데이터셋 탐색 및 전처리
데이터를 불러오고, 기본적인 탐색적 데이터 분석(EDA)을 수행하며, 필요한 전처리를 진행합니다.

## 3주차 (11/14): 모델 선택 및 초기 설계
회귀 문제에 적합한 모델을 선택하고 초기 설계를 진행합니다.

## 4주차 (11/21): 특성 엔지니어링 및 모델 튜닝
특성 엔지니어링과 하이퍼파라미터 튜닝을 통해 모델 성능을 개선합니다.

## 5주차 (11/28): 최종 튜닝 및 모델 결정
최종 하이퍼파라미터 튜닝을 수행하고 최적 모델을 결정합니다.

## 6주차 (12/5): 프로젝트 보고서 준비 및 발표 자료 제작
모델 성능 지표와 시각화를 포함한 보고서를 준비하고 발표 자료를 제작합니다.

## 7주차 (12/12): 프로젝트 보고서 제출 및 발표
최종 보고서를 제출하고 프로젝트 결과를 발표합니다.

In [None]:
# Python 3.7 이상 버전인지 확인
import sys

assert sys.version_info >= (3, 7)

In [None]:
!pip install tensorflow

from packaging import version
import tensorflow as tf

assert version.parse(tf.__version__) >= version.parse("2.8.0")

In [None]:
import sys
# google colab의 경우 나눔 폰트를 설치
if 'google.colab' in sys.modules:
    !sudo apt-get -qq -y install fonts-nanum
    import matplotlib.font_manager as fm
    font_files = fm.findSystemFonts(fontpaths=['/usr/share/fonts/truetype/nanum'])
    for fpath in font_files:
        fm.fontManager.addfont(fpath)

# 나눔 폰트를 사용
import matplotlib

matplotlib.rc('font', family='NanumBarunGothic')
matplotlib.rcParams['axes.unicode_minus'] = False

In [None]:
# 모델 튜닝을 위해 keras-tuner를 설치
!pip install keras-tuner

In [None]:
import sys

if 'google.colab' in sys.modules:
    from google.colab import files
    # Colab에서 필요한 추가 코드 (예: 파일 업로드)
    uploaded = files.upload()
else:
    print("This code is not running in Google Colab.")


In [None]:
import pandas as pd

df = pd.read_csv("datasets/BostonHousing.csv")
X = df.iloc[:, :-1].values  # 입력 특성
y = df.iloc[:, -1].values  # 출력 값

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
import keras_tuner as kt
from tensorflow.keras import layers
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Load the dataset: every column except the last is used as an input feature
# and the last column as the regression target.
df = pd.read_csv("datasets/BostonHousing.csv")
X = df.iloc[:, :-1].values  # input features
y = df.iloc[:, -1].values  # target values

# Hold out 20% as the test set, then 20% of the remainder as the validation set.
X_train_full, X_test, y_train_full, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full, test_size=0.2, random_state=42)

# Standardize the features with statistics fitted on the training split only;
# the validation and test splits reuse the fitted scaler.
# nan_to_num then replaces any NaN left in the arrays with 0.
scaler = StandardScaler()
X_train_scaled = np.nan_to_num(scaler.fit_transform(X_train))
X_valid_scaled = np.nan_to_num(scaler.transform(X_valid))
X_test_scaled = np.nan_to_num(scaler.transform(X_test))
# NOTE(review): for the targets, nan_to_num turns a missing value into a price
# of 0 rather than dropping the row — confirm the target column has no NaNs.
y_train = np.nan_to_num(y_train)
y_valid = np.nan_to_num(y_valid)
y_test = np.nan_to_num(y_test)

# 사용자 정의 HuberLoss 함수
class HuberLoss(tf.keras.losses.Loss):
    """Huber loss with a configurable switch-over point.

    Errors whose magnitude is below ``threshold`` are penalised
    quadratically (``error**2 / 2``); all other errors are penalised
    linearly (``threshold * |error| - threshold**2 / 2``).
    """

    def __init__(self, threshold=1.0, **kwargs):
        """Store ``threshold`` and forward the remaining kwargs to the base loss."""
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        """Return the element-wise Huber loss of ``y_pred`` against ``y_true``."""
        delta = self.threshold
        residual = y_true - y_pred
        magnitude = tf.abs(residual)
        quadratic = tf.square(residual) / 2
        linear = delta * magnitude - delta**2 / 2
        return tf.where(magnitude < delta, quadratic, linear)

    def get_config(self):
        """Return the base-class config extended with ``threshold``."""
        config = dict(super().get_config())
        config["threshold"] = self.threshold
        return config

# 모델 구조 정의
def build_model(hp):
    """Build and compile a regression MLP from Keras Tuner hyperparameters.

    Hyperparameters registered on ``hp``, in this order:
    ``n_hidden`` (0-8 hidden layers, default 2), ``n_neurons`` (16-256 in
    steps of 16, shared by every hidden layer), ``learning_rate`` (1e-5 to
    1e-3, log-sampled), ``optimizer`` ("adam" or "sgd") and ``threshold``
    (0.5-5.0 in steps of 0.5, passed to ``HuberLoss``).

    The input width is read from the module-level ``X_train_scaled``.

    Returns:
        The compiled ``tf.keras.Sequential`` model, tracking MAE as a metric.
    """
    n_hidden = hp.Int("n_hidden", min_value=0, max_value=8, default=2)
    n_neurons = hp.Int("n_neurons", min_value=16, max_value=256, step=16)
    learning_rate = hp.Float("learning_rate", min_value=1e-5, max_value=1e-3, sampling="log")
    optimizer_name = hp.Choice("optimizer", values=["adam", "sgd"])

    # Any choice other than "sgd" uses Adam.
    if optimizer_name == "sgd":
        optimizer_cls = tf.keras.optimizers.SGD
    else:
        optimizer_cls = tf.keras.optimizers.Adam
    optimizer = optimizer_cls(learning_rate=learning_rate)

    stack = [tf.keras.layers.Flatten(input_shape=(X_train_scaled.shape[1],))]
    stack.extend(
        tf.keras.layers.Dense(n_neurons, activation="relu", kernel_initializer="he_normal")
        for _ in range(n_hidden)
    )
    stack.append(tf.keras.layers.Dense(1))  # single linear output for regression
    model = tf.keras.Sequential(stack)

    huber_threshold = hp.Float("threshold", min_value=0.5, max_value=5.0, step=0.5)
    model.compile(loss=HuberLoss(threshold=huber_threshold), optimizer=optimizer, metrics=["mae"])
    return model

# Hyperparameter search starts here.
# Configure a Keras Tuner Hyperband search over build_model, using the
# validation MAE as the objective. Trial state is written under
# tuner_dir/huber_loss_tuning.
tuner = kt.Hyperband(
    build_model,
    objective="val_mae",
    max_epochs=20,
    factor=3,
    directory="tuner_dir",
    project_name="huber_loss_tuning",
    max_consecutive_failed_trials=10
)

# Run the search on the training split, scoring on the validation split.
tuner.search(
    X_train_scaled, y_train,
    validation_data=(X_valid_scaled, y_valid),
    verbose=2
)

# Rebuild a fresh model from the best hyperparameters and train it.
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
model = build_model(best_hps)
history = model.fit(X_train_scaled, y_train, validation_data=(X_valid_scaled, y_valid), epochs=20)

# Evaluate on the held-out test set; returns the loss followed by the MAE metric.
test_loss, test_mae = model.evaluate(X_test_scaled, y_test)

# Save the trained model.
model.save("final_model.keras")

# Print the final test results.
print(f"Test loss: {test_loss}")
print(f"Test MAE: {test_mae}")


In [None]:
import sys

# Persist the final test metrics as a two-column CSV.
results = {"Metric": ["Test Loss", "Test MAE"], "Value": [test_loss, test_mae]}
# Use a dedicated name so the dataset DataFrame `df` is not overwritten.
results_df = pd.DataFrame(results)
results_df.to_csv("results.csv", index=False)

# Offer the CSV as a browser download. The download helper only exists on
# Google Colab, so outside Colab the file is simply left on disk instead of
# failing with NameError on `files`.
if 'google.colab' in sys.modules:
    from google.colab import files
    files.download("results.csv")
else:
    print("results.csv saved to the current working directory.")

In [None]:
# `train_data`, `train_targets` and `test_data` are used from this cell onward
# but are not assigned by any earlier cell, so a fresh kernel stopped here with
# NameError. Load them only when they are missing, leaving existing values
# untouched.
# NOTE(review): assumes this section follows the Keras `boston_housing`
# workflow — confirm the intended data source.
try:
    train_data, train_targets
except NameError:
    from tensorflow.keras.datasets import boston_housing

    (train_data, train_targets), (test_data, test_targets) = boston_housing.load_data()

print("Train Datas Shape : {}".format(train_data.shape))
print("Train Labels Shape : {}".format(train_targets.shape))

In [None]:
display(train_data[0])
display(train_targets[0:10])

In [None]:
# Standardize the features in place using training-set statistics only.
mean = train_data.mean(axis=0)
train_data -= mean
# std is measured on the already-centred training data.
std = train_data.std(axis=0)
train_data /= std

# The test set is shifted and scaled with the training mean/std.
test_data -= mean
test_data /= std


In [None]:
from tensorflow.keras import models
from tensorflow.keras import layers

def build_network(input_shape=(0,)):
    """Build and compile a small fully connected regression network.

    Two 64-unit ReLU hidden layers feed a single linear output unit. The
    model is compiled with the Adam optimizer, MSE loss and MAE as a metric.

    Args:
        input_shape: Shape tuple of one input sample, forwarded to the first
            Dense layer.

    Returns:
        The compiled Sequential model.
    """
    network = models.Sequential(
        [
            layers.Dense(64, activation='relu', input_shape=input_shape),
            layers.Dense(64, activation='relu'),
            layers.Dense(1),
        ]
    )
    network.compile(optimizer='adam', loss='mse', metrics=['mae'])
    return network

In [None]:
import numpy as np

# K-fold cross-validation: each of the k contiguous slices of the training data
# is held out once while a fresh network is trained on the remaining samples.
k = 4
# Integer division: when len(train_data) is not a multiple of k, the trailing
# remainder samples are never used for validation.
num_val_samples = len(train_data) // k
num_epochs = 150
all_scores = []  # final validation MAE of each fold
all_history = []  # per-epoch metric history (History.history dict) of each fold
for i in range(k):
    print('폴드 번호 #{}'.format(i))
    fold_start_index = i * num_val_samples
    fold_end_index = (i + 1) * num_val_samples

    # Validation slice for this fold.
    val_data = train_data[fold_start_index : fold_end_index]
    val_targets = train_targets[fold_start_index : fold_end_index]

    # Everything before and after the validation slice is used for training.
    partial_train_data = np.concatenate(
        [train_data[:fold_start_index], train_data[fold_end_index:]],
        axis=0
    )

    partial_train_targets = np.concatenate(
        [train_targets[:fold_start_index], train_targets[fold_end_index:]],
        axis=0
    )

    # A new, untrained model per fold so folds do not share weights.
    model = build_network((partial_train_data.shape[1], ))
    history = model.fit(
        partial_train_data,
        partial_train_targets,
        epochs=num_epochs,
        validation_data=(val_data, val_targets),
        batch_size=1,
        verbose=0
    )
    # evaluate() returns the loss followed by the compiled metric.
    val_mse, val_mae = model.evaluate(val_data, val_targets, verbose=0)
    # Record the fold's MAE: the rest of the analysis is done on MAE, and the
    # original code stored the MSE here while leaving val_mae unused.
    all_scores.append(val_mae)
    all_history.append(history.history)

In [None]:
# Stack each fold's per-epoch validation MAE into a (fold, epoch) array and
# average over the folds for every epoch.
val_mae_lst = np.array([fold_history['val_mae'] for fold_history in all_history])
avg_mae = [np.mean(val_mae_lst[:, epoch]) for epoch in range(num_epochs)]

In [None]:
# Per-epoch validation loss averaged over the folds. The values are kept in
# their own variables: storing them in `val_mae_lst` / `avg_mae` overwrote the
# MAE average, so the following plot showed loss values under a
# "Validation MAE" axis label.
val_loss_lst = np.array([hist['val_loss'] for hist in all_history])
avg_loss = [
    np.mean(val_loss_lst[:, i]) for i in range(num_epochs)
]

In [None]:
import matplotlib.pyplot as plt
def smooth_curve(points, factor=.9):
    """Return an exponentially smoothed copy of ``points``.

    The first value is kept unchanged; every later value becomes
    ``previous_smoothed * factor + point * (1 - factor)``.

    Args:
        points: Iterable of numbers to smooth.
        factor: Weight given to the previous smoothed value.

    Returns:
        A new list with one smoothed value per input point.
    """
    smoothed = []
    for value in points:
        if not smoothed:
            # Nothing to blend with yet: seed the series with the raw value.
            smoothed.append(value)
            continue
        smoothed.append(smoothed[-1] * factor + value * (1 - factor))
    return smoothed

In [None]:
def show_graph(data):
    """Plot the smoothed ``data`` series against 1-based epoch numbers.

    ``data`` is passed through ``smooth_curve`` and drawn as a line with the
    x axis labelled "Epochs" and the y axis labelled "Validation MAE"; the
    figure is then shown.
    """
    smoothed = smooth_curve(data)
    epochs = range(1, len(smoothed) + 1)
    plt.plot(epochs, smoothed)
    plt.xlabel('Epochs')
    plt.ylabel('Validation MAE')
    plt.show()

In [None]:
show_graph(avg_mae[10:])

키 설정


In [None]:
# all_history 내 각 요소의 키를 확인
for i, hist in enumerate(all_history):
    print(f"Fold {i} history keys:", hist.keys())


In [None]:
for i, hist in enumerate(all_history):
    print(f"Fold {i} keys: {list(hist.keys())}")


In [None]:
val_mae_lst = [
    hist['val_mae'] for hist in all_history if 'val_mae' in hist
]


In [None]:
for i, hist in enumerate(all_history):
    print(f"Fold {i} history:", hist)


In [None]:
# 각 폴드의 history에서 키 확인
for i, hist in enumerate(all_history):
    print(f"Fold {i} history keys: {list(hist.keys())}")


In [None]:
# val_mae_lst 추출
val_mae_lst = [
    hist['val_mae'] for hist in all_history if 'val_mae' in hist
]

# numpy 배열로 변환
val_mae_lst = np.array(val_mae_lst)

# 에포크별 평균 MAE 계산
avg_mae = [
    np.mean(val_mae_lst[:, i]) for i in range(val_mae_lst.shape[1])
]

print("에포크별 평균 MAE:", avg_mae)


In [None]:
for i, hist in enumerate(all_history):
    print(f"Fold {i} history data:", hist)


In [None]:
model.compile(
    optimizer='adam',
    loss='mse',
    metrics=['mean_absolute_error']  # 또는 ['mae']
)


In [None]:
# 모든 폴드의 history 데이터 확인
for i, hist in enumerate(all_history):
    print(f"Fold {i} history:")
    for key, value in hist.items():
        print(f"  {key}: {value[:5]}...")  # 처음 5개의 값만 출력


In [None]:
val_mae_lst = [
    hist['val_mae'] for hist in all_history if 'val_mae' in hist
]


In [None]:
val_mae_lst = []
for hist in all_history:
    if 'val_mean_absolute_error' in hist:
        val_mae_lst.append(hist['val_mean_absolute_error'])
    elif 'val_mae' in hist:  # 다른 대체 키 확인
        val_mae_lst.append(hist['val_mae'])
    else:
        print("경고: 이 history에 'val_mean_absolute_error' 또는 'val_mae'가 없습니다.", hist.keys())


In [None]:
model.compile(
    optimizer='adam',
    loss='mse',
    metrics=['mean_absolute_error']  # 또는 'mae'
)


In [None]:
# 모든 history 데이터에서 문제가 있는 항목 확인
for i, hist in enumerate(all_history):
    if not isinstance(hist, dict):
        print(f"Fold {i} 데이터가 잘못되었습니다: {hist}")
    elif 'val_mean_absolute_error' not in hist:
        print(f"Fold {i}에서 'val_mean_absolute_error' 키가 없습니다. 키들: {list(hist.keys())}")


In [None]:
# val_mae_lst를 numpy 배열로 변환
val_mae_lst = np.array(val_mae_lst)

# 에포크별 평균 MAE 계산
avg_mae = [
    np.mean(val_mae_lst[:, i]) for i in range(val_mae_lst.shape[1])
]

print("에포크별 평균 MAE:", avg_mae)


In [None]:
import sys

assert sys.version_info >= (3, 7)

In [None]:
from packaging import version
import tensorflow as tf

assert version.parse(tf.__version__) >= version.parse("2.8.0")

In [None]:
# 코랩의 경우 나눔 폰트를 설치합니다.
if 'google.colab' in sys.modules:
    !sudo apt-get -qq -y install fonts-nanum
    import matplotlib.font_manager as fm
    font_files = fm.findSystemFonts(fontpaths=['/usr/share/fonts/truetype/nanum'])
    for fpath in font_files:
        fm.fontManager.addfont(fpath)

# 나눔 폰트를 사용합니다.
import matplotlib

matplotlib.rc('font', family='NanumBarunGothic')
matplotlib.rcParams['axes.unicode_minus'] = False

In [None]:
!pip install keras-tuner

In [None]:
from google.colab import files
uploaded = files.upload()  # 데이터 파일 업로드


In [None]:
import pandas as pd

df = pd.read_csv("BostonHousingprice.csv")
X = df.iloc[:, :-1].values  # 입력 특성
y = df.iloc[:, -1].values  # 출력 값


In [None]:
import keras_tuner as kt
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# 데이터 로드
df = pd.read_csv("BostonHousingprice.csv")
X = df.iloc[:, :-1].values  # 입력 특성
y = df.iloc[:, -1].values  # 출력 값

# 데이터 분리
X_train_full, X_test, y_train_full, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full, test_size=0.2, random_state=42)

# 데이터 스케일링
scaler = StandardScaler()
X_train_scaled = np.nan_to_num(scaler.fit_transform(X_train))
X_valid_scaled = np.nan_to_num(scaler.transform(X_valid))
X_test_scaled = np.nan_to_num(scaler.transform(X_test))
y_train = np.nan_to_num(y_train)
y_valid = np.nan_to_num(y_valid)
y_test = np.nan_to_num(y_test)

# 사용자 정의 HuberLoss 함수
class HuberLoss(tf.keras.losses.Loss):
    """Huber loss with a configurable switch-over point.

    Errors whose magnitude is below ``threshold`` are penalised
    quadratically (``error**2 / 2``); all other errors are penalised
    linearly (``threshold * |error| - threshold**2 / 2``).
    """

    def __init__(self, threshold=1.0, **kwargs):
        """Store ``threshold`` and forward the remaining kwargs to the base loss."""
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        """Return the element-wise Huber loss of ``y_pred`` against ``y_true``."""
        delta = self.threshold
        residual = y_true - y_pred
        magnitude = tf.abs(residual)
        quadratic = tf.square(residual) / 2
        linear = delta * magnitude - delta**2 / 2
        return tf.where(magnitude < delta, quadratic, linear)

    def get_config(self):
        """Return the base-class config extended with ``threshold``."""
        config = dict(super().get_config())
        config["threshold"] = self.threshold
        return config

# 모델 구조 정의
def build_model(hp):
    """Build and compile a regression MLP from Keras Tuner hyperparameters.

    Hyperparameters registered on ``hp``, in this order:
    ``n_hidden`` (0-8 hidden layers, default 2), ``n_neurons`` (16-256 in
    steps of 16, shared by every hidden layer), ``learning_rate`` (1e-5 to
    1e-3, log-sampled), ``optimizer`` ("adam" or "sgd") and ``threshold``
    (0.5-5.0 in steps of 0.5, passed to ``HuberLoss``).

    The input width is read from the module-level ``X_train_scaled``.

    Returns:
        The compiled ``tf.keras.Sequential`` model, tracking MAE as a metric.
    """
    n_hidden = hp.Int("n_hidden", min_value=0, max_value=8, default=2)
    n_neurons = hp.Int("n_neurons", min_value=16, max_value=256, step=16)
    learning_rate = hp.Float("learning_rate", min_value=1e-5, max_value=1e-3, sampling="log")
    optimizer_name = hp.Choice("optimizer", values=["adam", "sgd"])

    # Any choice other than "sgd" uses Adam.
    if optimizer_name == "sgd":
        optimizer_cls = tf.keras.optimizers.SGD
    else:
        optimizer_cls = tf.keras.optimizers.Adam
    optimizer = optimizer_cls(learning_rate=learning_rate)

    stack = [tf.keras.layers.Flatten(input_shape=(X_train_scaled.shape[1],))]
    stack.extend(
        tf.keras.layers.Dense(n_neurons, activation="relu", kernel_initializer="he_normal")
        for _ in range(n_hidden)
    )
    stack.append(tf.keras.layers.Dense(1))  # single linear output for regression
    model = tf.keras.Sequential(stack)

    huber_threshold = hp.Float("threshold", min_value=0.5, max_value=5.0, step=0.5)
    model.compile(loss=HuberLoss(threshold=huber_threshold), optimizer=optimizer, metrics=["mae"])
    return model

# 이 문단부터 하이퍼 파라미터
# Keras Tuner 설정
tuner = kt.Hyperband(
    build_model,
    objective="val_mae",
    max_epochs=20,
    factor=3,
    directory="tuner_dir",
    project_name="huber_loss_tuning",
    max_consecutive_failed_trials=10
)

# 하이퍼파라미터 검색 실행
tuner.search(
    X_train_scaled, y_train,
    validation_data=(X_valid_scaled, y_valid),
    verbose=2
)

# 최적의 하이퍼파라미터로 모델 재설정 및 훈련
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
model = build_model(best_hps)
history = model.fit(X_train_scaled, y_train, validation_data=(X_valid_scaled, y_valid), epochs=20)

# 모델 평가
test_loss, test_mae = model.evaluate(X_test_scaled, y_test)

# 모델 저장
model.save("final_model.keras")

# 최종 평가 결과 출력
print(f"Test loss: {test_loss}")
print(f"Test MAE: {test_mae}")


In [None]:

# 저장된 모델 다운로드
files.download("final_model.keras")


In [None]:
results = {"Metric": ["Test Loss", "Test MAE"], "Value": [test_loss, test_mae]}
df = pd.DataFrame(results)
df.to_csv("results.csv", index=False)

# CSV 다운로드
files.download("results.csv")

In [None]:
%matplotlib inline

import seaborn as sns

import warnings
warnings.filterwarnings('ignore')


In [None]:
df = pd.read_csv("https://raw.githubusercontent.com/yoonkt200/FastCampusDataset/master/BostonHousing2.csv")




In [None]:
df.head()

In [None]:
# 그래프 배경 설정
sns.set_style('darkgrid')

In [None]:
# shape (dimension)
df.shape

In [None]:
# 결측치
df.isnull().sum()

In [None]:
# data type
df.info()

In [None]:
# numerical variable
df.describe()

In [None]:
# categorical variable
num_town = df['TOWN'].unique()
print(len(num_town))
num_town


In [None]:
# 기초 통계량
df['CMEDV'].describe()

In [None]:
# 분포
df['CMEDV'].hist(bins=50)

In [None]:
# boxplot - Pandas
df.boxplot(column=['CMEDV'])
plt.show()

In [None]:
# boxplot - matplotlib
plt.boxplot(df['CMEDV'])
plt.show()


In [None]:
# numerical features (except "LON" & "LAT")
numerical_columns = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']

# One histogram per feature. pandas builds the subplot grid itself from
# `figsize`; handing it a single pre-made Axes via `ax=` for a multi-column
# histogram makes it clear that figure and emit a UserWarning before drawing
# the same grid.
df[numerical_columns].hist(figsize=(16, 20))
plt.show()

In [None]:
# Pearson 상관계수
cols = ['CMEDV', 'CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']

corr = df[cols].corr(method = 'pearson')
corr


In [None]:
# heatmap (seaborn)
fig = plt.figure(figsize = (16, 12))
ax = fig.gca()

sns.set(font_scale = 1.5)  # heatmap 안의 font-size 설정
heatmap = sns.heatmap(corr.values, annot = True, fmt='.2f', annot_kws={'size':15},
                      yticklabels = cols, xticklabels = cols, ax=ax, cmap = "RdYlBu")
plt.tight_layout()
plt.show()

In [None]:
# Scatter plot of average room count (RM) against house price (CMEDV).
# `marker` sets the glyph for every point; seaborn's `markers` argument only
# applies to a `style` mapping and was silently ignored here.
sns.scatterplot(data=df, x='RM', y='CMEDV', marker='o', color='blue', alpha=0.6)
plt.title('Scatter Plot')
plt.show()

In [None]:
# Scatter plot of LSTAT against house price (CMEDV).
# `marker` sets the glyph for every point; seaborn's `markers` argument only
# applies to a `style` mapping and was silently ignored here.
sns.scatterplot(data=df, x='LSTAT', y='CMEDV', marker='o', color='blue', alpha=0.6)
plt.title('Scatter Plot')
plt.show()

In [None]:
# 도시별 데이터 개수
df['TOWN'].value_counts()

In [None]:
# 도시별 데이터 개수의 분포 (히스토그램)
df['TOWN'].value_counts().hist(bins=50)

In [None]:
# 도시별 주택 가격 특징 (boxplot 이용)
fig = plt.figure(figsize = (12, 20))
sns.boxplot(x='CMEDV', y='TOWN', data=df)

In [None]:
# 도시별 범죄율 특징
fig = plt.figure(figsize = (12, 20))
sns.boxplot(x='CRIM', y='TOWN', data=df)

In [None]:
df.head()

In [None]:
df.info()

In [None]:

# Standardize the numeric features of `df` in place (every numerical column
# except the dummy variable "CHAS").
# NOTE(review): the scaler is fitted on the full DataFrame here, while the
# train/test split is performed in a later cell, so test rows contribute to
# the scaling statistics — consider fitting on the training split only.
scaler = StandardScaler()  # zero mean, unit standard deviation
scale_columns = ['CRIM', 'ZN', 'INDUS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']
df[scale_columns] = scaler.fit_transform(df[scale_columns])

In [None]:
df.head()

In [None]:
# features for linear regression model
df[numerical_columns].head()

In [None]:

# split dataset into training & test
X = df[numerical_columns]
y = df['CMEDV']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)

In [None]:
X_train.shape, y_train.shape

In [None]:
X_test.shape, y_test.shape

In [None]:
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor

# variance_inflation_factor regresses each column on the other columns of the
# matrix it is given and does not add an intercept itself. Add an explicit
# constant column so each auxiliary regression has an intercept, then report
# the VIF of the real features only (column 0 is the constant).
X_vif = sm.add_constant(X_train, has_constant='add')
vif = pd.DataFrame()
vif['features'] = X_train.columns
vif["VIF Factor"] = [
    variance_inflation_factor(X_vif.values, i) for i in range(1, X_vif.shape[1])
]
vif.round(1)

In [None]:
from sklearn import linear_model

# fit regression model in training set
lr = linear_model.LinearRegression()
model = lr.fit(X_train, y_train)

# predict in test set
pred_test = lr.predict(X_test)

In [None]:
# print coef
print(lr.coef_)

In [None]:
# "feature - coefficients" DataFrame 만들기
coefs = pd.DataFrame(zip(df[numerical_columns].columns, lr.coef_), columns = ['feature', 'coefficients'])
coefs

In [None]:
# 크기 순서로 나열
coefs_new = coefs.reindex(coefs.coefficients.abs().sort_values(ascending=False).index)
coefs_new

In [None]:
## coefficients 시각화

# figure size
plt.figure(figsize = (8, 8))

# bar plot
plt.barh(coefs_new['feature'], coefs_new['coefficients'])
plt.title('"feature - coefficient" Graph')
plt.xlabel('coefficients')
plt.ylabel('features')
plt.show()

In [None]:
import statsmodels.api as sm

X_train2 = sm.add_constant(X_train)
model2 = sm.OLS(y_train, X_train2).fit()
model2.summary()

In [None]:
# Visualize predictions against actual values on the test set.
# The comparison table gets its own name: assigning it to `df` replaced the
# dataset DataFrame, so re-running earlier cells that index `df` by feature
# columns failed afterwards.
result_df = pd.DataFrame({'actual': y_test, 'prediction': pred_test})
# Sort by the actual price so both series are drawn along a rising curve.
result_df = result_df.sort_values(by='actual').reset_index(drop=True)

plt.figure(figsize=(12, 9))
plt.scatter(result_df.index, result_df['prediction'], marker='x', color='r')
plt.scatter(result_df.index, result_df['actual'], alpha=0.7, marker='o', color='black')
plt.title("Prediction Result in Test Set", fontsize=20)
plt.legend(['prediction', 'actual'], fontsize=12)
plt.show()

In [None]:
# R square
print(model.score(X_train, y_train))  # training set
print(model.score(X_test, y_test))  # test set

In [None]:
# RMSE
from sklearn.metrics import mean_squared_error
from math import sqrt

# training set
pred_train = lr.predict(X_train)
print(sqrt(mean_squared_error(y_train, pred_train)))

# test set
print(sqrt(mean_squared_error(y_test, pred_test)))