In [1]:
import pandas as pd
from sqlalchemy import create_engine, text

from credentials.db_info import DB_INFO
# Lift pandas' row-display limit so displayed DataFrames are never truncated.
pd.set_option('display.max_rows', None)

In [2]:
def read_parquet_to_df(file_path):
    """Load a parquet file into a pandas DataFrame.

    Args:
        file_path: Location of the parquet file, passed straight to
            ``pd.read_parquet``.

    Returns:
        The DataFrame produced by ``pd.read_parquet``.
    """
    return pd.read_parquet(file_path)

In [3]:
# Function to insert data into the 'dim_product' table
def insert_data_to_product_table(df, DB_INFO):
    """Create ``dim_product`` if it is missing and load the products found in ``df``.

    Rows are de-duplicated on ``product_id`` (the first occurrence is kept) and
    inserted with ``ON CONFLICT (product_id) DO NOTHING``, so products already
    present in the table are left untouched.

    Values are sent as bound parameters instead of being spliced into the SQL
    text. Product names are therefore stored exactly as they appear in ``df``
    (apostrophes and ``%`` no longer need to be stripped or rewritten), and
    missing values are stored as NULL.

    The CREATE TABLE and all inserts run in a single transaction: either every
    row of this batch is committed or none is.

    Args:
        df: DataFrame containing at least the columns ``product_id``,
            ``category``, ``sub_category`` and ``product_name``.
        DB_INFO: Mapping with the keys ``DB_USER``, ``DB_PASSWORD``,
            ``DB_HOST``, ``DB_DATABASE`` and ``DB_SCHEMA``.

    Raises:
        KeyError: If a required column or ``DB_INFO`` key is missing.
    """
    columns = ['product_id', 'category', 'sub_category', 'product_name']
    adjusted_df = df[columns].drop_duplicates(subset=['product_id'])

    # Cast to object so NaN/NaT/NA can be replaced by None, which the driver sends as NULL.
    adjusted_df = adjusted_df.astype(object).where(adjusted_df.notna(), None)
    records = adjusted_df.to_dict('records')

    # Identifiers cannot be bound as parameters; the schema name comes from local configuration.
    schema = DB_INFO['DB_SCHEMA']
    create_table_query = text(f"""
    CREATE TABLE IF NOT EXISTS {schema}.dim_product (
        product_id VARCHAR PRIMARY KEY,
        category VARCHAR,
        sub_category VARCHAR,
        product_name VARCHAR
    )
    """)
    insert_query = text(f"""
    INSERT INTO {schema}.dim_product (product_id, category, sub_category, product_name)
    VALUES (:product_id, :category, :sub_category, :product_name)
    ON CONFLICT (product_id) DO NOTHING
    """)

    # Create a connection to the database
    engine = create_engine(f"postgresql://{DB_INFO['DB_USER']}:{DB_INFO['DB_PASSWORD']}@{DB_INFO['DB_HOST']}/{DB_INFO['DB_DATABASE']}")
    try:
        with engine.begin() as connection:
            connection.execute(create_table_query)
            # Passing a list of dicts performs one batched (executemany) insert;
            # an empty list must be skipped because the statement has bind parameters.
            if records:
                connection.execute(insert_query, records)
    finally:
        # Release the connection pool; a new engine is built on every call.
        engine.dispose()

In [4]:
# Function to insert data into the 'dim_customer' table
def insert_data_to_customer_table(df, DB_INFO):
    """Create ``dim_customer`` if it is missing and load the customers found in ``df``.

    Rows are de-duplicated on ``customer_id`` (the first occurrence is kept)
    and inserted with ``ON CONFLICT (customer_id) DO NOTHING``, so customers
    already present in the table are left untouched.

    Values are sent as bound parameters instead of being spliced into the SQL
    text, so names containing apostrophes no longer break the statement, and
    missing values are stored as NULL.

    The CREATE TABLE and all inserts run in a single transaction: either every
    row of this batch is committed or none is.

    Args:
        df: DataFrame containing at least the columns ``customer_id``,
            ``customer_name``, ``birth_date`` and ``phone_number``.
        DB_INFO: Mapping with the keys ``DB_USER``, ``DB_PASSWORD``,
            ``DB_HOST``, ``DB_DATABASE`` and ``DB_SCHEMA``.

    Raises:
        KeyError: If a required column or ``DB_INFO`` key is missing.
    """
    columns = ['customer_id', 'customer_name', 'birth_date', 'phone_number']
    adjusted_df = df[columns].drop_duplicates(subset=['customer_id'])

    # Cast to object so NaN/NaT/NA can be replaced by None, which the driver sends as NULL.
    adjusted_df = adjusted_df.astype(object).where(adjusted_df.notna(), None)
    records = adjusted_df.to_dict('records')

    # Identifiers cannot be bound as parameters; the schema name comes from local configuration.
    schema = DB_INFO['DB_SCHEMA']
    # NOTE(review): phone_number is declared INTEGER, which cannot keep leading
    # zeros or a '+' prefix — confirm this column type is intended.
    create_table_query = text(f"""
    CREATE TABLE IF NOT EXISTS {schema}.dim_customer (
        customer_id VARCHAR PRIMARY KEY,
        customer_name VARCHAR,
        birth_date DATE,
        phone_number INTEGER
    )
    """)
    insert_query = text(f"""
    INSERT INTO {schema}.dim_customer (customer_id, customer_name, birth_date, phone_number)
    VALUES (:customer_id, :customer_name, :birth_date, :phone_number)
    ON CONFLICT (customer_id) DO NOTHING
    """)

    # Create a connection to the database
    engine = create_engine(f"postgresql://{DB_INFO['DB_USER']}:{DB_INFO['DB_PASSWORD']}@{DB_INFO['DB_HOST']}/{DB_INFO['DB_DATABASE']}")
    try:
        with engine.begin() as connection:
            connection.execute(create_table_query)
            # Passing a list of dicts performs one batched (executemany) insert;
            # an empty list must be skipped because the statement has bind parameters.
            if records:
                connection.execute(insert_query, records)
    finally:
        # Release the connection pool; a new engine is built on every call.
        engine.dispose()

In [5]:
def insert_data_to_address_table(df, DB_INFO):
    """Create ``dim_address`` if it is missing and load the addresses found in ``df``.

    Rows are de-duplicated on ``address_id`` (the first occurrence is kept)
    and inserted with ``ON CONFLICT (address_id) DO NOTHING``, so addresses
    already present in the table are left untouched.

    Values are sent as bound parameters instead of being spliced into the SQL
    text. Province, district and ward are therefore stored exactly as they
    appear in ``df`` (apostrophes are no longer stripped), and a missing
    province/district/ward is stored as NULL rather than as the text ``'None'``.

    The CREATE TABLE and all inserts run in a single transaction: either every
    row of this batch is committed or none is.

    Args:
        df: DataFrame containing at least the columns ``address_id``,
            ``province``, ``district``, ``ward`` and ``ship_cost``.
        DB_INFO: Mapping with the keys ``DB_USER``, ``DB_PASSWORD``,
            ``DB_HOST``, ``DB_DATABASE`` and ``DB_SCHEMA``.

    Raises:
        KeyError: If a required column or ``DB_INFO`` key is missing.
    """
    columns = ['address_id', 'province', 'district', 'ward', 'ship_cost']
    adjusted_df = df[columns].drop_duplicates(subset=['address_id'])

    # Cast to object so NaN/NaT/NA can be replaced by None, which the driver sends as NULL.
    adjusted_df = adjusted_df.astype(object).where(adjusted_df.notna(), None)
    records = adjusted_df.to_dict('records')

    # Identifiers cannot be bound as parameters; the schema name comes from local configuration.
    schema = DB_INFO['DB_SCHEMA']
    create_table_query = text(f"""
    CREATE TABLE IF NOT EXISTS {schema}.dim_address (
        address_id VARCHAR PRIMARY KEY,
        province VARCHAR,
        district VARCHAR,
        ward VARCHAR,
        ship_cost INTEGER
    )
    """)
    insert_query = text(f"""
    INSERT INTO {schema}.dim_address (address_id, province, district, ward, ship_cost)
    VALUES (:address_id, :province, :district, :ward, :ship_cost)
    ON CONFLICT (address_id) DO NOTHING
    """)

    # Create a connection to the database
    engine = create_engine(f"postgresql://{DB_INFO['DB_USER']}:{DB_INFO['DB_PASSWORD']}@{DB_INFO['DB_HOST']}/{DB_INFO['DB_DATABASE']}")
    try:
        with engine.begin() as connection:
            connection.execute(create_table_query)
            # Passing a list of dicts performs one batched (executemany) insert;
            # an empty list must be skipped because the statement has bind parameters.
            if records:
                connection.execute(insert_query, records)
    finally:
        # Release the connection pool; a new engine is built on every call.
        engine.dispose()

In [6]:
def insert_data_to_order_table(df, DB_INFO):
    """Create ``dim_order`` if it is missing and load the orders found in ``df``.

    Rows are de-duplicated on ``order_id`` (the first occurrence is kept) and
    inserted with ``ON CONFLICT (order_id) DO NOTHING``, so orders already
    present in the table are left untouched.

    ``dim_order`` declares foreign keys to ``dim_customer``, ``dim_address``
    and ``dim_product``, so those tables must exist and contain the referenced
    ids before this function is called.

    Values are sent as bound parameters instead of being spliced into the SQL
    text, and missing values are stored as NULL instead of being rendered as
    text such as ``'nan'`` or ``'None'``.

    The CREATE TABLE and all inserts run in a single transaction: either every
    row of this batch is committed or none is.

    Args:
        df: DataFrame containing at least the columns ``order_id``,
            ``order_date``, ``ship_date``, ``customer_id``, ``address_id``,
            ``product_id``, ``product_number``, ``revenue``, ``cost``,
            ``discount`` and ``profit``.
        DB_INFO: Mapping with the keys ``DB_USER``, ``DB_PASSWORD``,
            ``DB_HOST``, ``DB_DATABASE`` and ``DB_SCHEMA``.

    Raises:
        KeyError: If a required column or ``DB_INFO`` key is missing.
    """
    columns = [
        'order_id', 'order_date', 'ship_date', 'customer_id', 'address_id',
        'product_id', 'product_number', 'revenue', 'cost', 'discount', 'profit',
    ]
    # NOTE(review): only the first row per order_id is kept; if one order can
    # span several products the other lines are dropped — confirm this is intended.
    adjusted_df = df[columns].drop_duplicates(subset=['order_id'])

    # Cast to object so NaN/NaT/NA can be replaced by None, which the driver sends as NULL.
    adjusted_df = adjusted_df.astype(object).where(adjusted_df.notna(), None)
    records = adjusted_df.to_dict('records')

    # Identifiers cannot be bound as parameters; the schema name comes from local configuration.
    schema = DB_INFO['DB_SCHEMA']
    create_table_query = text(f"""
    CREATE TABLE IF NOT EXISTS {schema}.dim_order (
        order_id VARCHAR PRIMARY KEY,
        order_date DATE,
        ship_date DATE,
        customer_id VARCHAR,
        address_id VARCHAR,
        product_id VARCHAR,
        product_number INTEGER,
        revenue FLOAT,
        cost FLOAT,
        discount FLOAT,
        profit FLOAT,
        FOREIGN KEY (customer_id) REFERENCES {schema}.dim_customer(customer_id),
        FOREIGN KEY (address_id) REFERENCES {schema}.dim_address(address_id),
        FOREIGN KEY (product_id) REFERENCES {schema}.dim_product(product_id)
    )
    """)
    insert_query = text(f"""
    INSERT INTO {schema}.dim_order (order_id, order_date, ship_date, customer_id, address_id, product_id, product_number, revenue, cost, discount, profit)
    VALUES (:order_id, :order_date, :ship_date, :customer_id, :address_id, :product_id, :product_number, :revenue, :cost, :discount, :profit)
    ON CONFLICT (order_id) DO NOTHING
    """)

    # Create a connection to the database
    engine = create_engine(f"postgresql://{DB_INFO['DB_USER']}:{DB_INFO['DB_PASSWORD']}@{DB_INFO['DB_HOST']}/{DB_INFO['DB_DATABASE']}")
    try:
        with engine.begin() as connection:
            connection.execute(create_table_query)
            # Passing a list of dicts performs one batched (executemany) insert;
            # an empty list must be skipped because the statement has bind parameters.
            if records:
                connection.execute(insert_query, records)
    finally:
        # Release the connection pool; a new engine is built on every call.
        engine.dispose()

In [7]:
from datetime import datetime, timedelta

def list_dates(start, end):
    """Build the daily log file names for every day from ``start`` to ``end``.

    Both bounds are inclusive. When ``start`` falls after ``end`` the result
    is an empty list.

    Args:
        start: First day, formatted as ``YYYY-MM-DD``.
        end: Last day, formatted as ``YYYY-MM-DD``.

    Returns:
        A list of names of the form ``log_YYYY-MM-DD.parquet``, one per day,
        in chronological order.

    Raises:
        ValueError: If either bound does not match ``%Y-%m-%d``.
    """
    first_day = datetime.strptime(start, "%Y-%m-%d")
    last_day = datetime.strptime(end, "%Y-%m-%d")
    # Both bounds are parsed at midnight, so the difference is a whole number of days.
    day_count = (last_day - first_day).days + 1
    days = (first_day + timedelta(days=offset) for offset in range(day_count))
    return [f'log_{day.strftime("%Y-%m-%d")}.parquet' for day in days]

# Daily log files to ingest: one file name per day from 2024-04-01 through
# 2024-04-23 (both inclusive). The names carry no directory component.
file_path_list = list_dates('2024-04-01', '2024-04-23')

In [8]:
# Loaders in execution order: the three dimension tables are filled before
# 'dim_order', whose foreign keys reference them.
table_loaders = (
    insert_data_to_product_table,
    insert_data_to_customer_table,
    insert_data_to_address_table,
    insert_data_to_order_table,
)

for file_path in file_path_list:
    # Load one daily parquet file, then push it through every table loader
    df = read_parquet_to_df(file_path)
    for load_table in table_loaders:
        load_table(df, DB_INFO)

    # Report the bare file name once all four tables have been processed
    print(f'Ingested data from {file_path.split("/")[-1]} to DB successfully')

  transaction.execute(create_table_query)


Ingested data from log_2024-04-01.parquet to DB successfully
Ingested data from log_2024-04-02.parquet to DB successfully
Ingested data from log_2024-04-03.parquet to DB successfully
Ingested data from log_2024-04-04.parquet to DB successfully
Ingested data from log_2024-04-05.parquet to DB successfully
Ingested data from log_2024-04-06.parquet to DB successfully
Ingested data from log_2024-04-07.parquet to DB successfully
Ingested data from log_2024-04-08.parquet to DB successfully
Ingested data from log_2024-04-09.parquet to DB successfully
Ingested data from log_2024-04-10.parquet to DB successfully
Ingested data from log_2024-04-11.parquet to DB successfully
Ingested data from log_2024-04-12.parquet to DB successfully
Ingested data from log_2024-04-13.parquet to DB successfully
Ingested data from log_2024-04-14.parquet to DB successfully
Ingested data from log_2024-04-15.parquet to DB successfully
Ingested data from log_2024-04-16.parquet to DB successfully
Ingested data from log_2