<a href="https://colab.research.google.com/github/Nungdoo/deep-learning-study/blob/main/exercise05_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# k-폴드 교차 검증 구현

In [2]:
# 데이터 세팅
import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

cancer = load_breast_cancer()

x = cancer.data
y = cancer.target
x_train_all, x_test, y_train_all, y_test = train_test_split(x, y, stratify=y, test_size=0.2, random_state=42)

x_train, x_val, y_train, y_val = train_test_split(x_train_all, y_train_all, stratify=y_train_all, test_size=0.2, random_state=42)

train_mean = np.mean(x_train, axis=0)
train_std = np.std(x_train, axis=0)
x_train_scaled = (x_train - train_mean) / train_std

val_mean = np.mean(x_val, axis=0)
val_std = np.std(x_val, axis=0)
x_val_scaled = (x_val - val_mean) / val_std

In [3]:
class SingleLayer:

  def __init__(self, learning_rate=0.1, l1=0, l2=0):
    self.w = None
    self.b = None
    self.losses = []
    self.val_losses = []              # 검증 손실을 기록하기 위한 변수 추가
    self.w_history = []
    self.lr = learning_rate
    self.l1 = l1
    self.l2 = l2

  def forpass(self, x):
    z = np.sum(x * self.w) + self.b   # 직선 방정식을 계산합니다.
    return z

  def backprop(self, x, err):
    w_grad = x * err  # 가중치에 대한 그레이디언트를 계산합니다.
    b_grad = 1 * err  # 절편에 대한 그레이디언트를 계산합니다.
    return w_grad, b_grad

  def activation(self, z):
    z = np.clip(z, -100, None)  # 안전한 np.exp() 계산을 위해
    a = 1 / (1 + np.exp(-z))    # 시그모이드 계산
    return a

  def fit(self, x, y, epochs=100, x_val=None, y_val=None):
    self.w = np.ones(x.shape[1])
    self.b = 0
    self.w_history.append(self.w.copy())                  # 가중치 기록
    np.random.seed(42)
    for i in range(epochs):
      loss = 0
      indexes = np.random.permutation(np.arange(len(x)))  # 인덱스를 섞음
      for i in indexes:
        z = self.forpass(x[i])                            # 정방향 계산
        a = self.activation(z)                            # 활성화 함수 적용
        err = -(y[i] - a)                                 # 오차 계산
        w_grad, b_grad = self.backprop(x[i], err)         # 역방향 계산
        # 그레이디언트에서 페널티 항의 미분값을 더합니다.
        w_grad += self.l1 * np.sign(self.w) + self.l2 * self.w
        self.w -= self.lr * w_grad                        # 가중치 업데이트(학습률 적용)
        self.b -= b_grad
        self.w_history.append(self.w.copy())              # 가중치 기록
        # 안전한 로그 계산을 위해 클리핑한 후 손실을 누적함
        a = np.clip(a, 1e-10, 1-1e-10)
        loss += -(y[i]*np.log(a)+(1-y[i])*np.log(1-a))
      # 에포크마다 평균 손실을 저장
      self.losses.append(loss/len(y) + self.reg_loss())
      # 검증 세트에 대한 손실을 계산
      self.update_val_loss(x_val, y_val)

  def predict(self, x):
    z = [self.forpass(x_i) for x_i in x]
    # a = self.activation(np.array(z))
    # return a > 0.5
    return np.array(z) > 0

  def score(self, x, y):
    return np.mean(self.predict(x) == y)

  def update_val_loss(self, x_val, y_val):
    if x_val is None:
      return
    val_loss = 0
    for i in range(len(x_val)):
      z = self.forpass(x_val[i])    # 정방향 계산
      a = self.activation(z)        # 활성화 함수 적용
      a = np.clip(a, 1e-10, 1-1e-10)
      val_loss += -(y_val[i]*np.log(a)+(1-y_val[i])*np.log(1-a))
    self.val_losses.append(val_loss/len(y_val) + self.reg_loss())

  def reg_loss(self):
    return self.l1 * np.sum(np.abs(self.w)) + self.l2 / 2 * np.sum(self.w**2)

폴드를 나눈 후 훈련 데이터의 표준화 전처리 진행

그렇지 않으면, 검증 폴드의 정보가 누설됨

In [5]:
validation_scores = []

k = 10
bins = len(x_train_all) // k

for i in range(k):
  start = i * bins
  end = (i+1) * bins
  val_fold = x_train_all[start:end]
  val_target = y_train_all[start:end]

  train_index = list(range(0, start)) + list(range(end, len(x_train_all)))
  train_fold = x_train_all[train_index]
  train_target = y_train_all[train_index]

  train_mean = np.mean(train_fold, axis=0)
  train_std = np.std(train_fold, axis=0)
  train_fold_scaled = (train_fold - train_mean) / train_std
  val_fold_scaled = (val_fold - train_mean) / train_std

  lyr = SingleLayer(l2=0.01)
  lyr.fit(train_fold_scaled, train_target, epochs=50)
  score = lyr.score(val_fold_scaled, val_target)
  validation_scores.append(score)

print(np.mean(validation_scores))

0.9711111111111113


# 사이킷런으로 교차 검증

cross_validate 의 매개변수로 검증하려는 모델, 훈련 데이터, 타깃 데이터, 폴드 수 전달

In [7]:
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import cross_validate

sgd = SGDClassifier(loss='log', penalty='l2', alpha=0.001, random_state=42)
scores = cross_validate(sgd, x_train_all, y_train_all, cv=10)
print(np.mean(scores['test_score']))

0.850096618357488


전처리 단계를 포함해 교차 검증을 해야함

사이킷런에서 검증 폴드가 전처리 단계에서 누설되지 않도록, 전처리 단계와 모델 클래스를 하나로 연결해 주는 Pipeline 클래스를 제공함

In [9]:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# 표준화 전처리 단계(StandardScaler)와 모델을 PipeLine 클래스로 감쌈
# PipeLine 객체 리턴
pipe = make_pipeline(StandardScaler(), sgd)

# cross_validate은 pipe을 훈련 폴드와 검증 폴드로 나누고
# 표준화 전처리 단계(StandardScaler)와 모델은 PipeLine에서 호출하여 검증 폴드가 누설되지 않음
scores = cross_validate(pipe, x_train_all, y_train_all, cv=10, return_train_score=True)
print(np.mean(scores['test_score']))
print(np.mean(scores['train_score']))

0.9694202898550724
0.9875478561631581
