In [1]:
import requests
import pandas as pd
import json
from dotenv import load_dotenv
import os
from datetime import datetime
import re
from sqlalchemy import create_engine, exc
from zoneinfo import ZoneInfo
from io import StringIO
from google.cloud import storage


# mrt_usage_history
# Download every month's MRT usage-history CSV;
# each URL in the index serves exactly one month of data.


def E_mrt_usage_history_csvfilelist(
    url: str = "https://data.taipei/api/dataset/63f31c7e-7fc3-418b-bd82-b95158755b4d/resource/eb481f58-1238-4cff-8caa-fa7bb20cb4f4/download",
    timeout: float = 30,
):
    """Download the index of monthly MRT usage-history CSV files.

    The index is a CSV whose first column is dropped; the remaining columns
    (in practice "年月" and "URL") are returned with every value kept as a
    string, so callers can compare e.g. ``url_df["年月"] == "201904"``.

    Args:
        url: Location of the index CSV. Defaults to the data.taipei resource.
        timeout: Seconds passed to ``requests.get`` as the socket timeout.

    Returns:
        pd.DataFrame: one row per listed file, all columns except the first,
        with a fresh 0..n-1 index.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    response = requests.get(url=url, timeout=timeout)
    # Without this an error page would be parsed as if it were the index.
    response.raise_for_status()
    # read_csv copes with "\r\n" line endings, quoted fields and a trailing
    # newline. dtype=str / keep_default_na=False keep every value as text,
    # which is what the old manual split produced.
    url_df = pd.read_csv(
        StringIO(response.text),
        dtype=str,
        keep_default_na=False,
        skip_blank_lines=True,
    )
    # The first column is not needed downstream, so drop it as before.
    url_df = url_df.iloc[:, 1:].reset_index(drop=True)
    print("E_mrt_usage_history_csvfilelist finished")
    return url_df


# def T_mrt_usage_history_one_month_apply_reduce(url):
#     response = requests.get(url=url)
#     StringIO_df = StringIO(response.content.decode("utf-8-sig"))
#     df = pd.read_csv(StringIO_df)
#     pattern = re.compile(r"[A-Za-z]+")
#     df["進站"] = df["進站"].str.replace(pattern, "", regex=True)
#     df["出站"] = df["出站"].str.replace(pattern, "", regex=True)
#     df_enter = pd.DataFrame(df.groupby(["日期", "時段", "進站"])[
#                             "人次"].sum()).reset_index(drop=False)
#     df_out = pd.DataFrame(df.groupby(["日期", "時段", "出站"])[
#         "人次"].sum()).reset_index(drop=False)
#     df_enter.rename(columns={
#         "日期": "date",
#         "時段": "hour",
#         "進站": "mrt_station_name",
#         "人次": "enter_count"
#     }, inplace=True)

#     df_out.rename(columns={
#         "日期": "date",
#         "時段": "hour",
#         "出站": "mrt_station_name",
#         "人次": "exit_count"
#     }, inplace=True)
#     df = df_enter.merge(df_out,
#                         left_on=["date", "hour", "mrt_station_name"],
#                         right_on=["date", "hour", "mrt_station_name"],
#                         how="outer")
#     print("T_mrt_usage_history_one_month finished")
#     return (df)


def T_mrt_usage_history_one_month(url: str, timeout: float = 60):
    """Download one month's MRT usage CSV and normalise it.

    ASCII letters are stripped from the entry/exit station names (e.g. a
    line-code prefix such as "BL" in "BL板橋" becomes "板橋"). The Chinese
    column headers are then renamed to English.

    Args:
        url: Download URL of a single month's CSV (UTF-8, optional BOM).
        timeout: Seconds passed to ``requests.get`` as the socket timeout.

    Returns:
        pd.DataFrame with columns renamed to date, hour,
        mrt_station_name_enter, mrt_station_name_exit and visitors_num.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    response = requests.get(url=url, timeout=timeout)
    # Fail loudly instead of parsing an error page and hitting a KeyError below.
    response.raise_for_status()
    df = pd.read_csv(StringIO(response.content.decode("utf-8-sig")))
    letters = re.compile(r"[A-Za-z]+")
    df = df.assign(
        進站=df["進站"].str.replace(letters, "", regex=True),
        出站=df["出站"].str.replace(letters, "", regex=True),
    ).rename(columns={
        "日期": "date",
        "時段": "hour",
        "進站": "mrt_station_name_enter",
        "出站": "mrt_station_name_exit",
        "人次": "visitors_num",
    })
    print("T_mrt_usage_history finished")
    return df


def T_mrt_usage_history_one_month_recuce(df: pd.DataFrame):
    """Collapse origin/destination rows into per-station entry and exit totals.

    Entries are summed per (date, hour, entry station) and exits per
    (date, hour, exit station). The two tables are then outer-joined so that
    a station seen only as an entry or only as an exit still appears, with
    NaN in the missing column.

    Args:
        df: Frame with date, hour, mrt_station_name_enter,
            mrt_station_name_exit and visitors_num columns.

    Returns:
        pd.DataFrame with columns date, hour, mrt_station_name,
        visitors_num_enter, visitors_num_exit.
    """
    time_keys = ["date", "hour"]
    enter_keys = time_keys + ["mrt_station_name_enter"]
    exit_keys = time_keys + ["mrt_station_name_exit"]

    entries = df.groupby(enter_keys)["visitors_num"].sum().reset_index()
    exits = df.groupby(exit_keys)["visitors_num"].sum().reset_index()

    combined = entries.merge(
        exits,
        how="outer",
        left_on=enter_keys,
        right_on=exit_keys,
        suffixes=("_enter", "_exit"),
    )
    # Whichever side of the outer join matched supplies the station name.
    combined["mrt_station_name"] = combined["mrt_station_name_exit"].combine_first(
        combined["mrt_station_name_enter"]
    )
    wanted = ["date", "hour", "mrt_station_name", "visitors_num_enter", "visitors_num_exit"]
    return combined.loc[:, wanted]


def L_mrt_usage_history(df: pd.DataFrame):
    """Append ``df`` to the MySQL table ``mrt_usage_history`` in ``group2_db``.

    Credentials come from the ANDY_USERNAME_SQL / ANDY_PASSWORD_SQL
    environment variables. This is a best-effort load: failures are printed
    rather than raised.

    Args:
        df: Rows to append; written without the DataFrame index.

    Returns:
        "L_mrt_usage_history finished" on success, ``None`` if loading failed.
    """
    from urllib.parse import quote_plus

    username_sql = os.getenv("ANDY_USERNAME_SQL")
    password_sql = os.getenv("ANDY_PASSWORD_SQL")
    # server = "host.docker.internal:3306"  # use this host when running inside docker
    server = "localhost:3306"
    db_name = "group2_db"
    if username_sql is None or password_sql is None:
        print("loading to sql fail: ANDY_USERNAME_SQL / ANDY_PASSWORD_SQL not set")
        return None
    engine = None
    try:
        # Percent-encode credentials so "@", ":" or "/" in them cannot corrupt the URL.
        connection_url = (
            f"mysql+pymysql://{quote_plus(username_sql)}:{quote_plus(password_sql)}"
            f"@{server}/{db_name}"
        )
        engine = create_engine(connection_url)
        # Passing the engine lets pandas open, commit and close its own connection.
        df.to_sql(
            name="mrt_usage_history",
            con=engine,
            if_exists="append",
            index=False,
        )
        print("L_mrt_usage_history finished")
        return "L_mrt_usage_history finished"
    except Exception as e:
        # Unlike a bare `except:`, this lets KeyboardInterrupt/SystemExit through
        # and reports why the load failed.
        print(f"loading to sql fail: {e}")
        return None
    finally:
        if engine is not None:
            engine.dispose()





In [2]:
# Fetch the month -> CSV-URL index once; the later cells read their rows from url_df.
url_df = E_mrt_usage_history_csvfilelist()

E_mrt_usage_history_csvfilelist finished


In [3]:
# Inspect the file index: one row per month ("年月", e.g. 201701) with its CSV download "URL".
url_df

Unnamed: 0,年月,URL
0,201701,http://tcgmetro.blob.core.windows.net/stationo...
1,201702,http://tcgmetro.blob.core.windows.net/stationo...
2,201703,http://tcgmetro.blob.core.windows.net/stationo...
3,201704,http://tcgmetro.blob.core.windows.net/stationo...
4,201705,http://tcgmetro.blob.core.windows.net/stationo...
...,...,...
83,202312,http://tcgmetro.blob.core.windows.net/stationo...
84,202401,http://tcgmetro.blob.core.windows.net/stationo...
85,202402,http://tcgmetro.blob.core.windows.net/stationo...
86,202403,http://tcgmetro.blob.core.windows.net/stationo...


In [4]:
# `storage` is already imported in the first cell.
# Service-account key used by google-cloud-storage. Set GCS_CREDENTIALS_FILE_PATH
# in the environment to override this machine-specific default path.
GCS_CREDENTIALS_FILE_PATH = os.getenv(
    "GCS_CREDENTIALS_FILE_PATH",
    r"D:\data_engineer\dev_TIR_group2\Taipei-transit-data_hub\airflow\dags\harry_GCS_BigQuery_write_cred.json",
)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = GCS_CREDENTIALS_FILE_PATH
GCS_CLIENT = storage.Client()


In [5]:
def upload_to_bucket_string(df: pd.DataFrame, blob_name: str, bucket_name: str, storage_client: storage.Client, encoding: str = "utf-8-sig"):
    """Upload ``df`` as a CSV to ``bucket_name/blob_name`` and return the blob.

    Args:
        df: DataFrame to upload; written without its index.
        blob_name: Object name inside the bucket.
        bucket_name: Target bucket (looked up with ``get_bucket``).
        storage_client: Client used for the lookup and upload.
        encoding: Text encoding of the uploaded bytes. The default
            "utf-8-sig" prepends a BOM.

    Returns:
        The blob that was written.
    """
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(blob_name)
    # to_csv() returns a str when no path is given, and in that case its
    # ``encoding`` argument has no effect, so the requested BOM was never
    # written. Encode explicitly instead.
    csv_bytes = df.to_csv(index=False).encode(encoding)
    blob.upload_from_string(csv_bytes, content_type="text/csv")
    print(blob)
    return blob

In [6]:
def upload_df_to_gcs(
    client: storage.Client,
    bucket_name: str,
    blob_name: str,
    df: pd.DataFrame,
) -> bool:
    """
    Upload a pandas DataFrame to GCS as a CSV file, never overwriting.

    Args:
        client (storage.Client): The client to use to upload to GCS.
        bucket_name (str): The name of the bucket to upload to.
        blob_name (str): The name of the blob to upload to.
        df (pd.DataFrame): The dataframe to upload; written as CSV without
                           its index, with content type "text/csv".

    Returns:
        bool: True if the CSV was uploaded, False if a blob with this name
              already exists (nothing is uploaded in that case).

    Raises:
        Exception: if serialising or uploading fails; the original error is
                   chained as ``__cause__``.
    """
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    # NOTE(review): exists() followed by an upload is not atomic; a concurrent
    # writer could still create the blob in between.
    if blob.exists():
        print("File already exists in GCP.")
        return False
    try:
        blob.upload_from_string(
            df.to_csv(index=False), content_type="text/csv")
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise Exception(f"Failed to upload pd.DataFrame to GCS, reason: {e}") from e
    return True

In [7]:
def E_mrt_usage_history_one_month(url: str, timeout: float = 60):
    """Download one month's MRT usage CSV and rename its columns to English.

    Station names are kept verbatim; no letters are stripped from them.

    Args:
        url: Download URL of a single month's CSV (UTF-8, optional BOM).
        timeout: Seconds passed to ``requests.get`` as the socket timeout.

    Returns:
        pd.DataFrame with columns renamed to date, hour,
        mrt_station_name_enter, mrt_station_name_exit and visitors_num.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    response = requests.get(url=url, timeout=timeout)
    # Fail loudly instead of uploading an error page as if it were data.
    response.raise_for_status()
    df = pd.read_csv(StringIO(response.content.decode("utf-8-sig")))
    df = df.rename(columns={
        "日期": "date",
        "時段": "hour",
        "進站": "mrt_station_name_enter",
        "出站": "mrt_station_name_exit",
        "人次": "visitors_num",
    })
    print("E_mrt_usage_history_one_month finished")
    return df

In [8]:
# Row label of the month the backwards upload loop starts from.
start_month = url_df.index[url_df["年月"].eq("201904")].tolist()[0]
start_month

27

In [9]:
# Upload months from start_month back to and including row 0 (the oldest month).
# range's stop is exclusive, so it must be -1 for row 0 to be processed.
for i in range(start_month, -1, -1):
    month = url_df.loc[i, "年月"]
    url = url_df.loc[i, "URL"]
    print(f"{month}  is downloading")
    df = E_mrt_usage_history_one_month(url=url)
    print(f"{month} download finished")
    print(f"{month} is being uploaded to GCS")
    uploaded = upload_df_to_gcs(
        client=GCS_CLIENT,
        bucket_name="mrt_history_usage",
        blob_name=f"{month}_mrt_history_usage.csv",
        df=df,
    )
    # upload_df_to_gcs returns False (and uploads nothing) when the blob already exists.
    if uploaded:
        print(f"{month} has been sucessfully uploaded to GCS")
    else:
        print(f"{month} already exists in GCS, upload skipped")

201904  is downloading
T_mrt_usage_history finished
201904 download finished
201904 is being uploaded to GCS
201904 has been sucessfully uploaded to GCS
201903  is downloading
T_mrt_usage_history finished
201903 download finished
201903 is being uploaded to GCS
201903 has been sucessfully uploaded to GCS
201902  is downloading
T_mrt_usage_history finished
201902 download finished
201902 is being uploaded to GCS
201902 has been sucessfully uploaded to GCS
201901  is downloading
T_mrt_usage_history finished
201901 download finished
201901 is being uploaded to GCS
201901 has been sucessfully uploaded to GCS
201812  is downloading
T_mrt_usage_history finished
201812 download finished
201812 is being uploaded to GCS
201812 has been sucessfully uploaded to GCS
201811  is downloading
T_mrt_usage_history finished
201811 download finished
201811 is being uploaded to GCS
201811 has been sucessfully uploaded to GCS
201810  is downloading
T_mrt_usage_history finished
201810 download finished
20181

In [15]:
# NOTE(review): `month` is whatever the last loop iteration left in the kernel.
# The recorded output ('202404') matches no row of url_df shown above (the last
# is 202403), which suggests stale state from an out-of-order run.
f"{month}_mrt_history_usage.csv"

'202404_mrt_history_usage.csv'

In [16]:
# Manual retry of a single upload.
# NOTE(review): relies on `month` and `df` left over from the most recent loop
# iteration in an earlier cell; re-running cells out of order changes what gets uploaded.
upload_df_to_gcs(GCS_CLIENT, "mrt_history_usage", f"{month}_mrt_history_usage.csv", df)

True

In [5]:
# Save every month in url_df (newest first, down to and including row 0) as a
# full CSV plus a per-station reduced CSV. url_df comes from the earlier
# E_mrt_usage_history_csvfilelist() cell.
file_save_path = r"D:\data_engineer\TIR_group2\TIR101_Group2\DA\data\MRT\mrt_usage_history"
# file_save_path = r"C:\TIR101_Group2\DA\data\MRT\mrt_usage_history"
full_dir = os.path.join(file_save_path, "full")
reduce_dir = os.path.join(file_save_path, "reduce")
# DataFrame.to_csv does not create missing directories. Without this, every
# month would fail inside the try below.
os.makedirs(full_dir, exist_ok=True)
os.makedirs(reduce_dir, exist_ok=True)
# range's stop is exclusive, so -1 is needed for row 0 (the oldest month) to be processed.
for i in range(len(url_df) - 1, -1, -1):
    month = url_df.loc[i, "年月"]
    url = url_df.loc[i, "URL"]
    filename_full = f"{month}_full_mrt_usage_history.csv"
    filename_reduce = f"{month}_reduce_mrt_usage_history.csv"
    filename_full_save_path = os.path.join(full_dir, filename_full)
    filename_reduce_save_path = os.path.join(reduce_dir, filename_reduce)
    try:
        print(f"{month} prepare to download")

        T_df = T_mrt_usage_history_one_month(url=url)
        print(f"{month} download finished")
        print(f"{month} saving csv")

        T_df.to_csv(filename_full_save_path, encoding="utf-8-sig", index=False)

        print(f"{month} csv has been saved")
        print(f"{month} reducing(group by)")

        reduce_df = T_mrt_usage_history_one_month_recuce(T_df)
        reduce_df.to_csv(filename_reduce_save_path, encoding="utf-8-sig", index=False)
        print(f"{month} reducing file has been saved")
    except Exception as e:
        # Keep processing the remaining months, but report why this one failed
        # (a bare `except:` also swallowed KeyboardInterrupt and hid the cause).
        print(f"ERROR {month} fail: {e}")
    

202403 prepare to download
T_mrt_usage_history finished
202403 download finished
202403 saving csv
202403 csv has been saved
202403 reducing(group by)
202403 reducing file has been saved
202402 prepare to download
T_mrt_usage_history finished
202402 download finished
202402 saving csv
202402 csv has been saved
202402 reducing(group by)
202402 reducing file has been saved
202401 prepare to download
T_mrt_usage_history finished
202401 download finished
202401 saving csv
202401 csv has been saved
202401 reducing(group by)
202401 reducing file has been saved
202312 prepare to download
T_mrt_usage_history finished
202312 download finished
202312 saving csv
202312 csv has been saved
202312 reducing(group by)
202312 reducing file has been saved
202311 prepare to download
T_mrt_usage_history finished
202311 download finished
202311 saving csv
202311 csv has been saved
202311 reducing(group by)
202311 reducing file has been saved
202310 prepare to download
T_mrt_usage_history finished
202310 do