### Hyperparameter Tuning
The purpose of hyperparameter tuning is to maximize the generalizability of the model. That is, to ensure it performs well not just on the training data but also on unseen data (test data). By appropriately adjusting hyperparameters, one can prevent overfitting and improve the predictive performance of the model.

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Build (or reuse) the Spark session. Executor and driver memory are both
# raised to MAX_MEMORY up front to prevent memory-related errors later on.
MAX_MEMORY = "5g"
spark = (
    SparkSession.builder
    .appName("taxi-duration-prediction-2")
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/04 12:33:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Root directory that holds the train/ and test/ Parquet datasets.
# The path used to be hard-coded to one user's machine; it can now be
# overridden with the TAXI_DATA_DIR environment variable. When the variable
# is unset, the original location is used, so existing runs are unaffected.
import os

data_dir = os.environ.get(
    "TAXI_DATA_DIR",
    "/Users/kyungminpark/Desktop/Fall2023/CS4641/data/",
)

In [4]:
# Read the train and test Parquet datasets (no explicit schema is passed to
# the reader), then draw a reproducible 10% sample of the training data
# (without replacement) for fast hyperparameter search.
train_df, test_df = (
    spark.read.parquet(f"{data_dir}/{split}/") for split in ("train", "test")
)
toy_df = train_df.sample(withReplacement=False, fraction=0.1, seed=1)

In [5]:
# Show the column names and types of the 10% training sample.
toy_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_location_id: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [6]:
## Categorical feature preprocessing
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Categorical columns that are indexed and then one-hot encoded.
cat_feats = ["pickup_location_id", "dropoff_location_id", "day_of_week"]

## Data flows through the pipeline stage by stage; `stages` collects every
## stage in execution order and is extended by the following cells.
stages = []

for feat in cat_feats:
    idx_col = feat + "_idx"
    # handleInvalid="keep": do not fail on labels the indexer cannot map.
    indexer = StringIndexer(inputCol=feat, outputCol=idx_col, handleInvalid="keep")
    encoder = OneHotEncoder(inputCols=[idx_col], outputCols=[feat + "_onehot"])
    stages.extend([indexer, encoder])

In [7]:
# Display the preprocessing stages collected so far
# (one StringIndexer + OneHotEncoder pair per categorical feature).
stages

[StringIndexer_229b6f949ac2,
 OneHotEncoder_2f34e2258ba2,
 StringIndexer_c027cf47ae58,
 OneHotEncoder_9b9181824ae5,
 StringIndexer_95d285ae9394,
 OneHotEncoder_7f1d94b0a99a]

In [8]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Numeric columns; each is wrapped in a one-element vector and then scaled.
num_feats = ["pickup_time", "passenger_count", "trip_distance"]

for feat in num_feats:
    # NOTE: the "_vecotr" suffix (sic) is the intermediate column name that
    # appears in the transformed schema; it is kept unchanged on purpose.
    vec_col = feat + "_vecotr"
    to_vector = VectorAssembler(inputCols=[feat], outputCol=vec_col)
    standardizer = StandardScaler(inputCol=vec_col, outputCol=feat + "_scaled")
    stages.extend([to_vector, standardizer])

In [9]:
# Concatenate every one-hot and scaled column into the single
# "feature_vector" column consumed by the regression model.
onehot_cols = [f"{name}_onehot" for name in cat_feats]
scaled_cols = [f"{name}_scaled" for name in num_feats]
assembler_inputs = onehot_cols + scaled_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages.append(assembler)

## Hyperparameter Tuning

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Base estimator for the search; regParam / elasticNetParam are left unset
# here because they are supplied by the parameter grid.
lr = LinearRegression(
    featuresCol="feature_vector",
    labelCol="total_amount",
    solver="normal",
    maxIter=30,
)

# Preprocessing stages followed by the linear regression. A new list is
# built so that `stages` itself stays preprocessing-only.
cv_stages = [*stages, lr]

In [11]:
# Single Pipeline estimator (preprocessing + linear regression) that the
# cross-validator fits once per fold and parameter combination.
cv_pipeline = Pipeline(stages=cv_stages)

In [12]:
# 5 x 5 grid over the elastic-net mixing ratio and the regularisation
# strength of `lr` (25 candidate parameter maps).
param_grid = (
    ParamGridBuilder()
    .addGrid(lr.elasticNetParam, [0.1, 0.2, 0.3, 0.4, 0.5])
    .addGrid(lr.regParam, [0.01, 0.02, 0.03, 0.04, 0.05])
    .build()
)

# 5-fold cross-validation scored with RegressionEvaluator's default metric.
# The seed is pinned so the fold assignment (and therefore the selected
# hyperparameters) is reproducible across kernel restarts; it matches the
# seed used to draw toy_df.
cross_val = CrossValidator(
    estimator=cv_pipeline,
    estimatorParamMaps=param_grid,
    evaluator=RegressionEvaluator(labelCol="total_amount"),
    numFolds=5,
    seed=1,
)

In [13]:
cv_model = cross_val.fit(toy_df) ## Run the cross-validated grid search on toy_df (the 10% training sample)

23/12/04 12:34:06 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/12/04 12:34:06 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

In [14]:
# The last stage of the best pipeline is the fitted linear regression model.
# Read the selected hyperparameters through its public param getters rather
# than reaching into the private `_java_obj` handle.
best_lr_model = cv_model.bestModel.stages[-1]
alpha = best_lr_model.getElasticNetParam()
reg_param = best_lr_model.getRegParam()

## In the recorded run this gave alpha = 0.1 and reg_param = 0.05.
## These values are fed back into the Training section to compare performance.

## Training

In [15]:
# Fit the preprocessing-only pipeline on the full training set and use it to
# produce the feature-vector version of the training data.
transform_stages = stages
pipeline = Pipeline(stages=transform_stages)
fitted_transformer = pipeline.fit(train_df)
vtrain_df = fitted_transformer.transform(train_df)

# Final estimator: same setup as in the search, but with more iterations and
# the hyperparameters selected by cross-validation.
lr_params = dict(
    maxIter=50,
    solver="normal",
    labelCol="total_amount",
    featuresCol="feature_vector",
    elasticNetParam=alpha,   # chosen by the grid search above
    regParam=reg_param,      # chosen by the grid search above
)
lr = LinearRegression(**lr_params)

[Stage 3024:=====>                                                 (1 + 9) / 10]                                                                                

In [16]:
# Inspect the columns added by the fitted preprocessing pipeline.
vtrain_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_location_id: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- pickup_time_vecotr: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- passenger_count_vecotr: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vecotr: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- feature_vector: vector (nullab

In [17]:
# Apply the preprocessing fitted on the training data to the test set, so
# both sets share the same encodings and scaling.
vtest_df = fitted_transformer.transform(test_df)

# Train the tuned linear regression on the transformed training data.
model = lr.fit(vtrain_df)

# Score the test set; the predictions are cached because they are reused by
# later cells.
predictions = model.transform(vtest_df)
predictions.cache()

                                                                                

DataFrame[passenger_count: double, pickup_location_id: string, dropoff_location_id: string, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, pickup_time_vecotr: vector, pickup_time_scaled: vector, passenger_count_vecotr: vector, passenger_count_scaled: vector, trip_distance_vecotr: vector, trip_distance_scaled: vector, feature_vector: vector, prediction: double]

In [18]:
# Compare the actual total_amount with the linear model's prediction for the first test rows.
predictions.select(["trip_distance", "day_of_week", "total_amount", "prediction"]).show()

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          0.3|    Tuesday|        12.6|14.690715913607383|
|          1.5|   Saturday|        14.0|  18.0011250791562|
|          1.9|   Thursday|       21.35| 20.18423144836835|
|          2.3|   Thursday|       29.85| 23.50635369736571|
|         16.7|     Friday|       90.55| 86.99375670121539|
|          0.9|    Tuesday|        17.0|15.940835772262776|
|          1.9|     Sunday|        22.4|19.563971842794913|
|          2.6|  Wednesday|        27.9|24.373894082368693|
|          3.3|    Tuesday|        47.0|27.207305688184444|
|          3.3|     Monday|        29.7| 26.57452001959966|
|          2.3|  Wednesday|        33.2|23.319352620882395|
|          0.6|  Wednesday|        15.4| 16.02458878822991|
|          0.8|  Wednesday|       14.25| 17.17093393251385|
|          0.9|   Thursday|       14.25|

In [19]:
# RMSE from the model's training summary, i.e. measured on vtrain_df (the
# data passed to lr.fit) and not on the test set.
model.summary.rootMeanSquaredError

6.672639084886589

In [20]:
# R^2 from the same training summary (computed on vtrain_df, not on the test set).
model.summary.r2

0.9105094622761973

# Saving and Reusing the Model

In [30]:
# Persist the fitted linear regression model to disk.
model_dir = "/Users/kyungminpark/Desktop/Fall2023/CS4641/data/model"
# `model.save(path)` raises when the path already exists, which makes this
# cell fail on every re-run; go through the writer and overwrite instead.
model.write().overwrite().save(model_dir)

In [31]:
from pyspark.ml.regression import LinearRegressionModel

# `load` is a classmethod, so call it on the class directly instead of
# constructing a throw-away empty LinearRegressionModel() first.
lr_model = LinearRegressionModel.load(model_dir)

# Re-score the transformed test set with the reloaded model to confirm that
# the persisted model is usable.
predictions = lr_model.transform(vtest_df)

predictions.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------------+------------------+------------------+--------------------+----------------------+----------------------+--------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idx|pickup_location_id_onehot|dropoff_location_id_idx|dropoff_location_id_onehot|day_of_week_idx|day_of_week_onehot|pickup_time_vecotr|  pickup_time_scaled|passenger_count_vecotr|passenger_count_scaled|trip_distance_vecotr|trip_distance_scaled|      feature_vector|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+----------------------

# Supervised Learning: Random Forest Regression

In [36]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, MinMaxScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, abs

# day_of_week: string label -> numeric index -> one-hot vector.
day_of_week_indexer = StringIndexer(inputCol="day_of_week", outputCol="day_of_week_index")
day_of_week_encoder = OneHotEncoder(
    inputCol=day_of_week_indexer.getOutputCol(),
    outputCol="day_of_week_encoded",
)

# Assemble the numeric columns together with the encoded day_of_week.
rf_feature_cols = [
    "passenger_count",
    "trip_distance",
    "pickup_time",
    day_of_week_encoder.getOutputCol(),
]
assembler = VectorAssembler(inputCols=rf_feature_cols, outputCol="features")

# Min-max scale the assembled vector into "scaledFeatures".
scaler = MinMaxScaler(inputCol=assembler.getOutputCol(), outputCol="scaledFeatures")

In [37]:
# Random forest regressor on the scaled feature vector. The seed is pinned so
# that the forest is reproducible across kernel restarts.
rf = RandomForestRegressor(featuresCol="scaledFeatures", labelCol="total_amount", seed=1)

# Pipeline with indexing, encoding, assembling, scaling and the regressor.
pipeline = Pipeline(stages=[day_of_week_indexer, day_of_week_encoder, assembler, scaler, rf])

# Hyperparameter grid: 3 x 3 x 2 = 18 candidate parameter maps.
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [20, 50, 100])
    .addGrid(rf.maxDepth, [5, 10, 15])
    .addGrid(rf.maxBins, [32, 64])
    .build()
)

# Tune on a 30% sample of the training data to keep the search affordable.
# Seeded like toy_df (seed=1) so that the same rows are drawn on every run.
small_train_df = train_df.sample(False, 0.3, seed=1)

In [38]:
## It takes around 20 minutes to finish the training.

# 3-fold cross-validation over paramGrid, scored with RegressionEvaluator's
# default metric. The seed is pinned so the fold assignment, and therefore
# the selected model, is reproducible across kernel restarts.
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(labelCol="total_amount"),
    numFolds=3,
    seed=1,
)

# Fit on the 30% training sample; cvModel wraps the best pipeline found.
cvModel = crossval.fit(small_train_df)

23/12/04 13:02:36 WARN DAGScheduler: Broadcasting large task binary with size 1348.2 KiB
23/12/04 13:02:42 WARN DAGScheduler: Broadcasting large task binary with size 1389.1 KiB
23/12/04 13:02:48 WARN DAGScheduler: Broadcasting large task binary with size 1348.2 KiB
23/12/04 13:02:49 WARN DAGScheduler: Broadcasting large task binary with size 1996.8 KiB
23/12/04 13:02:50 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
23/12/04 13:02:51 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
23/12/04 13:02:52 WARN DAGScheduler: Broadcasting large task binary with size 5.1 MiB
23/12/04 13:02:54 WARN DAGScheduler: Broadcasting large task binary with size 6.4 MiB
23/12/04 13:03:02 WARN DAGScheduler: Broadcasting large task binary with size 1389.1 KiB
23/12/04 13:03:02 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
23/12/04 13:03:03 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/12/04 13:03:05 WARN DAGScheduler: Br

23/12/04 13:10:10 WARN DAGScheduler: Broadcasting large task binary with size 1353.8 KiB
23/12/04 13:10:11 WARN DAGScheduler: Broadcasting large task binary with size 1977.9 KiB
23/12/04 13:10:12 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
23/12/04 13:10:13 WARN DAGScheduler: Broadcasting large task binary with size 3.7 MiB
23/12/04 13:10:15 WARN DAGScheduler: Broadcasting large task binary with size 4.9 MiB
23/12/04 13:10:17 WARN DAGScheduler: Broadcasting large task binary with size 6.1 MiB
23/12/04 13:10:23 WARN DAGScheduler: Broadcasting large task binary with size 1393.9 KiB
23/12/04 13:10:23 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
23/12/04 13:10:25 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
23/12/04 13:10:25 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/04 13:10:27 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
23/12/04 13:10:28 WARN DAGScheduler: Broadcas

23/12/04 13:17:55 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB
23/12/04 13:17:56 WARN DAGScheduler: Broadcasting large task binary with size 5.0 MiB
23/12/04 13:17:58 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/12/04 13:18:04 WARN DAGScheduler: Broadcasting large task binary with size 1398.5 KiB
23/12/04 13:18:05 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/12/04 13:18:06 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/12/04 13:18:07 WARN DAGScheduler: Broadcasting large task binary with size 4.3 MiB
23/12/04 13:18:09 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
23/12/04 13:18:10 WARN DAGScheduler: Broadcasting large task binary with size 1008.8 KiB
23/12/04 13:18:10 WARN DAGScheduler: Broadcasting large task binary with size 7.5 MiB
23/12/04 13:18:12 WARN DAGScheduler: Broadcasting large task binary with size 1154.6 KiB
23/12/04 13:18:31 WARN DAGScheduler: Broadcas

23/12/04 13:25:42 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/12/04 13:25:43 WARN DAGScheduler: Broadcasting large task binary with size 15.3 MiB
23/12/04 13:25:55 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
23/12/04 13:25:56 WARN DAGScheduler: Broadcasting large task binary with size 21.9 MiB
23/12/04 13:26:09 WARN DAGScheduler: Broadcasting large task binary with size 4.3 MiB
23/12/04 13:26:11 WARN DAGScheduler: Broadcasting large task binary with size 30.0 MiB
23/12/04 13:26:25 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
23/12/04 13:26:27 WARN DAGScheduler: Broadcasting large task binary with size 39.3 MiB
23/12/04 13:26:45 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB
                                                                                

In [39]:
# Evaluator left on its default metric, which the message below reports as RMSE.
evaluator = RegressionEvaluator(labelCol="total_amount")

# Score the held-out test set with the best model found by cross-validation.
predictions = cvModel.transform(test_df)

rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data =", rmse)



Root Mean Squared Error (RMSE) on test data = 8.615856029697497




In [40]:
from pyspark.sql.functions import col, abs
from pyspark.sql.functions import count, lit, when
from pyspark.sql.functions import sum as spark_sum  # aliased to avoid shadowing the builtin

# Share of test predictions whose absolute error is within `acceptable_error`
# (same unit as total_amount).
acceptable_error = 5
accurate_predictions = predictions.withColumn("error", abs(col("prediction") - col("total_amount")))

# Compute both counts in a single aggregation instead of two separate
# count() jobs, so the predictions are only evaluated once.
within_tolerance = when(col("error") <= acceptable_error, 1).otherwise(0)
counts = accurate_predictions.agg(
    count(lit(1)).alias("total"),
    spark_sum(within_tolerance).alias("accurate"),
).first()

total_count = counts["total"]
accurate_count = counts["accurate"] or 0  # sum() over zero rows yields null
# Guard against an empty prediction set instead of raising ZeroDivisionError.
accuracy_rate = accurate_count / total_count * 100 if total_count else float("nan")
print(f"Accuracy Rate within ${acceptable_error}: {accuracy_rate}%")



Accuracy Rate within $5: 75.45466414616924%




In [41]:
# Compare the actual total_amount with the random-forest prediction for the first test rows.
predictions.select(["trip_distance", "day_of_week", "total_amount", "prediction"]).show()

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          0.3|    Tuesday|        12.6|14.510216938488384|
|          1.5|   Saturday|        14.0|18.956019560942053|
|          1.9|   Thursday|       21.35|19.271994145172137|
|          2.3|   Thursday|       29.85|24.277038024548496|
|         16.7|     Friday|       90.55| 90.85347813647773|
|          0.9|    Tuesday|        17.0|16.438776624591704|
|          1.9|     Sunday|        22.4|19.763197327564306|
|          2.6|  Wednesday|        27.9| 24.69470896026036|
|          3.3|    Tuesday|        47.0| 28.74414365196836|
|          3.3|     Monday|        29.7|28.555897371492037|
|          2.3|  Wednesday|        33.2|23.771215109215902|
|          0.6|  Wednesday|        15.4|14.941718463129858|
|          0.8|  Wednesday|       14.25|16.226487560856587|
|          0.9|   Thursday|       14.25|