In [1]:
import os
import geopandas as gpd
import pandas as pd 
from shapely.geometry import Point, LineString
from shapely import wkt # for WKT 轉幾何物件

In [2]:
# 00_setup_os處理函數
def create_folder(folder_name):
    """建立資料夾"""
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)
    return os.path.abspath(folder_name)

def findfiles(filefolderpath, filetype='.csv', recursive=True):
    """
    尋找指定路徑下指定類型的檔案，並返回檔案路徑列表。

    Args:
        filefolderpath (str): 指定的檔案路徑。
        filetype (str, optional): 要尋找的檔案類型，預設為 '.csv'。
        recursive (bool, optional): 是否檢索所有子資料夾，預設為 True；反之為False，僅查找當前資料夾的所有file。

    Returns:
        list: 包含所有符合條件的檔案路徑的列表。
    """
    filelist = []

    if recursive:
        # 遍歷資料夾及其子資料夾
        for root, _, files in os.walk(filefolderpath):
            for file in files:
                if file.endswith(filetype):
                    file_path = os.path.join(root, file)
                    filelist.append(file_path)
    else:
        # 僅檢索當前資料夾
        for file in os.listdir(filefolderpath):
            file_path = os.path.join(filefolderpath, file)
            if os.path.isfile(file_path) and file.endswith(filetype):
                filelist.append(file_path)

    return filelist

def read_combined_dataframe(file_list, filepath = True):
    dataframes = []
    
    for file in file_list:
        try:
            if file.endswith('.csv'):
                df = pd.read_csv(file)
            elif file.endswith('.shp'):
                df = gpd.read_file(file)
            elif file.endswith(('.xls', '.xlsx')):
                df = pd.read_excel(file)
            else:
                print(f"Unsupported file format: {file}")
                continue
            if filepath:
                df['FilePath'] = file  # 添加來源檔案路徑欄位
            dataframes.append(df)
        except Exception as e:
            print(f"Error reading {file}: {e}")

    # 合併所有 DataFrame
    combined_df = pd.concat(dataframes, ignore_index=True)
    return combined_df

# 01_geodataframe 圖像處理

def dataframe_to_point(df, lon_col, lat_col, crs="EPSG:4326", target_crs="EPSG:3826"):
    '''
    Parameters:
    df (dataframe) : 含經緯度座標欄位的dataframe
    lon_col (str) : 緯度欄位
    Lat_col (str) : 經度欄位
    crs (str) : 目前經緯度座標的座標系統，常用的為4326(WGS84)、3826(TWD97)
    target_crs：目標轉換的座標系統
    '''

    # from shapely.geometry import Point
    # import pandas as pd
    # import geopandas as gpd
    # Create Point geometries from the longitude and latitude columns
    geometry = [Point(xy) for xy in zip(df[lon_col], df[lat_col])]
    # Create a GeoDataFrame with the original CRS
    gdf = gpd.GeoDataFrame(df, geometry=geometry, crs=crs)
    # Convert the GeoDataFrame to the target CRS
    gdf = gdf.to_crs(epsg=target_crs.split(":")[1])
    return gdf

def get_line(df, x1 = 'Lon_o', x2 = 'Lon_d', y1 = 'Lat_o', y2 = 'Lat_d', target_crs = "EPSG:3826"):
    '''
    Parameters:
    df (dataframe) : 含經緯度座標欄位的dataframe
    x1 (str) : 起點經度欄位
    y1 (str) : 起點緯度欄位
    x2 (str) : 迄點經度欄位
    y2 (str) : 迄點緯度欄位

    預設立場：輸出為wgs84轉換的經緯度點位
    '''
    # from shapely.geometry import LineString
    # import pandas as pd
    # import geopandas as gpd
    df['geometry'] = df.apply(lambda row: LineString([(row[x1], row[y1]), (row[x2], row[y2])]), axis=1)
    gdf = gpd.GeoDataFrame(df, geometry='geometry')
    # 設定座標系統 (假設 WGS 84 / EPSG:4326)
    gdf.set_crs(epsg=4326, inplace=True)
    gdf = gdf.to_crs(epsg=target_crs.split(":")[1])
    return gdf



In [3]:
def get_screenline_and_surveypoint(SLfolder):
    gdf_sl = gpd.read_file(os.path.join(SLfolder, 'TRTS5_屏柵線線型.shp'))
    gdf_sl['Category'] = gdf_sl['TRTS5'].apply(
        lambda x: 0 if 'CD' in x else 1 if 'SL' in x else None
    )
    gdf_sl['Number'] = (
        gdf_sl['TRTS5']
        .str.extract(r'(\d+)')
        .astype(int)
    )
    gdf_sl = gdf_sl.sort_values(['Category', 'Number'])

    gdf_sp = gpd.read_file(os.path.join(SLfolder, 'TRTS5_全部調查點位.shp'))
    gdf_sp = gdf_sp.rename(columns = {'X':'PositionLon', 'Y':'PositionLat'})
    gdf_sp[['TRTS5', 'SP_No']] = gdf_sp['Name'].str.split('-', expand=True)
    gdf_sp['SP_No'] = gdf_sp['SP_No'].astype('int64')
    gdf_sp['Category'] = gdf_sp['TRTS5'].apply(
        lambda x: 0 if 'CD' in x else 1 if 'SL' in x else None
    )
    gdf_sp['Number'] = (
        gdf_sp['TRTS5']
        .str.extract(r'(\d+)')
        .astype(int)
    )
    gdf_sp = gdf_sp.sort_values(['Category', 'Number', 'SP_No'])

    return gdf_sl, gdf_sp

def get_gdf_sp_unique(gdf_sp):
    '''因為調查點位同一個名稱的有太多資料調查來源：先取平均'''

    df_sp_unique = gdf_sp.sort_values(['Category','TRTS5', 'SP_No']).groupby(['Category','TRTS5', 'SP_No']).agg({'Name':'first','PositionLon':'mean', 'PositionLat':'mean', 'Surveyname':'first'}).reset_index()
    gdf_sp_unique = dataframe_to_point(df = df_sp_unique, lon_col = 'PositionLon', lat_col = 'PositionLat', crs="EPSG:4326", target_crs="EPSG:3826")
    

    return gdf_sp_unique

def get_df_seq(seqfolder):
    '''讀取公車站序並轉為geodataframe
    seqfolder(str):站序點位資料csv所在資料夾'''
    df_seq = read_combined_dataframe(file_list=findfiles(seqfolder))
    df_seq = df_seq.reindex(columns = ['RouteUID', 'SubRouteUID', 'SubRouteName_Zh','Direction','StopUID', 'StopSequence', 'PositionLon', 'PositionLat']).rename(columns = {'SubRouteName_Zh':'SRouteName'})
    df_seq = df_seq.drop_duplicates(subset= ['RouteUID', 'SubRouteUID','Direction', 'StopSequence', 'PositionLon', 'PositionLat'])
    if (len(df_seq[df_seq['StopSequence'].isna()]) / len(df_seq) < 0.05):
        df_seq = df_seq[df_seq['StopSequence'].notna()] # 會有沒有站序的
    else:
        percentage = ((df_seq[df_seq['StopSequence'].isna()]) / len(df_seq)) * 100
        print(f"===== 需要檢查沒有站序的站點有哪些，共有{percentage}%的資料沒有站序 =====")

    return df_seq

def get_gdfroutepair(df_seq):
    # 先排序，確保順序正確
    df_seq = df_seq.sort_values(
        by=['RouteUID', 'SubRouteUID', 'Direction', 'StopSequence']
    )

    # 建立 To 欄位（往下一站）
    df_seq['ToStop'] = df_seq.groupby(
        ['RouteUID', 'SubRouteUID', 'Direction']
    )['StopUID'].shift(-1)

    df_seq['ToSeq'] = df_seq.groupby(
        ['RouteUID', 'SubRouteUID', 'Direction']
    )['StopSequence'].shift(-1)

    df_seq['ToLon'] = df_seq.groupby(
        ['RouteUID', 'SubRouteUID', 'Direction']
    )['PositionLon'].shift(-1)

    df_seq['ToLat'] = df_seq.groupby(
        ['RouteUID', 'SubRouteUID', 'Direction']
    )['PositionLat'].shift(-1)

    # 建立 From 欄位（本站）
    df_seq['FromSeq'] = df_seq['StopSequence']
    df_seq['FromLon'] = df_seq['PositionLon']
    df_seq['FromLat'] = df_seq['PositionLat']
    df_seq['FromStop'] = df_seq['StopUID']

    # 只保留需要的欄位
    df_result = df_seq.reindex(columns = [
        'RouteUID', 'SubRouteUID', 'Direction',
        'FromStop', 'FromSeq', 'FromLon', 'FromLat',
        'ToStop', 'ToSeq', 'ToLon', 'ToLat'])

    # 移除最後一站（沒有 To）
    df_result = df_result.dropna(subset=['ToSeq'])

    df_result = df_result.reset_index(drop=True)

    gdf_result = get_line(df_result, x1 = 'FromLon', x2 = 'ToLon', y1 = 'FromLat', y2 = 'ToLat')

    return gdf_result

def buffer_distance(gdf, buffermeters):
    gdf = gdf.copy()
    gdf['geometry'] = gdf.geometry.buffer(buffermeters)
    return gdf 

def get_gdf_route(routefolder):
    df_route = read_combined_dataframe(findfiles(routefolder))
    df_route["geometry"] = df_route["Geometry"].apply(wkt.loads)
    df_route = df_route.drop(columns = ['Geometry'])
    gdf_route = gpd.GeoDataFrame(
        df_route,
        geometry="geometry",
        crs="EPSG:4326"
    ) 

    return gdf_route



In [4]:
# 00_Setup 所有全域函數
# 1.) 參數
SL_buffer_meters = 500
SP_buffer_meters = 50

# 2.) 資料input資料夾
referencefolder = os.path.abspath(os.path.join(os.getcwd(), '..', '參考資料'))
seqfolder = os.path.abspath(os.path.join(os.getcwd(), '..', '00_TDX資料下載', '01公車站序資料'))
routefolder = os.path.abspath(os.path.join(os.getcwd(), '..', '00_TDX資料下載', '02公車路線資料'))
SLfolder = os.path.abspath(os.path.join(os.getcwd(), '..', '..', 'TRTS5屏柵線'))


In [5]:
# 1. 讀取資料
# 讀取模型編修組提供的屏柵線 & 調查點位
gdf_sl, gdf_sp = get_screenline_and_surveypoint(SLfolder=SLfolder)
# 轉為TWD97 (EPSG:3826) 才能使用buffer功能
gdf_sp_unique = get_gdf_sp_unique(gdf_sp).to_crs(epsg=3826) # 因為提供的點位有分路段，所以先改為同一個地點
gdf_sl_twd97 = gdf_sl.to_crs(epsg=3826).reindex(columns = ['TRTS5', 'Category', 'Number', 'geometry']) # 轉換為TWD97處理
# 讀取公車站序
df_seq = get_df_seq(seqfolder=seqfolder)
gdf_seq = dataframe_to_point(df = df_seq, lon_col='PositionLon', lat_col= 'PositionLat', crs="EPSG:4326", target_crs="EPSG:3826")
# 讀取公車原始線型
gdf_route  = get_gdf_route(routefolder)
gdf_route = gdf_route.to_crs(epsg=3826)

# 2. Buffer初步確認那些會經過調查點位
gdf_routepair = get_gdfroutepair(df_seq) # 取得公車路線pair直線
gdf_sl_buffer = buffer_distance(gdf=gdf_sl_twd97, buffermeters=SL_buffer_meters).reset_index(drop = True) # 屏柵線取buffer
gdf_sp_buffer = buffer_distance(gdf=gdf_sp_unique, buffermeters=SP_buffer_meters).reset_index(drop = True) # 調查點位取buffer

route_in_buffer = gpd.sjoin(gdf_routepair,
                            gdf_sp_buffer[['Category', 'TRTS5', 'SP_No', 'Name', 'PositionLon', 'PositionLat','Surveyname', 'geometry']],
                            how='inner',
                            predicate='intersects'
                            ).drop(columns='index_right')

# route_in_buffer = gpd.sjoin(gdf_routepair,
#                             gdf_sl_buffer[['TRTS5','Category','Number','geometry']],
#                             how='inner',
#                             predicate='intersects'
#                             ).drop(columns='index_right')

In [6]:
gdf_route_in_spbuffer = gpd.sjoin(gdf_route,
                                  gdf_sp_buffer[['Category', 'TRTS5', 'SP_No', 'Name', 'PositionLon', 'PositionLat','Surveyname', 'geometry']],
                                  how='inner',
                                  predicate='intersects').drop(columns='index_right')

In [7]:
gdf_route[gdf_route['SubRouteUID'].isna()][['RouteUID', 'RouteName_Zh', 'SubRouteUID', 'SubRouteName_Zh']]

Unnamed: 0,RouteUID,RouteName_Zh,SubRouteUID,SubRouteName_Zh
1992,NWT10116,242,,
1993,NWT10116,242,,
1994,NWT10143,棕7,,
1995,NWT10143,棕7,,
1996,NWT10148,702,,
...,...,...,...,...
4986,TPE19755,新莊-臺北車站,,
4987,TPE19759,通勤30,,
4988,TPE19759,通勤30,,
4989,TPE810,敦化幹線,,


In [10]:
busrouteinfofiles = findfiles(os.path.join('..', '00_TDX資料下載', '03公車路線營運資料'), 'xml')

In [11]:
import pandas as pd
import xml.etree.ElementTree as ET


def read_businfo_xml(xml_path: str) -> pd.DataFrame:
    """
    Read PTX BusRoute XML and convert to DataFrame.
    One SubRoute x Direction per row.
    """

    ns = {'ptx': 'https://ptx.transportdata.tw/standard/schema/'}

    tree = ET.parse(xml_path)
    root = tree.getroot()

    rows = []

    for route in root.findall('ptx:BusRoute', ns):
        route_uid = route.findtext('ptx:RouteUID', default=None, namespaces=ns)
        route_id = route.findtext('ptx:RouteID', default=None, namespaces=ns)
        route_name_zh = route.findtext('ptx:RouteName/ptx:Zh_tw', default=None, namespaces=ns)
        route_name_en = route.findtext('ptx:RouteName/ptx:En', default=None, namespaces=ns)

        city = route.findtext('ptx:City', default=None, namespaces=ns)
        city_code = route.findtext('ptx:CityCode', default=None, namespaces=ns)
        update_time = route.findtext('ptx:UpdateTime', default=None, namespaces=ns)
        version_id = route.findtext('ptx:VersionID', default=None, namespaces=ns)

        # Operator（通常只有一個）
        operator = route.find('ptx:Operators/ptx:Operator', ns)
        operator_id = operator.findtext('ptx:OperatorID', default=None, namespaces=ns) if operator is not None else None
        operator_name_zh = operator.findtext('ptx:OperatorName/ptx:Zh_tw', default=None, namespaces=ns) if operator is not None else None
        operator_name_en = operator.findtext('ptx:OperatorName/ptx:En', default=None, namespaces=ns) if operator is not None else None

        for sub in route.findall('ptx:SubRoutes/ptx:SubRoute', ns):
            rows.append({
                'RouteUID': route_uid,
                'RouteID': route_id,
                'RouteNameZh': route_name_zh,
                'RouteNameEn': route_name_en,
                'SubRouteUID': sub.findtext('ptx:SubRouteUID', default=None, namespaces=ns),
                'SubRouteID': sub.findtext('ptx:SubRouteID', default=None, namespaces=ns),
                'SubRouteNameZh': sub.findtext('ptx:SubRouteName/ptx:Zh_tw', default=None, namespaces=ns),
                'SubRouteNameEn': sub.findtext('ptx:SubRouteName/ptx:En', default=None, namespaces=ns),
                'Direction': sub.findtext('ptx:Direction', default=None, namespaces=ns),
                'Headsign': sub.findtext('ptx:Headsign', default=None, namespaces=ns),
                'HeadsignEn': sub.findtext('ptx:HeadsignEn', default=None, namespaces=ns),
                'OperatorID': operator_id,
                'OperatorNameZh': operator_name_zh,
                'OperatorNameEn': operator_name_en,
                'City': city,
                'CityCode': city_code,
                'UpdateTime': update_time,
                'VersionID': version_id
            })

    return pd.DataFrame(rows)


In [16]:
from __future__ import annotations

import json
import pandas as pd
import xml.etree.ElementTree as ET
from typing import Any, Dict, List, Optional


def read_businfo_xml(xml_path: str) -> pd.DataFrame:
    """
    Read PTX BusRoute XML and convert to a DataFrame.
    Output granularity: one row per SubRoute (typically SubRoute x Direction).

    Notes
    - Route-level fields are repeated on each SubRoute row (practical for joins).
    - Operators can be multiple:
        * Operators_json: full list preserved as JSON string
        * OperatorID_first / OperatorNameZh_first / ...: first operator extracted for convenience
    - SubRoute OperatorIDs can be multiple:
        * OperatorIDs: list
        * OperatorIDs_str: joined by '|'
    """

    ns = {'ptx': 'https://ptx.transportdata.tw/standard/schema/'}

    tree = ET.parse(xml_path)
    root = tree.getroot()

    def t(elem: Optional[ET.Element], path: str) -> Optional[str]:
        """Safe findtext with namespaces + default None."""
        if elem is None:
            return None
        return elem.findtext(path, default=None, namespaces=ns)

    def as_int(x: Optional[str]) -> Optional[int]:
        if x is None or x == "":
            return None
        try:
            return int(x)
        except ValueError:
            return None

    def parse_operators(route_elem: ET.Element) -> List[Dict[str, Any]]:
        ops = []
        for op in route_elem.findall('ptx:Operators/ptx:Operator', ns):
            ops.append({
                "OperatorID": t(op, 'ptx:OperatorID'),
                "OperatorNameZh": t(op, 'ptx:OperatorName/ptx:Zh_tw'),
                "OperatorNameEn": t(op, 'ptx:OperatorName/ptx:En'),
                "OperatorCode": t(op, 'ptx:OperatorCode'),
                "OperatorNo": t(op, 'ptx:OperatorNo'),
            })
        return ops

    def parse_subroute_operator_ids(sub_elem: ET.Element) -> List[str]:
        ids = []
        for opid in sub_elem.findall('ptx:OperatorIDs/ptx:OperatorID', ns):
            if opid.text is not None and opid.text.strip() != "":
                ids.append(opid.text.strip())
        return ids

    rows: List[Dict[str, Any]] = []

    for route in root.findall('ptx:BusRoute', ns):
        # --- Route-level fields ---
        route_uid = t(route, 'ptx:RouteUID')
        route_id = t(route, 'ptx:RouteID')
        has_subroutes = t(route, 'ptx:HasSubRoutes')
        authority_id = t(route, 'ptx:AuthorityID')
        provider_id = t(route, 'ptx:ProviderID')
        bus_route_type = as_int(t(route, 'ptx:BusRouteType'))

        route_name_zh = t(route, 'ptx:RouteName/ptx:Zh_tw')
        route_name_en = t(route, 'ptx:RouteName/ptx:En')

        dep_stop_zh = t(route, 'ptx:DepartureStopNameZh')
        dep_stop_en = t(route, 'ptx:DepartureStopNameEn')
        dest_stop_zh = t(route, 'ptx:DestinationStopNameZh')
        dest_stop_en = t(route, 'ptx:DestinationStopNameEn')

        ticket_zh = t(route, 'ptx:TicketPriceDescriptionZh')
        ticket_en = t(route, 'ptx:TicketPriceDescriptionEn')
        buffer_zh = t(route, 'ptx:FareBufferZoneDescriptionZh')
        buffer_en = t(route, 'ptx:FareBufferZoneDescriptionEn')

        route_map_url = t(route, 'ptx:RouteMapImageUrl')
        city = t(route, 'ptx:City')
        city_code = t(route, 'ptx:CityCode')
        update_time = t(route, 'ptx:UpdateTime')
        version_id = as_int(t(route, 'ptx:VersionID'))

        # Operators (may be multiple)
        ops = parse_operators(route)
        ops_json = json.dumps(ops, ensure_ascii=False)  # keep full info
        op0 = ops[0] if len(ops) > 0 else {}

        # SubRoutes (nullable; but in PTX usually exists)
        subroutes = route.findall('ptx:SubRoutes/ptx:SubRoute', ns)

        # If no SubRoute nodes exist, still output a single row at Route granularity
        # (rare but safer)
        if not subroutes:
            rows.append({
                # Route-level
                "RouteUID": route_uid,
                "RouteID": route_id,
                "HasSubRoutes": has_subroutes,
                "AuthorityID": authority_id,
                "ProviderID": provider_id,
                "BusRouteType": bus_route_type,
                "RouteNameZh": route_name_zh,
                "RouteNameEn": route_name_en,
                "DepartureStopNameZh": dep_stop_zh,
                "DepartureStopNameEn": dep_stop_en,
                "DestinationStopNameZh": dest_stop_zh,
                "DestinationStopNameEn": dest_stop_en,
                "TicketPriceDescriptionZh": ticket_zh,
                "TicketPriceDescriptionEn": ticket_en,
                "FareBufferZoneDescriptionZh": buffer_zh,
                "FareBufferZoneDescriptionEn": buffer_en,
                "RouteMapImageUrl": route_map_url,
                "City": city,
                "CityCode": city_code,
                "UpdateTime": update_time,
                "VersionID": version_id,

                # Operators
                "Operators_json": ops_json,
                "OperatorID_first": op0.get("OperatorID"),
                "OperatorNameZh_first": op0.get("OperatorNameZh"),
                "OperatorNameEn_first": op0.get("OperatorNameEn"),
                "OperatorCode_first": op0.get("OperatorCode"),
                "OperatorNo_first": op0.get("OperatorNo"),

                # SubRoute-level (empty)
                "SubRouteUID": None,
                "SubRouteID": None,
                "SubRouteNameZh": None,
                "SubRouteNameEn": None,
                "Headsign": None,
                "HeadsignEn": None,
                "Direction": None,
                "FirstBusTime": None,
                "LastBusTime": None,
                "HolidayFirstBusTime": None,
                "HolidayLastBusTime": None,
                "SubDepartureStopNameZh": None,
                "SubDepartureStopNameEn": None,
                "SubDestinationStopNameZh": None,
                "SubDestinationStopNameEn": None,
                "OperatorIDs": [],
                "OperatorIDs_str": None,
            })
            continue

        # normal case: output per SubRoute row
        for sub in subroutes:
            operator_ids = parse_subroute_operator_ids(sub)
            rows.append({
                # Route-level
                "RouteUID": route_uid,
                "RouteID": route_id,
                "HasSubRoutes": has_subroutes,
                "AuthorityID": authority_id,
                "ProviderID": provider_id,
                "BusRouteType": bus_route_type,
                "RouteNameZh": route_name_zh,
                "RouteNameEn": route_name_en,
                "DepartureStopNameZh": dep_stop_zh,
                "DepartureStopNameEn": dep_stop_en,
                "DestinationStopNameZh": dest_stop_zh,
                "DestinationStopNameEn": dest_stop_en,
                "TicketPriceDescriptionZh": ticket_zh,
                "TicketPriceDescriptionEn": ticket_en,
                "FareBufferZoneDescriptionZh": buffer_zh,
                "FareBufferZoneDescriptionEn": buffer_en,
                "RouteMapImageUrl": route_map_url,
                "City": city,
                "CityCode": city_code,
                "UpdateTime": update_time,
                "VersionID": version_id,

                # Operators
                "Operators_json": ops_json,
                "OperatorID_first": op0.get("OperatorID"),
                "OperatorNameZh_first": op0.get("OperatorNameZh"),
                "OperatorNameEn_first": op0.get("OperatorNameEn"),
                "OperatorCode_first": op0.get("OperatorCode"),
                "OperatorNo_first": op0.get("OperatorNo"),

                # SubRoute-level (per swagger)
                "SubRouteUID": t(sub, 'ptx:SubRouteUID'),
                "SubRouteID": t(sub, 'ptx:SubRouteID'),
                "SubRouteNameZh": t(sub, 'ptx:SubRouteName/ptx:Zh_tw'),
                "SubRouteNameEn": t(sub, 'ptx:SubRouteName/ptx:En'),
                "Headsign": t(sub, 'ptx:Headsign'),
                "HeadsignEn": t(sub, 'ptx:HeadsignEn'),
                "Direction": as_int(t(sub, 'ptx:Direction')),
                "FirstBusTime": t(sub, 'ptx:FirstBusTime'),
                "LastBusTime": t(sub, 'ptx:LastBusTime'),
                "HolidayFirstBusTime": t(sub, 'ptx:HolidayFirstBusTime'),
                "HolidayLastBusTime": t(sub, 'ptx:HolidayLastBusTime'),
                "SubDepartureStopNameZh": t(sub, 'ptx:DepartureStopNameZh'),
                "SubDepartureStopNameEn": t(sub, 'ptx:DepartureStopNameEn'),
                "SubDestinationStopNameZh": t(sub, 'ptx:DestinationStopNameZh'),
                "SubDestinationStopNameEn": t(sub, 'ptx:DestinationStopNameEn'),

                # SubRoute OperatorIDs
                "OperatorIDs": operator_ids,
                "OperatorIDs_str": "|".join(operator_ids) if operator_ids else None,
            })

    df = pd.DataFrame(rows)

    # Optional: normalize HasSubRoutes to boolean where possible
    if "HasSubRoutes" in df.columns:
        df["HasSubRoutes"] = df["HasSubRoutes"].map(
            lambda x: True if str(x).lower() == "true" else False if str(x).lower() == "false" else None
        )

    return df


In [34]:
from __future__ import annotations

import json
import pandas as pd
import xml.etree.ElementTree as ET
from typing import Any, Dict, List, Optional


def read_businfo_xml(xml_path: str) -> pd.DataFrame:
    ns = {'ptx': 'https://ptx.transportdata.tw/standard/schema/'}

    tree = ET.parse(xml_path)
    root = tree.getroot()

    def t(elem: Optional[ET.Element], path: str) -> Optional[str]:
        if elem is None:
            return None
        return elem.findtext(path, default=None, namespaces=ns)

    def as_int(x: Optional[str]) -> Optional[int]:
        if x is None or x == "":
            return None
        try:
            return int(x)
        except ValueError:
            return None

    def parse_operators(route_elem: ET.Element) -> List[Dict[str, Any]]:
        ops = []
        for op in route_elem.findall('ptx:Operators/ptx:Operator', ns):
            ops.append({
                "OperatorID": t(op, 'ptx:OperatorID'),
                "OperatorNameZh": t(op, 'ptx:OperatorName/ptx:Zh_tw'),
                "OperatorNameEn": t(op, 'ptx:OperatorName/ptx:En'),
                "OperatorCode": t(op, 'ptx:OperatorCode'),
                "OperatorNo": t(op, 'ptx:OperatorNo'),
            })
        return ops

    def parse_subroute_operator_ids(sub_elem: ET.Element) -> List[str]:
        ids = []
        for opid in sub_elem.findall('ptx:OperatorIDs/ptx:OperatorID', ns):
            if opid.text and opid.text.strip():
                ids.append(opid.text.strip())
        return ids

    rows: List[Dict[str, Any]] = []

    for route in root.findall('ptx:BusRoute', ns):
        ops = parse_operators(route)
        ops_json = json.dumps(ops, ensure_ascii=False)
        op0 = ops[0] if ops else {}

        # Route-level (swagger order)
        base = {
            "RouteUID": t(route, 'ptx:RouteUID'),
            "RouteID": t(route, 'ptx:RouteID'),
            "HasSubRoutes": t(route, 'ptx:HasSubRoutes'),
            # Operators (保留完整 + 常用第一筆)
            "Operators_json": ops_json,
            "OperatorID_first": op0.get("OperatorID"),
            "OperatorNameZh_first": op0.get("OperatorNameZh"),
            "OperatorNameEn_first": op0.get("OperatorNameEn"),
            "OperatorCode_first": op0.get("OperatorCode"),
            "OperatorNo_first": op0.get("OperatorNo"),
            # Authority/Provider
            "AuthorityID": t(route, 'ptx:AuthorityID'),
            "ProviderID": t(route, 'ptx:ProviderID'),
        }

        subroutes = route.findall('ptx:SubRoutes/ptx:SubRoute', ns)

        # 若真的沒有 SubRoute，就輸出一列（其餘 SubRoute 欄位補 None）
        if not subroutes:
            rows.append({
                **base,
                "SubRouteUID": None,
                "SubRouteID": None,
                "OperatorIDs": [],
                "SubRouteNameZh": None,
                "SubRouteNameEn": None,
                "Headsign": None,
                "HeadsignEn": None,
                "Direction": None,
                "FirstBusTime": None,
                "LastBusTime": None,
                "HolidayFirstBusTime": None,
                "HolidayLastBusTime": None,
                "SubDepartureStopNameZh": None,
                "SubDepartureStopNameEn": None,
                "SubDestinationStopNameZh": None,
                "SubDestinationStopNameEn": None,

                "BusRouteType": as_int(t(route, 'ptx:BusRouteType')),
                "RouteNameZh": t(route, 'ptx:RouteName/ptx:Zh_tw'),
                "RouteNameEn": t(route, 'ptx:RouteName/ptx:En'),
                "DepartureStopNameZh": t(route, 'ptx:DepartureStopNameZh'),
                "DepartureStopNameEn": t(route, 'ptx:DepartureStopNameEn'),
                "DestinationStopNameZh": t(route, 'ptx:DestinationStopNameZh'),
                "DestinationStopNameEn": t(route, 'ptx:DestinationStopNameEn'),
                "TicketPriceDescriptionZh": t(route, 'ptx:TicketPriceDescriptionZh'),
                "TicketPriceDescriptionEn": t(route, 'ptx:TicketPriceDescriptionEn'),
                "FareBufferZoneDescriptionZh": t(route, 'ptx:FareBufferZoneDescriptionZh'),
                "FareBufferZoneDescriptionEn": t(route, 'ptx:FareBufferZoneDescriptionEn'),
                "RouteMapImageUrl": t(route, 'ptx:RouteMapImageUrl'),
                "City": t(route, 'ptx:City'),
                "CityCode": t(route, 'ptx:CityCode'),
                "UpdateTime": t(route, 'ptx:UpdateTime'),
                "VersionID": as_int(t(route, 'ptx:VersionID')),
            })
            continue

        for sub in subroutes:
            operator_ids = parse_subroute_operator_ids(sub)

            rows.append({
                **base,

                # SubRoutes (swagger order)
                "SubRouteUID": t(sub, 'ptx:SubRouteUID'),
                "SubRouteID": t(sub, 'ptx:SubRouteID'),
                "OperatorIDs": operator_ids,
                "SubRouteNameZh": t(sub, 'ptx:SubRouteName/ptx:Zh_tw'),
                "SubRouteNameEn": t(sub, 'ptx:SubRouteName/ptx:En'),
                "Headsign": t(sub, 'ptx:Headsign'),
                "HeadsignEn": t(sub, 'ptx:HeadsignEn'),
                "Direction": as_int(t(sub, 'ptx:Direction')),
                "FirstBusTime": t(sub, 'ptx:FirstBusTime'),
                "LastBusTime": t(sub, 'ptx:LastBusTime'),
                "HolidayFirstBusTime": t(sub, 'ptx:HolidayFirstBusTime'),
                "HolidayLastBusTime": t(sub, 'ptx:HolidayLastBusTime'),
                "SubDepartureStopNameZh": t(sub, 'ptx:DepartureStopNameZh'),
                "SubDepartureStopNameEn": t(sub, 'ptx:DepartureStopNameEn'),
                "SubDestinationStopNameZh": t(sub, 'ptx:DestinationStopNameZh'),
                "SubDestinationStopNameEn": t(sub, 'ptx:DestinationStopNameEn'),

                # Route remaining fields (swagger order)
                "BusRouteType": as_int(t(route, 'ptx:BusRouteType')),
                "RouteNameZh": t(route, 'ptx:RouteName/ptx:Zh_tw'),
                "RouteNameEn": t(route, 'ptx:RouteName/ptx:En'),
                "DepartureStopNameZh": t(route, 'ptx:DepartureStopNameZh'),
                "DepartureStopNameEn": t(route, 'ptx:DepartureStopNameEn'),
                "DestinationStopNameZh": t(route, 'ptx:DestinationStopNameZh'),
                "DestinationStopNameEn": t(route, 'ptx:DestinationStopNameEn'),
                "TicketPriceDescriptionZh": t(route, 'ptx:TicketPriceDescriptionZh'),
                "TicketPriceDescriptionEn": t(route, 'ptx:TicketPriceDescriptionEn'),
                "FareBufferZoneDescriptionZh": t(route, 'ptx:FareBufferZoneDescriptionZh'),
                "FareBufferZoneDescriptionEn": t(route, 'ptx:FareBufferZoneDescriptionEn'),
                "RouteMapImageUrl": t(route, 'ptx:RouteMapImageUrl'),
                "City": t(route, 'ptx:City'),
                "CityCode": t(route, 'ptx:CityCode'),
                "UpdateTime": t(route, 'ptx:UpdateTime'),
                "VersionID": as_int(t(route, 'ptx:VersionID')),
            })

    df = pd.DataFrame(rows)

    # 依 swagger 順序強制排欄位（你要的重點）
    desired_cols = [
        # BusRoute
        "RouteUID", "RouteID", "HasSubRoutes",
        # Operators（swagger 原本是 array；我加了兩種表示法）
        "Operators_json", "OperatorID_first", "OperatorNameZh_first", "OperatorNameEn_first",
        "OperatorCode_first", "OperatorNo_first",
        # Authority/Provider
        "AuthorityID", "ProviderID",
        # SubRoutes（swagger order）
        "SubRouteUID", "SubRouteID", "OperatorIDs", "SubRouteNameZh", "SubRouteNameEn",
        "Headsign", "HeadsignEn", "Direction",
        "FirstBusTime", "LastBusTime", "HolidayFirstBusTime", "HolidayLastBusTime",
        "SubDepartureStopNameZh", "SubDepartureStopNameEn",
        "SubDestinationStopNameZh", "SubDestinationStopNameEn",
        # Remaining route fields (swagger order)
        "BusRouteType",
        "RouteNameZh", "RouteNameEn",
        "DepartureStopNameZh", "DepartureStopNameEn",
        "DestinationStopNameZh", "DestinationStopNameEn",
        "TicketPriceDescriptionZh", "TicketPriceDescriptionEn",
        "FareBufferZoneDescriptionZh", "FareBufferZoneDescriptionEn",
        "RouteMapImageUrl",
        "City", "CityCode",
        "UpdateTime", "VersionID",
    ]

    # 補齊缺的欄位（避免某些 XML 沒出現導致 KeyError）
    for c in desired_cols:
        if c not in df.columns:
            df[c] = None

    df = df[desired_cols]

    # Optional: HasSubRoutes 轉 bool
    df["HasSubRoutes"] = df["HasSubRoutes"].map(
        lambda x: True if str(x).lower() == "true" else False if str(x).lower() == "false" else None
    )

    return df


In [35]:
routeinfo = []
for file in busrouteinfofiles:
    routeinfo.append(read_businfo_xml(file))
df_routeinfo = pd.concat(routeinfo)

In [36]:
df_routeinfo.iloc[:, 30:]

Unnamed: 0,DepartureStopNameZh,DepartureStopNameEn,DestinationStopNameZh,DestinationStopNameEn,TicketPriceDescriptionZh,TicketPriceDescriptionEn,FareBufferZoneDescriptionZh,FareBufferZoneDescriptionEn,RouteMapImageUrl,City,CityCode,UpdateTime,VersionID
0,大竹消防隊,Dajhu Fire Brigade,庫倫街口,Kulun St. Entrance,,,,,https://web.taiwanbus.tw/MISUploadData/Schemat...,,,2025-12-16T14:53:03+08:00,6176
1,大竹消防隊,Dajhu Fire Brigade,庫倫街口,Kulun St. Entrance,,,,,https://web.taiwanbus.tw/MISUploadData/Schemat...,,,2025-12-16T14:53:03+08:00,6176
2,大竹消防隊,Dajhu Fire Brigade,庫倫街口,Kulun St. Entrance,,,,,https://web.taiwanbus.tw/MISUploadData/Schemat...,,,2025-12-16T14:53:03+08:00,6176
3,大竹消防隊,Dajhu Fire Brigade,庫倫街口,Kulun St. Entrance,,,,,https://web.taiwanbus.tw/MISUploadData/Schemat...,,,2025-12-16T14:53:03+08:00,6176
4,國家新城,Guojia Xincheng,樟樹國際實中,Zhangshu Junior High School,,,,,https://web.taiwanbus.tw/MISUploadData/Schemat...,,,2025-12-16T14:53:03+08:00,6176
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1161,新莊,Xinzhuang,臺北車站,Taipei Main Sta.,二段票,2 segments,,,https://ebus.gov.taipei/MapOverview?nid=016300...,Taipei,TPE,2025-12-16T14:51:04+08:00,6521
1162,東吳大學外雙溪校區,Soochow U. Waishuangsi Campus,東吳大學城中校區,Soochow U. Downtown Campus,二段票,2 segments,捷運士林站(中正)-士林區行政中心(中正),MRT Shilin Sta. (Zhongzheng)-Shilin Admin. Cen...,https://ebus.gov.taipei/MapOverview?nid=015200...,Taipei,TPE,2025-12-16T14:51:04+08:00,6521
1163,東吳大學外雙溪校區,Soochow U. Waishuangsi Campus,東吳大學城中校區,Soochow U. Downtown Campus,二段票,2 segments,捷運士林站(中正)-士林區行政中心(中正),MRT Shilin Sta. (Zhongzheng)-Shilin Admin. Cen...,https://ebus.gov.taipei/MapOverview?nid=015200...,Taipei,TPE,2025-12-16T14:51:04+08:00,6521
1164,麟光新村,Linguang New Village,榮總,Veterans General Hospital,兩段票,2 segments,民權松江路口-銘傳大學,Minquan and Songjiang Lntersection-Mingchuan U.),https://ebus.gov.taipei/MapOverview?nid=016100...,Taipei,TPE,2025-12-16T14:51:04+08:00,6521


In [45]:
df_routeinfo[['City','CityCode','RouteUID', 'RouteNameZh', 'SubRouteUID', 'SubRouteNameZh','Headsign','Direction']].drop_duplicates()

Unnamed: 0,City,CityCode,RouteUID,RouteNameZh,SubRouteUID,SubRouteNameZh,Headsign,Direction
0,,,THB0968,0968,THB096801,0968,桃園市蘆竹區→國道1號→圓山轉運站,0
1,,,THB0968,0968,THB096802,0968,圓山轉運站→國道1號→桃園市蘆竹區,1
2,,,THB0968,0968,THB0968A1,0968A,桃園市蘆竹區→國道1號→圓山轉運站[經開南大學],0
3,,,THB0968,0968,THB0968A2,0968A,圓山轉運站→國道1號→桃園市蘆竹區[經開南大學],1
4,,,THB1031,1031,THB103101,1031,國家新城→汐止樟樹灣,0
...,...,...,...,...,...,...,...,...
1161,Taipei,TPE,TPE19755,新莊-臺北車站,TPE162471,新莊-臺北車站,,0
1162,Taipei,TPE,TPE19759,通勤30,TPE162482,通勤30,,0
1163,Taipei,TPE,TPE19759,通勤30,TPE162482,通勤30,,1
1164,Taipei,TPE,TPE810,敦化幹線,TPE810,敦化幹線,,0


In [None]:
df_routeinfo.columnsdf_routeinfodf_routeinfo

Index(['RouteUID', 'RouteID', 'HasSubRoutes', 'AuthorityID', 'ProviderID',
       'BusRouteType', 'RouteNameZh', 'RouteNameEn', 'DepartureStopNameZh',
       'DepartureStopNameEn', 'DestinationStopNameZh', 'DestinationStopNameEn',
       'TicketPriceDescriptionZh', 'TicketPriceDescriptionEn',
       'FareBufferZoneDescriptionZh', 'FareBufferZoneDescriptionEn',
       'RouteMapImageUrl', 'City', 'CityCode', 'UpdateTime', 'VersionID',
       'Operators_json', 'OperatorID_first', 'OperatorNameZh_first',
       'OperatorNameEn_first', 'OperatorCode_first', 'OperatorNo_first',
       'SubRouteUID', 'SubRouteID', 'SubRouteNameZh', 'SubRouteNameEn',
       'Headsign', 'HeadsignEn', 'Direction', 'FirstBusTime', 'LastBusTime',
       'HolidayFirstBusTime', 'HolidayLastBusTime', 'SubDepartureStopNameZh',
       'SubDepartureStopNameEn', 'SubDestinationStopNameZh',
       'SubDestinationStopNameEn', 'OperatorIDs', 'OperatorIDs_str'],
      dtype='object')