In [77]:
! pip install pyspark



In [78]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Fecom_Inc_Analysis") \
    .getOrCreate()

### Câu 1. Hãy đọc dữ liệu từ các file csv, sử dụng tự suy ra kiểu dữ liệu cho mỗi cột. 

#### Load dữ liệu

In [79]:
customer_list_df = spark.read.format("csv").options(header="True", delimiter=";", inferSchema="True").csv("Customer_List.csv")
order_items_df = spark.read.format("csv").options(header="True", delimiter=";", inferSchema="True").csv("Order_Items.csv")
order_reviews_df = spark.read.format("csv").options(header="True", delimiter=";", inferSchema="True").csv("Order_Reviews.csv")
orders_df = spark.read.format("csv").options(header="True", delimiter=";", inferSchema="True").csv("Orders.csv")
products_df = spark.read.format("csv").options(header="True", delimiter=";", inferSchema="True").csv("Products.csv")


                                                                                

#### In lược đồ

In [80]:
customer_list_df.printSchema()
order_items_df.printSchema()
order_reviews_df.printSchema()
orders_df.printSchema()
products_df.printSchema()

root
 |-- Customer_Trx_ID: string (nullable = true)
 |-- Subscriber_ID: string (nullable = true)
 |-- Subscribe_Date: date (nullable = true)
 |-- First_Order_Date: date (nullable = true)
 |-- Customer_Postal_Code: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Customer_Country_Code: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)

root
 |-- Order_ID: string (nullable = true)
 |-- Order_Item_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Seller_ID: string (nullable = true)
 |-- Shipping_Limit_Date: timestamp (nullable = true)
 |-- Price: double (nullable = true)
 |-- Freight_Value: double (nullable = true)

root
 |-- Review_ID: string (nullable = true)
 |-- Order_ID: string (nullable = true)
 |-- Review_Score: string (nullable = true)
 |-- Review_Comment_Title_En: string (nullable = true)
 |-- Review_Comment_Message_En: string (null

#### In dữ liệu

In [81]:
order_reviews_df.show(5)

+--------------------+--------------------+------------+-----------------------+-------------------------+--------------------+-----------------------+
|           Review_ID|            Order_ID|Review_Score|Review_Comment_Title_En|Review_Comment_Message_En|Review_Creation_Date|Review_Answer_Timestamp|
+--------------------+--------------------+------------+-----------------------+-------------------------+--------------------+-----------------------+
|7bc2406110b926393...|73fc7af87114b3971...|           4|                   NULL|                     NULL|    2024-01-18 00:00|    2024-01-18 21:46:00|
|80e641a11e56f04c1...|a548910a1c6147796...|           5|                   NULL|                     NULL|    2024-03-10 00:00|    2024-03-11 03:05:00|
|228ce5500dc1d8e02...|f9e4b658b201a9f2e...|           5|                   NULL|                     NULL|    2024-02-17 00:00|    2024-02-18 14:36:00|
|e64fb393e7b32834b...|658677c97b385a9be...|           5|                   NULL|     I r

### Câu 2. Thống kê tổng số đơn hàng, số lượng khách hàng và người bán.

In [82]:
from pyspark.sql.functions import count

In [83]:
total_customers = customer_list_df.select("Customer_Trx_ID").distinct().count()
print(f"Số lượng khách hàng là: {total_customers}")

Số lượng khách hàng là: 99442


In [84]:
total_orders = order_items_df.count()
print(f"Số lượng đơn hàng là {total_orders}")

Số lượng đơn hàng là 112650


In [85]:
total_sellers = order_items_df.select("Seller_ID").distinct().count()
print(f"Số lượng người bán: {total_sellers}")

Số lượng người bán: 3095


### Câu 3. Phân tích số lượng đơn hàng theo quốc gia, sắp xếp theo thứ tự giảm dần.

In [86]:
from pyspark.sql.functions import count, col

In [87]:
result_df = orders_df.join(
        customer_list_df,
        orders_df.Customer_Trx_ID == customer_list_df.Customer_Trx_ID,
        "inner"
    ).groupBy(
        "Customer_Country"
    ).agg(
        count("Order_ID").alias("Total_Orders")
    ).orderBy(
        "Total_Orders", ascending=False
    ).select(
        col("Customer_Country").alias("Country"),
        col("Total_Orders")
    )


In [88]:
result_df.show(10)

+--------------+------------+
|       Country|Total_Orders|
+--------------+------------+
|       Germany|       41754|
|        France|       12848|
|   Netherlands|       11629|
|       Belgium|        5464|
|       Austria|        5043|
|   Switzerland|        3640|
|United Kingdom|        3382|
|        Poland|        2139|
|       Czechia|        2034|
|         Italy|        2025|
+--------------+------------+
only showing top 10 rows



### Câu 4. Phân tích số lượng đơn hàng nhóm theo năm, tháng đặt hàng (Hiển thị theo năm tăng dần, tháng giảm dần)

In [89]:
from pyspark.sql.functions import year, month, count


In [90]:
orders_with_ym_df = orders_df.withColumn("Order_Year", year("Order_Purchase_Timestamp")) \
                             .withColumn("Order_Month", month("Order_Purchase_Timestamp"))

orders_by_year_month_df = orders_with_ym_df.groupBy(
    "Order_Year", "Order_Month"
    ).agg(
        count("Order_ID").alias("Total_Orders")
    ).orderBy(
        "Order_Year",
        "order_Month",
        ascending=[True, False]
    )

In [91]:
orders_by_year_month_df.show(10)

+----------+-----------+------------+
|Order_Year|Order_Month|Total_Orders|
+----------+-----------+------------+
|      2022|         12|           1|
|      2022|         10|         324|
|      2022|          9|           4|
|      2023|         12|        5673|
|      2023|         11|        7544|
|      2023|         10|        4631|
|      2023|          9|        4285|
|      2023|          8|        4331|
|      2023|          7|        4026|
|      2023|          6|        3245|
+----------+-----------+------------+
only showing top 10 rows



### Câu 5. Thống kê điểm đánh giá trung bình, số lượng đánh giá theo từng mức (ví dụ: 1 đến 5). 

In [92]:
from pyspark.sql.functions import avg, when, sum, round

In [93]:
order_reviews_levels = order_reviews_df.withColumn("Level_1", order_reviews_df.Review_Score == 1) \
                                        .withColumn("Level_2", order_reviews_df.Review_Score == 2) \
                                        .withColumn("Level_3", order_reviews_df.Review_Score == 3) \
                                        .withColumn("Level_4", order_reviews_df.Review_Score == 4) \
                                        .withColumn("Level_5", order_reviews_df.Review_Score == 5)

In [94]:
order_reviews_levels.show(10)

+--------------------+--------------------+------------+-----------------------+-------------------------+--------------------+-----------------------+-------+-------+-------+-------+-------+
|           Review_ID|            Order_ID|Review_Score|Review_Comment_Title_En|Review_Comment_Message_En|Review_Creation_Date|Review_Answer_Timestamp|Level_1|Level_2|Level_3|Level_4|Level_5|
+--------------------+--------------------+------------+-----------------------+-------------------------+--------------------+-----------------------+-------+-------+-------+-------+-------+
|7bc2406110b926393...|73fc7af87114b3971...|           4|                   NULL|                     NULL|    2024-01-18 00:00|    2024-01-18 21:46:00|  false|  false|  false|   true|  false|
|80e641a11e56f04c1...|a548910a1c6147796...|           5|                   NULL|                     NULL|    2024-03-10 00:00|    2024-03-11 03:05:00|  false|  false|  false|  false|   true|
|228ce5500dc1d8e02...|f9e4b658b201a9f2e.

In [95]:
order_average_levels = order_reviews_levels.agg(
    avg("Review_Score").alias("Average Review Score"),
    sum(when(order_reviews_levels.Level_1 == True, 1).otherwise(0)).alias("Level_1"),
    sum(when(order_reviews_levels.Level_2 == True, 1).otherwise(0)).alias("Level_2"),
    sum(when(order_reviews_levels.Level_3 == True, 1).otherwise(0)).alias("Level_3"),
    sum(when(order_reviews_levels.Level_4 == True, 1).otherwise(0)).alias("Level_4"),
    sum(when(order_reviews_levels.Level_5 == True, 1).otherwise(0)).alias("Level_5"),
)

In [96]:
order_average_levels.show(5)

+--------------------+-------+-------+-------+-------+-------+
|Average Review Score|Level_1|Level_2|Level_3|Level_4|Level_5|
+--------------------+-------+-------+-------+-------+-------+
|  4.0864214950162765|  11424|   3151|   8179|  19141|  57328|
+--------------------+-------+-------+-------+-------+-------+



### Câu 6. Tính doanh thu (giá sản phẩm + phí vận chuyển) trong năm 2024 và nhóm theo danh mục sản phẩm 

In [97]:
order_items_df.show(1)
orders_df.show(1)
products_df.show(1)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            Order_ID|Order_Item_ID|          Product_ID|           Seller_ID|Shipping_Limit_Date|Price|Freight_Value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2023-09-19 09:45:00| 58.9|        13.29|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
only showing top 1 row

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            Order_ID|     Customer_Trx_ID|Order_Status|Order_Purchase_Timestamp|  Order_Approved_At|Order_Delivered_Carrier_Date|Order_Delivered_Customer_Date|Order_Estimated_Delivery_Date|

In [98]:
order_items_with_order = order_items_df.join(
    orders_df.select("Order_ID", "Order_Purchase_Timestamp"),
    on="Order_ID",
    how="inner"
).join(
    products_df.select("Product_ID", "Product_Category_Name"),
    on="Product_ID",
    how="inner"
)

In [99]:
order_items_revenue_2024_df = order_items_with_order \
    .withColumn("Revenue", col("Price") + col("Freight_Value")) \
    .filter(year(col("Order_Purchase_Timestamp")) == 2024)


In [100]:
order_items_revenue_2024_df.show(5)

+--------------------+--------------------+-------------+--------------------+-------------------+------+-------------+------------------------+---------------------+-------+
|          Product_ID|            Order_ID|Order_Item_ID|           Seller_ID|Shipping_Limit_Date| Price|Freight_Value|Order_Purchase_Timestamp|Product_Category_Name|Revenue|
+--------------------+--------------------+-------------+--------------------+-------------------+------+-------------+------------------------+---------------------+-------+
|c777355d18b72b67a...|000229ec398224ef6...|            1|5b51032eddd242adc...|2024-01-18 14:48:00| 199.0|        17.87|     2024-01-14 14:33:00|      Furniture_Decor| 216.87|
|7634da152a4610f15...|00024acbcdf0a6daa...|            1|9d7a1d34a50524090...|2024-08-15 10:10:00| 12.99|        12.79|     2024-08-08 10:00:00|            Perfumery|  25.78|
|557d850972a7d6f79...|000576fe39319847c...|            1|5996cddab893a4652...|2024-07-10 12:30:00| 810.0|        70.75|     2

In [101]:
order_revenue_2024_df = order_items_revenue_2024_df.groupBy(
    "Product_Category_Name"
).agg(
    round(sum("Revenue"), 2).alias("Revenue")
).orderBy(
    "Revenue",
    ascending=False
)

In [102]:
order_revenue_2024_df.show(5)

+---------------------+---------+
|Product_Category_Name|  Revenue|
+---------------------+---------+
|        Health_Beauty|885191.12|
|        Watches_Gifts|771986.75|
|       Bed_Bath_Table| 650794.7|
|       Sports_Leisure|621999.34|
| Computers_Accesso...|594771.04|
+---------------------+---------+
only showing top 5 rows



### Câu 7. Xác định sản phẩm có số lượng bán ra cao nhất và tính điểm đánh giá trung bình cho từng sản phẩm

In [103]:
order_items_df.show(1)
order_reviews_df.show(1)
products_df.show(1)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            Order_ID|Order_Item_ID|          Product_ID|           Seller_ID|Shipping_Limit_Date|Price|Freight_Value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2023-09-19 09:45:00| 58.9|        13.29|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
only showing top 1 row

+--------------------+--------------------+------------+-----------------------+-------------------------+--------------------+-----------------------+
|           Review_ID|            Order_ID|Review_Score|Review_Comment_Title_En|Review_Comment_Message_En|Review_Creation_Date|Review_Answer_Timestamp|
+--------------------+--------------------+------------+---------------------

In [104]:
products_with_review = order_items_df.join(
    order_reviews_df.select("Order_ID", "Review_Score"),
    on="Order_ID",
    how="inner"
).join(
    products_df.select("Product_ID", "Product_Category_Name"),
    on="Product_ID",
    how="inner"
)

In [105]:
products_with_review.show(5)

+--------------------+--------------------+-------------+--------------------+-------------------+-----+-------------+------------+---------------------+
|          Product_ID|            Order_ID|Order_Item_ID|           Seller_ID|Shipping_Limit_Date|Price|Freight_Value|Review_Score|Product_Category_Name|
+--------------------+--------------------+-------------+--------------------+-------------------+-----+-------------+------------+---------------------+
|4244733e06e7ecb49...|00010242fe8c5a6d1...|            1|48436dade18ac8b2b...|2023-09-19 09:45:00| 58.9|        13.29|           5|           Cool_Stuff|
|e5f2d52b802189ee6...|00018f77f2f0320c5...|            1|dd7ddc04e1b6c2c61...|2023-05-03 11:05:00|239.9|        19.93|           4|             Pet_Shop|
|c777355d18b72b67a...|000229ec398224ef6...|            1|5b51032eddd242adc...|2024-01-18 14:48:00|199.0|        17.87|           5|      Furniture_Decor|
|7634da152a4610f15...|00024acbcdf0a6daa...|            1|9d7a1d34a50524090..

In [106]:
product_highest_review = products_with_review.groupBy(
    "Product_Category_Name"
).agg(
    count("Order_ID").alias("Order_Count"),
    avg("Review_Score").alias("Review Score") 
).orderBy(
    "Order_Count",
    ascending=False
).limit(1)

In [107]:
product_highest_review.show()

+---------------------+-----------+------------------+
|Product_Category_Name|Order_Count|      Review Score|
+---------------------+-----------+------------------+
|       Bed_Bath_Table|      11137|3.8956631049654304|
+---------------------+-----------+------------------+



### Câu 8. Tính toán hiệu số giữa ngày giao hàng thực tế (Order_Delivered_Carrier_Date) và ngày giao hàng dự kiến (ví dụ: Shipping_Limit_Date từ bảng Order_Items) để đánh giá hiệu suất giao hàng.

In [108]:
from pyspark.sql.functions import date_diff

In [109]:
order_items_df.show(1)
orders_df.show(1)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            Order_ID|Order_Item_ID|          Product_ID|           Seller_ID|Shipping_Limit_Date|Price|Freight_Value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2023-09-19 09:45:00| 58.9|        13.29|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
only showing top 1 row

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            Order_ID|     Customer_Trx_ID|Order_Status|Order_Purchase_Timestamp|  Order_Approved_At|Order_Delivered_Carrier_Date|Order_Delivered_Customer_Date|Order_Estimated_Delivery_Date|

In [110]:
order_items_with_delivered = order_items_df.join(
    orders_df.select("Order_ID", "Order_Delivered_Carrier_Date"),
    on="Order_ID",
    how="inner"
)

In [111]:
Shipping_Date_Diff = order_items_with_delivered.withColumn(
    "Shipping_Diff", date_diff(start="Shipping_Limit_Date", end="Order_Delivered_Carrier_Date")
).select(
    "Order_ID",
    "Shipping_Diff"
)

In [112]:
Shipping_Date_Diff.show()

+--------------------+-------------+
|            Order_ID|Shipping_Diff|
+--------------------+-------------+
|00010242fe8c5a6d1...|            0|
|00018f77f2f0320c5...|            1|
|000229ec398224ef6...|           -2|
|00024acbcdf0a6daa...|           -5|
|00042b26cf59d7ce6...|            3|
|00048cc3ae777c65d...|           -6|
|00054e8431b9d7675...|           -2|
|000576fe39319847c...|           -5|
|0005a1a1728c9d785...|            2|
|0005f50442cb953dc...|           -3|
|00061f2a7bc09da83...|           -2|
|00063b381e2406b52...|           -1|
|0006ec9db01a64e59...|           -1|
|0008288aa423d2a3f...|           -1|
|0008288aa423d2a3f...|           -1|
|0009792311464db53...|           -2|
|0009c9a17f916a706...|           -5|
|000aed2e25dbad2f9...|            0|
|000c3e6612759851c...|           -6|
|000e562887b1f2006...|           -2|
+--------------------+-------------+
only showing top 20 rows



- Mốc là ngày giao hàng dự tính là `Shipping_Limit_Date`, nếu:
    - giao hàng trước ngày đó thì giá trị `Shipping_Diff` < 0
    - giao hàng đúng ngày dự tính thì giá trị `Shipping_Diff` = 0
    - giao hàng sau thì giá trị `Shipping_Diff` > 0
- Đánh giá hiệu suất:
    - Nếu chênh lệch < 0 ngày: `Good`
    - Nếu chênh lệch = 0 ngày: `On time`
    - Nếu chênh lệch > 0 ngày: `Late`

In [113]:
Shipping_Date_Diff

DataFrame[Order_ID: string, Shipping_Diff: int]

### Câu 9. Nhóm khách hàng dựa trên số lượng đơn hàng, giá trị trung bình của đơn hàng và tần suất mua sắm. 

In [None]:
from pyspark.sql.functions import col, countDistinct, sum, avg, round, min, max, datediff, when, lit, concat_ws

# (1) Tính "giá trị đơn hàng" ở mức Order_ID: Revenue = sum(Price + Freight_Value)
order_value_df = order_items_df \
    .withColumn("Item_Revenue", col("Price") + col("Freight_Value")) \
    .groupBy("Order_ID") \
    .agg(
        round(sum("Item_Revenue"), 2).alias("Order_Value")
    )

# (2) Gắn Customer_Trx_ID + timestamp vào từng Order, rồi join với order_value_df
orders_enriched_df = orders_df.select(
    "Order_ID", "Customer_Trx_ID", "Order_Purchase_Timestamp"
).join(
    order_value_df,
    on="Order_ID",
    how="inner"
)

# (3) Gom theo Customer_Trx_ID để lấy 3 chỉ số cần thiết
customer_metrics_df = orders_enriched_df.groupBy("Customer_Trx_ID").agg(
    countDistinct("Order_ID").alias("Total_Orders"),
    round(avg("Order_Value"), 2).alias("Avg_Order_Value"),
    min("Order_Purchase_Timestamp").alias("First_Purchase"),
    max("Order_Purchase_Timestamp").alias("Last_Purchase")
).withColumn(
    # số ngày hoạt động mua sắm (để tính tần suất)
    "Active_Days", datediff(col("Last_Purchase"), col("First_Purchase")) + lit(1)
).withColumn(
    # tần suất mua sắm: số đơn / số ngày hoạt động
    "Purchase_Frequency", round(col("Total_Orders") / col("Active_Days"), 6)
)

# (4) Chia nhóm (bucket) theo 3 thước đo
# - Số đơn: Low / Mid / High
# - Avg order value: Low / Mid / High (theo tertile)
# - Frequency: Low / Mid / High (theo tertile)

# Lấy ngưỡng theo phân vị để “tự khớp” dữ liệu (không đoán ngưỡng cố định)
q_value_33, q_value_66 = customer_metrics_df.approxQuantile("Avg_Order_Value", [0.33, 0.66], 0.01)
q_freq_33, q_freq_66   = customer_metrics_df.approxQuantile("Purchase_Frequency", [0.33, 0.66], 0.01)

customer_segment_df = customer_metrics_df \
    .withColumn(
        "Orders_Segment",
        when(col("Total_Orders") == 1, "Low")
        .when(col("Total_Orders").between(2, 3), "Mid")
        .otherwise("High")
    ) \
    .withColumn(
        "Value_Segment",
        when(col("Avg_Order_Value") <= lit(q_value_33), "Low")
        .when(col("Avg_Order_Value") <= lit(q_value_66), "Mid")
        .otherwise("High")
    ) \
    .withColumn(
        "Frequency_Segment",
        when(col("Purchase_Frequency") <= lit(q_freq_33), "Low")
        .when(col("Purchase_Frequency") <= lit(q_freq_66), "Mid")
        .otherwise("High")
    ) \
    .withColumn(
        # gộp thành 1 nhãn cụm để thống kê dễ
        "Customer_Group",
        concat_ws("-", col("Orders_Segment"), col("Value_Segment"), col("Frequency_Segment"))
    )

# (5) Xem kết quả chi tiết
customer_segment_df.select(
    "Customer_Trx_ID",
    "Total_Orders",
    "Avg_Order_Value",
    "Purchase_Frequency",
    "Orders_Segment",
    "Value_Segment",
    "Frequency_Segment",
    "Customer_Group"
).orderBy(col("Total_Orders").desc(), col("Avg_Order_Value").desc()).show(20, truncate=False)

# (6) Thống kê số khách theo từng nhóm
customer_segment_df.groupBy("Customer_Group").count().orderBy(col("count").desc()).show(50, truncate=False)


+--------------------------------+------------+---------------+------------------+--------------+-------------+-----------------+--------------+
|Customer_Trx_ID                 |Total_Orders|Avg_Order_Value|Purchase_Frequency|Orders_Segment|Value_Segment|Frequency_Segment|Customer_Group|
+--------------------------------+------------+---------------+------------------+--------------+-------------+-----------------+--------------+
|1617b1357756262bfa56ab541c47bc16|1           |13664.08       |1.0               |Low           |High         |Low              |Low-High-Low  |
|ec5b2ba62e574342386871631fafd3fc|1           |7274.88        |1.0               |Low           |High         |Low              |Low-High-Low  |
|c6e2731c5b391845f6800c97401a43a9|1           |6929.31        |1.0               |Low           |High         |Low              |Low-High-Low  |
|f48d464a0baaea338cb25f816991ab1f|1           |6922.21        |1.0               |Low           |High         |Low              |L

### Câu 10. Xếp hạng các seller dựa trên tổng doanh thu và số lượng đơn hàng bán được.

In [115]:
from pyspark.sql.functions import col, round, sum as _sum, countDistinct
# 1) add Revenue per item
order_items_rev = order_items_df.withColumn("Revenue", col("Price") + col("Freight_Value"))

# 2) aggregate per seller: total revenue and number of distinct orders sold
seller_metrics = order_items_rev.groupBy("Seller_ID").agg(",
Revenue"),2).alias("Total_Revenue"),",
Order_ID").alias("Order_Count")",
,
,
3
,
Total_Revenue").desc(), col("Order_Count").desc())

# 4) show top 20 sellers
seller_ranking.show(20)

# 5) optional: save results to CSV
seller_ranking.coalesce(1).write.mode("overwrite").option("header","true").csv("seller_ranking")

SyntaxError: unterminated string literal (detected at line 6) (2625702579.py, line 6)