In [1]:
import numpy as np

"""STVM 계산(3차로 점선)"""
import pandas as pd

import os


# FIX 값 모음
###################################################################################################################
start_interval = 1800
end_interval = 5400

weights = {
    "w1" : 1,
    "w2" : 1,
    "w3" : 1,
    "w4" : 1,
    "w5" : 1,
    "w6" : 1
}

vehicle_types = [100, 300, 630, 640, 650]
######## 검지기 #############
# 진입부 검지기
enter_line = 10
# 본선부 검지기
main_line = 60
############################


######## 램프 ###############
# 유입램프
input_ramp = 59

# 유출램프
output_ramp = 61
############################

######## 구간 ###############
# 진입부
entry_point = [i for i in range(1,21)]

# 본선부
middle_point = [i for i in range(21,101)]

# 진출부
exit_point = [i for i in range(101,121)]
############################



###################################################################################################################

# 함수 모음
###################################################################################################################
# 평균속도
def speed_mean(df):
    # TimeGroup, New_Measurement별 그룹화 및 속도 평균
    speed_mean_df = (
        df.groupby(["TimeGroup", "New_Measurement"])
          .agg(V_mean=("v[km/h]", "mean"), V_count=("v[km/h]", "count"))
          .reset_index()
    )
    speed_mean_df["V_next"] = speed_mean_df.groupby("TimeGroup")["V_mean"].shift(-1)
    speed_mean_df["delta_V"] = (speed_mean_df["V_next"] - speed_mean_df["V_mean"]) / speed_mean_df["V_mean"]
    speed_mean_df["delta_V"] = speed_mean_df["delta_V"].fillna(0)
    return speed_mean_df

# 밀도
def density_mean(df):
    density_mean_df = df.assign(K = df["V_count"] * 12 / df["V_mean"])
    density_mean_df["K_next"] = density_mean_df.groupby("TimeGroup")["K"].shift(-1)
    density_mean_df["delta_K"] = (density_mean_df["K_next"] - density_mean_df["K"]) / density_mean_df["K"]
    density_mean_df["delta_K"] = density_mean_df["delta_K"].fillna(0)
    return density_mean_df

# 가중 중차량 비율
def heavy_rate(original_df):

    # 진입부 검지기 선정
    measurement = enter_line

    # TimeGroup별 중차량 갯수 집계
    heavy_df = (
        original_df[
            (original_df["New_Measurement"] == measurement) &
            (original_df["Vehicle type"].isin([630,640,650]))
        ]
        .groupby("TimeGroup")
        .size()
        .reset_index(name="heavy_count")
    )
    # TimeGroup별 총 차량 갯수 집계
    total_df = (
        original_df[original_df["New_Measurement"] == measurement]
        .groupby("TimeGroup")
        .size()
        .reset_index(name="total_count")
    )

    heavy_rate_df = pd.merge(heavy_df, total_df, on="TimeGroup", how="left")
    heavy_rate_df["rate"] = heavy_rate_df["heavy_count"] / heavy_rate_df["total_count"]
    return heavy_rate_df


# 진입 포화도
def entry_saturation(original_df):
    # 실측용량 C
    max_capacity = 2200
    entry_saturation_df = (
        # 진입부에서 유입된 교통량이므로 진입부 중 한 개의 검지기를 선정하여 측정
        original_df[original_df["New_Measurement"] == 10]
        .groupby(["TimeGroup", "New_Measurement"])
        .size()
        .reset_index(name="entry_volume")  # 차량 수를 entry_volume이라는 컬럼명으로
    )
    # 단위가 대/시 이기 때문에 현재 5분집계 * 12
    entry_saturation_df["Phi_진입"] = entry_saturation_df["entry_volume"] * 12 / max_capacity
    return entry_saturation_df

# 램프 유출입 비율
def rfr_rate(original_df):
    """
    1. 본선 1개의 검지기 data 수집(New_Measurement = 60)
    2. 위의 검지기 앞, 뒤 검지기 data (59 / 61)
    3. RFR 연산
    """
    main_df = (
        original_df[original_df["New_Measurement"] == main_line]
        .groupby(["TimeGroup", "New_Measurement"])
        .size()
        .reset_index(name="main_line")
    )

    input_df = (
        original_df[original_df["New_Measurement"] == input_ramp]
        .groupby(["TimeGroup", "New_Measurement"])
        .size()
        .reset_index(name="input_ramp")
    )
    output_df = (
        original_df[original_df["New_Measurement"] == output_ramp]
        .groupby(["TimeGroup", "New_Measurement"])
        .size()
        .reset_index(name="output_ramp")
    )
    main_df["RFR"] = (input_df["input_ramp"] + output_df["output_ramp"])/main_df["main_line"]

    return main_df

# 유출 정상성 비율
def output_normality(original_df):
    # 진입부
    entry_df = (
        original_df[original_df["New_Measurement"].isin(entry_point)]
        .groupby(["TimeGroup"])
        .size()
        .reset_index(name="entry_volume")
    )

    # 진출부
    exit_df = (
        original_df[original_df["New_Measurement"].isin(exit_point)]
        .groupby(["TimeGroup"])
        .size()
        .reset_index(name="exit_volume")
    )
    normality_df = pd.concat([entry_df, exit_df["exit_volume"]], axis=1)
    normality_df["F(outrate)"] = normality_df["exit_volume"] / normality_df["entry_volume"]
    normality_df["1-F(outrate)"] = 1 - normality_df["F(outrate)"]
    return normality_df

def calculate_stvm(speed_df, density_df, heavy_df, entry_saturation_df, rfr_df, normality_df):

    # TimeGroup 기준으로  Merge
    merged = (
    speed_df[["TimeGroup", "New_Measurement", "delta_V"]]
    .merge(density_df[["TimeGroup", "New_Measurement", "delta_K"]], on=["TimeGroup", "New_Measurement"])
    .merge(heavy_df[["TimeGroup", "rate"]], on="TimeGroup")
    .merge(entry_saturation_df[["TimeGroup", "Phi_진입"]], on="TimeGroup")
    .merge(rfr_df[["TimeGroup", "RFR"]], on="TimeGroup")
    .merge(normality_df[["TimeGroup", "1-F(outrate)"]], on="TimeGroup")
    )


    merged["STVM"] = (
        weights["w1"] * merged["delta_V"] +
        weights["w2"] * merged["delta_K"] +
        weights["w3"] * merged["rate"] +
        weights["w4"] * merged["Phi_진입"] +
        weights["w5"] * merged["RFR"] +
        weights["w6"] * merged["1-F(outrate)"]
    )

    return merged[["TimeGroup", "New_Measurement", "STVM"]]


def calculate_z_score(stvm_df):
    # 평균
    mean_stvm = stvm_df["STVM"].mean(skipna=True)

    # 표준편차
    std_stvm = stvm_df["STVM"].std(skipna=True)

    # Z-Score 계산
    stvm_df["Z-Score"] = (stvm_df["STVM"] - mean_stvm) / std_stvm
    z_max = stvm_df["Z-Score"].max()
    z_min = stvm_df["Z-Score"].min()

    stvm_df["환산점수"] = stvm_df["Z-Score"].apply(lambda z : z_to_score(z, z_min, z_max))

    stvm_df = pd.pivot(stvm_df, index="TimeGroup", columns= "New_Measurement", values="환산점수")
    return stvm_df


def z_to_score(z, z_min, z_max):
    if 1.645 <= z <= z_max:
        return 50 + ((95 + 5 * ((z - 1.645) / (z_max - 1.645))) * 0.5)
    elif 1.282 <= z < 1.645:
        return 50 + ((90 + 5 * ((z - 1.282) / (1.645 - 1.282))) * 0.5)
    elif 1.038 <= z < 1.282:
        return 50 + ((85 + 5 * ((z - 1.038) / (1.282 - 1.038))) * 0.5)
    elif 0.842 <= z < 1.038:
        return 50 + ((80 + 5 * ((z - 0.842) / (1.038 - 0.842))) * 0.5)
    elif 0.676 <= z < 0.842:
        return 50 + ((75 + 5 * ((z - 0.676) / (0.842 - 0.676))) * 0.5)
    elif 0.526 <= z < 0.676:
        return 50 + ((70 + 5 * ((z - 0.526) / (0.676 - 0.526))) * 0.5)
    elif 0.387 <= z < 0.526:
        return 50 + ((65 + 5 * ((z - 0.387) / (0.526 - 0.387))) * 0.5)
    elif 0.255 <= z < 0.387:
        return 50 + ((60 + 5 * ((z - 0.255) / (0.387 - 0.255))) * 0.5)
    elif -0.255 <= z < 0.255:
        return 50 + ((40 + 5 * ((z + 0.255) / (0.255 + 0.255))) * 0.5)
    elif -0.387 <= z < -0.255:
        return 50 + ((35 + 5 * ((z + 0.387) / (-0.255 + 0.387))) * 0.5)
    elif -0.526 <= z < -0.387:
        return 50 + ((30 + 5 * ((z + 0.526) / (-0.387 + 0.526))) * 0.5)
    elif -0.676 <= z < -0.526:
        return 50 + ((25 + 5 * ((z + 0.676) / (-0.676 + 0.842))) * 0.5)
    elif -0.842 <= z < -0.676:
        return 50 + ((20 + 5 * ((z + 0.842) / (-0.676 + 0.842))) * 0.5)
    elif -1.038 <= z < -0.842:
        return 50 + ((15 + 5 * ((z + 1.038) / (-0.842 + 1.038))) * 0.5)
    elif -1.282 <= z < -1.038:
        return 50 + ((10 + 5 * ((z + 1.282) / (-1.038 + 1.282))) * 0.5)
    elif -1.645 <= z < -1.282:
        return 50 + ((5 + 5 * ((z + 1.645) / (-1.282 + 1.645))) * 0.5)
    elif z_min <= z < -1.645:
        return 50 + ((0 + 5 * ((z - z_min) / (-1.645 + abs(z_min)))) * 0.5)
    else:
        return np.nan

###################################################################################################################


folder_path = r"C:\VISSIM_Workspace\network_new\network_version2\network_version2.3\network_version2.3.0\network03_mer"
mer_list = [file for file in os.listdir(folder_path) if file.endswith(".mer")]
mer_file = mer_list[0]

grouped_df = pd.DataFrame()
result_df = pd.DataFrame()





with open(os.path.join(folder_path, mer_file), "r", encoding="utf-8", errors="ignore") as file:
        lines = file.readlines()
        # 데이터가 시작하는 인덱스 찾기
        data_start_idx = None

        for j, line in enumerate(lines):
            if "Measurem." in line:  # 컬럼명이 포함된 행 찾기
                data_start_idx = j
                break

        # 데이터프레임 생성
        if data_start_idx is not None:

            # 컬럼명 추출 및 공백 제거
            columns = [col.strip() for col in lines[data_start_idx].strip().split(";")]

            # 데이터 부분 추출 및 가공
            data_lines = lines[data_start_idx + 1:]  # 컬럼명 제외, 데이터 부분
            data = [line.strip().split(";") for line in data_lines if line.strip()]

            # 데이터프레임 생성
            df = pd.DataFrame(data, columns=columns)

            # 컬럼 내부 데이터 정수형 변환
            df = df.apply(pd.to_numeric, errors="coerce")

            original_df = df[(df["t(Entry)"] != -1.00)].reset_index(drop=True)


            #불필요 컬럼 제거
            original_df.drop(columns=["b[m/s2]", "tQueue", "Occ", "Pers"], inplace=True, errors="ignore")

            original_df["New_Measurement"] = original_df["Measurem."] % 1000

            bins = np.arange(start_interval, end_interval+1, 300)
            labels = [f"{start}~{start+300}" for start in bins[:-1]]  # 구간 라벨링

            # 구간 나누기 및 컬럼 추가
            original_df["TimeGroup"] = pd.cut(original_df["t(Entry)"], bins=bins, labels=labels, right=False)

            # 평균속도
            speed_df = speed_mean(original_df)

            # 밀도
            density_df = density_mean(speed_df)

            # 가중 중차량 비율
            heavy_df = heavy_rate(original_df)

            # 진입 포화도
            entry_saturation_df = entry_saturation(original_df)

            # 램프 유출입 비율
            rfr_df = rfr_rate(original_df)

            # 진출 정상성
            normality_df = output_normality(original_df)

            # STVM 계산
            stvm_df = calculate_stvm(speed_df, density_df, heavy_df, entry_saturation_df, rfr_df, normality_df)

            # Z-Score 계산
            z_score_df = calculate_z_score(stvm_df)

            excel_folder_path = os.path.join(folder_path, "STVM")
            os.makedirs(excel_folder_path, exist_ok=True)
            excel_file_path = os.path.join(excel_folder_path, "STVM2.xlsx")
            z_score_df.to_excel(excel_file_path, index=True)