<div style="align: center;">
    <br>
    <img src="https://www.nyc.gov/assets/tlc/images/content/hero/MRP-Closing-Week.jpg" style="display:block; margin:auto; width:65%; height:250px;">
</div><br><br> 

<div style="letter-spacing:normal; opacity:1.;">
<!--   https://xkcd.com/color/rgb/   -->
  <p style="text-align:center; background-color: lightsalmon; color: Jaguar; border-radius:10px; font-family:monospace; 
            line-height:1.4; font-size:32px; font-weight:bold; text-transform: uppercase; padding: 9px;">
            <strong>TLC Trip Record Data</strong></p>  
  
  <p style="text-align:center; background-color:romance; color: Jaguar; border-radius:10px; font-family:monospace; 
            line-height:1.4; font-size:22px; font-weight:normal; text-transform: capitalize; padding: 5px;"
     >Machine Learning Module: PREFECT - Ride Duration Prediction using Regression Analysis<br>( MLFLOW & PREFECT )</p>    
</div>

- https://github.com/discdiver/prefect-mlops-zoomcamp
- https://mlflow.org/docs/0.7.0/index.html

**Dataset Info**


**Context**

Yellow and green taxi trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. The data used in the attached datasets were collected and provided to the NYC Taxi and Limousine Commission (TLC) by technology providers authorized under the Taxicab & Livery Passenger Enhancement Programs (TPEP/LPEP). The trip data was not created by the TLC, and TLC makes no representations as to the accuracy of these data.

For-Hire Vehicle (“FHV”) trip records include fields capturing the dispatching base license number and the pick-up date, time, and taxi zone location ID (shape file below). These records are generated from the FHV Trip Record submissions made by bases. Note: The TLC publishes base trip record data as submitted by the bases, and we cannot guarantee or confirm their accuracy or completeness. Therefore, this may not represent the total amount of trips dispatched by all TLC-licensed bases. The TLC performs routine reviews of the records and takes enforcement actions when necessary to ensure, to the extent possible, complete and accurate information.


**ATTENTION!**

On 05/13/2022, we are making the following changes to trip record files:

- All files will be stored in the PARQUET format. Please see the ‘Working With PARQUET Format’ under the Data Dictionaries and MetaData section.
- Trip data will be published monthly (with a two-month delay) instead of bi-annually.
- HVFHV files will now include 17 more columns (please see High Volume FHV Trips Dictionary for details). Additional columns will be added to the old files as well. The earliest date to include additional columns: February 2019.
- Yellow trip data will now include 1 additional column (‘airport_fee’, please see Yellow Trips Dictionary for details). The additional column will be added to the old files as well. The earliest date to include the additional column: January 2011.


**Download the data for January and February 2023**

Dataset: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page


**Data Dictionaries and MetaData**

- We'll use the same `NYC taxi dataset`, but instead of "Yellow Taxi Trip Records", we'll use `"Green Taxi Trip Records"`.

> `Green Trips Data Dictionary`: https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_green.pdf
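In the green-taxi files, pick-up and drop-off times live in the `lpep_pickup_datetime` and `lpep_dropoff_datetime` columns. The regression target used later in `read_data` is the trip duration in minutes, keeping only trips between 1 and 60 minutes. A minimal pandas sketch of that step, run on a few made-up rows (hypothetical data, not taken from the TLC files):

```python
import pandas as pd

def add_duration(df: pd.DataFrame) -> pd.DataFrame:
    """Compute trip duration in minutes and keep 1-60 minute trips, as read_data does."""
    df = df.copy()
    pickup = pd.to_datetime(df["lpep_pickup_datetime"])
    dropoff = pd.to_datetime(df["lpep_dropoff_datetime"])
    # Timedelta -> float minutes, vectorized through the .dt accessor
    df["duration"] = (dropoff - pickup).dt.total_seconds() / 60
    # Drop implausibly short or long trips
    return df[(df["duration"] >= 1) & (df["duration"] <= 60)]

# Hypothetical sample rows: a 30 s trip, a 15 min trip, and a 2 h trip
sample = pd.DataFrame({
    "lpep_pickup_datetime":  ["2023-01-01 10:00:00", "2023-01-01 10:00:00", "2023-01-01 10:00:00"],
    "lpep_dropoff_datetime": ["2023-01-01 10:00:30", "2023-01-01 10:15:00", "2023-01-01 12:00:00"],
})
print(add_duration(sample)["duration"].tolist())  # prints [15.0]
```

Using `.dt.total_seconds()` gives the same numbers as the row-wise `apply(lambda td: td.total_seconds() / 60)` in `read_data`, without a Python-level loop.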

**TASK**

The goal of this homework is to familiarize users with workflow orchestration. 

Start with the orchestrate.py file in the 03-orchestration/3.4 folder
of the course repo: https://github.com/DataTalksClub/mlops-zoomcamp/blob/main/03-orchestration/3.4/orchestrate.py<br>

Questions: https://github.com/DataTalksClub/mlops-zoomcamp/blob/main/cohorts/2023/03-orchestration/homework.md

- https://sagarthacker.com/posts/mlops/intro_workflow_orchestration.html
- https://sagarthacker.com/posts/mlops/prefect-blocks.html
- https://sagarthacker.com/posts/mlops/prefect-deployment.html


**Table of Contents**


1. Import Libraries and Ingest Data
    - Q1. Human-readable name<br>    
2. Recognizing and Understanding Data
    - Q2. Cron<br>
    - Q3. RMSE<br>
    - Q4. RMSE (Markdown Artifact)<br>
    

<div style="letter-spacing:normal; opacity:1.;">
  <h1 style="text-align:center; background-color: lightsalmon; color: Jaguar; border-radius:10px; font-family:monospace; border-radius:20px;
            line-height:1.4; font-size:32px; font-weight:bold; text-transform: uppercase; padding: 9px;">
            <strong>1. Import Libraries & Ingest Data</strong></h1>   
</div>

> ⚠️ Avoid working in the conda `base` environment; use a dedicated virtual environment instead.

- https://docs.conda.io/projects/conda/en/4.6.0/_downloads/52a95608c49671267e40c689e0bc00ca/conda-cheatsheet.pdf

```sh
# export the current environment's packages (pip format)
pip freeze > requirements.txt
# or export in conda format (note: this overwrites the same file)
conda list -e > requirements.txt

# create and activate a new conda virtual environment
conda create --name "prefect-ops" python=3.10 jupyter -y
conda activate "prefect-ops"

# install all package dependencies
pip install -r requirements.txt
conda install -c conda-forge --file=requirements.txt      # often fails, e.g. when a pinned version is not on conda-forge
conda install -c conda-forge pandas==2.0.2 -q -y

# if conda reports that the environment is inconsistent, try:
conda update -n base -c defaults conda --force-reinstall
conda install anaconda --force-reinstall

```

**When building a conda package, you must use the `--no-deps` option in the pip install command to avoid bundling dependencies into your conda package.**

If you run pip install without the `--no-deps` option, pip will often install dependencies in your conda recipe and those dependencies will become part of your package. This wastes space in the package and increases the risk of file overlap, file clobbering, and broken packages.

There might be cases where you want to install a package directly from a local directory or a specific location, without relying on the package indexes. In such situations, you can use the `--no-index` option to tell pip not to look for the package in any indexes.

```sh
command1 & command2    # command1 runs in the background while command2 starts immediately (concurrently)
command1 ; command2    # runs sequentially
command1 && command2   # runs sequentially; command2 runs only if command1 succeeds
command1 || command2   # runs sequentially; command2 runs only if command1 fails
```
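The short-circuit behavior of `&&` and `||` can be checked from a notebook cell with a small sketch. It assumes a POSIX shell (Linux, macOS, or WSL as used here), since `subprocess.run(..., shell=True)` goes through `/bin/sh`; in Windows `cmd.exe`, `&` simply chains commands sequentially.

```python
import subprocess

def sh(cmd: str) -> str:
    """Run cmd through /bin/sh and return its stdout (POSIX shell semantics)."""
    return subprocess.run(cmd, shell=True, capture_output=True, text=True).stdout

print(repr(sh("true && echo ran")))    # command2 runs: command1 succeeded
print(repr(sh("false && echo ran")))   # command2 skipped: command1 failed
print(repr(sh("false || echo ran")))   # command2 runs: command1 failed
print(repr(sh("false ; echo ran")))    # command2 always runs
```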

In [1]:
# # %%capture cap --no-stderr  # capture outputs  # cap.show()
# !cat /etc/os-release
# !grep -E -w 'VERSION|NAME|PRETTY_NAME' /etc/os-release

In [2]:
# check environment
# !conda env list
# !conda info -e
# !conda info | grep 'active env'

In [3]:
%%writefile requirements.txt 
# To get started with MLflow you'll need to install the appropriate Python package.

pandas==2.0.2
orjson==3.9.1          # orjson is a fast, correct JSON library
seaborn==0.12.2

# ML Model packages
scikit-learn==1.2.2
xgboost==1.7.3

# MLOPS packages
mlflow==2.4.1
wandb==0.15.4
prefect==2.10.18
prefect-email==0.2.2
prefect-aws==0.3.4

# Optional
black==23.3.0          # code style

# Hyperparameter tuning
hyperopt==0.2.7

# for parquet file
pyarrow==11.0.0
fastparquet==2023.4.0

# Optional
# jupyter
# ipykernel
# ipywidgets

Overwriting requirements.txt
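After writing `requirements.txt`, it can help to confirm that the active environment actually has the pinned versions. A minimal sketch using only the standard library (`importlib.metadata`); the helper names `parse_pins` and `find_mismatches` are hypothetical, and only plain `name==version` lines are handled:

```python
import re
from importlib.metadata import PackageNotFoundError, version

def parse_pins(text: str) -> dict:
    """Return {package: pinned_version} for 'name==version' lines, ignoring comments."""
    pins = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop inline and full-line comments
        match = re.fullmatch(r"([A-Za-z0-9_.\-]+)==([^\s;]+)", line)
        if match:
            pins[match.group(1)] = match.group(2)
    return pins

def find_mismatches(pins: dict) -> dict:
    """Map package -> (installed_version or None, pinned_version) wherever they differ."""
    mismatches = {}
    for name, pinned in pins.items():
        try:
            installed = version(name)
        except PackageNotFoundError:
            installed = None  # not installed at all
        if installed != pinned:
            mismatches[name] = (installed, pinned)
    return mismatches
```

Usage: `with open("requirements.txt") as f: print(find_mismatches(parse_pins(f.read())))`; an empty dict means every pin is satisfied.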


In [4]:
import os, sys, platform, IPython.display

# !{sys.executable} -m pip install -Uq -r requirements.txt  #  --no-deps --no-cache-dir --force-reinstall --no-index
!jupyter nbextension enable --py widgetsnbextension

# IPython.display.clear_output()
print("Python  :", sys.version)
print("Platform:", platform.system(), platform.platform())
print("Actv Env:", os.environ.get('CONDA_DEFAULT_ENV', 'not a conda env'))

Enabling notebook extension jupyter-js-widgets/extension...
      - Validating: OK
Python  : 3.10.11 (main, May 16 2023, 00:28:57) [GCC 11.2.0]
Platform: Linux Linux-5.15.90.1-microsoft-standard-WSL2-x86_64-with-glibc2.35
Actv Env: prefect-ops


In [5]:
import os
# Get the current working directory
# current_dir = os.getcwd()

# Create folders for the raw data, the pipeline scripts, and the saved models
os.makedirs('./data', exist_ok=True)
os.makedirs('./pycode', exist_ok=True)
os.makedirs('./models', exist_ok=True)

In [6]:
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns

import scipy
from scipy import stats

import sklearn
from sklearn.feature_extraction import DictVectorizer
from sklearn.metrics import mean_squared_error
import xgboost as xgb

import os
import wandb
import mlflow
import pickle
import pathlib
import argparse
from glob import glob
from tqdm import tqdm
tqdm._instances.clear()

from prefect import task, flow, Flow
from prefect.tasks import task_input_hash
from prefect.artifacts import create_markdown_artifact

# run garbage collection to free unused memory
import gc
gc.collect()

57

<div style="letter-spacing:normal; opacity:1.;">
  <h1 style="text-align:center; background-color: lightsalmon; color: Jaguar; border-radius:10px; font-family:monospace; border-radius:20px;
            line-height:1.4; font-size:32px; font-weight:bold; text-transform: uppercase; padding: 9px;">
            <strong>2. Recognizing and Understanding Data</strong></h1>   
</div>

## Ingest Data with [wget](https://linuxways.net/centos/linux-wget-command-with-examples/) or [curl](https://daniel.haxx.se/blog/2020/09/10/store-the-curl-output-over-there/)

In [7]:
# "Green Taxi Trip Records" Download the data for January, February and March 2023
# !wget -q -N -P "./data" https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet
# !wget -q -N -P "./data" https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-02.parquet
# !wget -q -N -P "./data" https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-03.parquet
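The download URLs follow a fixed `{color}_tripdata_{year}-{month}.parquet` pattern with a zero-padded month, which is what `fetch_data` in `orchestrate.py` builds with `{month:0>2}`. A minimal pure-Python sketch of the same idea; the helper names `trip_data_url` and `download` are hypothetical, and unlike `wget -N` this `download` does not skip files that are already up to date:

```python
import os
import urllib.request

BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"

def trip_data_url(color: str, year: int, month: int) -> str:
    """Build the TLC parquet URL; the month is zero-padded to two digits."""
    return f"{BASE_URL}/{color}_tripdata_{year}-{month:02d}.parquet"

def download(url: str, dest_dir: str = "./data") -> str:
    """Download url into dest_dir (like wget -P) and return the local path."""
    os.makedirs(dest_dir, exist_ok=True)
    path = os.path.join(dest_dir, os.path.basename(url))
    urllib.request.urlretrieve(url, path)
    return path

urls = [trip_data_url("green", 2023, month) for month in (1, 2, 3)]
```

Running `for url in urls: download(url)` would fetch the same three files as the commented-out `wget` lines above.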

In [8]:
glob(f'./data/*.parquet')

['./data/green_tripdata_2023-01.parquet',
 './data/green_tripdata_2023-02.parquet',
 './data/green_tripdata_2023-03.parquet',
 './data/green_tripdata_2023-04.parquet']

## Q1. Human-readable name

You’d like to give the first task, `read_data` a nicely formatted name.
How can you specify a task name?

> Hint: look in the docs at https://docs.prefect.io or 
> check out the doc string in a code editor.

- `@task(retries=3, retry_delay_seconds=2, name="Read taxi data")`
- `@task(retries=3, retry_delay_seconds=2, task_name="Read taxi data")`
- `@task(retries=3, retry_delay_seconds=2, task-name="Read taxi data")`
- `@task(retries=3, retry_delay_seconds=2, task_name_function=lambda x: f"Read taxi data")`

**How can you specify a task name?**
- https://docs.prefect.io/2.10.13/concepts/tasks/
- Answer: `@task(retries=3, retry_delay_seconds=2, name="Read taxi data")`

## Q2. Cron

Cron is a common scheduling specification for workflows. 

Using the flow in `orchestrate.py`, create a deployment.
Schedule your deployment to run on the third day of every month at 9am UTC.

**What’s the cron schedule for that?**

Components of the cron expression `0 9 3 * *`:

- The first field, 0, represents the minute of the hour. In this case, it is set to 0, meaning the deployment will run at the start of the hour.
- The second field, 9, represents the hour of the day. It is set to 9, indicating that the deployment will run at 9am.
- The third field, 3, represents the day of the month. This field is set to 3, which means the deployment will run specifically on the third day of each month.
- The fourth field, *, represents the month. It is set to *, indicating that the deployment will run every month.
- The fifth field, *, represents the day of the week. It is also set to *, meaning that the deployment will run regardless of the day of the week.

Therefore, the cron schedule for running the deployment on the third day of every month at 9am UTC is:
- `0 9 3 * *`
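To sanity-check the expression, the next few run times can be brute-forced minute by minute. This is a simplified, stdlib-only sketch: it supports only `*` and single integers, and it ignores real cron's rule that a restricted day-of-month and day-of-week are combined with OR (irrelevant here because day-of-week is `*`). Times are treated as UTC, matching the required schedule.

```python
from datetime import datetime, timedelta, timezone

def field_matches(field: str, value: int) -> bool:
    # Only "*" and plain integers are supported, enough for "0 9 3 * *"
    return field == "*" or int(field) == value

def cron_matches(expr: str, dt: datetime) -> bool:
    minute, hour, day, month, weekday = expr.split()
    cron_weekday = (dt.weekday() + 1) % 7  # cron: 0 = Sunday; Python: 0 = Monday
    return (
        field_matches(minute, dt.minute)
        and field_matches(hour, dt.hour)
        and field_matches(day, dt.day)
        and field_matches(month, dt.month)
        and field_matches(weekday, cron_weekday)
    )

def next_runs(expr: str, start: datetime, count: int) -> list:
    """Brute-force the next `count` matching minutes strictly after `start`."""
    runs = []
    dt = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
    while len(runs) < count:
        if cron_matches(expr, dt):
            runs.append(dt)
        dt += timedelta(minutes=1)
    return runs

for run in next_runs("0 9 3 * *", datetime(2023, 7, 3, 18, 37, tzinfo=timezone.utc), 3):
    print(run.isoformat())
```

Starting just after 9am on July 3, the first match is August 3 at 09:00 UTC, then September 3 and October 3.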

## Q3. RMSE 

Download the January 2023 Green Taxi data and use it for your training data.
Download the February 2023 Green Taxi data and use it for your validation data. 

Make sure you upload the data to GitHub so it is available for your deployment.

Create a custom flow run of your deployment from the UI. Choose Custom
Run for the flow and enter the file path as a string on the JSON tab under Parameters.

Make sure you have a worker running and polling the correct work pool.

View the results in the UI.

**What’s the final RMSE to five decimal places?**
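`orchestrate.py` computes the metric with `mean_squared_error(y_val, y_pred, squared=False)`, i.e. the square root of the mean squared error. A small NumPy sketch of the same quantity and of the five-decimal formatting the question asks for, on toy values rather than taxi data. (If a newer scikit-learn is used, note that 1.4 added `root_mean_squared_error` and deprecated the `squared` argument.)

```python
import numpy as np

def rmse(y_true, y_pred) -> float:
    """Root mean squared error, the same quantity as mean_squared_error(..., squared=False)."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))

# Toy values, not taxi data: errors are 0, 0, 2, so RMSE = sqrt(4 / 3)
score = rmse([1.0, 2.0, 3.0], [1.0, 2.0, 5.0])
print(f"{score:.5f}")  # prints 1.15470
```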

MLflow tracking UI (local SQLite backend):
- https://www.mlflow.org/docs/latest/tracking.html#how-runs-and-artifacts-are-recorded
- Run in a terminal:

```
# the UI uses port 5000 here; free it first if it is already in use (or pick another port)
kill $(lsof -ti :5000)   # kill whatever process is using port 5000

# after mlflow.set_experiment(...) has created mlflow.db, cd into the folder containing mlruns and mlflow.db
mlflow ui \
    --backend-store-uri sqlite:///mlflow.db \
    --default-artifact-root file:mlruns \
    --host localhost --port 5000
```

<br>

![prefect](https://docs.prefect.io/latest/img/concepts/flow-deployment-end-to-end.png)

Check the Git remote (the code and data must be pushed there for the deployment):
```sh
git remote -v
```

**Step 1: Create a Project in a New Directory**

- Creates the project files (including `.prefectignore`), usable with a GitHub or GitLab repository
```sh
prefect init
prefect project init # deprecated
```

**Step 2: Add an @flow decorator to your code's entrypoint function, give it a name and Save it to the New Directory**

- Here the entrypoint is `main_flow` in `orchestrate.py`, so we rewrite and edit `orchestrate.py`, then push it to the repository


**Step 3: Orchestration Environment (Self-Hosted Server or Prefect Cloud)**

- Start the Prefect server locally

Open another terminal window, activate your conda environment, and start the open-source Prefect API server locally:
```sh
prefect server start
```

- Or log in to Prefect Cloud
```sh
prefect cloud login
```

**Step 4: Execution Environment (Where Deployed Flows Run)**

- Start a worker that polls your work pool
```sh
# prefect worker start -p "zoom_mlops_pool" -t process --limit 2
prefect worker start --pool 'zoom_mlops_pool'
```

**Step 5: Deploy your Flow**

- Deploy the flow to the work pool
```sh
# prefect deploy ls
# prefect deploy -n zoom_mlops_deployment
prefect deploy "./pycode/orchestrate.py:main_flow" -n "zoom_mlops_deployment" -p "zoom_mlops_pool"
# prefect deploy --all  # or deploy everything defined in prefect.yaml
```

**Step 6: Start a run of the deployed flow from the CLI...**

- Trigger a run of the deployment
```sh
prefect deployment ls
prefect deployment run 'Main Flow/zoom_mlops_deployment'
```

**Check...**

- registered blocks and block types
```sh
prefect block ls
prefect block type ls
```

```sh
prefect block register -m prefect_aws 
```


In [11]:
%%writefile ./pycode/orchestrate.py

# Source: https://github.com/DataTalksClub/mlops-zoomcamp/blob/main/03-orchestration/3.4/orchestrate.py

import os
import click
import pickle
import pathlib
import argparse
import requests
import urllib.request
from glob import glob
from datetime import date
from datetime import timedelta

import pandas as pd
import numpy as np
import scipy
import sklearn
from sklearn.feature_extraction import DictVectorizer
from sklearn.metrics import mean_squared_error
import xgboost as xgb

import mlflow
import prefect
from prefect import task, flow
from prefect.tasks import task_input_hash
from prefect.artifacts import create_markdown_artifact

# from prefect_aws import S3Bucket
# from prefect_email import EmailServerCredentials, email_send_message

import warnings
# Ignore all warnings
# warnings.filterwarnings("ignore")
# Filter the specific warning message, MLflow autologging encountered a warning
# warnings.filterwarnings("ignore", category=UserWarning, module="setuptools")
warnings.filterwarnings("ignore", category=UserWarning, message="Setuptools is replacing distutils.")


@task(name="Fetch Data", cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1),
      retries=3, log_prints=True, )
def fetch_data(raw_data_path: str, year: int, month: int, color: str) -> None:
    """Fetches data from the NYC Taxi dataset and saves it locally"""
    os.makedirs(raw_data_path, exist_ok=True)  

    # Download the data from the NYC Taxi dataset
    url      = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{color}_tripdata_{year}-{month:0>2}.parquet'
    filename = os.path.join(raw_data_path, f'{color}_tripdata_{year}-{month:0>2}.parquet')
    # urllib.request.urlretrieve(url, filename)
    # os.system(f"wget -q -N -P {raw_data_path} {url}")
    
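    response = requests.get(url, timeout=60)
    response.raise_for_status()  # raise on HTTP errors so Prefect retries instead of saving an error page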
    with open(filename, "wb") as f:
        f.write(response.content)
    return None
    
    
@task(name="Read Taxi Data", retries=3, retry_delay_seconds=2, log_prints=None)
def read_data(filename: str) -> pd.DataFrame:
    """Read data into DataFrame"""
    df = pd.read_parquet(filename)

    df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)
    df.lpep_pickup_datetime  = pd.to_datetime(df.lpep_pickup_datetime)

    df["duration"] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration    = df.duration.apply(lambda td: td.total_seconds() / 60)

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical     = ["PULocationID", "DOLocationID"]
    df[categorical] = df[categorical].astype(str)

    return df


@task(name="Add Features Taxi Data", log_prints=True)
def add_features(
    df_train: pd.DataFrame, df_val: pd.DataFrame, df_test: pd.DataFrame
) -> tuple[
    tuple[scipy.sparse.csr_matrix, np.ndarray],
    tuple[scipy.sparse.csr_matrix, np.ndarray],
    tuple[scipy.sparse.csr_matrix, np.ndarray],
    DictVectorizer,
]:
    """Add features to the model"""
    df_train["PU_DO"] = df_train["PULocationID"] + "_" + df_train["DOLocationID"]
    df_val["PU_DO"]   = df_val["PULocationID"]   + "_" + df_val["DOLocationID"]
    df_test["PU_DO"]  = df_test["PULocationID"]  + "_" + df_test["DOLocationID"]

    categorical = ["PU_DO"]  #'PULocationID', 'DOLocationID']
    numerical   = ["trip_distance"]

    dv = DictVectorizer()

    train_dicts = df_train[categorical + numerical].to_dict(orient="records")
    X_train     = dv.fit_transform(train_dicts)
    y_train     = df_train["duration"].values

    val_dicts   = df_val[categorical + numerical].to_dict(orient="records")
    X_val       = dv.transform(val_dicts)
    y_val       = df_val["duration"].values
    
    test_dicts  = df_test[categorical + numerical].to_dict(orient="records")
    X_test      = dv.transform(test_dicts)
    y_test      = df_test["duration"].values

    return (X_train, y_train), (X_val, y_val), (X_test, y_test), dv


@task(name="Train Best Model", log_prints=True)
def train_best_model(
    X_train  : scipy.sparse.csr_matrix,
    X_val    : scipy.sparse.csr_matrix,
    y_train  : np.ndarray,
    y_val    : np.ndarray,
    dv       : sklearn.feature_extraction.DictVectorizer,
    dest_path: str,
) -> None:
    """train a model with best hyperparams and write everything out"""        
    # Load train and test Data
    train = xgb.DMatrix(X_train, label=y_train)
    valid = xgb.DMatrix(X_val, label=y_val)

    # call before training to enable automatic logging of XGBoost metrics, params, and models
    # mlflow.xgboost.autolog()
    
    with mlflow.start_run():
        # Optional: Set some information about Model
        mlflow.set_tag("developer", "muce")
        mlflow.set_tag("algorithm", "Machine Learning")
        mlflow.set_tag("train-data-path", f'./data/green_tripdata_2023-01.parquet')
        mlflow.set_tag("valid-data-path", f'./data/green_tripdata_2023-02.parquet')
        mlflow.set_tag("test-data-path",  f'./data/green_tripdata_2023-03.parquet')

        # Set Model params information
        best_params = {
            "learning_rate": 0.09585355369315604,
            "max_depth": 30,
            "min_child_weight": 1.060597050922164,
            'objective': 'reg:squarederror',          # deprecated  "reg:linear"
            # 'objective': "reg:linear",
            "reg_alpha": 0.018060244040060163,
            "reg_lambda": 0.011658731377413597,
            "seed": 42,
        }
        mlflow.log_params(best_params)

        # Build Model   
        booster = xgb.train(
            params               = best_params,
            dtrain               = train,
            num_boost_round      = 100,
            evals                = [(valid, "validation")],
            early_stopping_rounds=20,
        )   
        
        # Set Model Evaluation Metric
        y_pred = booster.predict(valid)
        rmse   = mean_squared_error(y_val, y_pred, squared=False)
        mlflow.log_metric("rmse", rmse)       

        # Log Model two options
        # Option1: Just log model
        mlflow.xgboost.log_model(booster, artifact_path="models_mlflow")        
        
        # Option 2: save Model, Optional: Preprocessor or Pipeline         
        # Create dest_path folder unless it already exists
        # pathlib.Path(dest_path).mkdir(exist_ok=True) 
        os.makedirs(dest_path, exist_ok=True)       
        local_file = os.path.join(dest_path, "preprocessor.b")
        with open(local_file, "wb") as f_out:
            pickle.dump(dv, f_out)
            
        # log the pickled preprocessor (or a full pipeline) as an MLflow artifact
        mlflow.log_artifact(local_path = local_file, artifact_path="preprocessor")        
        
        # print(f"default artifacts URI: '{mlflow.get_artifact_uri()}'")
    return None


@flow(name="Subflow Download Data", log_prints=True)
def download_data(raw_data_path: str, years: list, months: list, colors: list):
    # Download the data from the NYC Taxi dataset
    for year in years:
        for month in months:
            for color in colors:
                fetch_data(raw_data_path, year, month, color)
                

# click works locally but breaks the deploy step: these decorators turn `main_flow` into a click Command, not a Prefect Flow
@click.command()
@click.option(
    "--raw_data_path",
    default="./data",
    help="Location where the raw NYC taxi trip data was saved"
)
@click.option(
    "--dest_path",
    default="./models",
    help="Location where the resulting model files will be saved"
)
@click.option(
    "--years",
    default="2023",
    help="Years where the raw NYC taxi trip data was saved (space-separated)"
)
@click.option(
    "--months",
    default="1 2 3",
    help="Months where the raw NYC taxi trip data was saved (space-separated)"
)
@click.option(
    "--colors",
    default="green yellow",
    help="Colors where the raw NYC taxi trip data was saved"
)
@flow(name="Main Flow")
def main_flow(raw_data_path: str, dest_path: str, years: str, months: str, colors: str) -> None:
    """The main training pipeline"""
    # MLflow settings
    # Build or Connect Database Offline
    mlflow.set_tracking_uri("sqlite:///mlflow.db")
    # Build or Connect mlflow experiment
    mlflow.set_experiment("nyc-taxi-experiment")
    
    # Download data    
    years  = [int(year) for year in years.split()]
    months = [int(month) for month in months.split()]
    colors = colors.split()[:1]  # only the first color is used (green by default)
    download_data(raw_data_path, years, months, colors)
    print(sorted(glob(os.path.join(raw_data_path, '*'))))
    
    # list parquet files; the first three in sorted order are used as train, val, test
    # print(sorted(glob(f'{raw_data_path}/green*.parquet')))
    train_path, val_path, test_path = sorted(glob(f'{raw_data_path}/*.parquet'))[:3]

    # Read parquet files
    df_train = read_data(train_path)
    df_val   = read_data(val_path)
    df_test  = read_data(test_path)
    # print(df_train.shape, df_val.shape, df_test.shape, )

    # Transform
    (X_train, y_train), (X_val, y_val), (X_test, y_test), dv = add_features(df_train, df_val, df_test)

    # Train
    train_best_model(X_train, X_val, y_train, y_val, dv, dest_path)


if __name__ == "__main__":
    # argparse works locally but causes errors in the deploy step
    # parser = argparse.ArgumentParser(description="Main Flow")
    # parser.add_argument("--raw_data_path", default="./data",       help="Location where the raw NYC taxi trip data was saved")
    # parser.add_argument("--dest_path",     default="./models",     help="Location where the resulting model files will be saved")
    # parser.add_argument("--years",         default="2023",         help="Years where the raw NYC taxi trip data was saved (space-separated)")
    # parser.add_argument("--months",        default="1 2 3 4",      help="Months where the raw NYC taxi trip data was saved (space-separated)")
    # parser.add_argument("--colors",        default="green yellow", help="Colors where the raw NYC taxi trip data was saved")

    # args = parser.parse_args()
    # main_flow(args.raw_data_path, args.dest_path, args.years, args.months, args.colors)

    main_flow()

Overwriting ./pycode/orchestrate.py


In [12]:
# raw_data_path
# DATA_PATH = f"./data"

!python ./pycode/orchestrate.py

18:37:41.518 | INFO    | prefect.engine - Created flow run 'blond-hamster' for flow 'Main Flow'
18:37:41.521 | INFO    | Flow run 'blond-hamster' - View at http://127.0.0.1:4200/flow-runs/flow-run/34ce9bc7-5b45-49f4-a896-068d2d1591a9
2023/07/03 18:37:42 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2023/07/03 18:37:42 INFO mlflow.store.db.utils: Updating database tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
18:37:42.797 | INFO    | Flow run 'blond-hamster' - Created subflow run 'abstract-taipan' for flow 'Subflow Download Data'
18:37:42.800 | INFO    | Flow run 'abstract-taipan' - View at http://127.0.0.1:4200/flow-runs/flow-run/c93

## Q4. RMSE (Markdown Artifact)

Download the February 2023 Green Taxi data and use it for your training data.
Download the March 2023 Green Taxi data and use it for your validation data. 

Create a Prefect Markdown artifact that displays the RMSE for the validation data.
Create a deployment and run it.

**What’s the RMSE in the artifact to two decimal places?**
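The artifact is just a Markdown string passed to `create_markdown_artifact(key=..., markdown=..., description=...)`, so the table can be built and checked separately. A minimal sketch; `build_rmse_report` is a hypothetical helper, and the key check reflects the Prefect artifacts documentation's guidance that keys use only lowercase letters, numbers, and dashes:

```python
import re
from datetime import date

ARTIFACT_KEY = "duration-model-report"

def build_rmse_report(rmse: float, run_date: date) -> str:
    """Markdown table like the one passed to create_markdown_artifact in orchestrate.py."""
    return (
        "# RMSE Report\n\n"
        "## RMSE XGBoost Model\n\n"
        "| Date | RMSE |\n"
        "|:-----|-----:|\n"
        f"| {run_date.isoformat()} | {rmse:.2f} |\n"
    )

# Keys should contain only lowercase letters, numbers, and dashes
key_ok = re.fullmatch(r"[a-z0-9-]+", ARTIFACT_KEY) is not None
print(build_rmse_report(5.123, date(2023, 7, 3)))
```

Inside the training task this would be used as `create_markdown_artifact(key=ARTIFACT_KEY, markdown=build_rmse_report(rmse, date.today()), description="RMSE for Validation Data Report")`; the `:.2f` format already rounds to the two decimals the question asks for.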

In [16]:
%%writefile ./pycode/create_s3_bucket_block.py

from time import sleep
from prefect_aws import S3Bucket, AwsCredentials


def create_aws_creds_block():
    my_aws_creds_obj = AwsCredentials(
        aws_access_key_id="123abc", aws_secret_access_key="abc123"
    )
    my_aws_creds_obj.save(name="my-aws-creds", overwrite=True)


def create_s3_bucket_block():
    aws_creds = AwsCredentials.load("my-aws-creds")
    my_s3_bucket_obj = S3Bucket(
        bucket_name="my-first-bucket-abc", credentials=aws_creds
    )
    my_s3_bucket_obj.save(name="s3-bucket-example", overwrite=True)


if __name__ == "__main__":
    create_aws_creds_block()
    sleep(5)
    create_s3_bucket_block()

Overwriting ./pycode/create_s3_bucket_block.py


In [17]:
%%writefile ./pycode/orchestrate.py

# Source: https://github.com/DataTalksClub/mlops-zoomcamp/blob/main/03-orchestration/3.4/orchestrate.py

import os
import click
import pickle
import pathlib
import argparse
import requests
import urllib.request
from glob import glob
from datetime import date
from datetime import timedelta

import pandas as pd
import numpy as np
import scipy
import sklearn
from sklearn.feature_extraction import DictVectorizer
from sklearn.metrics import mean_squared_error
import xgboost as xgb

import mlflow
import prefect
from prefect import task, flow
from prefect.tasks import task_input_hash
from prefect.artifacts import create_markdown_artifact

# from prefect_aws import S3Bucket
# from prefect_email import EmailServerCredentials, email_send_message

import warnings
# Ignore all warnings
# warnings.filterwarnings("ignore")
# Filter the specific warning message, MLflow autologging encountered a warning
# warnings.filterwarnings("ignore", category=UserWarning, module="setuptools")
warnings.filterwarnings("ignore", category=UserWarning, message="Setuptools is replacing distutils.")


@task(name="Fetch Data", cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1),
      retries=3, log_prints=True, )
def fetch_data(raw_data_path: str, year: int, month: int, color: str) -> None:
    """Fetches data from the NYC Taxi dataset and saves it locally"""
    os.makedirs(raw_data_path, exist_ok=True)  

    # Download the data from the NYC Taxi dataset
    url      = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{color}_tripdata_{year}-{month:0>2}.parquet'
    filename = os.path.join(raw_data_path, f'{color}_tripdata_{year}-{month:0>2}.parquet')
    # urllib.request.urlretrieve(url, filename)
    # os.system(f"wget -q -N -P {raw_data_path} {url}")
    
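    response = requests.get(url, timeout=60)
    response.raise_for_status()  # raise on HTTP errors so Prefect retries instead of saving an error page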
    with open(filename, "wb") as f:
        f.write(response.content)
    return None
    
    
@task(name="Read Taxi Data", retries=3, retry_delay_seconds=2, log_prints=None)
def read_data(filename: str) -> pd.DataFrame:
    """Read data into DataFrame"""
    df = pd.read_parquet(filename)

    df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)
    df.lpep_pickup_datetime  = pd.to_datetime(df.lpep_pickup_datetime)

    df["duration"] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration    = df.duration.apply(lambda td: td.total_seconds() / 60)

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical     = ["PULocationID", "DOLocationID"]
    df[categorical] = df[categorical].astype(str)

    return df


@task(name="Add Features Taxi Data", log_prints=True)
def add_features(
    df_train: pd.DataFrame, df_val: pd.DataFrame, df_test: pd.DataFrame
) -> tuple[
    tuple[scipy.sparse.csr_matrix, np.ndarray],
    tuple[scipy.sparse.csr_matrix, np.ndarray],
    tuple[scipy.sparse.csr_matrix, np.ndarray],
    DictVectorizer,
]:
    """Add features to the model"""
    df_train["PU_DO"] = df_train["PULocationID"] + "_" + df_train["DOLocationID"]
    df_val["PU_DO"]   = df_val["PULocationID"]   + "_" + df_val["DOLocationID"]
    df_test["PU_DO"]  = df_test["PULocationID"]  + "_" + df_test["DOLocationID"]

    categorical = ["PU_DO"]  #'PULocationID', 'DOLocationID']
    numerical   = ["trip_distance"]

    dv = DictVectorizer()

    train_dicts = df_train[categorical + numerical].to_dict(orient="records")
    X_train     = dv.fit_transform(train_dicts)
    y_train     = df_train["duration"].values

    val_dicts   = df_val[categorical + numerical].to_dict(orient="records")
    X_val       = dv.transform(val_dicts)
    y_val       = df_val["duration"].values
    
    test_dicts  = df_test[categorical + numerical].to_dict(orient="records")
    X_test      = dv.transform(test_dicts)
    y_test      = df_test["duration"].values

    return (X_train, y_train), (X_val, y_val), (X_test, y_test), dv


@task(name="Train Best Model", log_prints=True)
def train_best_model(
    X_train  : scipy.sparse.csr_matrix,
    X_val    : scipy.sparse.csr_matrix,
    y_train  : np.ndarray,
    y_val    : np.ndarray,
    dv       : sklearn.feature_extraction.DictVectorizer,
    dest_path: str,
) -> None:
    """train a model with best hyperparams and write everything out"""        
    # Load train and test Data
    train = xgb.DMatrix(X_train, label=y_train)
    valid = xgb.DMatrix(X_val, label=y_val)

    # call before training to enable automatic logging of XGBoost metrics, params, and models
    # mlflow.xgboost.autolog()
    
    with mlflow.start_run():
        # Optional: Set some information about Model
        mlflow.set_tag("developer", "muce")
        mlflow.set_tag("algorithm", "Machine Learning")
        mlflow.set_tag("train-data-path", f'./data/green_tripdata_2023-01.parquet')
        mlflow.set_tag("valid-data-path", f'./data/green_tripdata_2023-02.parquet')
        mlflow.set_tag("test-data-path",  f'./data/green_tripdata_2023-03.parquet')

        # Set Model params information
        best_params = {
            "learning_rate": 0.09585355369315604,
            "max_depth": 30,
            "min_child_weight": 1.060597050922164,
            'objective': 'reg:squarederror',          # replaces the deprecated "reg:linear"
            # 'objective': "reg:linear",
            "reg_alpha": 0.018060244040060163,
            "reg_lambda": 0.011658731377413597,
            "seed": 42,
        }
        mlflow.log_params(best_params)

        # Train the booster with early stopping on the validation set
        booster = xgb.train(
            params               = best_params,
            dtrain               = train,
            num_boost_round      = 100,
            evals                = [(valid, "validation")],
            early_stopping_rounds=20,
        )   
        
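        # Evaluate on the validation set: RMSE = sqrt(MSE)
        # (the `squared=False` argument was deprecated and later removed in scikit-learn)
        y_pred = booster.predict(valid)
        rmse   = float(np.sqrt(mean_squared_error(y_val, y_pred)))
        mlflow.log_metric("rmse", rmse)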

        # Log the model (two options)
        # Option 1: log the booster with MLflow's XGBoost flavor
        mlflow.xgboost.log_model(booster, artifact_path="models_mlflow")

        # Option 2: pickle the preprocessor (or a whole pipeline) locally and log it as an artifact
        # Create dest_path unless it already exists
        # pathlib.Path(dest_path).mkdir(exist_ok=True)
        os.makedirs(dest_path, exist_ok=True)
        local_file = os.path.join(dest_path, "preprocessor.b")
        with open(local_file, "wb") as f_out:
            pickle.dump(dv, f_out)

        # Upload the pickled DictVectorizer to the run's artifact store
        mlflow.log_artifact(local_path=local_file, artifact_path="preprocessor")
        
        # print(f"default artifacts URI: '{mlflow.get_artifact_uri()}'")

        # Create markdown artifact with RMSE value
        markdown__rmse_report = f"""
# RMSE Report

## Summary

Duration Prediction 

## RMSE XGBoost Model

| Date      | RMSE |
|:----------|-------:|
| {date.today()} | {rmse:.2f} |
"""

        create_markdown_artifact(
            key="duration-model-report", 
            markdown=markdown__rmse_report,
            description="RMSE for Validation Data Report",
        )
    return None


@flow(name="Subflow Download Data", log_prints=True)
def download_data(raw_data_path: str, years: list, months: list, colors: list):
    # Download the data from the NYC Taxi dataset
    for year in years:
        for month in months:
            for color in colors:
                fetch_data(raw_data_path, year, month, color)


# @flow(name="Email Server Credentials", log_prints=True)
# def example_email_send_message_flow(email_addresses: list[str]):
#     email_server_credentials = EmailServerCredentials.load("email-server-credentials")
    
#     for email_address in email_addresses:
#         subject = email_send_message.with_options(name=f"email {email_address}").submit(
#             email_server_credentials=email_server_credentials,
#             subject="Example Flow Notification using Gmail",
#             msg="This proves email_send_message works!",
#             email_to=email_address,
#         )
                

@flow(name="Main Flow")
def main_flow(raw_data_path="./data", dest_path="./models", years="2023", months="1 2 3 4", colors="green yellow") -> None:
    """The main training pipeline"""
    # MLflow settings
    # Create (or connect to) a local SQLite tracking database
    mlflow.set_tracking_uri("sqlite:///mlflow.db")
    # Create the experiment if it does not exist, otherwise reuse it
    mlflow.set_experiment("nyc-taxi-experiment")
    
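    # Download data: parse the space-separated arguments into lists
    years  = [int(year) for year in years.split()]
    months = [int(month) for month in months.split()]
    colors = colors.split()[:1]  # only the first color ("green" by default) is used
    download_data(raw_data_path, years, months, colors)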
    # print(glob(f'*'))
    
    # # Download the data from AWS S3 Bucket
    # s3_bucket_block = S3Bucket.load("s3-bucket-block")
    # s3_bucket_block.download_folder_to_path(from_folder="data", to_folder="data")
    
    # Use the three last parquet files in name order as train / validation / test
    # print(sorted(glob(f'{raw_data_path}/green*.parquet')))
    train_path, val_path, test_path = sorted(glob(f'{raw_data_path}/*.parquet'))[-3:]

    # Read parquet files
    df_train = read_data(train_path)
    df_val   = read_data(val_path)
    df_test  = read_data(test_path)
    # print(df_train.shape, df_val.shape, df_test.shape, )

    # Transform
    (X_train, y_train), (X_val, y_val), (X_test, y_test), dv = add_features(df_train, df_val, df_test)

    # Train
    train_best_model(X_train, X_val, y_train, y_val, dv, dest_path)

    # example_email_send_message_flow(['@gmail.com'])

if __name__ == "__main__":
    main_flow()

Overwriting ./pycode/orchestrate.py
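The evaluation step in `train_best_model` reports RMSE, the square root of the mean squared error. Recent scikit-learn releases deprecated and then removed the `squared=False` argument of `mean_squared_error`, so taking the square root explicitly is the more portable form. A minimal stdlib sketch of the metric itself (the function name `rmse` and the sample durations are illustrative, not part of the project):

```python
import math
from typing import Sequence


def rmse(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Root mean squared error: sqrt(mean((y_true - y_pred) ** 2))."""
    if len(y_true) != len(y_pred) or len(y_true) == 0:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Trip durations in minutes (made-up values for illustration)
actual    = [10.0, 12.0, 8.0]
predicted = [11.0, 10.0, 8.0]
print(round(rmse(actual, predicted), 4))  # prints 1.291
```

Because RMSE is in the same unit as the target, the logged value can be read directly as an average error in minutes.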


In [15]:
!python ./pycode/orchestrate.py

18:39:53.665 | INFO    | prefect.engine - Created flow run 'cryptic-chinchilla' for flow 'Main Flow'
18:39:53.671 | INFO    | Flow run 'cryptic-chinchilla' - View at http://127.0.0.1:4200/flow-runs/flow-run/b3deb933-fbfe-4a72-b7a6-8280c5046a00
2023/07/03 18:39:54 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2023/07/03 18:39:54 INFO mlflow.store.db.utils: Updating database tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
18:39:54.731 | INFO    | Flow run 'cryptic-chinchilla' - Created subflow run 'invisible-goshawk' for flow 'Subflow Download Data'
18:39:54.733 | INFO    | Flow run 'invisible-goshawk' - View at http://127.0.0.1:4200/flo

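`main_flow` receives years, months, and colors as space-separated strings, downloads one file per combination, and then treats the three last files in name order as train, validation, and test. The sketch below reproduces that selection in plain Python, assuming files are named like `green_tripdata_2023-01.parquet` (the pattern used in the run tags); the helpers `expected_files` and `split_train_val_test` are hypothetical. Since zero-padded months sort in calendar order, downloading months 1 to 4 selects February, March, and April.

```python
from itertools import product
from typing import List, Tuple


def expected_files(years: str, months: str, colors: str) -> List[str]:
    """Hypothetical helper: file names main_flow would download (assumed naming pattern)."""
    year_list  = [int(y) for y in years.split()]
    month_list = [int(m) for m in months.split()]
    color_list = colors.split()[:1]  # main_flow keeps only the first color
    # product() iterates in the same order as the nested loops in download_data
    return [
        f"{color}_tripdata_{year}-{month:02d}.parquet"
        for year, month, color in product(year_list, month_list, color_list)
    ]


def split_train_val_test(paths: List[str]) -> Tuple[str, str, str]:
    """Take the last three paths in sorted order as (train, val, test)."""
    train, val, test = sorted(paths)[-3:]  # raises ValueError if fewer than 3 files
    return train, val, test


files = expected_files("2023", "1 2 3 4", "green yellow")
print(split_train_val_test(files))
# ('green_tripdata_2023-02.parquet', 'green_tripdata_2023-03.parquet', 'green_tripdata_2023-04.parquet')
```

Selecting by sorted name keeps the flow simple, but it also means any extra parquet file in `raw_data_path` can shift which months end up in each split.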
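- Deploy the flow, start a worker that polls your work pool (in its own terminal, since it keeps running), and trigger a run of the deployment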
```sh
# prefect deploy --all  # or 
prefect deploy "./pycode/orchestrate.py:main_flow" -n "zoom_mlops_deployment" -p "zoom_mlops_pool"
prefect worker start --pool 'zoom_mlops_pool'
prefect deployment run 'Main Flow/zoom_mlops_deployment'
```

```
31f95cf7
Flow run
wonderful-bat
Task run
Train Best Model-0
RMSE for Validation Data Report
RMSE for Validation Data
RMSE: 5.374495195206525


dc032057
Flow run
interesting-cougar
Task run
Train Best Model-0
Created gtm-report
```
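The `RMSE for Validation Data Report` artifact listed above is the markdown string built in `train_best_model` and passed to `create_markdown_artifact`. Building that string in a small pure function makes the report easy to check without a Prefect server. A sketch, where the helper name `rmse_report` is hypothetical and the inputs reuse the RMSE and run date shown earlier:

```python
from datetime import date


def rmse_report(rmse: float, run_date: date) -> str:
    """Hypothetical helper: markdown body similar to the one passed to create_markdown_artifact."""
    return (
        "# RMSE Report\n\n"
        "## Summary\n\n"
        "Duration Prediction\n\n"
        "## RMSE XGBoost Model\n\n"
        "| Date | RMSE |\n"
        "|:-----|-----:|\n"
        f"| {run_date.isoformat()} | {rmse:.2f} |\n"
    )


print(rmse_report(5.374495195206525, date(2023, 7, 3)))
```

Inside the task, the result could be passed as `markdown=rmse_report(rmse, date.today())` with the same `key` and `description` as before.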

## Q5. Emails


It’s often helpful to be notified when something in your dataflow doesn’t work
as planned. Create an email notification to use with your own Prefect server instance.
In your virtual environment, install the prefect-email integration with

```bash
pip install prefect-email
```

Make sure you are connected to a running Prefect server instance through your
Prefect profile.
See the docs if needed: https://docs.prefect.io/latest/concepts/settings/#configuration-profiles

Register the new block with your server with 

```bash
prefect block register -m prefect_email
```

Remember that a block is a Prefect class with a nice UI form interface.
Block objects live on the server and can be created and accessed in your Python code. 

See the docs for how to authenticate by saving your email credentials to
a block and note that you will need an App Password to send emails with
Gmail and other services. Follow the instructions in the docs.

Create and save an `EmailServerCredentials` notification block.
Use the credentials block to send an email.

Test the notification functionality by running a deployment.

**What is the name of the pre-built prefect-email task function?**
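The pre-built task function is `email_send_message`, which the last code cell of this section calls. Roughly speaking, the integration sends mail over SMTP using the credentials stored in the `EmailServerCredentials` block, and the App Password mentioned above takes the place of a normal password. The stdlib sketch below illustrates that kind of send for Gmail; the server `smtp.gmail.com` on SSL port 465 and the helpers `build_message` and `send_via_gmail` are assumptions for illustration, not prefect-email's actual implementation.

```python
import smtplib
import ssl
from email.message import EmailMessage


def build_message(sender: str, recipient: str, subject: str, body: str) -> EmailMessage:
    """Hypothetical helper: assemble a plain-text email."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = subject
    msg.set_content(body)
    return msg


def send_via_gmail(msg: EmailMessage, username: str, app_password: str) -> None:
    """Hypothetical helper: send over implicit TLS (assumed Gmail server and port)."""
    context = ssl.create_default_context()
    with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=context) as server:
        server.login(username, app_password)  # Gmail rejects normal passwords here
        server.send_message(msg)


msg = build_message(
    "your_email_address@gmail.com",
    "someone_awesome@gmail.com",
    "Example Flow Notification using Gmail",
    "This proves email_send_message works!",
)
print(msg["Subject"])  # prints Example Flow Notification using Gmail
# send_via_gmail(msg, "your_email_address@gmail.com", "APP-PASSWORD")  # uncomment to actually send
```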

In [None]:
!prefect block register -m prefect_email

Successfully registered 1 block

┏━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Registered Blocks        ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Email Server Credentials │
└──────────────────────────┘

To configure the newly registered blocks, go to the Blocks page in the Prefect UI.



In [None]:
from prefect_email import EmailServerCredentials

credentials = EmailServerCredentials(
    username="EMAIL-ADDRESS-PLACEHOLDER",
    password="PASSWORD-PLACEHOLDER",  # must be an app password
)
# Inside Jupyter (a running event loop) Block.save returns a coroutine, so it must be awaited;
# in a plain Python script, call credentials.save(...) without await
await credentials.save("BLOCK-NAME-PLACEHOLDER")

In [None]:
from prefect_email import EmailServerCredentials

# Load the block saved above (await is needed inside Jupyter, as with save)
email_server_credentials = await EmailServerCredentials.load("BLOCK-NAME-PLACEHOLDER")

In [None]:
# Send an email using Gmail
from prefect import flow
from prefect_email import EmailServerCredentials, email_send_message

@flow(name="Email Server Credentials")
def example_email_send_message_flow():
    email_server_credentials = EmailServerCredentials(
        username="your_email_address@gmail.com",
        password="MUST_be_an_app_password_here!",
    )
    # Alternatively, save these credentials once and load the block by name:
    # email_server_credentials.save("BLOCK-NAME-PLACEHOLDER")
    # email_server_credentials = EmailServerCredentials.load("BLOCK-NAME-PLACEHOLDER")
    
    subject = email_send_message(
        email_server_credentials=email_server_credentials,
        subject="Example Flow Notification using Gmail",
        msg="This proves email_send_message works!",
        email_to="someone_awesome@gmail.com",
    )
    return subject

# example_email_send_message_flow()

## Q6. Prefect Cloud

The hosted Prefect Cloud lets you avoid running your own Prefect server and
has automations that allow you to get notifications when certain events occur
or don’t occur. 

Create a free forever Prefect Cloud account at [app.prefect.cloud](https://app.prefect.cloud/) and connect
your workspace to it following the steps in the UI when you sign up. 

Set up an Automation from the UI that will send yourself an email when
a flow run completes. Run one of your existing deployments and check
your email to see the notification.

Make sure your active profile is pointing toward Prefect Cloud and
make sure you have a worker active.

**What is the name of the second step in the Automation creation process?**

**Note:**

- The Automation creation process in Prefect Cloud has three steps: Trigger, Actions, and Details, so the second step is "Actions." The first step (Trigger) defines the event that starts the Automation, here a flow run completing; the Actions step defines what happens in response, here sending an email notification.

# End of The Project