In [None]:
%pip install "zenml[server]==0.55.0"

!zenml integration install openai langchain -y
import IPython

# automatically restart kernel
IPython.Application.instance().kernel.do_shutdown(restart=True)

**Note**: You need an [ngrok account](https://dashboard.ngrok.com/signup) to be able to view the ZenML Dashboard later. Please set up an account, then set your user token below:

In [None]:
NGROK_TOKEN = ""  # TODO: set your ngrok token since you're working on an online notebook

In [None]:
!pip install pyngrok
!ngrok authtoken {NGROK_TOKEN}

In [None]:
!zenml version

### 🍡 The case for an ML Pipeline

As an ML practitioner, you are probably familiar with building ML models using Scikit-learn, PyTorch, TensorFlow, or similar. An ML pipeline extends this to include the other steps you would typically do before or after building a model, like data acquisition, preprocessing, model deployment, or monitoring. In effect, the ML pipeline defines your work as an ML practitioner as a step-by-step procedure. Defining ML pipelines explicitly in code is great because:

- We can easily rerun all of our work, not just the model, which reduces manual errors and makes our models easier to reproduce.
- Data and models can be versioned and tracked, so we can see at a glance which dataset a model was trained on and how it compares to other models.
- With caching, you can skip steps that have already been run, making it easy to work with large datasets and experiment quickly.
- If the entire pipeline is coded up, we can automate many operational tasks, like retraining and redeploying models when the underlying problem or data changes or rolling out new and improved models with CI/CD workflows.

Having a clearly defined ML pipeline is essential for ML teams that aim to serve models on a large scale.

### 🧘 ZenML Setup
We will define our ML pipelines using ZenML. It is an excellent tool for this task, as it is straightforward and intuitive to use and has integrations with most of the advanced MLOps tools we might want to use later in our MLOps journey. Make sure you have ZenML installed (via `pip install zenml`). Next, let's run some commands to make sure you start with a fresh ML stack.

In [None]:
!rm -rf .zen
!zenml init

#### Connect to the ZenML Cloud

[ZenML Cloud](https://www.zenml.io/cloud) offers multi-tenant, fully-managed ZenML deployments. The servers come with a host of advanced features like built-in roles for access control, a new dashboard for monitoring and visualizing your pipelines, a model control plane that tracks your ML models across pipelines along with all the artifacts and metadata associated with them, and much more.

You can sign up for a free trial of the cloud at https://cloud.zenml.io. Once signed up, you can connect to it using the command below.

> **Note**: You can also choose to run this example locally. In that case, you can skip this step. You won't be able to see the Model Control Plane dashboard in that case, but you can still run the pipeline and see the results in the terminal.

In [None]:
!zenml connect --url <ZENML_CLOUD_SERVER_URL>

You'll then have to restart the kernel to make sure the changes are picked up.

In [None]:
# automatically restart kernel
import IPython

IPython.Application.instance().kernel.do_shutdown(restart=True)

#### Stack
A [stack](https://docs.zenml.io/user-guide/starter-guide/understand-stacks) in ZenML is the combination of tools and infrastructure that your pipelines can run on. When you run ZenML code without configuring a stack, the pipeline will run on the so-called default stack.

![image.png](attachment:image.png)

A stack allows ZenML to separate code from infrastructure. This means that it is easy to switch the environment that the pipeline runs on without making changes to the code.
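To make the idea concrete, here is a toy analogy in plain Python; it is not ZenML's stack API. The pipeline function only talks to an abstract artifact store, so swapping an in-memory store for a directory on disk requires no change to the pipeline code. `ArtifactStore`, `InMemoryStore`, `LocalDirStore` and `run_pipeline` are hypothetical names made up for this sketch.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Protocol


class ArtifactStore(Protocol):
    """Hypothetical interface: anything that can save and load named artifacts."""

    def save(self, name: str, value: Any) -> None: ...

    def load(self, name: str) -> Any: ...


class InMemoryStore:
    """Keeps artifacts in a dict (stand-in for one kind of infrastructure)."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def save(self, name: str, value: Any) -> None:
        self._data[name] = value

    def load(self, name: str) -> Any:
        return self._data[name]


class LocalDirStore:
    """Writes artifacts as JSON files in a directory (another kind of infrastructure)."""

    def __init__(self, root: Path) -> None:
        self.root = root
        self.root.mkdir(parents=True, exist_ok=True)

    def save(self, name: str, value: Any) -> None:
        (self.root / f"{name}.json").write_text(json.dumps(value))

    def load(self, name: str) -> Any:
        return json.loads((self.root / f"{name}.json").read_text())


def run_pipeline(store: ArtifactStore) -> int:
    """The 'pipeline code' never mentions which store it runs on."""
    store.save("urls", ["https://zenml.io"])
    urls = store.load("urls")
    store.save("count", len(urls))
    return store.load("count")


print(run_pipeline(InMemoryStore()))  # 1
with tempfile.TemporaryDirectory() as tmp:
    print(run_pipeline(LocalDirStore(Path(tmp))))  # 1
```

In ZenML the equivalent switch happens by activating a different stack, while the pipeline code stays the same.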

Let's take a look at the current active stack. You will find that you are running a default stack which comes with a local [orchestrator](https://docs.zenml.io/stacks-and-components/component-guide/orchestrators) and a local [artifact store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores). Discover more about stack components [here](https://docs.zenml.io/stacks-and-components/component-guide).

In [None]:
!zenml stack describe

### Let's go step-by-step

A ZenML pipeline is made up of several steps. Each step can be defined as a Python function, which can take inputs either from other steps or directly from the pipeline or user.

![](image.jpg)

Our pipeline will consist of the following steps:

- **URL Scraper**: This step will scrape the URLs of the documentation that we pass through it. It will return a list of URLs to load.

- **Web URL Loader**: This step will load the HTML content of the URLs passed through it. It will return a list of Documents.

- **Index Generator**: This step will generate an index of the documents passed through it. It will return a vector store collection.

- **Agent Creator**: This step takes in the vector store and returns a LangChain AgentExecutor which we can run queries against.

### The URL Scraper

All that you need to do to convert a simple Python function into a ZenML step is add the `@step` decorator.
Adding this decorator tells ZenML to
- allow this function to be passed in as a pipeline step.
- track and version the outputs of this step in your active artifact store (configured by the ZenML Stack).
- allow additional options like enabling cache, setting remote execution environments for the step and more!
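To build intuition for what the decorator adds, here is a toy decorator in plain Python that records every new output and skips re-running a function whose name and inputs it has already seen. It is a hypothetical sketch, not ZenML's implementation: ZenML's cache key takes more into account than the function name and inputs (see its caching docs), and outputs are written to the artifact store rather than kept in a list. `toy_step`, `ARTIFACTS`, `CACHE` and `toy_url_scraper` are made-up names.

```python
import functools
import hashlib
import json
from typing import Any, Callable, Dict, List, Tuple

# Toy "artifact store": every newly computed output, in order.
ARTIFACTS: List[Tuple[str, Any]] = []
# Toy cache: hash of (step name, inputs) -> previously computed output.
CACHE: Dict[str, Any] = {}


def toy_step(enable_cache: bool = True) -> Callable:
    """Hypothetical stand-in for a step decorator: records outputs and caches them."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            payload = json.dumps([func.__name__, args, kwargs], sort_keys=True, default=str)
            key = hashlib.sha256(payload.encode()).hexdigest()
            if enable_cache and key in CACHE:
                return CACHE[key]  # skip re-running: same step, same inputs
            result = func(*args, **kwargs)
            CACHE[key] = result
            ARTIFACTS.append((func.__name__, result))  # track the new output
            return result

        return wrapper

    return decorator


calls: List[str] = []


@toy_step(enable_cache=True)
def toy_url_scraper(website_url: str) -> List[str]:
    calls.append(website_url)
    return [website_url]


toy_url_scraper("https://zenml.io")
toy_url_scraper("https://zenml.io")  # cache hit: the body does not run again
print(len(calls), len(ARTIFACTS))  # 1 1
```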

In [None]:
from typing import List
from typing_extensions import Annotated
from steps.url_scraping_utils import get_all_pages, get_nested_readme_urls
from zenml import step, log_artifact_metadata


@step(enable_cache=True)
def url_scraper(
    docs_url: str = "",
    repo_url: str = "",
    release_notes_url: str = "",
    website_url: str = "",
) -> Annotated[List[str], "urls"]:
    """Generates a list of relevant URLs to scrape.

    Args:
        docs_url: URL to the documentation.
        repo_url: URL to the repository.
        release_notes_url: URL to the release notes.
        website_url: URL to the website.

    Returns:
        List of URLs to scrape.
    """
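    # For a quick demo, only the website URL is scraped. To also include the
    # docs and release notes, comment out the next line and uncomment the
    # block after it (more URLs means a longer run and more embedding calls).
    all_urls = [website_url]
    # examples_readme_urls = get_nested_readme_urls(repo_url)
    # docs_urls = get_all_pages(docs_url)
    # website_urls = get_all_pages(website_url)
    # all_urls = docs_urls + website_urls + [release_notes_url]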
    log_artifact_metadata(
        artifact_name="urls",
        metadata={
            "count": len(all_urls),
        }
    )
    return all_urls
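`get_all_pages` and `get_nested_readme_urls` live in `steps/url_scraping_utils.py`, which isn't shown here. As a rough idea of what a page crawler like `get_all_pages` might do, here is a hypothetical, stdlib-only sketch (not the repo's implementation): it extracts same-site links from a page and walks them breadth first. The `fetch` callable is an assumption that keeps the sketch testable offline; in practice it would download each page.

```python
from collections import deque
from html.parser import HTMLParser
from typing import Callable, List, Optional, Tuple
from urllib.parse import urldefrag, urljoin, urlparse


class LinkCollector(HTMLParser):
    """Collects the href attribute of every <a> tag fed to it."""

    def __init__(self) -> None:
        super().__init__()
        self.hrefs: List[str] = []

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
        if tag == "a":
            self.hrefs.extend(value for name, value in attrs if name == "href" and value)


def same_site_links(html: str, page_url: str) -> List[str]:
    """Absolute, fragment-free, de-duplicated links that stay on page_url's host."""
    collector = LinkCollector()
    collector.feed(html)
    host = urlparse(page_url).netloc
    links: List[str] = []
    for href in collector.hrefs:
        absolute = urldefrag(urljoin(page_url, href)).url
        if urlparse(absolute).netloc == host and absolute not in links:
            links.append(absolute)
    return links


def crawl(start_url: str, fetch: Callable[[str], str], max_pages: int = 50) -> List[str]:
    """Breadth-first crawl of same-site pages; `fetch` maps a URL to its HTML."""
    queue = deque([start_url])
    visited: List[str] = []
    while queue and len(visited) < max_pages:
        url = queue.popleft()
        if url in visited:
            continue
        visited.append(url)
        for link in same_site_links(fetch(url), url):
            if link not in visited and link not in queue:
                queue.append(link)
    return visited


page = '<a href="/docs">Docs</a> <a href="/docs#top">Top</a> <a href="https://github.com">GH</a>'
print(same_site_links(page, "https://zenml.io/"))  # ['https://zenml.io/docs']
```

The `max_pages` cap is a design choice worth keeping in any real crawler, since documentation sites can be large.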

### The Web URL Loader

This step makes use of the UnstructuredURLLoader wrapper from LangChain to load the URLs as documents.

You can import any libraries you want and use them inside a step definition. Running the step locally only requires that the library is installed in your environment. When you want to run on a remote stack, you can either allow ZenML to derive the dependencies from your local environment, or pass in the requirements explicitly (more on this later).

In [None]:
from langchain.docstore.document import Document
from langchain.document_loaders import UnstructuredURLLoader


@step(enable_cache=True)
def web_url_loader(urls: List[str]) -> List[Document]:
    """Loads documents from a list of URLs.

    Args:
        urls: List of URLs to load documents from.

    Returns:
        List of langchain documents.
    """
    loader = UnstructuredURLLoader(
        urls=urls,
    )
    return loader.load()
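Roughly speaking, the loader turns each URL's HTML into plain text wrapped in a `Document`, typically with the URL recorded in its metadata. The sketch below imitates that with the standard library's `html.parser`. It is a hypothetical simplification (Unstructured's partitioning is far more sophisticated), and `ToyDocument`, `load_urls` and the injected `fetch` callable are made-up names.

```python
from dataclasses import dataclass, field
from html.parser import HTMLParser
from typing import Callable, Dict, List


@dataclass
class ToyDocument:
    """Mirrors the two fields of LangChain's Document: text plus metadata."""

    page_content: str
    metadata: Dict[str, str] = field(default_factory=dict)


class TextExtractor(HTMLParser):
    """Keeps visible text, dropping the contents of <script> and <style>."""

    def __init__(self) -> None:
        super().__init__()
        self.parts: List[str] = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs) -> None:
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag) -> None:
        if tag in ("script", "style") and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data) -> None:
        if not self._skip_depth and data.strip():
            self.parts.append(data.strip())


def load_urls(urls: List[str], fetch: Callable[[str], str]) -> List[ToyDocument]:
    """Fetch each URL, strip the markup and wrap the text in a document."""
    docs = []
    for url in urls:
        extractor = TextExtractor()
        extractor.feed(fetch(url))
        extractor.close()  # flush any buffered trailing text
        docs.append(ToyDocument("\n".join(extractor.parts), {"source": url}))
    return docs


html = "<html><style>p {}</style><h1>ZenML</h1><p>Pipelines</p><script>x = 1</script></html>"
print(load_urls(["https://zenml.io"], lambda url: html)[0].page_content)
```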

### The Index Generator

Here, we use FAISS as our vector store of choice, along with OpenAI embeddings to generate a collection from the documents passed from the previous step.

You can also choose some other store as you wish; the step implementation will change but the rest of the pipeline will stay the same. You can also choose to create a new function and then pass that to the pipeline instead of the one we have here. This way, ZenML allows you to easily switch between implementations without trouble.

In [None]:
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import (
    CharacterTextSplitter,
)
from langchain.schema.vectorstore import VectorStore
from langchain.vectorstores.faiss import FAISS


@step(enable_cache=True)
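def index_generator(documents: List[Document]) -> Annotated[VectorStore, "vector_store"]:
    """Splits documents into chunks and indexes them in a FAISS vector store.

    Args:
        documents: List of langchain documents.

    Returns:
        FAISS vector store built from OpenAI embeddings of the chunks.
    """
    embeddings = OpenAIEmbeddings()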

    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    compiled_texts = text_splitter.split_documents(documents)

    log_artifact_metadata(
        artifact_name="vector_store",
        metadata={
            "embedding_type": "OpenAIEmbeddings",
            "vector_store_type": "FAISS",
        }
    )

    return FAISS.from_documents(compiled_texts, embeddings)
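The two ideas in this step, splitting documents into chunks and finding the chunks closest to a query, can be sketched without any external services. This is a hypothetical simplification: `split_text` only approximates `CharacterTextSplitter` (greedily merging `"\n\n"`-separated pieces up to `chunk_size` characters, with no overlap), and word-count vectors with brute-force cosine similarity stand in for OpenAI embeddings and the FAISS index.

```python
import math
import re
from collections import Counter
from typing import Dict, List, Tuple


def split_text(text: str, chunk_size: int = 1000, separator: str = "\n\n") -> List[str]:
    """Greedily merge separator-delimited pieces into chunks of at most chunk_size chars.

    A single piece longer than chunk_size is kept whole as an oversized chunk.
    """
    chunks: List[str] = []
    current = ""
    for piece in (p.strip() for p in text.split(separator)):
        if not piece:
            continue
        candidate = f"{current}{separator}{piece}" if current else piece
        if len(candidate) <= chunk_size:
            current = candidate
        else:
            if current:
                chunks.append(current)
            current = piece
    if current:
        chunks.append(current)
    return chunks


def embed(text: str) -> Dict[str, int]:
    """Toy 'embedding': lowercase word counts (real embeddings are dense float vectors)."""
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine(a: Dict[str, int], b: Dict[str, int]) -> float:
    dot = sum(count * b.get(word, 0) for word, count in a.items())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def search(query: str, chunks: List[str], k: int = 1) -> List[Tuple[float, str]]:
    """Brute-force nearest neighbours, standing in for the FAISS index."""
    q = embed(query)
    scored = [(cosine(q, embed(chunk)), chunk) for chunk in chunks]
    return sorted(scored, reverse=True)[:k]


text = "ZenML stacks define infrastructure.\n\nOrchestrators run pipelines.\n\nArtifact stores keep outputs."
chunks = split_text(text, chunk_size=40)
print(len(chunks))  # 3
print(search("Which orchestrators run pipelines?", chunks)[0][1])  # Orchestrators run pipelines.
```

With the notebook's `chunk_size=1000`, this short text would stay a single chunk; smaller chunks make retrieval more precise at the cost of more embedding calls.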

### The Agent Creator

With the agent creator, we define:
- a VectorStoreQATool from LangChain's set of available tools, with the vector store we created in the previous step.
- a LangChain agent with some custom prompts. This is done to customize the personality of the agent and make it identify as a ZenML bot. You can also play around with the prompt to change how the agent talks (think pirate, Shakespeare, etc.).
- a LangChain AgentExecutor which we can use to run queries against the agent. This AgentExecutor acts like a middleman facilitating communication between the LLM and the tools.
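The "middleman" role of the AgentExecutor boils down to a loop: ask the LLM what to do, run the tool it picks, feed the tool's output back, and stop once the LLM gives a final answer. The sketch below is a hypothetical, heavily simplified version of that loop; `scripted_llm` is a hard-coded stand-in for `ChatOpenAI`, and none of these names are LangChain APIs (the real executor also has to parse the LLM's text output into actions).

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Union


@dataclass
class ToolCall:
    tool: str
    tool_input: str


@dataclass
class FinalAnswer:
    text: str


LLM = Callable[[List[str]], Union[ToolCall, FinalAnswer]]


def run_agent(llm: LLM, tools: Dict[str, Callable[[str], str]], question: str, max_steps: int = 5) -> str:
    """Toy executor loop: ask the LLM, run the tool it picks, feed back the observation."""
    scratchpad: List[str] = [f"Question: {question}"]
    for _ in range(max_steps):
        decision = llm(scratchpad)
        if isinstance(decision, FinalAnswer):
            return decision.text
        observation = tools[decision.tool](decision.tool_input)
        scratchpad.append(f"Observation from {decision.tool}: {observation}")
    return "Stopped: too many steps."


def scripted_llm(scratchpad: List[str]) -> Union[ToolCall, FinalAnswer]:
    """Stand-in for the LLM: look something up once, then answer with what it found."""
    if len(scratchpad) == 1:
        return ToolCall("zenml-qa-tool", scratchpad[0])
    return FinalAnswer(scratchpad[-1].split(": ", 1)[1])


demo_tools = {"zenml-qa-tool": lambda query: "A stack is the set of tools a pipeline runs on."}
print(run_agent(scripted_llm, demo_tools, "What is a stack?"))
```

The `max_steps` guard matters in practice: an LLM that keeps choosing tools would otherwise loop forever.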


In [None]:
%reload_ext autoreload
%autoreload 2

In [None]:
import logging
from typing import Dict, cast

from agent.agent_executor_materializer import AgentExecutorMaterializer
from agent.prompt import PREFIX, SUFFIX
from langchain.agents import ConversationalChatAgent
from langchain.chat_models import ChatOpenAI
from langchain.schema.vectorstore import VectorStore
from langchain.tools.vectorstore.tool import VectorStoreQATool
from langchain.agents import AgentExecutor
from steps.agent_creator import AgentParameters
from zenml import step, ArtifactConfig


PIPELINE_NAME = "zenml_agent_creation_pipeline"
# Choose what character to use for your agent's answers
CHARACTER = "technical assistant"

@step(output_materializers=AgentExecutorMaterializer, enable_cache=False)
def agent_creator(
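    vector_store: VectorStore,
    # The pipeline below doesn't pass `config`, so fall back to the default
    # LLM settings defined on AgentParameters.
    config: AgentParameters = AgentParameters(),
) -> Annotated[AgentExecutor, ArtifactConfig(name="agent", is_model_artifact=True)]:
    """Create an agent from a vector store.

    Args:
        vector_store: Vector store to create agent from.
        config: Agent settings; `config.llm` holds the keyword arguments
            passed to `ChatOpenAI` (including `temperature` and
            `model_name`, which are logged as metadata below).

    Returns:
        An AgentExecutor.
    """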
    tools = [
        VectorStoreQATool(
            name="zenml-qa-tool",
            vectorstore=vector_store,
            description="Use this tool to answer questions about ZenML. "
            "How to debug errors in ZenML, how to answer conceptual "
            "questions about ZenML like available features, existing abstractions, "
            "and other parts from the documentation.",
            llm=ChatOpenAI(**config.llm),
        ),
    ]

    system_prompt = PREFIX.format(character=CHARACTER)

    my_agent = ConversationalChatAgent.from_llm_and_tools(
        llm=ChatOpenAI(**config.llm),
        tools=tools,
        system_message=system_prompt,
        human_message=SUFFIX,
    )

    agent_executor = AgentExecutor.from_agent_and_tools(
        agent=my_agent,
        tools=tools,
        verbose=True,
    )

    log_artifact_metadata(
        artifact_name="agent",
        metadata={
            "Tools and their descriptions": {
                tool.name: tool.description for tool in tools
            },
            "Personality": {
                "character": CHARACTER,
                "temperature": config.llm["temperature"],
                "model_name": config.llm["model_name"],
            },
        }
    )

    return agent_executor


#### Custom Materializer

You may notice that we have specified `output_materializers` in the step decorator. A [materializer](https://docs.zenml.io/user-guide/advanced-guide/artifact-management/handle-custom-data-types) in ZenML defines how an object is serialized and deserialized while moving between steps. ZenML comes with a host of built-in materializers that include basic types, collections and pydantic objects, along with a default cloudpickle materializer.

You can also define your own custom materializers. Here, we are using a custom materializer (defined in the `agent/` directory) for the AgentExecutor type, which is a pydantic object that doesn't have a `.json()` implementation.
Instead of passing it in the decorator, you can also declare the data types a materializer handles in its `ASSOCIATED_TYPES` attribute when defining it; ZenML then picks it up automatically and matches any relevant output types to it.
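The type-matching idea can be illustrated with a toy registry in plain Python. This is a hypothetical sketch, not ZenML's `BaseMaterializer`: `ToyMaterializer`, `register`, `DictMaterializer` and `materializer_for` are made-up names, and where ZenML falls back to cloudpickle, this toy simply raises.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Tuple, Type


class ToyMaterializer:
    """Hypothetical base class: knows how to write and read one family of types."""

    ASSOCIATED_TYPES: Tuple[Type, ...] = ()

    def __init__(self, uri: Path) -> None:
        self.uri = uri

    def save(self, data: Any) -> None:
        raise NotImplementedError

    def load(self, data_type: Type) -> Any:
        raise NotImplementedError


REGISTRY: Dict[Type, Type[ToyMaterializer]] = {}


def register(cls: Type[ToyMaterializer]) -> Type[ToyMaterializer]:
    """Map every associated type to this materializer, so outputs match automatically."""
    for associated_type in cls.ASSOCIATED_TYPES:
        REGISTRY[associated_type] = cls
    return cls


@register
class DictMaterializer(ToyMaterializer):
    ASSOCIATED_TYPES = (dict,)

    def save(self, data: Any) -> None:
        (self.uri / "data.json").write_text(json.dumps(data))

    def load(self, data_type: Type) -> Any:
        return data_type(json.loads((self.uri / "data.json").read_text()))


def materializer_for(obj: Any) -> Type[ToyMaterializer]:
    """Pick a materializer by walking the object's class hierarchy (MRO)."""
    for klass in type(obj).__mro__:
        if klass in REGISTRY:
            return REGISTRY[klass]
    raise TypeError(f"No materializer registered for {type(obj).__name__}")


with tempfile.TemporaryDirectory() as tmp:
    settings = {"character": "technical assistant", "temperature": 0}
    materializer = materializer_for(settings)(Path(tmp))
    materializer.save(settings)
    print(materializer.load(dict) == settings)  # True
```

Walking the MRO means subclasses of a registered type are handled too, which is why a single materializer can cover a whole family of objects.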

### Creating a ZenML pipeline

A [pipeline](https://docs.zenml.io/user-guide/starter-guide/create-an-ml-pipeline) is where you chain all the steps together. The order of execution is determined by the relationship between outputs and inputs across steps. You can also add explicit ordering dependencies between steps by passing the `after` argument when calling a step.

The output of each step is tracked and versioned in the artifact store that you configure in your ZenML stack (local, by default). ZenML does the heavy lifting of making sure that the right data is available to the step that needs it.
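Resolving execution order from data dependencies can be pictured with the standard library's `graphlib`: each step lists the steps whose outputs it consumes, and a topological sort yields a valid execution order. This only illustrates the idea; it is not how ZenML's orchestrators schedule steps. `notify_team` is a hypothetical step with an ordering-only dependency (no data flows into it).

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it must wait for (its predecessors).
dependencies = {
    "url_scraper": set(),
    "web_url_loader": {"url_scraper"},        # consumes the URLs
    "index_generator": {"web_url_loader"},    # consumes the documents
    "agent_creator": {"index_generator"},     # consumes the vector store
    "notify_team": {"agent_creator"},         # hypothetical: ordering only, no data
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
# ['url_scraper', 'web_url_loader', 'index_generator', 'agent_creator', 'notify_team']
```

Because every step here depends on the previous one, only one order is valid; steps without a path between them could run in either order, or in parallel.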

In [None]:
from zenml import pipeline, Model

MODEL_NAME = "zenml_agent"

@pipeline(name=PIPELINE_NAME,
          enable_cache=True,
          model=Model(
              name=MODEL_NAME,
              license="Apache",
              description="ZenML Agent with a vector store tool.",
              tags=["llm", "agent", "rag"]
          ))
def docs_to_agent_pipeline(
    docs_url: str = "",
    repo_url: str = "",
    release_notes_url: str = "",
    website_url: str = "",
) -> None:
    """Scrape the given URLs, index their content and create an agent from the index.

    Args:
        docs_url: URL to the documentation.
        repo_url: URL to the repository.
        release_notes_url: URL to the release notes.
        website_url: URL to the website.
    """
    urls = url_scraper(docs_url, repo_url, release_notes_url, website_url)
    documents = web_url_loader(urls)
    vector_store = index_generator(documents)
    agent = agent_creator(vector_store=vector_store)


### 🚀 Running the pipeline with the ZenML Docs and website data.

You can run a pipeline by simply calling the pipeline function with your parameters. In the block below, we pass in URLs specific to ZenML to create an index based on ZenML data. Feel free to use the URLs of your favorite tools to create an index of your own! 🧑‍🍳

In [None]:
import os

os.environ["OPENAI_API_KEY"] = ""  # TODO: set your OpenAI API key

In [None]:
version = "0.54.1"
docs_url = f"https://docs.zenml.io/v/{version}/"
website_url = "https://zenml.io"
repo_url = f"https://github.com/zenml-io/zenml/tree/{version}/examples"
release_notes_url = (
    f"https://github.com/zenml-io/zenml/blob/{version}/RELEASE_NOTES.md"
)

docs_to_agent_pipeline(
    website_url=website_url,
    docs_url=docs_url,
    repo_url=repo_url,
    release_notes_url=release_notes_url,
)

#### The Models Tab on the Cloud Dashboard

If you are running this on the cloud, you can head over to your ZenML Dashboard to see the pipeline run. The Models tab lists the models that were created as a result of the pipeline run; click on a model to see the metadata associated with it, the artifacts that were used to create it, and more.

![image.png](attachment:image.png)

### ZenML Dashboard

If you're using a Cloud tenant, you can skip this section. 

For those running the pipeline locally, you can start the ZenML Dashboard by running the code below. You can view all your pipelines, stacks, stack components and much more, all in one place!

In [None]:
!zenml down
!zenml up --blocking

### 🧪 Testing the Agent

ZenML tracks and versions artifacts from all steps across all pipeline versions. This makes it really easy to pick up a specific version of your desired output and test it. You can also have a pipeline that compares models/outputs from different versions and picks the best one! The possibilities are endless 😉


Here we define a function that fetches the latest model version (by default) and runs your query against it. You can also specify a specific version to test against. We also log the query and the response from a specific version of the model to the ZenML dashboard. This makes it easy to track the performance of the model over time.
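As a mental model of what `ask_agent` relies on, here is a hypothetical in-memory registry; it is not ZenML's client. Versions are numbered in order, a lookup without a version returns the latest one, a lookup can also match a number or a stage name, and each logged query lands in the version's metadata under a timestamp heading, much like the cards on the dashboard. `ToyModelVersion` and `ToyModelRegistry` are made-up names.

```python
import datetime
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union


@dataclass
class ToyModelVersion:
    number: int
    artifacts: Dict[str, Any]
    stage: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


class ToyModelRegistry:
    """Hypothetical registry illustrating lookup by latest, number or stage."""

    def __init__(self) -> None:
        self.versions: List[ToyModelVersion] = []

    def register(self, artifacts: Dict[str, Any]) -> ToyModelVersion:
        version = ToyModelVersion(number=len(self.versions) + 1, artifacts=artifacts)
        self.versions.append(version)
        return version

    def get(self, version: Union[None, int, str] = None) -> ToyModelVersion:
        if version is None:
            return self.versions[-1]  # latest by default
        for candidate in reversed(self.versions):
            if candidate.number == version or candidate.stage == version:
                return candidate
        raise KeyError(f"No model version matching {version!r}")

    def log_metadata(self, version: ToyModelVersion, entry: Dict[str, Any]) -> None:
        # One card per call, headed by the current timestamp.
        heading = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        version.metadata[heading] = entry


registry = ToyModelRegistry()
first = registry.register({"agent": "agent built by run 1"})
second = registry.register({"agent": "agent built by run 2"})
first.stage = "production"
print(registry.get().number)              # 2 (latest)
print(registry.get("production").number)  # 1
```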

In [None]:
from typing import Any
from zenml.client import Client
from zenml import log_model_metadata
import datetime

MODEL_NAME = "zenml_agent"

def ask_agent(question: str, version: Any = None) -> str:
    """Ask the agent a question.

    Args:
        question: Question to ask the agent.
        version: Version name, number, ID or stage of the model to use.
            Defaults to the latest version.

    Returns:
        Answer from the agent.
    """
    client = Client()
    model_version = client.get_model_version(MODEL_NAME, model_version_name_or_number_or_id=version)
    agent = model_version.get_artifact("agent").load()

    answer = agent.run({"input": question, "chat_history": []})

    log_model_metadata(
        model_name=MODEL_NAME,
        model_version=model_version.number,
        metadata={
            # Use the current time as the card heading
            f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}": {
                "question": question,
                "answer": answer,
            },
        },
    )

    return answer


Now that we can retrieve the agent from the latest model version, let's test it by
asking it some questions. Feel free to customize the question to your liking!

In [None]:
ask_agent("Who are you?")

If you go back to your `zenml_agent` model in your Cloud Dashboard, you can click on any version to see its model artifacts, data artifacts, deployments, and the metadata associated with it.

![image.png](attachment:image.png)

You can see that the question and response have been logged to the dashboard in different cards with a timestamp.

#### Promoting the model to production

If you are satisfied with the performance of your agent, you can promote it to production with a simple function call. This sets the model version's stage to `production`, so it can then be retrieved and consumed directly or in other pipelines using this stage.

You can also visit the dashboard to see the model in the production stage. Learn more about [Stages and Promotion](https://docs.zenml.io/user-guide/advanced-guide/data-management/model-management#stages-and-promotion) in our docs.
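Promotion implies that only one version holds a given stage at a time. As far as we understand ZenML's `ModelVersion.set_stage`, it also accepts a `force` flag: without it, promoting a version into a stage that another version already holds raises an error, while `force=True` archives the previous holder. Please check the docs linked above for the exact behavior; rerunning the pipeline and promoting the new version is exactly this case. The toy `promote` function below mimics that rule on a plain dict and is not ZenML code.

```python
from typing import Dict, Optional


def promote(stages: Dict[int, Optional[str]], number: int, stage: str, force: bool = False) -> None:
    """Toy promotion: at most one version per stage; `force` archives the current holder."""
    holders = [n for n, s in stages.items() if s == stage and n != number]
    if holders and not force:
        raise RuntimeError(f"Version {holders[0]} is already in stage {stage!r}; pass force=True")
    for n in holders:
        stages[n] = "archived"
    stages[number] = stage


stages: Dict[int, Optional[str]] = {1: None, 2: None}
promote(stages, 1, "production")
promote(stages, 2, "production", force=True)
print(stages)  # {1: 'archived', 2: 'production'}
```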

In [None]:
from zenml.enums import ModelStages

client = Client()
model_version = client.get_model_version(MODEL_NAME)
model_version.set_stage(ModelStages.PRODUCTION)

You can now call the `ask_agent` function with the production stage as the version to always get answers from the current production model.

In [None]:
ask_agent("What is an orchestrator in ZenML?", version=ModelStages.PRODUCTION)