# Langchain on Vertex AI - Mamang Chatbot Use Case

This jupyter notebook will be used to deploy a chatbot built using the Langchain model on Vertex AI. 

In [2]:
# installing dependencies

!pip install --upgrade --quiet \
    google-cloud-aiplatform \
    langchain \
    langchain-google-vertexai \
    cloudpickle \
    pydantic \
    langchain_google_community \
    google-cloud-discoveryengine \
    google-api-python-client \
    google-auth

In [59]:
# setting up few variables

PROJECT_ID = "imrenagi-gemini-experiment"
LOCATION = "us-central1"  
STAGING_BUCKET = "gs://courses-imrenagicom-agent"  
DATA_STORE_ID = "course-software-instrumentation_1719363827721"
LOCATION_ID = "global"
LLM_MODEL = "gemini-1.5-pro-001"

In [60]:
# importing necessary libraries since I'm just an importir

from IPython.display import display, Markdown

from langchain.agents.format_scratchpad import format_to_openai_function_messages
from langchain.agents import tool

from langchain.memory import ChatMessageHistory
from langchain_google_community import VertexAISearchRetriever
from langchain_google_vertexai import HarmBlockThreshold, HarmCategory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory

from langchain.prompts import (
    ChatPromptTemplate,
    HumanMessagePromptTemplate,
    MessagesPlaceholder,
    SystemMessagePromptTemplate,
)

import vertexai
from vertexai.preview import reasoning_engines

from typing import List

In [None]:
# Initializing vertex AI 

vertexai.init(project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET)

In [62]:
## Model safety settings
safety_settings = {
    HarmCategory.HARM_CATEGORY_UNSPECIFIED: HarmBlockThreshold.BLOCK_ONLY_HIGH,
    HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
    HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_ONLY_HIGH,
}

## Model parameters
model_kwargs = {
    "temperature": 0.5,
    "safety_settings": safety_settings,
}

## Search Course Content Tool

This tool will be used to search for course content based on the user's query and execute query to Vertex AI Search datastore. 

In [63]:
@tool
def search_course_content(query: str) -> str:
    """Explain about software instrumentation course materials."""
    from langchain_google_community import VertexAISearchRetriever

    retriever = VertexAISearchRetriever(
        project_id=PROJECT_ID,
        data_store_id=DATA_STORE_ID,
        location_id=LOCATION_ID,
        engine_data_type=0,
        max_documents=10,
    )

    result = str(retriever.invoke(query))
    return result

## Imrenagi.com API Client

This is the implementation of a simple HTTP client which interacts to my API server. Since the API is protected with Id Token, caller must provide the JWT bearer token on each request. To simplify this, we are using the `google-auth` library to generate the token and to manage token's lifecycle.

In [64]:
from google.oauth2 import service_account
from google.auth import compute_engine
from google.auth.transport.requests import AuthorizedSession, Request
import os

sa_path = '/home/imrenagi/projects/app/secrets/course-imrenagi.json'

class ImreNagiComAPIClient:
  def __init__(self, url="https://api-dev.imrenagi.com", service_account_key=None, aud="api"):
    self.url = url
    credentials = None   
    
    if service_account_key and os.path.exists(service_account_key):
      # Use service account credentials
      credentials = service_account.IDTokenCredentials.from_service_account_file(service_account_key, target_audience=aud)
    else:
      # Use compute engine credentials. This is used when agent is running in reasoning engine.
      request = Request()
      credentials = compute_engine.IDTokenCredentials(
          request=request, target_audience=aud, use_metadata_identity_endpoint=True
      )
      credentials.refresh(request)
    self.authed_session = AuthorizedSession(credentials)
    
  def list_courses(self):
    response = self.authed_session.get(f"{self.url}/api/v1/courses")
    return response.json()
  
  def get_course(self, course):
    response = self.authed_session.get(f"{self.url}/api/v1/courses/{course}")
    return response.json()
  
  def create_order(self, course, package, user_name, user_email):
    response = self.authed_session.post(f"{self.url}/api/v1/courses/orders", json={
      "course": course, 
      "package": package, 
      "customer": {
        "name": user_name,
        "email": user_email
      },
      "payment": {
        "method": "gopay"
      }})
    return response.json()
  
  def get_order(self, order_id):
    response = self.authed_session.get(f"{self.url}/api/v1/courses/orders/{order_id}")
    return response.json()
  
  def get_invoice(self, invoice_number):
    response = self.authed_session.get(f"{self.url}/api/v1/invoices/{invoice_number}")
    return response.json()


## Imrenagi.com API Tools

These tools below are just a langchain tools wrapper that interact with the API server.

In [65]:
import time

@tool
def list_courses() -> List[str]:
  """List all available courses sold on the platform."""
  client = ImreNagiComAPIClient(
    service_account_key=sa_path
  )
  return client.list_courses()

@tool
def get_course(course: str) -> str:
  """Get course details by course name. course name is the unique identifier of the course. it typically contains the course title with dashes.
  This function can be used to get course details such as course packages price, etc."""
  client = ImreNagiComAPIClient(
    service_account_key=sa_path
  )
  return client.get_course(course)

@tool
def create_order(course: str, package: str, user_name: str, user_email: str) -> str:
  """Create order for a course package. This function can be used to create an order for a course package. When this function returns successfully, it will return payment url to user to make payment. """
  client = ImreNagiComAPIClient(
    service_account_key=sa_path
  )
  
  order = client.create_order(course, package, user_name, user_email)
  print(order)
  
  time.sleep(5)  
  
  order = client.get_order(order['number'])
  invoice_number = order['payment']['invoiceNumber']
  
  invoice = client.get_invoice(invoice_number)
  payment_url = invoice['payment']['redirectUrl']
  
  return f"Order created successfully. Payment URL: {payment_url}"

In [66]:
# list_courses.invoke({"input":""})
# get_course.invoke({"course":"software-instrumentation"})
# create_order.invoke({"course":"software-instrumentation", "package":"basic", "user_name":"Imre Nagi", "user_email":"imre.nagi2812@gmail.com"})


In [None]:
# combining all tools

tools = [search_course_content, list_courses, get_course, create_order]

## Prompt

This is the prompt that will be used to interact with model. I enabled chat history and agent scratchpad so that we can pass the message history and the result from Gemini function call to the model.

In [68]:
prompt = {
    "chat_history": lambda x: x["history"],
    "input": lambda x: x["input"],
    "agent_scratchpad": (
        lambda x: format_to_openai_function_messages(x["intermediate_steps"])
    ),
} | ChatPromptTemplate(
  messages = [
    SystemMessagePromptTemplate.from_template("""
      You are a bot assistant that sells online course about software instrumentation. You only use information provided from datastore or tools. You can provide the information that is relevant to the user's question or the summary of the content. If the user asks for more information, you may suggest the user to enroll in the course. 
      """),
    MessagesPlaceholder(variable_name="chat_history", optional=True),
    HumanMessagePromptTemplate.from_template("Use tools to answer this questions: {input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
  ]
)

## Chat Message History

This will be used to store the chat history for each user sessions

In [None]:
store = {}

def get_session_history(session_id: str) -> BaseChatMessageHistory:
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]

## Setting up the Agent

While langchain is actually has API to create an agent, in this case we are using `reasoning_engines.LangchainAgent()` to create the agent. Once it is created, you can still use the exact same API to invoke the model as you would when you are using standard langchain agent.

In [None]:
model = LLM_MODEL

agent = reasoning_engines.LangchainAgent(
    model=model,
    tools=tools,
    prompt=prompt,    
    chat_history=get_session_history,
    agent_executor_kwargs={
      "return_intermediate_steps": True,
    },
    model_kwargs=model_kwargs,
    enable_tracing=True,
)

In [70]:
response = agent.query(
  input="Can you share what the software instrumentation course content offers?",
  config={"configurable": {"session_id": "demo1234"}},
  )

display(Markdown(response["output"]))

opentelemetry-exporter-gcp-trace is not installed. Please call 'pip install google-cloud-aiplatform[langchain]'.
enable_tracing=True but proceeding with tracing disabled because not all packages for tracing have been installed


Parent run fa6a6fd6-19d5-4d55-8a7d-97bda57530d8 not found for run 86fbc829-6ea8-4c50-8185-0c3d5f3af0d1. Treating as a root run.


This course is designed for beginner and intermediate software engineers who are interested in learning about software instrumentation and monitoring. 

Here are a few things you will learn:
- Fundamental ideas behind software instrumentation and monitoring
- Setting up basic tech stacks for tracing, logging, and monitoring in a cloud native environment
- Different instrumentation techniques to understand the performance and how your application behaves
- Performing instrumentation for applications consisting of multiple dependencies 
- Using instrumentation to diagnose performance issues, reliability issues, and business-relevant metrics.

Would you like to learn more about the course?


# Deploying the Agent on Vertex AI

Deploying is as simple as calling `create()` method. We will provide the agent here and some dependencies required to run the agent.

In [71]:
remote_agent = reasoning_engines.ReasoningEngine.create(
    agent,
    requirements=[
        "google-cloud-aiplatform==1.51.0",
        "langchain==0.1.20",
        "langchain-google-vertexai==1.0.3",
        "cloudpickle==3.0.0",
        "pydantic==2.7.1",        
        "requests==2.32.3",
        "langchain_google_community",
        "google-cloud-discoveryengine",
        "google-auth",
    ],
)

Using bucket courses-imrenagicom-agent
Writing to gs://courses-imrenagicom-agent/reasoning_engine/reasoning_engine.pkl
Writing to gs://courses-imrenagicom-agent/reasoning_engine/requirements.txt
Creating in-memory tarfile of extra_packages
Writing to gs://courses-imrenagicom-agent/reasoning_engine/dependencies.tar.gz
Creating ReasoningEngine
Create ReasoningEngine backing LRO: projects/896489987664/locations/us-central1/reasoningEngines/3647247195100413952/operations/8301296056516214784
ReasoningEngine created. Resource name: projects/896489987664/locations/us-central1/reasoningEngines/3647247195100413952
To use this ReasoningEngine in another session:
reasoning_engine = vertexai.preview.reasoning_engines.ReasoningEngine('projects/896489987664/locations/us-central1/reasoningEngines/3647247195100413952')


In [None]:
# Testing the remote agent
response = remote_agent.query(
  input="Can you please share what being taught on software instrumentation course?",
  config={"configurable": {"session_id": "test1235"}},
)
display(Markdown(response["output"]))

# Clean Up

Don't forget to clean up the resources after you are done with the agent.

In [73]:
# remote_agent = reasoning_engines.ReasoningEngine('projects/896489987664/locations/us-central1/reasoningEngines/3647247195100413952')

# remote_agent.delete()