#**Cross Validation: 교차 검증**

훈련세트, 검증세트, 테스트세트 등으로 샘플을 분리하느라   
훈련세트와 검증세트의 샘플 개수가 줄어 모델을 훈련시킬 데이터가 부족해지는 경우   
**교차검증**을 이용해볼 수 있다.

In [None]:
# import dependencies

import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import SGDClassifier
cancer = load_breast_cancer()
c_data = cancer.data
c_target = cancer.target
train_data_all, test_data, train_target_all, test_target = train_test_split(c_data, c_target, stratify=c_target, test_size=0.2, random_state=42)
train_data, val_data, train_target, val_target = train_test_split(train_data_all, train_target_all, stratify=train_target_all, test_size=0.2, random_state=42)
train_mean = np.mean(train_data, axis=0)
train_std = np.std(train_data, axis=0)
train_data_scaled = (train_data - train_mean) / train_std
val_data_scaled = (val_data - train_mean) / train_std

In [None]:
class SingleLayer:

  def __init__(self, learning_rate=0.1, l1=0, l2=0):
    self.w = None   # 입력데이터의 특성이 많아 가중치와 절편을 미리 초기화하지 않는다.
    self.b = None   # 나중에 입력데이터를 보고 특성 개수에 맞게 결정
    self.losses = []
    self.w_history = []   # 가중치를 저장할 리스트
    self.lr = learning_rate   # 학습률 
    self.val_losses = []    # 검증세트 손실을 기록할 리스트
    self.l1 = l1
    self.l2 = l2

  def forpass(self, x):
    z = np.sum(self.w * x) + self.b   # x와 w는 1차원 배열이므로 np.sum을 이용해 모든 요소를 다 더한다.
    return z


  def backprop(self, x, err):   # 오차역전파 메서드
    w_grad = x * err
    b_grad = err
    return w_grad, b_grad


  def fit(self, x, y, epochs=100, x_val=None, y_val=None):  # 검증세트를 전달받을 x_val, y_val 추가
    self.w = np.ones(x.shape[1])  # 가중치와 절편 초기화
    self.b = 0
    self.w_history.append(self.w.copy())  # 가중치 기록 -> 넘파이 배열(w)을 추가하면 실제값이 추가되는 것이 아닌 배열을 참조하기 때문에 w값이 바뀌면 그 값을 복사하여 추가해주어야 한다.
    np.random.seed(42)
    for i in range(epochs):
      loss = 0
      indexes = np.random.permutation(np.arange(len(x))) # 샘플 개수만큼의 인덱스 섞기
      for i in indexes:
        z = self.forpass(x[i])   # 정방향 계산
        a = self.activation(z)  # 정방향 계산의 결과값인 z를 활성화 함수에 통과
        err =  -(y[i] - a)       # 활성화 함수를 거친 a값으로 오차량 계산
        w_grad, b_grad = self.backprop(x[i], err)  # 오차역전파
        w_grad += self.l1 * np.sign(self.w) + self.l2 * self.w # 그레이디언트에서 패널티 항의 미분값을 더한다.
        self.w -= self.lr * w_grad   # 그레이디언트 업데이트  (학습률 적용)
        self.b -= b_grad
        self.w_history.append(self.w.copy())  # 가중치 기록
        a = np.clip(a, 1e-10, 1-1e-10)
        loss += -(y[i] * np.log(a) + (1 - y[i]) * np.log(1 - a))  # 로지스틱 손실함수 -(ylog(a) - (1 - y)log(1 - a))
      self.losses.append(loss/len(y) + self.reg_loss())
      self.update_val_loss(x_val, y_val)  # 검증세트 손실을 업데이트하는 메서드 호출


  def reg_loss(self):
    return self.l1 * np.sum(np.abs(self.w)) + self.l2 / 2 * np.sum(self.w**2)

  
  def update_val_loss(self, x_val, y_val):
    if x_val is None:
      return
    val_loss = 0
    for i in range(len(x_val)):
      z = self.forpass(x_val[i])
      a = self.activation(z)
      a = np.clip(a, 1e-10, 1-1e-10)
      val_loss += -(y_val[i] * np.log(a) + (1 - y_val[i]) * np.log(1 - a))
    self.val_losses.append(val_loss / len(y_val) + self.reg_loss())


  def activation(self, z):  # 활성화 함수
    z = np.clip(z, -100, None)    # 안전한 계산을 위해 클리핑
    a = 1 / (1 + np.exp(-z))
    return a


  def predict(self, x): # 예측 함수
    z = [self.forpass(x_i) for x_i in x]
    return np.array(z) > 0  # 계단함수


  def score(self, x, y):    # 평가함수
    return np.mean(self.predict(x) == y)

#**K-fold Cross Validation: k-폴드 교차 검증**
전체 데이터셋을 훈련세트와 테스트세트로 8:2 분류한 후,   
다시 훈련세트를 k개의 작은 덩어리(폴드)로 나눈다.   
여기서 총 k 번의 성능 테스트에서 1 개의 폴드를 검증에 사용하고,   
(k - 1) 개의 폴드를 훈련에 사용한다.

기존에는 훈련:테스트=8:2로 나누고,   
훈련:검증=8:2로 나누어, 총 6:2:2의 비율로 나누었지만,   
k폴드 교차검증에서는 검증세트가 훈련세트에 포함되므로   
단순히 전체 데이터셋을 8:2로 한 번만 나눈다.
>따라서 한 번만 나눈 train_data_all, train_target_all을 사용한다.

In [None]:
validation_scores = []  # 각 폴드의 검증 점수를 저장하기 위한 리스트
                        # 이 리스트의 값을 평균하여 최종 검증 점수를 계산한다.

In [None]:
k = 10  # 폴드 개수
bins = len(train_data_all) // k   # 한 폴드에 들어가는 샘플 개수
# 이 bins 변수의 개수만큼 건너뛰며 검증 폴드와 훈련 폴드를 구분한다.

for i in range(k):
  start = i * bins  # 검증폴드 샘플의 시작 인덱스
  end = (i + 1) * bins  # 검증폴드 샘플의 끝 인덱스
  val_fold = train_data_all[start:end]
  val_target = train_target_all[start:end]

  train_index = list(range(0, start)) + list(range(end, len(train_data_all))) # 검증폴드 시작과 끝 범위를 뺀 나머지는 모두 훈련폴드
  train_fold = train_data_all[train_index]
  train_target = train_target_all[train_index]

  train_mean = np.mean(train_fold, axis=0)
  train_std = np.std(train_fold, axis=0)
  train_fold_scaled = (train_fold - train_mean) / train_std
  val_fold_scaled = (val_fold - train_mean) / train_std

  lyr = SingleLayer(l2=0.01)
  lyr.fit(train_fold_scaled, train_target, epochs=50)
  score = lyr.score(val_fold_scaled, val_target)
  validation_scores.append(score)

print(np.mean(validation_scores))


0.9668518518518519


여기서 중요한 점은 '폴드를 나눈 후에 표준화 전처리를 한다'라는 점이다.   
만약 폴드를 나누기 전에 전체 훈련세트를 전처리한다면 검증 폴드의 정보를 누설하게 되는 셈이다.

#**사이킷런으로 교차검증하기**

사이킷런의 model_selection 모듈에는 교차검증을 위한 cross_validate() 함수가 있다.   
우리가 만든 SingleLayer 클래스와 cross_validate() 함수를 같이 사용하기 위해 SingleLayer 클래스에 여러 기능을 추가해야 하지만,   
그것은 책의 범위를 벗어나므로   
SGDClassifier 클래스와 cross_validate() 함수를 어떻게 사용하는지만 알아보자.

In [None]:
from sklearn.model_selection import cross_validate
sgd = SGDClassifier(loss='log', penalty='l2', alpha=0.001, random_state=42)
scores = cross_validate(sgd, train_data_all, train_target_all, cv=10) # cv 파라미터에 교차 검증을 수행할 폴드의 개수를 넣는다.
print(np.mean(scores['test_score']))  # 'test_score'는 검증 점수

0.850096618357488


표준화 전처리 과정을 거치지 않아 교차 검증 점수가 낮다.   
표준화 전처리를 해보자.

###**Pipeline 클래스 이용하기**

검증 폴드가 전처리 단계에서 누설되지 않도록 전처리 단계와 모델 클래스를 하나로 연결해주는 pipeline 클래스를 제공한다.



In [None]:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
pipe = make_pipeline(StandardScaler(), sgd)
scores = cross_validate(pipe, train_data_all, train_target_all, cv=10, return_train_score=True)
print(np.mean(scores['test_score']))

0.9694202898550724


In [None]:
print(np.mean(scores['train_score']))

0.9875478561631581
