In [1]:
!mkdir cache
!pip install -U accelerate --quiet
!pip install pyspark --quiet
!pip install delta-spark --quiet
!pip install mlflow --quiet
!pip install pyngrok --quiet

In [None]:
!pip install datasets
!pip install transformers


In [6]:
from datasets import load_dataset
from transformers import pipeline

In [None]:
xsum_dataset = load_dataset(
    "xsum", version="1.2.0", cache_dir="cache"
)  # Note: We specify cache_dir to use pre-cached data.
xsum_sample = xsum_dataset["train"].select(range(10))
display(xsum_sample.to_pandas())

In [20]:
xsum_dataset

DatasetDict({
    train: Dataset({
        features: ['document', 'summary', 'id'],
        num_rows: 204045
    })
    validation: Dataset({
        features: ['document', 'summary', 'id'],
        num_rows: 11332
    })
    test: Dataset({
        features: ['document', 'summary', 'id'],
        num_rows: 11334
    })
})

In [8]:
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("LLMOps") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [9]:
prod_data_path = "cache/m6_prod_data"
test_spark_dataset = spark.createDataFrame(xsum_dataset["test"].to_pandas())
test_spark_dataset.write.format("delta").mode("overwrite").save(prod_data_path)

In [14]:
from transformers import pipeline

# Later, we plan to log all of these parameters to MLflow.
# Storing them as variables here will help with that.
hf_model_name = "t5-small"
min_length = 20
max_length = 100
truncation = True
do_sample = True

summarizer = pipeline(
    task="summarization",
    model=hf_model_name,
    min_length=min_length,
    max_length=max_length,
    truncation=truncation,
    do_sample=do_sample,
    model_kwargs={"cache_dir": "cache"},
)

In [None]:
doc0 = xsum_sample["document"][0]
doc0

In [17]:
summarizer(doc0)

[{'summary_text': 'the full cost of damage in Newton Stewart is still being assessed . many roads in peeblesshire remain badly affected by standing water . the water breached a retaining wall, flooding many commercial properties .'}]

In [None]:
print(f"Summary: {summarizer(doc0)[0]['summary_text']}")
print("===============================================")
print(f"Original Document: {doc0}")

In [None]:
import pandas as pd

results = summarizer(xsum_sample["document"])
display(pd.DataFrame(results, columns=["summary_text"]))

In [21]:
import mlflow
import mlflow.pytorch

In [22]:
get_ipython().system_raw("mlflow ui --port 5000 &")
mlflow.pytorch.autolog()



In [26]:
from pyngrok import ngrok
from getpass import getpass

# Terminate open tunnels if exist
ngrok.kill()

# Setting the authtoken (optional)
# Get your authtoken from https://dashboard.ngrok.com/auth
#NGROK_AUTH_TOKEN = "2Padn9VzXvPy7nJXe6eAUTR3Dbd_6cXCwQeLNLwZDCWL5ypKs"
#ngrok.set_auth_token(NGROK_AUTH_TOKEN)

# Open an HTTPs tunnel on port 5000 for http://localhost:5000
ngrok_tunnel = ngrok.connect(addr="5000", proto="http", bind_tls=True)
print("MLflow Tracking UI:", ngrok_tunnel.public_url)



MLflow Tracking UI: https://3ece-34-135-110-177.ngrok.io


In [None]:
# Tell MLflow Tracking to user this explicit experiment path,
# which is in your home directory under the Workspace browser (left-hand sidebar).
mlflow.set_experiment("MLflow experiment")

with mlflow.start_run():
    # LOG PARAMS
    mlflow.log_params(
        {
            "hf_model_name": hf_model_name,
            "min_length": min_length,
            "max_length": max_length,
            "truncation": truncation,
            "do_sample": do_sample,
        }
    )

    # --------------------------------
    # LOG INPUTS (QUERIES) AND OUTPUTS
    # Logged `inputs` are expected to be a list of str, or a list of str->str dicts.
    results_list = [r["summary_text"] for r in results]

    # Our LLM pipeline does not have prompts separate from inputs, so we do not log any prompts.
    mlflow.llm.log_predictions(
        inputs=xsum_sample["document"],
        outputs=results_list,
        prompts=["" for _ in results_list],
    )

    # ---------
    # LOG MODEL
    # We next log our LLM pipeline as an MLflow model.
    # This packages the model with useful metadata, such as the library versions used to create it.
    # This metadata makes it much easier to deploy the model downstream.
    # Under the hood, the model format is simply the ML library's native format (Hugging Face for us), plus metadata.

    # It is valuable to log a "signature" with the model telling MLflow the input and output schema for the model.
    signature = mlflow.models.infer_signature(
        xsum_sample["document"][0],
        mlflow.transformers.generate_signature_output(
            summarizer, xsum_sample["document"][0]
        ),
    )
    print(f"Signature:\n{signature}\n")

    # For mlflow.transformers, if there are inference-time configurations,
    # those need to be saved specially in the log_model call (below).
    # This ensures that the pipeline will use these same configurations when re-loaded.
    inference_config = {
        "min_length": min_length,
        "max_length": max_length,
        "truncation": truncation,
        "do_sample": do_sample,
    }

    # Logging a model returns a handle `model_info` to the model metadata in the tracking server.
    # This `model_info` will be useful later in the notebook to retrieve the logged model.
    model_info = mlflow.transformers.log_model(
        transformers_model=summarizer,
        artifact_path="summarizer",
        task="summarization",
        inference_config=inference_config,
        signature=signature,
        input_example="This is an example of a long news article which this pipeline can summarize for you.",
    )

In [None]:
loaded_summarizer = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
loaded_summarizer.predict(xsum_sample["document"][0])

In [None]:
results = loaded_summarizer.predict(xsum_sample.to_pandas()["document"])
display(pd.DataFrame(results, columns=["generated_summary"]))

In [30]:
# Define the name for the model in the Model Registry.
# We filter out some special characters which cannot be used in model names.
model_name = "summarizer - RaviKumar"
model_name = model_name.replace("/", "_").replace(".", "_").replace(":", "_")
print(model_name)

summarizer - RaviKumar


In [None]:
mlflow.register_model(model_uri=model_info.model_uri, name=model_name)

In [32]:
from mlflow import MlflowClient

client = MlflowClient()

In [None]:
client.search_registered_models(filter_string=f"name = '{model_name}'")

In [None]:
model_version = 1
dev_model = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version}")
dev_model

In [None]:
client.transition_model_version_stage(model_name, model_version, "staging")

In [36]:
staging_model = dev_model

In [None]:
results = staging_model.predict(xsum_sample.to_pandas()["document"])
display(pd.DataFrame(results, columns=["generated_summary"]))