# BÀI TẬP THỰC HÀNH: PHÂN TÍCH VÀ XỬ LÝ DỮ LIỆU LỚN

## Phần I: Tiền xử lý dữ liệu

### Bước 1 - Import các thư viện cần thiết

In [1]:
# Import libraries (import các thư viện cần thiết)
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go

# Set up plotting style (định dạng biểu đồ)
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['font.size'] = 10

### Bước 2 - Load dữ liệu

In [2]:
DATA_PATH = f"yellow_tripdata_2025-07.parquet"
ZONES_PATH = f"taxi_zone_lookup.csv"

### Bước 3 - Khởi tạo spark session

In [3]:
# Initialize Spark session with optimized configuration
spark = SparkSession.builder \
    .appName("NYC_Taxi_Analysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

# Set log level to reduce verbosity
spark.sparkContext.setLogLevel("WARN")

print("Spark session created successfully!")
print(f"Spark version: {spark.version}")
print(f"Python version: {sys.version}")

Spark session created successfully!
Spark version: 3.5.1
Python version: 3.13.7 | packaged by Anaconda, Inc. | (main, Sep  9 2025, 19:54:37) [MSC v.1929 64 bit (AMD64)]


### Bước 4 - Đọc dữ liệu

In [4]:
# Load taxi data (đọc dữ liệu taxi)
df = spark.read.parquet(DATA_PATH)
print(f"Loaded taxi data: {df.count():,} records")

# Load zones data (đọc dữ liệu khu vực)
zones = spark.read.csv(ZONES_PATH, header=True, inferSchema=True)
print(f"Loaded zones data: {zones.count()} zones")

# Show taxi data schema (lược đồ tập dữ liệu)
print("\nTaxi Data Schema:")
df.printSchema()

# Show sample data (xem thử vài mẫu dữ liệu)
print("\nSample Data:")
df.show(5, truncate=False)

Loaded taxi data: 3,898,963 records
Loaded zones data: 265 zones

Taxi Data Schema:
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable 

### Bước 5 - Loại bỏ các mẫu dữ liệu chứa giá trị null

In [5]:
# Cleaning Step 1: Drop nulls in important columns
important_cols = ["passenger_count", "trip_distance", "PULocationID",
                 "DOLocationID", "fare_amount", "tpep_pickup_datetime"]

print("Cleaning data...")
print("Step 1: Dropping nulls in important columns...")
df_no_nulls = df.na.drop(subset=important_cols)
print(f"Remaining after dropna: {df_no_nulls.count():,} records")

Cleaning data...
Step 1: Dropping nulls in important columns...
Remaining after dropna: 2,860,208 records


### Bước 6 - Loại bỏ các mẫu dữ liệu fare_amounts không hợp lý

In [6]:
print("Step 2: Filtering reasonable fare_amounts (0 < fare < 200)...")
df_fare_filtered = df_no_nulls.filter(
    (df_no_nulls['fare_amount'] > 0) & (df_no_nulls['fare_amount'] < 200)
)

print(f"Remaining after fare filter: {df_fare_filtered.count():,} records")

Step 2: Filtering reasonable fare_amounts (0 < fare < 200)...
Remaining after fare filter: 2,780,802 records


### Bước 7 - Loại bỏ các mẫu dữ liệu trip_distance không hợp lý

In [7]:
print("Step 3: Filtering reasonable trip distances (0 < miles < 100)...")
df_clean = df_fare_filtered.filter(
    (df_fare_filtered['trip_distance'] > 0) & (df_fare_filtered['trip_distance'] < 100)
)

print(f"Remaining after trip_distance filter: {df_clean.count():,} records")

Step 3: Filtering reasonable trip distances (0 < miles < 100)...
Remaining after trip_distance filter: 2,746,747 records


## Phần II: Bài tập thực hành

## Câu 1 - Tạo bảng dữ liệu chuẩn + Tối ưu join

### Mục tiêu: 
Luyện DataFrame + join + tối ưu hiệu năng, không trùng phần ML.

### Yêu cầu:
- Tạo cột mới:
   - trip_duration_min = chênh lệch tpep_dropoff_datetime - tpep_pickup_datetime (phút)
   - avg_speed_kmh = trip_distance / (trip_duration_min/60)

- Lọc dữ liệu hợp lý (tự đề xuất ngưỡng cho duration và speed) và giải thích vì sao chọn ngưỡng (2–4 dòng).

- Join với taxi_zone_lookup.csv để lấy tên vùng cho cả pickup & dropoff.

- So sánh 2 cách join:
   - join thường
   - broadcast join (khi join với zone lookup)


### 1.1. Tạo cột mới

In [8]:
df_clean = df_clean.withColumn(
    'trip_duration_min', 
    (col('tpep_dropoff_datetime') - col('tpep_pickup_datetime')).cast('long') / 60
).withColumn(
    'avg_speed_kmh',
    col('trip_distance') / (col('trip_duration_min') / 60)
)
print("Hoàn thành tạo cột mới!")

Hoàn thành tạo cột mới!


In [9]:
print("Dữ liệu sau khi tạo cột:")
df_clean.show(10, truncate=False)

Dữ liệu sau khi tạo cột:
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+------------------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|trip_duration_min |avg_speed_kmh     |
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+------------------+-----------

### 1.2. Lọc dữ liệu hợp lý

1. trip_duration_min: giữ lại ngưỡng 1 - 300 phút

- Lý do:

   - Dưới 1 phút: Những chuyến dưới 1 phút là dữ liệu lỗi hoặc đặt taxi nhầm. Thực tế một chuyến taxi cần ít nhất vài phút để thực hiện.

   - Trên 300 phút (5 giờ): Chuyến đi quá dài hiếm khi xảy ra. Nguyên nhân cho những chuyến taxi quá dài thường là do dữ liệu GPS bị gián đoạn hoặc các giá trị bất thường cần loại bỏ.

2. avg_speed_kmh: giữ lại ngưỡng 0 - 60 km/h

- Lý do: Tốc độ ô tô được phép chạy trên đường phố đô thị thường bị giới hạn không quá 60 km/h.

In [10]:
print("Tiến hành lọc dữ liệu theo hai thuộc tính: `trip_duration_min` và `avg_speed_kmh`:")
df_filtered1 = df_clean.filter(
    (df_clean['trip_duration_min'] > 1) & (df_clean['trip_duration_min'] < 300)
)
df_filtered = df_filtered1.filter(
    (df_filtered1['avg_speed_kmh'] > 0) & (df_filtered1['avg_speed_kmh'] < 60)
)
print(f"Dữ liệu sau khi lọc: {df_filtered.count():,} dòng")

Tiến hành lọc dữ liệu theo hai thuộc tính: `trip_duration_min` và `avg_speed_kmh`:
Dữ liệu sau khi lọc: 2,680,690 dòng


### 1.3. Join với taxi_zone_lookup.csv để lấy tên vùng cho cả pickup & dropoff.


So sánh 2 cách join:
   - join thường
   - broadcast join (khi join với zone lookup)

#### 1.3.1. Join thường

In [11]:
# Sử dụng alias để tránh ambiguous columns
zones_pu = zones.withColumnRenamed("LocationID", "PULocationID") \
                 .withColumnRenamed("Zone", "PU_Zone") \
                 .withColumnRenamed("Borough", "PU_Borough")

zones_do = zones.withColumnRenamed("LocationID", "DOLocationID") \
                 .withColumnRenamed("Zone", "DO_Zone") \
                 .withColumnRenamed("Borough", "DO_Borough") 

df_joined_regular = df_filtered.join(zones_pu, on="PULocationID", how="left")
df_joined_regular = df_joined_regular.join(zones_do, on="DOLocationID", how="left")

print(f"Dữ liệu sau khi gộp: {df_joined_regular.count():,} dòng")
df_joined_regular.select("PULocationID", "PU_Zone", "DOLocationID", "DO_Zone", 
                         "trip_duration_min", "avg_speed_kmh").show(5, truncate=False)

Dữ liệu sau khi gộp: 2,680,690 dòng
+------------+-----------------+------------+-----------------------------+------------------+------------------+
|PULocationID|PU_Zone          |DOLocationID|DO_Zone                      |trip_duration_min |avg_speed_kmh     |
+------------+-----------------+------------+-----------------------------+------------------+------------------+
|138         |LaGuardia Airport|74          |East Harlem North            |15.883333333333333|27.57607555089192 |
|132         |JFK Airport      |142         |Lincoln Square East          |44.266666666666666|23.990963855421686|
|138         |LaGuardia Airport|48          |Clinton East                 |33.36666666666667 |17.946053946053947|
|138         |LaGuardia Airport|229         |Sutton Place/Turtle Bay North|17.1              |36.03508771929824 |
|211         |SoHo             |97          |Fort Greene                  |14.533333333333333|12.137614678899082|
+------------+-----------------+------------+-------

#### 1.3.2. Broadcast join

In [12]:
from pyspark.sql.functions import broadcast

df_joined_broadcast = df_filtered.join(broadcast(zones_pu), on="PULocationID", how="left") \
                                  .join(broadcast(zones_do), on="DOLocationID", how="left")

print(f"Dữ liệu sau khi gộp: {df_joined_broadcast.count():,} dòng")
df_joined_broadcast.select("PULocationID", "PU_Zone", "DOLocationID", "DO_Zone", 
                           "trip_duration_min", "avg_speed_kmh").show(5, truncate=False)

Dữ liệu sau khi gộp: 2,680,690 dòng
+------------+-----------------+------------+-----------------------------+------------------+------------------+
|PULocationID|PU_Zone          |DOLocationID|DO_Zone                      |trip_duration_min |avg_speed_kmh     |
+------------+-----------------+------------+-----------------------------+------------------+------------------+
|138         |LaGuardia Airport|74          |East Harlem North            |15.883333333333333|27.57607555089192 |
|132         |JFK Airport      |142         |Lincoln Square East          |44.266666666666666|23.990963855421686|
|138         |LaGuardia Airport|48          |Clinton East                 |33.36666666666667 |17.946053946053947|
|138         |LaGuardia Airport|229         |Sutton Place/Turtle Bay North|17.1              |36.03508771929824 |
|211         |SoHo             |97          |Fort Greene                  |14.533333333333333|12.137614678899082|
+------------+-----------------+------------+-------

### So sánh 2 cách join
#### 1. Regular join
Đây là cơ chế mặc định khi join hai bảng lớn với nhau, bao gồm các bước:
- Shuffle: Chuyển dữ liệu giữa các máy (nodes) sao cho các bản ghi có cùng key nằm trên cùng một máy. 

- Sort: Dữ liệu trên mỗi máy được sắp xếp theo key.

- Merge: Spark đi qua hai danh sách đã sắp xếp và gộp chúng lại.

#### 2. Broadcast join
Đây là kỹ thuật tối ưu hóa khi join một bảng lớn với một bảng rất nhỏ, bao gồm các bước:
- Broadcast: Copy toàn bộ bảng nhỏ và gửi một bản sao đến mọi máy.

- Local join: Mỗi máy giữ một phần của bảng lớn và đã có sẵn bảng nhỏ. Nó tự thực hiện join trên bộ nhớ của chính nó mà không cần gửi dữ liệu sang máy khác.

#### Kết luận: Dùng broadcast join cho zones vì dữ liệu zones nhỏ.

## Câu 2 -  (EDA nâng cao) — Top tuyến đường theo “khung giờ” + biến động theo ngày trong tuần

### Yêu cầu: 
1. Tạo hour và day_of_week. 

2. Chia ngày thành 4 khung giờ:
0–5 (đêm), 6–10 (sáng), 11–16 (trưa/chiều), 17–23 (tối) 

3. Với mỗi khung giờ, tìm Top 10 tuyến (pickup_zone → dropoff_zone) theo số 
chuyến. 
(Gợi ý: groupBy pickup+dropoff + count, rồi lọc theo khung giờ) 

4. Thêm 1 phân tích “chất lượng tuyến”: 
○ Tính median(trip_duration_min) và median(avg_speed_kmh) cho từng tuyến 
trong Top 10 của mỗi khung giờ. 

5. Viết 5–8 dòng nhận xét: tuyến nào lặp lại nhiều nhất giữa các khung giờ, và tốc độ/độ 
dài chuyến thay đổi ra sao.

### 2.1. Tạo `hour` và `day_of_week`

In [13]:
# Tạo cột hour và day_of_week - GHI ĐÈ vào df_add
df_add = df_joined_broadcast.withColumn("hour", hour("tpep_pickup_datetime")) \
                              .withColumn("day_of_week", dayofweek("tpep_pickup_datetime"))

print("Đã tạo hai thuộc tính: `hour` và `day_of_week`!")

Đã tạo hai thuộc tính: `hour` và `day_of_week`!


### 2.2. Chia ngày thành 4 khung giờ: 
- 0–5 (đêm)
- 6–10 (sáng)
- 11–16 (trưa/chiều)
- 17–23 (tối) 

In [14]:
# Tạo cột time_period dựa trên hour
df_with_period = df_add.withColumn(
    "time_period",
    when((col("hour") >= 0) & (col("hour") <= 5), "Đêm")
    .when((col("hour") >= 6) & (col("hour") <= 10), "Sáng")
    .when((col("hour") >= 11) & (col("hour") <= 16), "Trưa/Chiều")
    .when((col("hour") >= 17) & (col("hour") <= 23), "Tối")
    .otherwise("Unknown")
)

print("Dữ liệu sau khi thêm cột time_period:")
df_with_period.select("hour", "time_period", "PU_Zone", "DO_Zone").show(10, truncate=False)

Dữ liệu sau khi thêm cột time_period:
+----+-----------+-----------------------+-----------------------------+
|hour|time_period|PU_Zone                |DO_Zone                      |
+----+-----------+-----------------------+-----------------------------+
|0   |Đêm        |LaGuardia Airport      |East Harlem North            |
|0   |Đêm        |JFK Airport            |Lincoln Square East          |
|0   |Đêm        |LaGuardia Airport      |Clinton East                 |
|0   |Đêm        |LaGuardia Airport      |Sutton Place/Turtle Bay North|
|0   |Đêm        |SoHo                   |Fort Greene                  |
|0   |Đêm        |JFK Airport            |Marine Park/Mill Basin       |
|0   |Đêm        |East Village           |Yorkville West               |
|0   |Đêm        |Lenox Hill East        |Yorkville East               |
|0   |Đêm        |Greenwich Village South|Clinton West                 |
|0   |Đêm        |JFK Airport            |Richmond Hill                |
+----+-------

### 2.3. Với mỗi khung giờ, tìm Top 10 tuyến (pickup_zone → dropoff_zone) theo số chuyến. 


In [15]:
# Lấy danh sách các khung giờ
time_periods = ["Đêm", "Sáng", "Trưa/Chiều", "Tối"]

# Tạo dictionary để lưu Top 10 tuyến cho mỗi khung giờ
top_routes_by_period = {}

for period in time_periods:
    print(f"Top 10 tuyến trong khung giờ: {period}")
    print(f"{'='*80}")
    
    # Filter dữ liệu theo khung giờ
    df_period = df_with_period.filter(col("time_period") == period)
    
    top_10 = df_period.groupBy("PU_Zone", "DO_Zone") \
                      .count() \
                      .withColumnRenamed("count", "num_trips") \
                      .orderBy(desc("num_trips")) \
                      .limit(10)
    
    # Lưu vào dictionary
    top_routes_by_period[period] = top_10
    
    # Hiển thị kết quả
    top_10.show()
    
    print(f"\nTổng chuyến trong khung giờ {period}: {df_period.count():,}\n")

Top 10 tuyến trong khung giờ: Đêm
+--------------------+--------------------+---------+
|             PU_Zone|             DO_Zone|num_trips|
+--------------------+--------------------+---------+
|         JFK Airport|      Outside of NYC|     1246|
|        West Village|        East Village|      787|
|     Lower East Side|        East Village|      759|
|        East Village|         Murray Hill|      706|
|        East Village|            Gramercy|      654|
|        East Village|        East Village|      647|
|         JFK Airport|Flushing Meadows-...|      617|
|        East Village|            Kips Bay|      602|
|        East Village|     Lower East Side|      595|
|Greenwich Village...|        East Village|      546|
+--------------------+--------------------+---------+


Tổng chuyến trong khung giờ Đêm: 187,187

Top 10 tuyến trong khung giờ: Sáng
+--------------------+--------------------+---------+
|             PU_Zone|             DO_Zone|num_trips|
+--------------------+-

### 2.4. Thêm 1 phân tích “chất lượng tuyến”: 
Tính median (trip_duration_min) và median (avg_speed_kmh) cho từng tuyến 
trong Top 10 của mỗi khung giờ. 

In [16]:
# Lưu chất lượng chuyến của mỗi khung giờ
quality_routes_by_period = {}

for period in time_periods:
    print(f"\nChất lượng tuyến - Khung giờ: {period}")
    print(f"{'='*80}")
    
    df_period = df_with_period.filter(col("time_period") == period)
    
    # Lấy Top 10 tuyến và tính chất lượng (thêm median_distance)
    quality = df_period.groupBy("PU_Zone", "DO_Zone") \
        .agg(
            count("*").alias("num_trips"),
            round(percentile_approx("trip_duration_min", 0.5), 2).alias("median_duration"),
            round(percentile_approx("avg_speed_kmh", 0.5), 2).alias("median_speed"),
            round(percentile_approx("trip_distance", 0.5), 2).alias("median_distance")
        ) \
        .orderBy(desc("num_trips")) \
        .limit(10)
    
    quality_routes_by_period[period] = quality
    quality.show(10, truncate=False)


Chất lượng tuyến - Khung giờ: Đêm
+-----------------------+----------------------------+---------+---------------+------------+---------------+
|PU_Zone                |DO_Zone                     |num_trips|median_duration|median_speed|median_distance|
+-----------------------+----------------------------+---------+---------------+------------+---------------+
|JFK Airport            |Outside of NYC              |1246     |24.52          |35.98       |14.28          |
|West Village           |East Village                |787      |9.0            |8.23        |1.21           |
|Lower East Side        |East Village                |759      |6.12           |8.32        |0.86           |
|East Village           |Murray Hill                 |706      |7.62           |12.82       |1.6            |
|East Village           |Gramercy                    |654      |5.27           |10.84       |0.93           |
|East Village           |East Village                |647      |4.43           |8.32 

### 2.5. Phân tích tuyến nào lặp lại nhiều nhất giữa các khung giờ, và tốc độ/độ dài chuyến thay đổi ra sao.

#### 2.5.1. Phân tích các chỉ số của từng khung giờ 

In [17]:
# Tạo bảng tóm tắt các chỉ số chính cho mỗi khung giờ
print("\n=== BẢNG TÓM TẮT CÁC CHỈ SỐ CHO TỪNG KHUNG GIỜ ===\n")
print(f"{'Khung giờ':<15} {'Tổng chuyến':<15} {'Median thời gian (phút)':<25} {'Median tốc độ (km/h)':<23} {'Median khoảng cách (km)':<25}")
print("-" * 103)

all_routes = {}
for period in time_periods:
    df_pandas = quality_routes_by_period[period].toPandas()
    all_routes[period] = df_pandas
    
    total_trips = df_pandas['num_trips'].sum()
    median_duration = df_pandas['median_duration'].mean()
    median_speed = df_pandas['median_speed'].mean()
    median_distance = df_pandas['median_distance'].mean()
    
    print(f"{period:<15} {total_trips:<15,.0f} {median_duration:<25.2f} {median_speed:<23.2f} {median_distance:<25.2f}")


=== BẢNG TÓM TẮT CÁC CHỈ SỐ CHO TỪNG KHUNG GIỜ ===

Khung giờ       Tổng chuyến     Median thời gian (phút)   Median tốc độ (km/h)    Median khoảng cách (km)  
-------------------------------------------------------------------------------------------------------
Đêm             7,159           8.99                      15.07                   3.07                     
Sáng            24,442          7.34                      8.68                    1.09                     
Trưa/Chiều      47,420          8.57                      7.05                    0.96                     
Tối             36,070          13.88                     12.30                   4.25                     


#### 2.5.2. Tìm chuyến lặp lại nhiều nhất trong các khung giờ

In [18]:
route_frequency = {}
for period, df_pandas in all_routes.items():
    for _, row in df_pandas.iterrows():
        route_key = f"{row['PU_Zone']} → {row['DO_Zone']}"
        if route_key not in route_frequency:
            route_frequency[route_key] = {"count": 0, "total_trips": 0}
        route_frequency[route_key]["count"] += 1
        route_frequency[route_key]["total_trips"] += row['num_trips']

sorted_routes = sorted(route_frequency.items(), key=lambda x: x[1]["count"], reverse=True)

print(f"{'Tuyến đường':<50} {'Xuất hiện':<15} {'Tổng chuyến':<15}")
print("-" * 80)
for idx, (route, info) in enumerate(sorted_routes[:5], 1):
    print(f"{route:<50} {info['count']}/4{'':<12} {info['total_trips']:<15,.0f}")

Tuyến đường                                        Xuất hiện       Tổng chuyến    
--------------------------------------------------------------------------------
Upper East Side North → Upper East Side South      3/4             13,242         
Upper East Side South → Upper East Side North      3/4             16,763         
Upper East Side South → Upper East Side South      3/4             12,188         
Upper East Side North → Upper East Side North      3/4             9,605          
JFK Airport → Outside of NYC                       2/4             4,848          


### Phân tích
#### Tuyến lặp lại nhiều nhất: Upper East Side South ↔ North
Tuyến này xuất hiện ở 3/4 khung giờ (sáng, trưa/chiều, tối) với tổng số chuyến lớn nhất. Điều này cho thấy khu Upper East Side có lượng giao thông nội bộ rất cao, phản ánh tính chất dân cư đông đúc và nhu cầu đi lại hàng ngày của khu vực.

#### Đêm có tốc độ cao nhất (15.07 km/h)
Mặc dù lượng chuyến ít nhất (7,159 chuyến), đêm lại có tốc độ cao nhất. Nguyên nhân chính có thể do giao thông thưa thớt nên các xe chạy nhanh hơn.

#### Trưa/Chiều có tốc độ thấp nhất (7.05 km/h) 
Khung giờ 11-16 xử lý 47,420 chuyến, nhiều nhất trong tất cả các khung giờ, tuy nhiên lại có tốc độ chậm nhất. Điều này phản ánh tình trạng tắc nghẽn tại NYC tại giờ cao điểm.

#### Đêm có độ dài chuyến đi dài nhất
Điều này ngược lại với Trưa/Chiều (độ dài chuyến đi ngắn nhất)

## Câu 3 - Phân loại: “chuyến dài” hay “chuyến ngắn” 


### Yêu cầu:
- Tạo label nhị phân is_long_trip: 
   - is_long_trip = 1 nếu trip_duration_min >= P75 (tự tính percentile 75) 
   - ngược lại 0 
- Model: LogisticRegression hoặc RandomForestClassifier.

- Báo cáo: Accuracy + F1 + Confusion Matrix (Spark ML evaluator). 
Bonus (+ điểm): làm “error analysis” theo borough: nhóm theo borough pickup và xem 
RMSE/F1 chênh lệch thế nào (2–4 nhận xét).

### 3.1. Thêm các đặc trưng cần thiết cho dự đoán

Mô hình sẽ sử dụng tất cả các đặc trưng để dự đoán biến mục tiêu `trip_duration_min`, bao gồm cả các đặc trưng như `hour`, `day_of_week` và `is_weekend`.

In [19]:
df_features = (
    df_filtered
    .withColumn("hour", hour("tpep_pickup_datetime"))
    .withColumn("day_of_week", dayofweek("tpep_pickup_datetime"))
    .withColumn("is_weekend", when(col("day_of_week").isin([1, 7]), 1).otherwise(0))
)

# Quick peek (xem nhanh) các đặc trưng mới
print("Sample of engineered features:")
df_features.select(
    "hour", "day_of_week", "is_weekend",
    "passenger_count", "trip_distance", "fare_amount"
).show(5)

Sample of engineered features:
+----+-----------+----------+---------------+-------------+-----------+
|hour|day_of_week|is_weekend|passenger_count|trip_distance|fare_amount|
+----+-----------+----------+---------------+-------------+-----------+
|   0|          3|         0|              1|          7.3|       29.6|
|   0|          3|         0|              1|         17.7|       70.0|
|   0|          3|         0|              1|         9.98|       43.6|
|   0|          3|         0|              1|        10.27|       38.7|
|   0|          3|         0|              1|         2.94|       17.0|
+----+-----------+----------+---------------+-------------+-----------+
only showing top 5 rows



#### 3.2. Tạo label nhị phân `is_long_trip` + chuẩn bị dữ liệu

In [20]:
# Tính P75 của trip_duration_min
p75_duration = df_features.approxQuantile("trip_duration_min", [0.75], 0.01)[0]
print(f"P75 của trip_duration_min: {p75_duration:.2f} phút")

# Tạo label nhị phân
df_with_label = df_features.withColumn(
    "is_long_trip",
    when(col("trip_duration_min") >= p75_duration, 1).otherwise(0)
)

P75 của trip_duration_min: 20.65 phút


#### 3.3. Xây dựng ML Pipeline cho classification - Logistic Regression

In [21]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline

print("Building Classification Pipeline...")

categorical_cols = ["VendorID", "RatecodeID", "store_and_fwd_flag", "payment_type", "PULocationID", "DOLocationID"]

indexer = [
    StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid="keep")
    for col in categorical_cols
]

encoder = [
    OneHotEncoder(inputCol=f"{col}_idx", outputCol=f"{col}_vec")
    for col in categorical_cols
]

# 2) VectorAssembler - gộp các features
all_features = [
    "passenger_count", "trip_distance", "hour", "day_of_week", "is_weekend", 
    "extra", "mta_tax", "improvement_surcharge", "congestion_surcharge", "tolls_amount", # Các phí thành phần
    "VendorID_vec", "RatecodeID_vec", "store_and_fwd_flag_vec", "PULocationID_vec", "DOLocationID_vec", "payment_type_vec"
]


assembler = VectorAssembler(
    inputCols=all_features,
    outputCol="features",
    handleInvalid="skip"
)

lr_classifier = LogisticRegression(
    featuresCol="features",
    labelCol="is_long_trip",
    maxIter=100,
    regParam=0.1
)

# 4) Pipeline: Index -> Encode -> Assemble -> Model
all_stages = indexer + encoder + [assembler, lr_classifier]
classification_pipeline = Pipeline(stages=all_stages)

Building Classification Pipeline...


### 3.4. Chia tập Train/Test

In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# 1) Split data: 80% train, 20% test
train_data, test_data = df_with_label.randomSplit([0.8, 0.2], seed=42)
print(f"Training set size: {train_data.count():,}")
print(f"Test set size: {test_data.count():,}")

Training set size: 2,145,334
Test set size: 535,356


### 3.5. Huấn luyện mô hình

In [23]:
# 2) Fit model trên training data
print("\nTiến hành huấn luyện...")
classification_model = classification_pipeline.fit(train_data)
print("Huấn luyện mô hình thành công!")


Tiến hành huấn luyện...
Huấn luyện mô hình thành công!


### 3.6. Kiểm nghiệm mô hình trên tập Test

In [24]:
# 3) Dự đoán trên test data
predictions = classification_model.transform(test_data)
predictions.select("trip_duration_min", "is_long_trip", "probability", "prediction").show(5)

+------------------+------------+--------------------+----------+
| trip_duration_min|is_long_trip|         probability|prediction|
+------------------+------------+--------------------+----------+
|              6.55|           0|[0.83086324741332...|       0.0|
|20.083333333333332|           0|[0.50454481325821...|       0.0|
| 4.533333333333333|           0|[0.87500554035109...|       0.0|
|11.133333333333333|           0|[0.84561287067516...|       0.0|
|              16.3|           0|[0.70695959099373...|       0.0|
+------------------+------------+--------------------+----------+
only showing top 5 rows



### 3.7. Đánh giá mô hình theo Accuracy + F1 + Confusion Matrix

In [26]:
# 4) Evaluate: Accuracy + F1
multiclass_evaluator = MulticlassClassificationEvaluator(
    labelCol="is_long_trip",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = multiclass_evaluator.evaluate(predictions)
print(f"\nAccuracy: {accuracy:.4f}")

f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="is_long_trip",
    predictionCol="prediction",
    metricName="f1"
)
f1 = f1_evaluator.evaluate(predictions)
print(f"F1 Score: {f1:.4f}")

# 5) Confusion Matrix
from pyspark.sql.functions import col

# Cache predictions
predictions.cache()

# Tính TP, FP, FN, TN dùng SQL
tp = predictions.filter((col("prediction") == 1) & (col("is_long_trip") == 1)).count()
fp = predictions.filter((col("prediction") == 1) & (col("is_long_trip") == 0)).count()
fn = predictions.filter((col("prediction") == 0) & (col("is_long_trip") == 1)).count()
tn = predictions.filter((col("prediction") == 0) & (col("is_long_trip") == 0)).count()

print("\nConfusion Matrix:")
print(f"   [[{int(tn)}, {int(fp)}],")
print(f"    [{int(fn)}, {int(tp)}]]")


Accuracy: 0.8598
F1 Score: 0.8455

Confusion Matrix:
   [[391358, 8499],
    [66533, 68966]]


#### Lưu bảng metrics (F1 Score + Accuracy + Confusion Matrix)

In [27]:
# Lưu bảng F1 score và accuracy
metrics_data = {
    "Metric": ["Accuracy", "F1 Score"],
    "Value": [f"{accuracy:.4f}", f"{f1:.4f}"]
}

# Lưu bảng confusion matrix
df_metrics = pd.DataFrame(metrics_data)
metrics_output_path = "metrics_summary.csv"
df_metrics.to_csv(metrics_output_path, index=False, encoding='utf-8-sig')

confusion_matrix_data = {
    "Actual/Predicted": ["Short Trip (0)", "Long Trip (1)"],
    "Short Trip (0)": [int(tn), int(fn)],
    "Long Trip (1)": [int(fp), int(tp)]
}

df_cm = pd.DataFrame(confusion_matrix_data)
cm_output_path = "confusion_matrix.csv"
df_cm.to_csv(cm_output_path, index=False, encoding='utf-8-sig')

### Bonus - Error Analysis theo Borough Pickup

In [28]:
zones_for_borough = zones.withColumnRenamed("LocationID", "PULocationID") \
                          .select("PULocationID", "Borough") \
                          .withColumnRenamed("Borough", "PU_Borough")

predictions_with_borough = predictions.join(zones_for_borough, on="PULocationID", how="left") \
                                      .filter(col("PU_Borough").isNotNull())

from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator

borough_list = predictions_with_borough.select("PU_Borough").distinct().collect()
borough_performance = []

for row in borough_list:
    borough_name = row['PU_Borough']
    df_borough = predictions_with_borough.filter(col("PU_Borough") == borough_name)
    
    # Tính F1 Score
    f1_eval = MulticlassClassificationEvaluator(
        labelCol="is_long_trip",
        predictionCol="prediction",
        metricName="f1"
    )
    borough_f1 = f1_eval.evaluate(df_borough)
    
    # Tính RMSE:
    rmse_eval = RegressionEvaluator(
        labelCol="is_long_trip",
        predictionCol="prediction",
        metricName="rmse"
    )
    borough_rmse = rmse_eval.evaluate(df_borough)
    
    borough_count = df_borough.count()
    
    borough_performance.append({
        "Borough": borough_name,
        "F1 Score": borough_f1,
        "RMSE": borough_rmse,
        "Sample Count": borough_count
    })

# Chuyển thành pandas DataFrame để hiển thị đẹp
import pandas as pd
df_perf = pd.DataFrame(borough_performance)
df_perf = df_perf.sort_values("F1 Score", ascending=False)

print(df_perf.to_string(index=False))

      Borough  F1 Score     RMSE  Sample Count
          EWR  1.000000 0.000000            12
       Queens  0.859885 0.361021         57858
        Bronx  0.849835 0.366983          1886
      Unknown  0.837755 0.387119          1081
    Manhattan  0.822810 0.374948        467529
          N/A  0.813114 0.431595           102
     Brooklyn  0.804304 0.438937          6872
Staten Island  0.750000 0.500000            16


### Nhận xét:

- EWR (12 chuyến) có F1 score = 1.0000 và RMSE = 0.0000 là kết quả quá lý tưởng, không đáng tin cậy vì sample quá nhỏ.

- Các borough có dưới 100 chuyến không nên dùng để đánh giá hiệu năng thực do lượng dữ liệu quá nhỏ, mô hình dễ overfit.

- Khu vực EWR có RMSE = 0.0000 là quá khả quan (do sample quá nhỏ). Trong khi đó, Staten Island có RMSE = 0.5000 (lỗi lớn, mô hình dự đoán kém). Điều này cho thấy model không tổng quát hóa tốt cho tất cả các Borough.

### Kết Luận

Bài tập hoàn thành pipeline ML end-to-end trên dữ liệu taxi NYC với 3 phần chính: 
1. Tiền xử lý dữ liệu.

2. EDA phân tích top tuyến xe theo khung giờ và khám phá các chất lượng chuyến của từng khung giờ.

3. Xây dựng mô hình Logistic Regression với Accuracy 86% và F1 84% để dự đoán biến `trip_duration_min` 
Kết quả chứng minh hiệu quả của broadcast join trong tối ưu hóa join, cũng như tầm quan trọng của error analysis để phát hiện bias đối với dữ liệu nhỏ và kết quả thuật toán không đồng đều giữa các Borough, chứng minh những hạn chế của mô hình hiện tại.