# Setup

The livecodes of the lecture are based on the code used by the students during the challenges.

We will use the **lifecycle lecture** challenge for all the livecodes of the lecture.

Myriad batches:

``` bash
cd mlops-lifecycle-lecture
```

Legacy batches:

``` bash
cd data-challenges/07-ML-Ops/03-Automate-model-lifecycle/00-Lecture-livecode
```

Then use VSCode:

``` bash
code .
```

# Local and Big Query setup

If you do not already have a valid local conf in `~/.lewagon/mlops` and a valid Big Query dataset for the taxifare data, you can use the `Makefile` in order to create them.

Edit the `DATASET_SIZE` and `VALIDATION_DATASET_SIZE` variables in your `.env`, then `direnv allow .` and `direnv reload`.

You will be able to retrieve the latest version of the data using either:
- `make reset_sources_all` in order to reset the datasets of all sizes on the local disk + Big Query
- `make reset_sources_env` in order to reset the datasets of size `DATASET_SIZE` / `VALIDATION_DATASET_SIZE` on the local disk + Big Query

# Application parameters

Copy the `.env.sample` to a `.env` file with the default parameters in order to:
- Load the data from Big Query
- Store the experiment in MLflow
- Train faster: a chunk size of 100k lets the training run in a single chunk

``` bash
LOCAL_DATA_PATH=~/.lewagon/mlops/data
LOCAL_REGISTRY_PATH=~/.lewagon/mlops/training_outputs

DATASET_SIZE=10k
VALIDATION_DATASET_SIZE=10k
CHUNK_SIZE=2000

DATA_SOURCE="big query"
MODEL_TARGET=mlflow
PREFECT_BACKEND=local

PROJECT=le-wagon-ds

DATASET=taxifare_dataset

MLFLOW_TRACKING_URI=https://mlflow.lewagon.ai

MLFLOW_EXPERIMENT="lecli #941 wagoncab"
MLFLOW_MODEL_NAME=wagoncab_taxifare_941
```

🚨 `PROJECT`: change to your GCP project id

🚨 `MLFLOW_MODEL_NAME`: must not contain spaces or the prediction will not work

🚨 Do not forget to `direnv allow .` and `direnv reload` whenever required

# MLflow

**interface/main.py**

In [None]:
if __name__ == '__main__':
    # Run the steps of the pipeline one after the other when the module is
    # executed as a script.
    preprocess()
    # Second preprocessing pass, this time with source_type='val'.
    preprocess(source_type='val')
    train()
    pred()
    # The evaluation step is left disabled in this run.
    # evaluate()


**ml_logic/registry.py in `save_model()`**

In [None]:
    if os.environ.get("MODEL_TARGET") == "mlflow":

        print(Fore.BLUE + "\nSave model to mlflow..." + Style.RESET_ALL)

        mlflow_tracking_uri = os.environ.get("MLFLOW_TRACKING_URI")
        mlflow_experiment = os.environ.get("MLFLOW_EXPERIMENT")
        mlflow_model_name = os.environ.get("MLFLOW_MODEL_NAME")

        mlflow.set_tracking_uri(mlflow_tracking_uri)
        mlflow.set_experiment(experiment_name=mlflow_experiment)

        with mlflow.start_run():

            if params is not None:
                mlflow.log_params(params)

            if metrics is not None:
                mlflow.log_metrics(metrics)

            if model is not None:

                mlflow.keras.log_model(keras_model=model,
                                       artifact_path="model",
                                       keras_module="tensorflow.keras",
                                       registered_model_name=mlflow_model_name)

        print("\n✅ data saved to mlflow")

        return


**ml_logic/registry.py in `load_model()`**

In [None]:
    if os.environ.get("MODEL_TARGET") == "mlflow":

        print(Fore.BLUE + "\nLoad model from mlflow..." + Style.RESET_ALL)

        # load model from mlflow
        mlflow.set_tracking_uri(os.environ.get("MLFLOW_TRACKING_URI"))

        mlflow_model_name = os.environ.get("MLFLOW_MODEL_NAME")

        stage = "Production"

        model_uri = f"models:/{mlflow_model_name}/{stage}"
        print(f"- uri: {model_uri}")

        try:
            model = mlflow.keras.load_model(model_uri=model_uri)
            print("\n✅ model loaded from mlflow")
        except:
            print(f"\n❌ no model in stage {stage} on mlflow")
            return None

        return model


👉 Train the model:

``` bash
make run_all
```

# Workflow

🚨 In **MLflow**, move the latest trained model to the **Production** stage so that it can be retrieved by `evaluate()`

Create `workflow.py` and fill in its content.

👉 Run the code with `python -m taxifare.interface.workflow`

**interface/workflow.py**

In [None]:
from taxifare.interface.main import (preprocess, train, evaluate)

from colorama import Fore, Style


def eval_perf(next_row):
    """
    Evaluate the latest production model on new data through `evaluate()`,
    print the resulting performance and return it.

    `next_row` is accepted but not used by this function.
    """
    score = evaluate()

    banner = Fore.GREEN + "\n🔥 Ran task: EVAL PERF:" + Style.RESET_ALL
    print(banner + f"\n- Past model performance: {score}")

    return score


def train_model(next_row):
    """
    Preprocess the data, train a model through `train()`, print the
    resulting performance and return it.

    `next_row` is accepted but not used by this function.
    """
    # two preprocessing passes: the default source, then the "val" source
    preprocess()
    preprocess(source_type="val")

    # the value returned by the training is reported as the new performance
    score = train()

    banner = Fore.GREEN + "\n🔥 Ran task: TRAIN MODEL:" + Style.RESET_ALL
    print(banner + f"\n- New model performance: {score}")

    return score


def notify(past_perf, new_perf):
    """
    Print the performance of the past model next to the performance of the
    newly trained one. Nothing is returned.
    """
    # the header is a plain string: it has no placeholder, so no f-prefix
    print(Fore.GREEN + "\n🔥 Run task: NOTIF" + Style.RESET_ALL
          + f"\n- Past performance: {past_perf}"
          + f"\n- New performance: {new_perf}")


if __name__ == "__main__":

    # Script entry point: run the three steps of the workflow in sequence.
    # next_row is passed to both steps, which currently do not use it.
    next_row = 0

    # evaluate the performance of the past model
    past_perf = eval_perf(next_row)

    # retrain the model with new lines
    new_perf = train_model(next_row)

    # print results
    notify(past_perf, new_perf)


# Prefect

INCREMENTAL **interface/workflow.py**

In [None]:
import datetime
import os

from prefect import task, Flow
from prefect.schedules import IntervalSchedule
from prefect.executors import LocalDaskExecutor

@task
...


def build_flow(schedule):
    """
    Build and return the Prefect flow of the workflow.

    `schedule` is handed to `Flow` as is; the entry point of this file passes
    either `None` or an `IntervalSchedule`.
    """
    with Flow(name="wagoncab taxifare workflow", schedule=schedule) as flow:

        # placeholder: the tasks of the flow are wired here
        ...

    return flow


if __name__ == "__main__":

    # No schedule at this stage; the commented line below is the alternative
    # with a 300-minute interval.
    schedule = None
    # schedule = IntervalSchedule(interval=datetime.timedelta(minutes=300))

    flow = build_flow(schedule)

    # draw the graph of the flow
    flow.visualize()

    # run the flow directly from this script
    flow.run()
    # Alternative kept for the next step: attach a LocalDaskExecutor and
    # register the flow in the "wagoncab project" project instead of running it.
    # flow.executor = LocalDaskExecutor()
    # flow.register("wagoncab project")


🚨 Set prefect backend to local server instead of cloud

``` bash
prefect backend server
```

🚨 Start **Docker**, then start the prefect backend in a new shell:

``` bash
prefect server start \
    --postgres-port 5433 \
    --ui-port 8088
```

🚨 Start the prefect agent in yet another shell

``` bash
prefect agent local start
```

🚨 Create prefect project

``` bash
prefect create project "wagoncab project"
```

👉 Run with `python -m taxifare.interface.workflow`

👉 See the results in `http://localhost:8088` or on the appropriate address

FULL **interface/workflow.py**

In [None]:
from taxifare.interface.main import (preprocess, train, evaluate)

from colorama import Fore, Style

import datetime
import os

from prefect import task, Flow
from prefect.schedules import IntervalSchedule
from prefect.executors import LocalDaskExecutor


@task
def eval_perf(next_row):
    """
    Prefect task: evaluate the latest production model on new data through
    `evaluate()`, print the resulting performance and return it.

    `next_row` is accepted but not used by this task.
    """
    score = evaluate()

    banner = Fore.GREEN + "\n🔥 Ran task: EVAL PERF:" + Style.RESET_ALL
    print(banner + f"\n- Past model performance: {score}")

    return score


@task
def train_model(next_row):
    """
    Prefect task: preprocess the data, train a model through
    `train(stage="Production")`, print the resulting performance and return it.

    `next_row` is accepted but not used by this task.
    """
    # two preprocessing passes: the default source, then the "val" source
    preprocess()
    preprocess(source_type="val")

    # the value returned by the training is reported as the new performance
    score = train(stage="Production")

    banner = Fore.GREEN + "\n🔥 Ran task: TRAIN MODEL:" + Style.RESET_ALL
    print(banner + f"\n- New model performance: {score}")

    return score


@task
def notify(past_perf, new_perf):
    """
    Prefect task: print the performance of the past model next to the
    performance of the newly trained one. Nothing is returned.
    """
    # the header is a plain string: it has no placeholder, so no f-prefix
    print(Fore.GREEN + "\n🔥 Run task: NOTIF" + Style.RESET_ALL +
          f"\n- Past performance: {past_perf}" +
          f"\n- New performance: {new_perf}")


def build_flow(schedule):
    """
    Build and return the Prefect flow chaining the three tasks of the workflow.

    `schedule` is handed to `Flow` as is; the entry point of this file passes
    either `None` or an `IntervalSchedule`.
    """
    with Flow(name="wagoncab taxifare workflow", schedule=schedule) as flow:

        # passed to both tasks, which currently do not use it
        next_row = 0

        # evaluate the performance of the past model
        past_perf = eval_perf(next_row)

        # retrain the model with new lines
        new_perf = train_model(next_row)

        # print results: notify receives the outputs of the two tasks above
        notify(past_perf, new_perf)

    return flow


if __name__ == "__main__":

    # Schedule the flow with a 300-minute interval; the commented line is the
    # unscheduled alternative.
    # schedule = None
    schedule = IntervalSchedule(interval=datetime.timedelta(minutes=300))

    flow = build_flow(schedule)

    # draw the graph of the flow
    flow.visualize()

    # The direct run is disabled here: the flow gets a LocalDaskExecutor and
    # is registered in the "wagoncab project" project instead.
    # flow.run()
    flow.executor = LocalDaskExecutor()
    flow.register("wagoncab project")
