<a href="https://www.kaggle.com/code/yannicksteph/nlp-llm-llmops-pipeline-dev-stag-prod?scriptVersionId=158424344" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# | NLP | LLM | LLMOps | Pipeline Dev Stag Prod |

## Natural Language Processing (NLP) and Large Language Models (LLM) with LLMOps and make a Pipeline Dev Stag Prod.


![Learning](https://t3.ftcdn.net/jpg/06/14/01/52/360_F_614015247_EWZHvC6AAOsaIOepakhyJvMqUu5tpLfY.jpg)


# <b>1 <span style='color:#78D118'>|</span> Overview</b>


In this example, we will walk through some key steps for taking an LLM-based pipeline to production.  Our pipeline will be: summarization of news articles using a pre-trained model from Hugging Face.  

But in this walkthrough, we will be more rigorous about LLMOps.

**Develop an LLM pipeline**

Our LLMOps goals during development are (a) to track what we do carefully for later auditing and reproducibility and (b) to package models or pipelines in a format which will make future deployment easier.  Step-by-step, we will:
* Load data.
* Build an LLM pipeline.
* Test applying the pipeline to data, and log queries and results to MLflow Tracking.
* Log the pipeline to the MLflow Tracking server as an MLflow Model.

**Test the LLM pipeline**

Our LLMOps goals during testing (in the staging or QA stage) are (a) to track the LLM's progress through testing and towards production and (b) to do so programmatically to demonstrate the APIs needed for future CI/CD automation.  Step-by-step, we will:
* Register the pipeline to the MLflow Model Registry.
* Test the pipeline on sample data.
* Promote the registered model (pipeline) to production.

**Create a production workflow for batch inference**

Our LLMOps goals during production are (a) to write scale-out code which can meet scaling demands in the future and (b) to simplify deployment by using MLflow to write model-agnostic deployment code.  Step-by-step, we will:
* Load the latest production LLM pipeline from the Model Registry.
* Apply the pipeline to an Apache Spark DataFrame.
* Append the results to a Delta Lake table.

### Notes about this workflow

**This notebook vs. modular scripts**: Since this demo is in a single notebook, we will divide the workflow from development to production via notebook sections.  In a more realistic LLM Ops setup, you would likely have the sections split into separate notebooks or scripts.

**Promoting models vs. code**: We track the path from development to production via the MLflow Model Registry.  That is, we are *promoting models* towards production, rather than promoting code.  For more discussion of these two paradigms, see ["The Big Book of MLOps"](https://www.databricks.com/resources/ebook/the-big-book-of-mlops).

### Learning Objectives
1. Walk through a simple but realistic workflow to take an LLM pipeline from development to production.
2. Make use of MLflow Tracking and the Model Registry to package and manage the pipeline.
3. Scale out batch inference using Apache Spark and Delta Lake.


### Setup


In [1]:
%%capture

!pip install sparkmagic
!pip install pyspark
!pip install mlflow
!pip install pandas --upgrade

In [2]:
cache_dir = "./cache"
user_path = "user"
working_dir= "./working_dir"

In [3]:
import pandas as pd
pd.set_option('display.max_column', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_seq_items', None)
pd.set_option('display.max_colwidth', 500)
pd.set_option('expand_frame_repr', True)

# <b>2 <span style='color:#78D118'>|</span> Prepare data</b>

For this notebook we'll use the <a href="https://huggingface.co/datasets/xsum" target="_blank">Extreme Summarization (XSum) Dataset</a>  with the <a href="https://huggingface.co/t5-small" target="_blank">T5 Text-To-Text Transfer Transformer</a> from Hugging Face.


In [4]:
from datasets import load_dataset
from transformers import pipeline

xsum_dataset = load_dataset(
    "xsum", 
    version="1.2.0",
    cache_dir=cache_dir
)  # Note: We specify cache_dir to use pre-cached data.
xsum_sample = xsum_dataset["train"].select(range(10))
display(xsum_sample.to_pandas().head())



Downloading builder script:   0%|          | 0.00/2.05k [00:00<?, ?B/s]

Downloading metadata:   0%|          | 0.00/954 [00:00<?, ?B/s]

Downloading and preparing dataset xsum/default (download: 245.38 MiB, generated: 507.60 MiB, post-processed: Unknown size, total: 752.98 MiB) to ./cache/xsum/default/1.2.0/32c23220eadddb1149b16ed2e9430a05293768cfffbdfd151058697d4c11f934...


Downloading data files:   0%|          | 0/2 [00:00<?, ?it/s]

Downloading data:   0%|          | 0.00/255M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/1.00M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/204045 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/11332 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/11334 [00:00<?, ? examples/s]

Dataset xsum downloaded and prepared to ./cache/xsum/default/1.2.0/32c23220eadddb1149b16ed2e9430a05293768cfffbdfd151058697d4c11f934. Subsequent calls will reuse this data.


  0%|          | 0/3 [00:00<?, ?it/s]

Unnamed: 0,document,summary,id
0,"The full cost of damage in Newton Stewart, one of the areas worst affected, is still being assessed.\nRepair work is ongoing in Hawick and many roads in Peeblesshire remain badly affected by standing water.\nTrains on the west coast mainline face disruption due to damage at the Lamington Viaduct.\nMany businesses and householders were affected by flooding in Newton Stewart after the River Cree overflowed into the town.\nFirst Minister Nicola Sturgeon visited the area to inspect the damage.\n...",Clean-up operations are continuing across the Scottish Borders and Dumfries and Galloway after flooding caused by Storm Frank.,35232142
1,"A fire alarm went off at the Holiday Inn in Hope Street at about 04:20 BST on Saturday and guests were asked to leave the hotel.\nAs they gathered outside they saw the two buses, parked side-by-side in the car park, engulfed by flames.\nOne of the tour groups is from Germany, the other from China and Taiwan. It was their first night in Northern Ireland.\nThe driver of one of the buses said many of the passengers had left personal belongings on board and these had been destroyed.\nBoth groups...",Two tourist buses have been destroyed by fire in a suspected arson attack in Belfast city centre.,40143035
2,"Ferrari appeared in a position to challenge until the final laps, when the Mercedes stretched their legs to go half a second clear of the red cars.\nSebastian Vettel will start third ahead of team-mate Kimi Raikkonen.\nThe world champion subsequently escaped punishment for reversing in the pit lane, which could have seen him stripped of pole.\nBut stewards only handed Hamilton a reprimand, after governing body the FIA said ""no clear instruction was given on where he should park"".\nBelgian St...",Lewis Hamilton stormed to pole position at the Bahrain Grand Prix ahead of Mercedes team-mate Nico Rosberg.,35951548
3,"John Edward Bates, formerly of Spalding, Lincolnshire, but now living in London, faces a total of 22 charges, including two counts of indecency with a child.\nThe 67-year-old is accused of committing the offences between March 1972 and October 1989.\nMr Bates denies all the charges.\nGrace Hale, prosecuting, told the jury that the allegations of sexual abuse were made by made by four male complainants and related to when Mr Bates was a scout leader in South Lincolnshire and Cambridgeshire.\n...","A former Lincolnshire Police officer carried out a series of sex attacks on boys, a jury at Lincoln Crown Court was told.",36266422
4,"Patients and staff were evacuated from Cerahpasa hospital on Wednesday after a man receiving treatment at the clinic threatened to shoot himself and others.\nOfficers were deployed to negotiate with the man, a young police officer.\nEarlier reports that the armed man had taken several people hostage proved incorrect.\nThe chief consultant of Cerahpasa hospital, Zekayi Kutlubay, who was evacuated from the facility, said that there had been ""no hostage crises"", adding that the man was ""alone i...","An armed man who locked himself into a room at a psychiatric hospital in Istanbul has ended his threat to kill himself, Turkish media report.",38826984


In [5]:
xsum_dataset

DatasetDict({
    train: Dataset({
        features: ['document', 'summary', 'id'],
        num_rows: 204045
    })
    validation: Dataset({
        features: ['document', 'summary', 'id'],
        num_rows: 11332
    })
    test: Dataset({
        features: ['document', 'summary', 'id'],
        num_rows: 11334
    })
})

Later on, when we show Production inference, we will want a dataset saved for it.  See the production section below for more information about Delta, the format we use to save the data here.

#### Library pre-requisites
IMPORTANT! Since we will be interacting with Spark by writing a Spark dataframe, we need a Spark Connector.

You need to attach a Spark-Pinecone connector **s3://pinecone-jars/0.2.1/spark-pinecone-uberjar.jar** in the cluster you are using. Refer to this documentation if you need more information.

In [6]:
##################
#### For Demo ####
##################
from pyspark.sql import SparkSession
# Spark in local mode else using spark from your Spark Connector 
# NOTE: For demo
spark = SparkSession.builder.master("local[*]").getOrCreate()
##################

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/10 10:06:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
prod_data_path = f"{cache_dir}/prod_data"
test_spark_dataset = spark.createDataFrame(xsum_dataset["test"].to_pandas())

##################
#### For Demo ####
##################
test_spark_dataset.write.mode("overwrite").save(prod_data_path)
##################

# Spark Connector 
#test_spark_dataset.write.format("delta").mode("overwrite").save(prod_data_path)

24/01/10 10:06:36 WARN TaskSetManager: Stage 0 contains a task of very large size (7188 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

# <b>2 <span style='color:#78D118'>|</span> Develop an LLM pipeline</b>

Create a Hugging Face pipeline

In [8]:
from transformers import pipeline

# Later, we plan to log all of these parameters to MLflow.
# Storing them as variables here will help with that.
hf_model_name = "t5-small"
min_length = 20
max_length = 40
truncation = True
do_sample = True

summarizer = pipeline(
    task="summarization",
    model=hf_model_name,
    min_length=min_length,
    max_length=max_length,
    truncation=truncation,
    do_sample=do_sample,
    model_kwargs={"cache_dir": cache_dir},
)  # Note: We specify cache_dir to use pre-cached models.


config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/242M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/2.32k [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

We can now examine the `summarizer` pipeline summarizing a document from the `xsum` dataset.

In [9]:
doc0 = xsum_sample["document"][0]
print(f"Summary: {summarizer(doc0)[0]['summary_text']}")
print("===============================================")
print(f"Original Document: {doc0}")

Summary: the full cost of damage in Newton Stewart is still being assessed . the water breached a retaining wall, flooding many commercial properties . a flood alert remains in place across
Original Document: The full cost of damage in Newton Stewart, one of the areas worst affected, is still being assessed.
Repair work is ongoing in Hawick and many roads in Peeblesshire remain badly affected by standing water.
Trains on the west coast mainline face disruption due to damage at the Lamington Viaduct.
Many businesses and householders were affected by flooding in Newton Stewart after the River Cree overflowed into the town.
First Minister Nicola Sturgeon visited the area to inspect the damage.
The waters breached a retaining wall, flooding many commercial properties on Victoria Street - the main shopping thoroughfare.
Jeanette Tate, who owns the Cinnamon Cafe which was badly affected, said she could not fault the multi-agency response once the flood hit.
However, she said more preventativ

# <b>3 <span style='color:#78D118'>|</span> Track LLM development with MLflow</b>

[MLflow](https://mlflow.org/) has a Tracking component that helps you to track exactly how models or pipelines are produced during development.  Although we are not fitting (tuning or training) a model here, we can still make use of tracking to:

* Track example queries and responses to the LLM pipeline, for later review or analysis
* Store the model as an [MLflow Model flavor](https://mlflow.org/docs/latest/models.html#built-in-model-flavors), thus packaging it for simpler deployment


Apply to a batch of articles

In [10]:
# Apply to a batch of articles
import pandas as pd

results = summarizer(xsum_sample["document"])
display(pd.DataFrame(results, columns=["summary_text"]).head())

Unnamed: 0,summary_text
0,the full cost of damage in Newton Stewart is still being assessed . many roads in peeblesshire remain badly affected by standing water . a flood alert remains in place across the
1,a fire alarm went off at the Holiday Inn in Hope Street on Saturday . guests were asked to leave the hotel as they gathered outside . both groups have organised replacement coaches and
2,"Ferrari will start third ahead of team-mate Kimi Raikkonen . stewards only handed Hamilton a reprimand after governing body said ""no clear"
3,"the 67-year-old is accused of committing the offences between March 1972 and October 1989 . he denies all the charges, including two counts of indecency"
4,a man receiving treatment at the clinic threatened to shoot himself and others . he was evacuated from the hospital . the incident comes amid tension in Istanbul following several attacks .


[MLflow Tracking](https://mlflow.org/docs/latest/tracking.html) is organized hierarchically as follows:
* **An [experiment](https://mlflow.org/docs/latest/tracking.html#organizing-runs-in-experiments)** generally corresponds to the creation of 1 primary model or pipeline.  In our case, this is our LLM pipeline.  It contains some number of *runs*.

    * **A [run](https://mlflow.org/docs/latest/tracking.html#organizing-runs-in-experiments)** generally corresponds to the creation of 1 sub-model, such as 1 trial during hyperparameter tuning in traditional ML.  In our case, executing this notebook once will only create 1 run, but a second execution of the notebook will create a second run.  This version tracking can be useful during iterative development.  Each run contains some number of logged parameters, metrics, tags, models, artifacts, and other metadata.
        * **A [parameter](https://mlflow.org/docs/latest/tracking.html#concepts)** is an input to the model or pipeline, such as a regularization parameter in traditional ML or `max_length` for our LLM pipeline.
        * **A [metric](https://mlflow.org/docs/latest/tracking.html#concepts)** is an output of evaluation, such as accuracy or loss.
        * **An [artifact](https://mlflow.org/docs/latest/tracking.html#concepts)** is an arbitrary file stored alongside a run's metadata, such as the serialized model itself.
        * **A [flavor](https://mlflow.org/docs/latest/models.html#storage-format)** is an MLflow format for serializing models.  This format uses the underlying ML library's format (such as PyTorch, TensorFlow, Hugging Face, or your custom format), plus metadata.


MLflow has an API for tracking queries and predictions [`mlflow.llm.log_predictions()`](https://mlflow.org/docs/latest/python_api/mlflow.llm.html), which we will use below.  Note that, as of MLflow 2.3.1 (Apr 28, 2023), this API is Experimental, so it may change in later releases.  See the [LLM Tracking page](https://mlflow.org/docs/latest/llm-tracking.html) for more information.

***Tip***: We wrap our model development workflow with a call to `with mlflow.start_run():`.  This context manager syntax starts and ends the MLflow run explicitly, which is a best practice for code which may be moved to production.  See the [API doc](https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.start_run) for more information.


In [11]:
import mlflow

# Tell MLflow Tracking to use this explicit experiment path,
# which is located on the left hand sidebar under Machine Learning -> Experiments 
mlflow.set_experiment(f"/Users/{user_path}/experiment")

with mlflow.start_run():
    # LOG PARAMS
    mlflow.log_params(
        {
            "hf_model_name": hf_model_name,
            "min_length": min_length,
            "max_length": max_length,
            "truncation": truncation,
            "do_sample": do_sample,
        }
    )

    # --------------------------------
    # LOG INPUTS (QUERIES) AND OUTPUTS
    # Logged `inputs` are expected to be a list of str, or a list of str->str dicts.
    results_list = [r["summary_text"] for r in results]

    # Our LLM pipeline does not have prompts separate from inputs, so we do not log any prompts.
    mlflow.llm.log_predictions(
        inputs=xsum_sample["document"],
        outputs=results_list,
        prompts=["" for _ in results_list],
    )

    # ---------
    # LOG MODEL
    # We next log our LLM pipeline as an MLflow model.
    # This packages the model with useful metadata, such as the library versions used to create it.
    # This metadata makes it much easier to deploy the model downstream.
    # Under the hood, the model format is simply the ML library's native format (Hugging Face for us), plus metadata.

    # It is valuable to log a "signature" with the model telling MLflow the input and output schema for the model.
    signature = mlflow.models.infer_signature(
        xsum_sample["document"][0],
        mlflow.transformers.generate_signature_output(
            summarizer, xsum_sample["document"][0]
        ),
    )
    print(f"Signature:\n{signature}\n")

    # For mlflow.transformers, if there are inference-time configurations,
    # those need to be saved specially in the log_model call (below).
    # This ensures that the pipeline will use these same configurations when re-loaded.
    inference_config = {
        "min_length": min_length,
        "max_length": max_length,
        "truncation": truncation,
        "do_sample": do_sample,
    }

    # Logging a model returns a handle `model_info` to the model metadata in the tracking server.
    # This `model_info` will be useful later in the notebook to retrieve the logged model.
    model_info = mlflow.transformers.log_model(
        transformers_model=summarizer,
        artifact_path="summarizer",
        task="summarization",
        inference_config=inference_config,
        signature=signature,
        input_example="This is an example of a long news article which this pipeline can summarize for you.",
    )

2024/01/10 10:07:07 INFO mlflow.tracking.fluent: Experiment with name '/Users/user/experiment' does not exist. Creating a new experiment.
  mlflow.llm.log_predictions(
2024/01/10 10:07:07 INFO mlflow.tracking.llm_utils: Creating a new llm_predictions.csv for run 2ef13404ff6946bca51c7d43177dc984.


Signature:
inputs: 
  [string]
outputs: 
  [string]
params: 
  None




  model_info = mlflow.transformers.log_model(
  flavor.save_model(path=local_path, mlflow_model=mlflow_model, **kwargs)


README.md:   0%|          | 0.00/8.47k [00:00<?, ?B/s]



# <b>4 <span style='color:#78D118'>|</span> Query the MLflow Tracking server</b>

**MLflow Tracking API**: We briefly show how to query the logged model and metadata in the MLflow Tracking server, by loading the logged model.  See the [MLflow API](https://mlflow.org/docs/latest/python_api/mlflow.html) for more information about programmatic access.

**MLflow Tracking UI**: You can also use the UI.  In the right-hand sidebar, click the beaker icon to access the MLflow experiments run list, and then click through to access the Tracking server UI.  There, you can see the logged metadata and model.  Note in particular that our LLM inputs and outputs have been logged as a CSV file under model artifacts.

GIF of MLflow UI:
![GIF of MLflow UI](https://files.training.databricks.com/images/llm/llmops.gif)


Now, we can load the pipeline back from MLflow as a [pyfunc](https://mlflow.org/docs/latest/python_api/mlflow.pyfunc.html) and use the `.predict()` method to summarize an example document.

In [12]:
loaded_summarizer = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
loaded_summarizer.predict(xsum_sample["document"][0])




['many businesses and householders were affected by flooding in Newton Stewart . the water breached a retaining wall, flooding many commercial properties . a flood alert remains in place across']

The `.predict()` method can handle more than one document at a time, below we pass in all the data from `xsum_sample`.

In [13]:
results = loaded_summarizer.predict(xsum_sample.to_pandas()["document"])
display(pd.DataFrame(results, columns=["generated_summary"]).head())

Unnamed: 0,generated_summary
0,"many businesses and householders were affected by flooding in Newton Stewart . the water breached a retaining wall, flooding many commercial properties . a flood alert remains in place across"
1,"fire alarm went off at the Holiday Inn in Hope Street on Saturday . guests were asked to leave the hotel . one of the tour groups is from germany, the other from china"
2,"stewards only handed reprimand after governing body says ""no clear instruction was given on where he should park"" Stoffel Vandoorne out-qualified"
3,"the 67-year-old is accused of committing the offences between March 1972 and October 1989 . he denies all the charges, including two counts of indecency"
4,a man receiving psychiatric treatment at the clinic threatened to shoot himself and others . he was evacuated from the hospital . earlier reports that the armed man had


We are now ready to move to the staging step of deployment.  To get started, we will register the model in the MLflow Model Registry (more info below).

In [14]:
# Define the name for the model in the Model Registry.
# We filter out some special characters which cannot be used in model names.
model_name = f"summarizer - {user_path}"
model_name = model_name.replace("/", "_").replace(".", "_").replace(":", "_")
print(model_name)

summarizer - user


In [15]:
# Register a new model under the given name, or a new model version if the name exists already.
mlflow.register_model(model_uri=model_info.model_uri, name=model_name)

Successfully registered model 'summarizer - user'.
Created version '1' of model 'summarizer - user'.


<ModelVersion: aliases=[], creation_timestamp=1704881246855, current_stage='None', description=None, last_updated_timestamp=1704881246855, name='summarizer - user', run_id='2ef13404ff6946bca51c7d43177dc984', run_link=None, source='file:///kaggle/working/mlruns/548139300933260725/2ef13404ff6946bca51c7d43177dc984/artifacts/summarizer', status='READY', status_message=None, tags={}, user_id=None, version=1>

# <b>5 <span style='color:#78D118'>|</span> Test the LLM pipeline</b>



During the Staging step of development, our goal is to move code and/or models from Development to Production.  In order to do so, we must test the code and/or models to make sure they are ready for Production.

We track our progress here using the [MLflow Model Registry](https://mlflow.org/docs/latest/model-registry.html).  This metadata and model store organizes models as follows:
* **A registered model** is a named model in the registry, in our case corresponding to our summarization model.  It may have multiple *versions*.
    * **A model version** is an instance of a given model.  As you update your model, you will create new versions.  Each version is designated as being in a particular *stage* of deployment.
    * **A stage** is a stage of deployment: `None` (development), `Staging`, `Production`, or `Archived`.

The model we registered above starts with 1 version in stage `None` (development).

In the workflow below, we will programmatically transition the model from development to staging to production.  For more information on the Model Registry API, see the [Model Registry docs](https://mlflow.org/docs/latest/model-registry.html).  Alternatively, you can edit the registry and make model stage transitions via the UI.  To access the UI, click the Experiments menu option in the left-hand sidebar, and search for your model name.


In [16]:
from mlflow import MlflowClient

client = MlflowClient()

client.search_registered_models(filter_string=f"name = '{model_name}'")


[<RegisteredModel: aliases={}, creation_timestamp=1704881246852, description=None, last_updated_timestamp=1704881246855, latest_versions=[<ModelVersion: aliases=[], creation_timestamp=1704881246855, current_stage='None', description=None, last_updated_timestamp=1704881246855, name='summarizer - user', run_id='2ef13404ff6946bca51c7d43177dc984', run_link=None, source='file:///kaggle/working/mlruns/548139300933260725/2ef13404ff6946bca51c7d43177dc984/artifacts/summarizer', status='READY', status_message=None, tags={}, user_id=None, version=1>], name='summarizer - user', tags={}>]

In the metadata above, you can see that the model is currently in stage `None` (development).  In this workflow, we will run manual tests, but it would be reasonable to run both automated evaluation and human evaluation in practice.  Once tests pass, we will promote the model to stage `Production` to mark it ready for user-facing applications.

*Model URIs*: Below, we use model URIs to tell MLflow which model and version we are referring to.  Two common URI patterns for the MLflow Model Registry are:
* `f"models:/{model_name}/{model_version}"` to refer to a specific model version by number
* `f"models:/{model_name}/{model_stage}"` to refer to the latest model version in a given stage


In [17]:
model_version = 1
dev_model = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version}")
dev_model



mlflow.pyfunc.loaded_model:
  artifact_path: summarizer
  flavor: mlflow.transformers
  run_id: 2ef13404ff6946bca51c7d43177dc984

*Note about model dependencies*:
When you load the model via MLflow above, you may see warnings about the Python environment.  It is very important to ensure that the environments for development, staging, and production match.
* For this demo notebook, everything is done within the same notebook environment, so we do not need to worry about libraries and versions.  However, in the Production section below, we demonstrate how to pass the `env_manager` argument to the method for loading the saved MLflow model, which tells MLflow what tooling to use to recreate the environment.
* To create a genuine production job, make sure to install the needed libraries.  MLflow saves these libraries and versions alongside the logged model; see the [MLflow docs on model storage](https://mlflow.org/docs/latest/models.html#storage-format) for more information.  While using Databricks for this course, you can also generate an example inference notebook which includes code for setting up the environment; see [the model inference docs](https://docs.databricks.com/machine-learning/manage-model-lifecycle/index.html#use-model-for-inference) for batch or streaming inference for more information.


# <b>6 <span style='color:#78D118'>|</span> Transition to Staging</b>

We will move the model to stage `Staging` to indicate that we are actively testing it.

In [18]:
client.transition_model_version_stage(model_name, model_version, "staging")

  client.transition_model_version_stage(model_name, model_version, "staging")


<ModelVersion: aliases=[], creation_timestamp=1704881246855, current_stage='Staging', description=None, last_updated_timestamp=1704881247581, name='summarizer - user', run_id='2ef13404ff6946bca51c7d43177dc984', run_link=None, source='file:///kaggle/working/mlruns/548139300933260725/2ef13404ff6946bca51c7d43177dc984/artifacts/summarizer', status='READY', status_message=None, tags={}, user_id=None, version=1>

See the current_stage move to Staging

In [19]:
##################
#### For Demo ####
##################
staging_model = dev_model
##################

# An actual CI/CD workflow might load the `staging_model` programmatically.  For example:
#   mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{Staging}")
# or
#   mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version}")


We now "test" the model manually on sample data. Here, we simply print out results and compare them with the original data.  In a more realistic setting, we might use a set of human evaluators to decide whether the model outperformed the previous model or system.

In [20]:
results = staging_model.predict(xsum_sample.to_pandas()["document"])
display(pd.DataFrame(results, columns=["generated_summary"]).head())


Unnamed: 0,generated_summary
0,the full cost of damage in Newton Stewart is still being assessed . many roads in peeblesshire remain badly affected by standing water . a flood alert remains in place across the
1,fire alarm went off at the Holiday Inn in Hope Street on Saturday . guests were asked to leave the hotel as they gathered outside . both groups have organised replacement coaches and will begin
2,"Sebastian Vettel will start third ahead of team-mate Kimi Raikkonen . stewards handed Hamilton a reprimand after governing body said ""no"
3,"the 67-year-old is accused of committing the offences between March 1972 and October 1989 . he denies all the charges, including two counts of indecency"
4,a man receiving treatment at the clinic threatened to shoot himself and others . the incident comes amid tensions in Istanbul following several attacks .


# <b>7 <span style='color:#78D118'>|</span> Transition to Production</b>


Let's transition the model to Production.

In [21]:
client.transition_model_version_stage(model_name, model_version, "production")

  client.transition_model_version_stage(model_name, model_version, "production")


<ModelVersion: aliases=[], creation_timestamp=1704881246855, current_stage='Production', description=None, last_updated_timestamp=1704881251604, name='summarizer - user', run_id='2ef13404ff6946bca51c7d43177dc984', run_link=None, source='file:///kaggle/working/mlruns/548139300933260725/2ef13404ff6946bca51c7d43177dc984/artifacts/summarizer', status='READY', status_message=None, tags={}, user_id=None, version=1>

See the current_stage move to Production

### Create a production workflow for batch inference

Once the LLM pipeline is in Production, it may be used by one or more production jobs or serving endpoints.  Common deployment locations are:
* Batch or streaming inference jobs
* Model serving endpoints
* Edge devices

Here, we will show batch inference using Apache Spark DataFrames, with Delta Lake format.  Spark allows simple scale-out inference for high-throughput, low-cost jobs, and Delta allows us to append to and modify inference result tables with ACID transactions.  See the [Apache Spark page](https://spark.apache.org/) and the [Delta Lake page](https://delta.io/) more more information on these technologies.


In [22]:
# Load our data as a Spark DataFrame.
# Recall that we saved this as Delta at the start of the notebook.
# Also note that it has a ground-truth summary column.

##################
#### For Demo ####
##################
prod_data = spark.read.load(prod_data_path).limit(10)
##################

# Spark Connector 
# prod_data = spark.read.format("delta").load(prod_data_path).limit(10)

display(prod_data)


DataFrame[document: string, summary: string, id: string]

Below, we load the model using `mlflow.pyfunc.spark_udf`.  This returns the model as a Spark User Defined Function which can be applied efficiently to big data.  *Note that the deployment code is library-agnostic: it never references that the model is a Hugging Face pipeline.*  This simplified deployment is possible because MLflow logs environment metadata and "knows" how to load the model and run it.

In [23]:
# MLflow lets you grab the latest model version in a given stage.  Here, we grab the latest Production version.
prod_model_udf = mlflow.pyfunc.spark_udf(
    spark,
    model_uri=f"models:/{model_name}/Production",
    env_manager="local",
    result_type="string",
)

  latest = client.get_latest_versions(name, None if stage is None else [stage])


Downloading artifacts:   0%|          | 0/16 [00:00<?, ?it/s]

2024/01/10 10:07:32 INFO mlflow.store.artifact.artifact_repo: The progress bar can be disabled by setting the environment variable MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR to false


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

2024/01/10 10:07:32 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'


In [24]:
# Run inference by appending a new column to the DataFrame

batch_inference_results = prod_data.withColumn(
    "generated_summary", prod_model_udf("document")
)
display(batch_inference_results)

DataFrame[document: string, summary: string, id: string, generated_summary: string]

We can now write out our inference results to another Delta table.  Here, we append the results to an existing table (and create the table if it does not exist).

In [25]:
inference_results_path = f"{working_dir}/m6-inference-results".replace(
    "/dbfs", "dbfs:"
)
##################
#### For Demo ####
##################
batch_inference_results.write.mode("append").save(
    inference_results_path
)
##################

# Spark Connector 
#batch_inference_results.write.format("delta").mode("append").save(
#    inference_results_path
#)

Your max_length is set to 40, but your input_length is only 39. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=19)
  result = result.applymap(str)
  result = result.applymap(str)
  result = result.applymap(str)
  result = result.applymap(str)
                                                                                

To create a production job, we could for example take the new lines of code above, put them in a new notebook, and schedule it as an automated workflow.  MLflow can be integrated with essentially any deployment system, but for more information specific to this Databricks workspace, see the "Use model for inference" documentation for [AWS](https://docs.databricks.com/machine-learning/manage-model-lifecycle/index.html#use-model-for-inference), [Azure](https://learn.microsoft.com/en-us/azure/databricks/machine-learning/manage-model-lifecycle/#--use-model-for-inference), or [GCP](https://docs.gcp.databricks.com/machine-learning/manage-model-lifecycle/index.html#use-model-for-inference).

We did not cover model serving for real-time inference, but MLflow models can be deployed to any cloud or on-prem serving systems.  For more information, see the [open-source MLflow Model Registry docs](https://mlflow.org/docs/latest/model-registry.html) or the [Databricks Model Serving docs](https://docs.databricks.com/machine-learning/model-serving/index.html).

For other topics not covered, see ["The Big Book of MLOps."](https://www.databricks.com/resources/ebook/the-big-book-of-mlops)


# <b>8 <span style='color:#78D118'>|</span> Summary</b>


We have now walked through a full example of going from development to production.  Our LLM pipeline was very simple, but LLM Ops for a more complex workflow (such as fine-tuning a custom model) would be very similar.  You still follow the basic Ops steps of:
* Development: Creating the pipeline or model, tracking the process in the MLflow Tracking server and saving the final pipeline or model.
* Staging: Registering a new model or version in the MLflow Model Registry, testing it, and promoting it through Staging to Production.
* Production: Creating an inference job, or creating a model serving endpoint.
