# Phase 3 | ML modeling 

## 1. Reading dataframes

In [1]:
from pyspark.sql import SparkSession

team = 'team11'

warehouse = "/user/team11/project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

In [2]:
spark.sql("SHOW DATABASES").show()

+--------------------+
|           namespace|
+--------------------+
|             default|
|             retake1|
|             root_db|
|                show|
|     team0_projectdb|
|    team11_projectdb|
|           team12_db|
|team12_hive_proje...|
|    team12_projectdb|
|    team13_projectdb|
|    team14_projectdb|
|    team15_projectdb|
|    team16_projectdb|
|    team18_projectdb|
|    team19_projectdb|
|     team1_projectdb|
|    team20_projectdb|
| team21_projectdb_v2|
| team21_projectdb_v3|
| team21_projectdb_v4|
+--------------------+
only showing top 20 rows



In [3]:
spark.sql("USE team11_projectdb").show()
spark.sql("SHOW TABLES").show()

++
||
++
++

+----------------+---------------+-----------+
|       namespace|      tableName|isTemporary|
+----------------+---------------+-----------+
|team11_projectdb|    mq1_results|      false|
|team11_projectdb|    mq_features|      false|
|team11_projectdb|mq_features_raw|      false|
|team11_projectdb|     mq_model_1|      false|
|team11_projectdb|     mq_model_2|      false|
|team11_projectdb|     q1_results|      false|
|team11_projectdb|     q2_results|      false|
|team11_projectdb|     q3_results|      false|
|team11_projectdb|     q4_results|      false|
|team11_projectdb|     q5_results|      false|
|team11_projectdb|     q6_results|      false|
|team11_projectdb|     q7_results|      false|
|team11_projectdb|     taxi_trips|      false|
|team11_projectdb|taxi_trips_part|      false|
+----------------+---------------+-----------+



In [4]:
taxi_trips = spark.read.format("avro").table('team11_projectdb.taxi_trips')

In [5]:
taxi_trips.show()

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|ratecodeid|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+
|       1|       1456779600000|        1456780075000|              1|          2.5|-73.97674560546875| 40.76515197753906|         1|    

In [6]:
taxi_trips.printSchema()

root
 |-- vendorid: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- ratecodeid: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)



## 2. Feature extraction pipeline

In [7]:
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import (
    StringIndexer,
    OneHotEncoder,
    VectorAssembler,
    SQLTransformer,
    StandardScaler
)
from pyspark.ml.param.shared import HasInputCol, HasOutputCol

from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col, count, when, hour, month, sin, cos, to_timestamp, from_unixtime, lit
import math
import time

In [66]:

class UnixMillisToTimestamp(Transformer):
    def __init__(self, inputCol=None, outputCol=None):
        super(UnixMillisToTimestamp, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def _transform(self, dataset):
        return dataset.withColumn(
            self.outputCol,
            to_timestamp(from_unixtime(col(self.inputCol)/1000))
        )

    
# Custom Transformer to extract hour and month
class ExtractHourMonth(Transformer):
    def __init__(self, inputCol=None, prefix="pickup"):
        super(ExtractHourMonth, self).__init__()
        self.inputCol = inputCol
        self.prefix = prefix

    def _transform(self, dataset):
        dataset = dataset.withColumn(f"{self.prefix}_hour", hour(col(self.inputCol)))
        return dataset.withColumn(f"{self.prefix}_month", month(col(self.inputCol)))


# Custom Transformer for cyclical encoding
class CyclicalTimeEncoder(Transformer):
    def __init__(self):
        super(CyclicalTimeEncoder, self).__init__()

    def _transform(self, dataset):
        dataset = dataset.withColumn("pickup_hour_sin", sin(2 * math.pi * col("pickup_hour") / lit(24)))
        dataset = dataset.withColumn("pickup_hour_cos", cos(2 * math.pi * col("pickup_hour") / lit(24)))
        dataset = dataset.withColumn("pickup_month_sin", sin(2 * math.pi * col("pickup_month") / lit(12)))
        dataset = dataset.withColumn("pickup_month_cos", cos(2 * math.pi * col("pickup_month") / lit(12)))
        dataset = dataset.withColumn("dropoff_hour_sin", sin(2 * math.pi * col("dropoff_hour") / lit(24)))
        dataset = dataset.withColumn("dropoff_hour_cos", cos(2 * math.pi * col("dropoff_hour") / lit(24)))
        dataset = dataset.withColumn("dropoff_month_sin", sin(2 * math.pi * col("dropoff_month") / lit(12)))
        return dataset.withColumn("dropoff_month_cos", cos(2 * math.pi * col("dropoff_month") / lit(12)))


class CyclicalGeoEncoder(Transformer):
    """Encodes latitude and longitude cyclically."""

    def _transform(self, dataset):
        # Normalize longitude from [-180,180] to [0,1]
        lon_norm = (col("pickup_longitude") + lit(180.0)) / lit(360.0)
        lat_norm = (col("pickup_latitude") + lit(90.0)) / lit(180.0)
        dataset = dataset.withColumn(
            "pickup_lon_sin", sin(2 * math.pi * lon_norm)
        ).withColumn(
            "pickup_lon_cos", cos(2 * math.pi * lon_norm)
        ).withColumn(
            "pickup_lat_sin", sin(2 * math.pi * lat_norm)
        ).withColumn(
            "pickup_lat_cos", cos(2 * math.pi * lat_norm)
        )
        # Repeat for dropoff
        lon_norm = (col("dropoff_longitude") + lit(180.0)) / lit(360.0)
        lat_norm = (col("dropoff_latitude") + lit(90.0)) / lit(180.0)
        return dataset.withColumn(
            "dropoff_lon_sin", sin(2 * math.pi * lon_norm)
        ).withColumn(
            "dropoff_lon_cos", cos(2 * math.pi * lon_norm)
        ).withColumn(
            "dropoff_lat_sin", sin(2 * math.pi * lat_norm)
        ).withColumn(
            "dropoff_lat_cos", cos(2 * math.pi * lat_norm)
        )


# Custom Transformer to select features and rename label
class SelectAndRename(Transformer):
    def _transform(self, dataset):
        cols = [
            'total_amount', 'vendorid',
            'passenger_count', 'trip_distance',
            # 'pickup_longitude', 'pickup_latitude',
            # 'dropoff_longitude', 'dropoff_latitude',
            'pickup_lon_sin', 'pickup_lon_cos', 'pickup_lat_sin', 'pickup_lat_cos',
            'dropoff_lon_sin', 'dropoff_lon_cos', 'dropoff_lat_sin', 'dropoff_lat_cos',
            'pickup_hour_sin', 'pickup_hour_cos',
            'pickup_month_sin', 'pickup_month_cos',
            'dropoff_hour_sin', 'dropoff_hour_cos',
            'dropoff_month_sin', 'dropoff_month_cos'
        ]
        dataset = dataset.select(*cols)
        return dataset.withColumnRenamed("total_amount", "label")

In [9]:
df_all = spark.read.format("avro").table('team11_projectdb.taxi_trips')

In [10]:
df_all.selectExpr(
    "mean(total_amount) as mean_amt",
    "stddev(total_amount) as std_amt",
    "min(total_amount) as min_amt",
    "max(total_amount) as max_amt"
).show()

total_amount_quantiles = df_all.stat.approxQuantile("total_amount", [0.1, 0.2, 0.5, 0.75, 0.9, 0.95, 0.99], 0.01)
print("Total_amount quantiles (50%,75%,90%,95%,99%):", total_amount_quantiles)

+------------------+-----------------+-------+---------+
|          mean_amt|          std_amt|min_amt|  max_amt|
+------------------+-----------------+-------+---------+
|16.045819673950596|134.3577728869812| -376.3|429562.25|
+------------------+-----------------+-------+---------+

Total_amount quantiles (50%,75%,90%,95%,99%): [6.8, 7.88, 11.8, 17.76, 28.3, 45.38, 429562.25]


In [11]:
df_all = df_all.filter(col("total_amount") >= 0)

In [12]:
# Step 2: Recompute quantiles on cleaned data
q1, q99 = df_all.stat.approxQuantile("total_amount", [0.01, 0.99], 0.01)
print("Filtered quantiles:", q1, q99)

Filtered quantiles: 0.0 429562.25


In [13]:
df_all.orderBy(col("total_amount").desc()).select("total_amount").show(20, truncate=False)

+------------+
|total_amount|
+------------+
|429562.25   |
|133131.2    |
|126366.58   |
|2009.34     |
|1463.12     |
|1426.8      |
|1347.39     |
|1273.3      |
|1247.3      |
|1121.3      |
|1000.8      |
|989.8       |
|983.3       |
|981.82      |
|934.37      |
|902.8       |
|901.1       |
|900.3       |
|892.8       |
|852.9       |
+------------+
only showing top 20 rows



In [14]:
# Step 3: Filter out outliers outside the 1st and 99th percentiles
df_all = df_all.filter(col("total_amount") < 100000)

In [67]:
# sample_frac = 0.5
# df_sample = df_all.sample(False, sample_frac, seed=42)
# train_df, test_df = df_sample.randomSplit([0.2, 0.8], seed=42)
# train_df, test_df, _ = df_sample.randomSplit([0.08, 0.2, 0.72], seed=42)

# 3. Build preprocessing pipeline with custom transformers
preprocessing = Pipeline(stages=[
    UnixMillisToTimestamp(inputCol="tpep_pickup_datetime", outputCol="pickup_ts"),
    UnixMillisToTimestamp(inputCol="tpep_dropoff_datetime", outputCol="dropoff_ts"),
    ExtractHourMonth(inputCol="pickup_ts", prefix="pickup"),
    ExtractHourMonth(inputCol="dropoff_ts", prefix="dropoff"),
    CyclicalTimeEncoder(),
    CyclicalGeoEncoder(),
    SelectAndRename(),
    VectorAssembler(
        inputCols=[
            'vendorid',
            'passenger_count', 'trip_distance',
            'pickup_lon_sin', 'pickup_lon_cos', 'pickup_lat_sin', 'pickup_lat_cos',
            'dropoff_lon_sin', 'dropoff_lon_cos', 'dropoff_lat_sin', 'dropoff_lat_cos',
            'pickup_hour_sin', 'pickup_hour_cos',
            'pickup_month_sin', 'pickup_month_cos',
            'dropoff_hour_sin', 'dropoff_hour_cos',
            'dropoff_month_sin', 'dropoff_month_cos'
        ], outputCol='features_raw'
    ),
    StandardScaler(inputCol='features_raw',
                   outputCol='features',
                   withMean=True)
])

## 3. Splitting dataset into train and test 

In [68]:
# sample_frac = 1.0
# df_sample = df_all.sample(False, sample_frac, seed=42)

train_df, test_df = df_all.randomSplit([0.8, 0.2], seed=42)

In [69]:
temp_model = preprocessing.fit(train_df)
train = temp_model.transform(train_df)
test = temp_model.transform(test_df)

In [202]:
# Save splits
# train.select("features", "label").coalesce(1)\
#     .write.mode("overwrite").json("project/data/train")
# test.select("features", "label").coalesce(1)\
#     .write.mode("overwrite").json("project/data/test")

## 4. Initial testing of models

In [70]:
start_time = time.time()
rf = RandomForestRegressor(featuresCol="features",
                           labelCol="label")
model_rf = rf.fit(train)
train_time = time.time() - start_time

In [71]:
start_time_test = time.time()
predictions = model_rf.transform(test)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

rmse = evaluator.setMetricName("rmse").evaluate(predictions)
r2 = evaluator.setMetricName("r2").evaluate(predictions)
mse = evaluator.setMetricName("mse").evaluate(predictions)
mae = evaluator.setMetricName("mae").evaluate(predictions)

time_test = time.time() - start_time_test

print("=== Evaluation Results for RandomForestRegressor ===")
print(f"Training time: {train_time:.2f} seconds")
print(f"Evaluation time: {time_test:.2f} seconds")
print(f"RMSE: {rmse:.4f}")
print(f"R^2: {r2:.4f}")
print(f"MSE: {mse:.4f}")
print(f"MAE: {mae:.4f}")

=== Evaluation Results for RandomForestRegressor ===
Training time: 252.23 seconds
Evaluation time: 78.13 seconds
RMSE: 6.0219
R^2: 0.8025
MSE: 36.2635
MAE: 2.8592


In [72]:
predictions.select("prediction", "label") \
    .limit(20) \
    .show(truncate=False)

+------------------+-----+
|prediction        |label|
+------------------+-----+
|9.704670779920333 |5.3  |
|9.809457028480228 |8.3  |
|10.109131283126016|13.55|
|9.650714915315234 |6.3  |
|11.04661066767791 |9.3  |
|11.17365560341319 |10.3 |
|9.704670779920333 |3.8  |
|9.456240976962246 |7.5  |
|12.118126560211985|15.35|
|19.92850204401483 |14.8 |
|11.273414656816456|9.8  |
|23.382832515995705|25.07|
|38.492080336599805|47.89|
|9.704670779920333 |7.3  |
|16.45193025073419 |13.3 |
|16.619592000769323|26.3 |
|19.240933145950162|20.15|
|12.886050183305429|11.8 |
|9.704670779920333 |5.3  |
|17.357769717123926|16.8 |
+------------------+-----+



In [56]:
preprocessing = Pipeline(stages=[
    UnixMillisToTimestamp(inputCol="tpep_pickup_datetime", outputCol="pickup_ts"),
    UnixMillisToTimestamp(inputCol="tpep_dropoff_datetime", outputCol="dropoff_ts"),
    ExtractHourMonth(inputCol="pickup_ts", prefix="pickup"),
    ExtractHourMonth(inputCol="dropoff_ts", prefix="dropoff"),
    CyclicalTimeEncoder(),
    CyclicalGeoEncoder(),
    SelectAndRename(),
    VectorAssembler(
        inputCols=[
            'vendorid',
            'passenger_count', 'trip_distance',
            'pickup_longitude', 'pickup_latitude',
            'dropoff_longitude', 'dropoff_latitude',
            'pickup_hour_sin', 'pickup_hour_cos',
            'pickup_month_sin', 'pickup_month_cos',
            'dropoff_hour_sin', 'dropoff_hour_cos',
            'dropoff_month_sin', 'dropoff_month_cos'
        ], outputCol='features_raw'
    ),
    StandardScaler(inputCol='features_raw',
                   outputCol='features',
                   withMean=True)
])


train_df, test_df = df_all.randomSplit([0.8, 0.2], seed=42)

temp_model = preprocessing.fit(train_df)
train = temp_model.transform(train_df)
test = temp_model.transform(test_df)

start_time = time.time()
rf = RandomForestRegressor(featuresCol="features",
                           labelCol="label")
model_rf = rf.fit(train)
train_time = time.time() - start_time

start_time_test = time.time()
predictions = model_rf.transform(test)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

rmse = evaluator.setMetricName("rmse").evaluate(predictions)
r2 = evaluator.setMetricName("r2").evaluate(predictions)
mse = evaluator.setMetricName("mse").evaluate(predictions)
mae = evaluator.setMetricName("mae").evaluate(predictions)

time_test = time.time() - start_time_test

print("=== Evaluation Results for RandomForestRegressor ===")
print(f"Training time: {train_time:.2f} seconds")
print(f"Evaluation time: {time_test:.2f} seconds")
print(f"RMSE: {rmse:.4f}")
print(f"R^2: {r2:.4f}")
print(f"MSE: {mse:.4f}")
print(f"MAE: {mae:.4f}")

=== Evaluation Results for RandomForestRegressor ===
Training time: 230.49 seconds
Evaluation time: 69.05 seconds
RMSE: 6.2715
R^2: 0.7858
MSE: 39.3317
MAE: 3.0398


In [60]:
start_time = time.time()
rf = GBTRegressor(featuresCol="features",
                  labelCol="label",
                  maxDepth=3)
model_rf = rf.fit(train)
train_time = time.time() - start_time

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.6/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib64/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
start_time_test = time.time()
predictions = model_rf.transform(test)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

rmse = evaluator.setMetricName("rmse").evaluate(predictions)
r2 = evaluator.setMetricName("r2").evaluate(predictions)
mse = evaluator.setMetricName("mse").evaluate(predictions)
mae = evaluator.setMetricName("mae").evaluate(predictions)

time_test = time.time() - start_time_test

print("=== Evaluation Results for GBTRegressor ===")
print(f"Training time: {train_time:.2f} seconds")
print(f"Evaluation time: {time_test:.2f} seconds")
print(f"RMSE: {rmse:.4f}")
print(f"R^2: {r2:.4f}")
print(f"MSE: {mse:.4f}")
print(f"MAE: {mae:.4f}")

In [None]:
predictions.select("prediction", "label") \
    .limit(50) \
    .show(truncate=False)

## 5. Full training pipeline: baseline models + grid search + saving results 

In [203]:
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

In [204]:
models_config = [
    {
        "name": "RandomForest",
        "estimator": RandomForestRegressor(featuresCol="features",
                                           labelCol="label"),
    },
    {
        "name": "GBT",
        "estimator": GBTRegressor(featuresCol="features", labelCol="label"),
    }
]

# Build parameter grids referring to the same estimator instances:
models_config[0]["param_grid"] = ParamGridBuilder()  \
    .addGrid(models_config[0]["estimator"].numTrees, [10, 40])  \
    .addGrid(models_config[0]["estimator"].maxDepth, [5, 10])  \
    .build()

models_config[1]["param_grid"] = ParamGridBuilder()  \
    .addGrid(models_config[1]["estimator"].maxDepth, [3, 8])  \
    .addGrid(models_config[1]["estimator"].maxBins, [24, 32])  \
    .build()

# Add output paths
models_config[0].update({"output_model": "model1",
                         "output_pred": "model1_predictions"})
models_config[1].update({"output_model": "model2",
                         "output_pred": "model2_predictions"})

In [None]:
import time
from pprint import pprint

results = []

for config in models_config:
    print(f"\n=== Running baseline model for: {config['name']} ===")

    # Step 1: Train baseline model (no tuning)
    base_estimator = config["estimator"]
    baseline_start = time.time()
    baseline_model = base_estimator.fit(train)
    baseline_train_time = time.time() - baseline_start
    print(f"Baseline training time: {baseline_train_time:.2f} seconds")

    baseline_start_test = time.time()
    baseline_predictions = baseline_model.transform(test)
    baseline_rmse = evaluator.setMetricName("rmse")\
        .evaluate(baseline_predictions)
    baseline_r2 = evaluator.setMetricName("r2")\
        .evaluate(baseline_predictions)
    baseline_test_time = time.time() - baseline_start_test
    print(f"Baseline RMSE: {baseline_rmse:.4f}, R²: {baseline_r2:.4f}")
    print(f"Baseline test time: {baseline_test_time:.2f} seconds")

    results.append((
        f"{config['name']}_baseline",
        "{}",
        baseline_rmse,
        baseline_r2,
        baseline_train_time,
        baseline_test_time
    ))

    # Save baseline model and predictions
    baseline_model.write().overwrite()\
        .save(f"project/models/{config['output_model']}_baseline")
    baseline_predictions.select("label", "prediction") \
        .coalesce(1) \
        .write.mode("overwrite")\
        .csv(f"project/output/{config['output_pred']}_baseline", header=True)

    # Step 2: Hyperparameter tuning with CrossValidator
    print(f"\n--- Running hyperparameter tuning for: {config['name']} ---")
    pprint(config["param_grid"])
    tuning_start = time.time()

    cv = CrossValidator(
        estimator=config["estimator"],
        estimatorParamMaps=config["param_grid"],
        evaluator=evaluator.setMetricName("rmse"),
        numFolds=3,
        parallelism=4
    )
    tuned_model = cv.fit(train).bestModel
    tuning_train_time = time.time() - tuning_start
    print(f"Tuning completed in {tuning_train_time:.2f} seconds")

    param_map = tuned_model.extractParamMap()
    params = {p.name: tuned_model.getOrDefault(p) for p in param_map.keys()}
    grid_params = config['param_grid'][0].keys()
    relevant_param_names = [gp.name for gp in grid_params]
    filtered_params = {k: v for k, v in params.items() if k in relevant_param_names}

    # Save tuned model and predictions
    tuned_model.write().overwrite()\
        .save(f"project/models/{config['output_model']}")
    tuning_start_test = time.time()
    tuned_predictions = tuned_model.transform(test)
    tuned_predictions.select('label', 'prediction') \
        .coalesce(1) \
        .write.mode('overwrite')\
        .csv(f"project/output/{config['output_pred']}", header=True)

    tuned_rmse = evaluator.setMetricName("rmse").evaluate(tuned_predictions)
    tuned_r2 = evaluator.setMetricName("r2").evaluate(tuned_predictions)
    print(f"Tuned RMSE: {tuned_rmse:.4f}, R²: {tuned_r2:.4f}")
    tuning_test_time = time.time() - tuning_start_test
    print(f"Baseline test time: {tuning_test_time:.2f} seconds")

    results.append((
        f"{config['name']}_tuned",
        str(filtered_params),
        tuned_rmse,
        tuned_r2,
        tuning_train_time,
        tuning_test_time))
# Final summary
summary = spark.createDataFrame(results, ["model", "params", "rmse", "r2", "train_time_sec", "eval_time_sec"])
summary.show(truncate=False)
summary.coalesce(1) \
    .write.mode('overwrite').csv("project/output/evaluation", header=True)


=== Running baseline model for: RandomForest ===
Baseline training time: 67.48 seconds
Baseline RMSE: 6.5116, R²: 0.7722
Baseline test time: 11.71 seconds

--- Running hyperparameter tuning for: RandomForest ---
[{Param(parent='RandomForestRegressor_4e57b2c82f26', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
  Param(parent='RandomForestRegressor_4e57b2c82f26', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
 {Param(parent='RandomForestRegressor_4e57b2c82f26', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 10,
  Param(parent='RandomForestRegressor_4e57b2c82f26', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
 {Param(parent='RandomForestRegressor_4e57b2c82f26', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g.,

In [208]:
print(filtered_params)

{'maxDepth': 10, 'numTrees': 40}
