# Monitoring ML Training Pipeline: Orchestration

**Quick recap:**
- Goal:
    - Build a classification model for loan eligibility that predicts whether a loan should be granted or refused
    - Monitor data drift and model drift
- Downloaded the raw data: `raw/12196ecaa65e4831987aee4bfced5f60_2015-01-01_2015-05-31.csv`
- Preprocessed the data into:
    - training dataset: `preprocessed/12196ecaa65e4831987aee4bfced5f60.csv`
    - test dataset: `preprocessed/12196ecaa65e4831987aee4bfced5f60.csv`

- Trained and deployed the model
    - JobID: 12196ecaa65e4831987aee4bfced5f60
    - Missing values: 12196ecaa65e4831987aee4bfced5f60_missing_values_model.pkl
    - Purpose to Integer: 12196ecaa65e4831987aee4bfced5f60_purpose_to_int_model.json
    - Prediction model: 12196ecaa65e4831987aee4bfced5f60_rf.pkl

**Next steps:**
- Orchestrate the monitoring steps and process
    - Install Docker
    - Set up Airflow in a Docker container
    - Create and run an Airflow DAG
    - Integrate the Airflow DAG with Slack


<div>
<img src="../images/monitored_pipeline_slack_integration.png" width="850"/>
</div>

## I. [Install Docker](https://docs.docker.com/get-docker/)
## II. Deploy a Docker container for Airflow
- Folder structuring
```
├── dags
│   ├── data
│   │   ├── preprocessed
│   │   └── raw
│   ├── models
│   ├── results
│   └── src
├── docker-compose.yaml
├── jobs
├── logs
└── plugins
```
- Services
- UI
## III. Prepare Slack for integration with Airflow
- Create a Slack app and generate a token: https://api.slack.com/apps
- Create a Slack connection in Airflow
- Test the Airflow-Slack integration
## IV. Create the training DAG


In [8]:
import datetime
import sys
import os
import json
import re
import pickle
import traceback
import pandas as pd
import numpy as np


##### queries.py #####
# DDL for the `mljob` table, which `log_activity` writes job events into
# (one row per job_id / stage / status event, timestamped by `created_at`).
CREATE_TABLE_ML_JOB = """
create table if not exists mljob (
    id serial primary key,
    job_id varchar(36) not null,
    job_type varchar(36), -- training, inference
    job_date date, 
    stage varchar(36) not null, -- etl, preprocess, training, deploy, predicting
    status varchar(36) not null, -- started, pass, failed
    message text not null, -- e.g: exceptions
    created_at timestamp not null default now()
);
"""

# Insert template for one `mljob` row. It is filled with `str.format`, so every value is
# spliced directly into the SQL text inside single quotes: callers must escape quotes in
# any free-text value (e.g. `message`) before formatting.
# NOTE(review): a `None` value is rendered as the quoted string 'None', not SQL NULL,
# which a `date` column will presumably reject — confirm callers always pass job_date.
LOG_ACTIVITY = """
    insert into mljob (
        job_id,
        job_type,
        job_date,
        stage,
        status,
        message
    ) values ('{job_id}', '{job_type}', '{job_date}', '{stage}', '{status}', '{message}')
"""

# Select the job_id of the newest `training`/`deploy` row with the given `{status}`
# (filled with `str.format`; the status is validated against config.STATUS by the caller).
GET_LATEST_DEPLOYED_JOB_ID = """
    select job_id from mljob
    where status = '{status}' and job_type = 'training' and stage = 'deploy'
    order by created_at desc
    limit 1
"""

##### helpers.py #####
# ---> job handlers
def generate_uuid() -> str:
    """
    Generate a random (version 4) UUID as 32 lowercase hex characters without dashes.
    :return: str
    """
    # Local import: `uuid` is not among this file's module-level imports, so the
    # original body raised NameError.
    import uuid

    # `.hex` is the dash-free form of str(uuid4()), i.e. the same value as
    # str(uuid.uuid4()).replace("-", "").
    return uuid.uuid4().hex

# ---> db handlers
def log_activity(job_id:str, job_type:str, stage:str, status:str, message:str, job_date:datetime.date=None):
    """
    Record one job event as a row in the `mljob` table.

    The values are interpolated into `queries.LOG_ACTIVITY` with `str.format`, inside
    single-quoted SQL literals, so free-text values are quote-escaped first using the
    SQL-standard rule (`'` -> `''`).
    :param job_id: str, identifier of the job being logged
    :param job_type: str, must be one of `config.JOB_TYPES`
    :param stage: str, must be one of `config.STAGES`
    :param status: str, must be one of `config.STATUS`
    :param message: str, free text (e.g. an exception message)
    :param job_date: datetime.date, optional
    :return: None
    :raises AssertionError: if stage, status or job_type is not an allowed value
    """
    assert stage in config.STAGES, f"[ERROR] Stage `{stage}` is not valid! Choose from {config.STAGES}"
    assert status in config.STATUS, f"[ERROR] Status `{status}` is not valid! Choose from {config.STATUS}"
    assert job_type in config.JOB_TYPES, f"[ERROR] Job type `{job_type}` is not valid! Choose from {config.JOB_TYPES}"

    # Doubling a single quote is how SQL escapes it inside a '...' literal. Replacing it
    # with a backslash (as before) silently dropped the quote from the stored message.
    safe_job_id = str(job_id).replace("'", "''")
    safe_message = message.replace("'", "''")

    # NOTE(review): job_date=None is rendered as the quoted text 'None' by the template,
    # which a `date` column will presumably reject; callers should pass a real date.
    query = queries.LOG_ACTIVITY.format(
        job_id=safe_job_id,
        job_type=job_type,
        stage=str(stage),
        status=str(status),
        message=safe_message,
        job_date=job_date,
    )
    engine.execute(text(query).execution_options(autocommit=True))
    print(f"[INFO] Job {job_id} logged as {job_type}::{stage}::{status}::{message}")

    
def get_latest_deployed_job_id(status:str="pass") -> str:
    """
    Return the job id of the most recently deployed model.

    The id is read from `deploy_report.json` in `config.PATH_DIR_MODELS` when that file
    can be read. Otherwise the function falls back to the `mljob` table and returns the
    newest `training`/`deploy` job with the given status.
    :param status: str, status used by the database fallback; one of `config.STATUS`
    :return: str, or None when the report has no `job_id` key or the table has no match
    :raises AssertionError: if the database fallback is used with an invalid status
    """
    try:
        report_filename = os.path.join(config.PATH_DIR_MODELS, "deploy_report.json")
        # `with` closes the report file, which the original bare open() leaked.
        with open(report_filename) as f:
            return json.load(f).get("job_id")
    except (OSError, ValueError, AttributeError):
        # Report missing/unreadable (OSError), not valid JSON (ValueError), or not a
        # JSON object (AttributeError on .get): fall back to the job log table.
        pass

    assert status in config.STATUS, f"[ERROR] Status `{status}` is not valid! Choose from {config.STATUS}"
    query = text(queries.GET_LATEST_DEPLOYED_JOB_ID.format(status=status))
    r = pd.read_sql(query, engine)
    if r.empty:
        return None
    return str(r["job_id"].iloc[0])

# ---> files handlers
def load_dataset(path:str) -> pd.DataFrame:
    """
    Read a CSV file into a DataFrame using pandas' default parsing options.
    :param path: str, location of the CSV file
    :return: DataFrame
    """
    frame = pd.read_csv(path)
    return frame


def locate_raw_data_filename(job_id:str) -> str:
    """
    Locate the raw data file of a job, matched as `<PATH_DIR_DATA>/raw/<job_id>_*.csv`.
    :param job_id: str
    :return: str, the first matching path in sorted order, or None (with a warning
             printed) when no file matches
    """
    # Local import: `glob` is not among this file's module-level imports.
    from glob import glob

    # glob() returns matches in arbitrary order; sorting makes the choice deterministic
    # when several raw files exist for the same job.
    files = sorted(glob(os.path.join(config.PATH_DIR_DATA, "raw", f"{job_id}_*.csv")))
    if not files:
        print(f"[WARNING] No raw data file found for job_id : {job_id}.")
        return None
    return files[0]

def locate_preprocessed_filenames(job_id:str) -> tuple:
    """
    Locate the preprocessed training and inference data files of a job.

    Candidates are matched as `<PATH_DIR_DATA>/preprocessed/<job_id>_*.csv` and classified
    by whether their file name (not the full path) contains "training" or "inference".
    :param job_id: str
    :return: tuple (training_filename, inference_filename); training_filename is None
             when only one file exists (an inference-only job)
    :raises Exception: if no file or more than two files are found, or if the expected
             training/inference file cannot be identified among them
    """
    # Local import: `glob` is not among this file's module-level imports.
    from glob import glob

    files = sorted(glob(os.path.join(config.PATH_DIR_DATA, "preprocessed", f"{job_id}_*.csv")))
    if not files:
        raise Exception(f"No preprocessed data file found for job_id : {job_id}.")
    if len(files) > 2:
        raise Exception(f"More than two preprocessed data files found for job_id : {job_id} ->\n{files}")

    def _find(kind: str):
        # Match on the basename so a directory name containing "training"/"inference"
        # cannot cause a file to be misclassified.
        return next((f for f in files if kind in os.path.basename(f)), None)

    inference_filename = _find("inference")
    if inference_filename is None:
        raise Exception(f"No inference data file found for job_id : {job_id} ->\n{files}")
    if len(files) == 1:
        return None, inference_filename

    training_filename = _find("training")
    if training_filename is None:
        raise Exception(f"No training data file found for job_id : {job_id} ->\n{files}")
    return training_filename, inference_filename

def get_model_type(job_id:str) -> str:
    """
    Return the `final_model` entry of a job's training report.

    The report is read from `<PATH_DIR_MODELS>/<job_id>_train_report.json`.
    :param job_id: str
    :return: str, the value stored under "final_model"
    :raises OSError: if the report file cannot be opened
    :raises KeyError: if the report has no "final_model" key
    """
    report_filename = os.path.join(config.PATH_DIR_MODELS, f"{job_id}_train_report.json")
    # `with` closes the report file, which the original bare open() leaked.
    with open(report_filename, "r") as f:
        return json.load(f)["final_model"]

def load_model_from_pickle(model_name: str):
    """
    Unpickle and return the object stored as `<model_name>.pkl` in `config.PATH_DIR_MODELS`.
    :param model_name: str, file name without the `.pkl` extension
    :return: the unpickled model object
    """
    model_path = os.path.join(config.PATH_DIR_MODELS, model_name + ".pkl")
    # NOTE: pickle.load can execute arbitrary code; only use it on trusted model artifacts.
    with open(model_path, "rb") as handle:
        model = pickle.load(handle)
    return model
    