In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col, udf, avg, when
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType
from datetime import datetime

In [None]:
# Start Spark Session
spark = SparkSession.builder.appName("CarAnalytics").getOrCreate()

# Load the dataset
file_path = "dataset_cleaned.csv"  # Update with the correct path
df = spark.read.csv(file_path, header=True, inferSchema=True)

# -----------------------
# 1. Data Preparation
# -----------------------

# drop column
df = df.drop("Name").drop("Engine").drop("Transmission").drop("condition").drop("Type")

df = df.withColumn("Mileage", regexp_replace(col("Mileage"), "[^0-9]", "").cast(FloatType()))

# Convert 'Giá' (Price) to numeric (e.g., "520 triệu" -> 520.000.000)
def clean_price(price):
    if price:
        price = price.replace(" tỉ ", "").replace(",", "").strip()
        price = price.replace(" triệu", "").replace(",", "").strip()
        return float(price) * 1e6 if price.isnumeric() else None
    return None

clean_price_udf = udf(clean_price, FloatType())
df = df.withColumn("Price", clean_price_udf(col("Price")))

# Add a new column for car age
df = df.withColumn("Car Age", datetime.now().year - col("Year"))

# convert null mileage to 0
df = df.withColumn("Mileage", when(col("Mileage").isNull(), 0).otherwise(col("Mileage")))

print(df.show())

NameError: name 'SparkSession' is not defined

In [None]:
# -----------------------
# 2. Complex Aggregations
# -----------------------

# Window Function: Average price by brand and year
window_spec = Window.partitionBy("Thương hiệu").orderBy("Năm SX")
df = df.withColumn("Avg Price by Brand", avg("Giá").over(window_spec))

# Pivot Operation: Average price by fuel type and body style
pivot_df = df.groupBy("Nhiên liệu").pivot("Kiểu dáng").avg("Giá")

print(df.show())

+-----------+--------+------+---------+----------+----------+--------+----------+----------+----------+------------+-------+--------------------+
|Thương hiệu|   Model|Năm SX|Kiểu dáng|Tình trạng|   Xuất xứ|Km đã đi|Tỉnh thành|Quận huyện|Nhiên liệu|         Giá|Car Age|  Avg Price by Brand|
+-----------+--------+------+---------+----------+----------+--------+----------+----------+----------+------------+-------+--------------------+
|        BMW|    Khác|  2011|    Sedan|     Xe cũ| Nhập khẩu| 83000.0|    Hà Nội| Hoàng Mai|  Máy xăng|      6.35E8|     13|              6.35E8|
|        BMW|    Khác|  2016|    Sedan|     Xe cũ| Nhập khẩu| 79000.0|    Hà Nội| Hoàng Mai|  Máy xăng|      5.99E8|      8|              6.17E8|
|        BMW|    Khác|  2020|    Sedan|     Xe cũ| Nhập khẩu| 44300.0|    Hà Nội| Hoàng Mai|  Máy xăng|3.19000013E9|      4|1.4746667093333333E9|
|        BMW|      X3|  2021|      SUV|     Xe cũ| Nhập khẩu| 61000.0|    Hà Nội| Hoàng Mai|  Máy xăng|      1.46E9|      3|

In [None]:
# 3.Transformations

df = df.filter(col("Năm SX") >= 2015)

# Custom UDF: Categorize cars by price range
def categorize_price(price):
    if not price:
        return None
    if price < 500000000:
        return "Low"
    elif price < 1000000000:
        return "Mid"
    return "High"

categorize_price_udf = udf(categorize_price)
df = df.withColumn("Price Category", categorize_price_udf(col("Giá")))
print(df.show())

+-----------+--------+------+---------+----------+----------+--------+----------+----------+----------+------------+-------+--------------------+--------------+
|Thương hiệu|   Model|Năm SX|Kiểu dáng|Tình trạng|   Xuất xứ|Km đã đi|Tỉnh thành|Quận huyện|Nhiên liệu|         Giá|Car Age|  Avg Price by Brand|Price Category|
+-----------+--------+------+---------+----------+----------+--------+----------+----------+----------+------------+-------+--------------------+--------------+
|        BMW|    Khác|  2016|    Sedan|     Xe cũ| Nhập khẩu| 79000.0|    Hà Nội| Hoàng Mai|  Máy xăng|      5.99E8|      8|              6.17E8|           Mid|
|        BMW|    Khác|  2020|    Sedan|     Xe cũ| Nhập khẩu| 44300.0|    Hà Nội| Hoàng Mai|  Máy xăng|3.19000013E9|      4|1.4746667093333333E9|          High|
|        BMW|      X3|  2021|      SUV|     Xe cũ| Nhập khẩu| 61000.0|    Hà Nội| Hoàng Mai|  Máy xăng|      1.46E9|      3|       1.471000032E9|          High|
|        BMW|    Khác|  2024|    S

In [None]:
# Stop Spark Session
spark.stop()