In [17]:
from ProcessBasic import *
def flatten_directional_traffic_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    將具有 MultiIndex 欄位（方向性交通量）的資料表展平為 long-form。
    自動辨識方向欄位與時間欄位（起、迄）。
    
    Parameters:
        df (pd.DataFrame): 原始的 MultiIndex 欄位資料表
        
    Returns:
        pd.DataFrame: 展平後含方向欄的資料表
    """
    if not isinstance(df.columns, pd.MultiIndex):
        raise ValueError("輸入的欄位不是 MultiIndex 結構")

    # 所有第一層欄位
    all_lv0 = df.columns.get_level_values(0).unique()

    # 自動找出方向欄（排除 起 與 迄）
    direction_cols = [col for col in all_lv0 if col not in ['起', '迄']]

    # 自動找出「起」、「迄」欄位（不假設第二層）
    col_起 = next(col for col in df.columns if col[0] == '起')
    col_迄 = next(col for col in df.columns if col[0] == '迄')

    df_list = []

    for direction in direction_cols:
        temp = df[direction].copy()
        temp['方向'] = direction
        temp['起'] = df[col_起]
        temp['迄'] = df[col_迄]
        df_list.append(temp)

    df_flat = pd.concat(df_list, ignore_index=True)

    # 欄位排序
    ordered_cols = ['方向', '起', '迄'] + [col for col in df_flat.columns if col not in ['方向', '起', '迄']]
    return df_flat[ordered_cols]

def primily_organized(filelist):
    '''讀取交通量檔案檔案清單，並整合成美5分鐘一筆的表格'''
    outputdf = []
    for file in filelist:
        surveynumber = os.path.basename(file).split(".")[0] #讀取檔名進行命名(因xlsx內中並沒有可以協助判斷的部分)
        surveyplace = os.path.basename(file).split(".")[1]

        sheetnamelist = ['平日','假日']
        for sheetname in sheetnamelist:
            df = pd.read_excel(file, header=[0,1], sheet_name=sheetname)
            df.rename(columns = {'Unnamed: 0_level_0':'起'}, inplace=True)
            df.rename(columns = {'Unnamed: 1_level_0':'迄'}, inplace=True)

            # 直接從第二層尋找欄位名稱為 1.8 的位置，在此之前才是調查表格
            drop_from = next(i for i, col in enumerate(df.columns) if col[1] == 1.8)
            df = df.iloc[:, :drop_from] # 使用 iloc 保留 drop_from 之前的所有欄位

            df_flat = flatten_directional_traffic_df(df)
            df_flat['原始資料'] = get_filename_withoutprojectname(file)
            df_flat['調查計畫書點位編號'] = surveynumber
            df_flat['調查路段'] = surveyplace
            df_flat['快慢車道'] = df_flat['方向'].apply(lambda x: '慢車道' if '慢車道' in x else '快車道')
            df_flat['方向'] = df_flat['方向'].str.split('-').str[0]
            df_flat['平假日'] = sheetname
            for column in ['平假日', '方向', '快慢車道', '調查路段', '調查計畫書點位編號']:
                df_flat = move_column(df = df_flat, column_name = column, insert_index=0)
            
            outputdf.append(df_flat)
    
    output = pd.concat(outputdf)

    return output

def getPCU(df):
    PCE = {'聯結車':3.0, 
        '大貨車':1.8,
        '大客車':1.8,
        '小型車':1.0,
        '機車':0.42,
        '自行車':0.42}

    df['Volume'] = df[['聯結車', '大貨車', '大客車(客運)', '遊覽車', '小型車', '機車']].sum(axis=1)


    df['聯結車PCU'] = df['聯結車'] * PCE['聯結車']
    df['大貨車PCU'] = df['大貨車'] * PCE['大貨車']
    df['大客車PCU'] = (df['大客車(客運)'] + df['遊覽車']) * PCE['大客車']
    df['小型車PCU'] = df['小型車'] * PCE['小型車']
    df['機車PCU'] = df['機車'] * PCE['機車']

    df['PCU'] = df[['聯結車PCU', '大貨車PCU', '大客車PCU', '小型車PCU', '機車PCU']].sum(axis=1)
    df = move_column(df = df, column_name="原始資料", insert_index=-1)

    return df 

def aggregate_by_window(df, window_size, starttimecolumn , endtimecolumn, group_cols = ['檔案', '平假日', '方向'], sum_cols = ['PCU'], drop = False):
    '''Alignment

    Args:
        df (pd.DataFrame): 要操作的 DataFrame。
        window_size (int): 每幾格一起換算。
        starttimecolumn (str): 開始時間。
        endtimecolumn (str): 截止時間。
        group_cols(list) : 需要用來分組的欄位。
        sum_cols(list)：需要計算的欄位，預設是PCU一欄。

    Returns:
        pd.DataFrame: 調整後的 DataFrame。
    
    '''

    df_result = []

    # 確保「起」時間是可排序的
    df[starttimecolumn] = pd.to_datetime(df[starttimecolumn].astype(str), format='%H:%M:%S', errors='coerce').dt.time
    df[endtimecolumn] = pd.to_datetime(df[endtimecolumn].astype(str), format='%H:%M:%S', errors='coerce').dt.time
    df = df.sort_values(group_cols + [starttimecolumn]).reset_index(drop=True)

    # 分組
    
    grouped = df.groupby(group_cols)

    # 對每組做 rolling window 加總
    for group_keys, group_df in grouped:
        group_df = group_df.reset_index(drop=True)
        for i in range(len(group_df) - window_size + 1):
            window = group_df.iloc[i:i+window_size]

            # 基本欄位
            agg_row = {col: key for col, key in zip(group_cols, group_keys)}

            # 時間欄位處理
            agg_row[starttimecolumn] = window.iloc[0][starttimecolumn].strftime('%H:%M:%S')
            agg_row[endtimecolumn] = window.iloc[-1][endtimecolumn] if pd.notna(window.iloc[-1][endtimecolumn]) else None

            for col in sum_cols:
                agg_row[col] = window[col].sum()

            df_result.append(agg_row)
    
    df_result = pd.DataFrame(df_result)
    
    if drop:
        df_result = (
            df_result
            .sort_values(by=sum_cols, ascending=[False] * len(sum_cols))
            .drop_duplicates(subset=group_cols)
            .sort_values(by=group_cols)
            .reset_index(drop=True)
        )
        

    return df_result

def hourlyformat(df):
    dfhour = df.copy()
    dfhour['小時'] = pd.to_datetime(dfhour['起'].astype(str), format='%H:%M:%S', errors='coerce').dt.hour
    dfhour = move_column(dfhour, '小時', 5)
    sum_cols = ['聯結車', '大貨車', '大客車(客運)', '遊覽車', '小型車', '機車', '自行車&行人',
                'Volume', '聯結車PCU', '大貨車PCU', '大客車PCU', '小型車PCU', '機車PCU', 'PCU']

    dfhour = (
        dfhour
        .groupby(['調查計畫書點位編號', '調查路段', '快慢車道', '方向', '平假日', '小時'])
        .agg({**{col: 'sum' for col in sum_cols}, '原始資料': 'first'})
        .reset_index()
    )


    dfhour_peak = dfhour.copy()
    dfhour_peak['尖峰小時'] = "*"
    dfhour_peak = dfhour_peak.sort_values(['PCU', '小型車PCU', '大客車PCU', '大貨車PCU', '聯結車PCU'], ascending=[False, False, False, False , False]).drop_duplicates(subset = ['調查計畫書點位編號', '調查路段', '快慢車道', '方向', '平假日'])

    dfhour = pd.merge(dfhour, dfhour_peak, on=dfhour.columns.to_list(), how='left')

    dfhour = move_column(dfhour, "原始資料", -1)

    return dfhour

def dailyformat(dfhour):
    groupbycolumns = ['調查計畫書點位編號', '調查路段', '快慢車道', '方向', '平假日']
    sumcolumns = ['聯結車', '大貨車', '大客車(客運)','遊覽車', '小型車', '機車', '自行車&行人', 'Volume', '聯結車PCU', '大貨車PCU', '大客車PCU', '小型車PCU', '機車PCU', 'PCU']
    firstcolumns = ['原始資料']
    # dfhour.groupby(['調查計畫書點位編號', '調查路段', '快慢車道', '方向', '平假日']).agg()
    dfdaily = (
        dfhour.groupby(groupbycolumns)
        .agg({**{col: 'sum' for col in sumcolumns},
                **{col: 'first' for col in firstcolumns}})
        .reset_index()
    )
    dfdaily['機車比例'] = dfdaily['機車'] / dfdaily['Volume']
    dfdaily = move_column(df = dfdaily, column_name="機車比例", insert_index=dfdaily.columns.get_loc("Volume") + 1) 
    return dfdaily  

def speed_primily_organized(filelist):
    dfs = []
    for file in filelist:

        surveynumber = os.path.basename(file)[:5] #讀取檔名進行命名(因xlsx內中並沒有可以協助判斷的部分)
        sheetnamelist = ['資料分析(平日晨峰)', '資料分析(平日昏峰)', '資料分析(假日晨峰)', '資料分析(假日昏峰)']
        # sheetname = sheetnamelist[0]
        for sheetname in sheetnamelist:

            speedlimit = read_specific_data(excelfilepath=file, sheetname=sheetname, cell='Y3')

            columnslist = ['路段編號', '路口起點', '路口迄點', '路線長度(公尺)', 
                        '旅行速率(公里/小時)', '行駛速率(公里/小時)', 
                        '旅行時間(秒)', '行駛時間(秒)', '延滯時間(秒)',
                        '路段延滯 (秒)_阻塞', '路段延滯 (秒)_公車停靠', '路段延滯 (秒)_計程車停靠', '路段延滯 (秒)_路邊停靠', '路段延滯 (秒)_行人穿越', '路段延滯 (秒)_其他',
                        '路口延滯 (秒)_紅燈', '路口延滯 (秒)_左轉同向', '路口延滯 (秒)_左轉對向', '路口延滯 (秒)_右轉', '路口延滯 (秒)_橫越車輛', '路口延滯 (秒)_行人', '路口延滯 (秒)_其他']


            df1 = pd.read_excel(file, skiprows=3, sheet_name=sheetname, header=None)
            direction1 = read_specific_data(file, sheetname, 'B2')
            df1 = df1.iloc[:19,:22]
            df1.columns = columnslist  # 手動指定欄位名稱
            df1 = df1.sort_values(['路段編號'])
            df1['速率調查編號'] = surveynumber
            df1['方向'] = direction1

            df2 = pd.read_excel(file, skiprows=27, sheet_name=sheetname, header=None) 
            direction2 = read_specific_data(file, sheetname, 'B26')
            df2 = df2.iloc[:19,:22]
            df2.columns = columnslist  # 手動指定欄位名稱
            df2 = df2.sort_values(['路段編號'])
            df2['速率調查編號'] = surveynumber
            df2['方向'] = direction2

            df = pd.concat([df1, df2])
            df['速限'] = speedlimit
            df['分頁'] = sheetname
            df['晨昏峰'] = df['分頁'].apply(lambda x: '晨峰' if '晨峰' in x else '昏峰')
            df['平假日'] = df['分頁'].apply(lambda x: '平日' if '平日' in x else '假日')
            df = move_column(df, '方向', 0)
            df = move_column(df, '晨昏峰', 0)
            df = move_column(df, '平假日', 0)
            df = move_column(df, '速率調查編號', 0)
            df['原始資料'] = get_filename_withoutprojectname(file)

            df = df[(df['路線長度(公尺)'].notna()) & (df['路線長度(公尺)'] != 0)]
            dfs.append(df)

    output = pd.concat(dfs)
    return output

def filter_dfspeed(df):
    # 這個部分是因為這次他計算的路段並非全部採用，所以會有這一段
    speedinfo = pd.read_excel(os.path.abspath(os.path.join(get_projectfolderpath(), 'Technical', '06_交通量調查', '01_交通量調查計畫', '交通調查點位盤點.xlsx')), sheet_name='速率路段')
    speedinfo = speedinfo[['速率調查編號', '迄點編號']]
    dfspeed_filterd = pd.merge(df, speedinfo, on = '速率調查編號')
    dfspeed_filterd = dfspeed_filterd[dfspeed_filterd['路段編號'] < dfspeed_filterd['迄點編號']]

    dfspeed_filterd.drop(columns = '迄點編號', inplace=True)

    return dfspeed_filterd

def speedcalculate(df):
    
    groupbycolumns = ['速率調查編號', '平假日', '晨昏峰', '方向']
    sumcolumns = ['路線長度(公尺)','旅行時間(秒)', '行駛時間(秒)', '延滯時間(秒)', '路段延滯 (秒)_阻塞', '路段延滯 (秒)_公車停靠', '路段延滯 (秒)_計程車停靠', '路段延滯 (秒)_路邊停靠', '路段延滯 (秒)_行人穿越', '路段延滯 (秒)_其他', '路口延滯 (秒)_紅燈', '路口延滯 (秒)_左轉同向', '路口延滯 (秒)_左轉對向', '路口延滯 (秒)_右轉', '路口延滯 (秒)_橫越車輛', '路口延滯 (秒)_行人', '路口延滯 (秒)_其他']
    firstcolumns = ['速限', '原始資料', '分頁']
    df_sum = (
        df.groupby(groupbycolumns)
        .agg({**{col: 'sum' for col in sumcolumns},
                **{col: 'first' for col in firstcolumns}})
        .reset_index()
    )
    df_sum['旅行速率(公里/小時)'] = (df_sum['路線長度(公尺)']/df_sum['旅行時間(秒)']) * 3.6
    df_sum['行駛速率(公里/小時)'] = (df_sum['路線長度(公尺)']/df_sum['行駛時間(秒)']) * 3.6

    outputcolumns = groupbycolumns + ['路線長度(公尺)','旅行時間(秒)', '行駛時間(秒)', '延滯時間(秒)','旅行速率(公里/小時)', '行駛速率(公里/小時)'] + firstcolumns
    df_sum = df_sum.reindex(columns=outputcolumns)

    df_sum = get_VL1(df_sum, Vcolumn='旅行速率(公里/小時)', VLimitcolumn = '速限')
    df_sum = get_VL2(df_sum, Vcolumn='旅行速率(公里/小時)', VLimitcolumn = '速限')
    df_sum = move_column(df_sum, '原始資料', len(df_sum.columns) - 1)
    df_sum = move_column(df_sum, '分頁', len(df_sum.columns) - 1)
    
    return df_sum

def main ():
    # Step0 定義資料夾
    datafolder = os.path.abspath(os.path.join(get_projectfolderpath(), 'Technical', '06_交通量調查', '02_原始資料'))
    volumefolder = os.path.abspath(os.path.join(datafolder, '01_路段交通量'))
    speedfolder = os.path.abspath(os.path.join(datafolder, '02_路段旅行速率'))

    initialfolder = create_folder(os.path.join(os.getcwd(), '..', '01_資料初步彙整'))
    volume_initialfolder = create_folder(os.path.join(initialfolder, '01_路段交通量'))
    speed_initialfolder = create_folder(os.path.join(initialfolder, '02_路段旅行速率'))

    # Step1 彙整路段交通量資料
    files = findfiles(volumefolder,'xlsx')
    dfvolume = primily_organized(filelist=files)
    dfvolume = getPCU(dfvolume) # 計算PCU
    dfhour = hourlyformat(dfvolume) #轉為分時資料
    dfdaily = dailyformat(dfhour)
    dfpeak = aggregate_by_window(df=dfvolume, window_size=4, 
                                starttimecolumn='起', endtimecolumn='迄', 
                                group_cols=['調查計畫書點位編號', '調查路段', '快慢車道', '方向', '平假日'], 
                                drop=True)

    volumeoutputpath = os.path.abspath(os.path.join(volume_initialfolder, '路段交通量資料彙整.xlsx'))
    with pd.ExcelWriter(volumeoutputpath, engine='xlsxwriter') as writer:
        dfvolume.to_excel(writer, sheet_name='交通量原始資料(每五分鐘一筆)', index=False)
        dfpeak.to_excel(writer, sheet_name='尖峰時段PCU(每五分鐘一筆)', index=False)
        dfhour.to_excel(writer, sheet_name='分時交通量', index=False)
        dfdaily.to_excel(writer, sheet_name='全日交通量', index=False)

    for sheetname in get_excel_sheet_names(volumeoutputpath):
        reformat_excel(excel_path=volumeoutputpath, sheetname=sheetname)

    # Step2 彙整路段旅行速率資料
    files = findfiles(speedfolder,'xlsx')
    dfspeedoriginal = speed_primily_organized(files)
    dfspeedselect  = filter_dfspeed(dfspeedoriginal)
    dfspeed = speedcalculate(dfspeedselect)

    speedoutputpath = os.path.abspath(os.path.join(speed_initialfolder, '旅行速率資料彙整.xlsx'))
    with pd.ExcelWriter(speedoutputpath, engine='xlsxwriter') as writer:
        dfspeedoriginal.to_excel(writer, sheet_name='旅行速率資料彙整(三趟平均)', index=False)
        dfspeedselect.to_excel(writer, sheet_name='採用之路段資料彙整', index=False)
        dfspeed.to_excel(writer, sheet_name='尖峰速率', index=False)

    for sheetname in get_excel_sheet_names(speedoutputpath):
        reformat_excel(excel_path=speedoutputpath, sheetname=sheetname)

if __name__ == '__main__':
    main()