# [요구사항 1] 

In [None]:
import os
import pandas as pd
import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader, random_split
from datetime import datetime
import wandb
import argparse
from sklearn.preprocessing import LabelEncoder
from pathlib import Path

pd.set_option("display.width", None)
pd.set_option("display.max_columns", None)

class TitanicDataset(Dataset):
  def __init__(self, X, y):
    self.X = torch.FloatTensor(X)
    self.y = torch.LongTensor(y)

  def __len__(self):
    return len(self.X)

  def __getitem__(self, idx):
    feature = self.X[idx]
    target = self.y[idx]
    return {'input': feature, 'target': target}

  def __str__(self):
    str = "Data Size: {0}, Input Shape: {1}, Target Shape: {2}".format(
      len(self.X), self.X.shape, self.y.shape
    )
    return str


class TitanicTestDataset(Dataset):
  def __init__(self, X):
    self.X = torch.FloatTensor(X)

  def __len__(self):
    return len(self.X)

  def __getitem__(self, idx):
    feature = self.X[idx]
    return {'input': feature}

  def __str__(self):
    str = "Data Size: {0}, Input Shape: {1}".format(
      len(self.X), self.X.shape
    )
    return str

__init__: 입력 데이터(특징 X, 타겟 y)를 파이토치가 사용하는 데이터 형식인 텐서(Tensor)로 변환합니다. 테스트 데이터셋은 y를 예측하는 것이 목표이므로 X만 가집니다.

__len__: 데이터셋에 있는 총 샘플 수를 반환합니다.

__getitem__: 인덱스(예: dataset[10])를 사용해 하나의 데이터 샘플(학습용은 특징과 타겟, 테스트용은 특징만)을 가져올 수 있게 합니다.

__str__: print() 함수로 출력될 때 데이터셋 크기와 형태에 대한 간단한 문자열 설명을 제공합니다.

In [None]:
def get_preprocessed_dataset():
    # 스크립트가 실행되는 현재 파일 경로를 기준으로 CSV 파일 경로 설정
    try:
        CURRENT_FILE_PATH = os.path.dirname(os.path.abspath(__file__))
    except NameError:
        # __file__이 정의되지 않은 환경(예: Jupyter)에서는 현재 작업 디렉터리를 사용
        CURRENT_FILE_PATH = os.getcwd()

    train_data_path = os.path.join(CURRENT_FILE_PATH, "train.csv")
    test_data_path = os.path.join(CURRENT_FILE_PATH, "test.csv")

    train_df = pd.read_csv(train_data_path)
    test_df = pd.read_csv(test_data_path)

    all_df = pd.concat([train_df, test_df], sort=False)

    all_df = get_preprocessed_dataset_1(all_df)
    all_df = get_preprocessed_dataset_2(all_df)
    all_df = get_preprocessed_dataset_3(all_df)
    all_df = get_preprocessed_dataset_4(all_df)
    all_df = get_preprocessed_dataset_5(all_df)
    all_df = get_preprocessed_dataset_6(all_df)

    print("--- Preprocessed DataFrame Columns ---")
    print(all_df.columns)
    print("--- Preprocessed DataFrame Head ---")
    print(all_df.head(5))

    train_X = all_df[~all_df["Survived"].isnull()].drop("Survived", axis=1).reset_index(drop=True)
    train_y = train_df["Survived"]

    test_X = all_df[all_df["Survived"].isnull()].drop("Survived", axis=1).reset_index(drop=True)

    print(f"\nInput Features ({len(train_X.columns)}): {train_X.columns.tolist()}")

    dataset = TitanicDataset(train_X.values, train_y.values)
    print("--- Full Train Dataset ---")
    print(dataset)

    train_dataset, validation_dataset = random_split(dataset, [0.8, 0.2])
    test_dataset = TitanicTestDataset(test_X.values)

    return train_dataset, validation_dataset, test_dataset


CSV 찾기 & 로딩: 판다스를 이용해 train.csv와 test.csv 파일을 찾아 읽습니다.

결합: 학습 데이터와 테스트 데이터를 하나로 합칩니다. 이렇게 하면 전처리 단계(결측치 채우기, 범주 인코딩 등)를 양쪽 데이터에 일관되게 적용할 수 있습니다.

전처리: 보조 함수들(_1부터 _6까지)을 순서대로 호출하여 데이터를 정제하고 변환합니다.

분리: 결합했던 데이터를 다시 학습용 특징(train_X), 학습용 레이블(train_y), 테스트용 특징(test_X)으로 나눕니다.

Dataset 생성: 사용자 정의 클래스(TitanicDataset, TitanicTestDataset)를 사용해 처리된 데이터를 감쌉니다.

학습/검증 분할: 학습 데이터를 모델 훈련에 사용할 더 큰 세트와 훈련 중 성능 검증에 사용할 더 작은 세트로 나눕니다 (80/20 비율).

반환: 파이토치의 DataLoader에서 바로 사용할 수 있는 최종 Dataset 객체들을 반환합니다.

In [None]:
def get_preprocessed_dataset_1(all_df):
    # Pclass별 Fare (요금) 평균값을 사용하여 Fare 결측치 메우기
    Fare_mean = all_df[["Pclass", "Fare"]].groupby("Pclass").mean().reset_index()
    Fare_mean.columns = ["Pclass", "Fare_mean"]
    all_df = pd.merge(all_df, Fare_mean, on="Pclass", how="left")
    all_df.loc[(all_df["Fare"].isnull()), "Fare"] = all_df["Fare_mean"]
    all_df = all_df.drop(columns=["Fare_mean"])
    return all_df


def get_preprocessed_dataset_2(all_df):
    # name을 세 개의 컬럼으로 분리하여 다시 all_df에 합침
    name_df = all_df["Name"].str.split("[,.]", n=2, expand=True)
    name_df.columns = ["family_name", "title", "name"]
    name_df["family_name"] = name_df["family_name"].str.strip()
    name_df["title"] = name_df["title"].str.strip()
    name_df["name"] = name_df["name"].str.strip()
    all_df = pd.concat([all_df, name_df], axis=1)
    return all_df


def get_preprocessed_dataset_3(all_df):
    # title별 Age 평균값을 사용하여 Age 결측치 메우기
    title_age_mean = all_df[["title", "Age"]].groupby("title").median().round().reset_index()
    title_age_mean.columns = ["title", "title_age_mean", ]
    all_df = pd.merge(all_df, title_age_mean, on="title", how="left")
    all_df.loc[(all_df["Age"].isnull()), "Age"] = all_df["title_age_mean"]
    all_df = all_df.drop(["title_age_mean"], axis=1)
    return all_df


def get_preprocessed_dataset_4(all_df):
    # 가족수(family_num) 컬럼 새롭게 추가
    all_df["family_num"] = all_df["Parch"] + all_df["SibSp"]
    # 혼자탑승(alone) 컬럼 새롭게 추가
    all_df.loc[all_df["family_num"] == 0, "alone"] = 1
    all_df["alone"].fillna(0, inplace=True)
    # 학습에 불필요한 컬럼 제거
    all_df = all_df.drop(["PassengerId", "Name", "family_name", "name", "Ticket", "Cabin"], axis=1)
    return all_df


def get_preprocessed_dataset_5(all_df):
    # title 값 개수 줄이기
    all_df.loc[
    ~(
            (all_df["title"] == "Mr") |
            (all_df["title"] == "Miss") |
            (all_df["title"] == "Mrs") |
            (all_df["title"] == "Master")
    ),
    "title"
    ] = "other"
    all_df["Embarked"].fillna("missing", inplace=True)
    return all_df


def get_preprocessed_dataset_6(all_df):
    # 카테고리 변수를 LabelEncoder를 사용하여 수치값으로 변경하기
    category_features = all_df.columns[all_df.dtypes == "object"]
    for category_feature in category_features:
        le = LabelEncoder()
        if all_df[category_feature].dtypes == "object":
          le = le.fit(all_df[category_feature])
          all_df[category_feature] = le.transform(all_df[category_feature])
    return all_df


_1: 누락된 요금 값을 각 승객 등급의 평균 요금으로 채웁니다. 

_2: Name 열에서 호칭을 추출합니다. 

_3: 누락된 나이값을 추출된 title과 연관된 나이의 중앙값으로 채웁니다. 

_4: 새로운 특징인 family_num과 alone을 만듭니다. 모델링에 불필요하다고 판단되는 열을 제거합니다.

_5: 드문 호칭들을 'other'로 그룹화하여 title 열을 단순화합니다. 누락된 탑승 항구 값을 'missing'이라는 임시 값으로 채웁니다.

_6: 범주형 문자열 열을 Label Encoding을 사용해 숫자 표현으로 변환합니다. 

In [None]:
def get_data():
  # 1번 블록의 전처리 함수 호출 (이 함수는 다른 파일에 정의되어 있다고 가정)
  train_dataset, validation_dataset, test_dataset = get_preprocessed_dataset()

  print(f"\nTrain dataset size: {len(train_dataset)}")
  print(f"Validation dataset size: {len(validation_dataset)}")
  print(f"Test dataset size: {len(test_dataset)}")

  # wandb.config에서 배치 크기를 가져와 DataLoader 생성
  train_data_loader = DataLoader(dataset=train_dataset, batch_size=wandb.config.batch_size, shuffle=True)
  validation_data_loader = DataLoader(dataset=validation_dataset, batch_size=len(validation_dataset))
  test_data_loader = DataLoader(dataset=test_dataset, batch_size=len(test_dataset))

  return train_data_loader, validation_data_loader, test_data_loader

데이터들을 DataLoader로 감싸서 모델 학습 시 데이터를 미니배치 단위로 효율적으로 공급할 수 있도록 합니다.

In [None]:
class MyModel(nn.Module):
  def __init__(self, n_input, n_output):
    super().__init__()

    self.model = nn.Sequential(
      nn.Linear(n_input, wandb.config.n_hidden_unit_list[0]),
      nn.ReLU(),
      nn.Linear(wandb.config.n_hidden_unit_list[0], wandb.config.n_hidden_unit_list[1]),
      nn.ReLU(),
      nn.Linear(wandb.config.n_hidden_unit_list[1], n_output),
    )

  def forward(self, x):
    x = self.model(x)
    return x


def get_model_and_optimizer():
  # 입력 피처 10개 (Pclass, Sex, Age, SibSp, Parch, Fare, Embarked, title, family_num, alone)
  # 출력 클래스 2개 (0: 사망, 1: 생존)
  my_model = MyModel(n_input=10, n_output=2)
  optimizer = optim.SGD(my_model.parameters(), lr=wandb.config.learning_rate)

  return my_model, optimizer

__init__: 모델을 구성하는 층들을 정의합니다. 여기서는 입력층, 2개의 은닉층, 출력층으로 구성된 간단한 다층 퍼셉트론입니다. 각 층의 뉴런 수는 wandb.config에서 가져옵니다.

forward: 입력 데이터(x)가 모델의 층들을 어떤 순서로 통과하여 최종 출력을 만들어내는지 정의합니다

In [None]:
def training_loop(model, optimizer, train_data_loader, validation_data_loader):
  n_epochs = wandb.config.epochs
  loss_fn = nn.CrossEntropyLoss()  # 분류 문제이므로 CrossEntropyLoss 사용
  next_print_epoch = 100

  for epoch in range(1, n_epochs + 1):
    loss_train = 0.0
    num_trains = 0
    correct_train = 0
    total_train = 0

    model.train() # 모델을 학습 모드로 설정
    for batch in train_data_loader:
      # Dataset이 딕셔너리 형태이므로 키로 접근
      input = batch['input']
      target = batch['target']

      output_train = model(input)
      loss = loss_fn(output_train, target)
      loss_train += loss.item()
      num_trains += 1

      # 정확도 계산
      _, predicted = torch.max(output_train.data, 1)
      total_train += target.size(0)
      correct_train += (predicted == target).sum().item()

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

    loss_validation = 0.0
    num_validations = 0
    correct_validation = 0
    total_validation = 0

    model.eval() # 모델을 평가 모드로 설정
    with torch.no_grad():
      for batch in validation_data_loader:
        input = batch['input']
        target = batch['target']

        output_validation = model(input)
        loss = loss_fn(output_validation, target)
        loss_validation += loss.item()
        num_validations += 1

        # 정확도 계산
        _, predicted = torch.max(output_validation.data, 1)
        total_validation += target.size(0)
        correct_validation += (predicted == target).sum().item()

    train_accuracy = 100 * correct_train / total_train
    validation_accuracy = 100 * correct_validation / total_validation

    wandb.log({
      "Epoch": epoch,
      "Training loss": loss_train / num_trains,
      "Validation loss": loss_validation / num_validations,
      "Training accuracy": train_accuracy,
      "Validation accuracy": validation_accuracy
    })

    if epoch % next_print_epoch == 0 or epoch == 1:
      print(
        f"Epoch {epoch}, "
        f"Training loss {loss_train / num_trains:.4f}, "
        f"Validation loss {loss_validation / num_validations:.4f}, "
        f"Training Acc {train_accuracy:.2f}%, "
        f"Validation Acc {validation_accuracy:.2f}%"
      )
      if epoch >= next_print_epoch:
          next_print_epoch += 100


* 에포크 반복: 정해진 횟수만큼 전체 데이터셋 학습을 반복합니다.

* 학습 모드: model.train()으로 모델을 학습 상태로 설정합니다.

* 미니배치 학습: train_data_loader에서 데이터를 미니배치 단위로 가져와 다음을 수행합니다.

* 모델 예측 (model(input))

* 손실 계산 (loss_fn)

* 역전파 (loss.backward())

* 가중치 업데이트 (optimizer.step())

* 학습 손실과 정확도 누적 계산

* 평가 모드: model.eval()으로 모델을 평가 상태로 설정합니다 (드롭아웃 등 비활성화).

* 검증: validation_data_loader에서 데이터를 가져와 모델 예측을 수행하고, 검증 손실과 정확도를 계산합니다 (가중치 업데이트는 안 함).

* 로깅: 각 에포크의 학습/검증 손실과 정확도를 wandb에 기록합니다.

* 출력: 주기적으로 학습 진행 상황(손실, 정확도)을 화면에 출력합니다.

In [None]:
def main(args):
  current_time_str = datetime.now().astimezone().strftime('%Y-%m-%d_%H-%M-%S')

  config = {
    'epochs': args.epochs,
    'batch_size': args.batch_size,
    'learning_rate': 1e-3,
    'n_hidden_unit_list': [20, 20], # 은닉층 설정은 그대로 사용
  }

  wandb.init(
    mode="online" if args.wandb else "disabled",
    project="titanic_survival_prediction", # wandb 프로젝트명 변경
    notes="Titanic survival prediction with MLP", # wandb 노트 변경
    tags=["mlp", "titanic"], # wandb 태그 변경
    name=current_time_str,
    config=config
  )
  print("--- wandb arguments ---")
  print(args)
  print("--- wandb config ---")
  print(wandb.config)

  # test_data_loader도 반환되지만, training_loop에서는 사용하지 않음
  train_data_loader, validation_data_loader, test_data_loader = get_data()

  linear_model, optimizer = get_model_and_optimizer()

  print("\n" + "#" * 50)
  print("Start Training...")
  training_loop(
    model=linear_model,
    optimizer=optimizer,
    train_data_loader=train_data_loader,
    validation_data_loader=validation_data_loader
  )
  print("Training Finished.")
  print("#" * 50 + "\n")

  wandb.finish()


설정 로드: wandb 실험 설정(config)을 정의합니다 (에포크 수, 배치 크기 등).

wandb 초기화: 실험 추적을 위해 wandb를 설정하고 시작합니다. 프로젝트 이름, 노트, 태그 등을 지정합니다.

데이터 로딩: get_data() 함수를 호출하여 학습/검증/테스트 데이터 로더를 가져옵니다.

모델/옵티마이저 생성: get_model_and_optimizer() 함수를 호출하여 모델과 옵티마이저를 준비합니다.

학습 시작: training_loop() 함수를 호출하여 모델 학습 및 검증을 시작합니다.

wandb 종료: 실험 기록을 마치고 wandb를 종료합니다.

In [None]:
if __name__ == "__main__":
  parser = argparse.ArgumentParser()

  parser.add_argument(
    "--wandb", action=argparse.BooleanOptionalAction, default=False, help="True or False"
  )

  parser.add_argument(
    "-b", "--batch_size", type=int, default=16, help="Batch size (int, default: 16)" # 기본 배치 사이즈 16으로 변경
  )

  parser.add_argument(
    "-e", "--epochs", type=int, default=1_000, help="Number of training epochs (int, default:1_000)"
  )

  args = parser.parse_args()

  main(args)

터미널에서 실행할 때 --epochs, --batch_size 같은 인자를 받을 수 있게 설정합니다. 받은 인자를 main 함수에 전달하여 실행합니다.

Wandb URL
https://wandb.ai/cyun0407-korea-university-of-technology-and-education/titanic_survival_prediction/runs/i4pqcafr  

[요구사항 2]

Wansb URL 
https://wandb.ai/cyun0407-korea-university-of-technology-and-education/titanic_hyperparameter_tuning?nw=nwusercyun0407

더 나은 성능을 산출하는 Activation Function : ReLU
더 나은 성능을 산출하는 Batch Size : 32

In [None]:
# find_best_hyperparameters.py
import os, json, sys, argparse, copy
import pandas as pd
import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader, random_split
from datetime import datetime
import wandb
from sklearn.preprocessing import LabelEncoder
from pathlib import Path

class TitanicDataset(Dataset):
    def __init__(self, X, y): self.X = torch.FloatTensor(X); self.y = torch.LongTensor(y)
    def __len__(self): return len(self.X)
    def __getitem__(self, idx): return {'input': self.X[idx], 'target': self.y[idx]}
    def __str__(self): return f"Data Size: {len(self.X)}, Input Shape: {self.X.shape}, Target Shape: {self.y.shape}"



파이토치 DataLoader가 사용할 수 있도록 데이터셋의 구조(데이터 로딩, 길이 반환, 특정 항목 접근 방법)를 정의합니다. TitanicDataset은 학습 및 검증 데이터를 다룹니다.

In [None]:

def get_preprocessed_dataset_1(all_df): 
    Fare_mean = all_df[["Pclass", "Fare"]].groupby("Pclass").mean().reset_index(); Fare_mean.columns = ["Pclass", "Fare_mean"]
    all_df = pd.merge(all_df, Fare_mean, on="Pclass", how="left"); all_df.loc[all_df["Fare"].isnull(), "Fare"] = all_df["Fare_mean"]
    return all_df.drop(columns=["Fare_mean"])
def get_preprocessed_dataset_2(all_df): 
    name_df = all_df["Name"].str.split("[,.]", n=2, expand=True)
    if name_df.shape[1] == 3:
        name_df.columns = ["family_name", "title", "name"]; name_df["family_name"] = name_df["family_name"].str.strip(); name_df["title"] = name_df["title"].str.strip(); name_df["name"] = name_df["name"].str.strip()
        all_df = pd.concat([all_df, name_df], axis=1)
    else: all_df['title'] = 'unknown' # 이름 분리 실패 시 'unknown' title 추가
    return all_df
def get_preprocessed_dataset_3(all_df): 
    if 'title' not in all_df.columns:
        if all_df['Age'].isnull().any(): all_df['Age'] = all_df['Age'].fillna(all_df['Age'].median())
        return all_df
    title_age_median = all_df[["title", "Age"]].groupby("title").median().round().reset_index(); title_age_median.columns = ["title", "title_age_median"]
    all_df = pd.merge(all_df, title_age_median, on="title", how="left"); all_df.loc[all_df["Age"].isnull(), "Age"] = all_df["title_age_median"]
    if "title_age_median" in all_df.columns: all_df = all_df.drop(["title_age_median"], axis=1)
    return all_df
def get_preprocessed_dataset_4(all_df): 
    all_df["family_num"] = all_df["Parch"] + all_df["SibSp"]; all_df.loc[all_df["family_num"] == 0, "alone"] = 1; all_df["alone"] = all_df["alone"].fillna(0).astype(float)
    cols_to_drop = ["PassengerId", "Name", "family_name", "name", "Ticket", "Cabin"]; existing_cols_to_drop = [col for col in cols_to_drop if col in all_df.columns]
    return all_df.drop(existing_cols_to_drop, axis=1)
def get_preprocessed_dataset_5(all_df):
    if 'title' in all_df.columns: all_df.loc[~((all_df["title"] == "Mr") | (all_df["title"] == "Miss") | (all_df["title"] == "Mrs") | (all_df["title"] == "Master")), "title"] = "other"
    all_df["Embarked"] = all_df["Embarked"].fillna("missing")
    return all_df
def get_preprocessed_dataset_6(all_df): 
    category_features = all_df.select_dtypes(include=['object']).columns
    for cat_feat in category_features:
        le = LabelEncoder(); valid_indices = all_df[cat_feat].notna()
        if valid_indices.any():
            all_df.loc[valid_indices, cat_feat] = le.fit_transform(all_df.loc[valid_indices, cat_feat])
            try: all_df[cat_feat] = pd.to_numeric(all_df[cat_feat])
            except ValueError: pass
    return all_df

타이타닉 원본 데이터를 단계별로 가공하는 함수들입니다. 결측치 처리, 특징 생성 및 추출, 범주형 데이터의 수치화 등을 수행합니다.

In [None]:
def get_preprocessed_data_raw(): 
    """CSV 로드, 전처리, 학습/검증용 데이터 배열 반환"""
    try: CURRENT_FILE_PATH = os.path.dirname(os.path.abspath(__file__))
    except NameError: CURRENT_FILE_PATH = os.getcwd()
    train_data_path = os.path.join(CURRENT_FILE_PATH, "train.csv")
    test_data_path = os.path.join(CURRENT_FILE_PATH, "test.csv") # test는 로드만 하고 사용 안함
    try:
        train_df = pd.read_csv(train_data_path)
        test_df = pd.read_csv(test_data_path)
    except FileNotFoundError as e: print(f"Error: {e}"); sys.exit(1)

    all_df = pd.concat([train_df, test_df], sort=False) # 전처리 일관성 위해 합침
    all_df = get_preprocessed_dataset_1(all_df); all_df = get_preprocessed_dataset_2(all_df)
    all_df = get_preprocessed_dataset_3(all_df); all_df = get_preprocessed_dataset_4(all_df)
    all_df = get_preprocessed_dataset_5(all_df); all_df = get_preprocessed_dataset_6(all_df)
    train_X_df = all_df[~all_df["Survived"].isnull()].drop("Survived", axis=1)
    train_y = train_df["Survived"]

    # 학습 데이터에 대해서만 최종 NaN 및 타입 확인
    print("\n--- Final Data Check & Fill NaNs (Train Data Only) ---")
    columns_to_check = train_X_df.columns
    df = train_X_df
    for col in columns_to_check:
        if df[col].dtype == 'object':
            try: df[col] = pd.to_numeric(df[col])
            except ValueError: df[col] = 0 # 변환 불가시 0으로 채움
        if df[col].isnull().any(): df[col] = df[col].fillna(0) # NaN 0으로 채움
    train_X = train_X_df.reset_index(drop=True)

    print("Train data preprocessing finished.")
    return train_X.values, train_y.values # 학습 데이터(X, y)만 반환

CSV 파일을 로드하고 위의 전처리 함수들을 순서대로 호출합니다. 최종적으로 학습 데이터만 넘파이 배열 형태로 반환합니다.

In [None]:
def get_dataloaders(train_X, train_y, batch_size_config): 
    """학습/검증 DataLoader 생성"""
    dataset = TitanicDataset(train_X, train_y)
    generator = torch.Generator().manual_seed(42) # 데이터 분할 재현성 위한 시드 고정
    train_dataset, validation_dataset = random_split(dataset, [0.8, 0.2], generator=generator)
    print(f"\nTrain size: {len(train_dataset)}, Validation size: {len(validation_dataset)}")
    train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size_config, shuffle=True)
    validation_loader = DataLoader(dataset=validation_dataset, batch_size=len(validation_dataset)) # 검증은 전체 데이터 사용
    return train_loader, validation_loader

전처리된 학습 데이터(X, y)와 배치 크기를 받아, 데이터를 학습용과 검증용으로 8:2 비율로 나눈 뒤 각각에 대한 DataLoader 객체를 생성하여 반환합니다.

In [None]:
class MyModel(nn.Module): 
    """활성화 함수와 은닉층 크기를 인자로 받는 MLP 모델"""
    def __init__(self, n_input, n_output, activation_fn_class, n_hidden1=20, n_hidden2=20):
        super().__init__(); activation = activation_fn_class()
        self.model = nn.Sequential(
            nn.Linear(n_input, n_hidden1), activation,
            nn.Linear(n_hidden1, n_hidden2), activation,
            nn.Linear(n_hidden2, n_output)
        )
    def forward(self, x): return self.model(x)

파이토치 nn.Module을 상속받아 간단한 모델 구조를 정의합니다.

In [None]:
def get_model_and_optimizer(activation_fn_class, learning_rate, n_hidden1=20, n_hidden2=20): # ... (이전과 동일) ...
    """모델 객체와 SGD 옵티마이저 생성"""
    model = MyModel(10, 2, activation_fn_class, n_hidden1, n_hidden2) # 입력 10개, 출력 2개 고정
    optimizer = optim.SGD(model.parameters(), lr=learning_rate)
    return model, optimizer

MyModel 클래스를 이용해 모델 객체를 생성하고, 학습에 사용할 SGD 옵티마이저를 설정하여 함께 반환합니다.

In [None]:
# --- 학습 루프: 최종 정확도만 반환 ---
def simplified_training_loop(model, optimizer, train_data_loader, validation_data_loader, n_epochs, loss_fn):
    """모델 학습/검증 수행 후 최종 검증 정확도 반환 (최고 기록 없음)"""
    print(f"\nStart Training for {n_epochs} epochs...")
    final_validation_accuracy = 0.0 # 마지막 에포크 정확도 저장용

    for epoch in range(1, n_epochs + 1):
        # --- 학습 
        model.train(); loss_train_epoch = 0.0; correct_train_epoch = 0; total_train_samples = 0
        for batch in train_data_loader: # ... (학습 로직 동일) ...
            input_data = batch['input']; target = batch['target']; optimizer.zero_grad(); output_train = model(input_data); loss = loss_fn(output_train, target); loss.backward(); optimizer.step()
            loss_train_epoch += loss.item() * input_data.size(0); _, predicted = torch.max(output_train.data, 1); total_train_samples += target.size(0); correct_train_epoch += (predicted == target).sum().item()
        avg_loss_train = loss_train_epoch / total_train_samples if total_train_samples else 0
        train_accuracy = 100 * correct_train_epoch / total_train_samples if total_train_samples else 0

        # --- 검증 
        model.eval(); loss_val_epoch = 0.0; correct_val_epoch = 0; total_val_samples = 0
        with torch.no_grad():
            for batch in validation_data_loader: # ... (검증 로직 동일) ...
                input_data = batch['input']; target = batch['target']; output_validation = model(input_data); loss = loss_fn(output_validation, target); loss_val_epoch += loss.item() * input_data.size(0)
                _, predicted = torch.max(output_validation.data, 1); total_val_samples += target.size(0); correct_val_epoch += (predicted == target).sum().item()
        avg_loss_validation = loss_val_epoch / total_val_samples if total_val_samples else 0
        validation_accuracy = 100 * correct_val_epoch / total_val_samples if total_val_samples else 0
        final_validation_accuracy = validation_accuracy # 마지막 값 갱신

        # 로깅 및 출력 
        if wandb.run: wandb.log({"Epoch": epoch, "Training loss": avg_loss_train, "Validation loss": avg_loss_validation, "Training accuracy": train_accuracy, "Validation accuracy": validation_accuracy})
        if epoch % 100 == 0 or epoch == 1: print(f"Epoch {epoch:>{len(str(n_epochs))}}/{n_epochs}, Train Loss: {avg_loss_train:.4f}, Acc: {train_accuracy:.2f}% | Val Loss: {avg_loss_validation:.4f}, Acc: {validation_accuracy:.2f}%")

    print(f"\n--- Run Finished --- Final Val Acc: {final_validation_accuracy:.2f}%")
    return final_validation_accuracy # 최종 검증 정확도만 반환

모델 학습과 검증을 지정된 에포크만큼 수행합니다.

In [None]:
# --- 메인 실행 함수 하이퍼파라미터 탐색 및 최적 조합 저장 ---
def find_hyperparameters(args):
    """최적 하이퍼파라미터 '조합'만 찾아 json 파일로 저장"""
    current_time_str = datetime.now().astimezone().strftime('%Y-%m-%d_%H-%M-%S')
    # 실험 설정
    batch_sizes_to_test = [16, 32, 64, 128]
    activation_functions_to_test = {"Sigmoid": nn.Sigmoid, "ReLU": nn.ReLU, "ELU": nn.ELU, "LeakyReLU": nn.LeakyReLU}
    learning_rate = 1e-3; n_hidden_units = [20, 20]; loss_fn = nn.CrossEntropyLoss()
    # 데이터 로딩
    print("Preprocessing data for tuning..."); train_X, train_y = get_preprocessed_data_raw()[:2] # 학습 데이터만 필요
    # 최고 성능 추적
    overall_best_accuracy = -1.0; overall_best_config = {}

    # 실험 루프
    for batch_size in batch_sizes_to_test:
        print(f"\n{'='*25} Testing Batch Size: {batch_size} {'='*25}")
        train_loader, validation_loader = get_dataloaders(train_X, train_y, batch_size)
        for activation_name, activation_fn_class in activation_functions_to_test.items():
            print(f"\n--- Testing Activation: {activation_name} ---")
            config = {'epochs': args.epochs, 'batch_size': batch_size, 'learning_rate': learning_rate, 'n_hidden_unit_list': n_hidden_units, 'activation_function': activation_name}
            run_name = f"Tune_{activation_name}_BS{batch_size}_{current_time_str}"
            run = wandb.init(mode="online" if args.wandb else "disabled", project="titanic_hyperparameter_tuning", name=run_name, config=config, reinit=True)
            print("--- Run Config ---"); print(config)
            model, optimizer = get_model_and_optimizer(activation_fn_class, config['learning_rate'], n_hidden1=config['n_hidden_unit_list'][0], n_hidden2=config['n_hidden_unit_list'][1])

            # 간소화된 학습 루프 실행, 최종 정확도 받기
            final_accuracy_run = simplified_training_loop(model, optimizer, train_loader, validation_loader, config['epochs'], loss_fn)

            # 최고 조합 갱신 확인 (최종 정확도 기준)
            if final_accuracy_run > overall_best_accuracy:
                overall_best_accuracy = final_accuracy_run
                overall_best_config = config # config만 저장
                print(f"🏅 New Best Config Found! Final Val Acc: {overall_best_accuracy:.2f}%, Config: {activation_name}/BS={batch_size}")
            run.finish()

    # 최종 결과 요약 및 저장 (config만)
    print("\n" + "=" * 60); print(" Hyperparameter Tuning Finished (Config Only) "); print("=" * 60)
    if overall_best_config:
        print(f"Overall Best Final Val Acc: {overall_best_accuracy:.2f}%")
        print(f"Best Config Found: {overall_best_config}")
        best_config_path = "best_hyperparameters.json"
        try:
            with open(best_config_path, 'w') as f: json.dump(overall_best_config, f, indent=4)
            print(f"Best hyperparameters saved to {best_config_path}")
        except Exception as e: print(f"Error saving best config: {e}")
    else: print("No successful training run recorded.")
    print("=" * 60 + "\n")

지정된 배치 크기와 활성화 함수들의 모든 조합에 대해 루프를 돕니다. 각 조합마다 simplified_training_loop를 실행하여 최종 검증 정확도를 얻고, 이 정확도가 지금까지 기록된 최고 정확도보다 높으면 해당 조합의 설정을 overall_best_config에 저장합니다.

In [None]:
# --- 명령줄 인자 처리 및 실행 ---
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Find best hyperparameters (config only) for Titanic.")
    parser.add_argument("--wandb", action=argparse.BooleanOptionalAction, default=False, help="Enable WandB logging")
    parser.add_argument("-e", "--epochs", type=int, default=1000, help="Epochs per combination (default: 1000)")

    # Jupyter 환경 감지 및 처리
    if 'ipykernel' in sys.modules:
        print("Running in interactive mode. Using default args: epochs=1000, wandb=False")
        args = argparse.Namespace(wandb=False, epochs=1000) # 기본값 설정
    else:
        args = parser.parse_args()

    find_hyperparameters(args) # 메인 함수 실행

# [요구사항 3]

In [None]:
import os, json, sys, argparse, copy
import pandas as pd
import torch
from torch import nn, optim # optim은 재학습 시 필요
from torch.utils.data import Dataset, DataLoader, random_split # random_split은 재학습 시 필요
from sklearn.preprocessing import LabelEncoder # 전처리 함수에 필요
from pathlib import Path

class TitanicDataset(Dataset):
    def __init__(self, X, y): self.X = torch.FloatTensor(X); self.y = torch.LongTensor(y)
    def __len__(self): return len(self.X)
    def __getitem__(self, idx): return {'input': self.X[idx], 'target': self.y[idx]}
    def __str__(self): return f"Data Size: {len(self.X)}, Input Shape: {self.X.shape}, Target Shape: {self.y.shape}"

class TitanicTestDataset(Dataset):
    def __init__(self, X): self.X = torch.FloatTensor(X)
    def __len__(self): return len(self.X)
    def __getitem__(self, idx): return {'input': self.X[idx]}
    def __str__(self): return f"Data Size: {len(self.X)}, Input Shape: {self.X.shape}"

In [None]:
# ... 데이터 전처리 보조 함수 get_preprocessed_dataset_1 ~ _6 정의
def get_preprocessed_dataset_1(all_df): # ... (동일) ...
    Fare_mean = all_df[["Pclass", "Fare"]].groupby("Pclass").mean().reset_index(); Fare_mean.columns = ["Pclass", "Fare_mean"]
    all_df = pd.merge(all_df, Fare_mean, on="Pclass", how="left"); all_df.loc[all_df["Fare"].isnull(), "Fare"] = all_df["Fare_mean"]
    return all_df.drop(columns=["Fare_mean"])
# get_preprocessed_dataset_2 ~ _6 정의
def get_preprocessed_dataset_2(all_df):
    name_df = all_df["Name"].str.split("[,.]", n=2, expand=True)
    if name_df.shape[1] == 3:
        name_df.columns = ["family_name", "title", "name"]; name_df["family_name"] = name_df["family_name"].str.strip(); name_df["title"] = name_df["title"].str.strip(); name_df["name"] = name_df["name"].str.strip()
        all_df = pd.concat([all_df, name_df], axis=1)
    else: all_df['title'] = 'unknown'
    return all_df
def get_preprocessed_dataset_3(all_df):
    if 'title' not in all_df.columns:
        if all_df['Age'].isnull().any(): all_df['Age'] = all_df['Age'].fillna(all_df['Age'].median())
        return all_df
    title_age_median = all_df[["title", "Age"]].groupby("title").median().round().reset_index(); title_age_median.columns = ["title", "title_age_median"]
    all_df = pd.merge(all_df, title_age_median, on="title", how="left"); all_df.loc[all_df["Age"].isnull(), "Age"] = all_df["title_age_median"]
    if "title_age_median" in all_df.columns: all_df = all_df.drop(["title_age_median"], axis=1)
    return all_df
def get_preprocessed_dataset_4(all_df):
    all_df["family_num"] = all_df["Parch"] + all_df["SibSp"]; all_df.loc[all_df["family_num"] == 0, "alone"] = 1; all_df["alone"] = all_df["alone"].fillna(0).astype(float)
    cols_to_drop = ["PassengerId", "Name", "family_name", "name", "Ticket", "Cabin"]; existing_cols_to_drop = [col for col in cols_to_drop if col in all_df.columns]
    return all_df.drop(existing_cols_to_drop, axis=1)
def get_preprocessed_dataset_5(all_df):
    if 'title' in all_df.columns: all_df.loc[~((all_df["title"] == "Mr") | (all_df["title"] == "Miss") | (all_df["title"] == "Mrs") | (all_df["title"] == "Master")), "title"] = "other"
    all_df["Embarked"] = all_df["Embarked"].fillna("missing")
    return all_df
def get_preprocessed_dataset_6(all_df):
    category_features = all_df.select_dtypes(include=['object']).columns
    for cat_feat in category_features:
        le = LabelEncoder(); valid_indices = all_df[cat_feat].notna()
        if valid_indices.any():
            all_df.loc[valid_indices, cat_feat] = le.fit_transform(all_df.loc[valid_indices, cat_feat])
            try: all_df[cat_feat] = pd.to_numeric(all_df[cat_feat])
            except ValueError: pass
    return all_df

In [None]:
def get_preprocessed_data_raw():
    """CSV 로드, 전처리, 최종 데이터 배열 반환 (테스트 ID 포함)"""
    try: CURRENT_FILE_PATH = os.path.dirname(os.path.abspath(__file__))
    except NameError: CURRENT_FILE_PATH = os.getcwd()
    train_data_path = os.path.join(CURRENT_FILE_PATH, "train.csv")
    test_data_path = os.path.join(CURRENT_FILE_PATH, "test.csv")
    try:
        train_df = pd.read_csv(train_data_path)
        test_df = pd.read_csv(test_data_path)
    except FileNotFoundError as e: print(f"Error: {e}"); sys.exit(1)

    test_passenger_ids = test_df['PassengerId'] # 테스트 승객 ID 저장
    all_df = pd.concat([train_df, test_df], sort=False)

    # 전처리 단계 적용
    all_df = get_preprocessed_dataset_1(all_df); all_df = get_preprocessed_dataset_2(all_df)
    all_df = get_preprocessed_dataset_3(all_df); all_df = get_preprocessed_dataset_4(all_df)
    all_df = get_preprocessed_dataset_5(all_df); all_df = get_preprocessed_dataset_6(all_df)

    # 학습/테스트 분리
    train_X_df = all_df[~all_df["Survived"].isnull()].drop("Survived", axis=1)
    train_y = train_df["Survived"]
    test_X_df = all_df[all_df["Survived"].isnull()].drop("Survived", axis=1)

    # 최종 데이터 확인 및 NaN 처리 (분리된 데이터프레임에 각각 적용)
    print("\n--- Final Data Check & Fill NaNs ---")
    columns_to_check = train_X_df.columns
    for df in [train_X_df, test_X_df]:
        df_name = "Train" if df is train_X_df else "Test"
        for col in columns_to_check:
            if df[col].dtype == 'object':
                try: df[col] = pd.to_numeric(df[col])
                except ValueError: df[col] = 0 # 변환 불가시 0으로 채움
            if df[col].isnull().any(): df[col] = df[col].fillna(0) # NaN 0으로 채움

    # 최종 인덱스 리셋 및 Numpy 배열 변환
    train_X = train_X_df.reset_index(drop=True).values
    test_X = test_X_df.reset_index(drop=True).values
    train_y = train_y.values # Numpy 배열로 변환

    print("Data preprocessing finished for submission generation.")
    return train_X, train_y, test_X, test_passenger_ids

In [None]:
# --- DataLoader 생성 함수 (get_dataloaders) ---
def get_dataloaders(train_X, train_y, test_X, batch_size_config):
    """학습/검증/테스트 DataLoader 모두 생성"""
    dataset = TitanicDataset(train_X, train_y)
    generator = torch.Generator().manual_seed(42) # 시드 고정
    train_dataset, validation_dataset = random_split(dataset, [0.8, 0.2], generator=generator)
    test_dataset = TitanicTestDataset(test_X)

    print(f"\nTrain size: {len(train_dataset)}, Validation size: {len(validation_dataset)}, Test size: {len(test_dataset)}")

    train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size_config, shuffle=True)
    validation_loader = DataLoader(dataset=validation_dataset, batch_size=len(validation_dataset))
    # 테스트 데이터 로더 생성 (비어있지 않을 때만)
    test_loader = DataLoader(dataset=test_dataset, batch_size=len(test_dataset)) if len(test_dataset) > 0 else None

    return train_loader, validation_loader, test_loader

In [None]:
# --- 신경망 모델 정의 (MyModel) ---
class MyModel(nn.Module):
    """활성화 함수와 은닉층 크기를 인자로 받는 MLP 모델"""
    def __init__(self, n_input, n_output, activation_fn_class, n_hidden1=20, n_hidden2=20):
        super().__init__(); activation = activation_fn_class()
        self.model = nn.Sequential(
            nn.Linear(n_input, n_hidden1), activation,
            nn.Linear(n_hidden1, n_hidden2), activation,
            nn.Linear(n_hidden2, n_output)
        )
    def forward(self, x): return self.model(x)

In [None]:
# --- 모델 및 옵티마이저 생성 함수 (get_model_and_optimizer) ---
def get_model_and_optimizer(activation_fn_class, learning_rate, n_hidden1=20, n_hidden2=20):
    """모델 객체와 SGD 옵티마이저 생성"""
    model = MyModel(10, 2, activation_fn_class, n_hidden1, n_hidden2) # 입력 10개, 출력 2개 고정
    optimizer = optim.SGD(model.parameters(), lr=learning_rate)
    return model, optimizer

In [None]:
# --- 모델 학습 및 검증 루프 ---
def training_loop_with_early_stopping(model, optimizer, train_data_loader, validation_data_loader, n_epochs, loss_fn):
    """모델 학습/검증 수행 및 '최고 성능' 모델 상태 반환 (조기 종료 로직)"""
    best_validation_accuracy = -1.0
    best_model_state = None
    best_epoch = 0

    print(f"\nStart Final Training for {n_epochs} epochs (with early stopping logic)...")
    for epoch in range(1, n_epochs + 1):
        # --- 학습 ---
        model.train(); loss_train_epoch = 0.0; correct_train_epoch = 0; total_train_samples = 0
        for batch in train_data_loader:
            input_data = batch['input']; target = batch['target']; optimizer.zero_grad(); output_train = model(input_data); loss = loss_fn(output_train, target); loss.backward(); optimizer.step()
            loss_train_epoch += loss.item() * input_data.size(0); _, predicted = torch.max(output_train.data, 1); total_train_samples += target.size(0); correct_train_epoch += (predicted == target).sum().item()
        avg_loss_train = loss_train_epoch / total_train_samples if total_train_samples else 0
        train_accuracy = 100 * correct_train_epoch / total_train_samples if total_train_samples else 0

        # --- 검증 ---
        model.eval(); loss_val_epoch = 0.0; correct_val_epoch = 0; total_val_samples = 0
        with torch.no_grad():
            for batch in validation_data_loader:
                input_data = batch['input']; target = batch['target']; output_validation = model(input_data); loss = loss_fn(output_validation, target); loss_val_epoch += loss.item() * input_data.size(0)
                _, predicted = torch.max(output_validation.data, 1); total_val_samples += target.size(0); correct_val_epoch += (predicted == target).sum().item()
        avg_loss_validation = loss_val_epoch / total_val_samples if total_val_samples else 0
        validation_accuracy = 100 * correct_val_epoch / total_val_samples if total_val_samples else 0

        # 최고 성능 갱신 확인,조기 종료 로직
        if validation_accuracy > best_validation_accuracy:
            best_validation_accuracy = validation_accuracy
            best_model_state = copy.deepcopy(model.state_dict())
            best_epoch = epoch
            print(f" New best validation accuracy: {best_validation_accuracy:.2f}% at epoch {epoch}")

        # 주기적 출력
        if epoch % 100 == 0 or epoch == 1:
            print(f"Epoch {epoch:>{len(str(n_epochs))}}/{n_epochs}, Train Loss: {avg_loss_train:.4f}, Acc: {train_accuracy:.2f}% | Val Loss: {avg_loss_validation:.4f}, Acc: {validation_accuracy:.2f}%{' *Best*' if epoch == best_epoch else ''}")

    print(f"\n--- Final Training Summary --- Best Val Acc: {best_validation_accuracy:.2f}% achieved at epoch {best_epoch}")
    # 최고 성능 모델 상태와 에포크 반환
    return best_validation_accuracy, best_model_state, best_epoch

최적 하이퍼파라미터로 모델을 재학습시키는 함수입니다. 학습 중 매 에포크마다 검증 데이터 성능을 확인하고, 최고 성능을 보인 시점의 모델 상태와 에포크 번호를 기록하여 최종적으로 반환합니다.

In [None]:
# --- 테스트 함수 (test_model) ---
# (find_best_hyperparameters.py 와 동일하게 정의)
def test_model(model, test_data_loader):
    """학습된 모델로 테스트 데이터 예측 수행"""
    model.eval(); predictions = []
    with torch.no_grad():
        for batch in test_data_loader: input_data = batch['input']; output = model(input_data); _, predicted = torch.max(output.data, 1); predictions.extend(predicted.cpu().numpy())
    return predictions

In [None]:
# --- Submission 파일 생성 함수 (create_submission) ---
# (find_best_hyperparameters.py 와 동일하게 정의)
def create_submission(predictions, passenger_ids, output_file="submission.csv"):
    """예측 결과와 승객 ID로 submission CSV 파일 생성"""
    if len(predictions) != len(passenger_ids): print(f"Error: Prediction count ({len(predictions)}) != Passenger ID count ({len(passenger_ids)})"); return
    submission_df = pd.DataFrame({"PassengerId": passenger_ids, "Survived": predictions})
    try: submission_df.to_csv(output_file, index=False); print(f"Submission file saved: {output_file}")
    except Exception as e: print(f"Error saving submission file: {e}")

In [None]:
# --- 메인 실행 함수 (최적 모델 로드 및 예측) ---
def generate_submission_main(config_path="best_hyperparameters.json", retrain_epochs=None):
    """최적 설정 로드, 해당 설정으로 재학습(조기종료 적용), 예측, submission 파일 생성"""
    # 최적 설정 로드
    try:
        with open(config_path, 'r') as f: best_config = json.load(f)
    except FileNotFoundError: print(f"Error: Config file '{config_path}' not found."); sys.exit(1)
    except json.JSONDecodeError: print(f"Error: Could not decode JSON from '{config_path}'."); sys.exit(1)

    print("--- Loaded Best Config ---"); print(best_config)

    # 설정값 추출
    best_activation_name = best_config.get('activation_function')
    best_batch_size = best_config.get('batch_size')
    learning_rate = best_config.get('learning_rate', 1e-3)
    n_hidden1 = best_config.get('n_hidden_unit_list', [20, 20])[0]
    n_hidden2 = best_config.get('n_hidden_unit_list', [20, 20])[1]
    # 재학습 에포크 수 결정 (인자 우선 > config 값 > 기본값 1000)
    epochs_to_train = retrain_epochs if retrain_epochs is not None else best_config.get('epochs', 1000)
    loss_fn = nn.CrossEntropyLoss()

    # 활성화 함수 클래스 가져오기
    activation_functions_map = {"Sigmoid": nn.Sigmoid, "ReLU": nn.ReLU, "ELU": nn.ELU, "LeakyReLU": nn.LeakyReLU}
    if not best_activation_name or best_activation_name not in activation_functions_map:
        print(f"Error: Invalid activation function '{best_activation_name}' in config."); sys.exit(1)
    best_activation_class = activation_functions_map[best_activation_name]

    print("\nPreprocessing data for final training and submission...")
    train_X, train_y, test_X, test_passenger_ids = get_preprocessed_data_raw()

    if not best_batch_size: print("Error: 'batch_size' not found in config."); sys.exit(1)
    train_loader, validation_loader, test_loader = get_dataloaders(train_X, train_y, test_X, best_batch_size)
    if test_loader is None: print("Error: Test data is empty, cannot generate submission."); sys.exit(1)

    model, optimizer = get_model_and_optimizer(best_activation_class, learning_rate, n_hidden1, n_hidden2)

    # === 최종 모델 재학습 (조기 종료 로직 적용) ===
    final_best_accuracy, final_best_model_state, final_best_epoch = training_loop_with_early_stopping(
        model, optimizer, train_loader, validation_loader, epochs_to_train, loss_fn
    )

    # 최고 성능 모델 상태 로드
    if final_best_model_state:
        model.load_state_dict(final_best_model_state)
        print(f"\nLoaded model state from epoch {final_best_epoch} with validation accuracy {final_best_accuracy:.2f}%")

        # 예측 수행
        print("Generating predictions with the best epoch model...")
        test_predictions = test_model(model, test_loader)

        # Submission 파일 생성
        submission_filename = f"submission_final_{best_activation_name}_bs{best_batch_size}_epoch{final_best_epoch}.csv"
        create_submission(test_predictions, test_passenger_ids, output_file=submission_filename)
    else:
        print("Final training did not produce a best model state. Cannot generate submission.")

* best_hyperparameters.json 파일을 읽어 최적 조합 설정을 불러옵니다.

* 전체 데이터를 다시 로드하고 최적 배치 크기로 DataLoader를 생성합니다.

* 최적 활성화 함수로 모델과 옵티마이저를 생성합니다.

* training_loop_with_early_stopping 함수를 호출하여 모델을 재학습시키고, 이 과정에서 검증 성능이 가장 좋았던 시점(조기 종료 시점)의 모델 상태와 에포크 번호를 얻습니다.

* 얻어진 최적 시점의 모델 상태를 로드합니다.

* test_model 함수로 테스트 데이터 예측을 수행합니다.

* create_submission 함수로 최종 제출 파일을 생성합니다 (파일 이름에 최적 조합 및 에포크 포함).

In [None]:
# --- 명령줄 인자 처리 및 실행 ---
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Retrain the best model with early stopping and generate submission.")
    parser.add_argument("--config", default="best_hyperparameters.json", help="Path to best hyperparameters JSON")
    # 재학습 에포크 수를 인자로 받을 수 있도록 추가 (선택사항)
    parser.add_argument("-e", "--retrain_epochs", type=int, default=None, help="Number of epochs for retraining (uses config value if not set)")

    if 'ipykernel' in sys.modules:
        print("Running in interactive mode. Using default config path and epochs from config (or 1000).")
        args = parser.parse_args([]) # 기본값 사용
    else:
        args = parser.parse_args()

    generate_submission_main(config_path=args.config, retrain_epochs=args.retrain_epochs) # 메인 함수 실행

![캐글 제출 결](dl_titanic_leaderboard.png)