
# Bài thực hành ôn tập: Phân tích dữ liệu NYC Yellow Taxi
## Giới thiệu
Bộ dữ liệu New York City Yellow Taxi chứa thông tin chi tiết về các chuyến đi taxi tại NYC. Mục tiêu của bài thực hành này là sử dụng Apache Spark để phân tích, làm sạch, trực quan hóa và xây dựng các mô hình học máy trên bộ dữ liệu này.

## Yêu cầu
1. Nạp dữ liệu vào Spark DataFrame.
2. Xem thông tin, kiểm tra dữ liệu trống và ngoại lệ.
3. Trực quan hóa dữ liệu.
4. Phân tích tương quan.
5. Phân cụm dữ liệu.
6. Phân lớp và dự đoán.


In [None]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, month, hour, dayofyear, count, avg, when, year, udf, dayofweek
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.stat import Correlation
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os

# Khởi tạo SparkSession
spark = SparkSession.builder \
    .appName("NYC Taxi Analysis") \
    .getOrCreate()

print("SparkSession created successfully.")



## 1. Nạp dữ liệu vào Spark DataFrame
Chúng ta sẽ nạp dữ liệu từ 4 file CSV có sẵn.


In [None]:

files = [
    "2015_1_yellowtrip.csv",
    "2016_1_trip.csv",
    "2016_2_trip.csv",
    "2016_3_trip.csv"
]
existing_files = [f for f in files if os.path.exists(f)]

# Nạp dữ liệu
# Lưu ý: inferSchema=True có thể tốn thời gian nhưng tiện lợi để tự động nhận diện kiểu dữ liệu ban đầu
df = spark.read.csv(existing_files, header=True, inferSchema=False)

print(f"Đã nạp {df.count()} dòng dữ liệu.")
df.printSchema()



## 2-4. Làm sạch và Khám phá dữ liệu
Chúng ta sẽ chuyển đổi kiểu dữ liệu, xử lý giá trị trống và lọc các giá trị ngoại lệ.


In [None]:

# Chuyển đổi kiểu dữ liệu
double_cols = [
    'trip_distance', 'pickup_longitude', 'pickup_latitude', 
    'dropoff_longitude', 'dropoff_latitude', 'fare_amount', 
    'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 
    'improvement_surcharge', 'total_amount'
]
int_cols = ['passenger_count', 'RateCodeID', 'payment_type']

for c in double_cols:
    df = df.withColumn(c, col(c).cast(DoubleType()))
for c in int_cols:
    df = df.withColumn(c, col(c).cast(IntegerType()))

# Chuyển đổi thời gian (Format M/d/y H:m)
df = df.withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime"), "M/d/y H:m"))
df = df.withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime"), "M/d/y H:m"))

# Kiểm tra dữ liệu trống
print("Kiểm tra dữ liệu trống:")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Lọc dữ liệu ngoại lệ (Outliers)
# Theo vị trí: NYC (Lat: 40.4774 - 40.9176, Lon: -74.2591 - -73.7004)
# Theo logic: Số khách > 0, Khoảng cách > 0, Tiền > 0
df_clean = df.filter(
    (col("pickup_latitude").between(40.4774, 40.9176)) &
    (col("pickup_longitude").between(-74.2591, -73.7004)) &
    (col("dropoff_latitude").between(40.4774, 40.9176)) &
    (col("dropoff_longitude").between(-74.2591, -73.7004)) &
    (col("passenger_count") > 0) &
    (col("fare_amount") > 0) &
    (col("trip_distance") > 0)
)

# Thêm cột thời gian chuyến đi (phút) và giờ trong ngày
df_clean = df_clean.withColumn("trip_duration", 
    (col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long")) / 60)
df_clean = df_clean.withColumn("hour", hour("tpep_pickup_datetime"))

# Loại bỏ bất kỳ hàng nào còn chứa null để đảm bảo an toàn cho ML
df_clean = df_clean.dropna()

print(f"Số lượng dòng sau khi làm sạch: {df_clean.count()}")
df_clean.describe(['passenger_count', 'trip_distance', 'fare_amount', 'trip_duration']).show()



## 5. Trực quan hóa dữ liệu
Chúng ta sẽ tổng hợp dữ liệu bằng Spark và vẽ biểu đồ bằng Matplotlib/Seaborn.


In [None]:

# a. Theo địa điểm đón (Scatter plot đơn giản của tọa độ)
sample_df = df_clean.sample(False, 0.1).select("pickup_latitude", "pickup_longitude").toPandas()

plt.figure(figsize=(10, 6))
sns.scatterplot(x="pickup_longitude", y="pickup_latitude", data=sample_df, alpha=0.5, s=10)
plt.title("Phân bố địa điểm đón khách")
plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.show()


In [None]:

# b. Theo khoảng cách và tổng số tiền
dist_fare_pd = df_clean.select("trip_distance", "total_amount").sample(False, 0.1).toPandas()

plt.figure(figsize=(10, 6))
sns.scatterplot(x="trip_distance", y="total_amount", data=dist_fare_pd, alpha=0.5)
plt.title("Tương quan giữa Khoảng cách và Tổng tiền")
plt.xlabel("Khoảng cách (dặm)")
plt.ylabel("Tổng tiền ($)")
plt.show()


In [None]:

# c. Tổng tiền theo ngày trong năm
daily_revenue = df_clean.withColumn("day_of_year", dayofyear("tpep_pickup_datetime")) \
    .groupBy("day_of_year").sum("total_amount").orderBy("day_of_year").toPandas()

plt.figure(figsize=(12, 6))
sns.lineplot(x="day_of_year", y="sum(total_amount)", data=daily_revenue)
plt.title("Tổng doanh thu theo ngày trong năm")
plt.xlabel("Ngày trong năm")
plt.ylabel("Doanh thu ($)")
plt.show()


In [None]:

# d. Theo giờ trong ngày
hourly_trips = df_clean.groupBy("hour").count().orderBy("hour").toPandas()

plt.figure(figsize=(10, 6))
sns.barplot(x="hour", y="count", data=hourly_trips, palette="viridis")
plt.title("Số lượng chuyến đi theo giờ trong ngày")
plt.xlabel("Giờ")
plt.ylabel("Số chuyến")
plt.show()



## 6. Tương quan dữ liệu
Tính toán hệ số tương quan giữa các biến số.


In [None]:

print("Hệ số tương quan:")
print(f"Passenger Count - Total Amount: {df_clean.stat.corr('passenger_count', 'total_amount')}")
print(f"Trip Distance - Total Amount: {df_clean.stat.corr('trip_distance', 'total_amount')}")
print(f"Trip Duration - Total Amount: {df_clean.stat.corr('trip_duration', 'total_amount')}")



## 7. Phân cụm dữ liệu (Clustering)
Sử dụng K-Means để phân nhóm theo các tiêu chí khác nhau.


In [None]:

# Hàm hỗ trợ chạy KMeans
def run_kmeans(df, input_cols, k=5, title="Clustering"):
    print(f"--- {title} ---")
    vec_assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
    scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
    kmeans = KMeans(featuresCol="scaledFeatures", k=k, seed=1)
    
    pipeline = Pipeline(stages=[vec_assembler, scaler, kmeans])
    model = pipeline.fit(df)
    predictions = model.transform(df)
    
    print("Cluster Centers:")
    centers = model.stages[-1].clusterCenters()
    for center in centers:
        print(center)
    return predictions

# a. Theo địa điểm đón (Lat/Lon)
print("a. Phân cụm theo địa điểm đón")
pred_loc = run_kmeans(df_clean, ["pickup_latitude", "pickup_longitude"], k=5, title="Location Clustering")

# Trực quan hóa (mẫu)
cluster_pd = pred_loc.select("pickup_latitude", "pickup_longitude", "prediction").sample(False, 0.1).toPandas()
plt.figure(figsize=(10, 6))
sns.scatterplot(x="pickup_longitude", y="pickup_latitude", hue="prediction", data=cluster_pd, palette="deep")
plt.title("Phân cụm địa điểm đón khách")
plt.show()


In [None]:

# b. Theo thời gian (Giờ trong ngày)
# Chúng ta sẽ phân cụm dựa trên giờ (hour) để tìm các nhóm giờ có đặc điểm giống nhau (ví dụ: sáng sớm, cao điểm, đêm khuya)
print("b. Phân cụm theo thời gian (Giờ)")
pred_time = run_kmeans(df_clean, ["hour"], k=3, title="Time Clustering")
pred_time.groupBy("prediction").agg(avg("hour"), count("*")).show()


In [None]:

# c. Theo số hành khách, khoảng cách, số tiền
print("c. Phân cụm theo đặc điểm chuyến đi")
pred_features = run_kmeans(df_clean, ["passenger_count", "trip_distance", "total_amount"], k=4, title="Feature Clustering")
pred_features.groupBy("prediction").agg(
    avg("passenger_count"), 
    avg("trip_distance"), 
    avg("total_amount"), 
    count("*")
).show()



## 8. Phân lớp dữ liệu (Classification)
Dự đoán phương thức thanh toán (Payment Type).


In [None]:

# a. Theo phương thức thanh toán (Payment_type)
# Sử dụng Logistic Regression

feature_cols = ["fare_amount", "trip_distance", "passenger_count", "trip_duration", "extra", "mta_tax"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
label_indexer = StringIndexer(inputCol="payment_type", outputCol="label")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")
pipeline_lr = Pipeline(stages=[label_indexer, assembler, lr])

(trainingData, testData) = df_clean.randomSplit([0.7, 0.3], seed=42)
model_lr = pipeline_lr.fit(trainingData)
predictions_lr = model_lr.transform(testData)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions_lr)
print(f"Độ chính xác (Accuracy) dự đoán Payment Type: {accuracy}")



## 8b & 8c. Dự đoán số tiền và thời gian (Regression)


In [None]:

# b. Dự đoán số tiền thanh toán (Fare Amount)
feature_cols_fare = ["trip_distance", "passenger_count", "trip_duration"]
assembler_fare = VectorAssembler(inputCols=feature_cols_fare, outputCol="features")

lr_fare = LinearRegression(featuresCol="features", labelCol="fare_amount")
pipeline_fare = Pipeline(stages=[assembler_fare, lr_fare])

model_fare = pipeline_fare.fit(trainingData)
predictions_fare = model_fare.transform(testData)

evaluator_reg = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="rmse")
rmse_fare = evaluator_reg.evaluate(predictions_fare)
print(f"RMSE cho dự đoán Fare Amount: {rmse_fare}")


In [None]:

# c. Dự đoán thời gian chuyến đi (Trip Duration)
# Sử dụng distance và fare_amount để dự đoán duration
feature_cols_time = ["trip_distance", "fare_amount", "passenger_count"]
assembler_time = VectorAssembler(inputCols=feature_cols_time, outputCol="features")

lr_time = LinearRegression(featuresCol="features", labelCol="trip_duration")
pipeline_time = Pipeline(stages=[assembler_time, lr_time])

model_time = pipeline_time.fit(trainingData)
predictions_time = model_time.transform(testData)

rmse_time = evaluator_reg.evaluate(predictions_time)  # Reusing evaluator but it defaults to labelCol used in prediction?
# RegressionEvaluator needs explicit label col if it differs from default, or we create new one.
evaluator_time = RegressionEvaluator(labelCol="trip_duration", predictionCol="prediction", metricName="rmse")
rmse_time = evaluator_time.evaluate(predictions_time)

print(f"RMSE cho dự đoán Trip Duration (phút): {rmse_time}")



## 9. Dự đoán thời điểm chuyến đi
Phân tích thời điểm có nhiều chuyến đi (cao điểm) và ít chuyến đi.


In [None]:

hourly_stats = df_clean.groupBy("hour").agg(count("*").alias("num_trips"), avg("trip_distance").alias("avg_distance")) \
    .orderBy("num_trips", ascending=False)

print("Top 5 khung giờ CAO ĐIỂM (nhiều chuyến đi nhất):")
hourly_stats.show(5)

print("Top 5 khung giờ THẤP ĐIỂM (ít chuyến đi nhất):")
hourly_stats.orderBy("num_trips", ascending=True).show(5)

print("Khung giờ có chuyến đi xa nhất (trung bình):")
hourly_stats.orderBy("avg_distance", ascending=False).show(5)
