In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

import requests
import concurrent.futures
from tqdm.notebook import tqdm
import gzip
import shutil
import xml.etree.ElementTree as ET
import zipfile
from datetime import datetime, timedelta
import pytz
import gc

# Specify the path to the directory you want to create
directory_path = r"D:\VD_data"

# Check if the directory already exists
if not os.path.exists(directory_path):
    # Create the directory if it does not exist
    os.makedirs(directory_path)
    print(f"Directory '{directory_path}' created successfully.")
else:
    print(f"Directory '{directory_path}' already exists.")

Directory 'D:\VD_data' already exists.


In [2]:
def download_file(url, file_path, log_file_path):
    """Download a single file, check its size, and return the status."""
    try:
        response = requests.get(url)
        response.raise_for_status()  # Raise an error for bad status codes
        with open(file_path, 'wb') as file:
            file.write(response.content)

        # Check file size (< 1KB)
        if os.path.getsize(file_path) < 1024:
            os.remove(file_path)
            with open(log_file_path, 'a') as log_file:
                log_file.write(f'Deleted: File too small (<1KB): {url}\n')
            print(f'Deleted: {url} (File too small)')
            return url, 'small'
        #print(f'Downloaded: {url}')
        return url, True
    except requests.RequestException as e:
        with open(log_file_path, 'a') as log_file:
            log_file.write(f'Failed to download {url}: {e}\n')
        print(f'Failed to download: {url}')
        return url, False

def download_files_for_day(directory_path, date, max_concurrent_downloads=10):
    print(f"Starting download for date: {date}")
    base_folder_path = os.path.join(directory_path, date)
    compressed_folder_path = os.path.join(base_folder_path, 'compressed')
    os.makedirs(compressed_folder_path, exist_ok=True)
    log_file_path = os.path.join(base_folder_path, 'download_issues.log')

    # Prepare the download tasks
    download_tasks = []
    skipped_files = 0
    for hour in range(24):
        for minute in range(60):
            current_time = f'{hour:02d}{minute:02d}'
            url = f'https://tisvcloud.freeway.gov.tw/history/motc20/VD/{date}/VDLive_{current_time}.xml.gz'
            file_path = os.path.join(compressed_folder_path, f'VDLive_{current_time}.xml.gz')
            if os.path.exists(file_path):
                skipped_files += 1
                #print(f'Skipped: {url} (File already exists)')
            else:
                download_tasks.append((url, file_path))
    if skipped_files > 0:
        print(f'Skipped {skipped_files} files. (File already exists)')

    # Download files concurrently with a progress bar
    failed_downloads = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_downloads) as executor, tqdm(total=len(download_tasks) + skipped_files) as progress:
        progress.update(skipped_files)
        future_to_url = {executor.submit(download_file, url, file_path, log_file_path): url for url, file_path in download_tasks}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                _, result = future.result()
                if result != True:
                    failed_downloads.append((url, os.path.join(compressed_folder_path, url.split('/')[-1])))
                progress.update(1)
            except Exception as e:
                failed_downloads.append((url, os.path.join(compressed_folder_path, url.split('/')[-1])))
                print(f'Error during download: {url}')
                progress.update(1)

    # Retry failed downloads
    if len(failed_downloads) > 0:
        print("Retrying failed downloads...")
        with tqdm(total=len(failed_downloads)) as progress:
            for url, file_path in failed_downloads:
                _, result = download_file(url, file_path, log_file_path)
                if result != True:
                    with open(log_file_path, 'a') as log_file:
                        log_file.write(f'Failed to download on retry: {url}\n')
                    print(f'Failed to download on retry: {url}')
                progress.update(1)

    print("Download process completed.")

def decompress_files(directory_path, date):
    base_folder_path = os.path.join(directory_path, date)
    compressed_folder_path = os.path.join(base_folder_path, 'compressed')
    decompressed_folder_path = os.path.join(base_folder_path, 'decompressed')
    log_file_path = os.path.join(base_folder_path, 'download_issues.log')
    
    os.makedirs(decompressed_folder_path, exist_ok=True)

    # List all .xml.gz files in the compressed folder
    compressed_files = [f for f in os.listdir(compressed_folder_path) if f.endswith('.xml.gz')]
    total_files = len(compressed_files)
    print("Decompressing xml.gz files...")

    # Progress bar setup
    with tqdm(total=total_files) as progress:
        for file in compressed_files:
            compressed_file_path = os.path.join(compressed_folder_path, file)
            decompressed_file_path = os.path.join(decompressed_folder_path, file[:-3])  # Remove .gz from filename

            # Skip if decompressed file already exists
            if os.path.exists(decompressed_file_path):
                print(f'Skipped: {file} (Already decompressed)')
                progress.update(1)
                continue

            try:
                # Decompress file
                with gzip.open(compressed_file_path, 'rb') as f_in, open(decompressed_file_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
                #print(f'Decompressed: {file}')
            except Exception as e:
                with open(log_file_path, 'a') as log_file:
                    log_file.write(f'Failed to decompress {file}: {e}\n')
                print(f'Failed to decompress: {file}')
            progress.update(1)

    print("Decompression process completed.")

#############################################################################################################

def parse_xml_file(file_path, namespace):
    """
    Parse an XML file using iterparse for better memory management and return a list of dictionaries
    of data, flattened for easy CSV conversion.
    """
    data_dict = {}

    for event, elem in ET.iterparse(file_path, events=('end',)):
        if elem.tag == f"{{{namespace['ns1']}}}VDLive":
            vdid = elem.find(f".//{{{namespace['ns1']}}}VDID").text if elem.find(f".//{{{namespace['ns1']}}}VDID") is not None else ''

            if vdid not in data_dict:
                data_dict[vdid] = {}

            for lane in elem.findall(f".//{{{namespace['ns1']}}}Lane"):
                lane_id = lane.find(f".//{{{namespace['ns1']}}}LaneID").text if lane.find(f".//{{{namespace['ns1']}}}LaneID") is not None else ''
                speed = lane.find(f".//{{{namespace['ns1']}}}Speed").text if lane.find(f".//{{{namespace['ns1']}}}Speed") is not None else ''
                occupancy = lane.find(f".//{{{namespace['ns1']}}}Occupancy").text if lane.find(f".//{{{namespace['ns1']}}}Occupancy") is not None else ''

                lane_key = f'L{lane_id}'
                if lane_key not in data_dict[vdid]:
                    data_dict[vdid][lane_key] = {}

                data_dict[vdid][lane_key].update({
                    'Speed': speed,
                    'Occupancy': occupancy,
                })

                for vehicle in lane.findall(f".//{{{namespace['ns1']}}}Vehicle"):
                    vehicle_type = vehicle.find(f".//{{{namespace['ns1']}}}VehicleType").text if vehicle.find(f".//{{{namespace['ns1']}}}VehicleType") is not None else ''
                    volume = vehicle.find(f".//{{{namespace['ns1']}}}Volume").text if vehicle.find(f".//{{{namespace['ns1']}}}Volume") is not None else ''
                    speed2 = vehicle.find(f".//{{{namespace['ns1']}}}Speed").text if vehicle.find(f".//{{{namespace['ns1']}}}Speed") is not None else ''

                    data_dict[vdid][lane_key].update({
                        f'{vehicle_type}_Volume': volume,
                        f'{vehicle_type}_Vehicle_Speed': speed2,
                    })

            # Clear the element to free memory
            elem.clear()

    # Flattening the data structure for CSV conversion
    flattened_data = []
    for vdid, lanes in data_dict.items():
        row = {'VDID': vdid}
        for lane_id, details in lanes.items():
            for key, value in details.items():
                row[f'{lane_id}_{key}'] = value
        flattened_data.append(row)

    return flattened_data


def process_file(file_name, input_dir, output_dir, namespace):
    try:
        # Construct full paths for input and output files
        file_path = os.path.join(input_dir, file_name)
        output_file = os.path.join(output_dir, file_name.replace('.xml', '.csv'))

        # Check if corresponding CSV file already exists
        if os.path.exists(output_file):
            print(f"Skipping {file_name} as CSV already exists.")
            return

        # Convert XML to DataFrame
        flattened_data = parse_xml_file(file_path, namespace)
        df = pd.DataFrame(flattened_data)

        # Save to CSV, skipping index
        df.to_csv(output_file, index=False)
        return file_name
    except Exception as e:
        print(f"Error converting file {file_name}: {e}")
        return None

def convert_xml_to_csv(directory_path, date):
    # Implementation remains mostly the same as before
    # The function now defaults to using 16 worker threads

    input_dir = os.path.join(directory_path, date, "decompressed")
    output_dir = os.path.join(directory_path, date, "csv")

    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Define your XML namespace
    namespace = {'ns1': 'http://traffic.transportdata.tw/standard/traffic/schema/'}

    # List all XML files in the input directory
    xml_files = [f for f in os.listdir(input_dir) if f.endswith('.xml')]
    total_files = len(xml_files)

    # Use ThreadPoolExecutor with a specified number of workers to process files concurrently
    with tqdm(total=total_files) as progress:
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            # Prepare futures for all files
            futures = [executor.submit(process_file, file_name, input_dir, output_dir, namespace) for file_name in xml_files]

            # Process futures as they complete
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result:
                    progress.update(1)



#############################################################################################################

def process_csv_files(directory_path, date):
    # Define input and output directories based on the provided date
    input_directory = os.path.join(directory_path, date, "csv")
    output_directory = os.path.join(directory_path, date, "VDID")

    # Create the output directory if it doesn't exist
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    # Initialize an empty list to store DataFrames
    dfs = []

    # List all CSV files in the input directory
    csv_files = [f for f in os.listdir(input_directory) if f.endswith('.csv')]
    
    print(f"Processing {len(csv_files)} CSV files:")
    
    with tqdm(total=len(csv_files), unit='file') as pbar_files:
        for filename in csv_files:
            try:
                # Read the CSV file and insert the 'file_name' column at the beginning
                df = pd.read_csv(os.path.join(input_directory, filename))
                df.insert(0, 'file_name', filename)
                
                # Append the DataFrame to the list
                dfs.append(df)
                
                pbar_files.update(1)
            except Exception as e:
                # Print an error message and continue processing other files
                print(f'Error processing file {filename}: {e}')

    # Concatenate all DataFrames in the list to create the combined DataFrame
    combined_df = pd.concat(dfs, ignore_index=True)

    # Clear the list of individual DataFrames to release memory
    dfs.clear()
    gc.collect()  # Manually trigger garbage collection

    # Group the combined DataFrame by 'VDID'
    groups = combined_df.groupby('VDID')
    
    print(f"\nSaving {len(groups)} VDID-specific CSV files:")
    
    with tqdm(total=len(groups), unit='VDID') as pbar_vdids:
        for vdid, group_df in groups:
            try:
                # Save the group-specific data to a CSV file in the output directory
                group_df.to_csv(os.path.join(output_directory, f'{vdid}.csv'), index=False)
                
                pbar_vdids.update(1)
            except Exception as e:
                # Print an error message if saving fails
                print(f'Error saving VDID {vdid}: {e}')
    
    print(f"\n{len(groups)} VDID-specific CSV files saved.")

    # Clear variables holding large data and manually collect garbage again
    del combined_df, groups
    gc.collect()



def delete_files(directory_path, date, delete_compressed, delete_decompressed, delete_csv):
    # Define the directory paths based on the input date
    compressed_directory = os.path.join(directory_path, date, 'compressed')
    decompressed_directory = os.path.join(directory_path, date, 'decompressed')
    csv_directory = os.path.join(directory_path, date, 'csv')
    
    # Helper function to delete files in a directory
    def delete_files_in_directory(directory):
        if os.path.exists(directory):
            file_list = os.listdir(directory)
            for file in file_list:
                file_path = os.path.join(directory, file)
                try:
                    if os.path.isfile(file_path):
                        os.remove(file_path)
                except Exception as e:
                    print(f"Error deleting file: {file_path} ({e})")
        print(f"Deleted file: {directory}")
    
    # Delete files in the specified directories based on the parameter values
    if delete_compressed == 1:
        delete_files_in_directory(compressed_directory)
    
    if delete_decompressed == 1:
        delete_files_in_directory(decompressed_directory)
    
    if delete_csv == 1:
        delete_files_in_directory(csv_directory)

def zip_output(directory_path, date, delete_files_sp_zip=0):
    try:
        # Construct the path to the directory to zip
        dir_to_zip = os.path.join(directory_path, date)
        
        # Check if the directory exists
        if not os.path.exists(dir_to_zip):
            print(f"Directory {dir_to_zip} does not exist.")
            return
        
        # Output zip file path
        output_zip_path = f"{dir_to_zip}.zip"
        
        # Name of the root folder within the zip file
        root_folder_name = os.path.basename(dir_to_zip)
        
        # Gather all files to zip
        files_to_zip = []
        for root, dirs, files in os.walk(dir_to_zip):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.join(root_folder_name, os.path.relpath(file_path, dir_to_zip))
                files_to_zip.append((file_path, arcname))
        
        file_count = len(files_to_zip)
        
        # Notify user about the zipping process
        print(f"Zipping {file_count} files in {dir_to_zip}, please wait...")
        
        with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # Wrap the files_to_zip list with tqdm for a progress bar
            for file_path, arcname in tqdm(files_to_zip, desc="Zipping"):
                zipf.write(file_path, arcname)
        
        # Notify completion
        print(f"Completed zipping directory {dir_to_zip} into {output_zip_path}")
        
        # Delete the original directory if delete_files_sp_zip equals 1
        if delete_files_sp_zip == 1:
            shutil.rmtree(dir_to_zip)
            print(f"Deleted directory {dir_to_zip}")
        
    except Exception as e:
        print(f"An error occurred: {e}")



def get_yesterday_date(timezone):
    # Convert the current time to the specified timezone
    tz = pytz.timezone(timezone)
    now_in_timezone = datetime.now(tz)
    
    # Calculate yesterday's date
    yesterday_in_timezone = now_in_timezone - timedelta(days=1)
    
    # Format yesterday's date as "YYYYMMDD"
    return yesterday_in_timezone.strftime("%Y%m%d")

# Set timezone to Taipei
timezone = "Asia/Taipei"
yesterday_date = get_yesterday_date(timezone)
print(f"yesterday date: {yesterday_date}")


# Main Program

def fetch_vd(directory_path, date, delete_compressed, delete_decompressed, delete_csv, delete_files_sp_zip):
    download_files_for_day(directory_path, date, max_concurrent_downloads=10)
    decompress_files(directory_path, date)
    convert_xml_to_csv(directory_path, date)
    process_csv_files(directory_path, date)
    delete_files(directory_path, date, delete_compressed, delete_decompressed, delete_csv)
    zip_output(directory_path, date, delete_files_sp_zip)
    
    
def batch_fetch_vd(start_date, num_days_backwards, directory_path, delete_compressed, delete_decompressed, delete_csv, delete_files_sp_zip):
    # Convert start_date string to datetime object
    date_format = "%Y%m%d"
    current_date = datetime.strptime(start_date, date_format)
    
    # Iterate backwards from start_date for num_days_backwards
    for _ in range(num_days_backwards):
        # Convert current_date back to string and call fetch_vd
        formatted_date = current_date.strftime(date_format)
        fetch_vd(directory_path, formatted_date, delete_compressed, delete_decompressed, delete_csv, delete_files_sp_zip)
        
        # Decrement the day by one
        current_date -= timedelta(days=1)

yesterday date: 20240314


In [3]:
#fetch_vd(directory_path, "20240228", 1, 1, 0, 1) # date, delete_compressed, delete_decompressed, delete_csv, delete_files_sp_zip

#fetch_vd(directory_path, yesterday_date, 1, 1, 0, 0)

# Example usage
# You need to replace "your_directory_path_here" with the actual directory path.
# Also, adjust the boolean flags as needed for your use case.
batch_fetch_vd("20240523", 23, directory_path, delete_compressed=True, delete_decompressed=True, delete_csv=False, delete_files_sp_zip=True)

Starting download for date: 20240523


  0%|          | 0/1440 [00:00<?, ?it/s]

Failed to download: https://tisvcloud.freeway.gov.tw/history/motc20/VD/20240523/VDLive_0007.xml.gz
Failed to download: https://tisvcloud.freeway.gov.tw/history/motc20/VD/20240523/VDLive_0000.xml.gz
Failed to download: https://tisvcloud.freeway.gov.tw/history/motc20/VD/20240523/VDLive_0006.xml.gz
Failed to download: https://tisvcloud.freeway.gov.tw/history/motc20/VD/20240523/VDLive_0001.xml.gz
Failed to download: https://tisvcloud.freeway.gov.tw/history/motc20/VD/20240523/VDLive_0008.xml.gz
Failed to download: https://tisvcloud.freeway.gov.tw/history/motc20/VD/20240523/VDLive_0009.xml.gz
Failed to download: https://tisvcloud.freeway.gov.tw/history/motc20/VD/20240523/VDLive_0002.xml.gz
Failed to download: https://tisvcloud.freeway.gov.tw/history/motc20/VD/20240523/VDLive_0003.xml.gz
Failed to download: https://tisvcloud.freeway.gov.tw/history/motc20/VD/20240523/VDLive_0004.xml.gz
Failed to download: https://tisvcloud.freeway.gov.tw/history/motc20/VD/20240523/VDLive_0005.xml.gz
Failed to 