# | default_exp core

In [None]:
# | hide
import dagshub
import mlflow
import nbdev
from nbdev.showdoc import *

# | export
def foo():
    pass

In [None]:
# | hide
# this function allows us to get the experiment ID from an experiment name
def get_experiment_id(name):
    exp = mlflow.get_experiment_by_name(name)
    if exp is None:
        exp_id = mlflow.create_experiment(name)
        return exp_id
    return exp.experiment_id

In [None]:
# | hide
nbdev.nbdev_export()

## Stages of Pipeline Deployment

For LLMs, this is a data augmentation pipeline: raw data is augmented to compute one or more new columns. The pipeline needs to go through the familiar stages of Development, Staging, and Production.

### Development

LLMOps goals for Development/Evaluation are

1. track what is being done carefully for later auditing and reproducibility
2. package models or pipelines in a format which will make future deployment easier. 

We will:
* Load data
* Build an LLM pipeline
* Test applying the pipeline to data and log queries and results to MLflow Tracking
* Log the pipeline to the MLflow tracking server as an MLflow model

EDA and the desired transformations are not really part of this step. The example video notes that this processing is done during the **course** rather than in the LLMOps video, so the video's workflow starts with tracking.

### Staging

LLMOps goals for staging/testing/QA are
1. track the LLM's progress through testing and towards production
2. work programmatically to demonstrate the APIs needed for future CI/CD automation

We will:
* Register the pipeline to the MLflow Model Registry
* Test the pipeline on sample data
* Promote the registered model (pipeline) to production

### Production

LLMOps goals for production are 
1. write scale-out code that can meet scaling demands in the future
2. simplify deployment by using MLflow to write model-agnostic deployment code

We will:
1. Load the latest production LLM pipeline from the Model Registry
2. Apply the pipeline to an Apache Spark Dataframe
3. Append the results to a Delta Lake Table


## Notes about this workflow

### Notebook vs modular scripts
For a demo, everything in the workflow is divided into notebook sections, but this should really be split into separate notebooks or scripts.

### Models vs code
Since the path here is tracked via the MLflow Model Registry, this workflow promotes models over code. See "The Big Book of MLOps" for more discussion of the distinction (one difference is Model Registry vs. Git).

# | Below this are blocks to use DagsHub with MLflow

In [None]:
#@markdown Enter the username of your DAGsHub account:
DAGSHUB_USER_NAME = "AaronWChen"                        #@param {type:"string"}

#@markdown Enter the email for your DAGsHub account:
DAGSHUB_EMAIL = "awc33@cornell.edu"                     #@param {type:"string"}

#@markdown Enter the repo name 
DAGSHUB_REPO_NAME= ""                                   #@param {type:"string"}

#@markdown Enter the name of the branch you are working on 
BRANCH= ""                                              #@param {type:"string"}
dagshub.init(repo_name=DAGSHUB_REPO_NAME
             , repo_owner=DAGSHUB_USER_NAME)


In [None]:
mlflow.set_tracking_uri(f'https://dagshub.com/{DAGSHUB_USER_NAME}/{DAGSHUB_REPO_NAME}.mlflow')

# starter idea for making an experiment name can be the git branch, but need more specificity
DAGSHUB_TEST_NAME = "stanza_quadgrams_small_set_v1"     #@param {type:"string"}
experiment_name = f"{DAGSHUB_EMAIL}/{DAGSHUB_TEST_NAME}"
mlflow_exp_id = get_experiment_id(experiment_name)
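
Because `DAGSHUB_REPO_NAME` is left blank by default above, the tracking URI can end up malformed without any error. A minimal sketch of a guard, assuming a hypothetical helper named `dagshub_tracking_uri` (not part of DagsHub or MLflow) that builds the same URI pattern used above and fails fast on empty inputs:

```python
def dagshub_tracking_uri(user_name: str, repo_name: str) -> str:
    """Build the DagsHub-hosted MLflow tracking URI used in this notebook.

    Hypothetical helper: raises instead of returning a malformed URI
    when either part is missing.
    """
    if not user_name.strip() or not repo_name.strip():
        raise ValueError("both user_name and repo_name must be non-empty")
    return f"https://dagshub.com/{user_name}/{repo_name}.mlflow"


# Example with placeholder values (not a real repository)
print(dagshub_tracking_uri("some-user", "some-repo"))
```

The result could then be passed to `mlflow.set_tracking_uri(...)` in place of the inline f-string.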

## DEVELOPMENT

In [None]:
# import necessary libraries to handle raw data
import dill as pickle
import dvc.api
import pandas as pd
from sklearn.feature_extraction.text import (
    CountVectorizer
    , TfidfTransformer
    , TfidfVectorizer
    ,
)
from src.custom_stanza_mlflow import StanzaWrapper
import src.dataframe_preprocessor as dfpp
from tqdm import tqdm  # the tqdm() progress-bar wrapper is called directly later on

### Prepare data

In [None]:
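# load raw data and preprocess/clean
from io import StringIO

data = dvc.api.read(
        path='../data/raw/recipes-en-201706/epicurious-recipes_m2.json',
        mode='r')
# dvc.api.read returns the file contents as a string; wrap it in StringIO
# because newer pandas versions deprecate passing literal JSON to read_json
raw_df = pd.read_json(StringIO(data))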

In [None]:
# Create subset for dev
dev_df = raw_df[0:50]

# pre_proc_df is cleaned dataframe
pre_proc_df = dfpp.preprocess_dataframe(dev_df)

### Convert data to Delta format?

In [None]:
import pyspark.pandas as ps

# save and log preprocessed dataframe(s)
prod_data_path = "../../data/processed/prod_data"
test_spark_dataset = ps.from_pandas(pre_proc_df)
# pandas-on-Spark's to_delta takes `index_col` (not `index`) to name the index column
test_spark_dataset.to_delta(path=prod_data_path,
                            mode='overwrite',
                            index_col='id')
mlflow.log_artifacts(prod_data_path)

### Develop the pipeline

MLflow Tracking is organized as follows:
* An **experiment** generally corresponds to the creation of 1 primary model or pipeline. It contains some number of *runs*.
  * A **run** generally corresponds to the creation of 1 sub-model, such as 1 trial during hyperparameter tuning in traditional ML. In this example, running the notebook once creates only 1 run, but a second execution of the notebook would create a second run. This version tracking can be useful during iterative development. Each run contains some number of logged parameters, metrics, tags, models, artifacts, and other metadata.
    * A **parameter** is an input to the model or pipeline, such as a regularization parameter in traditional ML.
    * A **metric** is an output of evaluation, such as accuracy or loss.
    * An **artifact** is an arbitrary file stored alongside a run's metadata, such as the serialized model itself.
    * A **flavor** is an MLflow format for serializing models. This format uses the underlying ML library's (e.g., PyTorch, TensorFlow, Hugging Face) format plus metadata.
    * As of MLflow 2.3.1, there is an experimental API for logging predictions.
  * Wrap model development with a call to ```with mlflow.start_run():```. This context manager syntax starts and ends the MLflow run explicitly, which is a best practice for code that may be moved to production. See the API doc.

In [None]:
# create pipelines relevant to library used
# MLflow example uses HuggingFace
# below is example for MeaLeon with Stanza and sklearn NLP pipeline

# mlflow.set_experiment("") this can be in the start_run though

# cv_params are parameters for the sklearn CountVectorizer or TfidfVectorizer
# NOTE: `nlp` is not defined in this notebook; it must be a Stanza pipeline created beforehand
cv_params = {
    'strip_accents':"unicode",
    'lowercase':True,
    'analyzer': StanzaWrapper().stanza_analyzer(stanza_pipeline=nlp, minNgramLength=1, maxNgramLength=4),
    'min_df':10,
}

# pipeline_params are parameters that will be logged in MLflow and are a superset of library parameters
pipeline_params = {
    'stanza_model': 'en',
    'language': 'english',
    'sklearn-transformer': 'TfidfVectorizer'
}

# update the pipeline parameters with the library-specific ones so that they show up in MLflow Tracking
pipeline_params.update(cv_params)

with mlflow.start_run(experiment_id=mlflow_exp_id):
    # LOG PARAMETERS
    mlflow.log_params(pipeline_params)

    # LOG INPUTS (QUERIES) AND OUTPUTS
    # MLflow example uses a list of strings or a list of str->str dicts
    # useful for complex learning, say with few-shot pipeline

    # mlflow.llm.log_predictions(inputs=, outputs=, prompts=[])
    
    # LOG MODEL
    # useful to log a "signature" with the model telling MLflow the input and output schema for the model
    # signature = mlflow.models.infer_signature(
    #     pre_proc_df["ingredients"][0],
    #     mlflow.transformers.generate_signature_output(
    #         summarizer, pre_proc_df["ingredients"][0]
    #     )
    # )
    # print(f"Signature:\n{signature}\n")

    # for mlflow.transformers, if there are inference-time configurations,
    # those need to be saved specially in the log_model call
    # this ensures that the pipeline will use these same configurations when re-loaded
    # inference_config = {
    #     "min_length": min_length,
    #     "max_length": max_length,
    #     "truncation": truncation,
    #     "do_sample": do_sample
    # }

    # logging a model returns a handle "model_info" to the model metadata in the tracking server. This 'model_info' will be useful later in the notebook to retrieve the logged model
    # NOTE: this call is carried over from the Hugging Face example; `summarizer`, `inference_config`, and `signature` must be defined (see the commented-out blocks above) before it can run
    model_info = mlflow.transformers.log_model(
        transformers_model=summarizer,
        artifact_path='summarizer',
        task='summarization',
        inference_config=inference_config,
        signature=signature,
        input_example="This is an example of a long text that this pipeline can summarize"
    )

    # since this uses a custom Stanza analyzer, we have to use a custom mlflow.pyfunc.PythonModel
    # Instantiate sklearn TfidfVectorizer
    tfidf_vectorizer_model = TfidfVectorizer(**cv_params)

    # Do fit transform on data
    test_tfidf_transform = tfidf_vectorizer_model.fit_transform(tqdm(pre_proc_df["ingredients"]))

    word_matrix = ps.DataFrame(
        test_tfidf_transform.toarray()
        , columns=tfidf_vectorizer_model.get_feature_names_out()
        , index=pre_proc_df.index
    )

    # write each pickle first, then log it once the file has been closed and flushed
    with open("../joblib/tfidf_transformer_small_test.pkl", "wb") as fo:
        pickle.dump(tfidf_vectorizer_model, fo)
    mlflow.log_artifact("../joblib/tfidf_transformer_small_test.pkl", artifact_path="sklearn_dill_pkls")

    with open("../joblib/database_word_matrix_small_test.pkl", "wb") as fo:
        pickle.dump(word_matrix, fo)
    mlflow.log_artifact("../joblib/database_word_matrix_small_test.pkl", artifact_path="sklearn_dill_pkls")
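
One thing to watch in the cell above: `pipeline_params` includes the `analyzer` callable, and MLflow stores parameter values as strings, so a function would typically be logged by its default representation (which may include a memory address and differ between runs). A possible workaround, sketched with a hypothetical helper `loggable_params` and an assumed `callable:<name>` naming convention (neither is part of MLflow):

```python
from typing import Any, Dict


def loggable_params(params: Dict[str, Any]) -> Dict[str, str]:
    """Return a copy of `params` whose values are stable, readable strings."""
    clean = {}
    for key, value in params.items():
        if callable(value):
            # Assumed convention: record the callable's qualified name
            # instead of its default repr.
            name = getattr(value, "__qualname__", type(value).__name__)
            clean[key] = f"callable:{name}"
        else:
            clean[key] = str(value)
    return clean


# Example with a built-in function standing in for the custom analyzer
example = {"min_df": 10, "lowercase": True, "analyzer": len}
print(loggable_params(example))
```

In the run above, this could be used as `mlflow.log_params(loggable_params(pipeline_params))`.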


### Loading the pipeline back

In [None]:
loaded_summarizer = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
loaded_summarizer.predict(xsum_sample['document'][0])

The `predict()` method can handle more than 1 document at a time:

In [None]:
results = loaded_summarizer.predict(xsum_sample.to_pandas()['document'])
display(pd.DataFrame(results, columns=['generated_summary']))

## Moving to Staging

Begin by registering the model in the MLflow Model Registry

In [None]:
# Define the name for the model in the Model Registry
# We filter out some special characters that cannot be used in model names

model_name = f'summarizer - {DA.username}'
model_name = model_name.replace("/", "_").replace(":","-")
print(model_name)

In [None]:
# Register a new model under the given name, or a new model version if the name exists already
mlflow.register_model(model_uri=model_info.model_uri, name=model_name)

## Test the Pipeline
During the Staging step of development, our goal is to move code and/or models from Development to Production. In order to do so, we must test the code and/or models to make sure they are ready for Production

We track our progress here using the MLflow Model Registry. This metadata and model store organizes models as follows:
* **A registered model** is a named model in the registry (here, the summarization model). It may have multiple *versions*.
  * **A model version** is an instance of a given model. As you update your model, you will create new versions. Each version is designated as being in a particular *stage* of deployment.
    * **A stage** is a stage of deployment: `None` (development), `Staging`, `Production`, or `Archived`.

The model we registered above starts with 1 version in stage `None` (Development).
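
As a purely illustrative mental model of the hierarchy described above, the sketch below uses plain dataclasses. The class and method names are hypothetical and are not MLflow's own entities or storage format; the point is only to show how one registered model holds several versions, each in one stage.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ModelVersion:
    version: int
    stage: str = "None"  # one of "None", "Staging", "Production", "Archived"


@dataclass
class RegisteredModel:
    name: str
    versions: List[ModelVersion] = field(default_factory=list)

    def register_version(self) -> ModelVersion:
        """Add a new version; it starts in stage "None" (development)."""
        new_version = ModelVersion(version=len(self.versions) + 1)
        self.versions.append(new_version)
        return new_version

    def latest_in_stage(self, stage: str) -> Optional[ModelVersion]:
        """Return the highest-numbered version currently in `stage`, if any."""
        matches = [v for v in self.versions if v.stage == stage]
        return max(matches, key=lambda v: v.version, default=None)


model = RegisteredModel(name="summarizer")
first = model.register_version()    # version 1, stage "None"
first.stage = "Staging"             # moved forward for testing
second = model.register_version()   # version 2, still in development
print(model.latest_in_stage("Staging"))
```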

In the workflow below, we programmatically transition the model from development to staging to production. For more information on the Model Registry API, see the [Model Registry docs](link). Alternatively, you can edit the registry and make model stage transitions via the UI. To access the UI, click the Experiments menu option in the left-hand sidebar, and search for your model name.

In [None]:
from mlflow import MlflowClient

client = MlflowClient()

In [None]:
# string values in the filter must be quoted
client.search_registered_models(filter_string=f"name = '{model_name}'")

In this example, we will run manual tests, but it would be reasonable to run both automated evaluation and human evaluation in practice. Once tests pass, we will promote the model to stage `Production` to mark it ready for user-facing applications

*Model URIs*: Below, we use model URIs to tell MLflow which model and version we are referring to. Two common URI patterns for the MLflow Model Registry are:
* `f"models:/{model_name}/{model_version}"` to refer to a specific model version by number
* `f"models:/{model_name}/{model_stage}"` to refer to the latest model in a given stage
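
The two patterns can be wrapped in one small function. This is a sketch with a hypothetical helper name, `registry_model_uri`; the stage names are the four listed earlier in this notebook, and the validation is an added assumption rather than anything MLflow requires at this point.

```python
from typing import Union

VALID_STAGES = {"None", "Staging", "Production", "Archived"}


def registry_model_uri(model_name: str, version_or_stage: Union[int, str]) -> str:
    """Build a Model Registry URI from a name plus a version number or stage name."""
    if isinstance(version_or_stage, int):
        # Integer: refer to one specific model version
        return f"models:/{model_name}/{version_or_stage}"
    if version_or_stage not in VALID_STAGES:
        raise ValueError(f"unknown stage: {version_or_stage!r}")
    # Stage name: refer to the latest version in that stage
    return f"models:/{model_name}/{version_or_stage}"


print(registry_model_uri("summarizer", 1))
print(registry_model_uri("summarizer", "Production"))
```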

In [None]:
model_version = 1
dev_model = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version}")
dev_model

*Note about model dependencies*: When you load the model via MLflow above, you may see warnings about the Python environment. Make sure the environments for development, staging, and production match!

MLflow saves these libraries and versions alongside the logged model. See the [MLflow docs on model storage](link) for more information.

## Transition to Staging
We will move the model to `Staging` to indicate that we are actively testing it

In [None]:
client.transition_model_version_stage(model_name, model_version, "staging")

In [None]:
staging_model = dev_model

# an actual CI/CD workflow might load the `staging_model` programmatically, like:
# mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/Staging")
# or
# mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version}")

We now "test" the model manually on sample data. Here, we simply print out the results and compare them with the original data. In a more realistic setting, we might use a set of human evaluators (or a combination of automated metrics and human evaluators) to decide whether the model outperformed the previous model or system

In [None]:
results = staging_model.predict(xsum_sample.to_pandas()['document'])
display(pd.DataFrame(results, columns=['generated_summary']))
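
Manual inspection could be complemented by a few automated sanity checks. The sketch below is only an assumption about what such checks might look like for a summarization pipeline (the function name `check_summaries` and the specific rules are hypothetical); it is not a substitute for proper evaluation metrics or human review.

```python
from typing import List, Sequence


def check_summaries(documents: Sequence[str], summaries: Sequence[str]) -> List[str]:
    """Return human-readable failures; an empty list means all checks passed."""
    failures = []
    if len(documents) != len(summaries):
        failures.append(f"expected {len(documents)} summaries, got {len(summaries)}")
        return failures
    for i, (doc, summary) in enumerate(zip(documents, summaries)):
        if not summary.strip():
            failures.append(f"row {i}: summary is empty")
        elif len(summary) >= len(doc):
            failures.append(f"row {i}: summary is not shorter than its document")
    return failures


# Toy data for illustration only
docs = ["A long article about cooking pasta at home with fresh ingredients.", "Another long text."]
outs = ["Cooking pasta at home.", ""]
print(check_summaries(docs, outs))
```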

If the results look good, the model can be transitioned to production.

## Transition to Production

In [None]:
client.transition_model_version_stage(model_name, model_version, "production")
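
In a CI/CD setting, the transition would usually be gated on test results rather than run unconditionally. A minimal sketch, assuming a hypothetical `promote_if_passing` function and a list of failure messages produced by whatever tests are in place; `RecordingClient` is a stand-in used only to exercise the logic without a tracking server, and a real `MlflowClient` would be passed instead.

```python
from typing import List


def promote_if_passing(client, name: str, version: int, failures: List[str],
                       target_stage: str = "Production") -> bool:
    """Transition the model version only when no test failures were reported."""
    if failures:
        return False
    client.transition_model_version_stage(name=name, version=version, stage=target_stage)
    return True


class RecordingClient:
    """Stand-in for MlflowClient that only records requested transitions."""

    def __init__(self):
        self.calls = []

    def transition_model_version_stage(self, name, version, stage):
        self.calls.append((name, version, stage))


fake_client = RecordingClient()
passed = promote_if_passing(fake_client, "summarizer", 1, failures=[])
blocked = promote_if_passing(fake_client, "summarizer", 2, failures=["row 0: summary is empty"])
print(passed, blocked, fake_client.calls)
```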

## Create a production workflow for batch inference
Once the pipeline is in Production, it may be used by one or more production jobs or serving endpoints. Common deployment locations are:
* batch or streaming inference jobs
* model serving endpoints
* edge devices

This example shows batch inference using Apache Spark DataFrames with the Delta Lake format. Spark allows simple scale-out inference for high-throughput, low-cost jobs, and Delta allows us to append to and modify inference result tables with ACID transactions. See the [Apache Spark page](link) and the [Delta Lake page](link) for more information on these technologies.

In [None]:
# Load our data as a Spark DataFrame
# Recall that we saved this as Delta at the start of the notebook
# Also note that it has a ground-truth summary column

prod_data = spark.read.format("delta").load(prod_data_path).limit(10)
display(prod_data)

Below, we load the model using `mlflow.pyfunc.spark_udf`. This returns the model as a Spark User Defined Function which can be applied efficiently to big data. *Note that the deployment code is library-agnostic: it never references that the model is a Hugging Face pipeline.* This simplified deployment is possible because MLflow logs environment metadata and "knows" how to load the model and run it.

In [None]:
# MLflow lets you grab the latest model version in a given stage. Here, we grab the latest Production version
prod_model_udf = mlflow.pyfunc.spark_udf(
    spark,
    model_uri=f"models:/{model_name}/Production",
    env_manager="virtualenv",
    result_type="string"
)

In [None]:
# Run inference by appending a new column to the DataFrame
batch_inference_results = prod_data.withColumn("generated_summary", prod_model_udf("document"))
display(batch_inference_results)

We can now write out our inference results to another Delta table. Here, we append the results to an existing table (and create the table if it does not exist)

In [None]:
inference_results_path = f"{DA.paths.working_dir}/m6-inference-results".replace("/dbfs", "dbfs:")
batch_inference_results.write.format("delta").mode("append").save(inference_results_path)
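
The `.replace("/dbfs", "dbfs:")` call above rewrites every occurrence of `/dbfs` in the path, not just the leading mount prefix. A slightly stricter variant is sketched below with a hypothetical helper `to_dbfs_uri`, assuming (as the original line implies) that paths under the `/dbfs/` mount correspond to `dbfs:/` URIs.

```python
def to_dbfs_uri(path: str) -> str:
    """Convert a leading '/dbfs/' mount prefix to the 'dbfs:/' scheme.

    Paths that do not start with '/dbfs/' are returned unchanged.
    """
    prefix = "/dbfs/"
    if path.startswith(prefix):
        return "dbfs:/" + path[len(prefix):]
    return path


# Placeholder paths for illustration
print(to_dbfs_uri("/dbfs/mnt/work/m6-inference-results"))
print(to_dbfs_uri("/mnt/dbfs/other"))
```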

Delta is chosen because it works well with Spark and other scale-out technologies, and works well in supporting both batch and streaming pipelines, allowing concurrent reads and writes to and from a table with ACID transactions

And that's it!

To create a production job, we could for example take the new lines of code above, put them in a new notebook, and schedule it as an automated workflow. MLflow can be integrated with essentially any deployment system, but for more information specific to this Databricks workspace, see the "Use model for inference" documentation for AWS, Azure, or GCP.

We did not cover model serving for real-time inference, but MLflow models can be deployed to any cloud or on-prem serving systems. For more information, see the [open-source MLflow Model Registry docs](link) or the [Databricks Model Serving docs](link)

For other topics not covered, see ["The Big Book of MLOps"](link)

## Summary
We have now walked through a full example of going from development to production. Our LLM pipeline was very simple, but LLMOps for a more complex workflow (such as fine-tuning a custom model) would be very similar. You still follow the basic Ops steps of:
* Development: Creating the pipeline or model, tracking the process in the MLflow Tracking server, and saving the final pipeline or model
* Staging: Registering a new model or version in the MLflow Model Registry, testing it, and promoting it through Staging to Production
* Production: Creating an inference job, or creating a model serving endpoint