#### 1. 변수 중요도 - SHAP

In [None]:
###----- 모델 로드 -----###
model_path = model_path = ###### 모델 경로 ######
xgb_model = joblib.load(model_path)

# 모델 만들 때 사용한 x
x_train_up = pd.read_csv(###### 모델 학습 시 사용한 train set 경로 ######)

# 살펴볼 데이터
x_test = pd.read_csv(###### 중요도를 확인할 데이터 경로 #######)
x_test.drop(['Unnamed: 0'], axis = 1, inplace = True)

###----- SHAP -----###
shap_explainer = shap.Explainer(xgb_model, x_train_up)
shap_values_ex = shap_explainer(x_test) # 각 SHAP value에 대한 정보를 담은 Explainer

# 보고서 예시 1 - 전체 feature importance
shap.plots.bar(shap_values_ex)

# 보고서 예시 2 - 평균 feature importance
shap.plots.bar(shap_values_ex.mean(axis = 0))

# 보고서 예시 3 - 특정 feature 하나 (scatter)
shap.plots.scatter(shap_values_ex[:, "승용차"]) # '승용차' 변수

#### 2. 예측 리포트

In [None]:
###----- 함수 정의 -----###
### 지오코딩 - 위도, 경도, 자치구, 행정동 찾아오기
def geo_coding(df):
    ### 위도, 경도
    gk.add_coordinates_to_dataframe(df, '주소')
    df = df.rename(columns = {"decimalLatitude" : "위도", "decimalLongitude" : "경도"})

    ### 자치구, 행정동
    # 동 : ~동, ~가 부분 찾아오기
    def get_dong(address):
        find = re.search(r"\b(\w+[동가])\b", address)
        if find:
            dong = find.group(1)
            return(dong)
        else:
            return(np.nan)

    # 구 : ~구 부분 찾아오기
    def get_gu(address):
        find = re.search(r"\b(\w+구)\b", address)
        if find:
            gu = find.group(1)
            return(gu)
        else:
            return(np.nan)

    # DataFrame 추가
    def add_dong_gu_to_dataframe(df):
        dongs = []
        gus = []

        for i in range(len(df)):
            address = df.loc[i, '주소']

            if address:
                try:
                    dong = get_dong(address)
                except:
                    dong = np.nan
                try:
                    gu = get_gu(address)
                except:
                    gu = np.nan
            else:
                dong = np.nan
                gu = np.nan

            dongs.append(dong)
            gus.append(gu)

        df["행정동"] = dongs
        df["자치구"] = gus
        return df

    df = add_dong_gu_to_dataframe(df)
    return df

### ----------------------------------------------------------------------------------

### 교통량 데이터 추가하기
def traffic(df):
    # 도로 데이터
    gdf_links = gpd.read_file(###### 도로 데이터 경로 ######)
    gdf_links['LINK_ID'] = gdf_links['LINK_ID'].astype(int)

    # 새로운 데이터 공간 데이터로 변환
    gdf_potholes = gpd.GeoDataFrame(
        df,
        geometry = gpd.points_from_xy(df['경도'], df['위도']),
        crs="EPSG:4326")  # 위/경도 WGS84 좌표계로 설정

    # 새로운 데이터 좌표계 통일
    if gdf_links.crs is not None:
        gdf_potholes = gdf_potholes.to_crs(gdf_links.crs)

    # 새로운 데이터의 위치와 가까운 도로 번호(+도로정보) 매칭
    gdf_nearest = gpd.sjoin_nearest(
        gdf_potholes, gdf_links,
        how='left',
        distance_col='distance')  # 계산한 거리(m)

    ### 교통량 데이터
    traffic_d = pd.read_excel(###### 교통량 데이터 경로 ######)

    # 전처리
    traffic_d.columns = [
        f"{upper}" if 'Unnamed' in str(lower) else f"{upper}_{lower}"
        for upper, lower in traffic_d.columns]
    traffic_d.rename(columns={
        'ITS LINK ID': 'ITS_LINK_ID',
        '승용차-평일_전일': '승용차',
        '버스-평일_전일': '버스',
        '트럭-평일_전일': '트럭'}, inplace=True)

    traffic_d['ITS_LINK_ID'] = traffic_d['ITS_LINK_ID'].astype(str).str.split(',')
    traffic_d = traffic_d.explode('ITS_LINK_ID')
    traffic_d['LINK_ID'] = traffic_d['ITS_LINK_ID'].str.strip().astype(int)
    traffic_df = traffic_d[['LINK_ID', '도로명', '차선수', '승용차', '버스', '트럭']].copy()
    traffic_df = (traffic_df.dropna(subset=['LINK_ID']).drop_duplicates('LINK_ID').reset_index(drop=True).astype({'LINK_ID': 'int'}))

    # 교통량 join
    pothole_traffic = gdf_nearest.merge(traffic_df, on='LINK_ID', how='left')
    pothole_output = pothole_traffic[['날짜', '주소', '위도', '경도', '행정동', '자치구', 'LINK_ID', '도로명', '차선수', '승용차', '버스', '트럭']]
    pothole_output['총교통량'] = pothole_output['승용차'] + pothole_output['버스'] + pothole_output['트럭']
    pothole_output['중대형차량 교통량'] = (pothole_output['버스'] + pothole_output['트럭']) / pothole_output['총교통량']


    return pothole_output

### ----------------------------------------------------------------------------------

### 건물 평균 연령, 배수등급, 경사도 추가
def nature(new_pothole_traffic):

    ### 건물별 평균 연령
    old = pd.read_csv(###### 건물별 연령 데이터 경로 ######)
    bup = pd.read_csv(###### 법정동 코드 데이터 경로 ######)
    bup['법정동명'] = bup['법정동명'].astype(str)

    # 법정동 만들기
    new_pothole_traffic['법정동명'] = '서울특별시 ' + new_pothole_traffic['자치구'] + ' ' + new_pothole_traffic['행정동']
    new_pothole_traffic['법정동명'] = new_pothole_traffic['법정동명'].astype(str)

    # join
    new_pothole_building = new_pothole_traffic.merge(bup[['법정동코드', '법정동명']], on = '법정동명', how = 'left')
    new_pothole_building = new_pothole_building.merge(old[['법정동코드', '평균_건물연령']], on = '법정동코드', how = 'left')

    ### 배수등급
    # 공간 데이터로 변환
    gdf_pothole2 = gpd.GeoDataFrame(
        new_pothole_building,
        geometry=gpd.points_from_xy(new_pothole_building['경도'], new_pothole_building['위도']),
        crs="EPSG:4326")

    # 포트홀 좌표계 → EPSG:5174 로 변환 (shp에 맞추기)
    gdf_pothole2 = gdf_pothole2.to_crs("EPSG:5174")

    # 배수등급 로드
    soil_df = gpd.read_file(###### 배수등급 데이터 경로 ######)

    # join
    new_pothole_soil = gpd.sjoin(gdf_pothole2, soil_df[['SOILDRA', 'geometry']], how='left', predicate='within')

    ### 토양 경사도
    # 데이터 로드
    slope_df = gpd.read_file(###### 토양 경사도 데이터 경로 ######)
    new_pothole_soil.drop(['index_right'], axis=1, inplace=True)

    # join
    new_pothole_slope = gpd.sjoin(new_pothole_soil, slope_df[['SOILSLOPE', 'geometry']], how='left', predicate='within')

    # 열 이름 수정
    new_pothole_slope.rename(columns = {'SOILDRA' : '배수등급', 'SOILSLOPE' : '경사도'}, inplace = True)
    new_pothole_done = new_pothole_slope[['날짜', '주소', '위도', '경도', '자치구', '행정동', '도로명', '차선수', '승용차', '버스', '트럭', '총교통량', '중대형차량 교통량', '평균_건물연령', '배수등급' ,'경사도']]

    return new_pothole_done

### ----------------------------------------------------------------------------------

def join_gu(new_pothole_done):
    ### 자치구별 데이터 로드
    people = pd.read_pickle(###### 자치구별 인구 수 데이터 경로 ######)
    people['자치구'] = people['자치구'].str.replace("\u3000","",regex = False)

    new_pothole_done.rename(columns = {'날짜' : '발생일'}, inplace = True)
    new_pothole_done['발생일'] = pd.to_datetime(new_pothole_done['발생일'])

    ### 인구 수
    new_pothole = pd.merge(new_pothole_done, people, on = '자치구', how = 'left')

    return new_pothole

### ----------------------------------------------------------------------------------

def prediction(new_pothole, transformer_path, scaler_path, model_path):
    ## x 할당
    new_x = new_pothole[['차선수', '승용차', '버스', '트럭', '총교통량', '중대형차량 교통량', '평균_건물연령', '인구 수', '배수등급', '경사도']]

    # 저장된 변환기, 스케일러 불러오기
    transformer = joblib.load(transformer_path)
    scaler = joblib.load(scaler_path)
    # 새로운 데이터 변환
    cols = ['승용차', '버스', '트럭', '총교통량', '중대형차량 교통량', '평균_건물연령', '인구 수']
    arr = new_x[cols].values + 1e-6
    bc = transformer.transform(arr)
    bc_std = scaler.transform(bc)
    new_x[cols] = bc_std

    ### 배수등급, 경사도 변수 처리
    # 경사도
    slope_encoding = {
        '0-2%': 0,
        '2-7%': 1,
        '7-15%': 2,
        '15-30%': 3,
        '30-60%': 4,
        '60-100%': 5}
    # 배수등급
    drain_encoding = {
        '매우양호': 5,
        '양호': 4,
        '약간양호': 3,
        '약간불량': 2,
        '불량': 1,
        '매우불량': 0}
    # 매핑
    new_x['경사도'] = new_x['경사도'].map(slope_encoding)
    new_x['배수등급'] = new_x['배수등급'].map(drain_encoding)

    ### 모델 로드
    xgb_model = joblib.load(model_path)
    y_pred = xgb_model.predict(new_x)
    y_pred_prob = xgb_model.predict_proba(new_x)[:, 1]

    return new_pothole, new_x, y_pred, y_pred_prob ## 원본 데이터, x를 살펴보기 위해 따로 받아옴 !

In [None]:
def prediction_without_env(new_data, transformer_path, scaler_path, model_path):
    ### 마지막에 출력할 output
    output_df = new_data.copy()
    ### x 만들기
    geo = geo_coding(new_data)
    traffic_df = traffic(geo)
    nature_df = nature(traffic_df)
    new_potholes = join_gu(nature_df)
    ### 예측
    new_data_org, new_x, y_pred, y_pred_prob = prediction(new_potholes, transformer_path, scaler_path, model_path)
    ### output
    output_df['예측'] = y_pred
    output_df['예측 확률'] = y_pred_prob
    return new_data_org, new_x, output_df

In [None]:
### 새롭게 확인해볼 데이터 목록 (임의로 생성)
input = ["서울특별시 강남구 대치동 507",
         "서울특별시 동대문구 제기동 137-418",
         "서울특별시 서초구 방배동 756-4",
         "서울특별시 용산구 동빙고동 90-1",
         "서울특별시 성북구 보문동5가 235",
         ]
date = ['2024-07-28', '2023-06-23', '2021-10-29', '2024-01-05', '2022-04-29']
new_data = pd.DataFrame({'날짜' : date, '주소' : input})

### Model, BoxCox Transformer, Scaler Path
model_path = ###### 모델 경로 ######
transformer_path = ###### BoxCox Transformer 경로 ######
scaler_path = ###### Standard Scaler 경로 ######

### Prediction
new_data_org, new_x, output_df = prediction_without_env(new_data, transformer_path, scaler_path, model_path)

# 리포트 예시 - 최종 데이터 프레임
output_df

#### 3. 특정 샘플 변수 중요도 - SHAP

In [None]:
###----- 모델 로드 -----###
model_path = model_path = ###### 모델 경로 ######
xgb_model = joblib.load(model_path)

# 모델 만들 때 사용한 x
x_train_up = pd.read_csv(###### 모델 학습 시 사용한 train set 경로 ######)

# 살펴볼 데이터 로드
new_places_x = pd.read_csv(###### 새롭게 입력한 데이터의 x 변환 결과 경로 #######)
new_places_x.drop(['Unnamed: 0'], axis = 1, inplace = True)

###----- SHAP -----###
shap_explainer = shap.Explainer(xgb_model, x_train_up)
shap_values_ex = shap_explainer(new_places_x) # 각 SHAP value에 대한 정보를 담은 Explainer

# 보고서 예시 - 특정 샘플(0번째 데이터) 설명
shap.plots.waterfall(shap_values_ex[0])

#### 4. 개선 방향 제시 - DiME

In [None]:
###----- 역변환 함수 -----###
def inverse_scaling_boxcox(transformed_data, transformer_path, scaler_path):

    # 역변환 대상 변수
    cols = ['승용차', '버스', '트럭', '총교통량', '중대형차량 교통량', '평균_건물연령', '인구 수']
    cols_idx = [transformed_data.columns.get_loc(col) for col in cols]

    # scaler, transformer
    scaler = joblib.load(scaler_path)
    transformer = joblib.load(transformer_path)

    # 변환된 5개 변수만 추출
    transformed = transformed_data.iloc[:, cols_idx]

    # 역표준화
    un_scaled = scaler.inverse_transform(transformed)

    # 역변환
    un_transformed = np.array([inv_boxcox(un_scaled[:, i], transformer.lambdas_[i]) for i in range(un_scaled.shape[1])]).T

    # 전체 복원
    original = transformed_data.copy()
    for j, col in enumerate(cols):
        original[col] = un_transformed[:,j]

    ### 배수등급, 경사도 변수 처리
    # 경사도
    slope_encoding = {'0-2%': 0, '2-7%': 1, '7-15%': 2, '15-30%': 3, '30-60%': 4, '60-100%': 5}
    # 배수등급
    drain_encoding = {'매우양호': 5, '양호': 4, '약간양호': 3, '약간불량': 2, '불량': 1, '매우불량': 0}
    # 인코딩 딕셔너리 뒤집기
    slope_decoding = {v: k for k, v in slope_encoding.items()}
    drain_decoding = {v: k for k, v in drain_encoding.items()}

    # 역매핑
    original['경사도'] = original['경사도'].map(slope_decoding)
    original['배수등급'] = original['배수등급'].map(drain_decoding)

    return original

In [None]:
###----- DiCE 활용, Counterfactual 생성 -----###
def dice(x_test, model_path, transformer_path, scaler_path): # x_test : 예측이 1로 나오는 예시 샘플 하나
    xgb_model = joblib.load(model_path)

    # 혹시 모르니까 한 번 더 변환해주고
    x_test = x_test.astype('float64')

    ### DiCE
    continuous = ['승용차', '버스', '트럭', '총교통량', '중대형차량 교통량', '평균_건물연령', '인구 수']
    d = dice_ml.Data(dataframe = x_test.assign(발생여부 = 0),  # dummy target
                     continuous_features = continuous,
                     outcome_name = "발생여부")
    m = dice_ml.Model(model = xgb_model, backend = "sklearn")
    exp = dice_ml.Dice(d, m)

    ### counterfactual 생성
    # 값을 바꿀 변수들만 지정
    features_to_change = ['승용차', '버스', '트럭', '총교통량', '중대형차량 교통량', '평균_건물연령']
    query_instance = x_test
    query_instance = query_instance.astype('float64')
    cf = exp.generate_counterfactuals(query_instance, total_CFs=1, desired_class="opposite",features_to_vary=features_to_change)

    ### 데이터프레임 저장
    # 기존 데이터
    original_one = cf.cf_examples_list[0].test_instance_df
    # 바뀐 데이터
    changed_zero = cf.cf_examples_list[0].final_cfs_df

    original_one = inverse_scaling_boxcox(original_one, transformer_path, scaler_path)
    changed_zero = inverse_scaling_boxcox(changed_zero, transformer_path, scaler_path)

    return original_one, changed_zero

In [None]:
### Model, BoxCox Transformer, Scaler Path
model_path = ###### 모델 경로 ######
transformer_path = ###### BoxCox Transformer 경로 ######
scaler_path = ###### Standard Scaler 경로 ######

### DiCE로 Counterfactual 생성
new_x = pd.read_csv(###### 새롭게 확인하고자 하는 데이터 ######)
new_x.drop(['Unnamed: 0'], axis = 1, inplace = True)
new_x_first = new_x.loc[[3]] # 세 번째 데이터 확인
new_x_first = new_x_first.astype('float64')

original_one, changed_zero = dice(new_x_first, model_path, transformer_path, scaler_path)
# 리포트 예시 - original_one과 changed_zero 데이터프레임 출력
display(original_one)
display(changed_zero)