# 0 - Setting

## 0.0 Tải dữ liệu

In [1]:
!gdown 1Lpc9y4VAJtoWNjYIem8TrxB6GyxQclFX

Downloading...
From (original): https://drive.google.com/uc?id=1Lpc9y4VAJtoWNjYIem8TrxB6GyxQclFX
From (redirected): https://drive.google.com/uc?id=1Lpc9y4VAJtoWNjYIem8TrxB6GyxQclFX&confirm=t&uuid=a8cbf442-ca6a-4210-976a-72f6180c03b3
To: /content/retail_transactions_dataset.zip
100% 37.3M/37.3M [00:00<00:00, 93.0MB/s]


In [2]:

!unzip retail_transactions_dataset.zip

Archive:  retail_transactions_dataset.zip
  inflating: Retail_Transactions_Dataset.csv  


## 0.1 Import thư viện

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd
from datetime import datetime

## 0.2 Khởi tạo Spark

In [4]:
spark = SparkSession.builder \
    .appName("CustomerTransactionAnalysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

spark

## 0.3 Load dữ liệu

In [5]:
df = spark.read.csv('Retail_Transactions_Dataset.csv', header=True, inferSchema=True)

In [6]:
print("Schema:")
df.printSchema()

Schema:
root
 |-- Transaction_ID: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Total_Items: integer (nullable = true)
 |-- Total_Cost: double (nullable = true)
 |-- Payment_Method: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Store_Type: string (nullable = true)
 |-- Discount_Applied: boolean (nullable = true)
 |-- Customer_Category: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- Promotion: string (nullable = true)



In [7]:
print("\nFirst 5 rows:")
df.show(5)


First 5 rows:
+--------------+-------------------+-----------------+--------------------+-----------+----------+--------------+-------------+----------------+----------------+-----------------+------+--------------------+
|Transaction_ID|               Date|    Customer_Name|             Product|Total_Items|Total_Cost|Payment_Method|         City|      Store_Type|Discount_Applied|Customer_Category|Season|           Promotion|
+--------------+-------------------+-----------------+--------------------+-----------+----------+--------------+-------------+----------------+----------------+-----------------+------+--------------------+
|    1000000000|2022-01-21 06:27:29|     Stacey Price|['Ketchup', 'Shav...|          3|     71.65|Mobile Payment|  Los Angeles|  Warehouse Club|            true|        Homemaker|Winter|                None|
|    1000000001|2023-03-01 13:01:21| Michelle Carlson|['Ice Cream', 'Mi...|          2|     25.93|          Cash|San Francisco| Specialty Store|         

In [8]:
print(f"\nTotal records: {df.count()}")


Total records: 1000000


# 1.0 Các thao tác với Column

## Add

In [9]:
# Code add column (try by your self)
df.withColumn("Avg_Item_Price", col("Total_Cost") / col("Total_Items"))

DataFrame[Transaction_ID: int, Date: timestamp, Customer_Name: string, Product: string, Total_Items: int, Total_Cost: double, Payment_Method: string, City: string, Store_Type: string, Discount_Applied: boolean, Customer_Category: string, Season: string, Promotion: string, Avg_Item_Price: double]

In [10]:
from pyspark.sql.functions import col, sum as F_sum, count as F_count

# Dòng 1: Tính giá trung bình của từng mặt hàng (cú pháp đúng)
df_with_avg_price = df.withColumn("Avg_Item_Price", col("Total_Cost") / col("Total_Items"))

# Dòng 2: Cách tính tổng doanh thu (Total Sales)
# Cách 1: Tạo DataFrame mới với một hàng và một cột duy nhất chứa tổng doanh thu
total_sales_df = df.agg(F_sum("Total_Cost").alias("Total_Sales"))

# Cách 2: Lấy giá trị tổng doanh thu dưới dạng số
total_sales_value = df.select(F_sum("Total_Cost")).collect()[0][0]

# Dòng 3: Cách đếm tổng số giao dịch (Transaction Count)
# Cách 1: Tạo DataFrame mới với một hàng và một cột duy nhất chứa tổng số giao dịch
transaction_count_df = df.agg(F_count("Transaction_ID").alias("Transaction_Count"))

# Cách 2: Lấy giá trị tổng số giao dịch dưới dạng số
transaction_count_value = df.count()

In [11]:
total_sales_df

DataFrame[Total_Sales: double]

In [12]:
total_sales_value

52455220.40000056

In [13]:
transaction_count_df

DataFrame[Transaction_Count: bigint]

In [14]:
sales_by_city = df.groupBy("City").agg(F_sum("Total_Cost").alias("Total_Sales"))
sales_by_city.orderBy(desc("Total_Sales")).show()

+-------------+------------------+
|         City|       Total_Sales|
+-------------+------------------+
|       Dallas| 5277111.530000005|
|       Boston| 5263307.959999996|
|      Chicago| 5263187.449999998|
|     New York| 5252469.919999966|
|      Houston| 5247054.779999969|
|San Francisco|5241099.8599999845|
|        Miami| 5240498.440000016|
|      Seattle| 5235365.430000044|
|  Los Angeles| 5232393.189999955|
|      Atlanta| 5202731.840000003|
+-------------+------------------+



In [15]:
df_analysis = df.groupBy("Product").agg(
    sum("Total_Items").alias("Total_Units_Sold"),       # Tổng số lượng sản phẩm bán ra
    sum("Total_Cost").alias("Total_Revenue"),           # Tổng doanh thu
    avg("Total_Cost").alias("Avg_Product_Price"),       # Giá trung bình mỗi giao dịch
    countDistinct("Customer_Name").alias("Unique_Customers") # Số khách hàng duy nhất
)

df_result = df_analysis.orderBy(desc("Total_Revenue"))

df_result.show()

top_product = df_result.first()
print("Sản phẩm có Total_Revenue cao nhất là:", top_product["Product"])
print("Với tổng doanh thu là:", top_product["Total_Revenue"])

+-------------------+----------------+------------------+------------------+----------------+
|            Product|Total_Units_Sold|     Total_Revenue| Avg_Product_Price|Unique_Customers|
+-------------------+----------------+------------------+------------------+----------------+
|     ['Toothpaste']|           26891|253689.00999999975| 51.84733496832204|            4728|
|          ['Honey']|           13834|135409.84999999998| 53.31096456692912|            2498|
|      ['Deodorant']|           13753|         133082.77|52.374171585989764|            2492|
|          ['Pasta']|           13820| 131765.6899999999|52.854267950260684|            2450|
|           ['Eggs']|           13632| 131565.7099999999|  52.3124095427435|            2469|
|        ['Vinegar']|           13736|131538.38999999998| 52.51033532934131|            2456|
|           ['Soap']|           13685|131183.27999999997| 52.93917675544793|            2437|
|         ['Razors']|           13618|130956.56999999992|53.

In [16]:
window_spec = Window.partitionBy("City").orderBy(desc("Total_Cost"))

df_ranked = df.withColumn("rank_in_city", rank().over(window_spec))
# df_ranked.show()
df_ranked.filter(col("City") == "Boston").show(10)

+--------------+-------------------+---------------+--------------------+-----------+----------+--------------+------+-----------------+----------------+-----------------+------+--------------------+------------+
|Transaction_ID|               Date|  Customer_Name|             Product|Total_Items|Total_Cost|Payment_Method|  City|       Store_Type|Discount_Applied|Customer_Category|Season|           Promotion|rank_in_city|
+--------------+-------------------+---------------+--------------------+-----------+----------+--------------+------+-----------------+----------------+-----------------+------+--------------------+------------+
|    1000219055|2023-01-13 16:43:32|   John Roberts|['Extension Cords...|          7|     100.0|   Credit Card|Boston|      Supermarket|           false|      Middle-Aged|Spring|Discount on Selec...|           1|
|    1000354310|2021-03-08 23:23:58|     Lisa Young|['Chips', 'Hand S...|          5|     100.0|    Debit Card|Boston|Convenience Store|           f

In [17]:
window_spec_customer = Window.partitionBy("Customer_Name").orderBy("Date").rowsBetween(Window.unboundedPreceding, 0)

df_running_total = df.withColumn("Running_Total", sum("Total_Cost").over(window_spec_customer))

In [18]:
# df_running_total.filter((col("Customer_Name") == "Aaron Allen") & (col("Date") == "2020-10-08")).show()
df_running_total_1 = df_running_total.filter(col("Customer_Name") == "Aaron Allen")
df_running_total_2 = df_running_total_1.filter(col("Date") == "2020-10-08").show()

+--------------+----+-------------+-------+-----------+----------+--------------+----+----------+----------------+-----------------+------+---------+-------------+
|Transaction_ID|Date|Customer_Name|Product|Total_Items|Total_Cost|Payment_Method|City|Store_Type|Discount_Applied|Customer_Category|Season|Promotion|Running_Total|
+--------------+----+-------------+-------+-----------+----------+--------------+----+----------+----------------+-----------------+------+---------+-------------+
+--------------+----+-------------+-------+-----------+----------+--------------+----+----------+----------------+-----------------+------+---------+-------------+



## Drop

In [19]:
df_reduced = df.drop("Index")

## Rename

In [20]:
df_renamed = df.withColumnRenamed("Total_Items", "Quantity") \
               .withColumnRenamed("Total_Cost", "Amount")

## User Define Functions

In [21]:
# Custom UDF for customer segmentation
def categorize_customer(total_spent):
    if total_spent > 5000:
        return "VIP"
    elif total_spent > 2000:
        return "Premium"
    elif total_spent > 500:
        return "Regular"
    else:
        return "Occasional"

In [22]:
categorize_udf = udf(categorize_customer, StringType())

In [23]:
df_with_segment = df.withColumn("Spend_Category", categorize_udf(col("Total_Cost")))
df_with_segment.show(5)

+--------------+-------------------+-----------------+--------------------+-----------+----------+--------------+-------------+----------------+----------------+-----------------+------+--------------------+--------------+
|Transaction_ID|               Date|    Customer_Name|             Product|Total_Items|Total_Cost|Payment_Method|         City|      Store_Type|Discount_Applied|Customer_Category|Season|           Promotion|Spend_Category|
+--------------+-------------------+-----------------+--------------------+-----------+----------+--------------+-------------+----------------+----------------+-----------------+------+--------------------+--------------+
|    1000000000|2022-01-21 06:27:29|     Stacey Price|['Ketchup', 'Shav...|          3|     71.65|Mobile Payment|  Los Angeles|  Warehouse Club|            true|        Homemaker|Winter|                None|    Occasional|
|    1000000001|2023-03-01 13:01:21| Michelle Carlson|['Ice Cream', 'Mi...|          2|     25.93|          

# 2.0 Các thao tác với DataFrame

## Select

In [24]:
selected_df = df.select("Transaction_ID", "Date", "Customer_Name", "Product", "Total_Cost")

## Filter

In [25]:
high_value_transactions = df.filter(col("Total_Cost") > 1000)
credit_card_transactions = df.filter(col("Payment_Method") == "Credit Card")

## Transform

In [26]:
# Transform data - create new calculated columns
# Your code here

# 3.0 Aggregations

## Group By

In [27]:
# Basic aggregations
# Your code here

## Statistical Calculations

In [28]:
# Product performance analysis
# Your code here

# 4.0 Window Functions for Advanced Analytics

In [29]:
# Customer spending ranking by city
# Your code here

In [30]:
# Running total by customer
# Your code here

# 5.0 RFM Analysis (Recency, Frequency, Monetary)

In [31]:
current_date = datetime.now()

In [34]:
# RFM Base Calculation
from pyspark.sql.functions import max as spark_max, datediff, lit, countDistinct, current_date, sum

rfm_df = df.groupBy("Customer_Name").agg(
    spark_max("Date").alias("last_purchase_date"),
    countDistinct("Transaction_ID").alias("Frequency"),
    sum("Total_Cost").alias("Monetary")
)

rfm_df = rfm_df.withColumn("Recency", datediff(current_date(), col("last_purchase_date")))

rfm_df = rfm_df.drop("last_purchase_date")

rfm_df.filter(col("Customer_Name") == "Christopher Jimenez").show()

+-------------------+---------+--------+-------+
|      Customer_Name|Frequency|Monetary|Recency|
+-------------------+---------+--------+-------+
|Christopher Jimenez|       16|  774.73|    656|
+-------------------+---------+--------+-------+



In [41]:
# 1. Tính R_Score
rfm_df = rfm_df.withColumn(
    "R_Score",
    when(col("Recency") <= 30, lit(5))
    .when(col("Recency") <= 60, lit(4))
    .when(col("Recency") <= 90, lit(3))
    .when(col("Recency") <= 180, lit(2))
    .otherwise(lit(1))
)

# 2. Tính F_Score
rfm_df = rfm_df.withColumn(
    "F_Score",
    when(col("Frequency") >= 20, lit(5))
    .when(col("Frequency") >= 10, lit(4))
    .when(col("Frequency") >= 5, lit(3))
    .when(col("Frequency") >= 2, lit(2))
    .otherwise(lit(1))
)

# 3. Tính M_Score
rfm_df = rfm_df.withColumn(
    "M_Score",
    when(col("Monetary") >= 5000, lit(5))
    .when(col("Monetary") >= 2000, lit(4))
    .when(col("Monetary") >= 1000, lit(3))
    .when(col("Monetary") >= 500, lit(2))
    .otherwise(lit(1))
)

# 4. Tính RFM_Score cuối cùng
rfm_score = rfm_df.withColumn(
    "RFM_Score",
    col("R_Score") * 100 + col("F_Score") * 10 + col("M_Score")
)

# Hiển thị kết quả cho khách hàng Christopher Jimenez sau khi đã có điểm số
rfm_score.filter(col("Customer_Name") == "Thomas Oliver").show()

# Hiển thị tất cả kết quả để xem xét
rfm_score.show()

+-------------+---------+--------+-------+-------+-------+-------+---------+
|Customer_Name|Frequency|Monetary|Recency|R_Score|F_Score|M_Score|RFM_Score|
+-------------+---------+--------+-------+-------+-------+-------+---------+
|Thomas Oliver|        7|   421.1|    664|      1|      3|      1|      131|
+-------------+---------+--------+-------+-------+-------+-------+---------+

+--------------------+---------+------------------+-------+-------+-------+-------+---------+
|       Customer_Name|Frequency|          Monetary|Recency|R_Score|F_Score|M_Score|RFM_Score|
+--------------------+---------+------------------+-------+-------+-------+-------+---------+
|         Martha Cole|        3|              83.6|    957|      1|      2|      1|      121|
|      Courtney Burns|        3|            105.12|   1143|      1|      2|      1|      121|
|           John Dean|       16|            852.88|    515|      1|      4|      2|      142|
|Margaret Fitzpatrick|        3|210.33999999999997

In [37]:
customer_segments = rfm_df.withColumn(
    "Segment",
    when((col("R_Score") >= 4) & (col("F_Score") >= 4) & (col("M_Score") >= 4), lit("Champions"))
    .when((col("R_Score") >= 3) & (col("F_Score") >= 3), lit("Loyal Customers"))
    .when(col("R_Score") >= 4, lit("New Customers"))
    .when((col("R_Score") <= 2) & (col("F_Score") <= 2), lit("At Risk"))
    .otherwise(lit("Regular Customers"))
)

customer_segments.filter(col("Customer_Name") == "Sydney Waller").show()
customer_segments.show()

+-------------+---------+--------+-------+-------+-------+-------+---------+-------+
|Customer_Name|Frequency|Monetary|Recency|R_Score|F_Score|M_Score|RFM_Score|Segment|
+-------------+---------+--------+-------+-------+-------+-------+---------+-------+
|Sydney Waller|        2|   65.87|    904|      1|      2|      1|      121|At Risk|
+-------------+---------+--------+-------+-------+-------+-------+---------+-------+

+--------------------+---------+------------------+-------+-------+-------+-------+---------+-----------------+
|       Customer_Name|Frequency|          Monetary|Recency|R_Score|F_Score|M_Score|RFM_Score|          Segment|
+--------------------+---------+------------------+-------+-------+-------+-------+---------+-----------------+
|         Martha Cole|        3|              83.6|    957|      1|      2|      1|      121|          At Risk|
|      Courtney Burns|        3|            105.12|   1143|      1|      2|      1|      121|          At Risk|
|           Jo

In [None]:
# # Customer Segmentation based on RFM
# def segment_customer(r_score, f_score, m_score):
#     # Your code here


# segment_udf = udf(segment_customer, StringType())

In [None]:
# customer_segments = rfm_scored.withColumn("Segment",
#     segment_udf(col("R_Score"), col("F_Score"), col("M_Score")))

# customer_segments.show()

# 6.0 Joins with Multiple Tables

In [38]:
# Customer demographics table
customer_demo_data = [("John Doe", "Male", 35, "Engineer"),
                     ("Jane Smith", "Female", 28, "Doctor"),
                     ("Bob Johnson", "Male", 42, "Teacher")]

customer_demo_df = spark.createDataFrame(customer_demo_data,
                                       ["Customer_Name", "Gender", "Age", "Occupation"])

In [39]:
# Product category table
product_cat_data = [("Product_A", "Electronics"),
                   ("Product_B", "Clothing"),
                   ("Product_C", "Home")]

product_cat_df = spark.createDataFrame(product_cat_data,
                                     ["Product", "Category"])

In [40]:
# Perform joins
# Inner join with customer demographics
first_join_df = df.join(
    customer_demo_df,
    on="Customer_Name",
    how="inner"
)

# Left join with product category
final_df = first_join_df.join(
    product_cat_df,
    on="Product",
    how="left"
)

final_df.show(5)
customer_info = final_df.filter(col("Transaction_ID") == 1000036245).select("Customer_Name").show()

+--------------------+-------------+--------------+-------------------+-----------+----------+--------------+-----------+----------------+----------------+-----------------+------+--------------------+------+---+----------+--------+
|             Product|Customer_Name|Transaction_ID|               Date|Total_Items|Total_Cost|Payment_Method|       City|      Store_Type|Discount_Applied|Customer_Category|Season|           Promotion|Gender|Age|Occupation|Category|
+--------------------+-------------+--------------+-------------------+-----------+----------+--------------+-----------+----------------+----------------+-----------------+------+--------------------+------+---+----------+--------+
|['Shaving Cream',...|  Bob Johnson|    1000425331|2023-06-10 06:19:04|         10|     28.22|    Debit Card|   New York|Department Store|           false|      Middle-Aged|Summer|BOGO (Buy One Get...|  Male| 42|   Teacher|    NULL|
|['Deodorant', 'BB...|  Bob Johnson|    1000036245|2021-06-28 16:03:

# Advanced Analytics: Statistical Calculations

In [None]:
# Correlation analysis
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

In [None]:
numeric_cols = ["Total_Items", "Total_Cost"]
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")

vector_df = assembler.transform(df).select("features")

In [None]:
# Calculate correlation matrix
correlation_matrix = Correlation.corr(vector_df, "features").head()
print("Correlation matrix:\n", correlation_matrix[0])

In [None]:
# Descriptive statistics
df.describe(["Total_Items", "Total_Cost"]).show()

In [None]:
# Seasonality analysis
seasonal_analysis = df.groupBy("Season") \
    .agg(
        sum("Total_Cost").alias("Seasonal_Revenue"),
        avg("Total_Cost").alias("Avg_Seasonal_Transaction"),
        count("Transaction_ID").alias("Seasonal_Transactions")
    ) \
    .orderBy(desc("Seasonal_Revenue"))

seasonal_analysis.show()

# Bonus

In [None]:
# # Save processed data
# customer_segments.write.mode("overwrite").csv("customer_segments", header=True)
# product_performance.write.mode("overwrite").csv("product_performance", header=True)

# # Convert to Pandas for visualization (optional)
# customer_segments_pd = customer_segments.toPandas()
# product_performance_pd = product_performance.toPandas()

# # Stop Spark session
# spark.stop()