# faculty-distributed

## Example usage
In this example, we train an XGBoost model on the Iris dataset. We perform a grid search over the hyperparameters `learning_rate` and `max_depth`, distributed over multiple servers using the Jobs functionality of the Faculty Platform. The accuracy of each classifier is logged in the Experiments tab of the Faculty Platform.

```python
import os
import itertools
import tempfile
import shutil

import mlflow
import numpy as np
import xgboost as xgb
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

import faculty_distributed
from faculty_distributed.utils import job_name_to_job_id
from faculty import client
```

### Create Environments 

First, we need to create environments that install the faculty-distributed package and the xgboost package. We recommend installing faculty-distributed through its own environment. That environment then stays clean and can be reused for any distributed job. To create these environments, go to the Environments tab of the Faculty Platform and create two new environments called `faculty_distributed` and `xgboost`. In `faculty_distributed`, paste the following into the scripts box:
```bash
pip install -U git+ssh://git@github.com/facultyai/faculty-distributed.git
```
In `xgboost`, enter `xgboost` where it says enter package name, and press return.


### Create Job 

Next, create a new job definition named `distributed_example`. In the `COMMAND` section, paste the following:

```bash
faculty_distributed_job $path $worker_id
```

Then add a `PARAMETER` named `path` of type `text`, and check the `Make field mandatory` box. Add a second `PARAMETER` named `worker_id`, also of type `text`, and again check `Make field mandatory`.

Finally, under `SERVER SETTINGS`, add `faculty_distributed` to the `ENVIRONMENTS` section. The function to be executed may use libraries that are not installed by default on Faculty servers. Those libraries must also be installed on the job servers through an environment, so here we also add the `xgboost` environment created earlier.

Depending on the level of parallelisation required and how long each function call takes, it may be better to use dedicated rather than shared instances. To do this, click `Large and GPU servers` under `SERVER RESOURCES` and select an appropriate server type from the dropdown menu.

Remember to click `SAVE` when you are finished.
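The `path` and `worker_id` parameters suggest how work is handed to each job subrun. The notebook writes the function and its arguments to a shared directory, and each worker reads its own share by ID.

The sketch below illustrates that pattern using only the standard library. It is a hypothetical illustration, not the faculty-distributed implementation, and the file names and helper functions (`write_tasks`, `run_worker`, `collect_outputs`) are invented for it. It uses plain `pickle`, which can only serialise functions that are importable by name. A function defined in a notebook would need a more capable serialiser.

```python
import os
import pickle
import shutil
import tempfile


def write_tasks(func, args_list, tmpdir_prefix):
    """Driver side (hypothetical): store the function and one argument list per worker."""
    path = tempfile.mkdtemp(prefix=tmpdir_prefix)
    with open(os.path.join(path, "func.pkl"), "wb") as f:
        pickle.dump(func, f)
    for worker_id, args in enumerate(args_list):
        with open(os.path.join(path, f"args_{worker_id}.pkl"), "wb") as f:
            pickle.dump(args, f)
    return path


def run_worker(path, worker_id):
    """Worker side (hypothetical): what a command taking `$path $worker_id` could do.

    worker_id may arrive as text (the job parameter is of type text);
    it is only used to build file names, so "0" and 0 behave the same.
    """
    with open(os.path.join(path, "func.pkl"), "rb") as f:
        func = pickle.load(f)
    with open(os.path.join(path, f"args_{worker_id}.pkl"), "rb") as f:
        args = pickle.load(f)
    result = func(*args)
    with open(os.path.join(path, f"output_{worker_id}.pkl"), "wb") as f:
        pickle.dump(result, f)


def collect_outputs(path, n_workers, clean=True):
    """Driver side (hypothetical): read outputs in worker order, optionally deleting the directory."""
    outputs = []
    for worker_id in range(n_workers):
        with open(os.path.join(path, f"output_{worker_id}.pkl"), "rb") as f:
            outputs.append(pickle.load(f))
    if clean:
        shutil.rmtree(path)
    return outputs
```

Calling `run_worker(path, worker_id)` for each ID in turn simulates the job subruns on one machine. On the platform, each subrun would instead run on its own server.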

### Get project and job IDs 

The project ID is stored in the `FACULTY_PROJECT_ID` environment variable. The job ID can be looked up from the job name with the `job_name_to_job_id` function from `faculty_distributed.utils`.

```python
project_id = os.getenv("FACULTY_PROJECT_ID")
job_id = faculty_distributed.utils.job_name_to_job_id("distributed_example")
```
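Outside a Faculty server, `os.getenv` returns `None` rather than raising, and that `None` would be passed on silently. A small helper can fail early with a clearer message instead. The name `require_env` is hypothetical and not part of faculty-distributed.

```python
import os


def require_env(name):
    """Return the value of an environment variable, failing loudly if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"{name} is not set; is this notebook running on a Faculty server?"
        )
    return value
```

With this helper, the first line of the cell above would become `project_id = require_env("FACULTY_PROJECT_ID")`.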

### Load data 

Next, create a temporary directory in which to save the Iris data as binary files for this example. The directory is created under `/project`, the project's shared file system, so that the job workers can read from it. It is deleted at the end of the notebook. (Note: saving the data in binary format is not necessary, but it is convenient here because XGBoost uses its own `DMatrix` data structure.)

```python
path = tempfile.mkdtemp(prefix="/project/temp-data-")

data = load_iris()
x_train, x_test, y_train, y_test = train_test_split(data.data, data.target)

xg_train = xgb.DMatrix(x_train, label=y_train)
xg_test = xgb.DMatrix(x_test, label=y_test)

xg_train.save_binary(os.path.join(path, "train.buffer"))
xg_test.save_binary(os.path.join(path, "test.buffer"))
```
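As the note above says, the binary `DMatrix` format is a convenience rather than a requirement. What matters is that the data is written once to the shared directory and read back by each worker.

Here is a minimal sketch of the same pattern using NumPy's `.npz` format, for functions that do not use XGBoost. The file name `iris_split.npz` and the helpers `save_split` and `load_split` are hypothetical.

```python
import os

import numpy as np


def save_split(path, x_train, x_test, y_train, y_test):
    """Write a train/test split to a single .npz file under `path`."""
    filename = os.path.join(path, "iris_split.npz")
    np.savez(filename, x_train=x_train, x_test=x_test, y_train=y_train, y_test=y_test)
    return filename


def load_split(filename):
    """Read the split back; each worker would call this with the shared file name."""
    with np.load(filename) as data:
        # Indexing the NpzFile loads each array into memory before the file closes.
        return data["x_train"], data["x_test"], data["y_train"], data["y_test"]
```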

### Define function to be executed on distributed workers 

Here we define a function to train an XGBoost classifier. Its inputs are the path where the data is saved and the two hyperparameters we will search over: the learning rate and the maximum depth of the trees. The function logs the parameters and the model's accuracy to MLflow and returns the predictions made by the classifier.

```python
def train_and_predict(datapath, learning_rate, max_depth):
    """Train an XGBoost classifier and return the predicted classes.

    The parameter values and the accuracy are logged to MLflow.

    Parameters
    ----------
    datapath: str
        path from which the data will be read
    learning_rate: float
        learning rate for the XGBoost classifier
    max_depth: int
        maximum depth of each XGBoost tree

    Returns
    -------
    predictions: numpy.ndarray
        predicted classes evaluated on the test set
    """
    mlflow.set_experiment("Iris XGB classifier")
    with mlflow.start_run():
        mlflow.log_param("learning_rate", str(learning_rate))
        mlflow.log_param("max_depth", str(max_depth))

        # Read the data (including labels) from the shared path, so the
        # function does not depend on variables defined in the notebook.
        xg_train = xgb.DMatrix(os.path.join(datapath, "train.buffer"))
        xg_test = xgb.DMatrix(os.path.join(datapath, "test.buffer"))
        y_test = xg_test.get_label()

        param = {
            "objective": "multi:softmax",
            "num_class": len(np.unique(xg_train.get_label())),
            "learning_rate": learning_rate,
            "max_depth": max_depth,
        }

        bst = xgb.train(param, xg_train)
        predictions = bst.predict(xg_test)

        mlflow.log_metric("accuracy", accuracy_score(y_test, predictions))
        return predictions
```

### Define arguments list to be sent to each worker 

Build the list of arguments with which to call the function, one entry per subrun. Here we perform a grid search over the hyperparameters, so each entry contains the data path followed by one combination of learning rate and maximum depth. (Note: the parameter values are deliberately chosen to produce different accuracy scores.)

```python
learning_rates = [0.1, 1, 5]
max_depths = [1, 4, 8]

args_list = [[path, *combo] for combo in itertools.product(learning_rates, max_depths)]
args_list
```
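Each entry of the argument list corresponds to one subrun, so it helps to know the order of the entries. `itertools.product` varies its last input fastest, so `max_depth` cycles inside each learning rate. This gives 3 × 3 = 9 entries, and therefore nine subruns. The sketch below prints each index with its combination. The `path` value is a placeholder, not the real temporary directory.

```python
import itertools

learning_rates = [0.1, 1, 5]
max_depths = [1, 4, 8]
path = "/project/temp-data-example"  # placeholder for the directory created earlier

# product() advances max_depth fastest, then moves to the next learning rate.
args_list = [
    [path, learning_rate, max_depth]
    for learning_rate, max_depth in itertools.product(learning_rates, max_depths)
]

for index, args in enumerate(args_list):
    print(index, args[1:])
```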

### Create a `faculty_distributed.FacultyJobExecutor` instance

`faculty_distributed.FacultyJobExecutor` requires the project ID and the job ID. It also accepts two optional arguments:

- `clean`, a boolean (default `True`) that determines whether the temporary files created for the run are deleted as soon as the job completes.
- `tmpdir_prefix`, a string (default `'/project/.faculty-distributed'`) that sets the path at which the temporary directory is created.

```python
fje = faculty_distributed.FacultyJobExecutor(project_id, job_id)
```

### Execute function on distributed workers

Call `map` with the function and the argument list. This starts the job with one subrun per entry in the argument list and returns the outputs of the subruns as a list.


```python
predictions = fje.map(train_and_predict, args_list)
```
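Before launching a distributed job, it can be useful to check the function and argument list locally. The class below is a hypothetical stand-in, not part of faculty-distributed. It mirrors only the `map(func, args_list)` call shape used above: each entry runs in a thread on the current server, and the results come back in argument order.

```python
from concurrent.futures import ThreadPoolExecutor


class LocalExecutor:
    """Hypothetical local stand-in exposing a `map(func, args_list)` method."""

    def __init__(self, max_workers=None):
        self.max_workers = max_workers

    def map(self, func, args_list):
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            # Each entry is unpacked into positional arguments, matching how
            # the grid-search argument lists are built; pool.map keeps input order.
            return list(pool.map(lambda args: func(*args), args_list))
```

For example, `LocalExecutor(max_workers=1).map(train_and_predict, args_list[:2])` would run the first two grid points on the notebook server. Setting `max_workers=1` keeps the MLflow runs opened by `train_and_predict` from overlapping in one process.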

### Analyse results 

Predictions are returned in the same order as the argument list.

```python
predictions
```

```python
[accuracy_score(y_test, prediction) for prediction in predictions]
```
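Since the outputs line up with `args_list`, each score can be paired with its hyperparameters by position. Here is a minimal sketch that sorts the combinations from best to worst; the helper name `rank_by_accuracy` is hypothetical.

```python
def rank_by_accuracy(args_list, accuracies):
    """Pair each hyperparameter combination with its score, best first."""
    results = [
        {"learning_rate": args[1], "max_depth": args[2], "accuracy": accuracy}
        for args, accuracy in zip(args_list, accuracies)
    ]
    return sorted(results, key=lambda result: result["accuracy"], reverse=True)
```

In the notebook, `rank_by_accuracy(args_list, [accuracy_score(y_test, p) for p in predictions])[0]` gives the best combination found by the grid search.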

### Remove the temporary data directory created for this example

```python
shutil.rmtree(path)
```
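If a cell fails between creating and removing the directory, the data is left behind in `/project`. When the whole workflow runs as a single script or cell, the standard library's `tempfile.TemporaryDirectory` removes the directory even when an exception is raised.

Here is a minimal sketch. `run_with_shared_data` and the `work` callable (which would save the data, build the argument list and call `map`) are hypothetical.

```python
import tempfile


def run_with_shared_data(prefix, work):
    """Run `work(path)` inside a temporary directory that is always removed afterwards."""
    with tempfile.TemporaryDirectory(prefix=prefix) as path:
        return work(path)
```

In the notebook this would be called as `run_with_shared_data("/project/temp-data-", work)`. The prefix keeps the directory on the shared file system, as with `tempfile.mkdtemp` above.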