# Bước 1: Mount Azure Data Lake Storage vào Databricks

## Bronze Layer

In [0]:

# Mount the "bronze" blob container at /mnt/lakehouse/bronze.
# dbutils.fs.mount() raises when the mount point already exists, so the call is
# skipped if the mount is already present; this keeps the cell safe to re-run.
# NOTE(review): `storage_account_key` is not defined in the visible cells — confirm it
# is loaded beforehand (ideally via dbutils.secrets.get) rather than pasted in clear text.
if not any(m.mountPoint == "/mnt/lakehouse/bronze" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="wasbs://bronze@hoangtien22133061.blob.core.windows.net",
        mount_point="/mnt/lakehouse/bronze",
        extra_configs={
            "fs.azure.account.key.hoangtien22133061.blob.core.windows.net": storage_account_key
        },
    )

## Silver Layer

In [0]:

# Mount the "silver" blob container at /mnt/lakehouse/silver.
# dbutils.fs.mount() raises when the mount point already exists, so the call is
# skipped if the mount is already present; this keeps the cell safe to re-run.
# NOTE(review): `storage_account_key` is not defined in the visible cells — confirm it
# is loaded beforehand (ideally via dbutils.secrets.get) rather than pasted in clear text.
if not any(m.mountPoint == "/mnt/lakehouse/silver" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="wasbs://silver@hoangtien22133061.blob.core.windows.net",
        mount_point="/mnt/lakehouse/silver",
        extra_configs={
            "fs.azure.account.key.hoangtien22133061.blob.core.windows.net": storage_account_key
        },
    )

## Gold Layer

In [0]:

# Mount the "gold" blob container at /mnt/lakehouse/gold.
# dbutils.fs.mount() raises when the mount point already exists, so the call is
# skipped if the mount is already present; this keeps the cell safe to re-run.
# NOTE(review): `storage_account_key` is not defined in the visible cells — confirm it
# is loaded beforehand (ideally via dbutils.secrets.get) rather than pasted in clear text.
if not any(m.mountPoint == "/mnt/lakehouse/gold" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="wasbs://gold@hoangtien22133061.blob.core.windows.net",
        mount_point="/mnt/lakehouse/gold",
        extra_configs={
            "fs.azure.account.key.hoangtien22133061.blob.core.windows.net": storage_account_key
        },
    )

# Bước 2: Đọc dữ liệu từ Bronze (raw layer)

In [0]:
# List the entries found directly under the bronze mount point.
display(dbutils.fs.ls("/mnt/lakehouse/bronze"))

In [0]:
bronze_path = "dbfs:/mnt/lakehouse/bronze/"


def _read_bronze_csv(file_name):
    """Read one CSV file from the bronze layer, using its first line as the header."""
    return spark.read.option("header", "true").csv(f"{bronze_path}{file_name}")


# One DataFrame per raw file; no schema is supplied or inferred for these reads.
df_customers = _read_bronze_csv("customers_dataset.csv")
df_orders = _read_bronze_csv("orders_dataset.csv")
df_order_items = _read_bronze_csv("order_items_dataset.csv")
df_products = _read_bronze_csv("products_dataset.csv")
df_sellers = _read_bronze_csv("sellers_dataset.csv")
df_payments = _read_bronze_csv("order_payments_dataset.csv")

# The reviews file is read with schema inference plus quote/escape/multi-line handling.
reviews_reader = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .option("quote", '"')
    .option("escape", '"')
    .option("multiLine", True)
)
df_reviews = reviews_reader.csv(f"{bronze_path}order_reviews_dataset.csv")

df_geolocation = _read_bronze_csv("geolocation_dataset.csv")
df_cat_trans = _read_bronze_csv("product_category_name_translation.csv")

# The date file is read like the others, then its `_c24` column is dropped.
df_date = _read_bronze_csv("date.csv").drop("_c24")




In [0]:
# Show the first two rows of the date DataFrame.
display(df_date.limit(2))

# Bước 3: Làm sạch & xử lý dữ liệu

In [0]:
from pyspark.sql.functions import col, when, count

# Base DBFS locations of the bronze and silver layers (trailing slash included).
bronze_path = "dbfs:/mnt/lakehouse/bronze/"
silver_path = "dbfs:/mnt/lakehouse/silver/"


# Helper function to cast numeric columns
def cast_numeric(df, numeric_cols, dtype="double"):
    """Cast the named columns of ``df`` to ``dtype`` and return the result.

    Args:
        df: DataFrame whose columns are re-typed.
        numeric_cols: iterable of column names to cast.
        dtype: target type passed to ``Column.cast``; defaults to "double",
            which is what this helper always used before the parameter existed.

    Returns:
        The DataFrame obtained after replacing every listed column by its cast.
    """
    for col_name in numeric_cols:
        df = df.withColumn(col_name, col(col_name).cast(dtype))
    return df


def cast_numeric_int(df, numeric_cols):
    """Cast the named columns of ``df`` to "int"; thin wrapper over ``cast_numeric``."""
    return cast_numeric(df, numeric_cols, "int")


def check_na_and_duplicates(df, name):
    """Print a small data-quality report for ``df``.

    The report shows the upper-cased ``name`` as a banner, the per-column null
    counts (via ``show()``), the total row count and how many rows disappear
    when fully identical rows are de-duplicated.
    """
    null_count_exprs = [
        count(when(col(column).isNull(), column)).alias(column)
        for column in df.columns
    ]

    print(f"\n==== {name.upper()} ====")
    print("Null values per column:")
    df.select(null_count_exprs).show()

    row_total = df.count()
    row_distinct = df.dropDuplicates().count()
    duplicated = row_total - row_distinct
    print(f"Total rows: {row_total}, Duplicated rows: {duplicated}")


# 1. Customers: both ids are required; keep a single row per customer_id.
check_na_and_duplicates(df_customers, "customers")
customers = (
    df_customers
    .dropna(subset=["customer_id", "customer_unique_id"])
    .dropDuplicates(["customer_id"])
)

# 2. Orders: id, customer and status are required; keep a single row per order_id.
check_na_and_duplicates(df_orders, "orders")
orders = (
    df_orders
    .dropna(subset=["order_id", "customer_id", "order_status"])
    .dropDuplicates(["order_id"])
)

# 3. Order items: monetary columns become doubles; the key columns are required.
check_na_and_duplicates(df_order_items, "order_items")
order_items = cast_numeric(df_order_items, ["price", "freight_value"]).dropna(
    subset=["order_id", "order_item_id", "product_id"]
)

# 4. Products: size/count columns become ints; product_id is required.
check_na_and_duplicates(df_products, "products")
products = cast_numeric_int(
    df_products,
    [
        "product_name_lenght",
        "product_description_lenght",
        "product_photos_qty",
        "product_weight_g",
        "product_length_cm",
        "product_height_cm",
        "product_width_cm",
    ],
).dropna(subset=["product_id"])

# 5. Sellers: seller_id is required.
check_na_and_duplicates(df_sellers, "sellers")
sellers = df_sellers.dropna(subset=["seller_id"])

# 6. Order payments: value as double, sequence/installments as int; order_id is required.
check_na_and_duplicates(df_payments, "order_payments")
order_payments = cast_numeric_int(
    cast_numeric(df_payments, ["payment_value"]),
    ["payment_sequential", "payment_installments"],
).dropna(subset=["order_id"])

# 7. Order reviews: score as int; order_id is required.
check_na_and_duplicates(df_reviews, "order_reviews")
order_reviews = cast_numeric_int(df_reviews, ["review_score"]).dropna(
    subset=["order_id"]
)

# 8. Geolocation: coordinates as doubles; the zip code prefix is required.
check_na_and_duplicates(df_geolocation, "geolocation")
geolocation = cast_numeric(
    df_geolocation, ["geolocation_lat", "geolocation_lng"]
).dropna(subset=["geolocation_zip_code_prefix"])

# 9. Product category name translation: the category name is required.
check_na_and_duplicates(df_cat_trans, "product_cat")
product_cat = df_cat_trans.dropna(subset=["product_category_name"])
# #10 . Date
# from pyspark.sql.functions import to_timestamp, to_date
# # Định dạng: M/d/yyyy  h:mm:ss a (2 dấu cách giữa yyyy và giờ)
# # date_format = "M/d/yyyy"
# # date = df_date.withColumn("fulldate", to_date(to_timestamp("fulldate", date_format)))
# # date = df_date.withColumn("weekbegindate", to_date(to_timestamp("weekbegindate", date_format)))
# # date = df_date.withColumn("samedayyearago", to_date(to_timestamp("samedayyearago", date_format)))
# display(df_date.limit(2)) 
# print(df_date.schema["samedayyearago"].dataType)
# print(df_date.schema["fulldate"].dataType)




# Bước 4: Ghi dữ liệu ra Delta Table (Silver Layer)

## 1. Lưu dữ liệu đã xử lý vào dạng Delta Table trong Silver Layer

In [0]:
# Cleaned DataFrames paired with the silver-layer folder each one is written to.
silver_delta_outputs = [
    (customers, "dbfs:/mnt/lakehouse/silver/customers"),            # 1. Customers
    (orders, "dbfs:/mnt/lakehouse/silver/orders"),                  # 2. Orders
    (order_items, "dbfs:/mnt/lakehouse/silver/order_items"),        # 3. Order Items
    (products, "dbfs:/mnt/lakehouse/silver/products"),              # 4. Products
    (sellers, "dbfs:/mnt/lakehouse/silver/sellers"),                # 5. Sellers
    (order_payments, "dbfs:/mnt/lakehouse/silver/order_payments"),  # 6. Order Payments
    (order_reviews, "dbfs:/mnt/lakehouse/silver/order_reviews"),    # 7. Order Reviews
    (geolocation, "dbfs:/mnt/lakehouse/silver/geolocation"),        # 8. Geolocation
    (product_cat, "dbfs:/mnt/lakehouse/silver/product_cat"),        # 9. Category translation
]

# Each target is replaced wholesale with the current DataFrame content.
for silver_frame, silver_target in silver_delta_outputs:
    silver_frame.write.format("delta").mode("overwrite").save(silver_target)

# 10. Date: written straight from df_date, additionally allowing the stored schema to change.
(
    df_date.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("dbfs:/mnt/lakehouse/silver/date")
)



## 2. Đưa Delta Table vào metastore để dễ truy vấn dữ liệu trong Silver Layer

### Tạo Database 

In [0]:
%sql
-- Work in the legacy Hive metastore catalog and make sure the silver database exists,
-- stored under the silver mount.
USE CATALOG hive_metastore;

CREATE DATABASE IF NOT EXISTS olistdb
LOCATION 'dbfs:/mnt/lakehouse/silver/olistdb';

In [0]:
%sql
-- Show the metadata recorded for the silver database.
DESCRIBE DATABASE EXTENDED olistdb;

### 3. Tạo các bảng trong olistdb

In [0]:
%sql
-- Declares the ten silver tables inside olistdb (only created when missing).
-- USING DELTA is stated explicitly: every table sets delta.* properties, and the gold
-- tables in this notebook declare their format the same way.
USE olistdb;

-- 1. Customers
CREATE TABLE IF NOT EXISTS silver_customers (
  customer_id STRING,
  customer_unique_id STRING,
  customer_zip_code_prefix INT,
  customer_city STRING,
  customer_state STRING
)
USING DELTA
TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE);

-- 2. Orders
CREATE TABLE IF NOT EXISTS silver_orders (
  order_id STRING,
  customer_id STRING,
  order_status STRING,
  order_purchase_timestamp TIMESTAMP,
  order_approved_at TIMESTAMP,
  order_delivered_carrier_date TIMESTAMP,
  order_delivered_customer_date TIMESTAMP,
  order_estimated_delivery_date TIMESTAMP
)
USING DELTA
TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE);

-- 3. Order Items
CREATE TABLE IF NOT EXISTS silver_order_items (
  order_id STRING,
  order_item_id INT,
  product_id STRING,
  seller_id STRING,
  shipping_limit_date TIMESTAMP,
  price DOUBLE,
  freight_value DOUBLE
)
USING DELTA
TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE);

-- 4. Products
CREATE TABLE IF NOT EXISTS silver_products (
  product_id STRING,
  product_category_name STRING,
  product_name_lenght INT,
  product_description_lenght INT,
  product_photos_qty INT,
  product_weight_g INT,
  product_length_cm INT,
  product_height_cm INT,
  product_width_cm INT
)
USING DELTA
TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE);

-- 5. Sellers
-- NOTE(review): the zip code prefix is STRING here and in silver_geolocation but INT in
-- silver_customers — confirm which type is intended.
CREATE TABLE IF NOT EXISTS silver_sellers (
  seller_id STRING, seller_zip_code_prefix STRING, seller_city STRING, seller_state STRING
)
USING DELTA
TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE);

-- 6. Order Payments
CREATE TABLE IF NOT EXISTS silver_order_payments (
  order_id STRING,
  payment_sequential INT,
  payment_type STRING,
  payment_installments INT,
  payment_value DOUBLE
)
USING DELTA
TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE);

-- 7. Order Reviews
CREATE TABLE IF NOT EXISTS silver_order_reviews (
  review_id STRING,
  order_id STRING,
  review_score INT,
  review_comment_title STRING,
  review_comment_message STRING,
  review_creation_date TIMESTAMP,
  review_answer_timestamp TIMESTAMP
)
USING DELTA
TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE);

-- 8. Geolocation
CREATE TABLE IF NOT EXISTS silver_geolocation (
  geolocation_zip_code_prefix STRING,
  geolocation_lat DOUBLE,
  geolocation_lng DOUBLE,
  geolocation_city STRING,
  geolocation_state STRING
)
USING DELTA
TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE);

-- 9. Product Category Name Translation
CREATE TABLE IF NOT EXISTS silver_product_category_name_translation (
  product_category_name STRING, product_category_name_english STRING
)
USING DELTA
TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE);

-- 10. Date
CREATE TABLE IF NOT EXISTS silver_date (
    date_key INT NOT NULL,
    full_date DATE,
    day_of_week INT,
    day_num_in_month INT,
    day_num_overall INT,
    day_name STRING,
    day_abbrev STRING,
    weekday_flag STRING,
    week_num_in_year INT,
    week_num_overall INT,
    week_begin_date DATE,
    week_begin_date_key INT,
    month INT,
    month_num_overall INT,
    month_name STRING,
    month_abbrev STRING,
    quarter INT,
    year INT,
    yearmo INT,
    fiscal_month INT,
    fiscal_quarter INT,
    fiscal_year INT,
    last_day_in_month_flag STRING,
    same_day_year_ago_date DATE
)
USING DELTA
TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE);


### Load Delta Table và đưa vào các bảng trong metastore

In [0]:
from pyspark.sql import SparkSession

# Obtain the active Spark session (or create one when none exists yet).
spark = SparkSession.builder.getOrCreate()


def _load_silver_delta(delta_path):
    """Return a DataFrame over the Delta table stored at ``delta_path``."""
    return spark.read.format("delta").load(delta_path)


# One DataFrame per silver Delta folder, numbered as in the write step.
dt_customers = _load_silver_delta("dbfs:/mnt/lakehouse/silver/customers")            # 1
dt_orders = _load_silver_delta("dbfs:/mnt/lakehouse/silver/orders")                  # 2
dt_order_items = _load_silver_delta("dbfs:/mnt/lakehouse/silver/order_items")        # 3
dt_products = _load_silver_delta("dbfs:/mnt/lakehouse/silver/products")              # 4
dt_sellers = _load_silver_delta("dbfs:/mnt/lakehouse/silver/sellers")                # 5
dt_order_payments = _load_silver_delta("dbfs:/mnt/lakehouse/silver/order_payments")  # 6
dt_order_reviews = _load_silver_delta("dbfs:/mnt/lakehouse/silver/order_reviews")    # 7
dt_geolocation = _load_silver_delta("dbfs:/mnt/lakehouse/silver/geolocation")        # 8
dt_product_cat = _load_silver_delta("dbfs:/mnt/lakehouse/silver/product_cat")        # 9
dt_date = _load_silver_delta("dbfs:/mnt/lakehouse/silver/date")                      # 10


### Kiểm tra các Delta Table 

In [0]:
# Preview two rows of every silver Delta table.
# limit(2) is used for all of them (the cell previously mixed head(2) and limit(2)):
# limit() keeps the preview a DataFrame, whereas head(2) returns a Python list of Rows.
silver_previews = [
    dt_customers,
    dt_orders,
    dt_order_items,
    dt_products,
    dt_sellers,
    dt_order_payments,
    dt_order_reviews,
    dt_geolocation,
    dt_product_cat,
    dt_date,
]
for preview_frame in silver_previews:
    display(preview_frame.limit(2))

### Load Data vào các bảng trong metastore

In [0]:
# Replace the content of each olistdb silver table with the matching Delta DataFrame.
# Table names are qualified with the database so the cell no longer depends on which
# database an earlier %sql `USE ...` left active (later cells switch to olistdb_gold).
# insertInto() matches columns by position, not by name, so the DataFrame column order
# must match the table definition.
# NOTE(review): dt_date originates from a CSV read without schema inference while
# silver_date declares INT/DATE columns — confirm the implicit casts give non-NULL dates.
silver_table_loads = [
    (dt_customers, "olistdb.silver_customers"),            # 1. Customers
    (dt_orders, "olistdb.silver_orders"),                  # 2. Orders
    (dt_order_items, "olistdb.silver_order_items"),        # 3. Order Items
    (dt_products, "olistdb.silver_products"),              # 4. Products
    (dt_sellers, "olistdb.silver_sellers"),                # 5. Sellers
    (dt_order_payments, "olistdb.silver_order_payments"),  # 6. Order Payments
    (dt_order_reviews, "olistdb.silver_order_reviews"),    # 7. Order Reviews
    (dt_geolocation, "olistdb.silver_geolocation"),        # 8. Geolocation
    (dt_product_cat, "olistdb.silver_product_category_name_translation"),  # 9. Translation
    (dt_date, "olistdb.silver_date"),                      # 10. Date
]

for silver_source, silver_table in silver_table_loads:
    silver_source.write.mode("overwrite").insertInto(silver_table)

In [0]:
# Print the column names of the date DataFrame.
print(dt_date.columns)


# Bước 5: Kiểm tra các bảng sau khi thêm data

In [0]:
%sql
-- Inspect the loaded customers table.
SELECT * FROM silver_customers

In [0]:
%sql
-- Inspect the loaded orders table.
SELECT * FROM silver_orders

In [0]:
%sql
-- Inspect the loaded geolocation table.
SELECT * FROM silver_geolocation

In [0]:
%sql
-- Inspect the loaded order items table.
SELECT * FROM silver_order_items

In [0]:
%sql
-- Inspect the loaded order payments table.
SELECT * FROM silver_order_payments

In [0]:
%sql
-- Inspect the loaded order reviews table.
SELECT * FROM silver_order_reviews

In [0]:
%sql
-- Inspect the loaded products table.
SELECT * FROM silver_products

In [0]:
%sql
-- Inspect the loaded sellers table.
SELECT * FROM silver_sellers

In [0]:
%sql
-- Inspect the loaded category-name translation table.
SELECT * FROM silver_product_category_name_translation

In [0]:
%sql
-- Inspect the loaded date table.
SELECT * FROM silver_date

# Bước 6: Tạo các bảng Delta Table cho Gold Layer

### 1. Tạo database olistdb_gold

In [0]:
%sql
-- Work in the legacy Hive metastore catalog and make sure the gold database exists,
-- stored under the gold mount.
USE CATALOG hive_metastore;

CREATE DATABASE IF NOT EXISTS olistdb_gold
LOCATION 'dbfs:/mnt/lakehouse/gold/olistdb_gold';

In [0]:
%sql
-- Show the metadata recorded for the gold database.
DESCRIBE DATABASE EXTENDED olistdb_gold;

### 2. Tạo các Delta Table 

In [0]:
%sql
-- Make olistdb_gold the current database; the unqualified dim_* statements below rely on it.
USE olistdb_gold;

#### Tạo các bảng dim

##### 1. Dim Customer

In [0]:
%sql
-- Customer dimension: recreated on every run, then filled with the distinct
-- customer rows of the silver layer.
CREATE OR REPLACE TABLE dim_customer (
    customerKey              BIGINT GENERATED ALWAYS AS IDENTITY,  -- generated surrogate key
    customer_id              STRING NOT NULL,
    customer_unique_id       STRING,
    customer_zip_code_prefix INT,
    customer_city            STRING,
    customer_state           STRING
) USING DELTA;

INSERT INTO dim_customer
    (customer_id, customer_unique_id, customer_zip_code_prefix, customer_city, customer_state)
SELECT DISTINCT
    customer_id, customer_unique_id, customer_zip_code_prefix, customer_city, customer_state
FROM olistdb.silver_customers;


In [0]:
%sql
-- Inspect the customer dimension.
SELECT * FROM dim_customer

##### 2. Dim Product

In [0]:
%sql
-- Product dimension: recreated on every run, then filled with the distinct
-- product rows of the silver layer.
CREATE OR REPLACE TABLE dim_product (
    productKey                 BIGINT GENERATED ALWAYS AS IDENTITY,  -- generated surrogate key
    product_id                 STRING NOT NULL,                      -- source-system product id
    product_category_name      STRING,                               -- product category name
    product_name_lenght        INT,                                  -- length of the product name
    product_description_lenght INT,                                  -- length of the description
    product_photos_qty         INT,                                  -- number of photos
    product_weight_g           INT,                                  -- weight (grams)
    product_length_cm          INT,                                  -- length
    product_height_cm          INT,                                  -- height
    product_width_cm           INT                                   -- width
) USING DELTA;

INSERT INTO dim_product (
    product_id, product_category_name, product_name_lenght, product_description_lenght,
    product_photos_qty, product_weight_g, product_length_cm, product_height_cm, product_width_cm
)
SELECT DISTINCT
    src.product_id,
    src.product_category_name,
    src.product_name_lenght,
    src.product_description_lenght,
    src.product_photos_qty,
    src.product_weight_g,
    src.product_length_cm,
    src.product_height_cm,
    src.product_width_cm
FROM olistdb.silver_products src


    

In [0]:
%sql
-- Inspect the product dimension.
SELECT * FROM dim_product

##### 3. Dim Seller

In [0]:
%sql
-- Seller dimension: recreated on every run, then filled with the distinct
-- seller rows of the silver layer.
CREATE OR REPLACE TABLE dim_seller (
    sellerKey              BIGINT GENERATED ALWAYS AS IDENTITY,  -- generated surrogate key
    seller_id              STRING NOT NULL,                      -- seller id in the source system
    seller_zip_code_prefix INT,                                  -- zip code prefix
    seller_city            STRING,                               -- city
    seller_state           STRING NOT NULL                       -- state
) USING DELTA;

INSERT INTO dim_seller
    (seller_id, seller_zip_code_prefix, seller_city, seller_state)
SELECT DISTINCT
    seller_id, seller_zip_code_prefix, seller_city, seller_state
FROM olistdb.silver_sellers;


In [0]:
%sql
-- Inspect the seller dimension.
SELECT * FROM dim_seller

##### 4. DimGeolocation 

In [0]:
%sql
-- Geolocation dimension: recreated on every run, then filled with the distinct
-- geolocation rows of the silver layer.
CREATE OR REPLACE TABLE dim_geolocation (
    geoKey                      BIGINT GENERATED ALWAYS AS IDENTITY,  -- generated surrogate key
    geolocation_zip_code_prefix INT NOT NULL,                         -- zip code prefix
    geolocation_lat             DOUBLE,                               -- latitude
    geolocation_lng             DOUBLE,                               -- longitude
    geolocation_city            STRING,                               -- city
    geolocation_state           STRING NOT NULL                       -- state
) USING DELTA;

INSERT INTO dim_geolocation
    (geolocation_zip_code_prefix, geolocation_lat, geolocation_lng, geolocation_city, geolocation_state)
SELECT DISTINCT
    geolocation_zip_code_prefix, geolocation_lat, geolocation_lng, geolocation_city, geolocation_state
FROM olistdb.silver_geolocation;


In [0]:
%sql
-- Inspect the geolocation dimension.
SELECT * FROM dim_geolocation

##### 5. DimOrders

In [0]:
%sql
-- Order dimension: recreated on every run, then filled with the distinct
-- (order, customer, status) rows of the silver layer.
CREATE OR REPLACE TABLE dim_order (
    orderKey     BIGINT GENERATED ALWAYS AS IDENTITY,  -- generated surrogate key
    order_id     STRING NOT NULL,                      -- order id
    customer_id  STRING NOT NULL,                      -- customer
    order_status STRING NOT NULL                       -- order status
) USING DELTA;

INSERT INTO dim_order
    (order_id, customer_id, order_status)
SELECT DISTINCT
    order_id, customer_id, order_status
FROM olistdb.silver_orders;


In [0]:
%sql
-- Inspect the order dimension.
SELECT * FROM dim_order

##### 6. DimOrderItem

In [0]:
%sql
-- Order-item dimension: recreated on every run, then filled with the distinct
-- order item rows of the silver layer.
CREATE OR REPLACE TABLE dim_order_item (
    orderItemKey        BIGINT GENERATED ALWAYS AS IDENTITY,  -- generated surrogate key
    order_id            STRING NOT NULL,                      -- order id
    order_item_id       INT NOT NULL,                         -- item sequence number within the order
    product_id          STRING,                               -- product id
    seller_id           STRING,                               -- seller
    shipping_limit_date TIMESTAMP,                            -- shipping deadline
    price               DOUBLE,                               -- item price
    freight_value       DOUBLE                                -- freight cost
) USING DELTA;

INSERT INTO dim_order_item
    (order_id, order_item_id, product_id, seller_id, shipping_limit_date, price, freight_value)
SELECT DISTINCT
    order_id, order_item_id, product_id, seller_id, shipping_limit_date, price, freight_value
FROM olistdb.silver_order_items;


In [0]:
%sql
-- Inspect the order-item dimension.
SELECT * FROM dim_order_item

##### 7. DimOrderPayment

In [0]:
%sql
-- Order-payment dimension: recreated on every run, then filled with the distinct
-- payment rows of the silver layer.
CREATE OR REPLACE TABLE dim_order_payment (
    orderPaymentKey      BIGINT GENERATED ALWAYS AS IDENTITY,  -- generated surrogate key
    order_id             STRING NOT NULL,                      -- order id
    payment_sequential   INT NOT NULL,                         -- payment sequence number (INT rather than TINYINT)
    payment_type         STRING,                               -- payment type
    payment_installments INT,                                  -- number of installments
    payment_value        DOUBLE NOT NULL                       -- payment amount
) USING DELTA;

INSERT INTO dim_order_payment
    (order_id, payment_sequential, payment_type, payment_installments, payment_value)
SELECT DISTINCT
    order_id, payment_sequential, payment_type, payment_installments, payment_value
FROM olistdb.silver_order_payments;


In [0]:
%sql
-- Inspect the order-payment dimension.
SELECT * FROM dim_order_payment

##### 8. DimOrderReview

In [0]:
%sql
-- Order-review dimension: recreated on every run, then filled with the distinct
-- review rows of the silver layer.
CREATE OR REPLACE TABLE dim_order_review (
    reviewKey              BIGINT GENERATED ALWAYS AS IDENTITY,  -- generated surrogate key
    review_id              STRING NOT NULL,                      -- review id
    order_id               STRING NOT NULL,                      -- order id
    review_score           INT,                                  -- review score (INT rather than TINYINT)
    review_comment_title   STRING,                               -- review title
    review_comment_message STRING                                -- review text
) USING DELTA;

INSERT INTO dim_order_review
    (review_id, order_id, review_score, review_comment_title, review_comment_message)
SELECT DISTINCT
    review_id, order_id, review_score, review_comment_title, review_comment_message
FROM olistdb.silver_order_reviews;


In [0]:
%sql
-- Inspect the order-review dimension.
SELECT * FROM dim_order_review

##### 9. DimDate

In [0]:
%sql
-- Date dimension: recreated on every run with the same columns as olistdb.silver_date,
-- then copied from it column by column.
CREATE OR REPLACE TABLE dim_date (
    date_key               INT NOT NULL,
    full_date              DATE,
    day_of_week            INT,
    day_num_in_month       INT,
    day_num_overall        INT,
    day_name               STRING,
    day_abbrev             STRING,
    weekday_flag           STRING,
    week_num_in_year       INT,
    week_num_overall       INT,
    week_begin_date        DATE,
    week_begin_date_key    INT,
    month                  INT,
    month_num_overall      INT,
    month_name             STRING,
    month_abbrev           STRING,
    quarter                INT,
    year                   INT,
    yearmo                 INT,
    fiscal_month           INT,
    fiscal_quarter         INT,
    fiscal_year            INT,
    last_day_in_month_flag STRING,
    same_day_year_ago_date DATE
) USING DELTA;

INSERT INTO dim_date
SELECT
    date_key, full_date,
    day_of_week, day_num_in_month, day_num_overall, day_name, day_abbrev, weekday_flag,
    week_num_in_year, week_num_overall, week_begin_date, week_begin_date_key,
    month, month_num_overall, month_name, month_abbrev,
    quarter, year, yearmo,
    fiscal_month, fiscal_quarter, fiscal_year,
    last_day_in_month_flag, same_day_year_ago_date
FROM olistdb.silver_date


In [0]:
%sql
-- Inspect the date dimension.
select * from dim_date

##### Staging FactSales

In [0]:
%sql
-- Select the Hive metastore catalog and make olistdb_gold the current database.
USE CATALOG hive_metastore;
use olistdb_gold;

In [0]:
%sql
-- Staging rows for FactSales: order items joined to their order and to the order's payments.
-- CREATE OR REPLACE (instead of IF NOT EXISTS) so that every run rebuilds the staging
-- table from the current silver data; with IF NOT EXISTS a re-run kept the old rows.
-- NOTE(review): items and payments are both joined on order_id only, so an order with
-- several items and several payments yields one row per (item, payment) pair — confirm
-- this grain is intended before summing price/freight_value downstream.
CREATE OR REPLACE TABLE olistdb.stgFactSales
USING DELTA
AS
SELECT 
    oi.order_id,
    o.customer_id,
    oi.product_id,
    oi.seller_id,
    o.order_purchase_timestamp AS order_date,
    op.payment_sequential,
    oi.order_item_id,
    oi.price,
    oi.freight_value
FROM olistdb.silver_order_items oi
JOIN olistdb.silver_orders o ON oi.order_id = o.order_id
JOIN olistdb.silver_order_payments op ON o.order_id = op.order_id;


In [0]:
%sql
-- Row count of the sales staging table.
select count(*) from olistdb.stgFactSales

##### Staging FactOrders

In [0]:
%sql
-- Staging rows for FactOrders: one row per order with its milestone timestamps,
-- day differences between them and the number of order items.
-- CREATE OR REPLACE (instead of IF NOT EXISTS) so that every run rebuilds the staging
-- table from the current silver data; with IF NOT EXISTS a re-run kept the old rows.
CREATE OR REPLACE TABLE olistdb.stgFactOrders
USING DELTA
AS
SELECT
    so.order_id,
    so.customer_id,
    so.order_purchase_timestamp,
    so.order_approved_at,
    so.order_delivered_customer_date,
    so.order_estimated_delivery_date,
    -- days from purchase to delivery
    DATEDIFF(so.order_delivered_customer_date, so.order_purchase_timestamp) AS delivery_time,
    -- days from purchase to approval
    DATEDIFF(so.order_approved_at, so.order_purchase_timestamp) AS order_processing_time,
    -- days between the estimated and the actual delivery (positive = late)
    DATEDIFF(so.order_delivered_customer_date, so.order_estimated_delivery_date) AS delivery_delay,
    -- LEFT JOIN + COUNT(column): orders without items are kept with a count of 0
    COUNT(soi.order_item_id) AS order_item_count
FROM olistdb.silver_orders so
LEFT JOIN olistdb.silver_order_items soi ON so.order_id = soi.order_id
GROUP BY
    so.order_id,
    so.customer_id,
    so.order_purchase_timestamp,
    so.order_approved_at,
    so.order_delivered_customer_date,
    so.order_estimated_delivery_date;


In [0]:
%sql
-- Row count of the orders staging table.
select count(*) from olistdb.stgFactOrders

##### Staging FactReviews

In [0]:
%sql
-- Staging rows for FactReview: each review joined to its order and to the order's items,
-- which gives one row per (review, order item).
-- CREATE OR REPLACE (instead of IF NOT EXISTS) so that every run rebuilds the staging
-- table from the current silver data; with IF NOT EXISTS a re-run kept the old rows.
CREATE OR REPLACE TABLE olistdb.stgFactReviews
USING DELTA
AS
SELECT
    sr.review_id,
    sr.order_id,
    so.customer_id,
    soi.seller_id,
    soi.product_id,
    CAST(sr.review_creation_date AS DATE) AS review_date,
    sr.review_score,
    -- scores of 1 or 2 are flagged as negative
    CASE WHEN sr.review_score <= 2 THEN 1 ELSE 0 END AS is_negative_review,
    -- a missing message counts as length 0
    LENGTH(COALESCE(sr.review_comment_message, '')) AS review_comment_length,
    -- 1 when the message holds something other than whitespace
    CASE 
        WHEN sr.review_comment_message IS NOT NULL AND LENGTH(TRIM(sr.review_comment_message)) > 0
        THEN 1 ELSE 0
    END AS has_comment
FROM olistdb.silver_order_reviews sr
JOIN olistdb.silver_orders so ON sr.order_id = so.order_id
JOIN olistdb.silver_order_items soi ON sr.order_id = soi.order_id;


In [0]:
%sql
-- Row count of the reviews staging table.
select count(*) from olistdb.stgFactReviews

#### Tạo các Fact

##### Fact Sales

In [0]:
%sql
-- Sales fact table, keyed by the surrogate keys of the gold dimensions.
-- CREATE OR REPLACE (instead of IF NOT EXISTS): the table is filled by a plain
-- INSERT INTO, so keeping the old table made every re-run append the same rows again,
-- and the dimensions it references are themselves recreated (new identity keys) per run.
CREATE OR REPLACE TABLE olistdb_gold.FactSales (
    salesKey BIGINT GENERATED ALWAYS AS IDENTITY,   -- generated surrogate key
    orderKey INT,
    customerKey INT,
    productKey INT,
    sellerKey INT,
    dateKey INT,
    paymentKey INT,
    orderItemKey INT,
    price DECIMAL(10,2),
    totalOrderValue DECIMAL(10,2)
)
USING DELTA
TBLPROPERTIES (
    delta.autooptimize.optimizeWrite = true,
    delta.autooptimize.autoCompact = true
);


In [0]:
%sql
-- Load FactSales: every staging row is translated to dimension surrogate keys.
-- All lookups are inner joins, so a staging row without a match in any dimension is left out.
INSERT INTO olistdb_gold.FactSales
    (orderKey, customerKey, productKey, sellerKey, dateKey, paymentKey, orderItemKey,
     price, totalOrderValue)
SELECT
    ord.orderKey,
    cust.customerKey,
    prod.productKey,
    sell.sellerKey,
    cal.date_key,
    pay.orderPaymentKey,
    item.orderItemKey,
    stg.price,
    stg.price + stg.freight_value AS totalOrderValue   -- item price plus its freight
FROM olistdb.stgFactSales stg
JOIN olistdb_gold.dim_order ord
    ON stg.order_id = ord.order_id
JOIN olistdb_gold.dim_customer cust
    ON stg.customer_id = cust.customer_id
JOIN olistdb_gold.dim_product prod
    ON stg.product_id = prod.product_id
JOIN olistdb_gold.dim_seller sell
    ON stg.seller_id = sell.seller_id
JOIN olistdb_gold.dim_date cal
    ON CAST(stg.order_date AS DATE) = cal.full_date
JOIN olistdb_gold.dim_order_payment pay
    ON stg.order_id = pay.order_id AND stg.payment_sequential = pay.payment_sequential
JOIN olistdb_gold.dim_order_item item
    ON stg.order_id = item.order_id AND stg.order_item_id = item.order_item_id;


##### Fact Orders

In [0]:
%sql
-- Orders fact table, keyed by the surrogate keys of the gold dimensions.
-- CREATE OR REPLACE (instead of IF NOT EXISTS): the table is filled by a plain
-- INSERT INTO, so keeping the old table made every re-run append the same rows again,
-- and the dimensions it references are themselves recreated (new identity keys) per run.
CREATE OR REPLACE TABLE olistdb_gold.FactOrders (
    factOrderKey BIGINT GENERATED ALWAYS AS IDENTITY,   -- generated surrogate key of the fact row (optional)
    customerKey INT,
    orderKey INT,
    orderDateKey INT,
    approvedDateKey INT,
    deliveryDateKey INT,
    estimatedDeliveryDateKey INT,
    orderPaymentKey INT,
    delivery_time INT,
    order_processing_time INT,
    delivery_delay INT,
    order_item_count INT
)
USING DELTA
TBLPROPERTIES (
    delta.autooptimize.optimizeWrite = true,
    delta.autooptimize.autoCompact = true
);


In [0]:
%sql
-- Load FactOrders: staging orders are resolved to order/customer/payment keys in the CTE,
-- then each milestone date is resolved against dim_date.
-- NOTE(review): dim_order_payment is joined on order_id only, so an order with several
-- payments produces one fact row per payment — confirm this grain is intended.
WITH CleanedData AS (
    SELECT
        sfo.order_id,
        sfo.customer_id,
        sfo.order_purchase_timestamp,
        sfo.order_approved_at,
        sfo.order_delivered_customer_date,
        sfo.order_estimated_delivery_date,
        sfo.delivery_time,
        sfo.order_processing_time,
        sfo.delivery_delay,
        sfo.order_item_count,
        CAST(sfo.order_purchase_timestamp AS DATE) AS order_date,
        CAST(sfo.order_approved_at AS DATE) AS approved_date,
        CAST(sfo.order_delivered_customer_date AS DATE) AS delivered_date,
        CAST(sfo.order_estimated_delivery_date AS DATE) AS estimated_date,
        dc.customerKey,
        dof.orderKey,
        dop.orderPaymentKey
    FROM olistdb.stgFactOrders sfo
    JOIN olistdb_gold.dim_order dof ON sfo.order_id = dof.order_id 
    JOIN olistdb_gold.dim_customer dc ON sfo.customer_id = dc.customer_id 
    JOIN olistdb_gold.dim_order_payment dop ON sfo.order_id = dop.order_id
)

INSERT INTO olistdb_gold.FactOrders (
    customerKey,
    orderKey,
    orderDateKey,
    approvedDateKey,
    deliveryDateKey,
    estimatedDeliveryDateKey,
    orderPaymentKey,
    delivery_time,
    order_processing_time,
    delivery_delay,
    order_item_count
)
SELECT
    c.customerKey,
    c.orderKey,
    dd_order.date_key,
    dd_approved.date_key,
    dd_delivered.date_key,
    dd_estimated.date_key,
    c.orderPaymentKey,
    c.delivery_time,
    c.order_processing_time,
    c.delivery_delay,
    c.order_item_count
FROM CleanedData c
JOIN olistdb_gold.dim_date dd_order ON c.order_date = dd_order.full_date
-- Approval and delivery dates are LEFT JOINed: with inner joins every order whose
-- approval or delivery timestamp is NULL was dropped from the fact table altogether.
-- Such orders are now kept with a NULL approvedDateKey / deliveryDateKey.
LEFT JOIN olistdb_gold.dim_date dd_approved ON c.approved_date = dd_approved.full_date
LEFT JOIN olistdb_gold.dim_date dd_delivered ON c.delivered_date = dd_delivered.full_date
JOIN olistdb_gold.dim_date dd_estimated ON c.estimated_date = dd_estimated.full_date;





##### Fact Review

In [0]:
%sql
-- Review fact table, keyed by the surrogate keys of the gold dimensions.
-- CREATE OR REPLACE (instead of IF NOT EXISTS): the table is filled by a plain
-- INSERT INTO, so keeping the old table made every re-run append the same rows again,
-- and the dimensions it references are themselves recreated (new identity keys) per run.
CREATE OR REPLACE TABLE olistdb_gold.FactReview (
    reviewKey INT,
    customerKey INT,
    sellerKey INT,
    productKey INT,
    dateKey INT,
    orderKey INT,
    review_score INT,
    is_negative_review INT,
    review_comment_length INT,
    has_comment INT
)
USING DELTA
TBLPROPERTIES (
    delta.autooptimize.optimizeWrite = true,
    delta.autooptimize.autoCompact = true
);


In [0]:
%sql
-- Load FactReview: every staging row is translated to dimension surrogate keys.
INSERT INTO olistdb_gold.FactReview (
    reviewKey,
    customerKey,
    sellerKey,
    productKey,
    dateKey,
    orderKey,
    review_score,
    is_negative_review,
    review_comment_length,
    has_comment
)
SELECT
    dor.reviewKey,
    dc.customerKey,
    ds.sellerKey,
    dp.productKey,
    dd.date_key,
    dof.orderKey,
    sfr.review_score,
    sfr.is_negative_review,
    sfr.review_comment_length,
    sfr.has_comment
FROM olistdb.stgFactReviews sfr
-- dim_order_review holds one row per distinct (review_id, order_id, ...) combination,
-- so the lookup uses both keys; matching on review_id alone multiplies fact rows
-- whenever the same review_id appears for more than one order.
JOIN olistdb_gold.dim_order_review dor
    ON sfr.review_id = dor.review_id AND sfr.order_id = dor.order_id
JOIN olistdb_gold.dim_order dof ON sfr.order_id = dof.order_id 
JOIN olistdb_gold.dim_customer dc ON dof.customer_id = dc.customer_id 
JOIN olistdb_gold.dim_seller ds ON sfr.seller_id = ds.seller_id 
JOIN olistdb_gold.dim_product dp ON sfr.product_id = dp.product_id
JOIN olistdb_gold.dim_date dd ON sfr.review_date = dd.full_date;


#### Thực hiện lưu dữ liệu xuống lớp Gold

In [0]:
# DataFrames over the nine gold dimension tables.
dim_customer_df = spark.table("olistdb_gold.dim_customer")
dim_product_df = spark.table("olistdb_gold.dim_product")
dim_seller_df = spark.table("olistdb_gold.dim_seller")
dim_geolocation_df = spark.table("olistdb_gold.dim_geolocation")
dim_order_df = spark.table("olistdb_gold.dim_order")
dim_order_item_df = spark.table("olistdb_gold.dim_order_item")
dim_order_payment_df = spark.table("olistdb_gold.dim_order_payment")
dim_order_review_df = spark.table("olistdb_gold.dim_order_review")
dim_date_df = spark.table("olistdb_gold.dim_date")

# Each dimension is written as a Delta table under the gold mount, replacing both the
# data and the schema of whatever is already stored at that path.
gold_dimension_targets = [
    (dim_customer_df, "/mnt/lakehouse/gold/dim_customer"),
    (dim_product_df, "/mnt/lakehouse/gold/dim_product"),
    (dim_seller_df, "/mnt/lakehouse/gold/dim_seller"),
    (dim_geolocation_df, "/mnt/lakehouse/gold/dim_geolocation"),
    (dim_order_df, "/mnt/lakehouse/gold/dim_order"),
    (dim_order_item_df, "/mnt/lakehouse/gold/dim_order_item"),
    (dim_order_payment_df, "/mnt/lakehouse/gold/dim_order_payment"),
    (dim_order_review_df, "/mnt/lakehouse/gold/dim_order_review"),
    (dim_date_df, "/mnt/lakehouse/gold/dim_date"),
]

for gold_dim_frame, gold_dim_path in gold_dimension_targets:
    (
        gold_dim_frame.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .save(gold_dim_path)
    )

In [0]:
# Write the sales fact table as a Delta table under the gold mount (data and schema replaced).
FactSales_df = spark.table("olistdb_gold.FactSales")

(
    FactSales_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/mnt/lakehouse/gold/FactSales")
)

In [0]:
# Write the orders fact table as a Delta table under the gold mount (data and schema replaced).
FactOrders_df = spark.table("olistdb_gold.FactOrders")

(
    FactOrders_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/mnt/lakehouse/gold/FactOrders")
)

In [0]:


# Write the review fact table as a Delta table under the gold mount (data and schema replaced).
FactReview_df = spark.table("olistdb_gold.FactReview")

(
    FactReview_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/mnt/lakehouse/gold/FactReview")
)


#### Thực hiện VACUUM

In [0]:
%sql
-- Clean up unreferenced files of the FactReview Delta folder, keeping 336 hours (14 days).
VACUUM delta.`/mnt/lakehouse/gold/FactReview` RETAIN 336 HOURS

In [0]:
%sql
-- Clean up unreferenced files of the FactOrders Delta folder, keeping 336 hours (14 days).
VACUUM delta.`/mnt/lakehouse/gold/FactOrders` RETAIN 336 HOURS

In [0]:
%sql
-- Clean up unreferenced files of the FactSales Delta folder, keeping 336 hours (14 days).
VACUUM delta.`/mnt/lakehouse/gold/FactSales` RETAIN 336 HOURS

In [0]:
%sql
-- Show the operation history of the FactSales Delta folder.
DESCRIBE HISTORY delta.`/mnt/lakehouse/gold/FactSales`

In [0]:
%sql
-- Show the operation history of the FactOrders Delta folder.
DESCRIBE HISTORY delta.`/mnt/lakehouse/gold/FactOrders`

In [0]:
%sql

-- Show the operation history of the FactReview Delta folder.
DESCRIBE HISTORY delta.`/mnt/lakehouse/gold/FactReview`

##### Lưu thành các file csv

In [0]:
# Gold tables to export: the three facts first, then the dimensions.
table_names = [
    "olistdb_gold.FactSales",
    "olistdb_gold.FactOrders",
    "olistdb_gold.FactReview",
    "olistdb_gold.dim_date",
    "olistdb_gold.dim_customer",
    "olistdb_gold.dim_geolocation",
    "olistdb_gold.dim_order",
    "olistdb_gold.dim_order_item",
    "olistdb_gold.dim_order_review",
    "olistdb_gold.dim_order_payment",
    "olistdb_gold.dim_product",
    "olistdb_gold.dim_seller",
]

# Each table goes to /mnt/lakehouse/gold/export/<table name without database> as CSV
# with a header row, replacing any previous export at that path.
for qualified_name in table_names:
    short_name = qualified_name.split(".")[-1]
    export_path = f"/mnt/lakehouse/gold/export/{short_name}"
    (
        spark.table(qualified_name)
        .write.option("header", "true")
        .mode("overwrite")
        .csv(export_path)
    )
