In [27]:
import time
from operator import itemgetter

import pandas as pd
from sklearn.datasets import fetch_california_housing
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

#### Prepare dataset

In [28]:
# Load the California housing data as a pandas feature frame (X) and target
# series (y), then hold out 20% of the rows as a test set. The fixed
# random_state keeps the split identical across runs.
X, y = fetch_california_housing(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=201,
)

In [29]:
# Preview the first rows of the feature frame.
X.head()

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude
0,8.3252,41.0,6.984127,1.02381,322.0,2.555556,37.88,-122.23
1,8.3014,21.0,6.238137,0.97188,2401.0,2.109842,37.86,-122.22
2,7.2574,52.0,8.288136,1.073446,496.0,2.80226,37.85,-122.24
3,5.6431,52.0,5.817352,1.073059,558.0,2.547945,37.85,-122.25
4,3.8462,52.0,6.281853,1.081081,565.0,2.181467,37.85,-122.25


#### Set number of models to train

In [30]:
# Number of models trained per run; passed as `n_models` to the
# sequential and parallel runners below.
NUM_MODELS = 20

#### Implement Function to train and score model

In [31]:
def train_and_score_model(
    train_set: pd.DataFrame,
    test_set: pd.DataFrame,
    train_labels: pd.Series,
    test_labels: pd.Series,
    n_estimators: int,
) -> tuple[int, float]:
    """Train a random forest regressor and score it on the test set.

    Args:
        train_set: Features used to fit the model.
        test_set: Features used to produce predictions for scoring.
        train_labels: Targets corresponding to ``train_set``.
        test_labels: Targets corresponding to ``test_set``.
        n_estimators: Number of trees in the forest.

    Returns:
        A tuple ``(n_estimators, mse)`` where ``mse`` is the mean squared
        error of the predictions on ``test_set``.

    Side effects:
        Prints one line with the tree count, the score and the elapsed time.
    """
    # perf_counter is monotonic and high-resolution, so the measured duration
    # cannot be skewed by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()

    # Fixed random_state so every run of the notebook yields the same scores.
    model = RandomForestRegressor(n_estimators=n_estimators, random_state=201)
    model.fit(train_set, train_labels)
    y_pred = model.predict(test_set)
    score = mean_squared_error(test_labels, y_pred)

    time_delta = time.perf_counter() - start_time
    print(f"n_estimators={n_estimators}, mse={score:.4f}, took: {time_delta:.2f} seconds")
    return n_estimators, score



This function takes data, creates a `RandomForestRegressor` model, trains it and scores the model on the test set.
`train_and_score_model` returns a tuple: (n_estimators, mse_score)

#### Implement function that runs **sequential** model training

In [32]:
def run_sequential(
    n_models: int,
    base_estimators: int = 8,
    estimators_step: int = 4,
) -> list[tuple[int, float]]:
    """Train and score ``n_models`` random forests one after another.

    Model ``j`` (0-based) is trained with
    ``base_estimators + estimators_step * j`` trees, so the defaults give
    8, 12, 16, 20, ... trees.

    Args:
        n_models: Number of models to train; values <= 0 yield an empty list.
        base_estimators: Tree count of the first model.
        estimators_step: Increase in tree count from one model to the next.

    Returns:
        One ``(n_estimators, mse)`` tuple per model, in training order.
    """
    return [
        train_and_score_model(
            train_set=X_train,
            test_set=X_test,
            train_labels=y_train,
            test_labels=y_test,
            n_estimators=base_estimators + estimators_step * j,
        )
        for j in range(n_models)
    ]

This function trains `n_models` models sequentially with an increasing number of `n_estimators` (increasing by 4 for each model, e.g. 8, 12, 16, 20, ...).
`run_sequential` returns a list of tuples

#### Run sequential model training

In [33]:
%%time

# Train NUM_MODELS models one after another and keep the
# (n_estimators, mse) result of each.
mse_scores = run_sequential(n_models=NUM_MODELS)

n_estimators=8, mse=0.2983, took: 0.77 seconds
n_estimators=12, mse=0.2826, took: 1.13 seconds
n_estimators=16, mse=0.2761, took: 1.51 seconds
n_estimators=20, mse=0.2716, took: 1.88 seconds
n_estimators=24, mse=0.2694, took: 2.26 seconds
n_estimators=28, mse=0.2686, took: 2.64 seconds
n_estimators=32, mse=0.2662, took: 3.02 seconds
n_estimators=36, mse=0.2663, took: 3.42 seconds
n_estimators=40, mse=0.2635, took: 3.83 seconds
n_estimators=44, mse=0.2622, took: 4.14 seconds
n_estimators=48, mse=0.2616, took: 4.53 seconds
n_estimators=52, mse=0.2609, took: 4.91 seconds
n_estimators=56, mse=0.2615, took: 5.32 seconds
n_estimators=60, mse=0.2608, took: 5.67 seconds
n_estimators=64, mse=0.2614, took: 6.06 seconds
n_estimators=68, mse=0.2616, took: 6.44 seconds
n_estimators=72, mse=0.2617, took: 6.81 seconds
n_estimators=76, mse=0.2614, took: 7.20 seconds
n_estimators=80, mse=0.2607, took: 7.62 seconds
n_estimators=84, mse=0.2601, took: 7.98 seconds
CPU times: user 1min 26s, sys: 131 ms, to

#### Analyze results

In [34]:
# Each entry is (n_estimators, mse); select the entry with the lowest mse.
best = min(mse_scores, key=lambda result: result[1])
print(f"Best model: mse={best[1]:.4f}, n_estimators={best[0]}")

Best model: mse=0.2601, n_estimators=84


#### Parallel implementation

In contrast to the prior approach, we can now utilize all available resources to train models in parallel. Ray will automatically detect the number of cores on the computer, or the amount of resources in a cluster, to distribute each defined task.

#### Initialize Ray runtime

In [35]:
import ray

# Restart from a clean runtime when the cell is re-run. `ray.is_initialized`
# is a function and must be called: the bare function object is always
# truthy, which would make this check meaningless.
if ray.is_initialized():
    ray.shutdown()
ray.init()

2024-05-29 14:08:38,275	INFO worker.py:1740 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.10.8
Ray version:,2.23.0
Dashboard:,http://127.0.0.1:8265


[36m(train_and_score_model pid=8650)[0m n_estimators=8, mse=0.2983, took: 1.51 seconds 


[33m(raylet)[0m [2024-05-29 14:08:47,265 E 8639 20079552] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2024-05-29_14-08-34_958763_4746 is over 95% full, available space: 7207268352; capacity: 245107195904. Object creation will fail if spilling is required.


[36m(train_and_score_model pid=8646)[0m n_estimators=36, mse=0.2663, took: 6.50 seconds [32m [repeated 7x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)[0m
[36m(train_and_score_model pid=8644)[0m n_estimators=52, mse=0.2609, took: 9.14 seconds [32m [repeated 4x across cluster][0m


[33m(raylet)[0m [2024-05-29 14:08:57,354 E 8639 20079552] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2024-05-29_14-08-34_958763_4746 is over 95% full, available space: 7204950016; capacity: 245107195904. Object creation will fail if spilling is required.


[36m(train_and_score_model pid=8650)[0m n_estimators=72, mse=0.2617, took: 10.70 seconds [32m [repeated 5x across cluster][0m
[36m(train_and_score_model pid=8644)[0m n_estimators=84, mse=0.2601, took: 10.89 seconds 


[33m(raylet)[0m [2024-05-29 14:09:07,450 E 8639 20079552] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2024-05-29_14-08-34_958763_4746 is over 95% full, available space: 7204757504; capacity: 245107195904. Object creation will fail if spilling is required.
[33m(raylet)[0m [2024-05-29 14:09:17,548 E 8639 20079552] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2024-05-29_14-08-34_958763_4746 is over 95% full, available space: 7204671488; capacity: 245107195904. Object creation will fail if spilling is required.
[33m(raylet)[0m [2024-05-29 14:09:27,645 E 8639 20079552] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2024-05-29_14-08-34_958763_4746 is over 95% full, available space: 7204470784; capacity: 245107195904. Object creation will fail if spilling is required.
[33m(raylet)[0m [2024-05-29 14:09:37,652 E 8639 20079552] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2024-05-29_14-08-34_958763_4746 is over 95% full, available space: 7204282368; cap

In a distributed system, object references are pointers to objects in memory. Object references can be used to access objects that are stored on different machines, allowing them to communicate and share data.

In [36]:
# Put each dataset split into Ray's object store once and keep the returned
# object references so they can be handed to remote tasks.
X_train_ref, X_test_ref, y_train_ref, y_test_ref = [
    ray.put(split) for split in (X_train, X_test, y_train, y_test)
]

With the training and testing data placed in Ray's object store, these objects are now available to all remote tasks and actors in the cluster.

#### Implement function to train and score model

In [37]:
@ray.remote
def train_and_score_model(
    train_set_ref: pd.DataFrame,
    test_set_ref: pd.DataFrame,
    train_labels_ref: pd.Series,
    test_labels_ref: pd.Series,
    n_estimators: int,
) -> tuple[int, float]:
    """Ray task: train a random forest regressor and score it on the test set.

    The ``*_ref`` arguments are used directly as data inside the body; when
    the task is submitted with object references, Ray is expected to resolve
    them to the stored objects before the function runs.

    NOTE: this definition rebinds the name ``train_and_score_model`` and so
    shadows the earlier plain-Python version. After this cell has run, the
    name refers to the Ray remote function and must be invoked via
    ``.remote(...)``.

    Args:
        train_set_ref: Features used to fit the model.
        test_set_ref: Features used to produce predictions for scoring.
        train_labels_ref: Targets corresponding to ``train_set_ref``.
        test_labels_ref: Targets corresponding to ``test_set_ref``.
        n_estimators: Number of trees in the forest.

    Returns:
        A tuple ``(n_estimators, mse)`` where ``mse`` is the mean squared
        error of the predictions on the test features.

    Side effects:
        Prints one line with the tree count, the score and the elapsed time.
    """
    # perf_counter is monotonic and high-resolution, so the measured duration
    # cannot be skewed by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()

    # Fixed random_state so every run of the notebook yields the same scores.
    model = RandomForestRegressor(n_estimators=n_estimators, random_state=201)
    model.fit(train_set_ref, train_labels_ref)
    y_pred = model.predict(test_set_ref)
    score = mean_squared_error(test_labels_ref, y_pred)

    time_delta = time.perf_counter() - start_time
    print(f"n_estimators={n_estimators}, mse={score:.4f}, took: {time_delta:.2f} seconds ")

    return n_estimators, score

Adding the `@ray.remote` decorator specifies that the function will be executed in a distributed manner, as a Ray task.

#### Implement function that runs **parallel** model training

In [38]:
def run_parallel(
    n_models: int,
    base_estimators: int = 8,
    estimators_step: int = 4,
) -> list[tuple[int, float]]:
    """Train and score ``n_models`` random forests as parallel Ray tasks.

    Model ``j`` (0-based) is trained with
    ``base_estimators + estimators_step * j`` trees, so the defaults give
    8, 12, 16, 20, ... trees.

    Args:
        n_models: Number of models to train; values <= 0 yield an empty list.
        base_estimators: Tree count of the first model.
        estimators_step: Increase in tree count from one model to the next.

    Returns:
        One ``(n_estimators, mse)`` tuple per model, in submission order.
    """
    # Submit every task first so they can run concurrently; `.remote()`
    # returns a reference immediately instead of waiting for the result.
    results_ref = [
        train_and_score_model.remote(
            train_set_ref=X_train_ref,
            test_set_ref=X_test_ref,
            train_labels_ref=y_train_ref,
            test_labels_ref=y_test_ref,
            n_estimators=base_estimators + estimators_step * j,
        )
        for j in range(n_models)
    ]
    # Block until all tasks have finished and fetch their return values.
    return ray.get(results_ref)

#### Run parallel model training

In [39]:
%%time

# Train NUM_MODELS models as Ray tasks and collect the
# (n_estimators, mse) result of each.
mse_scores = run_parallel(n_models=NUM_MODELS)

CPU times: user 109 ms, sys: 79.2 ms, total: 189 ms
Wall time: 26.3 s


The result is a **roughly 3x speedup**:
+ Parallel took about 26 seconds
+ Sequential took about 87 seconds (the sum of the per-model times printed above)

#### Analyze Results

In [42]:
# Each entry is (n_estimators, mse); select the entry with the lowest mse.
best = min(mse_scores, key=lambda result: result[1])
print(f"Best Model: mse={best[1]:.4f}, n_estimators={best[0]}")

Best Model: mse=0.2601, n_estimators=84


#### Shutdown Ray runtime

In [43]:
# Shut down the Ray runtime that was started with `ray.init()` earlier.
ray.shutdown()