In [1]:
import requests
from pathlib import Path
import xmltodict
import os
import pandas as pd
import io
import datetime
import pymongo
import json
import gzip
import dns
from os import listdir
from os.path import isfile, join

# Root URL of the historical VD archive. The download helpers append
# '<day>/<trType>_value_HHMM.xml.gz' (1-minute) or '<day>/<trType>_value5_HHMM.xml.gz' (5-minute).
urlBase='https://tisvcloud.freeway.gov.tw/history/vd/'  # e.g. 20190624/vd_value_0000.xml.gz   20190624/vd_value5_0015.xml.gz
# Local root directory under which downloaded files are stored.
baseDir='../../../data/'
client = pymongo.MongoClient("mongodb://localhost:27017")
db = client.traffic
trType='vd'
# Create a compound unique index so the same document cannot be inserted into MongoDB twice,
# and to speed up queries; has no effect if the same compound unique index already exists.
db['vd1'].create_index([("@vdid",1),("@datacollecttime",1)],unique=True)
db['vd5'].create_index([("@vdid",1),("@datacollecttime",1)],unique=True)

'@vdid_1_@datacollecttime_1'

In [8]:
#確認該 url 是否可以下載, 例如 20190230 就不存在, 無法下載; 而且必須是附件形式的方可
def isDownloadable(url):
    """
    Report whether ``url`` points at a downloadable (non-text, non-HTML) resource.

    Only a HEAD request is sent (redirects are followed), and the decision is
    based solely on the ``content-type`` response header.

    Returns:
        False when the header is absent or its lower-cased value contains
        'text' or 'html'; True otherwise.
    """
    response = requests.head(url, allow_redirects=True)
    kind = response.headers.get('content-type')
    if kind is None:
        return False
    kind = kind.lower()
    # Text and HTML responses are treated as "not a file attachment".
    return not any(marker in kind for marker in ('text', 'html'))

In [9]:
#下載某url檔案後, 放在指定目錄下
def downloadFileFromUrl(url, directory):
    """
    Download ``url`` into ``directory`` unless the target file already exists.

    The local file name is the part of ``url`` after its last '/'.

    Args:
        url: Address of the file to fetch; must contain at least one '/'.
        directory: Existing directory that receives the file.

    Returns:
        None.
    """
    filename = directory+'/'+ url.rsplit('/', 1)[1]
    if os.path.exists(filename):
        # Already on disk: skip the network round trip entirely.
        return
    r = requests.get(url, allow_redirects=True)
    # Use a context manager so the handle is flushed and closed even if the
    # write fails, instead of relying on garbage collection.
    with open(filename, 'wb') as out:
        out.write(r.content)

In [10]:
# Fetch every 1-minute VD file of a given day and download it into the target directory
def downloadVD1MinDay(trType, day, baseDir):
    """
    Download all 1-minute files of one day (24 hours x 60 minutes).

    Args:
        trType: File-name prefix and directory stem, e.g. 'vd'.
        day: Day folder on the server, e.g. '20190322'.
        baseDir: Local root; files go to ``baseDir + trType + '1Min/' + day``.

    URLs that ``isDownloadable`` rejects are skipped.
    """
    target = baseDir+trType+'1Min'+'/'+day
    urls = [
        urlBase + day + '/' + trType + '_value_' + format(hour, '02d') + format(minute, '02d') + '.xml.gz'
        for hour in range(24)
        for minute in range(60)   # one file per minute
    ]
    Path(target).mkdir(parents=True, exist_ok=True)
    for url in urls:
        if not isDownloadable(url):
            continue
        downloadFileFromUrl(url, target)

In [11]:
# Fetch every 5-minute VD file of a given day and download it into the target directory
def downloadVD5MinDay(trType, day, baseDir):
    """
    Download all 5-minute files of one day (24 hours x 12 slots).

    Args:
        trType: File-name prefix and directory stem, e.g. 'vd'.
        day: Day folder on the server, e.g. '20190322'.
        baseDir: Local root; files go to ``baseDir + trType + '/' + day``.

    URLs that ``isDownloadable`` rejects are skipped.

    NOTE(review): downnload1HourFiles2Mongo stores its 5-minute files under
    ``trType + '5Min/'`` rather than ``trType + '/'`` - confirm which layout
    is intended before unifying them.
    """
    target = baseDir+trType+'/'+day
    urls = [
        urlBase + day + '/' + trType + '_value5_' + format(hour, '02d') + format(minute, '02d') + '.xml.gz'
        for hour in range(24)
        for minute in range(0, 60, 5)   # one file every 5 minutes
    ]
    Path(target).mkdir(parents=True, exist_ok=True)
    for url in urls:
        if not isDownloadable(url):
            continue
        downloadFileFromUrl(url, target)

In [18]:
def insertFiles2Mongo(files, directory, OneMinute=True):
    """
    Parse gzipped VD XML files and insert their records into MongoDB.

    Args:
        files: File names (not paths) located inside ``directory``.
        directory: Directory prefix; it is concatenated directly with each
            file name, so it must end with a path separator.
        OneMinute: True inserts into the 'vd1' collection, False into 'vd5'.

    Each ``Info`` element becomes one document. ``@datacollecttime`` is
    converted from a 'YYYY/MM/DD HH:MM:SS' string to a datetime and ``lane``
    is flattened with ``splitLane2DF``. Insert errors (for example
    duplicate-key errors) are printed and do not stop processing of the
    remaining files.
    """
    collect = db['vd1'] if OneMinute else db['vd5']
    for i, file in enumerate(files, start=1):
        print(i,directory,file)
        # The context manager guarantees the gzip handle is closed even when
        # parsing raises.
        with gzip.open(directory+file, 'rb') as f:
            doc = xmltodict.parse(f)
        Y=doc['XML_Head']['Infos']
        if Y is None:
            # Empty <Infos/> element: nothing to insert for this file.
            print('TypeError: ',Y)
            continue
        X=Y['Info']
        if not isinstance(X, list):
            # xmltodict yields a single mapping (not a list) when the file
            # holds exactly one <Info>; normalise so DataFrame gets rows.
            X=[X]
        # '@datacollecttime' is already present per record, so the file-level
        # '@updatetime' attribute is not needed.
        # The JSON round trip turns the parser's mapping types into plain dicts/lists.
        cc = json.loads(json.dumps(X))
        tmpDF=pd.DataFrame(cc)
        tmpDF['@datacollecttime']=tmpDF['@datacollecttime'].apply(lambda x:datetime.datetime.strptime(x, "%Y/%m/%d %H:%M:%S"))
        tmpDF['lane']=tmpDF['lane'].apply(splitLane2DF)
        records = tmpDF.to_dict('records')
        try:
            # ordered=False: a duplicate-key error on one document must not
            # abort the insertion of the remaining documents in the batch.
            collect.insert_many(records, ordered=False)
        except Exception as err:
            print ("collect.insert_many ERROR:", err)
        # TODO: left open by the original author (intent not stated).

In [22]:
from urllib.parse import urlparse
def downnload1HourFiles2Mongo(trType, day, hour, baseDir, OneMinute=True):
    """
    Download one hour of VD files and insert each one into MongoDB.

    Args:
        trType: File-name prefix and directory stem, e.g. 'vd'.
        day: Day folder on the server, e.g. '20190322'.
        hour: Hour of the day as an int (formatted as two digits).
        baseDir: Local root directory for downloads.
        OneMinute: True handles the 60 one-minute files ('_value_', stored
            under ``trType + '1Min/'``); False handles the 12 five-minute
            files ('_value5_', stored under ``trType + '5Min/'``).

    URLs that ``isDownloadable`` rejects are skipped. Each downloaded file is
    inserted right away, with ``OneMinute`` forwarded so the data reaches the
    collection that matches its resolution.
    """
    if OneMinute:
        step, tag, folder = 1, '_value_', '1Min'
    else:
        step, tag, folder = 5, '_value5_', '5Min'
    p=baseDir+trType+folder+'/'+day+'/'
    Path(p).mkdir(parents=True, exist_ok=True)

    for minute in range(0, 60, step):
        url = urlBase + day + '/' + trType + tag + format(hour, '02d') + format(minute, '02d') + '.xml.gz'
        if not isDownloadable(url):
            continue
        downloadFileFromUrl(url,p)
        # Forward the caller's flag; a hard-coded True would send 5-minute
        # data to the 1-minute collection.
        insertFiles2Mongo([os.path.basename(urlparse(url).path)], p, OneMinute)
    

In [24]:
# Download the 1-minute 'vd' files for hour 19 of 20190322 under baseDir and insert them into MongoDB.
downnload1HourFiles2Mongo('vd', '20190322', 19, baseDir, True)

1 ../../../data/vd1Min/20190322/ vd_value_1900.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1901.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1902.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1903.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1904.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1905.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1906.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1907.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1908.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1909.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1910.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1911.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1912.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1913.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1914.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1915.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1916.xml.gz
1 ../../../data/vd1Min/20190322/ vd_value_1917.xml.gz
1 ../../../data/vd1Min/20190

In [12]:
from pandas import json_normalize #package for flattening json in pandas df

def splitLane2DF(lanes):
    """
    Flatten lane data into a list of per-car records.

    Every entry under a lane's 'cars' key becomes one dict, extended with
    that lane's '@vsrid', '@speed' and '@laneoccupy' values.

    Args:
        lanes: Lane mapping(s) as accepted by ``json_normalize``.

    Returns:
        A list of dicts, one per 'cars' entry.
    """
    return json_normalize(
        data=lanes,
        record_path='cars',
        meta=['@vsrid', '@speed','@laneoccupy'],
    ).to_dict('records')
 

In [None]:
          
def getAllFiles2Mongo(trType, day, baseDir, OneMinute=True):
    """
    Insert every already-downloaded file of one day into MongoDB.

    Args:
        trType: Directory stem, e.g. 'vd'.
        day: Day folder name, e.g. '20190322'.
        baseDir: Local root directory of the downloads.
        OneMinute: True reads ``baseDir + trType + '1Min/' + day + '/'``;
            False reads ``baseDir + trType + '/' + day + '/'``. The flag is
            forwarded to ``insertFiles2Mongo`` so it selects the collection
            as well.
    """
    if OneMinute:
        directory=baseDir+trType+'1Min'+'/'+day+'/'
    else:
        directory=baseDir+trType+'/'+day+'/'

    # Sorted so files are processed in a deterministic order
    # (listdir order is arbitrary).
    files = sorted(f for f in listdir(directory) if isfile(join(directory, f)))
    # Forward OneMinute; relying on the callee's default would treat
    # 5-minute files as 1-minute data.
    insertFiles2Mongo(files, directory, OneMinute)

In [None]:
def download5MinInsert(trType, day, baseDir):
    """
    Download one day's 5-minute files, then insert that day's files into MongoDB.

    Runs ``downloadVD5MinDay`` followed by ``getAllFiles2Mongo`` with
    ``OneMinute`` set to False, using the same ``trType``, ``day`` and
    ``baseDir`` for both steps.
    """
    downloadVD5MinDay(trType, day, baseDir)
    five_minute = False
    getAllFiles2Mongo(trType, day, baseDir, five_minute)

In [None]:
# Batch driver: for each MMDD in `days` (year 2019), insert the files already
# present on disk via getAllFiles2Mongo with OneMinute=False. The download
# step is commented out, so the files must have been fetched beforehand.
# Earlier day selections are kept below for reference.
#days=['0405','0406','0407','0606','0607','0608'] #,'0624','0625','0626','0627','0628','0202','0203','0204','0205','0206','0207','0208','0209','0210']#
#days=['1010','1011','1012','1013','0913','0914','0915']
days=['0322','1210','1218']#,'0307']
for d in days:
    #downloadVD5MinDay('vd','2019'+d,baseDir)
    getAllFiles2Mongo('vd','2019'+d,baseDir, False)

In [None]:
def download1MinInsert(trType, day, baseDir):
    """
    Download one day's 1-minute files, then insert that day's files into MongoDB.

    Runs ``downloadVD1MinDay`` followed by ``getAllFiles2Mongo`` with
    ``OneMinute`` set to True, using the same ``trType``, ``day`` and
    ``baseDir`` for both steps.
    """
    downloadVD1MinDay(trType, day, baseDir)
    one_minute = True
    getAllFiles2Mongo(trType, day, baseDir, one_minute)


In [None]:
# Batch driver: for each MMDD in `days` (year 2019), download the 1-minute
# files and insert them into MongoDB via download1MinInsert.
days=['0322'] #,'1210','1218']#,'0307']
for d in days:
    download1MinInsert('vd','2019'+d,baseDir)