In [None]:
# 날짜 파생 변수 생성
import pandas as pd

# 데이터 로드
train = pd.read_csv('../data/train.csv')
test = pd.read_csv('../data/test.csv')

# 날짜 컬럼을 datetime 형식으로 변환
train['base_date'] = pd.to_datetime(train['base_date'].astype(str))
test['base_date'] = pd.to_datetime(test['base_date'].astype(str))

# 날짜 기반 파생 변수 생성
for df in [train, test]:
    df['year'] = df['base_date'].dt.year
    df['month'] = df['base_date'].dt.month
    df['day'] = df['base_date'].dt.day
    df['weekday'] = df['base_date'].dt.weekday  # 0=월요일, 6=일요일
    df['is_weekend'] = df['weekday'].apply(lambda x: 1 if x >= 5 else 0)

    # season (봄: 3~5, 여름: 6~8, 가을: 9~11, 겨울: 12~2)
    df['season'] = df['month'].map({
        3: 'spring', 4: 'spring', 5: 'spring',
        6: 'summer', 7: 'summer', 8: 'summer',
        9: 'fall', 10: 'fall', 11: 'fall',
        12: 'winter', 1: 'winter', 2: 'winter'
    })

    # 시간대 구간화 (출퇴근/심야/일반)
    df['time_type'] = df['base_hour'].apply(lambda x:
        'commute' if x in [7,8,17,18] else (
        'late_night' if x in [0,1,2,3,4,5] else 'normal')
    )


In [None]:
# 도로 및 위치 기반 파생 변수 처리
print(train.groupby('maximum_speed_limit')['target'].mean())

# 차로 수가 많을수록 빠를까..? --> 시각화
print(train.groupby('lane_count')['target'].mean())

# 도로 등급 (103,106,107) 정제
print(train['road_rating'].value_counts())

In [None]:
# 불필요한 변수 제거
# vehicle_restricted, height_restricted: 모든 값이 0 --> 제거
drop_cols = ['vehicle_restricted', 'height_restricted']

# multi_linked: 거의 전부 0 --> 제거
drop_cols += ['multi_linked']

# connect_code: 불균형, 파생 정보 약함 --> 제거
drop_cols += ['connect_code']

train = train.drop(columns=drop_cols)
test = test.drop(columns=drop_cols)

In [None]:
# 거리 계산: 시작점과 도착점 간 haversine 거리
import numpy as np

def haversine(lat1, lon1, lat2, lon2):
    R = 6371  # Earth radius (km)
    lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1

    a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
    return R * 2 * np.arcsin(np.sqrt(a))

train['distance'] = haversine(train['start_latitude'], train['start_longitude'],
                              train['end_latitude'], train['end_longitude'])

test['distance'] = haversine(test['start_latitude'], test['start_longitude'],
                             test['end_latitude'], test['end_longitude'])


In [None]:
# start_turn_restricted / end_turn_restricted: 0,1 변환
turn_map = {'없음': 0, '있음': 1}
train['start_turn_restricted'] = train['start_turn_restricted'].map(turn_map)
test['start_turn_restricted'] = test['start_turn_restricted'].map(turn_map)
train['end_turn_restricted'] = train['end_turn_restricted'].map(turn_map)
test['end_turn_restricted'] = test['end_turn_restricted'].map(turn_map)

In [None]:
# road_name 컬럼 정제
# 1. 결측치 확인 및 대체
# road_name이 '-'인 경우 확인
print("train 결측 road_name 수:", (train['road_name'] == '-').sum())
print("test 결측 road_name 수:", (test['road_name'] == '-').sum())

In [None]:
# 남은 '-' 값을 '기타'로 치환
train['road_name'] = train['road_name'].replace('-', '기타')
test['road_name'] = test['road_name'].replace('-', '기타')


In [None]:
# 2. road_name 분포 확인
# 상위 도로명 확인
print(train['road_name'].value_counts().head())

In [None]:
# 3. Label Encoding
from sklearn.preprocessing import LabelEncoder

le_road = LabelEncoder()
le_road.fit(train['road_name'])

train['road_name'] = le_road.transform(train['road_name'])

for label in np.unique(test['road_name']):
    if label not in le_road.classes_:
        le_road.classes_ = np.append(le_road.classes_, label)

test['road_name'] = le_road.transform(test['road_name'])

In [None]:
# start_node_name, end_node_name 정제
# 1. 유니크 값 개수 확인
print("start_node_name 종류 수:", train['start_node_name'].nunique())
print("end_node_name 종류 수:", train['end_node_name'].nunique())

In [None]:
# 2. Top-N만 보존, 나머지는 '기타'로 묶기
# 상위 20개 노드만 유지 나머지는 '기타'로
top_n = 20

top_start = train['start_node_name'].value_counts().nlargest(top_n).index
top_end = train['end_node_name'].value_counts().nlargest(top_n).index

train['start_node_name'] = train['start_node_name'].apply(lambda x: x if x in top_start else '기타')
test['start_node_name'] = test['start_node_name'].apply(lambda x: x if x in top_start else '기타')

train['end_node_name'] = train['end_node_name'].apply(lambda x: x if x in top_end else '기타')
test['end_node_name'] = test['end_node_name'].apply(lambda x: x if x in top_end else '기타')


In [None]:
# 3. Label Encoding 적용
le_start = LabelEncoder()
le_end = LabelEncoder()

train['start_node_name'] = le_start.fit_transform(train['start_node_name'])
test['start_node_name'] = le_start.transform(test['start_node_name'])

train['end_node_name'] = le_end.fit_transform(train['end_node_name'])
test['end_node_name'] = le_end.transform(test['end_node_name'])


In [None]:
# adjacent_august (7~9월이면 Y, 아니면 N)
train['adjacent_august'] = train['month'].apply(lambda x: 'Y' if x in [7,8,9] else 'N')
test['adjacent_august'] = test['month'].apply(lambda x: 'Y' if x in [7,8,9] else 'N')

# 시간대 worktime(08~20), resttime(나머지)
train['시간대'] = train['base_hour'].apply(lambda x: 'worktime' if 8 <= x <= 20 else 'resttime')
test['시간대'] = test['base_hour'].apply(lambda x: 'worktime' if 8 <= x <= 20 else 'resttime')

# 계절 봄(3~5), 여름(6~8), 가을(9~11), 겨울(12~2)
def get_season(month):
    if month in [3,4,5]:
        return '봄'
    elif month in [6,7,8]:
        return '여름'
    elif month in [9,10,11]:
        return '가을'
    else:
        return '겨울'

train['season'] = train['month'].apply(get_season)
test['season'] = test['month'].apply(get_season)

In [None]:
from sklearn.preprocessing import LabelEncoder

# 이미 처리한 두 컬럼 제외
categorical_features = [
    'road_name', 'road_rating', 'maximum_speed_limit', 'weight_restricted',
    'road_type', 'start_turn_restricted', 'end_turn_restricted',
    'adjacent_august', '시간대', 'season'
]

for col in categorical_features:
    le = LabelEncoder()
    le.fit(train[col])

    # 테스트셋에 없는 클래스가 있는 경우 대비
    le_classes = list(le.classes_)
    for label in test[col].unique():
        if label not in le.classes_:
            le_classes.append(label)
    le.classes_ = np.array(le_classes)

    train[col] = le.transform(train[col])
    test[col] = le.transform(test[col])


In [None]:
# 학습/검증 데이터 분리
from sklearn.model_selection import train_test_split

# target 분리
y = train['target']
X = train.drop(columns=['target', 'id'])  # id는 필요 없는 식별자 컬럼
test_X = test.drop(columns=['id'])

# train/val 분리
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, random_state=42
)

print("훈련셋 크기:", X_train.shape)
print("검증셋 크기:", X_val.shape)

In [None]:
import os
import joblib

os.makedirs('../output', exist_ok=True)

# 파일 저장
joblib.dump(X_train, '../output/X_train.pkl')
joblib.dump(y_train, '../output/y_train.pkl')
joblib.dump(X_val, '../output/X_val.pkl')
joblib.dump(y_val, '../output/y_val.pkl')
joblib.dump(test_X, '../output/test_X.pkl')

In [None]:
import joblib

# 도로명 인코더 저장
joblib.dump(le_road, '../output/le_road.pkl')

# 출발/도착 노드명 인코더 저장
joblib.dump(le_start, '../output/le_start.pkl')
joblib.dump(le_end, '../output/le_end.pkl')

# 시간대 인코더
le_time = LabelEncoder()
le_time.fit(train['시간대'])
joblib.dump(le_time, '../output/le_시간대.pkl')

# 계절 인코더
le_season = LabelEncoder()
le_season.fit(train['season'])
joblib.dump(le_season, '../output/le_season.pkl')

# adjacent_august 인코더
le_august = LabelEncoder()
le_august.fit(train['adjacent_august'])
joblib.dump(le_august, '../output/le_adjacent_august.pkl')

In [None]:
import joblib

le_road = joblib.load('../output/le_road.pkl')
le_time = joblib.load('../output/le_시간대.pkl')
le_season = joblib.load('../output/le_season.pkl')
le_august = joblib.load('../output/le_adjacent_august.pkl')

# 딕셔너리 형태로 묶어서 저장
label_encoders = {
    "le_road": le_road,
    "le_time": le_time,
    "le_season": le_season,
    "le_august": le_august
}

joblib.dump(label_encoders, '../output/label_encoders.pkl')
print("label_encoders.pkl 저장")