# Tensor

numpy의 ndarray 처럼 여러 데이터를 동시에 다루며 PyTorch의 autograd 등 머신러닝에 도움되는 기능을 가지고 있음

In [None]:
# PyTorch
import torch

"""
torch의 텐서
"""
x = torch.rand(3, 3)
y = torch.ones(3, 3)
print(x)
print(x.T)
print(y)
print(x + y)
print(x @ y)

In [None]:
"""
역전파: backward
"""
x = torch.tensor([2.0, 3.0], requires_grad=True)
y = x ** 2 + 3 * x + 1
z = y.sum()
# back propagation
z.backward()
# 역전파된 기울기는 .grad에서 확인 가능
print(x.grad)


## 실습 준비

장치 설정 및 난수 고정

In [None]:
# 장치 관리
device = 'cpu'
# windows - cuda
if torch.cuda.is_available():
    device = torch.device('cuda')
# apple silicon - mps
if torch.mps.is_available():
    device = torch.device('mps')
print(device)

# x.to(device)

In [None]:
# 난수 고정
import numpy as np
import random
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

# windows - cuda
if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# apple silicon - mps
if torch.mps.is_available():
    torch.mps.manual_seed(SEED)
    torch.use_deterministic_algorithms(True, warn_only=True)

# PyTorch로 ML

## 선형 회귀 모델 만들기

In [None]:
"""
선형 회귀 모델을 PyTorch로 구현
"""
from torch import nn
from torch import optim

# 더미 데이터 생성
X = torch.linspace(0, 10, 100).unsqueeze(1)
y_true = 2 * X + 1 + 0.5 * torch.randn_like(X)

# 1차원 입력, 1차원 출력의 선형 모델을 만들것이다
model = nn.Linear(1, 1)
# 비용 함수는 MSE를 쓸거다
criterion = nn.MSELoss()
# SGD로 파라미러를 갱신할거다
optimizer = optim.SGD(model.parameters(), lr=0.01)

# 학습 시작
epochs = 200
for epoch in range(epochs):
    # 기울기 초기화
    optimizer.zero_grad()

    # 예측 진행
    y_pred = model(X)
    # 손실 계산
    loss = criterion(y_pred, y_true)
    # 비용의 미분 계산
    loss.backward()
    # 매개변수 업데이트
    optimizer.step()
    # 20번의 학습마다 기록
    if (epoch + 1) % 20 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

print('학습된 가중치 + 편향')
for name, param in model.named_parameters():
    print(f'{name}: {param.data}')


### 시각화

In [None]:
# 실제 데이터와 회귀선 비교
def show_data():
    X_np = X.squeeze().numpy()
    y_true_np = y_true.squeeze().numpy()
    w_learned = model.weight.item()
    b_learned = model.bias.item()
    print(w_learned, b_learned)
    y_line_np = w_learned * X_np + b_learned

    import matplotlib.pyplot as plt
    plt.figure(figsize=(8, 6))
    plt.scatter(X_np, y_true_np, s=20, color='b', alpha=0.6, label='Synthetic Data')
    plt.plot(X_np, y_line_np, color='red', linewidth=2, label=f'Learned Model')

    plt.title('Dummy Data')
    plt.xlabel('X (Feature)')
    plt.ylabel('y_true (Label)')
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.show()
show_data()

## MLP 만들기

In [None]:
"""
입력 - 은닉 - 출력 구조의 MLP 만들기
"""
import torch
from torch import nn

# 어떤 계층을 넣을지를 결정해서 층 구조를 class로 정의
class SimpleMLP(nn.Module):  # __call__
    # 각 층의 뉴런의 갯수는 인스턴스를 만들때 결정
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()

        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    # nn.Module()의 __call__이 호출될 때 자동으로 호출
    def forward(self, x):
        # 각 층을 순차적으로 호출
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x


class SequentialMLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()

        # 각 층을 순차적으로 쌓는다.
        self.layers = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim),
        )

    def forward(self, x):
        return self.layers(x)


model = SimpleMLP(20, 10, 1)
# 20개의 feature를 가진 40개의 데이터
X = torch.rand(size=(40, 20))
# 모델을 이용해 예측하기
y_pred = model(X)
# 40개의 데이터에 대한 예측 출력 1개씩 40줄
print(y_pred.size())
y_pred



# 숫자 판독기 만들기

## 데이터 준비

### 데이터 불러오기

In [None]:
import matplotlib.pyplot as plt
from sklearn.datasets import load_digits

digits = load_digits()
X = digits.data.astype(np.float32)
y = digits.target.astype(np.int64)

fig, ax = plt.subplots(figsize=(13, 5), nrows=2, ncols=5)
for i in range(10):
    ax[i // 5, i % 5].imshow(X[i].reshape(8, 8), cmap="gray")
    ax[i // 5, i % 5].set_title(f"Label: {y[i]}")
    ax[i // 5, i % 5].axis("off")
fig.tight_layout()

### 학습 / 검증 데이터 분리 및 표준화

In [None]:
# TODO
# 1. 데이터를 분할하고 표준화해줍니다.
# 이번에는 검증데이터 (X_valid)과 테스트데이터 (X_test)까지 만들어줍시다.
# X_test는 실제로 정답이 없는 처음 보는 데이터로 최종 성능을 확인해야할 데이터,
# X_val은 중간중간 모델의 검증결과를 확인하기 위한 데이터입니다.
# 학습데이터는 전체의 80%, 나머지 검증과 테스트데이터는 각각 10%씩 할당해주세요.
# 난수는 위에서 정의된 `SEED`를 활용해주세요.
# stratify 하는것도 잊지 마시구요.
from sklearn.model_selection import train_test_split
# 우선 8:2로 나누고
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.2, random_state=SEED, stratify=y
)

# 남은 2를 1:1로 나누자
X_valid, X_test, y_valid, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=SEED, stratify=y_temp
)

# 표준화
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_valid = scaler.transform(X_valid)
X_test  = scaler.transform(X_test)

### Dataset과 DataLoader

In [None]:
# TODO
# 2. PyTorch Dataset을 만들어줍니다
# batch 처리방식을 기억하시나요?
# 매번 X에서 인덱싱을 통해 데이터를 배치로 뽑아냈습니다.
# 여간 귀찮은게 아니었는데요, 이를 간편하게 만들어주는 방식이 PyTorch에 존재합니다.

# TODO:
# 모든 데이터를 텐서로 변환해주세요
X_train_t = torch.from_numpy(X_train)
y_train_t = torch.from_numpy(y_train)
X_valid_t = torch.from_numpy(X_valid)
y_valid_t = torch.from_numpy(y_valid)
X_test_t  = torch.from_numpy(X_test)
y_test_t  = torch.from_numpy(y_test)

# TODO:
# Pytorch TensorDataset에 넣어줍시다.
# https://docs.pytorch.org/docs/stable/data.html#torch.utils.data.TensorDataset

# 하나의 데이터의 기준
# - TensorDataset: index를 기준으로 하나의 샘플 데이터를 구분
from torch.utils.data import TensorDataset
train_ds = TensorDataset(X_train_t, y_train_t)
valid_ds = TensorDataset(X_valid_t, y_valid_t)
test_ds  = TensorDataset(X_test_t, y_test_t)

# TODO:
# 위에서 만든 TensorDataset을 DataLoader로 변환시켜줍니다.
# 둘의 차이를 간단하게 설명하자면
#   - Dataset: 데이터 하나를 어떻게 읽어올지 정의
#   - DataLoader: 배치처리를 어떻게할지 정의
# 따라서 DataLoader는 batch_size를 정의해줘야합니다.
# 아래 설젛애둔 BATCH_SIZE를 활용해주세요.
from torch.utils.data import DataLoader
BATCH_SIZE = 32

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_ds, batch_size=BATCH_SIZE, shuffle=False)
test_loader  = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
for idx, batch in enumerate(train_loader):
    print(idx, batch)
    print(len(batch[1]))
    break

## MLP 구현

In [None]:
# TODO
# 3. Multi-Layer Perceptron 구현하기
# 가장 기본적인 MultiLayer Perceptron을 구현해주세요.
# 계층은 총 3개입니다.
# 입력표현 -> 은닉표현1 -> 은닉표현2 -> 출력표현
# 입력값들은 `nn.Linear`를 통과한 뒤 ReLU를 거치고 Dropout을 거치도록 설계해주세요.
# 제가 정의해준 `__init__` 안의 인자에 맞춰서 작업해주세요 :)

class MLP(nn.Module):
    def __init__(self,
                 input_dim: int,
                 num_classes: int,
                 hidden_dims=(128, 64),
                 dropout=0.2):
        super().__init__()
        h1, h2 = hidden_dims

        self.net = nn.Sequential(
            # 은닉1
            nn.Linear(input_dim, h1),
            # 활성화 + Dropout
            nn.ReLU(),
            nn.Dropout(p=dropout),
            # 은닉2 (입력 차원은 은닉1의 출력차원)
            nn.Linear(h1, h2),
            # 활성화 + Dropout
            nn.ReLU(),
            nn.Dropout(p=dropout),
            # 출력
            nn.Linear(h2, num_classes),
        )

    def forward(self, x):
        return self.net(x)

## 학습 및 테스트

In [None]:
# 정답코드
# 4. DataLoader, Model, Optimizer가 주어졌을 때
# 학습데이터에 대해 model을 업데이트하는 코드를 작성해주세요.
def train_one_epoch(model, loader, optimizer, device):
    # 모델이 학습모드로 들어가게 설정해야합니다.
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for xb, yb in loader:
        # 데이터를 가속화기기로 보내주세요.
        xb, yb = xb.to(device), yb.to(device)

        # 위에서 배운 파라미터 업데이트 방식을 구현해주세요.
        # 1. 모델의 출력 만들어주기
        # 2. 미분값 지워주기 (zero_grad)
        # 3. Loss 계산해주기: loss는 cross_entropy를 사용해주세요
        #   - https://docs.pytorch.org/docs/stable/generated/torch.nn.functional.cross_entropy.html
        # 4. 3에서 계산한 Loss를 기준으로 역전파를 수행합니다.
        # 5. 파라미터 업데이트를 진행해주세요
        optimizer.zero_grad()
        logits = model(xb)
        loss = torch.nn.functional.cross_entropy(logits, yb)
        loss.backward()
        optimizer.step()

        # 파라미터 업데이트가 끝났으면 몇 가지를 기록해줍니다.
        # 1. batch당 로스 계산 (running_loss에 업데이트)
        # 2. 예측된 클래스 계산 (`preds`)
        # 3. batch 내 정확도 계산 (batch 내에서 정답개수를 correct에 추가)
        running_loss += loss.item() * xb.size(0)
        preds = logits.argmax(dim=1)
        correct += (preds == yb).sum().item()
        total += yb.size(0)

    # 위에서 잘 계산된 값들을 정리해서 반환합니다.
    avg_loss = running_loss / total
    acc = correct / total
    return avg_loss, acc

# 정답코드
# 5. 이번에는 모델과 검증/테스트데이터가 입력으로 들어왔을 때 정확도를 계산해주는 코드를 작성해주세요.
def evaluate(model, loader, device):
    # 모델이 추론할 때는 추론모드로 설정해주어야합니다.
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_targets = []

    # 미분그래프를 생성하지 않도록 컨텍스트를 생성해주셔야합니다.
    with torch.no_grad():
        for xb, yb in loader:
            # 데이터로더에서 나온 배치를 `device`로 보내주세요.
            xb, yb = xb.to(device), yb.to(device)

            # 위에서 한 것과 비슷하게
            # 모델의 출력값을 만들어 `logits`에 담고
            # 현재 출력값 `logits`와 `yb` 사이의 Cross Entropy를 계산해주세요.
            logits = model(xb)
            loss = torch.nn.functional.cross_entropy(logits, yb)

            # 이번에도 다음을 기록해줍니다.
            # 1. batch당 로스 계산 (running_loss에 update)
            # 2. 예측된 클래스 계산 (`preds`)
            # 3. batch 내 정확도 계산 (batch 내에서 정답개수를 correct에 추가)
            # 4. 후에 정확한 분류성능을 보기 위해 prediction 값들도 전부 기록해줍니다.
            #  - all_preds, all_targets에 append해줍니다.
            running_loss += loss.item() * xb.size(0)
            preds = logits.argmax(dim=1)
            correct += (preds == yb).sum().item()
            total += yb.size(0)

            all_preds.append(preds.cpu())
            all_targets.append(yb.cpu())

    # 위에서 잘 계산된 값들을 정리해서 반환합니다.
    avg_loss = running_loss / total
    acc = correct / total
    preds_cat = torch.cat(all_preds).numpy()
    targets_cat = torch.cat(all_targets).numpy()
    return avg_loss, acc, preds_cat, targets_cat

In [None]:
# 모든 것을 하나로!
# 여러분이 코드를 잘 짰다면 아래 코드가 잘 작동할겁니다!
# 에러가 뜬다면 에러메시지를 확인해서 잘 구현되도록 맞춰주세요.
import os
import math
import time

from sklearn.metrics import classification_report, confusion_matrix

# device = torch.device("cuda")
model = MLP(
    input_dim=64,
    num_classes=10,
    hidden_dims=(128, 64),
    dropout=0.2
).to(device)

# 이번에는 지난 과제2에서 배웠던 Adam을 사용해봅시다.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)

best_val_loss = math.inf
best_val_acc = -1.0
epochs_no_improve = 0

checkpoint_path = "model.ckpt"
earlystop_patience = 5

train_losses, train_accs = [], []
valid_losses, valid_accs = [], []
for epoch in range(1, 201):
    t0 = time.time()
    # 여러분이 정의한 `train_one_epoch`와 `evaluate`입니다.
    train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, device)
    train_losses.append(train_loss)
    train_accs.append(train_acc)

    val_loss, val_acc, _, _ = evaluate(model, valid_loader, device)
    valid_losses.append(val_loss)
    valid_accs.append(val_acc)

    dt = time.time() - t0
    print(
        f"Epoch {epoch:03d} | "
        f"train loss {train_loss:.4f} acc {train_acc*100:5.2f}% | "
        f"val loss {val_loss:.4f} acc {val_acc*100:5.2f}% | "
        f"{dt:.1f}s"
    )

    # 🌟 개선 시 체크포인트 저장
    improved = (val_loss < best_val_loss) or (val_acc > best_val_acc)
    if improved:
        best_val_loss = min(best_val_loss, val_loss)
        best_val_acc = max(best_val_acc, val_acc)
        torch.save({
            'model_state_dict': model.state_dict(),
            'input_dim': 64,
            'num_classes': 10,
        }, checkpoint_path)
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1

    # 조기 종료
    if epochs_no_improve >= earlystop_patience:
        print(f"Early stopping at epoch {epoch} (no improve {earlystop_patience})")
        break

# 베스트 모델 로드 후 테스트 평가
if os.path.exists(checkpoint_path):
    ckpt = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(ckpt['model_state_dict'])
    print(f"Loaded best checkpoint: val_best_acc={best_val_acc*100:.2f}% val_best_loss={best_val_loss:.4f}")

test_loss, test_acc, y_pred, y_true = evaluate(model, test_loader, device)
print("\n==== Test Result ====")
print(f"Test loss: {test_loss:.4f}")
print(f"Test acc : {test_acc*100:.2f}%")

# 분류 리포트 / 혼동행렬
print("\nClassification report:")
print(classification_report(y_true, y_pred, digits=4))

print("Confusion matrix:")
print(confusion_matrix(y_true, y_pred))