In [43]:
#https://www.data.go.kr/data/15000563/openapi.do

In [44]:
import configparser
# ConfigParser 초기화
config = configparser.ConfigParser()
# keys.config 파일 읽기
config.read('C:/Users/user/Desktop/24-2/졸업프로젝트/project_ai/keys.config')

['C:/Users/user/Desktop/24-2/졸업프로젝트/project_ai/keys.config']

In [45]:
import redis
import json

redis_client = redis.StrictRedis(
    host=config['EC2_INFO']['host'],  # EC2 퍼블릭 IP
    port=6379,              # Redis 기본 포트
    password=config['EC2_INFO']['password'],  # 설정한 비밀번호
    decode_responses=True   # 문자열로 디코딩
)

In [46]:
# # 테스트
# try:
#     # 값 저장 및 TTL 설정 (300초)
#     redis_client.setex("test", 300, "Hello, Redis with TTL!")

#     # 값 확인
#     value = redis_client.get("test")
#     print(f"저장된 값: {value}")

#     # TTL 확인
#     ttl = redis_client.ttl("test")
#     print(f"TTL: {ttl}초")

#     # 5분 후 데이터 만료 확인 (테스트용으로 sleep을 사용)
#     import time
#     print("5분 대기 중...")
#     time.sleep(300)
#     value_after_ttl = redis_client.get("test")
#     print(f"만료 후 값: {value_after_ttl}")  # None 반환
# except Exception as e:
#     print(f"Redis 연결 오류: {e}")

In [47]:
def extract_stage_from_address(address):
    """
    도로명 주소에서 시도(STAGE1)와 시군구(STAGE2)를 추출합니다.
    :param address: 도로명 주소 (예: "서울특별시 강남구 삼성로 123")
    :return: 시도(STAGE1), 시군구(STAGE2)
    """
    try:
        # 주소를 공백으로 분리
        parts = address.split()
        if len(parts) < 2:
            raise ValueError("주소 형식이 잘못되었습니다. 최소 시도와 시군구가 필요합니다.")
        
        # 시도와 시군구 추출
        stage1 = parts[0]  # 첫 번째는 시도
        stage2 = parts[1]  # 두 번째는 시군구

        return stage1, stage2
    except Exception as e:
        print(f"주소 파싱 오류: {e}")
        return None, None

In [48]:
import math
import requests
import xml.etree.ElementTree as ET

# API 호출 함수
def call_api(url, params):
    try:
        response = requests.get(url, params=params)
        response.raise_for_status()
        return response.text  # XML 응답
    except requests.exceptions.RequestException as e:
        print(f"API 호출 오류: {e}")
        return None


In [49]:

def get_hospitals_by_condition(stage1, stage2, conditions):
    """
    중증질환 조건에 맞는 병원의 hpid를 Redis 캐싱을 통해 필터링
    :param stage1: 시도 (STAGE1)
    :param stage2: 시군구 (STAGE2)
    :param conditions: 조건 목록 (예: ["mkioskty8", "mkioskty9"])
    :return: hpid 목록
    """

    # Redis 캐싱 키 생성
    redis_key = f"hospitals:{stage1}:{stage2}:{','.join(conditions)}"
    print(f"생성된 Redis 키: {redis_key}")
    
    # Redis에서 데이터 조회
    cached_data = redis_client.get(redis_key)
    print(f"저장된 hpid Redis 캐시 데이터: {cached_data}")
    if cached_data:
        print("Redis 캐시에서 병원 데이터 로드")
        return json.loads(cached_data)  # Redis에서 JSON 디코딩
    
    # Redis에 데이터가 없으면 API 호출
    print("Redis 캐시 없음, API 호출 진행")
    url = "http://apis.data.go.kr/B552657/ErmctInfoInqireService/getSrsillDissAceptncPosblInfoInqire"
    params = {
        "STAGE1": stage1,
        "STAGE2": stage2,
        "pageNo": 1,
        "numOfRows": 100,
        "serviceKey": config['API_KEYS']['public_portal_api_key']  
        }
    hpid_list = []

    # 첫 호출로 totalCount 확인
    data = call_api(url, params)
    if not data:
        return []

    root = ET.fromstring(data)
    total_count = int(root.find("body/totalCount").text)
    total_pages = math.ceil(total_count / params["numOfRows"])

    # 모든 페이지 데이터 수집
    for page in range(1, total_pages + 1):
        params["pageNo"] = page
        page_data = call_api(url, params)
        if not page_data:
            continue
        page_root = ET.fromstring(page_data)
        items = page_root.findall(".//item")
        for item in items:
            # OR 조건으로 병원 필터링
            if any(
                item.find(cond) is not None and item.find(cond).text.strip() == "Y"
                for cond in conditions
            ):
                hpid = item.find("hpid").text
                hpid_list.append(hpid)
    
    # 결과를 Redis에 저장
    try:
        redis_client.setex(redis_key, 300, json.dumps(hpid_list))
    except redis.exceptions.RedisError as e:
        print(f"Redis 데이터 저장 오류: {e}")
        
    return hpid_list


In [50]:

def get_real_time_bed_info(stage1, stage2, hpid_list):
    """
    응급실 실시간 데이터를 조회하고, Redis 캐싱을 활용해 효율성을 높입니다.
    :param stage1: 시도 (STAGE1)
    :param stage2: 시군구 (STAGE2)
    :param hpid_list: 필터링된 병원의 hpid 목록
    :return: 병원 실시간 데이터 목록
    """
    url = "http://apis.data.go.kr/B552657/ErmctInfoInqireService/getEmrrmRltmUsefulSckbdInfoInqire"
    params = {
        "STAGE1": stage1,
        "STAGE2": stage2,
        "pageNo": 1,
        "numOfRows": 100,
        "serviceKey": config['API_KEYS']['public_portal_api_key'],
    }
    result = []

    # Redis에서 데이터 확인 및 수집
    for hpid in hpid_list:
        redis_key = f"real_time_bed_info:{hpid}"  # Redis 키
        print(f"생성된 Redis 키: {redis_key}")
        
        cached_data = redis_client.get(redis_key)
        print(f"저장된 응급실 Redis 캐시 데이터: {cached_data}")
        
        if cached_data:
            print(f"Redis에서 {hpid} 데이터 로드")
            result.append(json.loads(cached_data))  # 캐시된 데이터를 추가
        else:
            print(f"Redis 캐시 없음, {hpid} 데이터 API 호출 진행")
            # 첫 호출로 totalCount 확인
            data = call_api(url, params)
            if not data:
                continue
            
            root = ET.fromstring(data)
            items = root.findall(".//item")
            for item in items:
                if item.find("hpid").text == hpid:  # 해당 병원의 데이터만 처리
                    # hospital_data = {
                    #     "hpid": hpid,
                    #     "phpid": item.find("phpid").text if item.find("phpid") is not None else None,
                    #     "hvidate": item.find("hvidate").text if item.find("hvidate") is not None else None,
                    #     "hvec": item.find("hvec").text if item.find("hvec") is not None else None,
                    #     "hvoc": item.find("hvoc").text if item.find("hvoc") is not None else None,
                    #     "hvcc": item.find("hvcc").text if item.find("hvcc") is not None else None,
                    #     "hvncc": item.find("hvncc").text if item.find("hvncc") is not None else None,
                    #     "hvccc": item.find("hvccc").text if item.find("hvccc") is not None else None,
                    #     "hvicc": item.find("hvicc").text if item.find("hvicc") is not None else None,
                    #     "hvgc": item.find("hvgc").text if item.find("hvgc") is not None else None,
                    #     "hvdnm": item.find("hvdnm").text if item.find("hvdnm") is not None else None,
                    #     "hvctayn": item.find("hvctayn").text if item.find("hvctayn") is not None else None,
                    #     "hvmriayn": item.find("hvmriayn").text if item.find("hvmriayn") is not None else None,
                    #     "hvangioayn": item.find("hvangioayn").text if item.find("hvangioayn") is not None else None,
                    #     "hvventiayn": item.find("hvventiayn").text if item.find("hvventiayn") is not None else None,
                    #     "hvamyn": item.find("hvamyn").text if item.find("hvamyn") is not None else None,
                    #     "hv1": item.find("hv1").text if item.find("hv1") is not None else None,
                    #     "hv2": item.find("hv2").text if item.find("hv2") is not None else None,
                    #     "hv3": item.find("hv3").text if item.find("hv3") is not None else None,
                    #     "hv4": item.find("hv4").text if item.find("hv4") is not None else None,
                    #     "hv5": item.find("hv5").text if item.find("hv5") is not None else None,
                    #     "hv6": item.find("hv6").text if item.find("hv6") is not None else None,
                    #     "hv7": item.find("hv7").text if item.find("hv7") is not None else None,
                    #     "hv8": item.find("hv8").text if item.find("hv8") is not None else None,
                    #     "hv9": item.find("hv9").text if item.find("hv9") is not None else None,
                    #     "hv10": item.find("hv10").text if item.find("hv10") is not None else None,
                    #     "hv11": item.find("hv11").text if item.find("hv11") is not None else None,
                    #     "hv12": item.find("hv12").text if item.find("hv12") is not None else None,
                    #     "dutyname": item.find("dutyName").text if item.find("dutyName") is not None else None,
                    #     "dutytel3": item.find("dutytel3").text if item.find("dutytel3") is not None else None,
                    # }
                    hospital_data = {child.tag: child.text for child in item}  # 모든 태그 그대로 매핑

                    # Redis에 데이터 저장 (5분 TTL 설정)
                    redis_client.setex(redis_key, 300, json.dumps(hospital_data))
                    result.append(hospital_data)
    
    return result

In [52]:
import pandas as pd

In [56]:
import pandas as pd
import json
from utils.geocode import address_to_coords

if __name__ == "__main__":
    # 사용자 입력
    address = "서울특별시 강남구 삼성로 123"

    api_key = config['API_KEYS']['kakao_api_key']
    user_coords = address_to_coords(address, api_key)

    if "error" in user_coords:
        print(f"오류: {user_coords['error']}")
    else:
        user_lat = user_coords["lat"]
        user_lon = user_coords["lon"]
        print(f"사용자 좌표: 위도 {user_lat}, 경도 {user_lon}")

    conditions = ["MKioskTy8", "MKioskTy10"]  # 조산산모, 신생아
    #정신질환자 : MKioskTy9
    #중증화상 : MKioskTy11

    # 주소에서 시도와 시군구 추출
    stage1, stage2 = extract_stage_from_address(address)

    # 1. 중증질환 조건 필터링
    hpid_list = get_hospitals_by_condition(stage1, stage2, conditions)

    # Redis 캐시에 데이터 확인 로그
    if not hpid_list:
        print("조건에 맞는 병원이 없습니다.")
    else:
        print(f"필터링된 병원 목록: {hpid_list}")

    # 2. 실시간 병상 정보 조회
    real_time_data = get_real_time_bed_info(stage1, stage2, hpid_list)

    if real_time_data:
        # Redis에 저장된 데이터 확인
        hospital_data_list = []
        for hpid in hpid_list:
            redis_key = f"real_time_bed_info:{hpid}"
            cached_data = redis_client.get(redis_key)

            if cached_data:
                print(f"Redis에서 데이터 로드: {redis_key}")
                hospital_data = json.loads(cached_data)
                print(f"병원 데이터: {hospital_data}")
                hospital_data_list.append(hospital_data)
            else:
                print(f"Redis에 데이터 없음: {redis_key}")

        # DataFrame으로 변환 및 출력
        df = pd.DataFrame(hospital_data_list)
        priority_columns = [
            "hpid", "phpid", "hvidate", "hvec", "hvoc", "hvcc", "hvncc", "hvccc",
            "hvicc", "hvgc", "hvdnm", "hvctayn", "hvmriayn", "hvangioayn", "hvventiayn",
            "hvamyn", "hv1", "hv2", "hv3", "hv4", "hv5", "hv6", "hv7", "hv8", "hv9",
            "hv10", "hv11", "hv12", "dutyname", "dutytel3"
        ]

        #재정렬
        existing_priority_columns = [col for col in priority_columns if col in df.columns]
        other_columns = [col for col in df.columns if col not in priority_columns]
        sorted_columns = existing_priority_columns + other_columns
        df = df[sorted_columns]

        # DataFrame을 CSV로 저장 (옵션)
        df.to_csv("real_time_bed_info.csv", index=False, encoding="utf-8-sig")
        print("\nDataFrame이 CSV로 저장되었습니다: real_time_bed_info.csv")
    else:
        print("실시간 병상 정보가 없습니다.")

생성된 Redis 키: hospitals:서울특별시:강남구:MKioskTy8,MKioskTy10
저장된 hpid Redis 캐시 데이터: ["A1100015", "A1100010"]
Redis 캐시에서 병원 데이터 로드
필터링된 병원 목록: ['A1100015', 'A1100010']
생성된 Redis 키: real_time_bed_info:A1100015
저장된 응급실 Redis 캐시 데이터: {"dutyName": "\uc5f0\uc138\ub300\ud559\uad50\uc758\uacfc\ub300\ud559 \uac15\ub0a8\uc138\ube0c\ub780\uc2a4\ubcd1\uc6d0", "dutyTel3": "02-2019-3333", "hpid": "A1100015", "hv10": "Y", "hv11": "Y", "hv28": "0", "hv29": "1", "hv30": "1", "hv34": "3", "hv41": "5", "hv42": "Y", "hv5": "Y", "hv6": "4", "hv7": "Y", "hvamyn": "Y", "hvangioayn": "Y", "hvcrrtayn": "Y", "hvctayn": "Y", "hvec": "-3", "hvecmoayn": "Y", "hvgc": "139", "hvhypoayn": "Y", "hvicc": "6", "hvidate": "20241226121616", "hvincuayn": "Y", "hvmriayn": "Y", "hvncc": "4", "hvoc": "0", "hvoxyayn": "N1", "hvs01": "15", "hvs02": "2", "hvs03": "1", "hvs04": "2", "hvs08": "28", "hvs12": "12", "hvs15": "8", "hvs17": "29", "hvs22": "23", "hvs25": "5", "hvs26": "1", "hvs27": "4", "hvs28": "3", "hvs29": "6", "hvs30": "47

In [57]:
df.columns

Index(['hpid', 'phpid', 'hvidate', 'hvec', 'hvoc', 'hvncc', 'hvccc', 'hvicc',
       'hvgc', 'hvctayn', 'hvmriayn', 'hvangioayn', 'hvventiayn', 'hvamyn',
       'hv2', 'hv3', 'hv5', 'hv6', 'hv7', 'hv10', 'hv11', 'dutyName',
       'dutyTel3', 'hv28', 'hv29', 'hv30', 'hv34', 'hv41', 'hv42', 'hvcrrtayn',
       'hvecmoayn', 'hvhypoayn', 'hvincuayn', 'hvoxyayn', 'hvs01', 'hvs02',
       'hvs03', 'hvs04', 'hvs08', 'hvs12', 'hvs15', 'hvs17', 'hvs22', 'hvs25',
       'hvs26', 'hvs27', 'hvs28', 'hvs29', 'hvs30', 'hvs31', 'hvs32', 'hvs33',
       'hvs34', 'hvs35', 'hvs38', 'hvventisoayn', 'rnum', 'hv32', 'hv35',
       'hv40', 'hvs06', 'hvs07', 'hvs09', 'hvs16', 'hvs18', 'hvs24'],
      dtype='object')

In [60]:
df

Unnamed: 0,hpid,phpid,hvidate,hvec,hvoc,hvncc,hvccc,hvicc,hvgc,hvctayn,...,rnum,hv32,hv35,hv40,hvs06,hvs07,hvs09,hvs16,hvs18,hvs24
0,A1100015,A1100015,20241226121616,-3,0,4,,6,139,Y,...,1,,,,,,,,,
1,A1100010,A1100010,20241226121847,-5,22,19,7.0,2,688,Y,...,2,5.0,2.0,16.0,30.0,23.0,15.0,25.0,2.0,27.0


In [100]:
# 조건 및 매핑
conditions = ["MKioskTy8", "MKioskTy10"]  # 조산산모, 신생아
conditions_mapping = {
    "MKioskTy8": ["hpid", "dutyName", "hvamyn", "hvec", "hvoc", "hvncc", "hvventisoayn", "hvincuayn", "hv10", "dutyTel3", "hv36", "hvs05", "hvs08", "hvs26", "hvs30", "hvs31", "hvs32"],  # 조산산모
    "MKioskTy10": ["hpid", "dutyName", "hvamyn", "hvec", "hvoc", "hvncc", "hvventisoayn", "hvincuayn", "hv10", "dutyTel3", "hv36", "hvs05", "hvs08", "hvs26", "hvs30", "hvs31", "hvs32"],  # 신생아
    "MKioskTy9": ["hpid", "dutyName", "hvamyn", "hvec", "hvoc", "hv40", "dutyTel3", "hvs24"],  # 정신질환자
    "MKioskTy11": ["hpid", "dutyName", "hvamyn", "hvec", "hvoc", "hv8", "hv43", "dutyTel3", "hvs13"]   # 중증화상
}

# 조건에 따라 선택된 컬럼 수집
selected_columns = []
for condition in conditions:
    if condition in conditions_mapping:
        selected_columns.extend(conditions_mapping[condition])


# 중복 제거
# selected_columns = list(set(selected_columns))
selected_columns = list(dict.fromkeys(selected_columns))  # 순서 유지하며 중복 제거

# df에 존재하는 컬럼만 선택
existing_columns = [col for col in selected_columns if col in df.columns]

# 존재하지 않는 컬럼 (로그 용)
missing_columns = [col for col in selected_columns if col not in df.columns]

# 출력 로그
print(f"df에 존재하지 않는 컬럼: {missing_columns}")

# df의 실제 컬럼 중 선택된 컬럼만 필터링
filtered_df = df[existing_columns]

# 결과 저장
filtered_df.to_csv("filtered_columns.csv", index=False, encoding="utf-8-sig")
print("필터링된 결과가 'filtered_columns.csv' 파일로 저장되었습니다.")

df에 존재하지 않는 컬럼: ['hvs05']
필터링된 결과가 'filtered_columns.csv' 파일로 저장되었습니다.


In [101]:
filtered_df

Unnamed: 0,hpid,dutyName,hvamyn,hvec,hvoc,hvncc,hvventisoayn,hvincuayn,hv10,dutyTel3,hv36,hvs08,hvs26,hvs30,hvs31,hvs32
0,A1100015,연세대학교의과대학 강남세브란스병원,Y,-3,0,4,Y,Y,Y,02-2019-3333,,28,1,47,10,23
1,A1100010,삼성서울병원,Y,-5,22,19,Y,Y,Y,02-3410-2060,,59,7,130,18,42


# 외상센터 수집 후, 외상센터인지 아닌지 필터링->주소와 경위도

In [104]:
# 1. 외상센터 hpid 수집
def fetch_trauma_center_hpids():
    url = "http://apis.data.go.kr/B552657/ErmctInfoInqireService/getStrmBassInfoInqire"
    service_key = config['API_KEYS']['public_portal_api_key']
    params = {"ServiceKey": service_key}
    
    xml_data = call_api(url, params)
    if not xml_data:
        return []

    root = ET.fromstring(xml_data)
    hpids = [item.find("hpid").text for item in root.findall(".//item")]
    return hpids

# 2. dutyAddr, wgs84Lat, wgs84Lon 수집 및 추가
def fetch_location_data(hpid, is_trauma):
    base_url = "http://apis.data.go.kr/B552657/ErmctInfoInqireService/"
    endpoint = "getStrmBassInfoInqire" if is_trauma else "getEgytBassInfoInqire"
    url = f"{base_url}{endpoint}"
    service_key = config['API_KEYS']['public_portal_api_key']
    params = {"ServiceKey": service_key, "HPID": hpid}
    
    xml_data = call_api(url, params)
    if not xml_data:
        return None, None, None

    root = ET.fromstring(xml_data)
    item = root.find(".//item")
    if item is not None:
        dutyAddr = item.find("dutyAddr").text if item.find("dutyAddr") is not None else None
        wgs84Lat = item.find("wgs84Lat").text if item.find("wgs84Lat") is not None else None
        wgs84Lon = item.find("wgs84Lon").text if item.find("wgs84Lon") is not None else None
        return dutyAddr, wgs84Lat, wgs84Lon
    return None, None, None

# 3. filtered_df에 값 추가
def enrich_filtered_df(filtered_df):
    trauma_hpids = fetch_trauma_center_hpids()
    print(f"외상센터 hpid 수집 완료: {trauma_hpids}")

    # Initialize new columns with None
    filtered_df = filtered_df.copy()  # Avoid SettingWithCopyWarning
    filtered_df["dutyAddr"] = None
    filtered_df["wgs84Lat"] = None
    filtered_df["wgs84Lon"] = None

    # Populate the new columns based on hpid
    for index, row in filtered_df.iterrows():
        hpid = row["hpid"]
        is_trauma = hpid in trauma_hpids
        dutyAddr, wgs84Lat, wgs84Lon = fetch_location_data(hpid, is_trauma)
        filtered_df.at[index, "dutyAddr"] = dutyAddr
        filtered_df.at[index, "wgs84Lat"] = wgs84Lat
        filtered_df.at[index, "wgs84Lon"] = wgs84Lon

    # Rearrange columns to move dutyAddr, wgs84Lat, wgs84Lon after dutyName
    columns = filtered_df.columns.tolist()
    if "dutyName" in columns:
        # Find the index of dutyName
        duty_name_index = columns.index("dutyName")
        # Remove the new columns if they exist in the list
        columns.remove("dutyAddr")
        columns.remove("wgs84Lat")
        columns.remove("wgs84Lon")
        # Insert the new columns after dutyName
        for col in ["dutyAddr", "wgs84Lat", "wgs84Lon"]:
            columns.insert(duty_name_index + 1, col)
            duty_name_index += 1

    # Rearrange DataFrame
    filtered_df = filtered_df[columns]

    return filtered_df

In [105]:
enriched_df = enrich_filtered_df(filtered_df)
enriched_df.to_csv("enriched_filtered_df.csv", index=False, encoding="utf-8-sig")
print("결과가 'enriched_filtered_df.csv' 파일로 저장되었습니다.")

외상센터 hpid 수집 완료: ['A2200001', 'A2300001', 'A2400002', 'A2500001', 'A1100008', 'A1100014', 'A1100017', 'A1100052', 'A1200002', 'A1300002']
결과가 'enriched_filtered_df.csv' 파일로 저장되었습니다.


In [106]:
enriched_df

Unnamed: 0,hpid,dutyName,dutyAddr,wgs84Lat,wgs84Lon,hvamyn,hvec,hvoc,hvncc,hvventisoayn,hvincuayn,hv10,dutyTel3,hv36,hvs08,hvs26,hvs30,hvs31,hvs32
0,A1100015,연세대학교의과대학 강남세브란스병원,"서울특별시 강남구 언주로 211, 강남세브란스병원 (도곡동)",37.49280698464548,127.04631254186796,Y,-3,0,4,Y,Y,Y,02-2019-3333,,28,1,47,10,23
1,A1100010,삼성서울병원,"서울특별시 강남구 일원로 81 (일원동, 삼성의료원)",37.48851613490445,127.08668245340024,Y,-5,22,19,Y,Y,Y,02-3410-2060,,59,7,130,18,42


# 현위치 기준 소요시간 계산

In [108]:
from utils.direction import get_travel_time

# enriched_df에 소요시간 계산 및 정렬
def calculate_travel_time_and_sort(enriched_df, user_lat, user_lon):
    enriched_df = enriched_df.copy()

    # 소요시간 컬럼 추가
    enriched_df["travelTime"] = enriched_df.apply(
        lambda row: get_travel_time(
            user_lat, user_lon,
            float(row["wgs84Lat"]) if row["wgs84Lat"] else None,
            float(row["wgs84Lon"]) if row["wgs84Lon"] else None
        ) if row["wgs84Lat"] and row["wgs84Lon"] else None,
        axis=1
    )

    # 소요시간과 hvec 기준으로 정렬
    enriched_df["hvec_abs"] = enriched_df["hvec"].astype(float).abs()
    enriched_df.sort_values(
        by=["travelTime", "hvec_abs"],
        ascending=[True, True],  # 소요시간: 오름차순, hvec 절대값: 오름차순
        inplace=True
    )

    # hvec_abs 컬럼 삭제 (정렬에만 사용)
    enriched_df.drop(columns=["hvec_abs"], inplace=True)

    return enriched_df

In [None]:
df.to_json(orient="records", force_ascii=False)