In [23]:
import findspark
# Run findspark's initialisation before the pyspark imports in the next cell.
findspark.init()

In [24]:
import pyspark
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkContext

In [25]:
# Build (or reuse) the SparkSession first so that the application name and
# master configured here are in effect when the underlying SparkContext is
# created. Creating a bare SparkContext beforehand starts it with default
# settings, which the builder cannot change afterwards.
spark = (
    SparkSession.builder
    .appName("Finding strategies for dataframe transformation")
    .master("local[*]")
    .getOrCreate()
)

# Keep the `c` handle available: it is the context owned by the session.
c = spark.sparkContext
    


In [26]:
# Load product_cleaned.csv: the first row is the header, column types are inferred.
product_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/app/airflow/ELT-prj/files/product_cleaned.csv")
)

In [27]:
# Load clients.csv: the first row is the header, column types are inferred.
client_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/app/airflow/ELT-prj/files/clients.csv")
)

                                                                                

In [28]:
# Load purchases_demo.csv: the first row is the header, column types are inferred.
# NOTE(review): the schema printed further down infers client_id as double and
# regular_points_received as timestamp, which looks like the header and the data
# columns are shifted against each other — confirm against the raw file.
purchases_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/app/airflow/ELT-prj/files/purchases_demo.csv")
)

In [29]:
# Load store.csv: the first row is the header, column types are inferred.
store_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/app/airflow/ELT-prj/files/store.csv")
)

In [30]:
# Print the inferred schema of every loaded DataFrame, in load order.
for _loaded_df in (product_df, client_df, purchases_df, store_df):
    _loaded_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- level_1: string (nullable = true)
 |-- level_2: string (nullable = true)
 |-- level_3: string (nullable = true)
 |-- level_4: string (nullable = true)
 |-- brand_name: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_price: double (nullable = true)

root
 |-- client_id: string (nullable = true)
 |-- first_issue_date: timestamp (nullable = true)
 |-- first_redeem_date: timestamp (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)

root
 |-- client_id: double (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- transaction_datetime: date (nullable = true)
 |-- regular_points_received: timestamp (nullable = true)
 |-- express_points_received: integer (nullable = true)
 |-- regular_points_spent: integer (nullable = true)
 |-- express_points_spent: integer (nullable = true)
 |-- purchase_sum: integer (nullable = true)
 |-- store_id: integer (nullable =

In [31]:
# List the distinct store locations (at most 150 rows) to see which place names occur.
(
    store_df
    .select("store_location")
    .distinct()
    .show(n=150)
)

+--------------------+
|      store_location|
+--------------------+
|    Ward 1, Vung Tau|
|   Gia Rai, Bac Lieu|
|Quang Ninh, Quang...|
|   Quy Chau, Nghe An|
|District 3, Ho Ch...|
| Vinh Hoa, Nha Trang|
|Bac Lieu City, Ba...|
|    Tu Son, Bac Ninh|
|    Ward 3, Vung Tau|
|  Hoang Mai, Nghe An|
|  Thanh Liem, Ha Nam|
|  Dong Ha, Quang Tri|
|     Dam Doi, Ca Mau|
|Phuoc Long, Bac Lieu|
|Duong Kinh, Hai P...|
|  Tran De, Soc Trang|
| Gia Vien, Ninh Binh|
|Soc Trang City, S...|
|Song Cong, Thai N...|
| Vinh Hai, Nha Trang|
| Ha Tien, Kien Giang|
|Thanh Hoa City, T...|
| Vinh Yen, Vinh Phuc|
|      O Mon, Can Tho|
|Nam Dinh City, Na...|
|District 4, Ho Ch...|
|Ngu Hanh Son, Da ...|
| Giao Thuy, Nam Dinh|
|  Tan Yen, Bac Giang|
|  Kim Son, Ninh Binh|
|  Tan Chau, An Giang|
|Ha Tinh City, Ha ...|
|    Duy Tien, Ha Nam|
|    My Loc, Nam Dinh|
|Lang Giang, Bac G...|
|Dong Trieu, Quang...|
|    Son Tra, Da Nang|
| Ca Mau City, Ca Mau|
|District 2, Ho Ch...|
| Thang Tam, Vung Tau|
|   Thot No

In [32]:
# Region key -> province/city names (without diacritics) that belong to it.
#
# Besides the official province names, a few aliases are listed because the
# store_location values shown in this notebook name the city instead of the
# province (e.g. "Ward 1, Vung Tau", "Vinh Hoa, Nha Trang", "District 3, Ho Ch...").
# Without them those stores would not be matched to any region.
# NOTE(review): the Ho Chi Minh spelling is truncated in the printed output, so
# both "Ho Chi Minh" and "Ho Chi Minh City" are accepted — confirm against the data.
region_map = {
    "northwest": ["Lai Chau", "Dien Bien", "Son La", "Hoa Binh", "Lao Cai", "Yen Bai"],
    "northeast": ["Ha Giang", "Cao Bang", "Bac Kan", "Tuyen Quang", "Lang Son", "Thai Nguyen",
                  "Phu Tho", "Bac Giang", "Quang Ninh"],
    "red_river_delta": ["Hanoi", "Bac Ninh", "Ha Nam", "Hung Yen", "Hai Duong", "Hai Phong",
                        "Nam Dinh", "Ninh Binh", "Thai Binh", "Vinh Phuc"],
    "north_central": ["Thanh Hoa", "Nghe An", "Ha Tinh", "Quang Binh", "Quang Tri", "Thua Thien Hue"],
    "south_central": ["Da Nang", "Quang Nam", "Quang Ngai", "Binh Dinh", "Phu Yen", "Khanh Hoa",
                      "Ninh Thuan", "Binh Thuan",
                      # alias: Nha Trang is the city used in store_location for Khanh Hoa
                      "Nha Trang"],
    "central_highlands": ["Kon Tum", "Gia Lai", "Dak Lak", "Dak Nong", "Lam Dong"],
    "southeast": ["TP Ho Chi Minh", "Binh Duong", "Dong Nai", "Tay Ninh", "Ba Ria Vung Tau", "Binh Phuoc",
                  # aliases: spellings without the "TP" prefix / with "City", and the city of Vung Tau
                  "Ho Chi Minh", "Ho Chi Minh City", "Vung Tau"],
    "southwest": ["Long An", "Tien Giang", "Ben Tre", "Tra Vinh", "Vinh Long", "Dong Thap",
                  "An Giang", "Can Tho", "Hau Giang", "Kien Giang", "Soc Trang", "Bac Lieu", "Ca Mau"]
}


In [33]:
!pip install unidecode



In [34]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from unidecode import unidecode

In [35]:
def get_region_from_location(location):
    """Map a store location string to its region key in ``region_map``.

    The province/city is the text after the last comma. It is stripped of
    diacritics, has its whitespace normalised, and is compared
    case-insensitively against the names listed in ``region_map``.

    Args:
        location: Location text such as ``"Ben Cat, Binh Duong"``; may be None.

    Returns:
        None when ``location`` is None; the matching region key when the
        province/city is listed in ``region_map``; otherwise ``"unknown"``
        (also for non-string input).
    """
    if location is None:
        return None
    if not isinstance(location, str):
        # Non-text values cannot be parsed; answer "unknown" explicitly instead
        # of relying on a blanket exception handler.
        return "unknown"

    # The province/city is the part after the last comma.
    province_raw = location.split(",")[-1]
    # Remove diacritics, collapse whitespace and ignore letter case. Title-casing
    # each word would turn "TP" into "Tp", so an entry such as "TP Ho Chi Minh"
    # could never match; comparing case-folded names avoids that.
    province_key = " ".join(unidecode(province_raw).split()).casefold()

    for region, provinces in region_map.items():
        if any(province_key == name.casefold() for name in provinces):
            return region
    return "unknown"


get_region_udf = udf(get_region_from_location, StringType())

In [36]:
# Derive each store's region from its location text via the region lookup UDF.
store_df = store_df.withColumn(
    "region",
    get_region_udf(store_df.store_location),
)

In [37]:
# Preview the first 25 stores together with the derived region.
store_df.show(n=25)

+----------+-----------------+--------------------+---------------+
|  store_id|       store_name|      store_location|         region|
+----------+-----------------+--------------------+---------------+
|291cedf475|    Binh Duong 26| Ben Cat, Binh Duong|      southeast|
|1aae841dbb|       Da Nang 06|Ngu Hanh Son, Da ...|  south_central|
|e10533694b|      Bac Lieu 69|Phuoc Long, Bac Lieu|      southwest|
|6ce43e3ee3|       Can Tho 68| Phong Dien, Can Tho|      southwest|
|d57f269b64|         Hanoi 67| Hai Ba Trung, Hanoi|red_river_delta|
|ae3fd8139a|     Vinh Phuc 91|Lap Thach, Vinh Phuc|red_river_delta|
|9522388621|      Nam Dinh 81|     Y Yen, Nam Dinh|red_river_delta|
|4a27070917|      Bac Lieu 17|   Gia Rai, Bac Lieu|      southwest|
|7c6c4bcc5b|      Nam Dinh 21|   Hai Hau, Nam Dinh|red_river_delta|
|ca0c3907f1|     Bac Giang 28|Bac Giang City, B...|      northeast|
|d0a0d857c5|     Quang Tri 35|  Dong Ha, Quang Tri|  north_central|
|88712df1a5|        Ca Mau 45|     Nam Can, Ca M

In [38]:
# Store formats that can be assigned to a store.
random_store_format = [
    "Flagship store",
    "Department store",
    "Supermarket",
    "Convenience store",
]

In [39]:
import random

In [40]:
def assign_random_store_format():
    """Return one entry of ``random_store_format``, chosen at random."""
    return random.choice(random_store_format)


# The function returns a different value on every call, so the UDF has to be
# declared non-deterministic. Spark treats UDFs as deterministic by default and
# may then drop or repeat invocations during optimisation.
assign_store_format_udf = udf(assign_random_store_format, StringType()).asNondeterministic()


In [41]:
# Attach a randomly chosen store_format to every store row.
# NOTE(review): the column comes from a random UDF and is evaluated lazily, so
# separate actions (the show below, the later CSV write) presumably draw
# different values — confirm, and materialise the result first if the written
# file must match what is displayed.
store_df = store_df.withColumn("store_format", assign_store_format_udf())

In [42]:
# Preview the first 25 stores together with the assigned store format.
store_df.show(n=25)

+----------+-----------------+--------------------+---------------+-----------------+
|  store_id|       store_name|      store_location|         region|     store_format|
+----------+-----------------+--------------------+---------------+-----------------+
|291cedf475|    Binh Duong 26| Ben Cat, Binh Duong|      southeast| Department store|
|1aae841dbb|       Da Nang 06|Ngu Hanh Son, Da ...|  south_central|Convenience store|
|e10533694b|      Bac Lieu 69|Phuoc Long, Bac Lieu|      southwest|Convenience store|
|6ce43e3ee3|       Can Tho 68| Phong Dien, Can Tho|      southwest|      Supermarket|
|d57f269b64|         Hanoi 67| Hai Ba Trung, Hanoi|red_river_delta| Department store|
|ae3fd8139a|     Vinh Phuc 91|Lap Thach, Vinh Phuc|red_river_delta|Convenience store|
|9522388621|      Nam Dinh 81|     Y Yen, Nam Dinh|red_river_delta|      Supermarket|
|4a27070917|      Bac Lieu 17|   Gia Rai, Bac Lieu|      southwest|Convenience store|
|7c6c4bcc5b|      Nam Dinh 21|   Hai Hau, Nam Dinh|red

In [43]:
# Remove everything this Spark session currently holds in its in-memory cache.
spark.catalog.clearCache()


In [44]:
# Write the enriched store table as CSV with a header row, replacing any earlier output.
(
    store_df.write
    .mode("overwrite")
    .option("header", True)
    .csv("/app/airflow/ELT-prj/files/store_updated.csv")
)


In [46]:
# Frames destined for the lake: stores keep an explicit column list,
# products and clients are passed through unchanged.
store_lake_df = store_df.select(
    ["store_id", "store_name", "store_location", "region", "store_format"]
)
product_lake_df, client_lake_df = product_df, client_df

In [48]:
from pyspark.sql.functions import col, expr, sequence, to_date, year, month, dayofmonth, dayofweek, weekofyear, dayofyear, quarter, date_format
from pyspark.sql.types import DateType

In [49]:
# Khoảng thời gian muốn tạo
# Date range covered by the time dimension
start_date = "2018-11-22"
end_date = "2019-03-19"

# One row per consecutive calendar day from start_date to end_date
df_date = spark.sql(
    f"SELECT sequence(to_date('{start_date}'), to_date('{end_date}'), interval 1 day) as date_seq"
).selectExpr("explode(date_seq) as full_date")

# Calendar attributes derived from full_date, listed in dim_time column order
dim_time = df_date.select(
    "full_date",
    expr("CAST(date_format(full_date, 'yyyyMMdd') AS INT)").alias("date_key"),
    date_format(col("full_date"), "EEEE").alias("day_of_week"),
    dayofweek("full_date").alias("day_number_of_week"),
    dayofmonth("full_date").alias("day_of_month"),
    dayofyear("full_date").alias("day_of_year"),
    weekofyear("full_date").alias("week_of_year"),
    month("full_date").alias("month"),
    date_format("full_date", "MMMM").alias("month_name"),
    quarter("full_date").alias("quarter"),
    year("full_date").alias("year"),
    # dayofweek numbers Sunday as 1 and Saturday as 7
    expr("CASE WHEN dayofweek(full_date) IN (1, 7) THEN 1 ELSE 0 END").alias("is_weekend"),
    # Placeholder: always 0 until dedicated holiday logic is added
    expr("0").alias("is_holiday"),
)

# Quick preview
dim_time.show(n=5)

+----------+--------+-----------+------------------+------------+-----------+------------+-----+----------+-------+----+----------+----------+
| full_date|date_key|day_of_week|day_number_of_week|day_of_month|day_of_year|week_of_year|month|month_name|quarter|year|is_weekend|is_holiday|
+----------+--------+-----------+------------------+------------+-----------+------------+-----+----------+-------+----+----------+----------+
|2018-11-22|20181122|   Thursday|                 5|          22|        326|          47|   11|  November|      4|2018|         0|         0|
|2018-11-23|20181123|     Friday|                 6|          23|        327|          47|   11|  November|      4|2018|         0|         0|
|2018-11-24|20181124|   Saturday|                 7|          24|        328|          47|   11|  November|      4|2018|         1|         0|
|2018-11-25|20181125|     Sunday|                 1|          25|        329|          47|   11|  November|      4|2018|         1|         0|

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `unit_price` cannot be resolved. Did you mean one of the following? [`client_id`, `store_id`, `product_id`, `purchase_sum`, `transaction_id`].;
'Project [client_id#281, transaction_id#282, transaction_datetime#283, regular_points_received#284, express_points_received#285, regular_points_spent#286, express_points_spent#287, (('unit_price * product_quantity#291) - 'discount_amount) AS purchase_sum#581, store_id#289, product_id#290, product_quantity#291, trn_sum_from_iss#292, trn_sum_from_red#293]
+- Relation [client_id#281,transaction_id#282,transaction_datetime#283,regular_points_received#284,express_points_received#285,regular_points_spent#286,express_points_spent#287,purchase_sum#288,store_id#289,product_id#290,product_quantity#291,trn_sum_from_iss#292,trn_sum_from_red#293] csv


In [52]:
# Normalise the purchase rows: string keys, integer quantity, date-only
# transaction day and an integer yyyyMMdd date_key.
transactions = (
    purchases_df
    .withColumn("client_id", col("client_id").cast("string"))
    .withColumn("store_id", col("store_id").cast("string"))
    .withColumn("product_quantity", col("product_quantity").cast("int"))
    .withColumn("transaction_datetime", to_date("transaction_datetime"))
    .withColumn("date_key", expr("CAST(date_format(transaction_datetime, 'yyyyMMdd') AS INT)"))
)

# Expose the product price under the name unit_price
products = product_df.withColumnRenamed("product_price", "unit_price")

# Build the fact rows:
#   1. left-join the unit price onto every transaction by product_id
#   2. discount_amount = 5% of unit price * quantity
#   3. purchase_sum is recomputed as unit price * quantity minus the discount
#   4. payment_method is set to the constant 'POS'
fact_sales = (
    transactions
    .join(products.select("product_id", "unit_price"), on="product_id", how="left")
    .withColumn("discount_amount", expr("unit_price * product_quantity * 0.05"))
    .withColumn("purchase_sum", expr("unit_price * product_quantity - discount_amount"))
    .withColumn("payment_method", expr("'POS'"))
)

# Keep only the fact_sales model fields, in model order
fact_sales_final = fact_sales.select(
    [
        "transaction_id",
        "product_id",
        "client_id",
        "store_id",
        "date_key",
        "transaction_datetime",
        "product_quantity",
        "unit_price",
        "discount_amount",
        "purchase_sum",
        "regular_points_received",
        "payment_method",
    ]
)

# Inspect the result (this cell only prints; nothing is written to Delta Lake here)
fact_sales_final.printSchema()
fact_sales_final.show(n=10)



root
 |-- transaction_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- client_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- date_key: integer (nullable = true)
 |-- transaction_datetime: date (nullable = true)
 |-- product_quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- purchase_sum: double (nullable = true)
 |-- regular_points_received: timestamp (nullable = true)
 |-- payment_method: string (nullable = false)

+--------------+----------+---------+--------+--------+--------------------+----------------+----------+---------------+------------+-----------------------+--------------+
|transaction_id|product_id|client_id|store_id|date_key|transaction_datetime|product_quantity|unit_price|discount_amount|purchase_sum|regular_points_received|payment_method|
+--------------+----------+---------+--------+--------+--------------------+----------------+--------

root
 |-- product_id: string (nullable = true)
 |-- level_1: string (nullable = true)
 |-- level_2: string (nullable = true)
 |-- level_3: string (nullable = true)
 |-- level_4: string (nullable = true)
 |-- brand_name: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_price: double (nullable = true)
 |-- first_issue_date: timestamp (nullable = true)
