<a href="https://colab.research.google.com/github/HoMinhDuc05/DeTaiGit/blob/master/PTDLBL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# 1. Cài đặt PySpark (Chạy trên Google Colab)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark

# 2. Khởi tạo Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import random

spark = SparkSession.builder.appName("Lab_PySpark_Functions").getOrCreate()


# 3. Tạo dữ liệu giả lập (DataFrame)
data = []
cities = ["Hanoi", "HCM", "Danang", "Cantho"]
products = ["Laptop", "Mouse", "Keyboard", "Headphone", "Monitor"]
categories = ["Computer", "Accessory", "Accessory", "Audio", "Display"]

for i in range(1, 101): # Tạo 100 dòng
    prod_idx = random.randint(0, 4)
    row = (
        f"TRX_{i:03d}",                 # TransactionID
        products[prod_idx],             # Product
        categories[prod_idx],           # Category
        random.randint(10, 50) * 10,    # Price ($100 - $500)
        random.randint(1, 5),           # Quantity
        random.choice(cities)           # City
    )
    data.append(row)

columns = ["TransactionID", "Product", "Category", "Price", "Quantity", "City"]
df = spark.createDataFrame(data, columns)

print("✅ Đã tạo xong dữ liệu mẫu!")


✅ Đã tạo xong dữ liệu mẫu!


In [3]:
# Xem cấu trúc dữ liệu (Tên cột, kiểu dữ liệu)
print("--- Schema ---")
df.printSchema()

# Xem 5 dòng đầu tiên
print("--- 5 Dòng đầu ---")
df.show(10)

# Thống kê mô tả (Count, Mean, Min, Max) cột Giá
print("--- Thống kê giá ---")
df.describe("Price").show()

--- Schema ---
root
 |-- TransactionID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Price: long (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- City: string (nullable = true)

--- 5 Dòng đầu ---
+-------------+---------+---------+-----+--------+------+
|TransactionID|  Product| Category|Price|Quantity|  City|
+-------------+---------+---------+-----+--------+------+
|      TRX_001|    Mouse|Accessory|  380|       3|   HCM|
|      TRX_002|Headphone|    Audio|  260|       1|   HCM|
|      TRX_003|    Mouse|Accessory|  170|       1| Hanoi|
|      TRX_004|Headphone|    Audio|  200|       2|Danang|
|      TRX_005|  Monitor|  Display|  300|       3|Danang|
|      TRX_006|Headphone|    Audio|  300|       4| Hanoi|
|      TRX_007|  Monitor|  Display|  330|       2|Cantho|
|      TRX_008|    Mouse|Accessory|  360|       3|Danang|
|      TRX_009|    Mouse|Accessory|  500|       1|Danang|
|      TRX_010|    Mouse|Access

In [6]:
df.select("Product", "City").show(10)

+---------+------+
|  Product|  City|
+---------+------+
|    Mouse|   HCM|
|Headphone|   HCM|
|    Mouse| Hanoi|
|Headphone|Danang|
|  Monitor|Danang|
|Headphone| Hanoi|
|  Monitor|Cantho|
|    Mouse|Danang|
|    Mouse|Danang|
|    Mouse|   HCM|
+---------+------+
only showing top 10 rows


In [8]:
high_value_hanoi = df.filter((col("City") == "Hanoi") & (col("Price") > 300))
high_value_hanoi.show()

+-------------+---------+---------+-----+--------+-----+
|TransactionID|  Product| Category|Price|Quantity| City|
+-------------+---------+---------+-----+--------+-----+
|      TRX_021|    Mouse|Accessory|  410|       4|Hanoi|
|      TRX_022|    Mouse|Accessory|  450|       4|Hanoi|
|      TRX_023|  Monitor|  Display|  410|       3|Hanoi|
|      TRX_037|Headphone|    Audio|  480|       3|Hanoi|
|      TRX_041|  Monitor|  Display|  390|       2|Hanoi|
|      TRX_054|  Monitor|  Display|  400|       2|Hanoi|
|      TRX_059|    Mouse|Accessory|  370|       2|Hanoi|
|      TRX_071|  Monitor|  Display|  460|       3|Hanoi|
|      TRX_090|    Mouse|Accessory|  440|       5|Hanoi|
|      TRX_091|  Monitor|  Display|  420|       5|Hanoi|
+-------------+---------+---------+-----+--------+-----+



In [10]:
high_value_category = df.filter((col("Category") == "Accessory") & (col("Quantity") > 3))
high_value_category.show()

+-------------+--------+---------+-----+--------+------+
|TransactionID| Product| Category|Price|Quantity|  City|
+-------------+--------+---------+-----+--------+------+
|      TRX_013|Keyboard|Accessory|  300|       4|Danang|
|      TRX_021|   Mouse|Accessory|  410|       4| Hanoi|
|      TRX_022|   Mouse|Accessory|  450|       4| Hanoi|
|      TRX_053|   Mouse|Accessory|  420|       5|Cantho|
|      TRX_060|   Mouse|Accessory|  180|       4|Cantho|
|      TRX_078|Keyboard|Accessory|  170|       4|Danang|
|      TRX_090|   Mouse|Accessory|  440|       5| Hanoi|
|      TRX_100|Keyboard|Accessory|  330|       4|   HCM|
+-------------+--------+---------+-----+--------+------+



In [12]:
df_processed = df.withColumn("TotalValue", col("Price") * col("Quantity"))
df_processed.show(5)

+-------------+---------+---------+-----+--------+------+----------+
|TransactionID|  Product| Category|Price|Quantity|  City|TotalValue|
+-------------+---------+---------+-----+--------+------+----------+
|      TRX_001|    Mouse|Accessory|  380|       3|   HCM|      1140|
|      TRX_002|Headphone|    Audio|  260|       1|   HCM|       260|
|      TRX_003|    Mouse|Accessory|  170|       1| Hanoi|       170|
|      TRX_004|Headphone|    Audio|  200|       2|Danang|       400|
|      TRX_005|  Monitor|  Display|  300|       3|Danang|       900|
+-------------+---------+---------+-----+--------+------+----------+
only showing top 5 rows


In [18]:
df_DiscountedPrice = df.withColumn("DiscountedPrice", col("Price") * 0.9)
df_DiscountedPrice.show(5)

+-------------+---------+---------+-----+--------+------+---------------+
|TransactionID|  Product| Category|Price|Quantity|  City|DiscountedPrice|
+-------------+---------+---------+-----+--------+------+---------------+
|      TRX_001|    Mouse|Accessory|  380|       3|   HCM|          342.0|
|      TRX_002|Headphone|    Audio|  260|       1|   HCM|          234.0|
|      TRX_003|    Mouse|Accessory|  170|       1| Hanoi|          153.0|
|      TRX_004|Headphone|    Audio|  200|       2|Danang|          180.0|
|      TRX_005|  Monitor|  Display|  300|       3|Danang|          270.0|
+-------------+---------+---------+-----+--------+------+---------------+
only showing top 5 rows


In [20]:
# Bước 1: Phải có cột TotalValue trước (lấy từ phần trước)
df_cal = df.withColumn("TotalValue", col("Price") * col("Quantity"))

# Bước 2: GroupBy và Sum
report_city = df_cal.groupBy("City") \
                  .agg(sum("TotalValue").alias("Revenue")) \
                  .orderBy(col("Revenue").desc()) # Sắp xếp giảm dần

report_city.show()

+------+-------+
|  City|Revenue|
+------+-------+
|Cantho|  21780|
|   HCM|  21150|
| Hanoi|  17810|
|Danang|  17200|
+------+-------+



In [25]:
report_category = (
    df.groupBy("Category")
      .agg(sum("Quantity").alias("TotalQuantity"))
      .orderBy(col("TotalQuantity").desc())
)

report_category.show()

+---------+-------------+
| Category|TotalQuantity|
+---------+-------------+
|Accessory|           88|
|    Audio|           81|
|  Display|           61|
| Computer|           29|
+---------+-------------+



In [27]:
# Bước 1: Tạo một "View" tạm thời (giống như bảng ảo)
df.createOrReplaceTempView("SalesTable")

# Bước 2: Viết SQL
sql_result = spark.sql("SELECT Product, Price, City FROM SalesTable WHERE Price > 400")

sql_result.show()

+---------+-----+------+
|  Product|Price|  City|
+---------+-----+------+
|    Mouse|  500|Danang|
|    Mouse|  480|   HCM|
|    Mouse|  410| Hanoi|
|    Mouse|  450| Hanoi|
|  Monitor|  410| Hanoi|
|Headphone|  460|   HCM|
|    Mouse|  440|Cantho|
|Headphone|  500|   HCM|
|Headphone|  480| Hanoi|
|Headphone|  430|Cantho|
|   Laptop|  480|Cantho|
|Headphone|  420|Danang|
|Headphone|  430|Cantho|
|    Mouse|  420|Cantho|
|   Laptop|  490|   HCM|
|    Mouse|  490|   HCM|
|  Monitor|  460|   HCM|
|  Monitor|  460| Hanoi|
|   Laptop|  470|Cantho|
|  Monitor|  470|Cantho|
+---------+-----+------+
only showing top 20 rows


In [30]:
# Nếu df là DataFrame, nhớ tạo view trước
df.createOrReplaceTempView("SalesTable")

# Dùng spark.sql để tính AVG(Price) tại HCM
avg_price_hcm = spark.sql("""
    SELECT AVG(Price) AS AvgPrice
    FROM SalesTable
    WHERE City = 'HCM'""")
avg_price_hcm.show()


+------------------+
|          AvgPrice|
+------------------+
|315.38461538461536|
+------------------+



In [31]:
# Lưu kết quả ra file Parquet (định dạng tối ưu cho Big Data)
output_path = "processed_sales_data"

# mode("overwrite"): Ghi đè nếu folder đã tồn tại
df.write.mode("overwrite").parquet(output_path)

print(f"✅ Đã lưu dữ liệu vào thư mục: {output_path}")

# Kiểm tra lại bằng cách đọc lên
spark.read.parquet(output_path).show(3)

✅ Đã lưu dữ liệu vào thư mục: processed_sales_data
+-------------+---------+---------+-----+--------+-----+
|TransactionID|  Product| Category|Price|Quantity| City|
+-------------+---------+---------+-----+--------+-----+
|      TRX_001|    Mouse|Accessory|  380|       3|  HCM|
|      TRX_002|Headphone|    Audio|  260|       1|  HCM|
|      TRX_003|    Mouse|Accessory|  170|       1|Hanoi|
+-------------+---------+---------+-----+--------+-----+
only showing top 3 rows
