In [1]:
import os
import sys
import pandas as pd
import logging
import xml.etree.ElementTree as ET
import gzip
import os
import shutil
import re
from sqlalchemy import create_engine, MetaData, text, insert
from datetime import datetime, timedelta
from argparse import ArgumentParser
from typing import Tuple

sys.path.append('/app')
from utils import test
from lib.tisvcloud import vd

In [None]:
def parse_xmlfile(filename: str) -> Tuple[ET.ElementTree, ET.Element]:
    """Parse a plain (``.xml``) or gzip-compressed (``.gz``) XML file.

    After parsing, the ``{namespace}`` prefix is stripped from every element
    tag so callers can compare tags against bare names.

    Args:
        filename: Path of the file; the extension selects how it is opened.

    Returns:
        The parsed tree and its root element.

    Raises:
        ValueError: If the extension is neither ``.xml`` nor ``.gz``.
    """
    extension = os.path.splitext(filename)[1]
    if extension == '.xml':
        tree = ET.parse(filename)
    elif extension == '.gz':
        with gzip.open(filename, 'rb') as stream:
            tree = ET.parse(stream)
    else:
        raise ValueError('File cannot be parsed.')
    root = tree.getroot()

    # Drop the "{uri}" namespace prefix from each tag.
    for node in root.iter():
        if '}' in node.tag:
            node.tag = node.tag.split('}')[1]
    return tree, root

def mile_str_to_float(mileage: str) -> float:
    """Convert a mileage marker such as ``"12K+345"`` into a float.

    The integer before ``K`` is taken as the whole part and the number after
    ``+`` is divided by 1000 and added to it; the sum is rounded to three
    decimals.
    """
    k_pos = mileage.find('K')
    plus_pos = mileage.find('+')
    whole_part = int(mileage[:k_pos])
    thousandths = float(mileage[plus_pos + 1:])
    return round(whole_part + thousandths / 1000, 3)

def get_nfb_section_df(filename: str) -> pd.DataFrame:
    """Load the national-freeway section master data into a DataFrame.

    The last child of the XML root is iterated as the list of sections and
    the text of the first child is parsed as the file's update time.
    Children of ``RoadSection`` are flattened into the row as-is, children of
    ``SectionMile`` are converted with ``mile_str_to_float``, and every other
    element is stored as text under its tag.

    Args:
        filename: Path of the ``.xml`` / ``.gz`` file to parse.

    Returns:
        One row per section, indexed from 1, with ``RoadClass`` and
        ``SpeedLimit`` cast to ``int64`` and ``SectionLength`` to ``float64``.
    """
    _, root = parse_xmlfile(filename)

    # Both stamps are the same for every row of one load, so compute them
    # once instead of once per column of every section.
    update_time = datetime.strptime(root[0].text, '%Y-%m-%dT%H:%M:%S+08:00')
    create_time = datetime.now().replace(microsecond=0)

    nfb_section = []
    for sec in root[-1]:
        data = {}
        for col in sec:
            if col.tag == 'RoadSection':
                for subcol in col:
                    data[subcol.tag] = subcol.text
            elif col.tag == 'SectionMile':
                for subcol in col:
                    data[subcol.tag] = mile_str_to_float(subcol.text)
            else:
                data[col.tag] = col.text

            # Assigned inside the loop on purpose: the stamps keep their
            # position right after the first field, override any same-named
            # child element, and a section without children stays empty.
            data['UpdateTime'] = update_time
            data['CreateTime'] = create_time

        nfb_section.append(data)

    df = pd.DataFrame(nfb_section).astype({
        'RoadClass': 'int64',
        'SectionLength': 'float64',
        'SpeedLimit': 'int64'
    })
    df.index += 1
    return df

def get_nfb_vd_static_df(filename: str) -> pd.DataFrame:
    """Load static VD data (source: Freeway Bureau) into a DataFrame.

    The last child of the XML root is iterated as the list of VDs. For each
    VD, ``VDID`` is normalised (``--`` collapsed to ``-``) and a ``Mainlane``
    flag is derived from it, ``LocationMile`` is converted with
    ``mile_str_to_float``, the fields of the first ``DetectionLinks`` entry
    are flattened into the row, ``RoadSection`` is skipped, and every other
    element is stored as text under its tag.

    Args:
        filename: Path of the ``.xml`` / ``.gz`` file to parse.

    Returns:
        One row per VD with the numeric columns cast to ``int64`` /
        ``float64``.

    Raises:
        ValueError: If a ``VDID`` does not match the expected ID layout.
    """
    # Group 4 is the single character after the mileage part of the ID;
    # 'M' or 'N' there marks the VD as a mainlane detector.
    vdid_pattern = re.compile(
        r"[A-Z]*-(\w*)-([A-Z])-([(\d+(\.\d+)?)]*)-(\w)"
    )
    # One load stamp for the whole batch rather than one clock read per column.
    create_time = datetime.now().replace(microsecond=0)

    _, root = parse_xmlfile(filename)
    vd_static = []
    # Named `vd_elem` so the imported `vd` module is not shadowed.
    for vd_elem in root[-1]:
        data = {}
        for col in vd_elem:
            if col.tag == 'VDID':
                vdid = col.text.replace('--', '-')
                data[col.tag] = vdid
                matched = vdid_pattern.search(vdid)
                if matched is None:
                    # Fail with the offending ID instead of an opaque
                    # "'NoneType' object has no attribute 'group'".
                    raise ValueError(f"Unrecognized VDID format: {vdid!r}")
                data['Mainlane'] = 1 if matched.group(4) in ('M', 'N') else 0
            elif col.tag == 'LocationMile':
                data[col.tag] = mile_str_to_float(col.text)
            elif col.tag == 'DetectionLinks':
                # Only the first detection link is flattened into the row.
                for subcol in col[0]:
                    data[subcol.tag] = subcol.text
            elif col.tag == 'RoadSection':
                pass
            else:
                data[col.tag] = col.text
            # Kept inside the loop so the column keeps its position and a VD
            # without child elements stays an empty row.
            data['CreateTime'] = create_time

        vd_static.append(data)

    vd_static = pd.DataFrame(vd_static).astype({
        'BiDirectional': 'int64',
        'LaneNum': 'int64',
        'ActualLaneNum': 'int64',
        'VDType': 'int64',
        'LocationType': 'int64',
        'DetectionType': 'int64',
        'PositionLon': 'float64',
        'PositionLat': 'float64',
        'RoadClass': 'int64'
    })

    return vd_static

def _lane_to_detail_row(vdid: str, lane: ET.Element) -> dict:
    """Flatten one lane element into a detail row for the given VD."""
    volume_columns = {
        'S': 'SmallCarVolume',
        'L': 'LargeCarVolume',
        'T': 'TruckCarVolume',
    }
    row = {}
    for field in lane:
        # Set inside the loop so a lane without children yields an empty row.
        row['VDID'] = vdid
        if field.tag != 'Vehicles':
            row[field.tag] = field.text
            continue

        # Reserve the column position before the per-class columns.
        row['Volume'] = 0
        total = 0
        has_invalid = False
        for veh in field:
            # veh[0] is read as the vehicle-type code, veh[1] as its count.
            column = volume_columns.get(veh[0].text)
            if column is None:
                continue
            count = int(veh[1].text)
            row[column] = count
            if count == -99:
                has_invalid = True
            else:
                total += count
        # -99 marks abnormal data: once any class reports it the lane total
        # must stay -99 rather than having later valid counts added to it.
        row['Volume'] = -99 if has_invalid else total
    return row


def get_nfb_vd_dynamic_df(filename: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Load dynamic VD data (source: Freeway Bureau).

    The last child of the XML root is iterated as the list of live VD
    records. Each record produces one row in the first frame; every lane of
    its first ``LinkFlows`` entry produces one row in the second frame.

    Args:
        filename: Path of the ``.xml`` / ``.gz`` file to parse.

    Returns:
        ``(vd_dynamic, vd_dynamic_details)``. ``vd_dynamic`` has one row per
        VD with ``Status`` as ``int64``. ``vd_dynamic_details`` has one row
        per lane with ``LaneID``, ``LaneType``, ``Speed`` and ``Occupancy``
        as ``int64``, a ``Volume`` total over small/large/truck counts
        (``-99`` if any of them is ``-99``), and the ``DataCollectTime``,
        ``DataCollectTimeStamp`` and ``CreateTime`` of the first VD row.
    """
    time_format = '%Y-%m-%dT%H:%M:%S+08:00'
    vd_dynamic = []
    vd_dynamic_details = []
    _, root = parse_xmlfile(filename)
    for vdLive in root[-1]:
        data = {}
        for col in vdLive:
            if col.tag == 'VDID':
                vdid = col.text.replace('--', '-')
                data[col.tag] = vdid
            elif col.tag == 'LinkFlows':
                # Only the first link flow is read; this relies on VDID
                # having been seen earlier in the same record.
                for subcol in col[0]:
                    if subcol.tag == 'LinkID':
                        data[subcol.tag] = subcol.text
                    else:
                        for lane in subcol:
                            vd_dynamic_details.append(
                                _lane_to_detail_row(vdid, lane)
                            )
            elif col.tag == 'DataCollectTime':
                collected = datetime.strptime(col.text, time_format)
                data[col.tag] = collected
                # The parsed value is naive but the format pins it to +08:00,
                # so derive the epoch in Asia/Taipei (as the detail frame
                # does) instead of the host's local timezone.
                data['DataCollectTimeStamp'] = int(
                    pd.Timestamp(collected).tz_localize('Asia/Taipei').timestamp()
                )
                data['CreateTime'] = datetime.now().replace(microsecond=0)
            else:
                data[col.tag] = col.text

        vd_dynamic.append(data)

    vd_dynamic = pd.DataFrame(vd_dynamic).astype({
        'Status': 'int64'
    })

    vd_dynamic_details = pd.DataFrame(vd_dynamic_details).astype({
        'LaneID': 'int64',
        'LaneType': 'int64',
        'Speed': 'int64',
        'Occupancy': 'int64'
    })

    # NOTE(review): every detail row is stamped with the FIRST VD's collect
    # and create time, not its own VD's — confirm this is intended.
    first_collect_time = vd_dynamic['DataCollectTime'].iloc[0]
    vd_dynamic_details['DataCollectTime'] = first_collect_time
    vd_dynamic_details['DataCollectTimeStamp'] = int(
        first_collect_time.tz_localize('Asia/Taipei').timestamp()
    )
    vd_dynamic_details['CreateTime'] = vd_dynamic['CreateTime'].iloc[0]

    return vd_dynamic, vd_dynamic_details

def get_tw_basic_road_info_df(filename: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Load the nationwide basic road data from a CSV file.

    ``UpdateDate`` is parsed as a datetime and renamed to ``UpdateTime``, and
    a ``CreateTime`` column holding the load time is added.

    Args:
        filename: Path of the CSV file.

    Returns:
        ``(road_info, road_class)``, both indexed from 1. ``road_info`` has
        one row per ``(RoadID, RoadName)`` with the maximum ``RoadClass``,
        ``UpdateTime`` and ``CreateTime``. ``road_class`` has one row per
        ``RoadClass`` with the maximum ``RdCName`` and ``CreateTime``.
    """
    frame = pd.read_csv(filename)
    frame = frame.astype({'UpdateDate': 'datetime64[ns]'})
    frame = frame.rename(columns={'UpdateDate': 'UpdateTime'})
    frame['CreateTime'] = [
        datetime.now().replace(microsecond=0) for _ in range(len(frame))
    ]

    # Road-class lookup table.
    class_aggregation = {'RdCName': 'max', 'CreateTime': 'max'}
    class_frame = frame.groupby(by='RoadClass').agg(class_aggregation)
    class_frame = class_frame.reset_index()

    # One record per road.
    kept_columns = ['RoadID', 'RoadClass', 'RoadName', 'UpdateTime', 'CreateTime']
    road_aggregation = {
        'RoadClass': 'max',
        'UpdateTime': 'max',
        'CreateTime': 'max'
    }
    road_frame = frame[kept_columns].groupby(by=['RoadID', 'RoadName'])
    road_frame = road_frame.agg(road_aggregation).reset_index()

    road_frame.index += 1
    class_frame.index += 1
    return road_frame, class_frame

def get_thb_vd_static_df(filename: str) -> pd.DataFrame:
    """Load static VD data (source: Highway Bureau) into a DataFrame.

    The second-to-last child of the XML root is iterated as the list of VDs.
    When the text of a VD's third child is ``'1'`` the VD yields two rows,
    one for each of its first two ``DetectionLinks`` entries; otherwise it
    yields one row built from the first entry. All other elements are copied
    as text into every row of that VD.

    Args:
        filename: Path of the ``.xml`` / ``.gz`` file to parse.

    Returns:
        The rows with numeric columns cast, ``CreateTime`` as
        ``datetime64[ns]``, ``LocationMile`` extracted from ``VDID`` and an
        ``Abnormal`` column set to 0.
    """
    _, root = parse_xmlfile(filename)
    records = []
    for device in root[-2]:
        link_total = 2 if device[2].text == '1' else 1
        rows = [{} for _ in range(link_total)]
        for field in device:
            if field.tag == 'DetectionLinks':
                # Row i receives the fields of detection link i.
                for position, row in enumerate(rows):
                    for link_field in field[position]:
                        row[link_field.tag] = link_field.text
            else:
                for row in rows:
                    row[field.tag] = field.text

            for row in rows:
                row['CreateTime'] = datetime.now().replace(microsecond=0)

        records.extend(rows)

    column_types = {
        'BiDirectional': 'int64',
        'LaneNum': 'int64',
        'ActualLaneNum': 'int64',
        'VDType': 'int64',
        'LocationType': 'int64',
        'DetectionType': 'int64',
        'PositionLon': 'float64',
        'PositionLat': 'float64',
        'RoadClass': 'int64',
        'CreateTime': 'datetime64[ns]'
    }
    frame = pd.DataFrame(records).astype(column_types)

    def mile_from_vdid(vdid):
        # The first captured digit run of the ID is used as the mileage.
        return float(re.search(r"[A-Z]*-\d*-\w*-(\d*)-\d*", vdid).group(1))

    frame['LocationMile'] = frame['VDID'].apply(mile_from_vdid)
    frame['Abnormal'] = 0
    return frame

In [None]:
# Back-fill script: for each day in the range, create that day's
# `vd_dynamic_detail_YYYYMMDD` table, download every per-minute VD live file,
# and insert the lane-level rows of the N5 VDs known to `vd_static_2023`.
# `engine` is expected to be defined by an earlier cell.
sql = " SELECT * FROM vd_static_2023 "
vd_static = pd.read_sql(sql, con=engine)
# VDID -> primary key of vd_static_2023, used to replace VDID by a foreign key.
vdStaticIDs = dict(zip(vd_static['VDID'], vd_static['id']))
knownVdIDs = set(vdStaticIDs)

dtList = [str(day) for day in pd.date_range('2024-01-01', '2024-04-15')]
for dt in dtList:
    date = dt[:10]
    dateTag = date.replace('-', '')
    tableName = f"vd_dynamic_detail_{dateTag}"

    # The `with` block closes the connection; no explicit close() is needed.
    with engine.connect() as conn:
        sql = text(f"""
        CREATE TABLE {tableName} (
            id INT PRIMARY KEY AUTO_INCREMENT COMMENT '流水號',
            VdStaticID INT COMMENT 'ref. vd_static.id',
            FOREIGN KEY (VdStaticID) REFERENCES vd_static_2023(id),
            LaneID INT COMMENT '車道代碼',
            LaneType INT COMMENT '車道種類 = [1: 一般車道, 2: 快車道, 3: 慢車道, 4: 機車道, 5: 高承載車道, 6: 公車專用道, 7: 轉向車道, 8: 路肩, 9: 輔助車道, 10: 調撥車道, 11: 其他]',
            Speed INT COMMENT '平均速率偵測值, [-99:資料異常, -1:道路封閉]',
            Occupancy INT COMMENT '佔有率偵測值',
            Volume INT COMMENT '流量偵測值, -99=資料異常',
            SmallCarVolume INT COMMENT '小型車流量偵測值',
            LargeCarVolume INT COMMENT '大型車流量偵測值',
            TruckCarVolume INT COMMENT '聯結車流量偵測值',
            DataCollectTime DATETIME COMMENT '資料蒐集時間',
            CreateTime DATETIME COMMENT '建立時間'
        ) COMMENT '{date} VD 動態資料';
        """)
        conn.execute(sql)
        conn.commit()
    logging.debug(f"Table `{tableName}` has been created.")

    # Reflect the day's table once, instead of re-reflecting the whole
    # database for every minute file.
    metadata = MetaData()
    metadata.reflect(bind=engine, only=[tableName])
    detailTable = metadata.tables[tableName]

    for hour in range(24):
        for minute in range(60):
            err_msg = vd.download(dateTag, hour, minute, directory='./')
            if err_msg:
                logging.debug(err_msg)
                continue

            nfb_vd_dynamic, nfb_vd_dynamic_details = get_nfb_vd_dynamic_df(
                filename=f"./VDLive_{dateTag}/VDLive_{hour:02d}{minute:02d}.xml.gz"
            )

            # Keep only N5 VDs that exist in the static table.
            nfb_vd_dynamic = nfb_vd_dynamic.loc[
                nfb_vd_dynamic['VDID'].str.contains('N5-')
                & nfb_vd_dynamic['VDID'].isin(knownVdIDs)
            ].reset_index(drop=True)

            nfb_vd_dynamic_details = nfb_vd_dynamic_details.loc[
                nfb_vd_dynamic_details['VDID'].str.contains('N5-')
                & nfb_vd_dynamic_details['VDID'].isin(knownVdIDs)
            ].reset_index(drop=True)

            # 3NF process: replace VDID by the vd_static_2023 key.
            nfb_vd_dynamic['VDID'] = nfb_vd_dynamic['VDID'].map(vdStaticIDs)
            nfb_vd_dynamic.rename(columns={'VDID': 'VdStaticID'}, inplace=True)
            nfb_vd_dynamic_details['VDID'] =\
                nfb_vd_dynamic_details['VDID'].map(vdStaticIDs)
            nfb_vd_dynamic_details.rename(columns={'VDID': 'VdStaticID'}, inplace=True)

            # Convert NaN to None to avoid ProgrammingError in pymysql.err.
            # Cast to object first: on numeric columns `where(..., None)`
            # would otherwise leave NaN in place.
            nfb_vd_dynamic =\
                nfb_vd_dynamic.astype(object).where(nfb_vd_dynamic.notnull(), None)
            nfb_vd_dynamic_details =\
                nfb_vd_dynamic_details.astype(object).where(
                    nfb_vd_dynamic_details.notnull(), None
                )

            records = nfb_vd_dynamic_details.to_dict(orient='records')
            if not records:
                # An empty parameter list would run the INSERT without
                # parameters and add an all-default row.
                logging.debug(f"No rows to write: {dateTag}_{hour:02d}:{minute:02d}")
                continue

            logging.debug(f"Writing to db: {dateTag}_{hour:02d}:{minute:02d}")
            with engine.connect() as conn:
                conn.execute(insert(detailTable), records)
                conn.commit()

    # The download directory is absent when every download of the day failed.
    if os.path.isdir(f"VDLive_{dateTag}"):
        shutil.rmtree(f"VDLive_{dateTag}")
    logging.debug(f"Finished `{tableName}`.")