## Exercise - Part I - Train a model locally using Ray


In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import  StandardScaler
from sklearn.pipeline import Pipeline

import joblib
from ray.util.joblib import register_ray

2. Load the dataset, isolate the predictors from the target variable, and split the dataset between a training set and a validation set.

In [2]:
credit_card_data = pd.read_csv('https://lead-program-assets.s3.eu-west-3.amazonaws.com/M01-Distributed_machine_learning/datasets/creditcard.csv')

In [3]:
X = credit_card_data.loc[:, credit_card_data.columns != "Class"]
y = credit_card_data.loc[:, credit_card_data.columns == "Class"]
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=.2)


3. Build a [pipeline](https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html) with two steps: a standardization, then a random forest classifier.

In [4]:
model = Pipeline(steps=[
        ("standard_scaler", StandardScaler()),
        ("classifier", RandomForestClassifier(n_estimators=100,
                                              max_depth=5))
    ], verbose=True)

4. Train the model with `joblib` using `ray` as the parallelization backend.

In [5]:
register_ray()

with joblib.parallel_backend('ray'):    
    model.fit(X_train, y_train)

  return fit_method(estimator, *args, **kwargs)
2024-06-30 15:47:12,089	INFO ray_backend.py:74 -- Starting local ray cluster


[Pipeline] ... (step 1 of 2) Processing standard_scaler, total=   0.1s


2024-06-30 15:47:17,330	INFO worker.py:1762 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


[Pipeline] ........ (step 2 of 2) Processing classifier, total=  22.3s


Congratulation, you've launched a local ray cluster to parallelize a model training ! Now let's move on to parallelizing a hyperparameter search.

5. Define a parameter search space in the form of a dictonnary as described in the [grid search documentation](https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html#sklearn.model_selection.GridSearchCV)

In [6]:
param_space = {
    'classifier__n_estimators': [1,10,100],
    'classifier__max_depth': [2,3,5,10]
}

6. Using the ray tune wrapper for scikit-learn, launch your hyperparameter search.

In [7]:
# C'est MORT !!!
# https://github.com/ray-project/tune-sklearn

# from ray.tune.sklearn import TuneGridSearchCV
# tune_search = TuneGridSearchCV(
#     model, param_space
# )

# import time  # Just to compare fit times

# start = time.time()
# tune_search.fit(X_train, y_train)
# end = time.time()
# print("Tune GridSearch Fit Time:", end - start)

# print(tune_search.best_params_)



import time
import joblib
from ray.util.joblib import register_ray
from sklearn.model_selection import RandomizedSearchCV

start = time.time()
#search = RandomizedSearchCV(model, param_space, cv=5, n_iter=300, verbose=10)
search = RandomizedSearchCV(model, param_space)

register_ray()
with joblib.parallel_backend('ray'):
    search.fit(X_train, y_train)


[36m(PoolActor pid=24848)[0m   return fit_method(estimator, *args, **kwargs)


[36m(PoolActor pid=24848)[0m [Pipeline] ... (step 1 of 2) Processing standard_scaler, total=   0.1s
[36m(PoolActor pid=33952)[0m [Pipeline] ........ (step 2 of 2) Processing classifier, total=   1.1s
[36m(PoolActor pid=34328)[0m [Pipeline] ... (step 1 of 2) Processing standard_scaler, total=   0.1s[32m [repeated 29x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)[0m
[36m(PoolActor pid=17620)[0m [Pipeline] ........ (step 2 of 2) Processing classifier, total=   9.9s[32m [repeated 10x across cluster][0m


[36m(PoolActor pid=17620)[0m   return fit_method(estimator, *args, **kwargs)[32m [repeated 30x across cluster][0m


[36m(PoolActor pid=13348)[0m [Pipeline] ... (step 1 of 2) Processing standard_scaler, total=   0.1s[32m [repeated 10x across cluster][0m
[36m(PoolActor pid=23828)[0m [Pipeline] ........ (step 2 of 2) Processing classifier, total=  18.0s[32m [repeated 15x across cluster][0m


[36m(PoolActor pid=23828)[0m   return fit_method(estimator, *args, **kwargs)[32m [repeated 10x across cluster][0m


[36m(PoolActor pid=32732)[0m [Pipeline] ... (step 1 of 2) Processing standard_scaler, total=   0.1s[32m [repeated 5x across cluster][0m
[36m(PoolActor pid=33952)[0m [Pipeline] ........ (step 2 of 2) Processing classifier, total= 1.2min[32m [repeated 10x across cluster][0m
[36m(PoolActor pid=21556)[0m [Pipeline] ........ (step 2 of 2) Processing classifier, total= 2.1min[32m [repeated 5x across cluster][0m


[36m(PoolActor pid=21556)[0m   return fit_method(estimator, *args, **kwargs)[32m [repeated 5x across cluster][0m


[36m(PoolActor pid=21556)[0m [Pipeline] ... (step 1 of 2) Processing standard_scaler, total=   0.1s


[Pipeline] ... (step 1 of 2) Processing standard_scaler, total=   0.1s


  return fit_method(estimator, *args, **kwargs)


[Pipeline] ........ (step 2 of 2) Processing classifier, total=  20.5s


7. In production environment you would most likely have to submit your parallel jobs to a remote cluster. A remote cluster in the cloud is not cheap, so for testing purposes, let's start a ray cluster on kubernetes using our local machine on minikube.

## Exercise - Part II - Train a model on a Ray Cluster

In case the local ray cluster is still in use, let's stop it.
```shell
ray stop 
```

As a reminder, here are the commands you may use to start your cluster on minikube: 

```shell
minikube start
```

```shell
minikube dashboard
```

```shell
helm repo add kuberay https://ray-project.github.io/kuberay-helm/
```

```shell
helm install kuberay-operator kuberay/kuberay-operator --version 1.0.0
```

Configure the ray cluster, you may use a .yaml file to give it more power than the default version !

```shell
helm install raycluster kuberay/ray-cluster --version 1.0.0 -f ray-cluster.yaml
```

```shell
kubectl port-forward --address 0.0.0.0 service/raycluster-kuberay-head-svc 8265:8265
```

In [None]:
ray stop
minikube start
minikube dashboard
helm repo add kuberay https://ray-project.github.io/kuberay-helm/
helm install kuberay-operator kuberay/kuberay-operator --version 1.0.0
helm install raycluster kuberay/ray-cluster --version 1.0.0 -f ray-cluster.yaml
kubectl port-forward --address 0.0.0.0 service/raycluster-kuberay-head-svc 8265:8265


# Après faut ecrire ray-train.py 
# Faire un truc du style
ray job submit --runtime-env-json='{\"working_dir\": \"./\", \"pip\": [\"requests==2.26.0\", \"numpy\", \"joblib\", \"scikit-learn\"]}' --address="http://127.0.0.1:8265" -- python ray_train.py


8. Now that our cluster is up and running, write a script to submit a hyperparameter tuning job to our cluster, and submit using the ray CLI.

## Exercise - Part III - Send the model logs to MLFLOW

For this part, you need to include mlflow in your script development so that information regarding your model be logged in an mlflow tracking server.

The solution is in the resources section of JULIE under part3_fight_against_financial_crime_mlflow_solution.