In [1]:
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
import os

In [2]:
# Create (or reuse) the Spark session for this notebook, running on a local master.
spark = (
    SparkSession.builder
    .appName("Silver")
    .master("local[*]")
    .getOrCreate()
)

#### Load the raw CSV file

In [3]:
# Read the raw CSV export; the "header" option takes column names from the first row.
# No schema is supplied and schema inference is not enabled here.
bus_ticket = spark.read.option("header", True).csv("../raw/bus_data_07-03-2025.csv")

# Preview the first 20 rows.
bus_ticket.show(n=20)

+-------+--------------------+----------+----------------+--------------+------------+--------------------+--------------------+--------+--------------------+-----------+
|Bus_Key|            Bus_Name|Start_Date|           Route|Departure_Time|Arrival_Time|     Departure_Place|       Arrival_Place|Duration|            Type_Bus|      Price|
+-------+--------------------+----------+----------------+--------------+------------+--------------------+--------------------+--------+--------------------+-----------+
|      1|   Vũ Linh limousine|07-03-2025|TP.HCM - Can Tho|         10:00|       13:15| • Văn Phòng Sài Gòn|• Văn Phòng Công ...|   3h15m|Huyndai Solati 11...|Từ 200.000đ|
|      2|   Vũ Linh limousine|07-03-2025|TP.HCM - Can Tho|         12:00|       15:15| • Văn Phòng Sài Gòn|• Văn Phòng Công ...|   3h15m|Huyndai Solati 11...|Từ 200.000đ|
|      3|   Vũ Linh limousine|07-03-2025|TP.HCM - Can Tho|         11:01|       14:16| • Văn Phòng Sài Gòn|• Văn Phòng Công ...|   3h15m|Huyndai 

#### Check the data types of the raw columns

In [17]:
# Inspect the column names and types Spark assigned when the CSV was loaded.
bus_ticket.printSchema()

root
 |-- Bus_Key: string (nullable = true)
 |-- Bus_Name: string (nullable = true)
 |-- Start_Date: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Departure_Time: string (nullable = true)
 |-- Arrival_Time: string (nullable = true)
 |-- Departure_Place: string (nullable = true)
 |-- Arrival_Place: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Type_Bus: string (nullable = true)
 |-- Price: string (nullable = true)



#### Regenerate Bus_Key as a sequential row number (1..N)

In [18]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# Rebuild Bus_Key as a consecutive 1..N sequence.
#
# monotonically_increasing_id() yields increasing but non-consecutive ids, so it
# is used only as a temporary sort key; row_number() over that ordering produces
# the final consecutive numbering. The previous cast of the old Bus_Key to int
# was dropped: the column is overwritten by row_number(), so the cast had no
# effect on the result.
#
# NOTE: a window with orderBy but no partitionBy makes Spark move every row to a
# single partition; acceptable for a small file, costly for large data.
window_spec = Window.orderBy("temp_id")

bus_ticket = (
    bus_ticket
    # Temporary ordering column.
    .withColumn("temp_id", monotonically_increasing_id())
    # Number rows starting from 1.
    .withColumn("Bus_Key", row_number().over(window_spec))
    # Remove the temporary column.
    .drop("temp_id")
)

bus_ticket.show(20)

+-------+--------------------+----------+----------------+--------------+------------+--------------------+--------------------+--------+--------------------+-----------+
|Bus_Key|            Bus_Name|Start_Date|           Route|Departure_Time|Arrival_Time|     Departure_Place|       Arrival_Place|Duration|            Type_Bus|      Price|
+-------+--------------------+----------+----------------+--------------+------------+--------------------+--------------------+--------+--------------------+-----------+
|      1|   Vũ Linh limousine|07-03-2025|TP.HCM - Can Tho|         10:00|       13:15| • Văn Phòng Sài Gòn|• Văn Phòng Công ...|   3h15m|Huyndai Solati 11...|Từ 200.000đ|
|      2|   Vũ Linh limousine|07-03-2025|TP.HCM - Can Tho|         12:00|       15:15| • Văn Phòng Sài Gòn|• Văn Phòng Công ...|   3h15m|Huyndai Solati 11...|Từ 200.000đ|
|      3|   Vũ Linh limousine|07-03-2025|TP.HCM - Can Tho|         11:01|       14:16| • Văn Phòng Sài Gòn|• Văn Phòng Công ...|   3h15m|Huyndai 

#### Convert data type column Start_Date

In [19]:
from pyspark.sql.functions import to_date

# Parse Start_Date from text into a date column.
# NOTE(review): the pattern reads the text as month-day-year, so "07-03-2025"
# becomes 2025-07-03. If the source actually writes day-month-year, the pattern
# should be "dd-MM-yyyy" — confirm against the data source.
bus_ticket = bus_ticket.withColumn(
    "Start_Date",
    to_date(bus_ticket["Start_Date"], "MM-dd-yyyy"),
)

bus_ticket.show(20)

+-------+--------------------+----------+----------------+--------------+------------+--------------------+--------------------+--------+--------------------+-----------+
|Bus_Key|            Bus_Name|Start_Date|           Route|Departure_Time|Arrival_Time|     Departure_Place|       Arrival_Place|Duration|            Type_Bus|      Price|
+-------+--------------------+----------+----------------+--------------+------------+--------------------+--------------------+--------+--------------------+-----------+
|      1|   Vũ Linh limousine|2025-07-03|TP.HCM - Can Tho|         10:00|       13:15| • Văn Phòng Sài Gòn|• Văn Phòng Công ...|   3h15m|Huyndai Solati 11...|Từ 200.000đ|
|      2|   Vũ Linh limousine|2025-07-03|TP.HCM - Can Tho|         12:00|       15:15| • Văn Phòng Sài Gòn|• Văn Phòng Công ...|   3h15m|Huyndai Solati 11...|Từ 200.000đ|
|      3|   Vũ Linh limousine|2025-07-03|TP.HCM - Can Tho|         11:01|       14:16| • Văn Phòng Sài Gòn|• Văn Phòng Công ...|   3h15m|Huyndai 

#### Standardize text in Departure_Place and Arrival_Place with regular expressions

In [20]:
from pyspark.sql.functions import regexp_replace, lower

def standardize_string(text):
  """Build a Spark column expression that normalizes a place-name column.

  Steps applied, in order:
    1. lower-case the text;
    2. delete every character that is not an ASCII letter, a digit,
       whitespace, or one of the listed lower-case Vietnamese letters
       (this removes bullets such as "•" and punctuation);
    3. expand the standalone abbreviation "vp" to "văn phòng";
    4. strip leading and trailing whitespace (removing a leading bullet
       otherwise leaves a stray space at the start of the value).

  Args:
    text: a Spark Column (or column name) holding the text to clean.

  Returns:
    A Column with the cleaned text, or None if `text` itself is None.
  """
  # `text` is a Column object, not a cell value, so this guard only covers a
  # literal None argument.
  if text is None:
    return None

  text = lower(text)
  # Raw string so that `\s` reaches the regex engine without Python treating
  # it as an (invalid) string escape sequence.
  text = regexp_replace(text, r"[^a-zA-Z0-9\sáàảãạăắằẳẵặâấầẩẫậéèẻẽẹêếềểễệíìỉĩịóòỏõọôốồổỗộơớờởỡợúùủũụưứừửữựýỳỷỹỵđ]", "")
  text = regexp_replace(text, r"\bvp\b", "văn phòng")
  # Trim whitespace left behind at either end by the removals above.
  text = regexp_replace(text, r"^\s+|\s+$", "")
  return text

# Run the same place-name cleanup on both endpoint columns.
for place_column in ("Departure_Place", "Arrival_Place"):
    bus_ticket = bus_ticket.withColumn(
        place_column,
        standardize_string(bus_ticket[place_column]),
    )

bus_ticket.show(20)

+-------+--------------------+----------+----------------+--------------+------------+--------------------+--------------------+--------+--------------------+-----------+
|Bus_Key|            Bus_Name|Start_Date|           Route|Departure_Time|Arrival_Time|     Departure_Place|       Arrival_Place|Duration|            Type_Bus|      Price|
+-------+--------------------+----------+----------------+--------------+------------+--------------------+--------------------+--------+--------------------+-----------+
|      1|   Vũ Linh limousine|2025-07-03|TP.HCM - Can Tho|         10:00|       13:15|   văn phòng sài gòn| văn phòng công t...|   3h15m|Huyndai Solati 11...|Từ 200.000đ|
|      2|   Vũ Linh limousine|2025-07-03|TP.HCM - Can Tho|         12:00|       15:15|   văn phòng sài gòn| văn phòng công t...|   3h15m|Huyndai Solati 11...|Từ 200.000đ|
|      3|   Vũ Linh limousine|2025-07-03|TP.HCM - Can Tho|         11:01|       14:16|   văn phòng sài gòn| văn phòng công t...|   3h15m|Huyndai 

#### Calculate and convert column Duration

In [21]:
import re
from pyspark.sql.functions import udf, round as F_round
from pyspark.sql.types import FloatType

def convert_duration(duration):
    """Convert a duration string such as "3h15m" into fractional hours.

    Accepted shapes (case-insensitive, surrounding/inner whitespace allowed):
      * "<H>h<M>m" or "<H>h<M>"  -> H + M / 60
      * "<H>h"                   -> H
      * "<M>m"                   -> M / 60
      * "<N>" (no unit)          -> N hours

    Args:
        duration: the duration text, or None.

    Returns:
        The duration in hours as a float, or None when `duration` is None or
        does not start with a number.
    """
    if duration is None:
        return None

    # group 1: leading number, group 2: its unit (h or m, optional),
    # group 3: a trailing number read as minutes after an hour part.
    match = re.match(r"(\d+)\s*([hm])?\s*(\d+)?", duration.strip().lower())
    if match is None:
        return None

    amount = int(match.group(1))
    if match.group(2) == "m":
        # Minutes-only value, e.g. "45m"; must not be read as 45 hours.
        return amount / 60

    minutes = int(match.group(3)) if match.group(3) else 0
    return amount + minutes / 60

# Expose the Python parser to Spark as a UDF declared to return FloatType.
convert_duration_udf = udf(convert_duration, FloatType())

# Replace the Duration text with the UDF result, rounded to 2 decimal places.
duration_hours = convert_duration_udf(bus_ticket["Duration"])
bus_ticket = bus_ticket.withColumn("Duration", F_round(duration_hours, 2))
bus_ticket.show(20)

+-------+--------------------+----------+----------------+--------------+------------+--------------------+--------------------+--------+--------------------+-----------+
|Bus_Key|            Bus_Name|Start_Date|           Route|Departure_Time|Arrival_Time|     Departure_Place|       Arrival_Place|Duration|            Type_Bus|      Price|
+-------+--------------------+----------+----------------+--------------+------------+--------------------+--------------------+--------+--------------------+-----------+
|      1|   Vũ Linh limousine|2025-07-03|TP.HCM - Can Tho|         10:00|       13:15|   văn phòng sài gòn| văn phòng công t...|    3.25|Huyndai Solati 11...|Từ 200.000đ|
|      2|   Vũ Linh limousine|2025-07-03|TP.HCM - Can Tho|         12:00|       15:15|   văn phòng sài gòn| văn phòng công t...|    3.25|Huyndai Solati 11...|Từ 200.000đ|
|      3|   Vũ Linh limousine|2025-07-03|TP.HCM - Can Tho|         11:01|       14:16|   văn phòng sài gòn| văn phòng công t...|    3.25|Huyndai 

#### Convert column Price

In [22]:
import re
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def remove_price_chars(price_str):
    """Turn a price label such as "Từ 200.000đ" into an integer amount.

    Every non-digit character is removed and the remaining digits are read as
    one integer. Note that all digits in the string are concatenated, so a
    label containing two numbers yields a single combined number.

    Args:
        price_str: the raw price text, or None.

    Returns:
        The integer formed by the digits, or None when `price_str` is None or
        contains no digits.
    """
    if price_str is None:
        return None

    digits_only = re.sub(r"[^0-9]", "", price_str)
    if not digits_only:
        return None
    return int(digits_only)

# Expose the price cleaner to Spark as a UDF declared to return IntegerType.
remove_price_chars_udf = udf(remove_price_chars, IntegerType())

# Overwrite Price with its numeric value.
bus_ticket = bus_ticket.withColumn("Price", remove_price_chars_udf(bus_ticket["Price"]))
bus_ticket.show(20)

+-------+--------------------+----------+----------------+--------------+------------+--------------------+--------------------+--------+--------------------+------+
|Bus_Key|            Bus_Name|Start_Date|           Route|Departure_Time|Arrival_Time|     Departure_Place|       Arrival_Place|Duration|            Type_Bus| Price|
+-------+--------------------+----------+----------------+--------------+------------+--------------------+--------------------+--------+--------------------+------+
|      1|   Vũ Linh limousine|2025-07-03|TP.HCM - Can Tho|         10:00|       13:15|   văn phòng sài gòn| văn phòng công t...|    3.25|Huyndai Solati 11...|200000|
|      2|   Vũ Linh limousine|2025-07-03|TP.HCM - Can Tho|         12:00|       15:15|   văn phòng sài gòn| văn phòng công t...|    3.25|Huyndai Solati 11...|200000|
|      3|   Vũ Linh limousine|2025-07-03|TP.HCM - Can Tho|         11:01|       14:16|   văn phòng sài gòn| văn phòng công t...|    3.25|Huyndai Solati 11...|200000|
|   

#### Check the data types after conversion

In [23]:
# Print the schema again to verify the column types after the conversions above.
bus_ticket.printSchema()

root
 |-- Bus_Key: integer (nullable = false)
 |-- Bus_Name: string (nullable = true)
 |-- Start_Date: date (nullable = true)
 |-- Route: string (nullable = true)
 |-- Departure_Time: string (nullable = true)
 |-- Arrival_Time: string (nullable = true)
 |-- Departure_Place: string (nullable = true)
 |-- Arrival_Place: string (nullable = true)
 |-- Duration: float (nullable = true)
 |-- Type_Bus: string (nullable = true)
 |-- Price: integer (nullable = true)

