In [8]:
# 패키지 설치 (필요한 경우만 실행)
# !pip install tensorflow pandas scikit-learn matplotlib

# 라이브러리 임포트
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

In [9]:
def load_data():
    승하차_파일 = "../../data/결과/승하차/통합/2호선_승하차인원_통합.csv"
    혼잡도_파일 = "../../data/결과/혼잡도/통합/2호선_혼잡도_통합.csv"
    시간표_파일 = "../../data/결과/운행시간표/통합/2호선_열차운행시각표.csv"

    승하차_df = pd.read_csv(승하차_파일, encoding="euc-kr")
    혼잡도_df = pd.read_csv(혼잡도_파일, encoding="euc-kr")
    시간표_df = pd.read_csv(시간표_파일, encoding="euc-kr")

    return 승하차_df, 혼잡도_df, 시간표_df

In [10]:
def get_user_input():
    역명 = input("역명을 입력하세요 (예: 강남역): ")
    방향 = input("방향을 입력하세요 (상행/하행): ")
    평일주말 = input("요일을 입력하세요 (평일/주말): ")
    시간 = input("시간을 입력하세요 (형식 HH:MM): ")
    return 역명.strip(), 방향.strip(), 평일주말.strip(), 시간.strip()


def convert_time_to_column(time_str):
    return f"{int(time_str.split(':')[0])}:00"


def get_station_number(df, 역명):
    row = df[df["역명"] == 역명].iloc[0]
    return row["역번호"]

In [11]:
def get_train_count_in_hour(시간표, 평일주말, 방향, hour_str):
    try:
        hour = int(hour_str.split(":")[0])
        시간표_필터 = 시간표[
            (시간표["평일주말"] == 평일주말) & (시간표["구분"] == 방향)
        ].copy()

        시간표_필터["열차도착시간_dt"] = pd.to_datetime(
            시간표_필터["열차도착시간"], format="%H:%M:%S", errors="coerce"
        )

        # 열차코드 + 역번호 기준 중복 제거 (1일 기준)
        하루치 = 시간표_필터.drop_duplicates(subset=["열차코드", "역번호"])
        해당시간_열차 = 하루치[하루치["열차도착시간_dt"].dt.hour == hour]
        열차수 = 해당시간_열차["열차코드"].nunique()

        return 열차수 if 열차수 > 0 else 1
    except:
        return 1

In [12]:
def estimate_train_occupancy(
    역번호, 방향, 평일주말, 시간대, 승하차, 혼잡도, 시간표, 열차_정원=2000
):
    시간컬럼 = convert_time_to_column(시간대)

    relevant = (
        승하차[
            (승하차["평일주말"] == 평일주말) & (승하차["구분"].isin(["승차", "하차"]))
        ]
        .drop_duplicates(subset="역번호")
        .sort_values("역번호")
    )

    if 방향 == "상행":
        relevant = relevant.sort_values("역번호", ascending=True)
    else:
        relevant = relevant.sort_values("역번호", ascending=False)

    역번호_리스트 = relevant["역번호"].tolist()
    if 역번호 not in 역번호_리스트:
        print("❗ 역번호 없음")
        return None

    idx = 역번호_리스트.index(역번호)
    경로역 = 역번호_리스트[: idx + 1]

    열차수 = get_train_count_in_hour(시간표, 평일주말, 방향, 시간대)

    총_승차 = 총_하차 = 0
    for 역 in 경로역:
        승_df = 승하차[
            (승하차["역번호"] == 역)
            & (승하차["평일주말"] == 평일주말)
            & (승하차["구분"] == "승차")
        ]
        하_df = 승하차[
            (승하차["역번호"] == 역)
            & (승하차["평일주말"] == 평일주말)
            & (승하차["구분"] == "하차")
        ]
        if not 승_df.empty and 시간컬럼 in 승_df.columns:
            총_승차 += float(승_df[시간컬럼].values[0])
        if not 하_df.empty and 시간컬럼 in 하_df.columns:
            총_하차 += float(하_df[시간컬럼].values[0])

    평균_승차 = 총_승차 / 열차수
    평균_하차 = 총_하차 / 열차수
    누적_인원_추정 = 평균_승차 - 평균_하차
    누적_인원_보정 = max(0, 누적_인원_추정)

    # 혼잡도
    혼잡도_row = 혼잡도[
        (혼잡도["역번호"] == 역번호)
        & (혼잡도["평일주말"] == 평일주말)
        & (혼잡도["구분"] == 방향)
    ]
    if not 혼잡도_row.empty and 시간컬럼 in 혼잡도_row.columns:
        혼잡도값 = float(혼잡도_row[시간컬럼].values[0])
        혼잡도_예측 = 열차_정원 * (혼잡도값 / 100)
    else:
        혼잡도값 = None
        혼잡도_예측 = None

    # 보정 로직
    if 혼잡도_예측 is not None:
        if 누적_인원_보정 == 0 and 혼잡도값 > 50:
            최종 = int(혼잡도_예측 * 0.7)
            신뢰 = (int(혼잡도_예측 * 0.65), int(혼잡도_예측 * 1.05))
        else:
            최종 = int((누적_인원_보정 + 혼잡도_예측) / 2)
            신뢰 = (min(누적_인원_보정, 혼잡도_예측), max(누적_인원_보정, 혼잡도_예측))
    else:
        최종 = int(누적_인원_보정)
        신뢰 = (최종, 최종)

    return {
        "열차수": 열차수,
        "승하차기반": int(누적_인원_보정),
        "혼잡도값": 혼잡도값,
        "혼잡도기반": int(혼잡도_예측) if 혼잡도_예측 is not None else None,
        "최종보정값": 최종,
        "신뢰구간": tuple(map(int, 신뢰)),
    }

In [13]:
def save_result_to_csv(역명, 방향, 평일주말, 시간, 결과, 저장경로="./예측결과.csv"):
    row = {
        "날짜": pd.Timestamp.now().strftime("%Y-%m-%d %H:%M:%S"),
        "역명": 역명,
        "방향": 방향,
        "평일/주말": 평일주말,
        "시간": 시간,
        "열차수": 결과["열차수"],
        "승하차기반": 결과["승하차기반"],
        "혼잡도값": 결과["혼잡도값"],
        "혼잡도기반": 결과["혼잡도기반"],
        "최종보정값": 결과["최종보정값"],
        "신뢰구간_min": 결과["신뢰구간"][0],
        "신뢰구간_max": 결과["신뢰구간"][1],
    }

    try:
        df = pd.read_csv(저장경로)
        df = df.append(row, ignore_index=True)
    except FileNotFoundError:
        df = pd.DataFrame([row])

    df.to_csv(저장경로, index=False, encoding="utf-8-sig")
    print(f"\n✅ 예측 결과가 '{저장경로}'에 저장되었습니다.")

In [14]:
def main():
    승하차, 혼잡도, 시간표 = load_data()
    역명, 방향, 평일주말, 시간 = get_user_input()
    역번호 = get_station_number(승하차, 역명)
    결과 = estimate_train_occupancy(
        역번호, 방향, 평일주말, 시간, 승하차, 혼잡도, 시간표
    )

    print("\n📊 정밀 예측 결과")
    print(f"🟩 역명: {역명} | 방향: {방향} | {평일주말} {시간}")
    print(f"🚈 해당시간 열차 수: {결과['열차수']} 대")

    print(f"\n(1) 승하차 누적 기반 추정 인원: {결과['승하차기반']} 명")
    if 결과["혼잡도값"] is not None:
        print(
            f"(2) 혼잡도 수치: {결과['혼잡도값']:.1f}% → 추정 인원: {결과['혼잡도기반']} 명"
        )
        print(f"(3) 보정 결과 평균값: {결과['최종보정값']} 명")
        print(f"(4) 신뢰구간: {결과['신뢰구간'][0]} ~ {결과['신뢰구간'][1]} 명")
    else:
        print("혼잡도 데이터 없음 → 승하차 기반만 사용")
        print(f"최종 추정 인원: {결과['최종보정값']} 명")

    # 🔽 결과 저장 추가
    save_result_to_csv(역명, 방향, 평일주말, 시간, 결과)

In [15]:
# Cell 7
main()


📊 정밀 예측 결과
🟩 역명: 강남 | 방향: 상행 | 주말 18:00
🚈 해당시간 열차 수: 14 대

(1) 승하차 누적 기반 추정 인원: 303 명
(2) 혼잡도 수치: 49.9% → 추정 인원: 998 명
(3) 보정 결과 평균값: 650 명
(4) 신뢰구간: 303 ~ 998 명

✅ 예측 결과가 './예측결과.csv'에 저장되었습니다.


In [18]:
import pandas as pd
import itertools
import os
from tqdm import tqdm # 진행률 표시
import warnings
warnings.filterwarnings("ignore")


def generate_all_combinations(승하차_df):
    역명_리스트 = 승하차_df['역명'].unique().tolist()
    방향_리스트 = ['상행', '하행']
    평일주말_리스트 = ['평일', '주말']
    시간대_리스트 = [f"{str(h).zfill(2)}:00" for h in range(6, 24)] # 06:00 ~ 23:00
    
    모든조합 = list(itertools.product(역명_리스트, 방향_리스트, 평일주말_리스트, 시간대_리스트))
    return 모든조합


def batch_predict_and_save(filepath="./전체예측결과.csv"):
    승하차, 혼잡도, 시간표 = load_data()
    모든조합 = generate_all_combinations(승하차)

    결과_리스트 = []
    for 역명, 방향, 평일주말, 시간대 in tqdm(모든조합, desc="전체 예측 중"):
        try:
            역번호 = get_station_number(승하차, 역명)
            결과 = estimate_train_occupancy(
                역번호, 방향, 평일주말, 시간대, 승하차, 혼잡도, 시간표
            )
            결과_리스트.append({
                "날짜": pd.Timestamp.today().strftime("%Y-%m-%d"),
                "역명": 역명,
                "방향": 방향,
                "평일/주말": 평일주말,
                "시간대": 시간대,
                "열차수": 결과["열차수"],
                "승하차기반": 결과["승하차기반"],
                "혼잡도값": 결과["혼잡도값"],
                "혼잡도기반": 결과["혼잡도기반"],
                "최종보정값": 결과["최종보정값"],
                "신뢰구간_min": 결과["신뢰구간"][0],
                "신뢰구간_max": 결과["신뢰구간"][1],
            })
        except Exception as e:
            print(f"❗ 예외 발생 - 역명:{역명}, {방향} {평일주말} {시간대} → {e}")

    df = pd.DataFrame(결과_리스트)
    df.to_csv(filepath, index=False, encoding="utf-8-sig")
    print(f"\n✅ 전체 예측 결과가 '{filepath}'에 저장되었습니다. 총 {len(df)}건.")


In [None]:
# 모든 조합 예측 결과 저장 실행
batch_predict_and_save()

전체 예측 중:   1%|▏         | 53/3600 [00:54<2:55:23,  2.97s/it]