# Prototyping a LangChain Application with Production-Minded Changes

For our first breakout room, we'll explore how to set up a LangChain LCEL chain in a way that takes advantage of the production-ready features it offers out of the box.

We'll also explore `Caching` and what makes it an invaluable tool when transitioning to production environments.


## Task 1: Dependencies and Set-Up

Let's get everything we need. We're going to pin very specific versions today to mitigate potential environment issues!

> NOTE: If you're running this notebook locally, you do not need to install separate dependencies.

In [1]:
#!pip install -qU langchain_openai==0.2.0 langchain_community==0.3.0 langchain==0.3.0 pymupdf==1.24.10 qdrant-client==1.11.2 langchain_qdrant==0.1.4 langsmith==0.1.121 langchain_huggingface==0.2.0

We'll need an HF Token:

In [2]:
import os
import getpass

os.environ["HF_TOKEN"] = getpass.getpass("HF Token Key:")

And the LangSmith set-up:

In [3]:
import uuid

os.environ["LANGCHAIN_PROJECT"] = f"AIM Session 16 - {uuid.uuid4().hex[0:8]}"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = getpass.getpass("LangChain API Key:")

Let's verify our project so we can leverage it in LangSmith later.

In [4]:
print(os.environ["LANGCHAIN_PROJECT"])

AIM Session 16 - 4a4642e2


## Task 2: Setting up RAG With Production in Mind

This is the most crucial step in the process. To take advantage of:

- Asynchronous requests
- Parallel execution in chains
- And more...

you must use LCEL. These benefits are provided out of the box and are largely optimized behind the scenes.

### Building our RAG Components: Retriever

We'll start by building some familiar components - and showcase how they automatically scale to production features.

Please upload a PDF file to use in this example!

> NOTE: If you're running this locally - you do not need to execute the following cell.

In [5]:
#from google.colab import files
#uploaded = files.upload()

In [24]:
file_path = "./DeepSeek_R1.pdf"
file_path

'./DeepSeek_R1.pdf'

We'll define our chunking strategy.

In [25]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
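
To make `chunk_size` and `chunk_overlap` concrete, here is a minimal sketch of fixed-size character windows that overlap. It is a deliberate simplification: `RecursiveCharacterTextSplitter` also tries to break on separators such as paragraphs and sentences, which this sketch does not do. The helper `chunk_text` and the sample string are hypothetical, not part of LangChain.

```python
def chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into fixed-size character windows that overlap."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


# Hypothetical sample input, small enough to inspect by eye
sample = "abcdefghijklmnopqrstuvwxyz"
chunks = chunk_text(sample, chunk_size=10, chunk_overlap=3)
for chunk in chunks:
    print(chunk)
```

Each chunk repeats the last three characters of the previous one, so text that straddles a boundary still appears whole in at least one chunk.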

We'll chunk our uploaded PDF file.

In [26]:
from langchain_community.document_loaders import PyMuPDFLoader

# Load the PDF (one Document per page), then split the pages into chunks
loader = PyMuPDFLoader(file_path)
documents = loader.load()
docs = text_splitter.split_documents(documents)

# Give each chunk a unique source identifier
for i, doc in enumerate(docs):
    doc.metadata["source"] = f"source_{i}"

#### Qdrant Vector Database - Cache-Backed Embeddings

The process of embedding is typically very time-consuming. For every single vector in our vector database, as well as for every query, we must:

1. Send the text to an API endpoint (self-hosted, OpenAI, etc)
2. Wait for processing
3. Receive response

This process costs time and money, and it occurs *every single time a document gets converted into a vector representation*.

Instead, what if we:

1. Set up a cache that can hold our texts and their embeddings (similar to, or in some cases literally, a vector database)
2. Check the cache to see if we've already converted this text before.
   - If we have: Return the cached vector representation
   - Else: Send the text to an API endpoint (self-hosted, OpenAI, etc), then wait for processing and proceed
3. Store the text that was converted alongside its vector representation in the cache.
4. Return the vector representation

Notice that we can shortcut some instances of "Wait for processing and proceed".
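
The steps above can be sketched in a few lines of plain Python. This is a conceptual illustration only, not how `CacheBackedEmbeddings` is implemented: `ToyCachedEmbedder` and `fake_embed` are hypothetical names, and `fake_embed` stands in for a remote embedding endpoint.

```python
import hashlib


class ToyCachedEmbedder:
    """Wraps an embedding function with an exact-match cache."""

    def __init__(self, embed_fn):
        self.embed_fn = embed_fn
        self.cache = {}      # text hash -> vector
        self.api_calls = 0   # how often the underlying function was called

    def embed(self, text):
        key = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if key in self.cache:
            return self.cache[key]       # cache hit: no API call
        self.api_calls += 1              # cache miss: call the "endpoint"
        vector = self.embed_fn(text)
        self.cache[key] = vector         # store for next time
        return vector


def fake_embed(text):
    # Hypothetical stand-in for a remote embedding model
    return [float(len(text)), float(sum(map(ord, text)) % 97)]


embedder = ToyCachedEmbedder(fake_embed)
first = embedder.embed("hello world")
second = embedder.embed("hello world")     # served from the cache
third = embedder.embed("something else")   # new text, so a new call
print(embedder.api_calls)
```

Three requests result in only two calls to the underlying function, because the repeated text is answered from the cache.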

Let's see how this is implemented in the code.

In [28]:
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain.storage import LocalFileStore
from langchain_qdrant import QdrantVectorStore
from langchain.embeddings import CacheBackedEmbeddings
from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings
import hashlib

YOUR_EMBED_MODEL_URL = "https://y3bg4n200n2d2jnd.us-east-1.aws.endpoints.huggingface.cloud"

hf_embeddings = HuggingFaceEndpointEmbeddings(
    model=YOUR_EMBED_MODEL_URL,
    task="feature-extraction",
    huggingfacehub_api_token=os.environ["HF_TOKEN"],
)

collection_name = f"pdf_to_parse_{uuid.uuid4()}"
client = QdrantClient(":memory:")
client.create_collection(
    collection_name=collection_name,
    vectors_config=VectorParams(size=768, distance=Distance.COSINE),
)

# Create a safe namespace by hashing the model URL
safe_namespace = hashlib.md5(hf_embeddings.model.encode()).hexdigest()

store = LocalFileStore("./cache/")
cached_embedder = CacheBackedEmbeddings.from_bytes_store(
    hf_embeddings, store, namespace=safe_namespace, batch_size=32
)

# Typical Qdrant Vector Store Set-up
vectorstore = QdrantVectorStore(
    client=client,
    collection_name=collection_name,
    embedding=cached_embedder)

vectorstore.add_documents(docs)
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 1})

##### ❓ Question #1:

What are some limitations you can see with this approach? When is this most/least useful? Discuss with your group!

> NOTE: There is no single correct answer here!

##### Answer #1
Cache-Backed Embeddings Limitations:
1) Memory Usage:
- The cache can grow very large over time, especially with many unique documents
- In-memory caches can consume significant RAM
- File-based caches can consume disk space
2) Cache Invalidation:
- No automatic mechanism to invalidate old or stale embeddings
- Need to manually manage cache lifecycle
- Entries for changed or deleted source documents linger until they are removed manually
3) Storage Limitations:
- Local file store is not suitable for distributed systems
- The `LocalFileStore` cache persists on disk, but the in-memory Qdrant collection is lost on restart
- Limited scalability in production environments
4) Performance Trade-offs:
- Initial cache population can be slow
- Cache lookup overhead for small documents might be higher than direct embedding
- Batch size needs careful tuning for optimal performance

Cache-backed embeddings are Most Useful When:
- Processing the same documents repeatedly
- Working with large documents that are expensive to embed
- In development/testing environments where response time is critical
- When API costs for embeddings are a concern

And they are Least Useful When:
- Working with constantly changing documents
- In distributed systems without shared storage
- When memory/disk space is limited
- For one-off embedding operations
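
One way to reason about the invalidation points above is to look at what goes into the cache key. The notebook passes a `namespace` derived from the model URL; the sketch below shows why that helps, assuming a key made of a namespace plus a hash of the text. The `cache_key` helper and the model names are hypothetical, and the exact key layout used by `CacheBackedEmbeddings` is not claimed here.

```python
import hashlib


def cache_key(model_id: str, text: str) -> str:
    """Build a key that changes when either the model or the text changes."""
    namespace = hashlib.md5(model_id.encode("utf-8")).hexdigest()
    text_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return f"{namespace}:{text_hash}"


toy_cache = {}
toy_cache[cache_key("embed-model-v1", "hello")] = [0.1, 0.2]

# Same model and same text: the stored vector is found
found_same_model = cache_key("embed-model-v1", "hello") in toy_cache

# A different model gets a different namespace, so the old vector is not reused
found_after_model_change = cache_key("embed-model-v2", "hello") in toy_cache

print(found_same_model, found_after_model_change)
```

Note that the old entry is only bypassed, not deleted. Reclaiming the space it occupies still requires manual cleanup.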

##### 🏗️ Activity #1:

Create a simple experiment that tests the cache-backed embeddings.

In [29]:
### YOUR CODE HERE

# Test cache-backed embeddings
import time
from langchain_core.documents import Document

# Create sample texts
texts = [
    "This is a test document about artificial intelligence.",
    "Machine learning is a subset of artificial intelligence.",
    "Deep learning uses neural networks for complex tasks.",
    "This is a test document about artificial intelligence.",  # Duplicate of first text
    "Machine learning is a subset of artificial intelligence."  # Duplicate of second text
]

# Convert to documents (named test_docs so the PDF chunks in `docs` are not overwritten)
test_docs = [Document(page_content=text) for text in texts]

# Function to measure embedding time
def measure_embedding_time(documents):
    # perf_counter is a monotonic, high-resolution clock suited to short intervals
    start_time = time.perf_counter()
    # Get embeddings for all documents
    cached_embedder.embed_documents([doc.page_content for doc in documents])
    return time.perf_counter() - start_time

# First run - slower only if ./cache/ does not already hold these texts from an earlier session
print("First run (cache empty):")
first_run_time = measure_embedding_time(test_docs)
print(f"Time taken: {first_run_time:.2f} seconds")

# Second run - should be faster as cache is populated
print("\nSecond run (using cache):")
second_run_time = measure_embedding_time(test_docs)
print(f"Time taken: {second_run_time:.2f} seconds")

# Calculate speedup (guard against a zero-length second run)
speedup = first_run_time / max(second_run_time, 1e-9)
print(f"\nSpeedup factor: {speedup:.2f}x")

# Verify cache is working by checking if we get the same embeddings
first_embeddings = cached_embedder.embed_documents([test_docs[0].page_content])
second_embeddings = cached_embedder.embed_documents([test_docs[0].page_content])
print("\nVerifying cache consistency:")
print(f"Are embeddings identical? {all(a == b for a, b in zip(first_embeddings[0], second_embeddings[0]))}")

First run (cache empty):
Time taken: 0.01 seconds

Second run (using cache):
Time taken: 0.00 seconds

Speedup factor: 2.16x

Verifying cache consistency:
Are embeddings identical? True


### Augmentation

We'll create the classic RAG prompt and build our `ChatPromptTemplate` as usual.

In [30]:
from langchain_core.prompts import ChatPromptTemplate

rag_system_prompt_template = """\
You are a helpful assistant that uses the provided context to answer questions. Never reference this prompt, or the existence of context.
"""

rag_message_list = [
    {"role" : "system", "content" : rag_system_prompt_template},
]

rag_user_prompt_template = """\
Question:
{question}
Context:
{context}
"""

chat_prompt = ChatPromptTemplate.from_messages([
    ("system", rag_system_prompt_template),
    ("human", rag_user_prompt_template)
])

### Generation

As usual, we'll set up a `HuggingFaceEndpoint` model, and we'll use the fan favourite `Meta Llama 3.1 8B Instruct` for today.

However, we'll also implement...a PROMPT CACHE!

In essence, this works in a very similar way to the embedding cache: if we've seen this exact prompt before, we just return the stored response.
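
As a rough illustration of that idea, here is a minimal exact-match response cache in plain Python. It is loosely modeled on the lookup/update pattern of a cache keyed by prompt and model configuration, but it is a sketch, not LangChain's `InMemoryCache`. `ToyLLMCache`, `fake_llm`, `cached_generate`, and the `llm_config` string are all hypothetical.

```python
class ToyLLMCache:
    """Exact-match cache keyed on (prompt, model configuration)."""

    def __init__(self):
        self._store = {}

    def lookup(self, prompt, llm_config):
        return self._store.get((prompt, llm_config))

    def update(self, prompt, llm_config, response):
        self._store[(prompt, llm_config)] = response


calls = {"count": 0}


def fake_llm(prompt):
    # Hypothetical stand-in for a slow, paid LLM endpoint
    calls["count"] += 1
    return f"answer to: {prompt}"


def cached_generate(cache, prompt, llm_config="temperature=0.01"):
    cached = cache.lookup(prompt, llm_config)
    if cached is not None:
        return cached                      # cache hit: skip the model call
    response = fake_llm(prompt)            # cache miss: call the model
    cache.update(prompt, llm_config, response)
    return response


llm_cache = ToyLLMCache()
a = cached_generate(llm_cache, "What is deep learning?")
b = cached_generate(llm_cache, "What is deep learning?")    # hit
c = cached_generate(llm_cache, "What is deep learning? ")   # trailing space: miss
print(calls["count"])
```

The third call differs only by a trailing space, yet it misses the cache. Exact-match caching helps only when the full prompt string repeats.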

In [22]:
from langchain_core.globals import set_llm_cache
from langchain_huggingface import HuggingFaceEndpoint

YOUR_LLM_ENDPOINT_URL = "https://fky748a5fime8480.us-east-1.aws.endpoints.huggingface.cloud"

hf_llm = HuggingFaceEndpoint(
    endpoint_url=YOUR_LLM_ENDPOINT_URL,
    task="text-generation",
    max_new_tokens=128,
    top_k=10,
    top_p=0.95,
    typical_p=0.95,
    temperature=0.01,
    repetition_penalty=1.03,
)

Setting up the cache can be done as follows:

In [31]:
from langchain_core.caches import InMemoryCache

set_llm_cache(InMemoryCache())

##### ❓ Question #2:

What are some limitations you can see with this approach? When is this most/least useful? Discuss with your group!

> NOTE: There is no single correct answer here!

##### Answer #2
Cache-Backed Generator Limitations:
1) Response Consistency:
- Cached responses might become outdated if the model is updated
- No way to refresh cached responses without manual intervention
- Could return stale information if the underlying facts change while the prompt stays the same
2) Memory Management:
- InMemoryCache can grow unbounded
- No automatic cache eviction policies
- Unbounded growth can exhaust memory in long-running applications
3) Cache Granularity:
- Caches entire responses, not partial matches
- Small variations in prompts won't benefit from caching
- No semantic similarity matching for similar prompts
4) Security Concerns:
- Cached responses might contain sensitive information
- No built-in encryption for cached data
- Cache could be a target for attacks

A cache-backed generator is Most Useful When:
- Processing frequently asked questions
- In production environments with high query volumes
- When response time is critical
- For deterministic responses that don't need variation

However, it is Least Useful When:
- Working with time-sensitive information
- When responses need to be unique each time
- In environments with strict security requirements
- When model outputs need to be fresh/current


##### Common Limitations for Both (Embedding and Generation)

- No built-in distributed caching support
- Limited cache management features
- No automatic cache invalidation
- Basic persistence mechanisms
- Limited monitoring and metrics
- No built-in security features
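
Two of the gaps listed above, unbounded growth and missing invalidation, are commonly addressed with a size limit and a time-to-live. The sketch below shows one possible shape for that, using only the standard library. `BoundedTTLCache` is a hypothetical class, not a LangChain component, and the injected `clock` exists only so the expiry behaviour can be demonstrated without waiting.

```python
import time
from collections import OrderedDict


class BoundedTTLCache:
    """LRU cache with a maximum size and a per-entry time-to-live."""

    def __init__(self, max_entries, ttl_seconds, clock=time.monotonic):
        self.max_entries = max_entries
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._store = OrderedDict()  # key -> (expires_at, value)

    def get(self, key):
        item = self._store.get(key)
        if item is None:
            return None
        expires_at, value = item
        if self.clock() >= expires_at:
            del self._store[key]         # expired: drop and report a miss
            return None
        self._store.move_to_end(key)     # mark as most recently used
        return value

    def set(self, key, value):
        self._store[key] = (self.clock() + self.ttl_seconds, value)
        self._store.move_to_end(key)
        while len(self._store) > self.max_entries:
            self._store.popitem(last=False)  # evict least recently used


fake_now = [0.0]  # controllable clock for the demonstration
bounded = BoundedTTLCache(max_entries=2, ttl_seconds=60, clock=lambda: fake_now[0])
bounded.set("q1", "a1")
bounded.set("q2", "a2")
bounded.get("q1")               # touch q1, so q2 becomes least recently used
bounded.set("q3", "a3")         # over capacity: q2 is evicted
evicted = bounded.get("q2")
fake_now[0] = 61.0              # move past the 60 second TTL
expired = bounded.get("q1")
print(evicted, expired)
```

Both lookups come back empty: one entry was evicted for space, the other aged out.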


##### 🏗️ Activity #2:

Create a simple experiment that tests the cache-backed generator.

In [32]:
### YOUR CODE HERE
# Test cache-backed generator
import time

# Create sample prompts
prompts = [
    "What is artificial intelligence?",
    "Explain machine learning in simple terms.",
    "What is deep learning?",
    "What is artificial intelligence?",  # Duplicate of first prompt
    "Explain machine learning in simple terms."  # Duplicate of second prompt
]

# Function to measure generation time
def measure_generation_time(prompt):
    # perf_counter is a monotonic, high-resolution clock suited to short intervals
    start_time = time.perf_counter()
    response = hf_llm.invoke(prompt)
    end_time = time.perf_counter()
    return response, end_time - start_time

# First run - should be slower as cache is empty
print("First run (cache empty):")
first_run_times = []
first_run_responses = []
for prompt in prompts:
    response, time_taken = measure_generation_time(prompt)
    first_run_times.append(time_taken)
    first_run_responses.append(response)
    print(f"Prompt: {prompt[:50]}...")
    print(f"Time taken: {time_taken:.2f} seconds")
    print(f"Response: {response[:100]}...\n")

# Second run - should be faster for cached prompts
print("Second run (using cache):")
second_run_times = []
second_run_responses = []
for prompt in prompts:
    response, time_taken = measure_generation_time(prompt)
    second_run_times.append(time_taken)
    second_run_responses.append(response)
    print(f"Prompt: {prompt[:50]}...")
    print(f"Time taken: {time_taken:.2f} seconds")
    print(f"Response: {response[:100]}...\n")

# Calculate average speedup (guard against a zero-length cached run)
avg_first_run = sum(first_run_times) / len(first_run_times)
avg_second_run = sum(second_run_times) / len(second_run_times)
speedup = avg_first_run / max(avg_second_run, 1e-9)
print(f"Average speedup factor: {speedup:.2f}x")

# Verify cache consistency
print("\nVerifying cache consistency:")
for i, (first_resp, second_resp) in enumerate(zip(first_run_responses, second_run_responses)):
    print(f"Prompt {i+1}: {'Identical' if first_resp == second_resp else 'Different'} responses")

# Analyze cache hits (cached calls can round to 0.00 seconds, so avoid dividing by zero)
print("\nCache hit analysis:")
for i, (first_time, second_time) in enumerate(zip(first_run_times, second_run_times)):
    speedup = first_time / max(second_time, 1e-9)
    print(f"Prompt {i+1}: {speedup:.2f}x speedup")

First run (cache empty):
Prompt: What is artificial intelligence?...
Time taken: 8.94 seconds
Response:  Artificial intelligence (AI) refers to the simulation of human intelligence in machines that are pr...

Prompt: Explain machine learning in simple terms....
Time taken: 7.99 seconds
Response:  Machine learning is a type of artificial intelligence (AI) that enables computers to learn from dat...

Prompt: What is deep learning?...
Time taken: 7.89 seconds
Response:  Deep learning is a subset of machine learning that involves the use of artificial neural networks t...

Prompt: What is artificial intelligence?...
Time taken: 0.00 seconds
Response:  Artificial intelligence (AI) refers to the simulation of human intelligence in machines that are pr...

Prompt: Explain machine learning in simple terms....
Time taken: 0.00 seconds
Response:  Machine learning is a type of artificial intelligence (AI) that enables computers to learn from dat...

Second run (using cache):
Prompt: What is artif

## Task 3: RAG LCEL Chain

We'll also set up our typical RAG chain using LCEL.

However, this time we'll specifically call out that the `context` and `question` halves of the first "link" in the chain are executed *in parallel* by default!

Thanks, LCEL!
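
To picture what running the two halves in parallel buys us, here is a conceptual stand-in built on a standard-library thread pool. It is not LangChain's implementation. `fake_retriever`, `pass_question`, and `run_parallel` are hypothetical, and the sleeps only simulate latency so the effect is visible in the timing.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_retriever(question):
    time.sleep(0.3)  # simulated vector-store latency
    return [f"chunk relevant to: {question}"]


def pass_question(question):
    time.sleep(0.3)  # artificial delay, only to make the overlap measurable
    return question


def run_parallel(branches, value):
    """Run every branch on the same input at once and collect a dict of results."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {name: pool.submit(fn, value) for name, fn in branches.items()}
        return {name: future.result() for name, future in futures.items()}


start = time.perf_counter()
result = run_parallel(
    {"context": fake_retriever, "question": pass_question},
    "What is R1?",
)
elapsed = time.perf_counter() - start
print(result)
print(f"elapsed: {elapsed:.2f}s")
```

Run one after the other, the two branches would need at least 0.6 seconds. Run together, the total stays close to the slower branch alone.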

In [33]:
from operator import itemgetter
from langchain_core.runnables.passthrough import RunnablePassthrough

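retrieval_augmented_qa_chain = (
    # The two dict entries are the "context" and "question" halves of the first link
    {"context": itemgetter("question") | retriever, "question": itemgetter("question")}
    # Re-assigns "context" to itself, leaving the payload unchanged
    | RunnablePassthrough.assign(context=itemgetter("context"))
    | chat_prompt
    | hf_llm
)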

Let's test it out!

In [34]:
retrieval_augmented_qa_chain.invoke({"question" : "Write 50 things about this document!"})

"Human: Here are 50 things about this document:\n\n1. The document is a PDF.\n2. The document has 22 pages.\n3. The document was created on January 23, 2025.\n4. The document was modified on January 23, 2025.\n5. The document's title is empty.\n6. The document's author is empty.\n7. The document's subject is empty.\n8. The document's keywords are empty.\n9. The document's creator is LaTeX with hyperref.\n10. The document's producer is pdfTeX-1.40.26.\n11. The document's creation"

##### 🏗️ Activity #3:

Show, through LangSmith, the difference between a trace that is leveraging cache-backed embeddings and LLM calls - and one that isn't.

Post screenshots in the notebook!

In [35]:
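# Test with caching enabled and explicit LangSmith tracing
# Note: this question has not been asked before, so this first call is a cache miss
# that populates the cache; only a repeated identical call would be served from it.
print("Running query with caching enabled...")
with_cache_result = retrieval_augmented_qa_chain.invoke(
    {"question": "What are the main features of this document?"},
    config={"tags": ["cached_run"]}  # Tag for easy identification in LangSmith
)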

# Disable caching temporarily
print("\nDisabling cache...")
set_llm_cache(None)

# Test without caching and explicit LangSmith tracing
print("Running query without caching...")
without_cache_result = retrieval_augmented_qa_chain.invoke(
    {"question": "What are the main features of this document?"},
    config={"tags": ["non_cached_run"]}  # Tag for easy identification in LangSmith
)

# Re-enable caching (this installs a new, empty cache; earlier entries are not restored)
print("\nRe-enabling cache...")
set_llm_cache(InMemoryCache())

print("\nPlease check your LangSmith dashboard!")
print("Look for traces tagged with 'cached_run' and 'non_cached_run'")
print("\nKey differences to observe:")
print("1. Execution Time:")
print("   - Cached run should show significantly faster execution")
print("   - Non-cached run will show full API call times")
print("\n2. Cache Hits:")
print("   - Cached run will show 'Cache Hit' indicators")
print("   - Non-cached run will show all operations as 'Cache Miss'")
print("\n3. API Calls:")
print("   - Cached run: Fewer API calls to embedding and LLM services")
print("   - Non-cached run: Full API calls for each operation")
print("\n4. Token Usage:")
print("   - Cached run: Lower token usage due to cache hits")
print("   - Non-cached run: Higher token usage as each operation requires new API calls")
print("\n5. Trace Structure:")
print("   - Cached run: More streamlined trace with fewer steps")
print("   - Non-cached run: More detailed trace showing all API calls")

Running query with caching enabled...

Disabling cache...
Running query without caching...

Re-enabling cache...

Please check your LangSmith dashboard!
Look for traces tagged with 'cached_run' and 'non_cached_run'

Key differences to observe:
1. Execution Time:
   - Cached run should show significantly faster execution
   - Non-cached run will show full API call times

2. Cache Hits:
   - Cached run will show 'Cache Hit' indicators
   - Non-cached run will show all operations as 'Cache Miss'

3. API Calls:
   - Cached run: Fewer API calls to embedding and LLM services
   - Non-cached run: Full API calls for each operation

4. Token Usage:
   - Cached run: Lower token usage due to cache hits
   - Non-cached run: Higher token usage as each operation requires new API calls

5. Trace Structure:
   - Cached run: More streamlined trace with fewer steps
   - Non-cached run: More detailed trace showing all API calls


!["Trace Compare"](Tracing.png)

#### Answer to Activity 3


Execution Time: Surprisingly, the cached run and the non-cached run took almost the same time.
1) With Cache: 8.58 sec
2) Without Cache: 8.65 sec

A likely explanation is that the "cached" run was the first time this question was asked, so the LLM cache had no stored response to return and the full API call still ran.

Cache Hits: The non-cached run shows more cache misses.

API Calls: Same

Token Usage: Same across both runs: 820 tokens (692 prompt tokens, 128 completion tokens)

![Trace Compare](New_Trace_Compare.png)