## Mô tả bài làm 
- Sử dụng Spark’s Structured API (spark.ml) để xây dụng 1 Decision Tree Regressor
- Dùng dữ liệu New York City Taxi Trip Duration - train để traing cho mô hình và dùng dữ liệu file test để đưa ra dự đoán 

# Bài làm chi tiết

## 1. Chuẩn bị

- Thiết lập biến môi trường cho Spark

In [2]:
import findspark
findspark.init("/home/cuong/Downloads/spark-3.5.5-bin-hadoop3")

- Import các thư viện cần thiết

In [3]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, sum, when
from pyspark.sql.functions import round


- Khởi tạo park session

In [4]:
spark = SparkSession.builder \
    .appName("NYC Taxi Duration Prediction") \
    .getOrCreate()

25/05/09 23:03:16 WARN Utils: Your hostname, cuong-Vostro-3405 resolves to a loopback address: 127.0.1.1; using 10.0.146.23 instead (on interface wlp3s0)
25/05/09 23:03:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/09 23:03:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 2. Đọc dữ liệu training và tiền xử lý

- Đọc dữ liệu training từ file

In [5]:
df = spark.read.csv("hdfs:///user/cuong/nyc-taxi-trip-duration/train.csv", header = True, inferSchema = True)

                                                                                

- In Schema của dữ liệu

In [6]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)



In [7]:
df.count()

                                                                                

1458644

### Nhận xét: Dữ liệu có 11 features và 1458644 samples

- Xem một số mẫu dữ liệu 

In [8]:
df.show(5)

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|
|id2377394|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|          663|
|id3858529|        2|2016-01-19 11:35:24|2016-01-19 12:10:48|    

- Kiểm tra giá trị null của dữ liệu

In [9]:
null_counts = df.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns])
null_counts.show()



+---+---------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+
| id|vendor_id|pickup_datetime|dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|store_and_fwd_flag|trip_duration|
+---+---------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+
|  0|        0|              0|               0|              0|               0|              0|                0|               0|                 0|            0|
+---+---------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+



                                                                                

### Nhận xét: Không có bất kỳ giá trị null nào trong tập dữ liệu

- Xóa những dòng duplicate

In [10]:
df = df.dropDuplicates()
df.count()

                                                                                

1458644

### Nhận xét: Số lượng samples vẫn không đổi => tập dữ liệu không có duplicate

- Tạo assembler để làm đầu vào training cho mô hình 

In [11]:
num_cols = ['pickup_longitude', 'pickup_latitude','dropoff_longitude', 'dropoff_latitude']
assembler = VectorAssembler(inputCols= num_cols, outputCol = "features")
assembled_df = assembler.transform(df).select("features", 'trip_duration')
# Việc scaler không cần thiết trong trường hợp này
# scaler = MinMaxScaler(inputCol= 'features', outputCol = 'scaled_features')
# scaler_model = scaler.fit(assembled_df)
# scaler_df = scaler_model.transform(assembled_df)
# scaler_df.sample(withReplacement=False, fraction=0.01).show()
assembled_df.show()




+--------------------+-------------+
|            features|trip_duration|
+--------------------+-------------+
|[-73.976951599121...|          721|
|[-73.992500305175...|         1216|
|[-73.952377319335...|          709|
|[-73.996566772460...|          952|
|[-73.956291198730...|          430|
|[-73.987808227539...|          264|
|[-74.007629394531...|         1267|
|[-73.987991333007...|          610|
|[-73.988655090332...|          586|
|[-73.989646911621...|          250|
|[-73.974662780761...|          978|
|[-73.985511779785...|          867|
|[-73.973884582519...|         1266|
|[-73.870849609375...|         2129|
|[-73.990516662597...|          425|
|[-73.982238769531...|          983|
|[-73.985633850097...|         1511|
|[-73.986000061035...|         1358|
|[-73.952537536621...|          462|
|[-73.984542846679...|          816|
+--------------------+-------------+
only showing top 20 rows



                                                                                

## 3. Huấn luyện và đánh giá mô hình

- Chia tập dữ liệu thành 2 phần training và validation tỷ lệ 80-20

In [12]:
# training, validation = scaler_df.randomSplit([0.8, 0.2], seed = 42)
training, validation = assembled_df.randomSplit([0.8, 0.2], seed = 42)


- Khởi tạo và training cho mô hình

In [13]:
lr =  DecisionTreeRegressor(featuresCol = "features", labelCol = "trip_duration")
model = lr.fit(training)

                                                                                

- Xem xét mô hình một cách trực quan

In [14]:
print(model.toDebugString)

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_497e617e0d14, depth=5, numNodes=63, numFeatures=4
  If (feature 0 <= -73.87484359741211)
   If (feature 2 <= -73.9160041809082)
    If (feature 3 <= 40.705074310302734)
     If (feature 1 <= 40.73056983947754)
      If (feature 1 <= 40.71160125732422)
       Predict: 795.7677213637479
      Else (feature 1 > 40.71160125732422)
       Predict: 1241.3594596657254
     Else (feature 1 > 40.73056983947754)
      If (feature 3 <= 40.683753967285156)
       Predict: 2100.4525564525566
      Else (feature 3 > 40.683753967285156)
       Predict: 1643.8942233452656
    Else (feature 3 > 40.705074310302734)
     If (feature 1 <= 40.72339630126953)
      If (feature 3 <= 40.74630928039551)
       Predict: 742.7502593192869
      Else (feature 3 > 40.74630928039551)
       Predict: 1487.4097335592176
     Else (feature 1 > 40.72339630126953)
      If (feature 3 <= 40.72794151306152)
       Predict: 1068.5003276440877
      Else (feature 3 > 40

- Kiểm tra mô hình bằng dữ liệu validation

In [15]:
predictions_val = model.transform(validation)
predictions_val.show()

[Stage 43:>                                                         (0 + 1) / 1]

+--------------------+-------------+------------------+
|            features|trip_duration|        prediction|
+--------------------+-------------+------------------+
|[-76.963241577148...|         1202| 795.7677213637479|
|[-74.323921203613...|          251| 772.3981952626248|
|[-74.231010437011...|         1880|2065.8282001924927|
|[-74.194831848144...|          132| 772.3981952626248|
|[-74.178779602050...|         2939|1487.4097335592176|
|[-74.177375793457...|           15| 795.7677213637479|
|[-74.17724609375,...|            3| 795.7677213637479|
|[-74.140480041503...|         1158| 795.7677213637479|
|[-74.089508056640...|          573| 795.7677213637479|
|[-74.088569641113...|          737| 795.7677213637479|
|[-74.088218688964...|          258| 795.7677213637479|
|[-74.075500488281...|          401| 795.7677213637479|
|[-74.072662353515...|           31| 795.7677213637479|
|[-74.054946899414...|            8| 772.3981952626248|
|[-74.042556762695...|           10| 742.7502593

                                                                                

- Đánh giá mô hình bằng các độ đo

In [16]:
evaluator = RegressionEvaluator(labelCol = 'trip_duration', predictionCol = 'prediction')
rmse_val = evaluator.setMetricName('rmse').evaluate(predictions_val)
r2_val = evaluator.setMetricName('r2').evaluate(predictions_val)
print(f"[VALIDATION] RMSE: {rmse_val:.2f}, R²: {r2_val:.2f}")



[VALIDATION] RMSE: 4912.68, R²: 0.01


                                                                                

## 4. Đưa ra dự đoán cho tập test và xuất kết quả ra file

- Load dữ liệu từ file test để làm input

In [17]:
df_test = spark.read.csv("file:///home/cuong/Downloads/Big_Data/nyc-taxi-trip-duration/test.csv", header = True, inferSchema = True)
df_test.show()



+---------+---------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+
|       id|vendor_id|    pickup_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|
+---------+---------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+
|id3004672|        1|2016-06-30 23:59:58|              1|-73.98812866210938| 40.73202896118164|-73.99017333984375| 40.75667953491211|                 N|
|id3505355|        1|2016-06-30 23:59:53|              1|-73.96420288085938| 40.67999267578125|-73.95980834960938| 40.65540313720703|                 N|
|id1217141|        1|2016-06-30 23:59:47|              1| -73.9974365234375| 40.73758316040039|-73.98616027832031|40.729522705078125|                 N|
|id2150126|        2|2016-06-30 23:59:41|              1|-73.95606994628906| 40.77

                                                                                

- Gom dữ liệu thành assembler để phù hợp với mô hình

In [18]:
num_cols = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude','dropoff_latitude']
assembler = VectorAssembler(inputCols= num_cols, outputCol= 'features')
assembled_df = assembler.transform(df_test).select('id', 'features')
assembled_df.show()


+---------+--------------------+
|       id|            features|
+---------+--------------------+
|id3004672|[-73.988128662109...|
|id3505355|[-73.964202880859...|
|id1217141|[-73.997436523437...|
|id2150126|[-73.956069946289...|
|id1598245|[-73.97021484375,...|
|id0668992|[-73.991302490234...|
|id1765014|[-73.978309631347...|
|id0898117|[-74.012710571289...|
|id3905224|[-73.992332458496...|
|id1543102|[-73.993179321289...|
|id3024712|[-73.968528747558...|
|id3665810|[-73.982772827148...|
|id1836461|[-73.921104431152...|
|id3457080|[-73.986801147460...|
|id3376065|[-73.996345520019...|
|id3008739|[-73.968025207519...|
|id0902216|[-74.007713317871...|
|id3564824|[-73.984298706054...|
|id0820280|[-73.952598571777...|
|id0775088|[-73.966690063476...|
+---------+--------------------+
only showing top 20 rows



- Dự đoán kết quả

In [19]:
prediction_res = model.transform(assembled_df)
prediction_res.show()

+---------+--------------------+------------------+
|       id|            features|        prediction|
+---------+--------------------+------------------+
|id3004672|[-73.988128662109...| 772.3981952626248|
|id3505355|[-73.964202880859...| 795.7677213637479|
|id1217141|[-73.997436523437...| 772.3981952626248|
|id2150126|[-73.956069946289...| 772.3981952626248|
|id1598245|[-73.97021484375,...| 772.3981952626248|
|id0668992|[-73.991302490234...| 772.3981952626248|
|id1765014|[-73.978309631347...|1068.5003276440877|
|id0898117|[-74.012710571289...| 742.7502593192869|
|id3905224|[-73.992332458496...|2065.8282001924927|
|id1543102|[-73.993179321289...| 772.3981952626248|
|id3024712|[-73.968528747558...| 795.7677213637479|
|id3665810|[-73.982772827148...| 772.3981952626248|
|id1836461|[-73.921104431152...| 772.3981952626248|
|id3457080|[-73.986801147460...| 772.3981952626248|
|id3376065|[-73.996345520019...| 772.3981952626248|
|id3008739|[-73.968025207519...| 772.3981952626248|
|id0902216|[

- Chuẩn hóa theo yêu cầu

In [20]:
prediction_res = prediction_res.withColumn('trip_duration', col('prediction').cast('int')).select('id', 'trip_duration')
prediction_res.show()

+---------+-------------+
|       id|trip_duration|
+---------+-------------+
|id3004672|          772|
|id3505355|          795|
|id1217141|          772|
|id2150126|          772|
|id1598245|          772|
|id0668992|          772|
|id1765014|         1068|
|id0898117|          742|
|id3905224|         2065|
|id1543102|          772|
|id3024712|          795|
|id3665810|          772|
|id1836461|          772|
|id3457080|          772|
|id3376065|          772|
|id3008739|          772|
|id0902216|          772|
|id3564824|          772|
|id0820280|          772|
|id0775088|          772|
+---------+-------------+
only showing top 20 rows



- Lưu kết quả vào file

In [27]:
prediction_res.coalesce(1) \
    .write.mode("overwrite") \
    .option("header", True) \
    .parquet("file:///home/cuong/res")


                                                                                