## 1. 데이터 불러오기 및 SHAP 요소를 통한 가중치검증

In [20]:
import os
import math
import httpx 
import json
import shap
import random
import folium
import requests
import colorsys
import pandas as pd
import numpy as np
import xgboost as xgb
from dotenv import load_dotenv
from branca.element import Element, MacroElement
from jinja2 import Template

In [21]:
# 필요함수정의
def get_coords_from_keyword(keyword: str, api_key: str):
    '''
    📍키워드(주소, 지명 등)로 좌표(경도, 위도)를 반환하는 함수 (카카오 API 사용)
    '''
    url = "https://dapi.kakao.com/v2/local/search/keyword.json"
    headers = {"Authorization": f"KakaoAK {api_key}"}
    params = {"query": keyword}
    
    response = requests.get(url, headers=headers, params=params)
    data = response.json()
        
    if data.get("documents"):
        first_match = data["documents"][0]
        x = float(first_match["x"])  # 경도
        y = float(first_match["y"])  # 위도
        return {"x": x, "y": y}
    else:
        raise ValueError(f"🔍 '{keyword}'에 대한 결과가 없습니다.")

def extract_coord_list(df,lat_col="위도",lng_col="경도", accident_count=False):
    '''
    📍DataFrame에서 위도/경도(선택적: 사고 건수 포함) 리스트로 변환
    '''
    if accident_count:
        return list(zip(df[lat_col], df[lng_col], df["accident_count"])) 
    return list(zip(df[lat_col], df[lng_col]))  

def haversine(lat1, lon1, lat2, lon2):
    '''
    📍두 좌표 간의 거리(미터)를 계산하는 하버사인(Haversine) 함수
    '''
    R = 6371000
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)

    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c  # 거리 (m)

def interpolate_points(lat1, lon1, lat2, lon2, interval=5):
    '''
    📍두 좌표 사이 일정 간격(interval, m)의 중간 점들을 보간하여 생성
    '''
    distance = haversine(lat1, lon1, lat2, lon2)
    steps = max(1, int(distance // interval))
    lat_points = [lat1 + (lat2 - lat1) * i / steps for i in range(1, steps)]
    lon_points = [lon1 + (lon2 - lon1) * i / steps for i in range(1, steps)]
    return list(zip(lat_points, lon_points))

def is_within_haversine(center, coord_list, radius_m):
    '''
    📍중심 좌표에서 반경 내에 있는 좌표 리스트를 필터링 (하버사인 거리 기준)
    '''
    return [coord for coord in coord_list if haversine(center[0], center[1], coord[0], coord[1]) <= radius_m]

def is_circle_overlap(center1, radius1, center2, radius2=50):
    '''
    📍두 원의 중심과 반지름을 기준으로 겹치는지 여부를 판단 / 사고다발지의 반경 50m이내에 경로가 포함되는지 확인
    '''
    dist = haversine(center1[0], center1[1], center2[0], center2[1])
    return dist <= (radius1 + radius2)

def get_hotspot_score(count, ranges):
    '''
    📍사고 건수(count)를 기준으로 사전에 정의된 구간별 점수를 계산
    '''
    for (start, end, score) in ranges:
        if start <= count <= end:
            return score
    return 0.0  # // Changed

def is_relevant_type(t):
    '''
    📍최종점수 가점(우회전) 계산 
        ㄴ 2 : 우회전
        ㄴ 6 : 오른쪽 방향
        ㄴ 18 ~ 22 : 오른쪽 방향
        ㄴ 30 ~ 34 : 로터리 오른쪽
        ㄴ 70 ~ 74 : 회전교차로 오른쪽
    '''
    return (
        t == 2 or
        t == 6 or
        18 <= t <= 22 or
        30 <= t <= 34 or
        70 <= t <= 74 
    )

def risk_color(score):
    '''
    📍위험 점수에 따라 지도 마커 색상을 결정 (green/orange/red)
    '''
    if score <= 0.4:
        return "green"
    elif score <= 0.7:
        return "orange"
    else:
        return "red"
    
## 점수 산정 방식 1. 로그 합산 방식 (log(1 + 위험점수) 기반)    
def log_aggregated_score(risk_scores):
    '''
    위험한 구간일수록 기하급수적으로 영향력 증가
    안전한 구간은 영향이 낮아짐 (희석)
    '''
    log_sum = np.sum(np.log1p(risk_scores))  # log(1 + x)
    return log_sum / len(risk_scores)

## 점수 산정 방식 2. 상위 위험 구간 보정식 방식
def high_risk_boosted_score(risk_scores, top_percent=0.2, alpha=0.3):
    '''
    전체 점수는 평균으로 계산하되, 상위 위험 구간의 평균을 보정값으로 추가
    `a` 파라미터로 조정 가능 (보통 0.2 ~ 0.5)
    '''
    sorted_scores = sorted(risk_scores, reverse=True)
    n_top = max(1, int(len(sorted_scores) * top_percent))
    top_avg = np.mean(sorted_scores[:n_top])
    mean_score = np.mean(risk_scores)
    boosted_score = mean_score + alpha * (top_avg - mean_score)
    return boosted_score

## 점수 산정 방식 3. 상위 위험 구간 보정식 방식
def combined_weighted_boosted_score(risk_scores, all_radii, alpha=0.3, top_percent=0.2):
    """
    고위험 구간 민감도 반영한 최종 위험 점수 계산 함수

    Parameters:
        risk_scores (list or np.array): 구간별 위험 점수 리스트
        all_radii (list or np.array): 구간별 거리 리스트 (같은 순서)
        alpha (float): 상위 위험 구간 평균 보정 비율 (기본 0.3)
        top_percent (float): 상위 위험 구간 비율 (기본 0.2 → 상위 20%)

    Returns:
        float: 최종 점수 (거리 가중 평균 + 상위 위험 보정)
    """
    risk_scores = np.array(risk_scores)
    all_radii = np.array(all_radii)

    # 거리 가중 평균 계산
    weighted_mean = np.sum(risk_scores * all_radii) / np.sum(all_radii)

    # 상위 위험 구간 평균 계산
    sorted_indices = np.argsort(risk_scores)[::-1]
    n_top = max(1, int(len(risk_scores) * top_percent))
    top_avg = np.mean(risk_scores[sorted_indices[:n_top]])

    # 보정된 최종 점수 계산
    boosted_score = weighted_mean + alpha * (top_avg - weighted_mean)

    return round(boosted_score, 2)

    

In [43]:
load_dotenv()
KAKAO_API_KEY = os.getenv("KAKAO_API_KEY")

origin_keyword = "송파경찰서"
destination_keyword = "잠실역 2호선"

origin = get_coords_from_keyword(origin_keyword, KAKAO_API_KEY)
destination = get_coords_from_keyword(destination_keyword, KAKAO_API_KEY)

priority = "RECOMMEND"

url = "https://apis-navi.kakaomobility.com/v1/directions"
headers = {
    "Authorization": f"KakaoAK {KAKAO_API_KEY}",
    "Content-Type": "application/json"
}
data = {
    "origin": origin,
    "destination": destination,
    "priority": priority
}

async def fetch_route():
    async with httpx.AsyncClient() as client:
        params = {
            "origin": f"{origin['x']},{origin['y']}",
            "destination": f"{destination['x']},{destination['y']}",
            "priority": priority
        }

        response = await client.get(
            url, headers=headers, params=params
        )
        response.raise_for_status()
        return response.json()

route_data = await fetch_route()

roads = route_data["routes"][0]["sections"][0]["roads"]
guides = route_data["routes"][0]["sections"][0]["guides"]

# 파일 로드
protection_df = pd.read_csv("../data/external/protection_zone_data.csv")
crosswalk_df = pd.read_csv("../data/external/crosswalk_data.csv")
traffic_light_df = pd.read_csv("../data/external/traffic_light_data.csv")
senior_hotsopt_df = pd.read_csv("../data/raw/hotspot_info_senior.csv")
non_senior_hotspot_df = pd.read_csv("../data/raw/hotspot_info_non_senior.csv")
intersection_df = pd.read_csv("../data/external/intersection_data.csv")
intersection_df = intersection_df.dropna(subset=["위도", "경도"])

protection_coords = extract_coord_list(protection_df)  
crosswalk_coords = extract_coord_list(crosswalk_df)  
traffic_light_coords = extract_coord_list(traffic_light_df)  
intersection_coords = extract_coord_list(intersection_df)
senior_hotspot_coords = extract_coord_list(senior_hotsopt_df, lat_col="hotspot_center_lat", lng_col="hotspot_center_lng", accident_count=True)  
non_senior_hotspot_coords = extract_coord_list(non_senior_hotspot_df, lat_col="hotspot_center_lat", lng_col="hotspot_center_lng", accident_count = True)  


In [60]:
points = []

for road in roads:
    v = road["vertexes"]
    speed = road.get("traffic_speed", None)
    state = road.get("traffic_state", None)

    for i in range(0, len(v) - 2, 2):
        lat1, lon1 = v[i+1], v[i]
        lat2, lon2 = v[i+3], v[i+2]

        # 시작점
        points.append({
            "lat": lat1,
            "lon": lon1,
            "traffic_speed": speed,
            "traffic_state": state
        })

        # 중간점
        mids = interpolate_points(lat1, lon1, lat2, lon2, interval=25)
        for lat, lon in mids:
            points.append({
                "lat": lat,
                "lon": lon,
                "traffic_speed": speed,
                "traffic_state": state
            })

    # 마지막 점
    points.append({
        "lat": v[-1],
        "lon": v[-2],
        "traffic_speed": speed,
        "traffic_state": state
    })

mid = (points[len(points)//2]["lat"], points[len(points)//2]["lon"])
fmap = folium.Map(location=mid, zoom_start=16)

all_scores = []
all_radii = []
risk_levels = {"safe": 0, "caution": 0, "danger": 0}

# 1. 점-점 사이 원 그리고 색상 표시
for i in range(len(points) - 1):
    lat1, lon1 = points[i]["lat"], points[i]["lon"]
    lat2, lon2 = points[i + 1]["lat"], points[i + 1]["lon"]
    velocity = points[i]["traffic_speed"]
    standard_velocity = 22
    volume = points[i]["traffic_state"]
    
    center = ((lat1 + lat2) / 2, (lon1 + lon2) / 2)
    radius = haversine(lat1, lon1, lat2, lon2) / 2
    
    #  위험요소 탐지
    in_intersection = is_within_haversine(center, intersection_coords, radius)
    in_protection = is_within_haversine(center, protection_coords, radius)
    in_crosswalk = is_within_haversine(center, crosswalk_coords, radius)
    in_traffic = is_within_haversine(center, traffic_light_coords, radius)
    
    senior_score_ranges = [
        (5, 7, 0.4),
        (8, 10, 0.45),
        (11, float('inf'), 0.5)
    ]

    non_senior_score_ranges = [
        (10, 12, 0.2),
        (13, 15, 0.25),
        (16, float('inf'), 0.3)
    ]
    
    in_senior = [coord for coord in senior_hotspot_coords if is_circle_overlap(center, radius, coord)]
    if bool(in_senior):        
        senior_cnt = sum(coord[2] for coord in in_senior)
        senior_score = get_hotspot_score(senior_cnt, senior_score_ranges)
    else:
        senior_score = 0
        
    in_non_senior = [coord for coord in non_senior_hotspot_coords if is_circle_overlap(center, radius, coord)]
    if bool(in_non_senior):
        non_senior_cnt = sum(coord[2] for coord in in_non_senior)
        non_senior_score = get_hotspot_score(non_senior_cnt, non_senior_score_ranges)
    else: 
        non_senior_score = 0
        
    lanes = 8
    in_velocity = standard_velocity < velocity
    in_volume = volume < 3
    
    score = (
        0.062 * bool(in_crosswalk) +
        0.100 * bool(in_traffic) +
        0.145 * bool(in_intersection) +
        0.017 * bool(in_protection) +
        0.0681 * (lanes/2) +
        0.057 * bool(in_velocity) + 
        0.037 * bool(in_volume) +
        1.0 * senior_score +          
        1.0 * non_senior_score 
    )
        
    #  점수 저장
    all_scores.append(score)
    all_radii.append(radius)
        
    color = risk_color(score)
    
    if score < 0.4:
        risk_levels["safe"] += 1
    elif score < 0.7:
        risk_levels["caution"] += 1
    else:
        risk_levels["danger"] += 1

    tooltip_lines = [f"🧮 총 위험 점수: <b>{score*100:.1f}</b>"]

    # 요소별 점수 계산
    crosswalk_score = 0.0889 * bool(in_crosswalk)*100
    traffic_score = 0.1000 * bool(in_traffic)*100
    intersection_score = 0.0822 * bool(in_intersection)*100
    protection_score = 0.0672 * bool(in_protection)*100
    lane_score = 0.0681 * lanes *100  # lanes=2면 항상 고정
    velocity_score = 0.057 * bool(in_velocity) *100 # 향후 반영용
    volume_score = 0.037 * bool(in_volume) *100

    # 고령/일반 사고 점수는 위에서 계산됨
    tooltip_lines += [
        f"🚸 횡단보도 점수: {crosswalk_score:.1f}",
        f"🚦 신호등 점수: {traffic_score:.1f}",
        f"🛑 교차로 점수: {intersection_score:.1f}",
        f"🛡️ 보호구역 점수: {protection_score:.1f}",
        f"🛣️ 차로 수 점수: {lane_score:.1f}",
        f"👴 고령자 사고 점수: {senior_score:.1f}",
        f"🚗 일반 사고 점수: {non_senior_score:.1f}",
        f"📈 속도 점수: {velocity_score:.1f} / {velocity}",
        f"📊 통행량 점수: {volume_score:.1f} / {volume}"
    ]
            
    tooltip_text = "<br>".join(tooltip_lines)

    # 원 그리기
    folium.Circle(
        location=center,
        radius=radius,
        color=color,
        fill=True,
        fill_color=color,
        fill_opacity=0.3,
        tooltip=tooltip_text
    ).add_to(fmap)
    
count = sum(1 for guide in guides if is_relevant_type(guide["type"]))
print(count)
    
#  평균 위험 점수
mean_score = sum(all_scores) / len(all_scores)
print(f"▶ 평균 위험 점수         : {mean_score*100:.1f}")
mean_score *= (1 + count / 100)
print(f"▶ 평균 위험 점수(가점 추가) : {mean_score*100:.1f}")


#  거리 가중 평균 점수
weighted_score = sum(s * r for s, r in zip(all_scores, all_radii)) / sum(all_radii)
print(f"▶ 거리 가중 평균 점수   : {weighted_score*100:.1f}")
weighted_score *= (1+count/100)
print(f"▶ 거리 가중 평균 점수 (가점추가)  : {weighted_score*100:.1f}")


#  비율 계산
total = sum(risk_levels.values())
safe_pct = risk_levels["safe"] / total * 100
caution_pct = risk_levels["caution"] / total * 100
danger_pct = risk_levels["danger"] / total * 100

print("📊 경로 위험 분석 결과")
print("-" * 30)
print(f"🟢 안전 구간 비율 (%)     : {safe_pct:.1f}%")
print(f"🟡 주의 구간 비율 (%)     : {caution_pct:.1f}%")
print(f"🔴 위험 구간 비율 (%)     : {danger_pct:.1f}%")

print("")
print("단순 평균:", np.mean(all_scores) * 100 )
print("로그 합산 점수:", log_aggregated_score(all_scores) * 100)
print("고위험 보정 점수:", high_risk_boosted_score(all_scores, alpha=0.4) * 100)
print("거리 기반, 고위험도 위험 보정점수 : ", combined_weighted_boosted_score(all_scores, all_radii, alpha=0.3, top_percent=0.2)*100) ## 내용쓰기

# 요약 정보 텍스트 (HTML 구성)
summary_html = f"""
<div style="position: fixed; 
            bottom: 300px; left: 300px; width: 260px; z-index:9999;  
            background-color: white; padding: 15px; border: 2px solid gray; border-radius: 8px; 
            box-shadow: 2px 2px 6px rgba(0,0,0,0.3); font-size: 14px;">
    <b>🧮 도로 위험도 분석 요약</b><br><br>
    ▶ 통합 위험 점수: <b>{combined_weighted_boosted_score(all_scores, all_radii, alpha=0.3)*100:.1f}점</b><br><br>
    ▶ 🟢 안전 구간 비율 (%) : <b>{safe_pct:.1f}%</b><br>
    ▶ 🟡 주의 구간 비율 (%): <b>{caution_pct:.1f}%</b><br>
    ▶ 🔴 위험 구간 비율 (%): <b>{danger_pct:.1f}%</b><br>
</div>
"""

# Element로 감싸서 추가
summary_element = Element(summary_html)
fmap.get_root().html.add_child(summary_element)
    
fmap

2
▶ 평균 위험 점수         : 49.3
▶ 평균 위험 점수(가점 추가) : 50.3
▶ 거리 가중 평균 점수   : 49.7
▶ 거리 가중 평균 점수 (가점추가)  : 50.7
📊 경로 위험 분석 결과
------------------------------
🟢 안전 구간 비율 (%)     : 47.1%
🟡 주의 구간 비율 (%)     : 37.9%
🔴 위험 구간 비율 (%)     : 15.0%

단순 평균: 49.32692810457516
로그 합산 점수: 38.931364009281175
고위험 보정 점수: 64.71215686274509
거리 기반, 고위험도 위험 보정점수 :  61.0


In [61]:
'''
⚠️ 사고다발지 보고 싶으면 여기 주석 풀고보세요
'''
for _, row in senior_hotsopt_df.iterrows():
    lat, lon = row["hotspot_center_lat"], row["hotspot_center_lng"]

    # 1. 실제 반경 100m 원 그리기
    folium.Circle(
        location=(lat, lon),
        radius=50,  #  미터 단위
        color="pink",
        fill=True,
        fill_color="pink",
        fill_opacity=0.5
    ).add_to(fmap)

    # 2. 중심점 점 마커도 추가 (원 안에 표시)
    folium.CircleMarker(
        location=(lat, lon),
        radius=4,
        color="red",
        fill=True,
        fill_color="red",
        fill_opacity=0.9,
    ).add_to(fmap)  

for _, row in non_senior_hotspot_df.iterrows():
    lat, lon = row["hotspot_center_lat"], row["hotspot_center_lng"]

    # 1. 실제 반경 100m 원 그리기
    folium.Circle(
        location=(lat, lon),
        radius=50,  #  미터 단위
        color="skyblue",
        fill=True,
        fill_color="skyblue",
        fill_opacity=0.5
    ).add_to(fmap)

    # 2. 중심점 점 마커도 추가 (원 안에 표시)
    folium.CircleMarker(
        location=(lat, lon),
        radius=4,
        color="skyblue",
        fill=True,
        fill_color="skyblue",
        fill_opacity=0.9,
    ).add_to(fmap)  

fmap