#**PYSPARK ESSENTIALS - PHÂN TÍCH DỮ LIỆU BÁN LẺ**

**Mục tiêu**: Sinh viên hiểu và vận dụng được các nhóm hàm cốt lõi của Spark: Inspection, Transformation, Aggregation và I/O. Bối cảnh: Bạn là Data Engineer cho chuỗi siêu thị "TechMart". Bạn cần xử lý dữ liệu giao dịch để báo cáo doanh thu

##**PHẦN 0: CHUẨN BỊ MÔI TRƯỜNG & DỮ LIỆU**
Chạy cell này để cài đặt Spark và tạo dữ liệu giả lập (Mock Data)

In [None]:
# 1. Cài đặt PySpark (Chạy trên Google Colab)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark

# 2. Khởi tạo Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import random

spark = SparkSession.builder.appName("Lab_PySpark_Functions").getOrCreate()


# 3. Tạo dữ liệu giả lập (DataFrame)
data = []
cities = ["Hanoi", "HCM", "Danang", "Cantho"]
products = ["Laptop", "Mouse", "Keyboard", "Headphone", "Monitor"]
categories = ["Computer", "Accessory", "Accessory", "Audio", "Display"]

for i in range(1, 101): # Tạo 100 dòng
    prod_idx = random.randint(0, 4)
    row = (
        f"TRX_{i:03d}",                 # TransactionID
        products[prod_idx],             # Product
        categories[prod_idx],           # Category
        random.randint(10, 50) * 10,    # Price ($100 - $500)
        random.randint(1, 5),           # Quantity
        random.choice(cities)           # City
    )
    data.append(row)

columns = ["TransactionID", "Product", "Category", "Price", "Quantity", "City"]
df = spark.createDataFrame(data, columns)

print("✅ Đã tạo xong dữ liệu mẫu!")

✅ Đã tạo xong dữ liệu mẫu!


##**PHẦN 1: KHÁM PHÁ DỮ LIỆU (INSPECTION)**

**Các hàm**: show(), printSchema(), select(), describe()
###**1. Ví dụ minh họa**
Để hiểu dữ liệu đang có gì, chúng ta cần xem cấu trúc và một vài dòng mẫu.

In [None]:
# Xem cấu trúc dữ liệu (Tên cột, kiểu dữ liệu)
print("--- Schema ---")
df.printSchema()

# Xem 5 dòng đầu tiên
print("--- 5 Dòng đầu ---")
df.show(5)

# Thống kê mô tả (Count, Mean, Min, Max) cột Giá
print("--- Thống kê giá ---")
df.describe("Price").show()

--- Schema ---
root
 |-- TransactionID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Price: long (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- City: string (nullable = true)

--- 5 Dòng đầu ---
+-------------+---------+---------+-----+--------+------+
|TransactionID|  Product| Category|Price|Quantity|  City|
+-------------+---------+---------+-----+--------+------+
|      TRX_001|  Monitor|  Display|  230|       1|Danang|
|      TRX_002|   Laptop| Computer|  190|       3|Danang|
|      TRX_003|    Mouse|Accessory|  100|       3|Danang|
|      TRX_004|Headphone|    Audio|  250|       3|Cantho|
|      TRX_005|   Laptop| Computer|  460|       1|   HCM|
+-------------+---------+---------+-----+--------+------+
only showing top 5 rows
--- Thống kê giá ---
+-------+------------------+
|summary|             Price|
+-------+------------------+
|  count|               100|
|   mean|             309.2|
| stddev|120.29

###**Bài tập 1 (Sinh viên code tại đây)**
**Yêu cầu:** Hãy dùng hàm select() để chỉ hiển thị 2 cột là Product và City, và chỉ hiện 10 dòng đầu tiên.

In [None]:
# CODE CỦA BẠN Ở ĐÂY:
# Goi y: df.select(..., ...).show(...)

##**PHẦN 2: LỌC DỮ LIỆU (FILTERING)**

**Các hàm:** filter() (hoặc where), toán tử & (AND), | (OR).

###**2. Ví dụ minh họa**
Yêu cầu tìm các đơn hàng bán tại "Hanoi" có giá trị sản phẩm trên $300.

In [3]:
# Lọc theo 2 điều kiện kết hợp
# Lưu ý: Mỗi điều kiện phải để trong ngoặc đơn ()
high_value_hanoi = df.filter((col("City") == "Hanoi") & (col("Price") > 300))

high_value_hanoi.show()

+-------------+---------+---------+-----+--------+-----+
|TransactionID|  Product| Category|Price|Quantity| City|
+-------------+---------+---------+-----+--------+-----+
|      TRX_011|   Laptop| Computer|  470|       3|Hanoi|
|      TRX_034| Keyboard|Accessory|  420|       3|Hanoi|
|      TRX_040|  Monitor|  Display|  380|       4|Hanoi|
|      TRX_051|  Monitor|  Display|  440|       1|Hanoi|
|      TRX_059|    Mouse|Accessory|  360|       1|Hanoi|
|      TRX_070|  Monitor|  Display|  330|       1|Hanoi|
|      TRX_073|  Monitor|  Display|  490|       3|Hanoi|
|      TRX_077|Headphone|    Audio|  480|       3|Hanoi|
|      TRX_085|  Monitor|  Display|  360|       2|Hanoi|
+-------------+---------+---------+-----+--------+-----+



###**Bài tập 2 (Sinh viên code tại đây)**

**Yêu cầu:** Hãy lọc ra các đơn hàng thuộc danh mục (Category) là "Accessory" HOẶC đơn hàng có số lượng (Quantity) lớn hơn 3.

In [None]:
# CODE CỦA BẠN Ở ĐÂY:

##**PHẦN 3: BIẾN ĐỔI CỘT (TRANSFORMATION)**
**Các hàm:** withColumn() (Thêm/Sửa cột), drop() (Xóa cột)
###**3. Ví dụ minh họa**
Chúng ta cần tính tổng tiền cho mỗi đơn hàng. Công thức: Total = Price * Quantity.

In [4]:
# Tạo cột mới tên là "TotalValue"
df_processed = df.withColumn("TotalValue", col("Price") * col("Quantity"))

df_processed.show(5)

+-------------+---------+---------+-----+--------+------+----------+
|TransactionID|  Product| Category|Price|Quantity|  City|TotalValue|
+-------------+---------+---------+-----+--------+------+----------+
|      TRX_001|  Monitor|  Display|  230|       1|Danang|       230|
|      TRX_002|   Laptop| Computer|  190|       3|Danang|       570|
|      TRX_003|    Mouse|Accessory|  100|       3|Danang|       300|
|      TRX_004|Headphone|    Audio|  250|       3|Cantho|       750|
|      TRX_005|   Laptop| Computer|  460|       1|   HCM|       460|
+-------------+---------+---------+-----+--------+------+----------+
only showing top 5 rows


###**Bài tập 3 (Sinh viên code tại đây)**
**Yêu cầu**: Nhân dịp khuyến mãi, hãy tạo thêm một cột mới tên là DiscountedPrice. Giá trị cột này bằng 90% giá gốc (Price * 0.9).

In [None]:
# CODE CỦA BẠN Ở ĐÂY:

##**PHẦN 4: TỔNG HỢP & THỐNG KÊ (AGGREGATION) - QUAN TRỌNG**
**Các hàm**: groupBy(), agg(), sum(), count(), avg(), max().

###**4. Ví dụ minh họa**
Tính tổng doanh thu theo từng Thành phố để xem nơi nào bán tốt nhất.

In [5]:
# Bước 1: Phải có cột TotalValue trước (lấy từ phần trước)
df_cal = df.withColumn("TotalValue", col("Price") * col("Quantity"))

# Bước 2: GroupBy và Sum
report_city = df_cal.groupBy("City") \
                  .agg(sum("TotalValue").alias("Revenue")) \
                  .orderBy(col("Revenue").desc()) # Sắp xếp giảm dần

report_city.show()

+------+-------+
|  City|Revenue|
+------+-------+
|Cantho|  30910|
|   HCM|  25520|
|Danang|  20970|
| Hanoi|  13270|
+------+-------+



###**Bài tập 4 (Sinh viên code tại đây)**
**Yêu cầu**: Hãy tính Tổng số lượng sản phẩm (Quantity) đã bán được theo từng Danh mục (Category).

In [None]:
# CODE CỦA BẠN Ở ĐÂY:
# Goi y: groupBy("Category").agg(sum("..."))

##**PHẦN 5: SQL TRONG SPARK (SPARK SQL)**
**Khái niệm**: Nếu bạn quen dùng SQL, Spark cho phép bạn viết query trực tiếp.

###**5. Ví dụ minh họa**
Tìm các sản phẩm có giá > 400 bằng câu lệnh SQL.

In [6]:
# Bước 1: Tạo một "View" tạm thời (giống như bảng ảo)
df.createOrReplaceTempView("SalesTable")

# Bước 2: Viết SQL
sql_result = spark.sql("SELECT Product, Price, City FROM SalesTable WHERE Price > 400")

sql_result.show()

+---------+-----+------+
|  Product|Price|  City|
+---------+-----+------+
|   Laptop|  460|   HCM|
|   Laptop|  470| Hanoi|
|    Mouse|  470|Danang|
|   Laptop|  430|   HCM|
|  Monitor|  480|   HCM|
| Keyboard|  470|   HCM|
|    Mouse|  430|Danang|
|Headphone|  440|Danang|
| Keyboard|  420| Hanoi|
| Keyboard|  470|Cantho|
|  Monitor|  460|   HCM|
|   Laptop|  420|Danang|
|  Monitor|  470|Cantho|
|  Monitor|  440| Hanoi|
|  Monitor|  450|   HCM|
|   Laptop|  410|Cantho|
|  Monitor|  440|Cantho|
| Keyboard|  410|Cantho|
|   Laptop|  420|Cantho|
|   Laptop|  470|Danang|
+---------+-----+------+
only showing top 20 rows


###**Bài tập 5 (Sinh viên code tại đây)**
**Yêu cầu**: Dùng spark.sql để tính trung bình cộng giá (AVG(Price)) của các sản phẩm bán tại 'HCM'.

In [None]:
# CODE CỦA BẠN Ở ĐÂY:
# Query mau: "SELECT AVG(...) as AvgPrice FROM SalesTable WHERE ..."

##**PHẦN KẾT: LƯU TRỮ DỮ LIỆU (PARQUET)**
Sau khi xử lý xong, Data Engineer phải lưu dữ liệu lại.

In [7]:
# Lưu kết quả ra file Parquet (định dạng tối ưu cho Big Data)
output_path = "processed_sales_data"

# mode("overwrite"): Ghi đè nếu folder đã tồn tại
df.write.mode("overwrite").parquet(output_path)

print(f"✅ Đã lưu dữ liệu vào thư mục: {output_path}")

# Kiểm tra lại bằng cách đọc lên
spark.read.parquet(output_path).show(3)

✅ Đã lưu dữ liệu vào thư mục: processed_sales_data
+-------------+-------+--------+-----+--------+------+
|TransactionID|Product|Category|Price|Quantity|  City|
+-------------+-------+--------+-----+--------+------+
|      TRX_051|Monitor| Display|  440|       1| Hanoi|
|      TRX_052|Monitor| Display|  450|       1|   HCM|
|      TRX_053|Monitor| Display|  290|       3|Danang|
+-------------+-------+--------+-----+--------+------+
only showing top 3 rows
