# 대구 교통사고 피해 예측 AI 경진대회(https://dacon.io/competitions/official/236193/overview/description)
## 목표: 다양한 데이터들을 통해 test의 ECLO 예측
ECLO(Equivalent Casualty Loss Only) : 인명피해 심각도  
ECLO = 사망자수 * 10 + 중상자수 * 5 + 경상자수 * 3 + 부상자수 * 1  
본 대회에서는 사고의 위험도를 인명피해 심각도로 측정

In [1]:
%pip install catboost

Collecting catboost
  Downloading catboost-1.2.5-cp310-cp310-manylinux2014_x86_64.whl (98.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m98.2/98.2 MB[0m [31m8.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: catboost
Successfully installed catboost-1.2.5


### 라이브러리

In [2]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split, KFold, cross_val_score, GridSearchCV
from scipy.stats import chi2_contingency
from scipy.stats import pearsonr
import random
import os
from sklearn.metrics import accuracy_score
from sklearn.metrics import mean_squared_error
from catboost import CatBoostRegressor
from sklearn.model_selection import StratifiedKFold

### 시드 고정

In [3]:
def seed_everything(seed):
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
seed_everything(42)

### 레이블 인코더 변수 생성

In [4]:
label_encoder = LabelEncoder()

### 전국구 데이터 가져오기

In [5]:
countrywide = pd.read_csv('./countrywide_accident.csv')

# '사고일시' 컬럼을 날짜 형식으로 변환
countrywide['사고일시'] = pd.to_datetime(countrywide['사고일시'])

# 사고유형이 '차대차', '차대사람', '차량단독'인 데이터만 필터링
countrywide = countrywide[countrywide['사고유형'].isin(['차대차', '차대사람', '차량단독'])]

countrywide

FileNotFoundError: [Errno 2] No such file or directory: './countrywide_accident.csv'

### train 데이터 가져오기

In [None]:
train = pd.read_csv('./train.csv')

# '사고일시' 컬럼을 날짜 형식으로 변환
train['사고일시'] = pd.to_datetime(train['사고일시'])

train

### 전국구 데이터와 train 데이터 합치기 (total)

In [None]:
train_total = pd.concat([train, countrywide], ignore_index=True)

# train_total['ECLO'] 열의 null 값 확인
null_count = train_total['ECLO'].isna().sum()

print(f"Number of NaN values in 'ECLO': {null_count}")

train_total

### total 데이터를 사분위수로 이상치 확인하기

In [None]:
# 'ECLO' 열의 상자 그림(boxplot)을 그립니다.
train_total['ECLO'].plot(kind='box')
plt.title('ECLO Boxplot')
plt.ylabel('ECLO Values')
plt.show()

### 이상치 제거

In [None]:
# 'ECLO' 열의 이상치를 정의하기 위해 사분위수를 계산합니다.
Q1 = train_total['ECLO'].quantile(0.25)
Q3 = train_total['ECLO'].quantile(0.75)
IQR = Q3 - Q1

# 사분위수를 이용하여 이상치의 상한선과 하한선을 정의합니다.
upper_bound = Q3 + 1.5 * IQR
lower_bound = Q1 - 1.5 * IQR

# 이상치의 개수를 계산합니다.
outliers_index = train_total[(train_total['ECLO'] < lower_bound) | (train_total['ECLO'] > upper_bound)].index
outlier_count = outliers_index.shape[0]

print("이상치 개수:", outlier_count)

# 이상치가 포함된 행을 삭제합니다.
train_total.drop(outliers_index, inplace=True)

### 다시 이상치 확인

In [None]:
# 'ECLO' 열의 상자 그림(boxplot)을 그립니다.
train_total['ECLO'].plot(kind='box')
plt.title('ECLO Boxplot')
plt.ylabel('ECLO Values')
plt.show()

### 이상치 제거

In [None]:
# 'ECLO' 열의 이상치를 정의하기 위해 사분위수를 계산합니다.
Q1 = train_total['ECLO'].quantile(0.25)
Q3 = train_total['ECLO'].quantile(0.75)
IQR = Q3 - Q1

# 사분위수를 이용하여 이상치의 상한선과 하한선을 정의합니다.
upper_bound = Q3 + 1.5 * IQR
lower_bound = Q1 - 1.5 * IQR

# 이상치의 개수를 계산합니다.
outliers_index = train_total[(train_total['ECLO'] < lower_bound) | (train_total['ECLO'] > upper_bound)].index
outlier_count = outliers_index.shape[0]

print("이상치 개수:", outlier_count)

# 이상치가 포함된 행을 삭제합니다.
train_total.drop(outliers_index, inplace=True)

### 이상치 데이터 제거 완료

In [None]:
# 'ECLO' 열의 상자 그림(boxplot)을 그립니다.
train_total['ECLO'].plot(kind='box')
plt.title('ECLO Boxplot')
plt.ylabel('ECLO Values')
plt.show()

### 사고유형별 평균 ECLO값이 높은 데이터를 레이블 인코딩 (내림차순)

In [None]:
# train_total 데이터프레임에서 '사고유형'별 'ECLO'의 평균 계산
mean_eclo_by_type = train_total.groupby('사고유형')['ECLO'].mean().sort_values(ascending=False)

# 레이블 인코딩을 위한 매핑 딕셔너리 생성
label_mapping = {accident_type: label for label, accident_type in enumerate(mean_eclo_by_type.index)}

# '사고유형' 칼럼을 레이블 인코딩하여 새로운 칼럼 추가
train_total['사고유형_label'] = train_total['사고유형'].map(label_mapping)

# 레이블 인코딩 결과 출력
print("레이블 인코딩 결과:")
print(train_total[['사고유형', '사고유형_label']].drop_duplicates().sort_values(by='사고유형_label'))


### total 데이터의 도로형태 컬럼 변환

In [None]:
# '-'를 기준으로 문자열을 분리하여 새로운 컬럼을 만듭니다.
train_total[['도로형태1', '도로형태2']] = train_total['도로형태'].str.split(' - ', expand=True)

### 도로형태별 평균 ECLO값이 높은 데이터를 레이블 인코딩 (내림차순)

In [None]:
# 도로형태1에 대한 레이블 인코딩 수행
train_total['도로형태1_label'] = label_encoder.fit_transform(train_total['도로형태1'])

# 도로형태1_label 값별로 그룹을 형성하여 도로형태2를 레이블 인코딩
for label_value in train_total['도로형태1_label'].unique():
    # 해당 도로형태1_label 값과 일치하는 행 추출
    mask = train_total['도로형태1_label'] == label_value
    # 해당 그룹 내의 ECLO 컬럼의 평균값 계산
    mean_eclo = train_total.loc[mask, 'ECLO'].mean()
    # 해당 그룹 내의 도로형태2를 평균값이 높은 순서대로 숫자 지정하여 레이블 인코딩
    label_dict = {address: idx + 1 for idx, address in enumerate(train_total.loc[mask].sort_values(by='ECLO', ascending=False)['도로형태2'].unique())}
    train_total.loc[mask, '도로형태2_label'] = train_total.loc[mask, '도로형태2'].map(label_dict)

# 도로형태2_label을 정수형으로 변환
train_total['도로형태2_label'] = train_total['도로형태2_label'].astype(int)

### test데이터 가져오기

In [None]:
test = pd.read_csv('./test.csv')
test

### 사고유형별 평균 ECLO값이 높은 데이터를 레이블 인코딩 (내림차순)

In [None]:
# train_total 데이터로부터 사고유형에 대한 레이블 인코딩 매핑 생성
accident_type_mapping = {}
for idx, acc_type in enumerate(train_total['사고유형'].unique()):
    accident_type_mapping[acc_type] = idx

# train_total 데이터에 사고유형_label 컬럼 추가
train_total['사고유형_label'] = train_total['사고유형'].map(accident_type_mapping)

# test 데이터에 사고유형_label 컬럼 추가
test['사고유형_label'] = test['사고유형'].map(lambda x: accident_type_mapping.get(x, -1))

### test의 도로형태 데이터 변환

In [None]:
# '-'를 기준으로 문자열을 분리하여 새로운 컬럼을 만듭니다.
test[['도로형태1', '도로형태2']] = test['도로형태'].str.split(' - ', expand=True)

### 도로형태별 평균 ECLO값이 높은 데이터를 레이블 인코딩 (내림차순)

In [None]:
# test 데이터프레임에 '도로형태1_label'과 '도로형태2_label' 컬럼을 추가합니다.
test['도로형태1_label'] = None
test['도로형태2_label'] = None

# train 데이터프레임에서 '도로형태1'과 '도로형태1_label' 사이의 매핑을 딕셔너리 형태로 생성합니다.
address1_mapping = dict(zip(train_total['도로형태1'], train_total['도로형태1_label']))

# train 데이터프레임에서 '도로형태2'와 '도로형태2_label' 사이의 매핑을 딕셔너리 형태로 생성합니다.
address2_mapping = dict(zip(train_total['도로형태2'], train_total['도로형태2_label']))

# test 데이터프레임의 각 행에 대해 반복하여 매핑된 값을 할당합니다.
for i, row in test.iterrows():
    # 도로형태1 train 데이터프레임에 있는 경우 해당하는 도로형태1_label 값을 할당합니다.
    if row['도로형태1'] in address1_mapping:
        test.at[i, '도로형태1_label'] = address1_mapping[row['도로형태1']]
    # 도로형태2가 train 데이터프레임에 있는 경우 해당하는 도로형태2_label 값을 할당합니다.
    if row['도로형태2'] in address2_mapping:
        test.at[i, '도로형태2_label'] = address2_mapping[row['도로형태2']]

test

### Catboost의 회귀 모델을 만들고 교차검증을 통해 학습하기(약 10분 소요)

In [None]:
# 모델1에 대한 데이터셋
X1 = train_total[['도로형태1_label', '도로형태2_label']]
y1 = train_total['ECLO']

# 모델2에 대한 데이터셋
X2 = train_total[['사고유형_label']]
y2 = train_total['ECLO']

# 교차 검증을 위한 KFold 객체 생성 (6-겹 교차검증, 랜덤 시드 고정)
kf = KFold(n_splits=6, shuffle=True, random_state=42)

# 모델1에 대한 교차 검증을 수행하고 평균 성능을 출력
cv_scores1 = []
for train_index, val_index in kf.split(X1):
    X_fold_train, X_fold_val = X1.iloc[train_index], X1.iloc[val_index]
    y_fold_train, y_fold_val = y1.iloc[train_index], y1.iloc[val_index]

    model1 = CatBoostRegressor(random_state=42, verbose=0)
    model1.fit(X_fold_train, y_fold_train)
    y_pred = model1.predict(X_fold_val)
    cv_scores1.append(mean_squared_error(y_fold_val, y_pred))

print("모델1 교차 검증 결과:")
print(cv_scores1)
print("모델1 평균 교차 검증 MSE:", np.mean(cv_scores1))

# 모델2에 대한 교차 검증을 수행하고 평균 성능을 출력
cv_scores2 = []
for train_index, val_index in kf.split(X2):
    X_fold_train, X_fold_val = X2.iloc[train_index], X2.iloc[val_index]
    y_fold_train, y_fold_val = y2.iloc[train_index], y2.iloc[val_index]

    model2 = CatBoostRegressor(random_state=42, verbose=0)
    model2.fit(X_fold_train, y_fold_train)
    y_pred = model2.predict(X_fold_val)
    cv_scores2.append(mean_squared_error(y_fold_val, y_pred))

print("모델2 교차 검증 결과:")
print(cv_scores2)
print("모델2 평균 교차 검증 MSE:", np.mean(cv_scores2))

# 새로운 특성 생성
X3 = np.column_stack((model1.predict(X1), model2.predict(X2)))

# 새로운 모델 정의
model_final = CatBoostRegressor(random_state=42, verbose=0)

# 모델 final에 대한 교차 검증을 수행하고 평균 성능을 출력
cv_scores_final = []
for train_index, val_index in kf.split(X3):
    X_fold_train, X_fold_val = X3[train_index], X3[val_index]
    y_fold_train, y_fold_val = y1.iloc[train_index], y1.iloc[val_index]

    model_final.fit(X_fold_train, y_fold_train)
    y_pred = model_final.predict(X_fold_val)
    cv_scores_final.append(mean_squared_error(y_fold_val, y_pred))

print("모델 final 교차 검증 결과:")
print(cv_scores_final)
print("모델 final 평균 교차 검증 MSE:", np.mean(cv_scores_final))

### test의 도로형태, 사고유형별로 변수 생성

In [None]:
# 테스트 데이터에 대한 예측 생성
X1_test = test[['도로형태1_label', '도로형태2_label']]
X2_test = test[['사고유형_label']]

### 모델에 test의 도로형태 데이터를 넣어 ECLO값 예측하기(model1)

In [None]:
# 테스트 데이터에 대한 예측 수행
y_pred_test = model1.predict(X1_test)

### 모델에 test의 사고유형 데이터를 넣어 ECLO값 예측하기(model2)

In [None]:
# 테스트 데이터에 대한 예측 수행
y_pred_test = model2.predict(X2_test)

### model1과 model2를 통해 만들어진 모델로 다시 학습시키기(model_final)

In [None]:
# 모델1과 모델2의 예측값을 특성으로 사용하여 새로운 특성 생성
X3_test = np.column_stack((model1.predict(X1_test), model2.predict(X2_test)))

# 테스트 데이터에 대한 예측 수행
y_pred_test = model_final.predict(X3_test)

# 테스트 예측값 출력
print("테스트 예측값:", y_pred_test)


### sample_submission파일 가져와 예측값 저장

In [None]:
submission = pd.read_csv('./sample_submission.csv')
submission['ECLO'] = y_pred_test

submission

### 예측값 csv파일로 출력

In [None]:
submission.to_csv('./submission.csv', index=False)