In [None]:
import time
import subprocess
import sys
import threading
from queue import Queue, Empty

from functools import partial

import mlflow
import mlflow.sklearn

from cuml.metrics.accuracy import accuracy_score
from cuml.preprocessing.model_selection import train_test_split
from cuml.ensemble import RandomForestClassifier

### Pull sample airline data

In [None]:
# Fetch the sample airline parquet file into the working directory via the shell.
# wget's -N flag enables timestamping, so an up-to-date local copy is not re-downloaded.
!wget -N https://rapidsai-cloud-ml-sample-data.s3-us-west-2.amazonaws.com/airline_small.parquet

### Define data loader, using cuDF

In [None]:
def load_data(fpath):
    """
    Read a parquet file with cuDF and split it into train/test partitions.

    :param fpath: Path to the parquet data to be ingested
    :return: Whatever ``train_test_split(features, labels, test_size=0.2)`` returns,
             where ``features`` is every column except "ArrDelayBinary" and ``labels``
             is that column cast to int32. The training routine in this notebook
             unpacks the result as ``X_train, X_test, y_train, y_test``.
    """
    import cudf

    label_column = "ArrDelayBinary"
    frame = cudf.read_parquet(fpath)

    # drop() is not called in place, so the label column is still readable from `frame`.
    labels = frame[label_column].astype('int32')
    features = frame.drop([label_column], axis=1)

    return train_test_split(features, labels, test_size=0.2)

### Define our training routine.

In [None]:
def train(fpath, max_detph, max_features, n_estimators):
    """
    Fit a random forest classifier on the data at ``fpath`` and log the run to MLflow.

    The hyperparameters are logged with ``mlflow.log_params`` and the accuracy on the
    held-out test split is logged with ``mlflow.log_metrics``.

    :param fpath: Path or URL for the training data used with the model.
    :param max_detph: int Max tree depth. The misspelled parameter name is kept so
                      that existing keyword callers keep working.
    :param max_features: float percentage of features to use in classification
    :param n_estimators: int number of trees to create
    :return: Trained Model
    """
    # Bind the argument under the correctly spelled name. Without this, every use of
    # `max_depth` below resolves to a same-named global (if one exists) instead of the
    # value the caller passed in, or raises NameError when no such global is defined.
    max_depth = max_detph

    X_train, X_test, y_train, y_test = load_data(fpath)
    mod = RandomForestClassifier(max_depth=max_depth, max_features=max_features, n_estimators=n_estimators)

    mod.fit(X_train, y_train)
    preds = mod.predict(X_test)
    acc = accuracy_score(y_test, preds)

    # Parameters are logged as strings, matching how they were recorded originally.
    mlparams = {"max_depth": str(max_depth),
                "max_features": str(max_features),
                "n_estimators": str(n_estimators),
                }
    mlflow.log_params(mlparams)

    mlmetrics = {"accuracy": acc}
    mlflow.log_metrics(mlmetrics)

    return mod

### Implement our MLFlow training loop, and save our best model to the tracking server.

In [None]:
# Run configuration: the conda environment spec logged alongside the model, and the
# parquet file fetched by the download cell.
conda_env = 'conda.yaml'
fpath     = 'airline_small.parquet'

# Random forest hyperparameters handed to train().
max_depth = 10
max_features = 0.75
n_estimators = 500

with mlflow.start_run():
    # Tag the run with a readable name so it can be found in the MLflow UI.
    mlflow.set_tag("mlflow.runName", "RAPIDS-MLFlow")

    model = train(fpath, max_depth, max_features, n_estimators)

    # Log the fitted model as a run artifact, reusing the conda_env defined above
    # instead of repeating the file name as a second literal.
    mlflow.sklearn.log_model(model,
                             artifact_path="RAPIDS-MLFlow",
                             conda_env=conda_env)

### Model Serving
Basic file-based storage does not support model registry features, so we need to identify our saved model manually for serving. When running with a database-backed MLflow Tracking server, model identification can be done using the [MLflow Tracking Client](https://www.mlflow.org/docs/latest/tracking.html).

1. Run `mlflow ui` and connect to `localhost:5000`
1. Identify the model we just created, labeled 'RAPIDS-MLFlow'
    1. Select the model
    1. Select Artifacts
    1. Copy 'Full Path'

In [None]:
# Placeholder: replace the bracketed text with the artifact 'Full Path' copied from
# the MLflow UI. The serving cell passes this value to `mlflow models serve -m`.
full_path = 'file:///[PATH TO LOCAL MLFLOW RUN]'

### Helper to track our server output.

In [None]:
def queue_descriptor_output(out, queue):
    """
    Copy every line read from ``out`` onto ``queue``, then close ``out``.

    :param out: Binary file-like object; reading stops when ``readline()`` returns b''.
    :param queue: Object with a ``put`` method that receives each raw line.
    """
    while True:
        chunk = out.readline()
        if chunk == b'':
            # An empty bytes object marks end-of-stream.
            break
        queue.put(chunk)
    out.close()

def follow_subprocess(cmd, timeout=1000, line_timeout=60.00, poll_interval=2):
    """
    Launch ``cmd`` and echo its combined stdout/stderr to ``sys.stdout``.

    A daemon thread feeds the child's output into a queue; this function polls that
    queue until the child exits or one of the timeouts expires. It then kills the
    child if it is still running and flushes any output still queued.

    :param cmd: Argument list handed to ``subprocess.Popen``.
    :param timeout: Maximum wall-clock seconds to follow the process.
    :param line_timeout: Maximum seconds to wait without receiving a new output line.
    :param poll_interval: Seconds to sleep between polls of the output queue.
    :return: None
    :raises KeyboardInterrupt: re-raised after killing the child process.
    """
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    q = Queue()
    t = threading.Thread(target=queue_descriptor_output, args=(p.stdout, q))
    t.daemon = True
    t.start()

    def emit(raw_line):
        # The pipe yields bytes while sys.stdout.write() needs str. Undecodable
        # bytes are replaced so odd output cannot abort the follower.
        sys.stdout.write(raw_line.decode(errors='replace'))

    start_time = time.perf_counter()
    last_line_time = start_time
    elapsed = 0
    line_elapsed = 0
    while p.poll() is None and elapsed < timeout and line_elapsed < line_timeout:
        try:
            time.sleep(poll_interval)
            # Drain everything currently queued; Empty ends this inner loop.
            while True:
                line = q.get(timeout=0.1)
                line_elapsed = 0
                last_line_time = time.perf_counter()
                emit(line)

        except Empty:
            # Both timeouts are measured against the wall clock.
            now = time.perf_counter()
            elapsed = now - start_time
            line_elapsed = now - last_line_time
        except KeyboardInterrupt:
            sys.stderr.write("\nCaught ctrl+c, killing subprocess ({})\n".format(' '.join(cmd)))
            p.kill()
            raise

    # Best-effort termination: the child may already have exited by itself.
    try:
        if p.poll() is None:
            p.kill()
    except OSError:
        pass

    t.join(2)

    ## Drain any remaining text
    try:
        while True:
            emit(q.get(timeout=0.1))
    except Empty:
        pass

### Begin serving our trained model using MLFlow
**Note:** The serving thread will continue to run after cell execution. Select the cell and click 'interrupt the kernel' to stop it.

In [None]:
port = 55755
host = 'localhost'

# Build the argument vector as a list rather than splitting a formatted string, so a
# model path containing whitespace still reaches mlflow as a single argument.
command = ["mlflow", "models", "serve", "--no-conda",
           "-m", full_path,
           "-p", str(port),
           "-h", host]

# Infinite timeouts: the server is followed until it exits or the kernel is interrupted.
kwargs = { "cmd": command, "timeout": float('inf'), "line_timeout": float('inf') }

# Follow the server on a background thread so the cell returns immediately.
threading.Thread(target=follow_subprocess, kwargs=kwargs).start()

### Make requests against the deployed model

In [None]:
import json
import requests

headers = {
    "Content-Type": "application/json",
    "format": "pandas-split"
}

# A single example row; "columns" names the features and "data" holds the row values.
data = { 
    "columns": ["Year", "Month", "DayofMonth", "DayofWeek", "CRSDepTime", "CRSArrTime", "UniqueCarrier", "FlightNum", "ActualElapsedTime", "Origin", "Dest", "Distance", "Diverted"],
    "data": [[1987, 10, 1, 4, 1, 556, 0, 190, 247, 202, 162, 1846, 0]]
}

# Retry policy: the server may still be starting, so failed calls are retried a
# bounded number of times instead of looping forever.
MAX_ATTEMPTS = 10
RETRY_DELAY_S = 20
REQUEST_TIMEOUT_S = 60

## Pause to let server start
time.sleep(5)

for attempt in range(1, MAX_ATTEMPTS + 1):
    try:
        resp = requests.post(url=f"http://{host}:{port}/invocations",
                             data=json.dumps(data),
                             headers=headers,
                             timeout=REQUEST_TIMEOUT_S)
        # Send HTTP error responses to the error path; otherwise any error body
        # would fail the "[0.0]" comparison below and be reported as "LATE".
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Caught exception attempting to call model endpoint: {e}")
        if attempt == MAX_ATTEMPTS:
            # Out of retries: surface the failure instead of spinning indefinitely.
            raise
        print("Sleeping")
        time.sleep(RETRY_DELAY_S)
    else:
        # NOTE(review): this relies on the response body being exactly "[0.0]" for an
        # on-time prediction; confirm against the MLflow version in use.
        print(f'Classification: {"ON-Time" if resp.text == "[0.0]" else "LATE"}')
        break