In [26]:
import pandas as pd
import os
import psycopg2
from sqlalchemy import create_engine
import glob
import logging
import duckdb
from pyspark.sql import SparkSession
from datetime import datetime, timedelta, date, time

In [27]:
CURRENT_PATH = os.getcwd()
SOURCE_DIR = "data"
PATH: str = CURRENT_PATH+"/"+SOURCE_DIR

USERNAME = "postgres"
PASSWORD = "Takaza00956"
HOST = "localhost"
PORT = "5432"
DATABASE = "etl"
DATABASE_URL = f"postgresql+psycopg2://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}"

forced_date = "2023-01-19"

In [28]:
logging.basicConfig( 
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

In [29]:
def extract_previous_date(date: str):
    date = datetime.strptime(date, "%Y-%m-%d")
    yesterday_date = date - timedelta(days=1)
    return yesterday_date

In [30]:
def extract_parquet(path: str, forced_date: str):
    previous_date = extract_previous_date(forced_date)
    year = previous_date.year
    month = previous_date.month if previous_date.month > 9 else "0"+str(previous_date.month)

    df = pd.read_parquet(f"{PATH}/fhvhv_tripdata_{year}-{month}.parquet")

    return df

In [31]:
def check_table_existed(database_name: str, table_name: str):
    conn = duckdb.connect(f"{database_name}.duckdb")
    result = conn.execute(f"""
        SELECT table_name 
        FROM information_schema.tables 
        WHERE table_name = '{table_name}'
    """).fetchall()
    conn.close()
    return 1 if len(result) > 0 else 0

In [32]:
def pipeline_1st(forced_date):
    df = extract_parquet(PATH, forced_date)

    previous_date = extract_previous_date(forced_date)
    transformed_df = df[df["request_datetime"]==previous_date]

    now = datetime.now()
    hour = now.hour
    minute = now.minute
    second = now.second

    calculated_at = datetime.combine(previous_date.date(), time(hour, minute, second))

    transformed_data = {
        "transaction_date": [previous_date],
        "total_transactions": [transformed_df.shape[0]],
        "calculated_at": [calculated_at]
    }

    processed_df = pd.DataFrame(transformed_data)

    pk = "transaction_date"
    database_name = "processed"
    table_name = "daily_transaction"
    is_table_existed = check_table_existed(database_name, table_name)
    query_string = ""

    if is_table_existed:
        query_string = f"""
            DELETE FROM {table_name}
            WHERE {pk} = '{previous_date}';

            INSERT INTO {table_name} 
            SELECT *
            FROM processed_df;
        """
    else:
        query_string = f"""
            CREATE TABLE {table_name} (
                transaction_date DATE,
                total_transactions INT,
                calculated_at TIMESTAMP,
                PRIMARY KEY ({pk})
            );

            INSERT INTO {table_name}
            SELECT * FROM processed_df
        """

    conn = duckdb.connect(f"{database_name}.duckdb")

    conn.execute(query_string)

    conn.close()

    return 1

In [33]:
def pipeline_2nd(forced_date):
    df = extract_parquet(PATH, forced_date)

    transformed_df = df[df["request_datetime"]==forced_date]
    transformed_df = transformed_df.groupby("PULocationID").size().reset_index(name="count")
    transformed_df.rename(columns={"PULocationID": "taxi_zone_id"}, inplace=True)
    transformed_df["rank"] = transformed_df["count"].rank(method="dense", ascending=False)
    transformed_df["rank"] = transformed_df["rank"].astype(int)
    transformed_df = transformed_df[transformed_df["rank"]<=5]

    forced_date = datetime.strptime(forced_date, "%Y-%m-%d")

    now = datetime.now()
    hour = now.hour
    minute = now.minute
    second = now.second

    calculated_at = datetime.combine(forced_date.date(), time(hour, minute, second))

    transformed_df["calculated_at"] = calculated_at

    processed_df = transformed_df[["taxi_zone_id", "rank", "calculated_at"]]
    
    pk = ["taxi_zone_id", "calculated_at"]
    database_name = "processed"
    table_name = "daily_topfive_taxi_zone"
    is_table_existed = check_table_existed(database_name, table_name)
    query_string = ""
    where_condition = " AND ".join(f"incoming.{column} = current.{column}" for column in pk)

    if is_table_existed:
        query_string = f"""
            DELETE FROM {table_name} current
            WHERE EXISTS (
                SELECT {', '.join(pk)}
                FROM processed_df incoming
                WHERE {where_condition}
            );

            INSERT INTO {table_name} 
            SELECT *
            FROM processed_df;
        """
    else:
        query_string = f"""
            CREATE TABLE {table_name} (
                taxi_zone_id INT,
                rank INT,
                calculated_at TIMESTAMP,
                PRIMARY KEY ({','.join(pk)})
            );

            INSERT INTO {table_name}
            SELECT * FROM processed_df
        """

    conn = duckdb.connect(f"{database_name}.duckdb")

    conn.execute(query_string)

    conn.close()
    return 1

In [34]:
result = pipeline_1st(forced_date)
result = pipeline_2nd(forced_date)


In [35]:
conn = duckdb.connect(f"processed.duckdb")
result2 = conn.execute(f"SELECT * FROM daily_topfive_taxi_zone").fetch_df()

result1 = conn.execute(f"SELECT * FROM daily_transaction").fetch_df()
result1


Unnamed: 0,transaction_date,total_transactions,calculated_at
0,2023-01-15,13,2023-01-15 18:25:13
1,2023-01-16,17,2023-01-16 18:27:21
2,2023-01-17,7,2023-01-17 18:33:14


In [36]:
result2

Unnamed: 0,taxi_zone_id,rank,calculated_at
0,69,1,2023-01-17 18:28:07
1,125,1,2023-01-17 18:28:07
2,164,1,2023-01-17 18:28:07
3,211,1,2023-01-17 18:28:07
4,221,1,2023-01-17 18:28:07
5,239,1,2023-01-17 18:28:07
6,243,1,2023-01-17 18:28:07
7,67,2,2023-01-18 18:33:58
8,85,2,2023-01-18 18:33:58
9,125,2,2023-01-18 18:33:58


In [37]:
pk = ["taxi_zone_id", "calculated_at"]
where_condition = " AND ".join(f"incoming.{column} = current.{column}" for column in pk)
where_condition

'incoming.taxi_zone_id = current.taxi_zone_id AND incoming.calculated_at = current.calculated_at'