In [441]:
import pandas as pd
import numpy as np
import sqlalchemy as sc
from datetime import datetime
import datetime as dt
from sqlalchemy import text

In [442]:
# Connect với file excel chứa bảng facts
def read_excel_to_df(file_path, sheet_name):
    """
    Đọc dữ liệu từ file Excel và trả về một hoặc nhiều DataFrame.
    """
    try:
        df = pd.read_excel(file_path, sheet_name=sheet_name)
        return df
    except Exception as e:
        print(f"Error: {e}")
        return None

In [443]:
# Trích xuất cột date dựa trên ngày bắt đầu nhận order và ngày giao hàng 
def create_date_range_df(df, date_cols, date_col_name = "date"):
    """
    Tạo một DataFrame chứa dãy ngày liên tục từ ngày nhỏ nhất đến ngày lớn nhất
    từ các cột ngày được chỉ định trong DataFrame.

    Parameters:
        - df: DataFrame gốc có chứa các cột ngày
        - date_cols: Danh sách tên các cột ngày cần dùng
        - date_col_name: Tên cột ngày trong DataFrame mới (default = "date")

    Return:
        - df_date: DataFrame mới với 1 cột ngày liên tục
    """
    try:
        # Kiểm tra cột có tồn tại không
        for col in date_cols:
            if col not in df.columns:
                raise ValueError(f"Cột '{col}' không tồn tại trong DataFrame")

        # Ép kiểu datetime an toàn
        for col in date_cols:
            df[col] = pd.to_datetime(df[col], errors='coerce')

        # Lấy ngày nhỏ nhất & lớn nhất từ toàn bộ các cột ngày
        min_date = df[date_cols].min().min()
        max_date = df[date_cols].max().max()

        # Tạo dãy ngày
        all_dates = pd.date_range(start=min_date, end=max_date, freq="D")

        # Trả về DataFrame
        df_date = pd.DataFrame({date_col_name: all_dates})

        return df_date

    except Exception as e:
        print(f"[ERROR] Failed to create date range DataFrame: {e}")
        return pd.DataFrame()

date_cols = ["order_placement_date", "agreed_delivery_date", "actual_delivery_date"]
date = create_date_range_df(fact_order_lines, date_cols)
date.max()



date   2024-05-06
dtype: datetime64[ns]

In [444]:
# Thực hiện insert bảng date
def update_dim_date(df_date, connection_engine):
    """
    Cập nhật bảng dim_date: chỉ chèn những dòng mới chưa có trong database.
    
    Tham số:
        df_date: DataFrame chứa cột 'date'
        db_url: Đường dẫn kết nối tới database (PostgreSQL, MySQL,...)
    """
    df_date['date'] = pd.to_datetime(df_date['date'], errors='coerce')

    # Bước 2: loại bỏ dòng lỗi (NaT)
    df_date = df_date.dropna(subset=['date'])
    df_dim = df_date.copy()
    # Tạo bảng chuẩn dim_date
    df_dim['date_key'] = df_dim['date'].dt.strftime('%Y%m%d').astype(int)
    df_dim['mmm_yy'] = df_dim['date'].dt.strftime('%b %y')
    df_dim['week_no'] = df_dim['date'].dt.strftime('W%U %Y')
    df_dim['day_of_week'] = df_dim['date'].dt.weekday + 1
    df_dim['weekday_name'] = df_dim['date'].dt.day_name()
    df_dim['quarter'] = df_dim['date'].dt.quarter
    df_dim['year'] = df_dim['date'].dt.year.astype(int)

    # Lấy danh sách ngày đã có
    existing_dates = pd.read_sql("SELECT date FROM dim_date", connection_engine)
    existing_dates_list = pd.to_datetime(existing_dates['date']).dt.date.tolist()

     # Lọc các ngày chưa có
    df_new = df_dim[~df_dim['date'].dt.date.isin(existing_dates_list)]

    #Xếp lại cho đúng thứ tự 
    expected_columns = [
    'date_key',
    'date',
    'mmm_yy',
    'week_no',
    'day_of_week',
    'weekday_name',
    'quarter',
    'year'
]
    df_new = df_new[expected_columns]

    # Thêm dòng mới nếu có
    if df_new.empty:
        print("Không có ngày mới để cập nhật.")
        return pd.DataFrame()
    else:
        return df_new


In [445]:
def get_fact_orders_aggregate_preview(connection_engine):
    query = """
        SELECT DISTINCT
            order_id,
            customer_id,
            order_placement_date,
            on_time,
            in_full,
            CASE 
                WHEN on_time = 1 AND in_full = 1 THEN 1
                ELSE 0
            END AS otif
        FROM (
            SELECT  
                order_id,
                customer_id,
                order_placement_date,
                MIN(on_time) OVER(PARTITION BY order_id) AS on_time,
                MIN(in_full) OVER(PARTITION BY order_id) AS in_full
            FROM fact_order_lines
        ) AS fact
        WHERE NOT EXISTS (
            SELECT 1 FROM fact_orders_aggregate agg
            WHERE agg.order_id = fact.order_id
        )
    """
    df = pd.read_sql(query, connection_engine)
    return df


In [446]:
# Chuyển đổi và thực hiện biến đổi trên bảng fact chính, insert vào database
def process_fact_table(df_raw):
    """
    Xử lý bảng fact bao gồm:
    - Chuyển các cột ngày sang định dạng int kiểu YYYYMMDD
    - Tạo cột order_unique_id từ order_id + product_id
    - Sắp xếp lại các cột theo thứ tự mong muốn
    - Đưa vào cơ sở dữ liệu bằng to_sql (chế độ append)
    """

    df = df_raw.copy()

    # 1. Tự động chuyển các cột ngày sang int dạng YYYYMMDD
    for col in df.columns:
        if pd.api.types.is_datetime64_any_dtype(df[col]) or \
           (df[col].dtype == object and df[col].astype(str).str.match(r"\d{4}-\d{2}-\d{2}").all()):
            try:
                df[col] = pd.to_datetime(df[col])
                df[col] = df[col].dt.strftime("%Y%m%d").astype(int)
            except Exception as e:
                print(f"[WARNING] Không thể chuyển cột '{col}' sang YYYYMMDD: {e}")

    # 2. Tạo order_unique_id
    try:
        df["order_unique_id"] = df["order_id"].astype(str) + "-" + df["product_id"].astype(str)
    except Exception as e:
        print(f"[ERROR] Không thể tạo cột order_unique_id: {e}")
        return

    # 3. Xếp lại thứ tự các cột (nếu có đủ cột)
    desired_order = [
        "order_unique_id",
        "order_id",
        "order_placement_date",
        "customer_id",
        "product_id",
        "order_qty",
        "agreed_delivery_date",
        "actual_delivery_date",
        "delivery_qty",
        "in_full",
        "on_time",
        "on_time_in_full"
    ]
    
    missing_cols = [col for col in desired_order if col not in df.columns]
    if missing_cols:
        print(f"[WARNING] Không thể sắp xếp cột vì thiếu: {missing_cols}")
    else:
        df = df[desired_order]

    return df

In [447]:
# Thực hiện insert fact table
def create_query_insert_into(dataframe, name_table_update):
    """
    Parameters.

    ----------
    dataframe : TYPE
        DESCRIPTION.
    name_table_update : TYPE
        DESCRIPTION.

    Returns
    -------
    TYPE
        DESCRIPTION.

    """
    columns = ""
    values = ""
    odku = ""

    end_col = dataframe.columns[-1]
    for col in dataframe.columns:
        if col == end_col:
            columns += col
            values += "%s"
            odku += col + "=" + "VALUES(" + col + ")"
        else:
            columns += col + ", "
            values += "%s, "
            odku += col + "=" + "VALUES(" + col + "), "

    return (
        "INSERT INTO "
        + name_table_update
        + " ("
        + columns
        + ") "
        + "VALUES("
        + values
        + ") "
        + "ON DUPLICATE KEY UPDATE "
        + odku
    )


In [448]:
# Thực hiện insert data
def insert_new_records(df_new, table_name, unique_col, engine):
    """
    Chỉ chèn vào những bản ghi mới chưa tồn tại trong bảng (dựa vào khóa duy nhất).
    
    Parameters:
    -----------
    df_new : pd.DataFrame
        Dữ liệu mới cần chèn vào bảng.
    table_name : str
        Tên bảng trong cơ sở dữ liệu.
    unique_col : str
        Tên cột duy nhất dùng để kiểm tra trùng (ví dụ: order_unique_id).
    engine : sqlalchemy.engine.base.Engine
        Kết nối SQLAlchemy tới cơ sở dữ liệu.
    
    Returns:
    --------
    int
        Số bản ghi mới đã được chèn.
    """
    try:
        # Đọc danh sách các ID đã có trong bảng
        existing_ids = pd.read_sql(f"SELECT {unique_col} FROM {table_name}", engine)
        existing_ids_set = set(existing_ids[unique_col])

        # Lọc các bản ghi mới chưa tồn tại
        df_filtered = df_new[~df_new[unique_col].isin(existing_ids_set)]

        # Nếu có bản ghi mới thì insert
        if not df_filtered.empty:
            df_filtered.to_sql(table_name, engine, if_exists="append", index=False)
            print(f"[INFO] Inserted {len(df_filtered)} new rows into '{table_name}'.")
            return len(df_filtered)
        else:
            print("[INFO] No new records to insert.")
            return 0

    except Exception as e:
        print(f"[ERROR] Failed to insert new records: {e}")
        return 0

In [450]:
def main():

    # Connect vào sever SQL 
    connection_engine = sc.create_engine("mysql+pymysql://root:123456789@localhost:3306/sp")

    # Connect bảng fact
    file_path_connect = "/Users/phamvantung/Downloads/supply chain project/summary_fact_tab.xlsx"
    sheet_name = "fact_order_lines"
    df = read_excel_to_df(file_path_connect, sheet_name)

    # Xử lý bảng data fact
    df_new = process_fact_table(df)
    df_copy = df.copy()

    # insert bảng fact vào database
    insert_new_records(df_new,"fact_order_lines", "order_unique_id", connection_engine )

    # Thực hiện insert bảng dim_date
    date_cols = ["order_placement_date", "agreed_delivery_date", "actual_delivery_date"]
    df_date = create_date_range_df(df_copy, date_cols)
    df_transformed_date = update_dim_date(df_date, connection_engine)
    if not df_transformed_date.empty:
        df_transformed_date.to_sql("dim_date", connection_engine, if_exists="append", index=False)
        print(f"[INFO] Inserted {len(df_transformed_date)} new rows into 'dim_date'.")
    else:
        print("[INFO] No new data to insert into 'dim_date'.")

    # Thực hiện insert bảng fact_orders_aggregate
    df_update_get_fact_orders_aggregate = get_fact_orders_aggregate_preview(connection_engine)
    df_update_get_fact_orders_aggregate.to_sql("fact_orders_aggregate", connection_engine, if_exists= "append", index = False)

    

In [451]:
main()

[INFO] No new records to insert.
Không có ngày mới để cập nhật.
[INFO] No new data to insert into 'dim_date'.
