<a href="https://colab.research.google.com/github/PhongDangHocCode/gioithieu.md/blob/main/lab_03022026.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#**PYSPARK ESSENTIALS - PHÂN TÍCH DỮ LIỆU BÁN LẺ**

**Mục tiêu**: Sinh viên hiểu và vận dụng được các nhóm hàm cốt lõi của Spark: Inspection, Transformation, Aggregation và I/O. Bối cảnh: Bạn là Data Engineer cho chuỗi siêu thị "TechMart". Bạn cần xử lý dữ liệu giao dịch để báo cáo doanh thu

##**PHẦN 0: CHUẨN BỊ MÔI TRƯỜNG & DỮ LIỆU**
Chạy cell này để cài đặt Spark và tạo dữ liệu giả lập (Mock Data)

In [None]:
# 1. Cài đặt PySpark (Chạy trên Google Colab)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark

# 2. Khởi tạo Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import random

spark = SparkSession.builder.appName("Lab_PySpark_Functions").getOrCreate()


# 3. Tạo dữ liệu giả lập (DataFrame)
data = []
cities = ["Hanoi", "HCM", "Danang", "Cantho"]
products = ["Laptop", "Mouse", "Keyboard", "Headphone", "Monitor"]
categories = ["Computer", "Accessory", "Accessory", "Audio", "Display"]

for i in range(1, 101): # Tạo 100 dòng
    prod_idx = random.randint(0, 4)
    row = (
        f"TRX_{i:03d}",                 # TransactionID
        products[prod_idx],             # Product
        categories[prod_idx],           # Category
        random.randint(10, 50) * 10,    # Price ($100 - $500)
        random.randint(1, 5),           # Quantity
        random.choice(cities)           # City
    )
    data.append(row)

columns = ["TransactionID", "Product", "Category", "Price", "Quantity", "City"]
df = spark.createDataFrame(data, columns)

print("✅ Đã tạo xong dữ liệu mẫu!")

✅ Đã tạo xong dữ liệu mẫu!


##**PHẦN 1: KHÁM PHÁ DỮ LIỆU (INSPECTION)**

**Các hàm**: show(), printSchema(), select(), describe()
###**1. Ví dụ minh họa**
Để hiểu dữ liệu đang có gì, chúng ta cần xem cấu trúc và một vài dòng mẫu.

In [None]:
# Xem cấu trúc dữ liệu (Tên cột, kiểu dữ liệu)
print("--- Schema ---")
df.printSchema()

# Xem 5 dòng đầu tiên
print("--- 5 Dòng đầu ---")
df.show(5)

# Thống kê mô tả (Count, Mean, Min, Max) cột Giá
print("--- Thống kê giá ---")
df.describe("Price").show()

--- Schema ---
root
 |-- TransactionID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Price: long (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- City: string (nullable = true)

--- 5 Dòng đầu ---
+-------------+--------+---------+-----+--------+------+
|TransactionID| Product| Category|Price|Quantity|  City|
+-------------+--------+---------+-----+--------+------+
|      TRX_001|Keyboard|Accessory|  210|       5|Cantho|
|      TRX_002|   Mouse|Accessory|  280|       5| Hanoi|
|      TRX_003| Monitor|  Display|  360|       1|   HCM|
|      TRX_004|   Mouse|Accessory|  360|       1|   HCM|
|      TRX_005|Keyboard|Accessory|  170|       1| Hanoi|
+-------------+--------+---------+-----+--------+------+
only showing top 5 rows
--- Thống kê giá ---
+-------+------------------+
|summary|             Price|
+-------+------------------+
|  count|               100|
|   mean|             303.6|
| stddev|122.39627643375

###**Bài tập 1 (Sinh viên code tại đây)**
**Yêu cầu:** Hãy dùng hàm select() để chỉ hiển thị 2 cột là Product và City, và chỉ hiện 10 dòng đầu tiên.

In [None]:
# CODE CỦA BẠN Ở ĐÂY:
# Goi y: df.select(..., ...).show(...)
df.select("Product", "City").show(10)

+--------+------+
| Product|  City|
+--------+------+
|Keyboard|Cantho|
|   Mouse| Hanoi|
| Monitor|   HCM|
|   Mouse|   HCM|
|Keyboard| Hanoi|
|  Laptop|Danang|
| Monitor|   HCM|
|  Laptop|Danang|
|  Laptop|   HCM|
|  Laptop|Cantho|
+--------+------+
only showing top 10 rows


##**PHẦN 2: LỌC DỮ LIỆU (FILTERING)**

**Các hàm:** filter() (hoặc where), toán tử & (AND), | (OR).

###**2. Ví dụ minh họa**
Yêu cầu tìm các đơn hàng bán tại "Hanoi" có giá trị sản phẩm trên $300.

In [None]:
# Lọc theo 2 điều kiện kết hợp
# Lưu ý: Mỗi điều kiện phải để trong ngoặc đơn ()
high_value_hanoi = df.filter((col("City") == "Hanoi") & (col("Price") > 300))

high_value_hanoi.show()

+-------------+---------+---------+-----+--------+-----+
|TransactionID|  Product| Category|Price|Quantity| City|
+-------------+---------+---------+-----+--------+-----+
|      TRX_012|Headphone|    Audio|  400|       5|Hanoi|
|      TRX_017| Keyboard|Accessory|  470|       5|Hanoi|
|      TRX_022|   Laptop| Computer|  400|       3|Hanoi|
|      TRX_028|   Laptop| Computer|  430|       4|Hanoi|
|      TRX_038|    Mouse|Accessory|  450|       3|Hanoi|
|      TRX_044|   Laptop| Computer|  330|       1|Hanoi|
|      TRX_045|    Mouse|Accessory|  370|       2|Hanoi|
|      TRX_054|   Laptop| Computer|  440|       1|Hanoi|
|      TRX_069|  Monitor|  Display|  470|       2|Hanoi|
|      TRX_076|Headphone|    Audio|  450|       4|Hanoi|
|      TRX_078|Headphone|    Audio|  480|       2|Hanoi|
|      TRX_079|  Monitor|  Display|  430|       2|Hanoi|
|      TRX_080|  Monitor|  Display|  380|       1|Hanoi|
|      TRX_096|    Mouse|Accessory|  500|       3|Hanoi|
|      TRX_099|   Laptop| Compu

###**Bài tập 2 (Sinh viên code tại đây)**

**Yêu cầu:** Hãy lọc ra các đơn hàng thuộc danh mục (Category) là "Accessory" HOẶC đơn hàng có số lượng (Quantity) lớn hơn 3.

In [None]:
# CODE CỦA BẠN Ở ĐÂY:
filtered_df = df.filter((col("Category") == "Accessory") & (col("Quantity") > 3))
filtered_df.show(20)


+-------------+--------+---------+-----+--------+------+
|TransactionID| Product| Category|Price|Quantity|  City|
+-------------+--------+---------+-----+--------+------+
|      TRX_001|Keyboard|Accessory|  210|       5|Cantho|
|      TRX_002|   Mouse|Accessory|  280|       5| Hanoi|
|      TRX_011|Keyboard|Accessory|  440|       4|Danang|
|      TRX_014|   Mouse|Accessory|  430|       5|Cantho|
|      TRX_017|Keyboard|Accessory|  470|       5| Hanoi|
|      TRX_020|   Mouse|Accessory|  470|       5|Danang|
|      TRX_063|Keyboard|Accessory|  420|       5|Danang|
|      TRX_071|   Mouse|Accessory|  290|       4|   HCM|
|      TRX_072|   Mouse|Accessory|  480|       5|Cantho|
|      TRX_077|   Mouse|Accessory|  310|       5|Cantho|
|      TRX_082|   Mouse|Accessory|  230|       5|Danang|
|      TRX_084|Keyboard|Accessory|  500|       5|Cantho|
|      TRX_093|   Mouse|Accessory|  200|       5|Danang|
+-------------+--------+---------+-----+--------+------+



##**PHẦN 3: BIẾN ĐỔI CỘT (TRANSFORMATION)**
**Các hàm:** withColumn() (Thêm/Sửa cột), drop() (Xóa cột)
###**3. Ví dụ minh họa**
Chúng ta cần tính tổng tiền cho mỗi đơn hàng. Công thức: Total = Price * Quantity.

In [None]:
# Tạo cột mới tên là "TotalValue"
df_processed = df.withColumn("TotalValue", col("Price") * col("Quantity"))

df_processed.show(5)

+-------------+---------+---------+-----+--------+------+----------+
|TransactionID|  Product| Category|Price|Quantity|  City|TotalValue|
+-------------+---------+---------+-----+--------+------+----------+
|      TRX_001|  Monitor|  Display|  230|       1|Danang|       230|
|      TRX_002|   Laptop| Computer|  190|       3|Danang|       570|
|      TRX_003|    Mouse|Accessory|  100|       3|Danang|       300|
|      TRX_004|Headphone|    Audio|  250|       3|Cantho|       750|
|      TRX_005|   Laptop| Computer|  460|       1|   HCM|       460|
+-------------+---------+---------+-----+--------+------+----------+
only showing top 5 rows


###**Bài tập 3 (Sinh viên code tại đây)**
**Yêu cầu**: Nhân dịp khuyến mãi, hãy tạo thêm một cột mới tên là DiscountedPrice. Giá trị cột này bằng 90% giá gốc (Price * 0.9).

In [None]:
# CODE CỦA BẠN Ở ĐÂY:
df.discoutedPrice = df.withColumn("DiscountedPrice", col("Price") * 0.9)
df.show(10)

+-------------+--------+---------+-----+--------+------+
|TransactionID| Product| Category|Price|Quantity|  City|
+-------------+--------+---------+-----+--------+------+
|      TRX_001|Keyboard|Accessory|  210|       5|Cantho|
|      TRX_002|   Mouse|Accessory|  280|       5| Hanoi|
|      TRX_003| Monitor|  Display|  360|       1|   HCM|
|      TRX_004|   Mouse|Accessory|  360|       1|   HCM|
|      TRX_005|Keyboard|Accessory|  170|       1| Hanoi|
|      TRX_006|  Laptop| Computer|  150|       1|Danang|
|      TRX_007| Monitor|  Display|  110|       5|   HCM|
|      TRX_008|  Laptop| Computer|  160|       4|Danang|
|      TRX_009|  Laptop| Computer|  460|       2|   HCM|
|      TRX_010|  Laptop| Computer|  170|       5|Cantho|
+-------------+--------+---------+-----+--------+------+
only showing top 10 rows


##**PHẦN 4: TỔNG HỢP & THỐNG KÊ (AGGREGATION) - QUAN TRỌNG**
**Các hàm**: groupBy(), agg(), sum(), count(), avg(), max().

###**4. Ví dụ minh họa**
Tính tổng doanh thu theo từng Thành phố để xem nơi nào bán tốt nhất.

In [None]:
# Bước 1: Phải có cột TotalValue trước (lấy từ phần trước)
df_cal = df.withColumn("TotalValue", col("Price") * col("Quantity"))

# Bước 2: GroupBy và Sum
report_city = df_cal.groupBy("City") \
                  .agg(sum("TotalValue").alias("Revenue")) \
                  .orderBy(col("Revenue").desc()) # Sắp xếp giảm dần

report_city.show()

+------+-------+
|  City|Revenue|
+------+-------+
| Hanoi|  24370|
|Cantho|  23200|
|Danang|  20960|
|   HCM|  20590|
+------+-------+



###**Bài tập 4 (Sinh viên code tại đây)**
**Yêu cầu**: Hãy tính Tổng số lượng sản phẩm (Quantity) đã bán được theo từng Danh mục (Category).

In [None]:
# CODE CỦA BẠN Ở ĐÂY:
# Goi y: groupBy("Category").agg(sum("..."))
df.groupBy("Category").agg(sum("Quantity").alias("TotalQuantity")).show()

+---------+-------------+
| Category|TotalQuantity|
+---------+-------------+
|  Display|           46|
|Accessory|          115|
| Computer|           63|
|    Audio|           65|
+---------+-------------+



##**PHẦN 5: SQL TRONG SPARK (SPARK SQL)**
**Khái niệm**: Nếu bạn quen dùng SQL, Spark cho phép bạn viết query trực tiếp.

###**5. Ví dụ minh họa**
Tìm các sản phẩm có giá > 400 bằng câu lệnh SQL.

In [None]:
# Bước 1: Tạo một "View" tạm thời (giống như bảng ảo)
df.createOrReplaceTempView("SalesTable")

# Bước 2: Viết SQL
sql_result = spark.sql("SELECT Product, Price, City FROM SalesTable WHERE Price > 400")

sql_result.show()

+---------+-----+------+
|  Product|Price|  City|
+---------+-----+------+
|   Laptop|  460|   HCM|
| Keyboard|  440|Danang|
|    Mouse|  430|Cantho|
|Headphone|  410|Danang|
| Keyboard|  470| Hanoi|
|    Mouse|  470|Danang|
|   Laptop|  440|Cantho|
|Headphone|  430|Danang|
|   Laptop|  430| Hanoi|
|  Monitor|  440|   HCM|
|    Mouse|  490|   HCM|
|Headphone|  430|Danang|
|    Mouse|  450| Hanoi|
|    Mouse|  480|   HCM|
|   Laptop|  440| Hanoi|
| Keyboard|  420|Danang|
| Keyboard|  470|   HCM|
|  Monitor|  470| Hanoi|
|    Mouse|  480|Cantho|
|  Monitor|  500|   HCM|
+---------+-----+------+
only showing top 20 rows


###**Bài tập 5 (Sinh viên code tại đây)**
**Yêu cầu**: Dùng spark.sql để tính trung bình cộng giá (AVG(Price)) của các sản phẩm bán tại 'HCM'.

In [None]:
# CODE CỦA BẠN Ở ĐÂY:
# Goi y: Query mau: "SELECT AVG(...) as AvgPrice FROM SalesTable WHERE ..."
spark.sql("SELECT AVG(Price) as AvgPrice FROM SalesTable WHERE City = 'HCM'").show()

+-----------------+
|         AvgPrice|
+-----------------+
|307.3076923076923|
+-----------------+



##**PHẦN KẾT: LƯU TRỮ DỮ LIỆU (PARQUET)**
Sau khi xử lý xong, Data Engineer phải lưu dữ liệu lại.

In [None]:
# Lưu kết quả ra file Parquet (định dạng tối ưu cho Big Data)
output_path = "processed_sales_data"

# mode("overwrite"): Ghi đè nếu folder đã tồn tại
df.write.mode("overwrite").parquet(output_path)

print(f"✅ Đã lưu dữ liệu vào thư mục: {output_path}")

# Kiểm tra lại bằng cách đọc lên
spark.read.parquet(output_path).show(3)

✅ Đã lưu dữ liệu vào thư mục: processed_sales_data
+-------------+-------+---------+-----+--------+----+
|TransactionID|Product| Category|Price|Quantity|City|
+-------------+-------+---------+-----+--------+----+
|      TRX_051|Monitor|  Display|  140|       2| HCM|
|      TRX_052|  Mouse|Accessory|  480|       3| HCM|
|      TRX_053|Monitor|  Display|  120|       4| HCM|
+-------------+-------+---------+-----+--------+----+
only showing top 3 rows
