In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import hour, year, unix_timestamp, col, udf
from pyspark.sql.types import DoubleType, IntegerType, TimestampType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import math

In [4]:
spark = SparkSession.builder \
    .appName("Taxi Fare Prediction") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.cores", "2") \
    .config("spark.driver.maxResultSize", "1g") \
    .config("spark.network.timeout", "800s") \
    .config("spark.executor.heartbeatInterval", "100s") \
    .getOrCreate()


In [5]:

csv_files_path = "hdfs://localhost:9000/Bigdata/*.csv"
combined_df = spark.read.csv(csv_files_path, header=True, inferSchema=True)


In [6]:
combined_df.count()

73050975

In [7]:
combined_df = combined_df.withColumn("passenger_count", combined_df["passenger_count"].cast(IntegerType())) \
    .withColumn("trip_distance", combined_df["trip_distance"].cast(DoubleType())) \
    .withColumn("pickup_longitude", combined_df["pickup_longitude"].cast(DoubleType())) \
    .withColumn("pickup_latitude", combined_df["pickup_latitude"].cast(DoubleType())) \
    .withColumn("dropoff_longitude", combined_df["dropoff_longitude"].cast(DoubleType())) \
    .withColumn("dropoff_latitude", combined_df["dropoff_latitude"].cast(DoubleType())) \
    .withColumn("fare_amount", combined_df["fare_amount"].cast(DoubleType())) \
    .withColumn("tpep_pickup_datetime", combined_df["tpep_pickup_datetime"].cast(TimestampType())) \
    .withColumn("tpep_dropoff_datetime", combined_df["tpep_dropoff_datetime"].cast(TimestampType()))


In [8]:
# Chọn các cột cần thiết
df = combined_df.select("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", 
               "trip_distance", "pickup_longitude", "pickup_latitude", 
               "RatecodeID", "dropoff_longitude", "dropoff_latitude", "payment_type", "fare_amount")


In [9]:
# Thêm cột thời gian chuyến đi, giờ đón, và năm
df = df.withColumn("trip_duration", 
                   (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60) \
       .withColumn("pickup_hour", hour("tpep_pickup_datetime")) \
       .withColumn("pickup_year", year("tpep_pickup_datetime"))

In [10]:
# Hàm Haversine tính khoảng cách giữa hai điểm dựa trên kinh độ và vĩ độ
def haversine(lat1, lon1, lat2, lon2):
    R = 6371.0  # Bán kính trái đất (km)
    
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    
    a = math.sin(dlat / 2)**2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    
    distance = R * c
    return distance


In [11]:
# Tạo UDF (User Defined Function) để tính khoảng cách
haversine_udf = udf(haversine, DoubleType())


In [12]:
# Áp dụng hàm Haversine để tính khoảng cách giữa điểm đón và trả khách
df = df.withColumn("distance_haversine", 
                   haversine_udf(col("pickup_latitude"), col("pickup_longitude"), 
                                 col("dropoff_latitude"), col("dropoff_longitude")))

# Chọn cột để dự đoán
selected_columns = ["passenger_count", "trip_distance", "pickup_longitude", "pickup_latitude", 
                   "dropoff_longitude", "dropoff_latitude", "trip_duration", "pickup_hour", 
                   "pickup_year", "distance_haversine"]


In [13]:

# Xây dựng Vector Assembler với cột mới
vector_assembler = VectorAssembler(inputCols=selected_columns, outputCol="features")
data = vector_assembler.transform(df)


In [14]:
from pyspark.sql import functions as F
# Xáo trộn dữ liệu
data = data.orderBy(F.rand())

# Chia tập dữ liệu thành tập train và test
(train_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)

train_data.cache()
test_data.cache()
# Xây dựng mô hình Random Forest
rf = RandomForestRegressor(featuresCol="features", labelCol="fare_amount", 
                           numTrees=100, maxDepth=10, minInstancesPerNode=4,featureSubsetStrategy="sqrt"
                           )

# Huấn luyện mô hình
model = rf.fit(train_data)

In [15]:
# Dự đoán trên tập kiểm tra
predictions = model.transform(test_data)

# Hiển thị một số dự đoán
predictions.select("fare_amount", "prediction").show(10)

# Đánh giá độ chính xác mô hình
evaluator = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="rmse")

# Tính toán độ chính xác (RMSE)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")


+-----------+------------------+
|fare_amount|        prediction|
+-----------+------------------+
|       15.5|15.485866171693132|
|        2.5|13.607108843983537|
|       18.7|19.977570323979986|
|       28.5| 31.81938474843964|
|       15.5|15.525637311764033|
|        9.0|    9.446476202003|
|        8.5| 8.400292167161938|
|        8.5|  8.64997674930264|
|       19.5|18.966954890177295|
|       15.5|15.474539573711981|
+-----------+------------------+
only showing top 10 rows

Root Mean Squared Error (RMSE): 69.18726898822794


In [17]:
# Đặt đường dẫn để lưu mô hình (thư mục cục bộ)
local_model_path = "file:///C:/Model_Taxi/taxi_fare_rf_model"

# Lưu mô hình
model.save(local_model_path)


In [18]:
from pyspark.ml.regression import RandomForestRegressionModel

# Đường dẫn tới mô hình đã lưu
model_path = "file:///C:/Model_Taxi/taxi_fare_rf_model"

# Tải mô hình
loaded_model = RandomForestRegressionModel.load(model_path)

# Sử dụng mô hình đã tải để dự đoán
new_predictions = loaded_model.transform(test_data)

# Hiển thị kết quả dự đoán
new_predictions.show(10)


+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+-----------------+------------+-----------+------------------+-----------+-----------+------------------+--------------------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID| dropoff_longitude| dropoff_latitude|payment_type|fare_amount|     trip_duration|pickup_hour|pickup_year|distance_haversine|            features|        prediction|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+-----------------+------------+-----------+------------------+-----------+-----------+------------------+--------------------+------------------+
|       1| 2015-01-01 00:07:37|  2015-01-01 00:28:59|              1|          3.4|-74.00212860107422| 40.739

In [14]:
spark.stop()
