# M03A

M03A資料可作為統計通過門架ID對應的**通過量**  
可以分析路段的道路服務水準

## Setup

In [1]:
import os
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import subprocess
import shutil
import tarfile
import xml.etree.ElementTree as ET



In [68]:
def create_folder(folder_name):
    """建立資料夾"""
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)
    return os.path.abspath(folder_name)

def delete_folders(deletelist):
    """
    刪除資料夾
    deletelist(list):需要為皆為路徑的list
    """
    for folder_name in deletelist: 
        if os.path.exists(folder_name): # 檢查資料夾是否存在
            shutil.rmtree(folder_name) # 刪除資料夾及其內容
        else:
            print(f"資料夾 '{folder_name}' 不存在。")

def getdatelist(time1, time2):
    '''
    建立日期清單
    time1、time2(str):為%Y-%M-%D格式的日期字串
    '''
    if time1 > time2:
        starttime = time2
        endtime = time1
    else:
        starttime = time1
        endtime = time2

    date_range = pd.date_range(start=starttime, end=endtime)
    datelist = [d.strftime("%Y%m%d") for d in date_range]
    return datelist

def freewaydatafolder(datatype):
    savelocation = create_folder(os.path.join(os.getcwd(), datatype))
    rawdatafolder = create_folder(os.path.join(savelocation, '0_rawdata'))
    mergefolder = create_folder(os.path.join(savelocation, '1_merge'))
    excelfolder = create_folder(os.path.join(savelocation, '2_excel'))
    return savelocation, rawdatafolder, mergefolder, excelfolder

def delete_folders_permanently(deletelist):
    """
    永久刪除資料夾及其內容，不放入資源回收筒
    deletelist (list): 需要刪除的資料夾路徑列表
    """
    for item in deletelist:
        if os.path.isdir(item):  # 檢查是否為資料夾
            try:
                shutil.rmtree(item)  # 永久刪除資料夾
                print(f"已永久刪除資料夾： {item}")
            except OSError as e:
                print(f"刪除資料夾 {item} 時發生錯誤： {e}")
        elif os.path.isfile(item):  # 檢查是否為檔案
            try:
                os.remove(item)  # 永久刪除檔案
                print(f"已永久刪除檔案： {item}")
            except OSError as e:
                print(f"刪除檔案 {item} 時發生錯誤： {e}")
        else:
            print(f"{item} 不是檔案或資料夾。")

def download_etag(etagurl, etagdownloadpath):
    """
    下載指定網址的 XML 檔案到指定位置。

    Args:
        etagurl (str): 要下載的 XML 檔案網址。
        etagdownloadpath (str): 檔案下載後的儲存路徑（包含檔案名稱）。
    """

    try:
        response = requests.get(etagurl, stream=True)
        response.raise_for_status()  # 檢查 HTTP 狀態碼，如有錯誤則拋出異常

        with open(etagdownloadpath, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

    except requests.exceptions.RequestException as e:
        print(f"下載時發生錯誤：{e}")
    except Exception as e:
        print(f"發生錯誤：{e}")

def read_xml(xml_file_path):
    """
    讀取並解析 XML 檔案。

    Args:
        xml_file_path (str): XML 檔案路徑。

    Returns:
        ElementTree.Element: XML 文件的根節點。
        None: 如果解析失敗。
    """
    # try:
    #     tree = ET.parse(xml_file_path)
    #     root = tree.getroot()
    #     return root
    try:
        with open(xml_file_path, 'r', encoding='utf-8') as f:  # 指定編碼
            xml_content = f.read()
        return xml_content
    except FileNotFoundError:
        print(f"檔案未找到：{xml_file_path}")
        return None
    except ET.ParseError as e:
        print(f"解析 XML 檔案時發生錯誤：{e}")
        return None

def etag_xml_to_dataframe(xml_content):
    """
    將 XML 內容轉換為 Pandas DataFrame。

    Args:
        xml_content (str): XML 內容字串。

    Returns:
        pandas.DataFrame: 轉換後的 DataFrame。
        None: 如果解析失敗。
    """
    try:
        root = ET.fromstring(xml_content)  # 從字串解析 XML

        data = []
        for etag in root.findall('.//{http://traffic.transportdata.tw/standard/traffic/schema/}ETag'):
            etag_data = {}
            for element in etag:
                tag_name = element.tag.split('}')[-1]  # 去除命名空間
                if tag_name == 'RoadSection':  # 處理 RoadSection
                    for section_element in element:
                        etag_data[section_element.tag] = section_element.text
                else:
                    etag_data[tag_name] = element.text
            data.append(etag_data)

        df = pd.DataFrame(data)
        df.columns = ['ETagGantryID','LinkID', 'LocationType', 'PositionLon', 'PositionLat', 'RoadID', 'RoadName', 'RoadClass', 'RoadDirection', 'Start','End', 'LocationMile']
        return df

    except ET.ParseError as e:
        print(f"解析 XML 內容時發生錯誤：{e}")
        return None
    except Exception as e:
        print(f"發生錯誤：{e}")
        return None

def etag_getdf():
    etagfolder = create_folder(os.path.join(os.getcwd(), 'ETag'))
    etagurl = 'https://tisvcloud.freeway.gov.tw/history/motc20/ETag.xml'
    etagdownloadpath = os.path.join(etagfolder, 'ETag.xml')
    download_etag(etagurl=etagurl, etagdownloadpath=etagdownloadpath)
    etagxml = read_xml(etagdownloadpath)
    etag = etag_xml_to_dataframe(etagxml)
    return etag

def extract_tar_gz(tar_gz_file, extract_path):
    try:
        with tarfile.open(tar_gz_file, 'r:gz') as tar:
            tar.extractall(path=extract_path)
    except Exception as e:
        print(f"解壓縮 {tar_gz_file} 失敗：{e}")

def download_and_extract(url, datatype, date, downloadfolder, keep = False):
    '''針對高公局交通資料庫的格式進行下載'''
    downloadurl = f"{url}/{datatype}_{date}.tar.gz"
    destfile = os.path.join(downloadfolder, f"{datatype}_{date}.tar.gz")

    response = requests.get(downloadurl)
    with open(destfile, 'wb') as file:
        file.write(response.content)

    extractpath = create_folder(os.path.join(downloadfolder, date))
    extract_tar_gz(destfile, extractpath)
    if keep == False:
        os.remove(destfile)

    return extractpath

def findfiles(filefolderpath, filetype='.csv'):
    """
    尋找指定路徑下指定類型的檔案，並返回檔案路徑列表。

    Args:
        filefolderpath (str): 指定的檔案路徑。
        filetype (str, optional): 要尋找的檔案類型，預設為 '.csv'。

    Returns:
        list: 包含所有符合條件的檔案路徑的列表。
    """

    filelist = []  # 建立一個空列表來儲存檔案路徑

    # 使用 os.walk 遍歷資料夾及其子資料夾
    for root, _, files in os.walk(filefolderpath):
        for file in files:
            if file.endswith(filetype):  # 檢查檔案是否以指定類型結尾
                file_path = os.path.join(root, file)  # 建立完整的檔案路徑
                filelist.append(file_path)  # 將檔案路徑添加到列表中

    return filelist

def combinefile(filelist, datatype='M03A'):
    """
    更有效率地合併多個CSV檔案。

    Args:
        filelist (list): 包含CSV檔案路徑的列表。
        datatype (str, optional): 資料類型，決定欄位名稱。預設為 'M03A'。

    Returns:
        pandas.DataFrame: 合併後的DataFrame。
    """

    # 使用字典來映射資料類型和欄位名稱，避免重複的 if/elif 判斷
    column_mapping = {
        'M03A': ['TimeStamp', 'GantryID', 'Direction', 'VehicleType', 'Volume'],
        'M04A': ['TimeStamp', 'GantryFrom', 'GantryTo', 'VehicleType', 'TravelTime', 'Volume'],
        'M05A': ['TimeStamp', 'GantryFrom', 'GantryTo', 'VehicleType', 'Speed', 'Volume'],
        'M06A': ['VehicleType', 'DetectionTimeO', 'GantryO', 'DetectionTimeD', 'GantryD', 'TripLength', 'TripEnd', 'TripInformation'],
        'M07A': ['TimeStamp', 'GantryO', 'VehicleType', 'AverageTripLength', 'Volume'],
        'M08A': ['TimeStamp', 'GantryO', 'GantryD', 'VehicleType', 'Trips']
    }

    columns = column_mapping.get(datatype)  # 使用 get() 方法，如果找不到鍵，會返回 None
    if columns is None:
        raise ValueError(f"未知的資料類型：{datatype}")

    combineddf = pd.concat(
        (pd.read_csv(i, header=None, names=columns) for i in filelist),  # 使用生成器表達式
        ignore_index=True  # 避免重複的索引
    )

    return combineddf

def THI_M03A(df):
    df = df.pivot(index=['TimeStamp', 'GantryID', 'Direction'], columns='VehicleType', values='Volume').reset_index()
    df = df.rename(columns = {
        5 : 'Vol_Trail',
        31 : 'Vol_Car',
        32 : 'Vol_Truck',
        41 : 'Vol_TourBus',
        42 : 'Vol_BTruck'
    })
    df = df.reindex(columns = ['TimeStamp', 'GantryID', 'Direction', 'Vol_Trail', 'Vol_Car', 'Vol_Truck', 'Vol_TourBus', 'Vol_BTruck'])

    df['TimeStamp'] = pd.to_datetime(df['TimeStamp'])

    df['Date'] = df['TimeStamp'].dt.date
    df['Hour'] = df['TimeStamp'].dt.hour

    df = df.groupby(['Date','Hour','GantryID','Direction']).agg({
            'Vol_Trail':'sum',
            'Vol_Car':'sum', 
            'Vol_Truck':'sum',
            'Vol_TourBus':'sum',
            'Vol_BTruck':'sum'}).reset_index()
    return df

def THI_M05A(df, weighted = False):
    
    # 將每5分鐘的資料，轉為分時資料
    df['TimeStamp'] = pd.to_datetime(df['TimeStamp'])
    df['Date'] = df['TimeStamp'].dt.date
    df['Hour'] = df['TimeStamp'].dt.hour

    df = df[df['Volume']!=0] # 需要避開Volume 為0的資料

    if weighted == True:
        df['Speed_time_volume'] = df['Speed'] * df['Volume']
        df = df.groupby(['Date', 'Hour', 'GantryFrom', 'GantryTo', 'VehicleType']).agg({'Speed_time_volume':'sum', 'Volume':'sum'}).reset_index()
        df['Speed'] = df['Speed_time_volume'] / df['Volume']
    else :
        df = df.groupby(['Date', 'Hour', 'GantryFrom', 'GantryTo', 'VehicleType']).agg({'Speed':'mean'}).reset_index()
    
    
    df['Speed'] = df['Speed'].round(3)
    df = df.pivot(index=['Date', 'Hour', 'GantryFrom', 'GantryTo'], columns='VehicleType', values='Speed').reset_index()
    df = df.rename(columns = {
        5 : 'Speed_Trail',
        31 : 'Speed_Car',
        32 : 'Speed_Truck',
        41 : 'Speed_TourBus',
        42 : 'Speed_BTruck'
    })

    df = df.fillna(0)
    df = df.reindex(columns = ['Date', 'Hour', 'GantryFrom', 'GantryTo', 'Speed_Trail', 'Speed_Car', 'Speed_Truck', 'Speed_TourBus', 'Speed_BTruck'])
    
    return df

In [None]:
# 還沒有完成的
def THI_M06A(df):

In [67]:
def THI_process(df, datatype, weighted = False):
    if datatype == 'M03A':
        df = THI_M03A(df)
    elif datatype == 'M05A':
        df = THI_M05A(df, weighted = weighted)
    return df

In [20]:
def M03A_Tableau_combined(folder , etag):
    allfiles = findfiles(filefolderpath=folder, filetype='.xlsx')
    combineddf = pd.concat(
        (pd.read_excel(i) for i in allfiles),  # 使用生成器表達式
        ignore_index=True  # 避免重複的索引
    )

    combineddf['Day'] = combineddf["Date"].dt.day_name() #生成星期幾

    combineddf = pd.merge(combineddf,etag[['ETagGantryID', 'RoadName','Start', 'End']].rename(columns = {'ETagGantryID':'GantryID'}) , on = 'GantryID')
    combineddf['RoadSection'] = combineddf['Start'] + '-' + combineddf['End']

    outputfolder = create_folder(os.path.join(folder, '..', '03_TableauData'))
    combineddf.to_csv(os.path.join(outputfolder, 'M03A.csv'), index=False)

In [5]:
'''
待改進
def combinefile(filelist, datatype = 'M03A'):
    if datatype == 'M03A':
        columns = ['TimeStamp', 'GantryID', 'Direction', 'VehicleType', 'Volume']
    elif datatype == 'M04A':
        columns = ['TimeStamp', 'GantryFrom', 'GantryTo', 'VehicleType', 'TravelTime', 'Volume']
    elif datatype == 'M05A':
        columns = ['TimeStamp', 'GantryFrom', 'GantryTo', 'VehicleType', 'Speed', 'Volume']
    elif datatype == 'M06A':
        columns = ['VehicleType', 'DetectionTimeO', 'GantryO',  'DetectionTimeD', 'GantryD', 'TripLength', 'TripEnd', 'TripInformation']
    elif datatype == 'M07A':
        columns = ['TimeStamp', 'GantryO', 'VehicleType', 'AverageTripLength', 'Volume']
    elif datatype == 'M08A':
        columns = ['TimeStamp',	'GantryO', 'GantryD', 'VehicleType', 'Trips']
    
    combineddf = []
    for i in filelist:
        df = pd.read_csv(i, header=None)
        df.columns = columns
        combineddf.append(df)
    combineddf = pd.concat(combineddf)

    return combineddf
'''

"\n待改進\ndef combinefile(filelist, datatype = 'M03A'):\n    if datatype == 'M03A':\n        columns = ['TimeStamp', 'GantryID', 'Direction', 'VehicleType', 'Volume']\n    elif datatype == 'M04A':\n        columns = ['TimeStamp', 'GantryFrom', 'GantryTo', 'VehicleType', 'TravelTime', 'Volume']\n    elif datatype == 'M05A':\n        columns = ['TimeStamp', 'GantryFrom', 'GantryTo', 'VehicleType', 'Speed', 'Volume']\n    elif datatype == 'M06A':\n        columns = ['VehicleType', 'DetectionTimeO', 'GantryO',  'DetectionTimeD', 'GantryD', 'TripLength', 'TripEnd', 'TripInformation']\n    elif datatype == 'M07A':\n        columns = ['TimeStamp', 'GantryO', 'VehicleType', 'AverageTripLength', 'Volume']\n    elif datatype == 'M08A':\n        columns = ['TimeStamp',\t'GantryO', 'GantryD', 'VehicleType', 'Trips']\n    \n    combineddf = []\n    for i in filelist:\n        df = pd.read_csv(i, header=None)\n        df.columns = columns\n        combineddf.append(df)\n    combineddf = pd.concat(comb

## 需要調整的參數

In [26]:
# ===== Step 0: 手動需要調整的參數 =====

# 需要調整的項目有2個
# 1. 調整需要確認下載的資料型態是什麼
# datatype = "M03A"  # Data type (e.g., M03A, M06A, M05A) 

# 2. 調整下載的資料區間
starttime = "2024-09-10"
endtime = "2024-09-10"
datelist = getdatelist(endtime,starttime) # 下載的時間區間清單

# 建立後續要處理儲存資料的資料夾位置
# savelocation, rawdatafolder, mergefolder, excelfolder = freewaydatafolder(datatype=datatype)
# savelocation = create_folder(os.path.join(os.getcwd(), datatype))
# rawdatafolder = create_folder(os.path.join(savelocation, '0_rawdata'))
# mergefolder = create_folder(os.path.join(savelocation, '1_merge'))
# excelfolder = create_folder(os.path.join(savelocation, '2_excel'))
# basicurl = "https://tisvcloud.freeway.gov.tw/history/TDCS/"
# url = basicurl + datatype

## 程式執行

In [78]:
def freeway(datatype, datelist, Tableau = False, etag = None):
    savelocation, rawdatafolder, mergefolder, excelfolder = freewaydatafolder(datatype=datatype)
    url = "https://tisvcloud.freeway.gov.tw/history/TDCS/" + datatype

    for date in datelist :
        # 1. 下載並解壓縮
        dowloadfilefolder = download_and_extract(url = url, datatype = datatype, date = date, downloadfolder = rawdatafolder)

        # 2. 合併
        filelist = findfiles(filefolderpath=dowloadfilefolder, filetype='.csv')
        df = combinefile(filelist=filelist, datatype=datatype)
        mergeoutputfolder = create_folder(os.path.join(mergefolder, date)) # 建立相同日期的資料夾進行處理
        df.to_csv(os.path.join(mergeoutputfolder, f'{date}.csv') , index = False) # 輸出整併過的csv
        delete_folders([dowloadfilefolder]) #回頭刪除解壓縮過的資料

        # # 3. 處理
        # df = THI_process(df, datatype=datatype)
        # df.to_excel(os.path.join(excelfolder, f'{date}.xlsx'), index = False, sheet_name = date)
    
    if Tableau == True:
        if datatype == 'M03A':
            M03A_Tableau_combined(folder=excelfolder, etag = etag)

    return df

In [95]:
etag = etag_getdf()
etag['Direction'] = etag['ETagGantryID'].str[-1]

In [79]:
df = freeway(datatype = 'M06A', datelist = datelist)

In [97]:
etag[['ETagGantryID', 'LocationMile']]

Unnamed: 0,ETagGantryID,LocationMile
0,03F2899N,289K+900
1,05F0000S,0K+000
2,03F2306N,230K+600
3,01F3227N,322K+700
4,03F1991S,199K+100
...,...,...
334,01F2827S,282K+700
335,01F2394N,239K+400
336,03F1332N,133K+200
337,05F0439S,43K+900


In [92]:
etag_S = etag[etag['Direction'] == 'S'].sort_values(['RoadName','LocationMile'])

etag_N = etag[etag['Direction'] == 'N'].sort_values(['RoadName','LocationMile'])

In [94]:
etag_N

Unnamed: 0,ETagGantryID,LinkID,LocationType,PositionLon,PositionLat,RoadID,RoadName,RoadClass,RoadDirection,Start,End,LocationMile,Direction
318,01F0005N,0000100100050C,4,121.731636,25.11831,000010,國道1號,0,N,基隆,基隆端,0K+500,N
124,01F1045N,0000100110400J,4,120.9519,24.72713,000010,國道1號,0,N,頭份,新竹系統,104K+500,N
56,01F1123N,0000100111200K,4,120.90427,24.679317,000010,國道1號,0,N,頭屋,頭份,112K+300,N
307,01F1292N,0000100112900K,4,120.83893,24.550442,000010,國道1號,0,N,苗栗,頭屋,129K+200,N
48,01F1389N,0000100113800K,4,120.781906,24.48592,000010,國道1號,0,N,銅鑼,苗栗,138K+900,N
...,...,...,...,...,...,...,...,...,...,...,...,...,...
299,05F0287N,0000500102820G,4,121.789185,24.842714,000050,國道5號,0,N,頭城,坪林交控中心專用道,28K+700,N
60,05F0309N,0000500103100G,4,121.78635,24.823656,000050,國道5號,0,N,宜蘭(四城、大福),頭城,30K+900,N
323,05F0438N,0000500104400G,4,121.789474,24.711027,000050,國道5號,0,N,羅東,宜蘭(壯圍),43K+800,N
78,05F0528N,0000500105300G,4,121.80696,24.632729,000050,國道5號,0,N,蘇澳,羅東,52K+800,N


In [88]:
etag['Direction'].unique()

array(['N', 'S'], dtype=object)

In [86]:
etag.sort_values(['Direction', 'ETagGantryID'])

Unnamed: 0,ETagGantryID,LinkID,LocationType,PositionLon,PositionLat,RoadID,RoadName,RoadClass,RoadDirection,Start,End,LocationMile,Direction
318,01F0005N,0000100100050C,4,121.731636,25.11831,000010,國道1號,0,N,基隆,基隆端,0K+500,N
320,01F0017N,0000100100160C,4,121.72591,25.109568,000010,國道1號,0,N,八堵,基隆,1K+700,N
321,01F0029N,0000100100270C,4,121.71786,25.102837,000010,國道1號,0,N,大華系統,八堵,2K+900,N
322,01F0061N,0000100100600C,4,121.69364,25.088272,000010,國道1號,0,N,五堵,大華系統,6K+100,N
260,01F0099N,0000100100900F,4,121.659424,25.076044,000010,國道1號,0,N,汐止&汐止系統,五堵,9K+900,N
...,...,...,...,...,...,...,...,...,...,...,...,...,...
276,05F0287S,0000500002820G,4,121.789185,24.842714,000050,國道5號,0,S,坪林交控中心專用道,頭城,28K+700,S
62,05F0309S,0000500003100G,4,121.78635,24.823656,000050,國道5號,0,S,頭城,宜蘭(四城、大福),30K+900,S
337,05F0439S,0000500004400G,4,121.78946,24.710684,000050,國道5號,0,S,宜蘭(壯圍),羅東,43K+900,S
302,05F0494S,0000500004900G,4,121.79991,24.66258,000050,國道5號,0,S,羅東,蘇澳,49K+400,S


In [62]:
M05A = THI_M05A(df)

In [74]:
def main():
    etag = etag_getdf()
    freeway(datatype = 'M03A', datelist = datelist, Tableau = True, etag = etag)
    freeway(datatype = 'M05A', datelist = datelist)
    freeway(datatype = 'M06A', datelist = datelist)

In [71]:
if __name__ == '__main__':
    main()

In [70]:
delete_folders([os.path.join(os.getcwd(), 'M05A')])

# 以備不時之需

In [13]:
# def main():
#     etag = etag_getdf()

#     for date in datelist :
#         # 1. 下載並解壓縮
#         dowloadfilefolder = download_and_extract(url = url, datatype = datatype, date = date, downloadfolder = rawdatafolder)

#         # 2. 合併
#         filelist = findfiles(filefolderpath=dowloadfilefolder, filetype='.csv')
#         df = combinefile(filelist=filelist, datatype=datatype)
#         mergeoutputfolder = create_folder(os.path.join(mergefolder, date)) # 建立相同日期的資料夾進行處理
#         df.to_csv(os.path.join(mergeoutputfolder, f'{date}.csv') , index = False) # 輸出整併過的csv
#         delete_folders([dowloadfilefolder]) #回頭刪除解壓縮過的資料

#         # 3. 處理
#         df = THI_process(df, datatype=datatype)
#         df.to_excel(os.path.join(excelfolder, f'{date}.xlsx'), index = False, sheet_name = date)

#     # M03A_Tableau_combined(folder=excelfolder, etag = etag)
