# Modular 구조

```python

modular_model/ # 프로젝트 폴더
├── service/
│   ├── config.py # 경로 및 설정
│   ├── utils.py
│   ├── dataset.py # 데이터
│   ├── model.py # 모델
│   ├── run_model.py # 학습 프로세스
│   └── inference.py # 추론
└── data/ # 학습 데이터
    ├── train.csv
    ├── test.csv
    └── submission.csv
```

## 폴더 생성

In [1]:
import os

# Create the project sub-folders in the current working directory.
# The loop variable is deliberately not called `dir`, which would shadow the builtin.
for folder in ('service', 'data'):
    # exist_ok=True turns the call into a no-op when the folder already exists,
    # so no separate (race-prone) existence check is needed.
    os.makedirs(folder, exist_ok=True)

In [2]:
%%writefile service/config.py

import os

# Model settings
MODEL_VERSION = 'v1_rf'
MODEL_FILENAME = 'model_{}.pkl'.format(MODEL_VERSION)

# Default number of cross-validation folds
N_FOLDS = 5

# Directory layout, rooted at the directory the process is started from
BASE_DIR = os.getcwd()
DATA_DIR = os.path.join(BASE_DIR, 'data')
MODEL_DIR = os.path.join(DATA_DIR, 'models')

# File locations
MODEL_PATH = os.path.join(MODEL_DIR, MODEL_FILENAME)
DATA_PATH = os.path.join(DATA_DIR, 'raw.csv')
# Currently disabled paths, kept for reference:
#TEST_PATH = os.path.join(DATA_DIR, 'test.csv')
#SUBMISSION_PATH = os.path.join(DATA_DIR, f'submission_{MODEL_VERSION}.csv')

Overwriting service/config.py


## utils 생성

In [3]:
%%writefile service/utils.py

import os
import random
import numpy as np
import torch

def reset_seeds(func, seed=42):
    """Decorator that re-seeds the random number generators before every call.

    Previously the seeding ran once, when the decorator was applied (i.e. at
    import time), and the wrapper only forwarded the call. The seeding now
    happens inside the wrapper, so each invocation of the decorated function
    starts from the same RNG state.

    Args:
        func: The function to wrap.
        seed: Seed value applied to `random`, NumPy and PyTorch.

    Returns:
        A wrapper with the same signature and metadata as ``func``.
    """
    # Local import so this block does not depend on the module's import list.
    from functools import wraps

    @wraps(func)
    def wrapper_func(*args, **kwargs):
        random.seed(seed)
        # NOTE(review): changing PYTHONHASHSEED at runtime is only picked up by
        # interpreters started afterwards (e.g. subprocesses) - confirm intent.
        os.environ['PYTHONHASHSEED'] = str(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)  # CPU RNG
        torch.cuda.manual_seed(seed)  # GPU RNG
        # Ask cuDNN to use deterministic algorithms.
        torch.backends.cudnn.deterministic = True
        return func(*args, **kwargs)

    return wrapper_func


Overwriting service/utils.py


## dataset 생성

In [4]:
%%writefile service/dataset.py

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from utils import reset_seeds
from config import DATA_PATH
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import LabelEncoder

# Load the raw dataset
def __load_data() -> pd.DataFrame:
    """Read the CSV at DATA_PATH and turn blank cells into NaN.

    Cells that are empty or contain only whitespace are replaced with
    ``np.nan`` so they are treated as missing values downstream.
    """
    raw = pd.read_csv(DATA_PATH)
    return raw.replace(r'^\s*$', np.nan, regex=True)

def __process_drop(train, val, test):
    """Remove the unused columns from all three splits, in place.

    Nothing is returned; the passed DataFrames themselves are modified.
    """
    unused_columns = ['customerID','StreamingTV', 'StreamingMovies',
                      'OnlineSecurity', 'OnlineBackup','DeviceProtection',]

    # train: data the model learns from / val: data used to evaluate the
    # training / test: held-out data
    for frame in (train, val, test):
        frame.drop(columns=unused_columns, inplace=True)

def __fill_na(train, val, test):
    """Cast the listed columns to float and fill their missing values.

    The fill value is the mean of the *training* split and is reused for the
    validation and test splits. Previously each split was filled with its own
    mean, which leaks held-out statistics into preprocessing and imputes the
    three splits inconsistently.

    The DataFrames are modified in place and also returned.

    Returns:
        The (train, val, test) tuple.
    """
    null_cols = ['TotalCharges']
    for col in null_cols:
        # The statistic is learned from the training data only.
        train[col] = train[col].astype(float)
        fill_value = train[col].mean()
        for df in (train, val, test):
            df[col] = df[col].astype(float).fillna(fill_value)
    return train, val, test

def __preprocess_resample(train, val, test):
    """Split each frame into features/target and oversample the training split.

    SMOTE is applied to the training data only. The validation and test splits
    keep their original class distribution; oversampling the validation split
    (as was done before) adds synthetic rows to the data used for evaluation
    and distorts the reported score.

    Returns:
        X_train, X_val, y_train, y_val, X_test, y_test (in that order).
    """
    print("__preprocess_resample start")
    # The second shape printed is the validation split (it was labelled "test").
    print(f"train.shape: {train.shape} / val.shape: {val.shape}")
    X_train, y_train = SMOTE().fit_resample(train.drop(['Churn'], axis=1), train['Churn'])
    X_val = val.drop(['Churn'], axis=1)
    y_val = val['Churn']
    X_test = test.drop(['Churn'], axis=1)
    y_test = test['Churn']

    print("__preprocess_resample end")
    print(f"X_train.shape: {X_train.shape} / X_val.shape: {X_val.shape}")
    return X_train, X_val, y_train, y_val, X_test, y_test

def __preprocess_label_encoding(train, val, test):
    """Label-encode the columns in ``cat_features`` for all three splits.

    Encoders are fitted on ``train`` and then applied to train, val and test.
    Labels that the encoder has not seen are replaced by the most frequent
    training value before encoding.

    ``cat_features`` is currently an empty list, so no column is encoded: each
    returned frame is a copy of the input with its columns in sorted order and
    a fresh 0..n-1 index.

    Returns:
        The processed (train, val, test) DataFrames.
    """
    results = []

    # Columns to label-encode (none at the moment).
    cat_features = []

    # Every column of train that is not label-encoded
    normal_cols = list(set(train.columns) - set(cat_features))

    # One LabelEncoder per categorical feature
    label_encoders = {}

    # Fit the encoders on the training data.
    # NOTE(review): encoded_features is filled here but never read afterwards.
    encoded_features = {}
    for feature in cat_features:
        label_encoders[feature] = LabelEncoder()
        # Fit on training data
        encoded_features[feature] = label_encoders[feature].fit_transform(train[feature])

    pd_list = [train, val, test]
    for i, df in enumerate(pd_list):
        # Work on a copy so the caller's frame is left untouched
        temp_df = df.copy()

        # Transform categorical features
        for feature in cat_features:
            try:
                temp_df[feature] = label_encoders[feature].transform(df[feature])
            except ValueError as e:
                # Fallback path, presumably for labels the encoder has not seen
                print(f"Warning: Found unseen labels in {feature} for dataset {i+1}")
                # Get unique values in current dataset
                unique_vals = df[feature].unique()
                # Find values not in training set
                unseen_vals = [x for x in unique_vals if x not in label_encoders[feature].classes_]
                # NOTE(review): if no unseen value is found, the ValueError is
                # dropped and the column is left un-encoded.
                if unseen_vals:
                    print(f"Unseen values in {feature}: {unseen_vals}")
                    # Replace unseen values with the most frequent value from training
                    most_frequent = train[feature].mode()[0]
                    temp_df.loc[df[feature].isin(unseen_vals), feature] = most_frequent
                    # Transform again after replacing unseen values
                    temp_df[feature] = label_encoders[feature].transform(temp_df[feature])

        # Keep only the non-encoded columns present in this frame, sorted by name
        available_cols = sorted([col for col in normal_cols if col in df.columns])

        # Non-encoded columns first, encoded columns last; index reset to 0..n-1
        result_df = temp_df[available_cols + cat_features].copy()
        results.append(result_df.reset_index(drop=True))

    return results[0], results[1], results[2]


def __preprocess_dummy_encoding(train, val, test):
    """One-hot encode the categorical columns of all three splits.

    The dummy columns are defined by the training split: a dummy column that a
    split does not produce is added and filled with 0, and dummy columns that
    do not exist for the training split are discarded. Every returned frame
    holds the remaining (non-categorical) columns sorted by name, followed by
    the dummy columns, with a fresh 0..n-1 index.

    Returns:
        The encoded (train, val, test) DataFrames.
    """
    cat_features = ['PaymentMethod',
                    'MultipleLines', 'InternetService', 'Contract',
                    'TechSupport', ]

    # Columns of train that are passed through without encoding
    passthrough_cols = list(set(train.columns) - set(cat_features))

    # Reference dummy layout, taken from the training split
    dummy_columns = pd.get_dummies(train[cat_features], prefix=cat_features).columns

    encoded_frames = []
    for frame in (train, val, test):
        dummies = pd.get_dummies(frame[cat_features], prefix=cat_features)

        # Add the training dummy columns this split did not produce
        absent = [name for name in dummy_columns if name not in dummies.columns]
        for name in absent:
            dummies[name] = 0

        # Align to the training layout (drops categories unknown to train)
        dummies = dummies[dummy_columns]

        # Pass-through columns that this frame actually has, in sorted order
        kept = sorted([name for name in passthrough_cols if name in frame.columns])

        combined = pd.concat(
            [frame[kept].reset_index(drop=True), dummies.reset_index(drop=True)],
            axis=1,
        )
        encoded_frames.append(combined.reset_index(drop=True))

    train_out, val_out, test_out = encoded_frames
    return train_out, val_out, test_out



def __preprocess_yn(train, val, test):
    """Convert the two-valued columns of every split to 0/1 integers.

    A cell becomes 1 when its value is 'Yes' or 'Female' and 0 for anything
    else. The DataFrames are modified in place and also returned.
    """
    positive_labels = ['Yes','Female']

    def to_flag(value):
        return 1 if value in positive_labels else 0

    binary_columns = ['gender', 'Partner', 'Dependents','PhoneService', 'PaperlessBilling','Churn']
    for column in binary_columns:
        for frame in (train, val, test):
            frame[column] = frame[column].apply(to_flag)

    return train, val, test

def __preprocess_data(train, val, test):
    """Run column removal, missing-value filling and one-hot encoding.

    Returns:
        The (train, val, test) frames produced by __preprocess_dummy_encoding.
    """
    print(f'before: {train.shape} / {test.shape}')

    # Remove the columns that are not needed (in place)
    __process_drop(train, val, test)

    # Fill missing values
    filled = __fill_na(train,val,test)

    # Categorical handling: one-hot encoding is used; the label-encoding
    # variant (__preprocess_label_encoding) is currently disabled.
    return __preprocess_dummy_encoding(*filled)


@reset_seeds
def preprocess_dataset():
    """Load the raw data, split it, preprocess it and separate features/target.

    10% of the rows are held out as the test split, then 10% of the remainder
    as the validation split; both splits are stratified on 'Churn'.

    Returns:
        X_train, X_val, y_train, y_val, X_test, y_test (in that order), as
        returned by __preprocess_resample.
    """
    # Load
    raw = __load_data()

    # Split: (train + val) / test, then train / val
    dev_part, test_part = train_test_split(raw, test_size=0.1, stratify=raw['Churn'])
    train_part, val_part = train_test_split(dev_part, test_size=0.1, stratify=dev_part['Churn'])

    # Preprocess
    splits = __preprocess_yn(train_part, val_part, test_part)
    splits = __preprocess_data(*splits)

    # Separate features and target
    return __preprocess_resample(*splits)


Overwriting service/dataset.py


## model 생성

In [5]:
%%writefile service/model.py

from lightgbm import LGBMClassifier, plot_importance
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from utils import reset_seeds

# Build and return a model
@reset_seeds
def get_model(hp:dict=None, model_nm:str=None):
    """Create the classifier selected by ``model_nm``.

    Args:
        hp: Hyper-parameters passed to ``LGBMClassifier``. When empty or
            None, ``{"verbose": -1}`` is used to silence warning logs. The
            other models do not use ``hp``.
        model_nm: One of "LGBMClassifier", "RandomForestClassifier" or
            "XGBoost". When empty or None, a RandomForestClassifier with
            ``max_depth=10`` and ``min_samples_split=10`` is returned.

    Returns:
        The unfitted model instance.

    Raises:
        ValueError: If ``model_nm`` is not a supported name. Previously the
            function fell through and returned None, which only failed later
            when the caller tried to use the model.
    """
    if not hp:
        hp = {"verbose":-1} # silence warning logs

    if not model_nm:
        return RandomForestClassifier(verbose = False, max_depth = 10, min_samples_split = 10)
    if model_nm == "LGBMClassifier":
        return LGBMClassifier(**hp)
    if model_nm == "RandomForestClassifier":
        return RandomForestClassifier(verbose = False)
    if model_nm == "XGBoost":
        return XGBClassifier()

    raise ValueError(
        f"Unknown model_nm {model_nm!r}; expected 'LGBMClassifier', "
        "'RandomForestClassifier' or 'XGBoost'"
    )

Overwriting service/model.py


## run_model 생성

In [6]:
%%writefile service/run_model.py

import numpy as np
import pandas as pd
import pickle
import os

from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.metrics import f1_score

from dataset import preprocess_dataset
from model import get_model
from utils import reset_seeds
from config import MODEL_PATH, MODEL_DIR, N_FOLDS

def get_cross_validation(shuffle:bool=True, is_kfold:bool=True, n_splits:int=N_FOLDS):
    """Return a cross-validation splitter.

    Args:
        shuffle: Forwarded to the splitter.
        is_kfold: True selects ``KFold``, False selects ``StratifiedKFold``.
        n_splits: Number of folds.
    """
    splitter_cls = KFold if is_kfold else StratifiedKFold
    return splitter_cls(n_splits=n_splits, shuffle=shuffle)

def run_cross_validation(my_model, x_train, y_train, cv, is_kfold:bool=True):
    """Fit and score ``my_model`` on every fold and return the mean F1.

    The same model object is re-fitted on each fold, so after the call it
    holds the fit from the last fold.

    Args:
        my_model: Estimator exposing ``fit`` and ``predict``.
        x_train: Feature frame, indexed positionally with ``.iloc``.
        y_train: Target, indexed positionally with ``.iloc``.
        cv: Splitter exposing ``split``.
        is_kfold: When True ``cv.split`` receives only the features;
            otherwise it also receives the target.

    Returns:
        The mean of the per-fold F1 scores (each rounded to 4 decimals).
    """
    if is_kfold:
        folds = cv.split(x_train)
    else:
        folds = cv.split(x_train, y_train)

    fold_scores = []
    for fold_no, (fit_idx, eval_idx) in enumerate(folds, start=1):
        # Build the training / validation parts of this fold
        fit_x, fit_y = x_train.iloc[fit_idx], y_train.iloc[fit_idx]
        eval_x, eval_y = x_train.iloc[eval_idx], y_train.iloc[eval_idx]

        # Train, predict, evaluate
        my_model.fit(fit_x, fit_y)
        predicted = my_model.predict(eval_x)
        score = np.round(f1_score(eval_y, predicted), 4)
        fold_scores.append(score)
        print(f'{fold_no} 번째 K-fold F1: {score}, 학습데이터 크기: {fit_x.shape}, 검증데이터 크기: {eval_x.shape}')

    return np.mean(fold_scores)

def print_feature_importance(my_model, data):
    """Print the features of ``data`` ordered by decreasing importance.

    Args:
        my_model: Fitted model exposing ``feature_importances_``.
        data: DataFrame whose columns correspond to those importances.
    """
    importances = my_model.feature_importances_
    # Positions of the features, most important first
    order = np.argsort(importances)[::-1]
    ranked_names = data.columns[order]
    ranked_scores = importances[order]

    print("Feature Ranking")
    for position in range(data.shape[1]):
        print(f"{ranked_names[position]} : {ranked_scores[position]}")

@reset_seeds
def main():
    """Train, evaluate and save the model.

    Steps: preprocess the data, cross-validate the model on the training
    split, refit it on the whole training split, print feature importances,
    score it on the validation split and pickle it to MODEL_PATH.

    Returns:
        The F1 score of the saved model on the validation split.
    """
    # preprocess_dataset() returns X_train, X_val, y_train, y_val, X_test,
    # y_test; the test split is not used here.
    X_train, X_val, y_train, y_val, _, _ = preprocess_dataset()

    # Create the model
    my_model = get_model()

    # Cross-validation: the target is a class label, so use the stratified
    # splitter (is_kfold=False).
    use_plain_kfold = False
    my_cv = get_cross_validation(is_kfold=use_plain_kfold)
    run_cross_validation(my_model, X_train, y_train, my_cv, is_kfold=use_plain_kfold)

    # After cross-validation the model only holds the fit from the last fold,
    # i.e. it has seen just part of the training data. Refit on the full
    # training split so the evaluated and saved model uses all of it.
    my_model.fit(X_train, y_train)

    # Feature importances
    print_feature_importance(my_model, X_train)

    # Predict on the validation split
    y_pred_main = my_model.predict(X_val)

    # Save the model
    os.makedirs(MODEL_DIR, exist_ok=True)
    with open(MODEL_PATH, 'wb') as f:
        pickle.dump(my_model, f)

    print(f"모델이 {MODEL_PATH}에 저장되었습니다.")

    return f1_score(y_val, y_pred_main)

# Script entry point: run the training pipeline and print the score returned by main().
if __name__=="__main__":
  result = main()
  print(f"테스트 스코어는 {result}")

Overwriting service/run_model.py


## inference 생성

In [7]:
%%writefile service/inference.py
import pickle
import pandas as pd
from dataset import preprocess_dataset
from sklearn.metrics import f1_score
from config import MODEL_PATH


def load_model():
    """Load the pickled model stored at MODEL_PATH.

    Returns:
        The unpickled object, or None when loading fails for any reason (the
        error is printed instead of raised).
    """
    # NOTE: pickle can execute arbitrary code while loading; only load model
    # files that come from a trusted source.
    try:
        with open(MODEL_PATH, 'rb') as model_file:
            loaded = pickle.load(model_file)
        print(f"모델을 {MODEL_PATH}에서 로드했습니다.")
    except Exception as err:
        print(f"모델 로드 실패: {err}")
        return None
    return loaded

def predict_and_submit():
    """Score the saved model on the test split.

    Rebuilds the dataset, loads the pickled model, predicts on the test split
    and prints the F1 score. No file is written.

    Returns:
        True when the prediction ran, False when the model could not be loaded.
    """
    # Preprocess the data; only the test split (last two items) is used
    _, _, _, _, X_test, y_test = preprocess_dataset()

    # Load the model and stop early when that fails
    model = load_model()
    if model is None:
        return False

    # Predict and evaluate
    predictions = model.predict(X_test)
    score = f1_score(y_test, predictions)
    print(f'test f1 score : {score}')

    return True

# Script entry point: evaluate the saved model on the test split.
if __name__ == "__main__":
    predict_and_submit()

Overwriting service/inference.py


## 모델 실행

In [10]:
!python3 service/run_model.py

Python


In [11]:
!python3 service/inference.py

Python
