In [1]:
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
import sklearn.model_selection

  from numpy.core.umath_tests import inner1d


## 1 Use a Databricks Connect-enabled environment

1) Set up Databricks Connect

- Cluster Spark configuration (on Azure Databricks, use port 8787):

    ```
    spark.databricks.mlflow.trackMLlib.enabled true
    spark.databricks.service.server.enabled true
    spark.databricks.service.port 8787
    ```
    
    
- Laptop:

    ```bash
    pip install databricks-connect

    cat ~/.databricks-connect
    {
      "port": "8787",
      "org_id": "......",
      "token": "dapi......",
      "host": "https://......azuredatabricks.net",
      "cluster_id": "....."
    }
    ```
    
2) **Ensure** that the `scikit-learn` version on the remote cluster matches the version installed locally along with `spark-sklearn`

3) Check connectivity: run a small job on the remote cluster and collect the hostnames of the workers that executed it. The hostnames should start with the cluster ID (`cluster_id`) configured in `~/.databricks-connect`.

In [None]:
import socket

from pyspark.sql import SparkSession
import spark_sklearn

# Attach to (or create) the Spark session and show which Spark version answers.
spark = SparkSession.builder.getOrCreate()
print(spark.version)

# Tiny four-element job: each element reports the hostname of the worker that
# processed it, which shows where the code actually runs.
(spark.sparkContext
    .range(4)
    .map(lambda _element: socket.gethostname())
    .collect())

## 2 Set up a local tracking server

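- Local tracking server (note: `--host 0.0.0.0` makes the server listen on all network interfaces; use `--host 127.0.0.1` to keep it reachable only from this machine)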

    ```bash
    cd /opt/mlflow-tracking-server/
    mkdir -p backend
    mkdir -p artifacts
    mlflow server --backend-store-uri ./backend --default-artifact-root ./artifacts/  --host 0.0.0.0
    ```


- In the project folder, link the tracking server's artifact directory, so that the local path to the artifacts is the same as the one the local tracking server uses:
    
    ```
    ln -s /opt/mlflow-tracking-server/artifacts artifacts
    ```

## 3 Define a cross-validation wrapper that switches between local and remote cross-validation

In [6]:
import mlflow
import mlflow.sklearn
import os
import pandas as pd
import datetime
import tempfile
import warnings

class GridSearchCV():
    """Grid-search cross-validation that runs either locally or on a Spark cluster.

    Wraps ``sklearn.model_selection.GridSearchCV`` (local) or
    ``spark_sklearn.GridSearchCV`` (remote) behind one interface, and can log the
    best candidate of a finished search to an MLflow tracking server.

    The remote variant reads the notebook-global ``spark`` session and the
    ``spark_sklearn`` module, so the Databricks Connect cell must have run first.
    """

    def __init__(self, estimator, param_grid, *args, remote=False, **kwargs):
        """Create the underlying grid search.

        Args:
            estimator: scikit-learn estimator to tune.
            param_grid: dict mapping parameter names to lists of candidate
                values, or a list of such dicts.
            *args, **kwargs: forwarded unchanged to the wrapped GridSearchCV.
            remote: if True, distribute the search with ``spark_sklearn``.
        """
        self.estimator = estimator
        self.grid_size = self._grid_size(param_grid)
        self.remote = remote
        self.results = None

        if remote:
            self.gs = spark_sklearn.GridSearchCV(spark.sparkContext, estimator, param_grid, *args, **kwargs)
        else:
            self.gs = sklearn.model_selection.GridSearchCV(estimator, param_grid, *args, **kwargs)

    @staticmethod
    def _grid_size(param_grid):
        """Return the number of parameter combinations described by ``param_grid``.

        Accepts a single mapping or a list of mappings. An empty mapping counts as
        one (empty) combination, matching scikit-learn's ``ParameterGrid``.
        """
        # Local import: keeps this cell independent of imports in later cells.
        from collections.abc import Mapping

        grids = [param_grid] if isinstance(param_grid, Mapping) else param_grid
        total = 0
        for grid in grids:
            combinations = 1
            for candidates in grid.values():
                combinations *= len(candidates)
            total += combinations
        return total

    def fit(self, x, y):
        """Run the grid search on ``x``/``y``; store and return the fitted search."""
        if self.remote:
            print("Remote crossvalidation,", end=" ")
        else:
            print("Local crossvalidation,", end=" ")
        print("paramter grid size: %d\n" % self.grid_size)

        self.results = self.gs.fit(x, y)
        return self.results

    def log_cv(self, tracking_uri, experiment, name):
        """Log the best candidate of the last ``fit`` as a single MLflow run.

        Logs the ``cv`` setting as ``folds``, the best parameter combination, its
        mean/std test score, the refitted best estimator under ``model``, and the
        full ``cv_results_`` table as a rank-sorted CSV under ``cv_results/``.

        Args:
            tracking_uri: MLflow tracking URI to log to.
            experiment: MLflow experiment name.
            name: prefix of the CSV artifact file name.

        Raises:
            RuntimeError: if called before ``fit``.
        """
        if self.results is None:
            raise RuntimeError("log_cv() requires fit() to have been called first")

        cv_results = self.results.cv_results_
        best = self.results.best_index_

        # ISO-like timestamp with '.' instead of ':' so it is safe in file names.
        timestamp = datetime.datetime.now().strftime("%Y-%m-%dT%H.%M.%S")

        num_runs = len(cv_results["rank_test_score"])
        run_name = "run %d (best run of %d):" % (best, num_runs)

        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(experiment)

        with mlflow.start_run(run_name=run_name):
            mlflow.log_param("folds", self.results.cv)

            print("Logging parameters")
            # best_params_ also works when param_grid is a list of dicts.
            for param, value in self.results.best_params_.items():
                mlflow.log_param(param, value)

            print("Logging metrics")
            mlflow.log_metric("mean_test_score", cv_results["mean_test_score"][best])
            mlflow.log_metric("std_test_score", cv_results["std_test_score"][best])

            print("Logging model")
            mlflow.sklearn.log_model(self.results.best_estimator_, "model")

            print("Logging CV results matrix")
            self._log_cv_table(cv_results, "%s-%s-cv_results.csv" % (name, timestamp))

    @staticmethod
    def _log_cv_table(cv_results, filename):
        """Write ``cv_results`` as a rank-sorted CSV and log it under ``cv_results/``."""
        # The context manager keeps the directory alive until MLflow has copied the
        # file, then removes it, so no temporary directories are leaked.
        with tempfile.TemporaryDirectory() as tempdir:
            csv_path = os.path.join(tempdir, filename)
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                pd.DataFrame(cv_results).sort_values(by="rank_test_score").to_csv(csv_path, index=False)
            mlflow.log_artifact(csv_path, "cv_results")


## 4 Load data

In [7]:
from functools import reduce

# scikit-learn's bundled digits dataset: feature matrix X and class labels y.
digits = datasets.load_digits()
X = digits.data
y = digits.target

## 5 Local model development

### Local cross validation

Use a small subset of the parameter space for development.

In [8]:
%%time

# Small development grid: 2**5 = 32 combinations, fast enough to run locally.
param_grid = {
    "max_depth": [3, None],
    "max_features": [1, 3],
    "min_samples_split": [2, 10],
    "min_samples_leaf": [1, 10],
    "n_estimators": [10, 20],
}

# Fixed seed so repeated runs give the same scores and the same best model.
cv = GridSearchCV(RandomForestClassifier(random_state=42), param_grid, remote=False)
cv.fit(X, y)

Local crossvalidation, paramter grid size: 32

CPU times: user 3.24 s, sys: 67 ms, total: 3.3 s
Wall time: 3.36 s


### Local tracking

Only relevant during model development (e.g. feature selection and transformation); there is no need to share these runs with others.

In [10]:
# Log to the local MLflow tracking server started in section 2.
local_tracking = {
    "tracking_uri": "http://localhost:5000",
    "experiment": "digits-spark-sklearn",
    "name": "digits-01",
}
cv.log_cv(**local_tracking)

Logging parameters
Logging metrics
Logging model
Logging CV results matrix


## 6 Final cross validation

### Remote cross validation 

Expand the parameter grid to the full range of values of interest.

In [12]:
%%time

# Full grid of interest: 2*3*3*3*2*2*4 = 864 combinations, distributed over the Spark cluster.
param_grid = {
    "max_depth": [3, None],
    "max_features": [1, 3, 10],
    "min_samples_split": [2, 5, 10],
    "min_samples_leaf": [1, 3, 10],
    "bootstrap": [True, False],
    "criterion": ["gini", "entropy"],
    "n_estimators": [10, 20, 40, 80],
}

# Fixed seed so the distributed search and the logged best model are reproducible.
cv = GridSearchCV(RandomForestClassifier(random_state=42), param_grid, remote=True)
cv.fit(X, y)

Remote crossvalidation, paramter grid size: 864

CPU times: user 351 ms, sys: 21.4 ms, total: 373 ms
Wall time: 12.9 s


### Remote tracking

In [14]:
# Shared tracking on Databricks; "databricks://westeu" selects the Databricks CLI profile "westeu".
remote_tracking = {
    "tracking_uri": "databricks://westeu",
    "experiment": "/Shared/experiments/digits-spark-sklearn",
    "name": "digits-01",
}
cv.log_cv(**remote_tracking)

Logging parameters
Logging metrics
Logging model
Logging CV results matrix
