<a href="https://colab.research.google.com/github/hanhanwu/Hanhan_LangGraph_Exercise/blob/main/scaling/ray_serve_for_langgraph.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%capture --no-stderr
%pip install -U --quiet langchain-community tiktoken langchain-openai langchainhub chromadb langchain langgraph langchain-text-splitters playwright unstructured
!playwright install
%pip install -U --quiet rank_bm25 faiss-cpu
%pip install "ray[serve]"

In [None]:
from google.colab import userdata

# Fetch the API keys that were stored in Colab's userdata.
# LANGSMITH_API_KEY is used to pull rlm/rag-prompt.
OPENAI_API_KEY, LANGSMITH_API_KEY = (
    userdata.get(secret_name)
    for secret_name in ('OPENAI_API_KEY', 'LANGSMITH_API_KEY')
)

In [None]:
import time
import requests
from starlette.requests import Request
from typing import Dict

from langchain_community.document_loaders import PlaywrightURLLoader
from langchain_community.vectorstores import Chroma
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.tools.retriever import create_retriever_tool

from ray import serve

import nltk
# NLTK data packages to make available before any document loading happens.
nltk_resources = [
    'averaged_perceptron_tagger_eng',
    'wordnet',
    'stopwords',
    'punkt_tab'
]
for resource in nltk_resources:
    try:
        # nltk.download() normally signals a failed download by returning
        # False (it only raises when raise_on_error=True), so the return
        # value has to be checked as well as catching exceptions.
        downloaded = nltk.download(resource)
    except Exception as e:
        print(f"Error downloading {resource}: {e}")
    else:
        if not downloaded:
            print(f"Error downloading {resource}: nltk.download() returned False")

[nltk_data] Downloading package averaged_perceptron_tagger_eng to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger_eng is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


In [None]:
start_time = time.time()

urls = [
   "https://langchain-ai.github.io/langgraph/tutorials/introduction/",
]

loader = PlaywrightURLLoader(urls=urls, remove_selectors=["header", "footer"])
docs = await loader.aload()  # returns "Document" type

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=500, chunk_overlap=200
)
doc_splits = text_splitter.split_documents(docs)  # split into chunks with overlap

# choose retriever type based on the number of chunks
chunks_ct = len(doc_splits)
if chunks_ct < 30:
  print(chunks_ct, 'choose vectorstore based retriever')
  # use Vectorstore-backed retriever (the simplest retriever in LangChain)
  vectorstore = Chroma.from_documents(
      documents=doc_splits,
      collection_name="rag-chroma",
      embedding=OpenAIEmbeddings(api_key=OPENAI_API_KEY),
  )
  retriever = vectorstore.as_retriever()
else:
  print(chunks_ct, 'choose ensemble retriever')
  # use emsemble retriever
  # initialize the bm25 retriever and faiss retriever
  bm25_retriever = BM25Retriever.from_texts(
      [doc.page_content for doc in doc_splits], metadatas=[{"source": 1}] * len(doc_splits)
  )
  bm25_retriever.k = 2
  embedding = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
  faiss_vectorstore = FAISS.from_texts(
      [doc.page_content for doc in doc_splits], embedding, metadatas=[{"source": 2}] * len(doc_splits)
  )
  faiss_retriever = faiss_vectorstore.as_retriever(search_kwargs={"k": 2})
  # initialize the ensemble retriever
  retriever = EnsembleRetriever(
      retrievers=[bm25_retriever, faiss_retriever], weights=[0.5, 0.5]
  )

end_time = time.time()
running_time = round(end_time - start_time, 4)
print(f"Running time: {running_time} seconds")

60 choose ensemble retriever
Running time: 17.3064 seconds


In [None]:
import asyncio
import json

# Start the wall-clock timer for this cell; the elapsed time is printed at the end of the cell.
start_time = time.time()

@serve.deployment
class MyAppDeployment:
    """Ray Serve deployment that builds a retriever over a list of web pages.

    For every request the pages in ``urls`` are loaded with Playwright, split
    into token-based chunks, and indexed with either a Chroma vector-store
    retriever (fewer than 30 chunks) or a BM25 + FAISS ensemble retriever
    (30 chunks or more). The response is the list of retriever weights.

    Args:
        urls: Pages to load and index.
        openai_api_key: Key passed to ``OpenAIEmbeddings``.
    """

    def __init__(self, urls: list[str],
                       openai_api_key: str):
        self.urls = urls
        self.openai_api_key = openai_api_key

    def load_and_split_docs(self):
        """Load ``self.urls`` synchronously and return the resulting chunks.

        This call blocks, so code running on an event loop should execute it
        in a worker thread.
        """
        loader = PlaywrightURLLoader(urls=self.urls, remove_selectors=["header", "footer"])
        docs = loader.load()  # Load synchronously

        text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=500, chunk_overlap=200
        )
        return text_splitter.split_documents(docs)

    def _build_retriever(self, doc_splits):
        """Build the retriever appropriate for the number of chunks in ``doc_splits``."""
        chunks_ct = len(doc_splits)
        if chunks_ct < 30:
            print(chunks_ct, 'choose vectorstore based retriever')
            # use Vectorstore-backed retriever (the simplest retriever in LangChain)
            vectorstore = Chroma.from_documents(
                documents=doc_splits,
                collection_name="rag-chroma",
                embedding=OpenAIEmbeddings(api_key=self.openai_api_key),
            )
            return vectorstore.as_retriever()

        print(chunks_ct, 'choose ensemble retriever')
        # use ensemble retriever: BM25 and FAISS over the same chunk texts
        chunk_texts = [doc.page_content for doc in doc_splits]
        bm25_retriever = BM25Retriever.from_texts(
            chunk_texts, metadatas=[{"source": 1}] * chunks_ct
        )
        bm25_retriever.k = 2
        embedding = OpenAIEmbeddings(api_key=self.openai_api_key)
        faiss_vectorstore = FAISS.from_texts(
            chunk_texts, embedding, metadatas=[{"source": 2}] * chunks_ct
        )
        faiss_retriever = faiss_vectorstore.as_retriever(search_kwargs={"k": 2})
        return EnsembleRetriever(
            retrievers=[bm25_retriever, faiss_retriever], weights=[0.5, 0.5]
        )

    async def __retrieve__(self, request: Request):
        """Load the pages, build a retriever, and return its weights.

        Returns:
            The ensemble retriever's ``weights`` list, or ``[1.0]`` when the
            single vector-store retriever was chosen (it has no ``weights``
            attribute), so the response is always a JSON-serialisable list.

        Raises:
            ValueError: If no chunks could be loaded from ``self.urls``.
        """
        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine; the blocking page load runs in a worker thread.
        loop = asyncio.get_running_loop()
        doc_splits = await loop.run_in_executor(None, self.load_and_split_docs)

        if not doc_splits:
            # Fail with an explicit message instead of an obscure indexing error.
            raise ValueError(f"No document chunks could be loaded from {self.urls}")

        # Use the chunks loaded for this request, not a notebook-level global.
        retriever = self._build_retriever(doc_splits)
        return getattr(retriever, "weights", [1.0])

    async def __call__(self, request: Request):
        """HTTP entry point: Ray Serve dispatches incoming requests to ``__call__``."""
        return await self.__retrieve__(request)

urls = [
   "https://langchain-ai.github.io/langgraph/tutorials/introduction/",
]
app = MyAppDeployment.bind(urls,
                           openai_api_key=OPENAI_API_KEY)
serve.run(app, route_prefix="/")
try:
    # Serving a request may involve loading pages and calling the embedding
    # API, so allow a generous timeout -- but never wait indefinitely, which
    # is what requests does when no timeout is given.
    response = requests.get("http://localhost:8000/", timeout=300)
    # Check if the request was successful
    response.raise_for_status()
    print(response.json())
except requests.exceptions.HTTPError as e:
    print(f"Error fetching data: {e}")
    # The body of a failed response carries the server-side error details,
    # which are needed to diagnose a 500.
    if e.response is not None:
        print(e.response.text)
except requests.exceptions.RequestException as e:
    print(f"Error fetching data: {e}")



end_time = time.time()
running_time = round(end_time - start_time, 4)
print(f"Running time: {running_time} seconds")

INFO 2025-02-14 03:36:27,536 serve 181 -- Connecting to existing Serve app in namespace "serve". New http options will not be applied.
[36m(ServeController pid=8657)[0m INFO 2025-02-14 03:36:27,614 controller 8657 -- Deploying new version of Deployment(name='MyAppDeployment', app='default') (initial target replicas: 1).
[36m(ServeController pid=8657)[0m INFO 2025-02-14 03:36:27,720 controller 8657 -- Stopping 1 replicas of Deployment(name='MyAppDeployment', app='default') with outdated versions.
[36m(ServeController pid=8657)[0m INFO 2025-02-14 03:36:27,721 controller 8657 -- Adding 1 replica to Deployment(name='MyAppDeployment', app='default').
[36m(ServeController pid=8657)[0m INFO 2025-02-14 03:36:29,770 controller 8657 -- Replica(id='f1axs419', deployment='MyAppDeployment', app='default') is stopped.
INFO 2025-02-14 03:36:31,570 serve 181 -- Application 'default' is ready at http://127.0.0.1:8000/.
INFO 2025-02-14 03:36:31,571 serve 181 -- Deployed app 'default' successfull

Error fetching data: 500 Server Error: Internal Server Error for url: http://localhost:8000/
Running time: 4.067 seconds


In [None]:
# check examples here: https://www.ray.io/

In [None]:
import requests
from starlette.requests import Request
from typing import Dict

from ray import serve


# 1: Define a Ray Serve application.
@serve.deployment
class MyModelDeployment:
    """Toy Serve deployment that answers every request with a fixed message."""

    def __init__(self, msg: str):
        # Stand-in for model state; in a real deployment this could be
        # something as large as neural-network weights.
        self._message = msg

    def __call__(self, request: Request):
        # The request content is not inspected; the reply is always a
        # one-element list holding the stored message.
        reply = [self._message]
        return reply


app = MyModelDeployment.bind(msg="Hello world!")

# 2: Deploy the application locally.
serve.run(app, route_prefix="/")

# 3: Query the application and print the result.
# A timeout keeps a stuck replica from hanging the cell, and raise_for_status()
# surfaces a non-2xx reply as an HTTP error instead of a JSON decoding failure.
response = requests.get("http://localhost:8000/", timeout=30)
response.raise_for_status()
print(response.json())

INFO 2025-02-14 03:37:44,423 serve 181 -- Connecting to existing Serve app in namespace "serve". New http options will not be applied.
[36m(ServeController pid=8657)[0m INFO 2025-02-14 03:37:44,615 controller 8657 -- Deploying new version of Deployment(name='MyModelDeployment', app='default') (initial target replicas: 1).
[36m(ServeController pid=8657)[0m INFO 2025-02-14 03:37:44,743 controller 8657 -- Stopping 1 replicas of Deployment(name='MyModelDeployment', app='default') with outdated versions.
[36m(ServeController pid=8657)[0m INFO 2025-02-14 03:37:44,745 controller 8657 -- Adding 1 replica to Deployment(name='MyModelDeployment', app='default').
[36m(ServeController pid=8657)[0m INFO 2025-02-14 03:37:46,798 controller 8657 -- Replica(id='j5imtln0', deployment='MyModelDeployment', app='default') is stopped.
INFO 2025-02-14 03:37:47,529 serve 181 -- Application 'default' is ready at http://127.0.0.1:8000/.
INFO 2025-02-14 03:37:47,531 serve 181 -- Deployed app 'default' suc

['Hello world!']


[36m(ServeReplica:default:MyModelDeployment pid=16650)[0m INFO 2025-02-14 03:37:47,543 default_MyModelDeployment kiu61vdp 183eeea7-45ea-4f65-b251-ad70c11b2c07 -- GET / 200 3.2ms
