- 작성자: 김명환 (Kim Myunghwan)
- 작성일: 2025년 7월 9일
- 목적: matplotlib.pyplot에서 한글 폰트가 깨지지 않도록 설정한 템플릿
- github: https://github.com/c0z0c/jupyter_hangul
- 환경: Jupyter Notebook 및 Google Colab 겸용
- 자유롭게 편집해서 사용하세요

**한글 폰트 설정 셀**

In [7]:
# coalb 에서는 두번 실행 해야 합니다.
# 첫 번째 실행 - 폰트 설치 후 자동 재시작
# 두번째 실행 - 폰트 설치 완료
# https://github.com/c0z0c/jupyter_hangul
# 코딩 중간에 한글 깨진다 싶으면 helper.setup() 다시 호출 해줘도 됩니다.

from urllib.request import urlretrieve
urlretrieve("https://raw.githubusercontent.com/c0z0c/jupyter_hangul/master/helper_c0z0c_dev.py", "helper_c0z0c_dev.py")
import helper_c0z0c_dev as helper
print("📁 helper 모듈을 로드했습니다.")
helper.setup()

📁 helper 모듈을 로드했습니다.
🚀 Jupyter/Colab 한글 환경 설정 중... (helper v2.3.0)
✅ matplotlib 한글 폰트 설정 완료
✅ 한글 폰트 및 pandas 확장 기능 설정 완료
🎉 사용 가능: 한글 폰트, CSV 읽기, DataFrame.head_att(), 캐시 기능


# >기본< 라이브리 로드

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.datasets import (
    fetch_california_housing, load_iris, make_moons, make_circles,
    load_breast_cancer, load_wine
)
from sklearn.tree import DecisionTreeRegressor, DecisionTreeClassifier, plot_tree
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, mean_squared_error

# --- 다중공선성 진단용 (선택적) ---
try:
    from statsmodels.stats.outliers_influence import variance_inflation_factor
    print("statsmodels 라이브러리 로드 완료")
except Exception:
    print("statsmodels 라이브러리가 설치되어 있지 않습니다. 설치: !pip install statsmodels")

# --- 표준 라이브러리 및 유틸리티 ---
import io
import os
import gc
import json
import time
import random
import math
from datetime import datetime, timedelta
import pytz  # 시간대 처리

# --- 이미지 처리 ---
from PIL import Image, ImageDraw, ImageFilter
import cv2  # 필요시 설치: !pip install opencv-python

# --- 데이터/시각화 ---
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.colors import ListedColormap
from IPython.display import display, Markdown
from tqdm.notebook import tqdm

# --- PyTorch 관련 ---
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
from torchvision import transforms as T
from torchvision.transforms import functional as TF
from torchvision.models import vgg16, VGG16_Weights
from torch.utils.data import Dataset, DataLoader, random_split

# --- 선택 패키지(주석) ---
# !pip install pytorch-msssim

# --- 시드 고정 ---
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# --- 디바이스 및 전역 설정 ---
__kst = pytz.timezone('Asia/Seoul')
global __kst
__device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
global __device
print('Device:', __device)

__model_class_list = []
global __model_class_list

DEBUG_ON = False
global DEBUG_ON
# ...existing code...

statsmodels 라이브러리 로드 완료
Device: cpu


# >기본< 모델 저장 및 로드

In [9]:
def safe_makedirs(path):
    if os.path.exists(path) and not os.path.isdir(path):
        os.remove(path)  # 파일이면 삭제
    os.makedirs(path, exist_ok=True)
    
def tensor_to_jsonable(obj):
    if isinstance(obj, torch.Tensor):
        return obj.item() if obj.numel() == 1 else obj.tolist()
    if isinstance(obj, dict):
        return {k: tensor_to_jsonable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [tensor_to_jsonable(v) for v in obj]
    return obj   

def save_model_dict(model, path, pth_name, kwargs=None):
    """
    모델 state_dict와 추가 정보를 지정한 디렉토리(path)에 name.pth 파일로 저장
    - model: 저장할 모델 객체
    - path: 저장할 디렉토리 경로 (예: './save')
    - name: 저장할 파일 이름 (확장자 없이, 예: 'model1')
    - kwargs: dict 또는 JSON 문자열(딕셔너리로 변환 가능한 객체)
    """
    import torch
    import json
    import os

    # 디렉토리 생성
    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)

    # 모델 구조 정보 추출
    model_info = {
        'class_name': model.__class__.__name__,
        'init_args': {},  # 생성자 인자(가능하면)
        'str': str(model),
        'repr': repr(model),
        'modules': [m.__class__.__name__ for m in model.modules()],
    }
    # 생성자 인자 자동 추출(가능한 경우)
    if hasattr(model, '__dict__'):
        for key in ['in_ch', 'base_ch', 'num_classes', 'out_ch']:
            if hasattr(model, key):
                model_info['init_args'][key] = getattr(model, key)

    # kwargs 처리
    extra_info = {}
    if kwargs is not None:
        if isinstance(kwargs, str):
            extra_info = json.loads(kwargs)
        elif isinstance(kwargs, dict):
            extra_info = kwargs
        else:
            raise ValueError("kwargs는 dict 또는 JSON 문자열이어야 합니다.")

    # model_info에 kwargs 내용 추가
    model_info.update(extra_info)

    # 저장할 dict 구성
    save_dict = {
        'model_state': model.state_dict(),
        'class_name': model.__class__.__name__,
        'model_info': model_info,
    }

    save_path = os.path.join(path, f"{pth_name}.pth")
    torch.save(save_dict, save_path)
    return save_path

def load_model_dict(path, pth_name, model_class=None):
    """
    save_model_dict로 저장한 모델을 로드하는 함수
    - path: 저장된 디렉토리 경로 (예: './save')
    - pth_name: 저장된 파일 이름 (확장자 없이, 예: 'model1')
    - model_class: 모델 클래스 (None이면 모델 인스턴스는 반환하지 않음)
    반환값: (model, class_name, model_info)
    """
    import torch
    import os

    pth_path = os.path.join(path, f"{pth_name}.pth")
    checkpoint = torch.load(pth_path, map_location=__device)
    class_name = checkpoint.get('class_name', None)
    model_info = checkpoint.get('model_info', {})

    model = None
    if model_class is not None:
        try:
            model = model_class().to(__device)
            model.load_state_dict(checkpoint['model_state'])
        except Exception as e:
            print(f"모델 state_dict 로드 실패: {e}")
            model = None

    return model, class_name, model_info

def print_model_recovery_guide_from_info(model_info):
    """
    model_info(dict)를 받아서 복구용 클래스 정의 가이드를 출력합니다.
    """
    class_name = model_info.get('class_name', 'UnknownModel')
    init_args = model_info.get('init_args', {})
    model_str = model_info.get('str', '')

    print("="*80)
    print(f"[모델 복구 가이드] class_name: {class_name}")
    print("="*80)

    print(f"\n[1] 복사 붙여넣기용 클래스 정의:")
    print("# " + "="*70)
    print("# 아래 코드를 그대로 복사해서 붙여넣으세요")
    print("# " + "="*70)

    print(f"\nclass {class_name}(nn.Module):")
    print(f"    def __init__(self, ", end="")
    if init_args:
        args_list = [f"{k}={repr(v)}" for k, v in init_args.items()]
        print(", ".join(args_list) + "):")
    else:
        print("in_ch=1, base_ch=64):  # ← 실제 값으로 수정 필요")
    print("        super().__init__()")

    if model_str:
        print("        # 아래는 저장된 구조를 기반으로 한 추정 코드입니다")
        print("        # 필요시 수정하세요")
        import re
        # Conv2d
        conv_matches = re.findall(r'Conv2d\((\d+), (\d+), kernel_size=\((\d+), (\d+)\)', model_str)
        if conv_matches:
            print("        # Convolution layers:")
            for i, (in_ch, out_ch, k1, k2) in enumerate(conv_matches[:5]):
                print(f"        self.conv{i+1} = nn.Conv2d({in_ch}, {out_ch}, {k1})")
        # Linear
        linear_matches = re.findall(r'Linear\(in_features=(\d+), out_features=(\d+)', model_str)
        if linear_matches:
            print("        # Linear layers:")
            for i, (in_feat, out_feat) in enumerate(linear_matches[:3]):
                print(f"        self.fc{i+1} = nn.Linear({in_feat}, {out_feat})")
        # BatchNorm
        bn_matches = re.findall(r'BatchNorm2d\((\d+)', model_str)
        if bn_matches:
            print("        # BatchNorm layers:")
            for i, (num_feat,) in enumerate(bn_matches[:3]):
                print(f"        self.bn{i+1} = nn.BatchNorm2d({num_feat})")

    print("\n    def forward(self, x):")
    print("        # TODO: forward 로직 구현")
    print("        # 아래 구조를 참고하여 구현하세요:")
    if model_str:
        if "Conv2d" in model_str and "ReLU" in model_str:
            print("        # x = F.relu(self.conv1(x))")
            print("        # x = F.relu(self.conv2(x))")
        if "Linear" in model_str:
            print("        # x = x.view(x.size(0), -1)  # flatten")
            print("        # x = self.fc1(x)")
        if "Sigmoid" in model_str:
            print("        # x = torch.sigmoid(x)")
        elif "Softmax" in model_str:
            print("        # x = F.softmax(x, dim=1)")
    print("        return x")

    print("\n# 모델 클래스를 __model_class_list에 등록")
    print(f"__model_class_list.append({class_name})")

    print("\n# " + "="*70)
    print("# 여기까지 복사 붙여넣기")
    print("# " + "="*70)
    print("\n[2] 사용자 설정 가이드:")
    print("# " + "-"*70)
    print("# 아래 값들을 실제 환경에 맞게 수정하세요")
    print("# " + "-"*70)
    print("\n# 모델 인스턴스 생성")
    if init_args:
        args_str = ', '.join([f"{k}={repr(v)}" for k, v in init_args.items()])
        print(f"model = {class_name}({args_str}).to(__device)")
    else:
        print(f"model = {class_name}(")
        print("    in_ch=1,     # ← 입력 채널 수 (1=그레이스케일, 3=RGB)")
        print("    base_ch=64   # ← 기본 채널 수 (메모리에 따라 32, 64, 128 등)")
        print(").to(__device)")
    print("="*80)
    print("복구 가이드 완료! 위 코드를 복사해서 새로운 셀에 붙여넣고 실행하세요.")


# >기본< 모델링 함수

In [None]:
# 훈련 함수
def train_fn(model, train_loader, criterion, optimizer, device, epoch=None, epochs=None):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    res = {}
    desc = f"Train [{epoch+1}/{epochs}]" if epoch is not None and epochs is not None else "Train"
    tqdm_kwargs = {} 
    if DEBUG_ON:
        tqdm_kwargs['disable'] = False
        tqdm_kwargs['mininterval'] = 1
    else:
        tqdm_kwargs['disable'] = True
        tqdm_kwargs['mininterval'] = 3

    pbar = tqdm(train_loader, leave=True, desc=desc, position=0, **tqdm_kwargs)
    
    # tqdm_kwargs = {
    #     'disable': False,  # 항상 표시
    #     'mininterval': 0.5,
    #     'leave': True,
    #     'desc': desc,
    #     'position': 1
    # }
    # pbar = tqdm(train_loader, **tqdm_kwargs)
        
    for images, labels in pbar:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_size = labels.size(0)
        # criterion이 기본(reduction='mean')이라 가정하여 샘플 기준 누적
        running_loss += loss.item() * batch_size
        _, predicted = torch.max(outputs, 1)
        total += batch_size
        correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / total if total > 0 else 0.0
    epoch_acc = correct / total if total > 0 else 0.0
    res['loss'] = epoch_loss
    res['acc'] = epoch_acc
    return res

# 검증 함수
def evaluate_fn(model, val_loader, criterion, device, epoch, epochs):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    res = {}
    desc = f"Val   [{epoch+1}/{epochs}]"

    tqdm_kwargs = {} 
    if DEBUG_ON:
        tqdm_kwargs['disable'] = False
        tqdm_kwargs['mininterval'] = 1
    else:
        tqdm_kwargs['disable'] = True
        tqdm_kwargs['mininterval'] = 3


    with torch.no_grad():
        pbar = tqdm(val_loader, leave=True, desc=desc, position=0, **tqdm_kwargs)
        
        # tqdm_kwargs = {
        #     'disable': False,  # 항상 표시
        #     'mininterval': 0.5,
        #     'leave': True,
        #     'desc': desc,
        #     'position': 1
        # }
        # pbar = tqdm(val_loader, **tqdm_kwargs)        
        for images, labels in pbar:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            batch_size = labels.size(0)
            running_loss += loss.item() * batch_size
            _, predicted = torch.max(outputs, 1)
            total += batch_size
            correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / total if total > 0 else 0.0
    epoch_acc = correct / total if total > 0 else 0.0
    res['loss'] = epoch_loss
    res['acc'] = epoch_acc
    return res

# 모델링 함수 (훈련 + 검증)
def modeling_fn(model, epochs, train_loader, val_loader, criterion, optimizer, scheduler, device):
    history = []
    best_val_acc = 0.0
    best_model_wts = copy.deepcopy(model.state_dict())

    start_time = time.time()
    epoch_start_time = None

    estimated_completion_str = None
    
    tqdm_kwargs = {} 
    if DEBUG_ON:
        tqdm_kwargs['disable'] = False
        tqdm_kwargs['mininterval'] = 2
    else:
        tqdm_kwargs['disable'] = True
        tqdm_kwargs['mininterval'] = 6

    epoch = 0
    desc = f"Epoch [{epoch+1}/{epochs}]"
    pbar = tqdm(range(epochs), leave=True, desc=desc, position=0, **tqdm_kwargs)
    
    for epoch in pbar:
        print()
        
        epoch_start_time = time.time()
        pbar.set_description(f"Epoch [{epoch+1}/{epochs}]")
        
        train_res = train_fn(model, train_loader, criterion, optimizer, device, epoch=epoch, epochs=epochs)
        val_res = evaluate_fn(model, val_loader, criterion, device, epoch=epoch, epochs=epochs)
        if estimated_completion_str is not None:
            pbar.set_postfix_str("종료 " + estimated_completion_str)
            
        # 스케줄러 업데이트
        if scheduler:
            scheduler.step()

        res = {
            "epoch": epoch,
            "epochs": epochs,
            "train": train_res,
            "val": val_res,
        }

        history.append(res)

        # 최고 성능 모델 저장
        if val_res['acc'] > best_val_acc:
            best_val_acc = val_res['acc']
            best_model_wts = copy.deepcopy(model.state_dict())

        # 에포크 완료 시간 계산
        epoch_elapsed = time.time() - epoch_start_time

        # 첫 번째 에포크 완료 후 예상 완료 시간 계산
        if epoch == 0 and epochs > 1:
            remaining_epochs = epochs - 1
            estimated_total_time = epoch_elapsed * epochs
            estimated_completion = datetime.now(__kst) + timedelta(seconds=epoch_elapsed * remaining_epochs)
            estimated_completion_str = estimated_completion.strftime('%Y-%m-%d %H:%M:%S KST')
            pbar.set_postfix_str("종료 " + estimated_completion_str)
            # print(f"첫 에포크 완료 - 예상 완료 시간: {estimated_completion_str}")
            
        
        # print(f"Epoch {epoch+1}/{epochs} - "
        #       f"Train Loss: {train_res['loss']:.4f}, "
        #       f"Train Acc: {train_res['acc']:.4f} | "
        #       f"Val Loss: {val_res['loss']:.4f}, "
        #       f"Val Acc: {val_res['acc']:.4f} "
        #       f"({epoch_elapsed:.1f}s)")

    # 전체 학습 완료 시간
    total_elapsed = time.time() - start_time
    completion_time = datetime.now(__kst)

    # 최고 성능 모델 가중치 로드
    model.load_state_dict(best_model_wts)
    # print(f'\n최고 검증 정확도: {best_val_acc:.4f}')
    # print(f'전체 학습 시간: {total_elapsed:.1f}초 ({total_elapsed/60:.1f}분)')
    # print(f'학습 완료 시간: {completion_time.strftime("%Y-%m-%d %H:%M:%S KST")}')

    return model, history, completion_time, total_elapsed

# 테스트 함수
def test_model(model, test_loader, device):
    """모델 테스트"""
    model.eval()
    correct = 0
    total = 0
    class_correct = list(0. for i in range(10))
    class_total = list(0. for i in range(10))

    with torch.no_grad():
        pbar = tqdm(test_loader, leave=True, desc="Testing", position=0)
        for images, labels in pbar:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            # 클래스별 정확도 계산
            c = (predicted == labels).squeeze()
            for i in range(labels.size(0)):
                label = labels[i]
                class_correct[label] += c[i].item()
                class_total[label] += 1

    accuracy = 100 * correct / total
    # print(f'전체 테스트 정확도: {accuracy:.2f}%')

    # # 클래스별 정확도 출력
    # print('\n클래스별 정확도:')
    # for i in range(10):
    #     if class_total[i] > 0:
    #         print(f'{classes[i]:>8}: {100 * class_correct[i] / class_total[i]:.1f}%')

    return accuracy, class_total, classes, class_correct
