In [None]:
%pip install "zenml[server]"

!zenml integration install openai langchain -y
import IPython

# automatically restart kernel
IPython.Application.instance().kernel.do_shutdown(restart=True)

### 🍡 The case for an ML Pipeline

As an ML practitioner, you are probably familiar with building ML models using Scikit-learn, PyTorch, TensorFlow, or similar. An ML Pipeline is simply an extension, including other steps you would typically do before or after building a model, like data acquisition, preprocessing, model deployment, or monitoring. The ML pipeline essentially defines a step-by-step procedure of your work as an ML practitioner. Defining ML pipelines explicitly in code is great because:

- We can easily rerun all of our work, not just the model, eliminating bugs and making our models easier to reproduce.
- Data and models can be versioned and tracked, so we can see at a glance which dataset a model was trained on and how it compares to other models.
- With caching, you can skip steps that have already been run, making it easy to work with large datasets and experiment quickly.
- If the entire pipeline is coded up, we can automate many operational tasks, like retraining and redeploying models when the underlying problem or data changes or rolling out new and improved models with CI/CD workflows.

Having a clearly defined ML pipeline is essential for ML teams that aim to serve models on a large scale.

### 🧘 ZenML Setup
We will define our ML pipelines using ZenML. It is an excellent tool for this task, as it is straightforward and intuitive to use and has integrations with most of the advanced MLOps tools we might want to use later in our MLOps journey. Make sure you have ZenML installed (via `pip install zenml`). Next, let's run some commands to make sure you start with a fresh ML stack.

In [None]:
!rm -rf .zen
!zenml init

# set up a local ZenML server
!zenml up

#### Stack
A [stack](https://docs.zenml.io/user-guide/starter-guide/understand-stacks) in ZenML is the combination of tools and infrastructure that your pipelines can run on. When you run ZenML code without configuring a stack, the pipeline will run on the so-called default stack.

![image.png](attachment:image.png)

A stack allows ZenML to separate code from infrastructure. This means that it is easy to switch the environment that the pipeline runs on without making changes to the code.

Let's take a look at the current active stack. You will find that you are running a default stack which comes with a local [orchestrator](https://docs.zenml.io/stacks-and-components/component-guide/orchestrators) and a local [artifact store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores). Discover more about stack components [here](https://docs.zenml.io/stacks-and-components/component-guide).

In [1]:
!zenml stack describe

Note: NumExpr detected 16 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
NumExpr defaulting to 8 threads.
        Stack Configuration        
┏━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━┓
┃ COMPONENT_TYPE │ COMPONENT_NAME ┃
┠────────────────┼────────────────┨
┃ ARTIFACT_STORE │ default        ┃
┠────────────────┼────────────────┨
┃ ORCHESTRATOR   │ default        ┃
┗━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━┛
     'default' stack (ACTIVE)      
Stack 'defa

### Let's go step-by-step

A ZenML pipeline is built up of several steps. Each step can be defined as a Python function which can either take in inputs from other steps, or directly from the pipeline or user.

![image.png](attachment:image.png)

Our pipeline will consist of the following steps:

- **URL Scraper**: This step will scrape the URLs of the documentation that we pass through it. It will return a list of URLs to load.

- **Web URL Loader**: This step will load the HTML content of the URLs passed through it. It will return a list of Documents.

- **Index Generator**: This step will generate an index of the documents passed through it. It will return a vector store collection.

- **Agent Creator**: This step takes in the vector store and returns a LangChain AgentExecutor which we can run queries against.

### The URL Scraper

All you need to do to convert a simple Python function into a ZenML step is add the `@step` decorator.
Adding this decorator tells ZenML to:
- allow this function to be passed in as a pipeline step.
- track and version the outputs of this step in your active artifact store (configured by the ZenML stack).
- allow additional options such as enabling caching, setting remote execution environments for the step, and more!

In [2]:
from typing import List

from steps.url_scraping_utils import get_all_pages, get_nested_readme_urls
from zenml import step


@step(enable_cache=True)
def url_scraper(
    docs_url: str = "",
    repo_url: str = "",
    release_notes_url: str = "",
    website_url: str = "",
) -> List[str]:
    """Generates a list of relevant URLs to scrape.

    Args:
        docs_url: URL to the documentation.
        repo_url: URL to the repository.
        release_notes_url: URL to the release notes.
        website_url: URL to the website.

    Returns:
        List of URLs to scrape.
    """
    # examples_readme_urls = get_nested_readme_urls(repo_url)
    docs_urls = get_all_pages(docs_url)
    website_urls = get_all_pages(website_url)
    return docs_urls + website_urls + [release_notes_url]

Note: NumExpr detected 16 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
NumExpr defaulting to 8 threads.
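
To make the tracking and caching behaviour of `@step` more concrete, here is a toy sketch in plain Python. It is not ZenML's implementation: `toy_step`, `ARTIFACT_STORE`, `CALLS` and `toy_url_scraper` are hypothetical names, the "artifact store" is just a dictionary, and the cache key only looks at the function name and its keyword arguments. ZenML's real cache takes more into account than this.

```python
import hashlib
import json
from functools import wraps

# Hypothetical in-memory stand-in for an artifact store: cache key -> output.
ARTIFACT_STORE = {}


def toy_step(func):
    """Toy decorator: record every output and reuse it when inputs repeat."""

    @wraps(func)
    def wrapper(**kwargs):
        # Key on the function name plus its JSON-serialised keyword arguments.
        payload = json.dumps([func.__name__, kwargs], sort_keys=True)
        key = hashlib.sha256(payload.encode()).hexdigest()
        if key in ARTIFACT_STORE:
            return ARTIFACT_STORE[key]  # cache hit: the function body is skipped
        ARTIFACT_STORE[key] = func(**kwargs)
        return ARTIFACT_STORE[key]

    return wrapper


CALLS = []  # records each real execution so cache hits are observable


@toy_step
def toy_url_scraper(docs_url: str = "", website_url: str = "") -> list:
    CALLS.append(docs_url)
    return [url for url in (docs_url, website_url) if url]


first = toy_url_scraper(docs_url="https://docs.zenml.io", website_url="https://zenml.io")
second = toy_url_scraper(docs_url="https://docs.zenml.io", website_url="https://zenml.io")
```

The second call returns the stored output without running the function body again, which is the same idea behind the "Using cached version of ..." messages that appear when the pipeline is run.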


### The Web URL Loader

This step makes use of the UnstructuredURLLoader wrapper from LangChain to load the URLs as documents.

You can import any libraries you want and use them inside a step definition. Running the step locally only requires that the library is installed in your environment. When you want to run on a remote stack, you can either let ZenML derive the dependencies from your local environment or pass in the requirements explicitly (more on this later).

In [3]:
from langchain.docstore.document import Document
from langchain.document_loaders import UnstructuredURLLoader


@step(enable_cache=True)
def web_url_loader(urls: List[str]) -> List[Document]:
    """Loads documents from a list of URLs.

    Args:
        urls: List of URLs to load documents from.

    Returns:
        List of langchain documents.
    """
    loader = UnstructuredURLLoader(
        urls=urls,
    )
    return loader.load()

### The Index Generator

Here, we use FAISS as our vector store of choice, along with OpenAI embeddings to generate a collection from the documents passed from the previous step.

You can also choose some other store as you wish; the step implementation will change but the rest of the pipeline will stay the same. You can also choose to create a new function and then pass that to the pipeline instead of the one we have here. This way, ZenML allows you to easily switch between implementations without trouble.

In [4]:
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import (
    CharacterTextSplitter,
)
from langchain.vectorstores import FAISS, VectorStore


@step(enable_cache=True)
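def index_generator(documents: List[Document]) -> VectorStore:
    """Generates a FAISS vector store from a list of documents."""
    embeddings = OpenAIEmbeddings()

    # Split the documents into chunks before embedding them.
    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    compiled_texts = text_splitter.split_documents(documents)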

    return FAISS.from_documents(compiled_texts, embeddings)
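
The pipeline run later in this notebook logs warnings such as `Created a chunk of size 2971, which is longer than the specified 1000`. The sketch below is a simplified, plain-Python approximation of separator-based splitting that shows how this can happen: pieces are cut on a separator and merged greedily, so a single piece that contains no separator cannot be made smaller. `toy_character_split` is a hypothetical helper written for illustration, not LangChain's code, and the blank-line separator is an assumption about the splitter's default.

```python
def toy_character_split(text: str, chunk_size: int, separator: str = "\n\n") -> list:
    """Greedily merge separator-delimited pieces into chunks of at most chunk_size.

    A single piece longer than chunk_size is emitted whole, which is the
    situation behind the "longer than the specified" warnings.
    """
    chunks, current = [], ""
    for piece in filter(None, text.split(separator)):
        candidate = piece if not current else current + separator + piece
        if len(candidate) <= chunk_size or not current:
            # Either the merged text still fits, or this is the first piece
            # of a new chunk and has to be accepted even if it is too long.
            current = candidate
        else:
            chunks.append(current)
            current = piece
    if current:
        chunks.append(current)
    return chunks


sample = "a" * 30 + "\n\n" + "b" * 30 + "\n\n" + "c" * 150
chunk_lengths = [len(chunk) for chunk in toy_character_split(sample, chunk_size=100)]
```

Here the two short paragraphs are merged into one chunk, while the 150-character paragraph ends up as an oversized chunk of its own.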

### The Agent Creator

With the agent creator, we define:
- a VectorStoreQATool from LangChain's set of available tools, with the vector store we created in the previous step.
- a LangChain agent with some custom prompts. This is done to customize the personality of the agent and make it identify as a ZenML bot. You can also play around with the prompt to change how the agent talks (think pirate, Shakespeare, etc.).
- a LangChain AgentExecutor which we can use to run queries against the agent. This AgentExecutor acts like a middleman facilitating communication between the LLM and the tools.


In [5]:
import logging
from typing import Dict, cast

from agent.agent_executor_materializer import AgentExecutorMaterializer
from agent.prompt import PREFIX, SUFFIX
from langchain.agents import ConversationalChatAgent
from langchain.chat_models import ChatOpenAI
from langchain.vectorstores import VectorStore
from langchain.tools.vectorstore.tool import VectorStoreQATool
from langchain.agents import AgentExecutor
from steps.agent_creator import AgentParameters
from zenml import step


PIPELINE_NAME = "zenml_agent_creation_pipeline"


@step(output_materializers=AgentExecutorMaterializer, enable_cache=False)
def agent_creator(
    vector_store: VectorStore, config: AgentParameters
) -> AgentExecutor:
    """Create an agent from a vector store.

    Args:
        vector_store: Vector store to create agent from.
        config: Agent parameters, including the keyword arguments for ChatOpenAI.

    Returns:
        An AgentExecutor.
    """
    tools = [
        VectorStoreQATool(
            name="zenml",
            vectorstore=vector_store,
            description="Use this tool to answer questions about ZenML. "
            "How to debug errors in ZenML, how to answer conceptual "
            "questions about ZenML like available features, existing abstractions, "
            "and other parts from the documentation.",
            llm=ChatOpenAI(**config.llm),
        ),
    ]

    my_agent = ConversationalChatAgent.from_llm_and_tools(
        llm=ChatOpenAI(**config.llm),
        tools=tools,
        system_message=PREFIX,
        human_message=SUFFIX,
    )

    agent_executor = AgentExecutor.from_agent_and_tools(
        agent=my_agent,
        tools=tools,
        verbose=True,
    )

    logging.info("About to return agent executor.")
    return agent_executor



#### Custom Materializer

You may notice that we have specified `output_materializers` in the step decorator. A [materializer](https://docs.zenml.io/user-guide/advanced-guide/artifact-management/handle-custom-data-types) in ZenML is what defines how an object is serialized and deserialized while moving between steps. ZenML comes with a host of built-in materializers that include basic types, collections and pydantic objects, along with a default cloudpickle materializer.

You can also define your own custom materializers. Here, we are using a custom materializer (defined in the agent/ directory) for the AgentExecutor type, which is a pydantic object that doesn't have a `.json()` implementation. 
Instead of passing it in the decorator, you can also simply specify the data type a materializer can handle while defining it and ZenML will pick it up automatically and match any relevant output types to it.
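
As a rough illustration of the serialize/deserialize contract, here is a toy class in plain Python. It is only a sketch under stated assumptions: `ToyPickleMaterializer` and the `artifact.pkl` file name are hypothetical, it writes to a temporary local directory rather than through an artifact store, and it does not subclass ZenML's `BaseMaterializer` the way a real custom materializer would.

```python
import os
import pickle
import tempfile


class ToyPickleMaterializer:
    """Toy stand-in for a materializer: writes an object under a URI and reads it back."""

    FILENAME = "artifact.pkl"  # hypothetical file name

    def __init__(self, uri: str):
        self.uri = uri  # directory that plays the role of an artifact location

    def save(self, obj) -> None:
        # Serialize the object when a step returns it.
        with open(os.path.join(self.uri, self.FILENAME), "wb") as f:
            pickle.dump(obj, f)

    def load(self):
        # Deserialize the object when a downstream step (or a user) needs it.
        with open(os.path.join(self.uri, self.FILENAME), "rb") as f:
            return pickle.load(f)


with tempfile.TemporaryDirectory() as artifact_dir:
    materializer = ToyPickleMaterializer(artifact_dir)
    materializer.save({"tools": ["zenml"], "verbose": True})
    restored = materializer.load()
```

A custom materializer becomes necessary when an object cannot be round-tripped this simply, which is the case the text describes for the AgentExecutor type.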

### Creating a ZenML pipeline

A [pipeline](https://docs.zenml.io/user-guide/starter-guide/create-an-ml-pipeline) is where you chain all the steps together. The order of execution is determined by the relationship between outputs and inputs across steps. You can also choose to add explicit dependencies using `.after()` and `.before()`.

The output of each step is tracked and versioned in the artifact store that you configure in your ZenML stack (local, by default). ZenML does the heavy lifting of making sure that the right data is available to the step that needs it.

In [6]:
from zenml import pipeline


@pipeline(name=PIPELINE_NAME, enable_cache=True)
def docs_to_agent_pipeline(
    docs_url: str = "",
    repo_url: str = "",
    release_notes_url: str = "",
    website_url: str = "",
) -> None:
    """Generate index for ZenML.

    Args:
        docs_url: URL to the documentation.
        repo_url: URL to the repository.
        release_notes_url: URL to the release notes.
        website_url: URL to the website.
    """
    urls = url_scraper(docs_url, repo_url, release_notes_url, website_url)
    documents = web_url_loader(urls)
    vector_store = index_generator(documents)
    agent = agent_creator(vector_store=vector_store)
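
To see how an execution order can fall out of output-to-input relationships alone, here is a small sketch using the standard library's `graphlib` (Python 3.9+). It is an illustration of the idea rather than ZenML's scheduling code; the `consumes` mapping is written by hand from the pipeline above.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
consumes = {
    "url_scraper": set(),
    "web_url_loader": {"url_scraper"},
    "index_generator": {"web_url_loader"},
    "agent_creator": {"index_generator"},
}

# A topological sort yields an order in which every step runs after its inputs exist.
execution_order = list(TopologicalSorter(consumes).static_order())
```

For this pipeline the dependencies form a single chain, so only one order is possible: `url_scraper`, `web_url_loader`, `index_generator`, `agent_creator`.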


### 🚀 Running the pipeline with the ZenML Docs and website data.

You can run a pipeline by simply calling the pipeline function with your parameters. In the block below, we pass in URLs specific to ZenML to create an index based on ZenML data. Feel free to use the URLs of your favorite tools to create an index of your own! 🧑‍🍳

In [10]:
import os

os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

In [11]:
version = "0.44.1"
docs_url = f"https://docs.zenml.io/v/{version}/"
website_url = "https://zenml.io"
repo_url = f"https://github.com/zenml-io/zenml/tree/{version}/examples"
release_notes_url = (
    f"https://github.com/zenml-io/zenml/blob/{version}/RELEASE_NOTES.md"
)

docs_to_agent_pipeline(
    website_url=website_url,
    docs_url=docs_url,
    repo_url=repo_url,
    release_notes_url=release_notes_url,
)

Initiating a new run for the pipeline: zenml_agent_creation_pipeline.
Reusing registered version: (version: 14).
Executing a new run.
Using user: default
Using stack: default
  artifact_store: default
  orchestrator: default
Using cached version of url_scraper.
Step url_scraper has started.
Using cached version of web_url_loader.
Step web_url_loader has started.
Step index_generator has started.
Created a chunk of size 2971, which is longer than the specified 1000
Created a chunk of size 2929, which is longer than the specified 1000
Created a chunk of size 2929, which is longer than the specified 1000
Created a chunk of size 1104,

### 🧪 Testing the Agent

ZenML tracks and versions artifacts from all steps across all pipeline versions. This makes it really easy to pick up a specific version of your desired output and test it. You can also have a pipeline that compares models/outputs from different versions and picks the best one! The possibilities are endless 😉


By default, when you call the `get_pipeline` function on the ZenML client, it returns the latest pipeline version model. Here, we look into the `agent_creator` step and load its output for testing.

In [12]:
from zenml.client import Client

pipeline_model = Client().get_pipeline(
    name_id_or_prefix=PIPELINE_NAME
)

# you can additionally pass in the version if you want
# to move between different pipeline implementations.
# pipeline_model = Client().get_pipeline(
#     name_id_or_prefix=PIPELINE_NAME, version="9"
# )

if pipeline_model.runs:  # skip if there are no runs yet (None or empty)
    # get the last run
    last_run = pipeline_model.runs[0]
    # get the agent_creator step
    agent_creator_step = last_run.steps["agent_creator"]

    try:
        agent = agent_creator_step.output.load()

        existing_tools = agent.tools
    except ValueError:
        print("No existing agent found.")

Now that we have retrieved the agent from a previous run, let's test it by
asking it some questions. Feel free to customize the prompt to your liking!

In [20]:
agent.run({"input": "Who are you?", "chat_history": []})



> Entering new AgentExecutor chain...
{
    "action": "Final Answer",
    "action_input": "I am ZenML Agent, a large language model operated by ZenML. I am here to assist with a wide range of tasks and provide information and insights on various topics, including ZenML. How can I assist you today?"
}

> Finished chain.


'I am ZenML Agent, a large language model operated by ZenML. I am here to assist with a wide range of tasks and provide information and insights on various topics, including ZenML. How can I assist you today?'

In [19]:
agent.run({"input": "Where can I run my ZenML pipeline?", "chat_history": []})



> Entering new AgentExecutor chain...
{
  "action": "zenml",
  "action_input": "Where can I run my ZenML pipeline?"
}
Observation: ZenML pipelines can be run on any infrastructure. You can develop your code and run your pipelines on your local machine, on a server, or in the cloud. ZenML does not require any special infrastructure or software to run, so there is no need to wait for environments to spin up.
Thought:{
  "action": "Final Answer",
  "action_input": "ZenML pipelines can be run on any infrastructure. You can develop your code and run your pipelines on your local machine, on a server, or in the cloud. ZenML does not require any special infrastructure or software to run."
}

> Finished chain.


'ZenML pipelines can be run on any infrastructure. You can develop your code and run your pipelines on your local machine, on a server, or in the cloud. ZenML does not require any special infrastructure or software to run.'
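
The two traces above follow the same loop: the model emits a JSON object with an `action` and an `action_input`, the executor either runs the named tool and feeds the observation back, or stops at `Final Answer`. The sketch below imitates that loop in plain Python. It is a toy under stated assumptions, not LangChain's implementation: `scripted_llm` and `toy_executor` are hypothetical, and the "model" is a hard-coded function rather than an LLM.

```python
import json


def scripted_llm(question: str, observation):
    """Hypothetical stand-in for the chat model: returns a JSON action string."""
    if observation is None and "ZenML" in question:
        # First turn on a ZenML question: ask for the "zenml" tool.
        return json.dumps({"action": "zenml", "action_input": question})
    answer = observation if observation is not None else "I am a toy agent."
    return json.dumps({"action": "Final Answer", "action_input": answer})


def toy_executor(question: str, tools: dict, max_steps: int = 3) -> str:
    """Dispatch tool calls until the model emits a final answer."""
    observation = None
    for _ in range(max_steps):
        action = json.loads(scripted_llm(question, observation))
        if action["action"] == "Final Answer":
            return action["action_input"]
        # Run the named tool and hand its output back on the next turn.
        observation = tools[action["action"]](action["action_input"])
    raise RuntimeError("No final answer within max_steps")


tools = {"zenml": lambda query: "Locally, on a server, or in the cloud."}
direct = toy_executor("Who are you?", tools)
via_tool = toy_executor("Where can I run my ZenML pipeline?", tools)
```

The first question is answered directly, as in the "Who are you?" trace, while the second goes through one tool call before the final answer.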