<a href="https://colab.research.google.com/github/GiX007/agent-labs/blob/main/03_langchain/07_expression_language.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# LangChain Expression Language (LCEL)

## Setup

In [None]:
import os
import openai

from dotenv import load_dotenv, find_dotenv
dotenv_path = find_dotenv() or '/content/OPENAI_API_KEY.env' # read local .env file
load_dotenv(dotenv_path)

openai_api_key = os.getenv('OPENAI_API_KEY')
client = openai.OpenAI(api_key=openai_api_key)

import warnings
warnings.filterwarnings("ignore")

In [None]:
#!pip install pydantic==1.10.8

In [None]:
!pip install langchain langchain-openai langchain-community docarray



In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser  # current import path

## Simple Chain

In [None]:
prompt = ChatPromptTemplate.from_template(
    "tell me a short joke about {topic}"
)
model = ChatOpenAI()
output_parser = StrOutputParser()

In [None]:
chain = prompt | model | output_parser

In [None]:
chain.invoke({"topic": "bears"})

'Why did the bear bring a flashlight to the party? \n\nBecause he wanted to be the "bright"est one there!'
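The `|` operator in `prompt | model | output_parser` is ordinary Python operator overloading. The following is a minimal sketch of that idea, not LangChain's implementation: `Step`, `toy_prompt`, `toy_model`, and `toy_parser` are hypothetical stand-ins so the example runs without an API key.

```python
class Step:
    """Toy stand-in for a runnable: wraps a function and supports `|`."""

    def __init__(self, func):
        self.func = func

    def invoke(self, value):
        return self.func(value)

    def __or__(self, other):
        # `a | b` builds a new Step that feeds a's output into b.
        return Step(lambda value: other.invoke(self.invoke(value)))


# Hypothetical stand-ins for the prompt, model, and output parser.
toy_prompt = Step(lambda d: f"tell me a short joke about {d['topic']}")
toy_model = Step(lambda text: {"content": text.upper()})
toy_parser = Step(lambda message: message["content"])

toy_chain = toy_prompt | toy_model | toy_parser
result = toy_chain.invoke({"topic": "bears"})
print(result)
```

Each `|` returns a new composed object, so the chain can be built once and invoked many times.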

## More complex chain

Next, we build a retrieval chain and use ```RunnableMap``` to assemble the prompt inputs: the retrieved context plus the user's question.

In [None]:
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import DocArrayInMemorySearch  # current import path

In [None]:
# Create an in-memory vector store from text documents using OpenAI embeddings, then get a retriever to perform similarity search
vectorstore = DocArrayInMemorySearch.from_texts(
    ["harrison worked at kensho", "bears like to eat honey"],
    embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()

In [None]:
# List all methods and attributes
print(dir(retriever))

['InputType', 'OutputType', '__abstractmethods__', '__annotations__', '__class__', '__class_getitem__', '__class_vars__', '__copy__', '__deepcopy__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__fields__', '__fields_set__', '__format__', '__ge__', '__get_pydantic_core_schema__', '__get_pydantic_json_schema__', '__getattr__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__or__', '__orig_bases__', '__parameters__', '__pretty__', '__private_attributes__', '__pydantic_complete__', '__pydantic_computed_fields__', '__pydantic_core_schema__', '__pydantic_custom_init__', '__pydantic_decorators__', '__pydantic_extra__', '__pydantic_fields__', '__pydantic_fields_set__', '__pydantic_generic_metadata__', '__pydantic_init_subclass__', '__pydantic_parent_namespace__', '__pydantic_post_init__', '__pydantic_private__', '__pydantic_root_model__', '__pydantic_serializer__', 

In [None]:
# Print only “public” retriever methods
methods = [
    m for m in dir(retriever)
    if callable(getattr(retriever, m, None)) # callable
    and not m.startswith("_") # ignore private/internal
]
print(methods)

['InputType', 'OutputType', 'aadd_documents', 'abatch', 'abatch_as_completed', 'add_documents', 'aget_relevant_documents', 'ainvoke', 'as_tool', 'assign', 'astream', 'astream_events', 'astream_log', 'atransform', 'batch', 'batch_as_completed', 'bind', 'config_schema', 'configurable_alternatives', 'configurable_fields', 'construct', 'copy', 'dict', 'from_orm', 'get_config_jsonschema', 'get_graph', 'get_input_jsonschema', 'get_input_schema', 'get_lc_namespace', 'get_name', 'get_output_jsonschema', 'get_output_schema', 'get_prompts', 'get_relevant_documents', 'input_schema', 'invoke', 'is_lc_serializable', 'json', 'lc_id', 'map', 'model_construct', 'model_copy', 'model_dump', 'model_dump_json', 'model_json_schema', 'model_parametrized_name', 'model_post_init', 'model_rebuild', 'model_validate', 'model_validate_json', 'model_validate_strings', 'output_schema', 'parse_file', 'parse_obj', 'parse_raw', 'pick', 'pipe', 'schema', 'schema_json', 'stream', 'to_json', 'to_json_not_implemented', 't

**Common Retriever Methods**

- `invoke(query)` (replaces the deprecated `get_relevant_documents(query)`) → Retrieves the documents most relevant to the given query.
- `add_documents(documents)` → Adds new documents to the underlying vector store for future retrieval.  
- `as_tool()` → Wraps the retriever as a LangChain tool so it can be used by agents.

In [None]:
retriever.invoke("where did harrison work?")

[Document(metadata={}, page_content='harrison worked at kensho'),
 Document(metadata={}, page_content='bears like to eat honey')]

In [None]:
retriever.invoke("what do bears like to eat")

[Document(metadata={}, page_content='bears like to eat honey'),
 Document(metadata={}, page_content='harrison worked at kensho')]

In [None]:
# create a prompt template
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

In [None]:
from langchain_core.runnables import RunnableMap  # current import path

In [None]:
chain = RunnableMap({
    "context": lambda x: retriever.invoke(x["question"]),
    "question": lambda x: x["question"]
}) | prompt | model | output_parser

This defines a pipeline whose first step is a `RunnableMap`. It takes an input dict with a "question" key and builds a new dict:
 1. "context" key: the documents the retriever returns for the question.
 2. "question" key: the original question, passed through unchanged.

 That dict is then fed sequentially through:
 - `prompt`, which formats the input for the LLM,
 - `model`, which generates a response,
 - `output_parser`, which converts the LLM's message into a plain string.

In short: the chain retrieves relevant context, combines it with the question in the prompt, and returns the model's answer as a string.
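The mapping step can be sketched in plain Python. This is an illustration under stated assumptions, not LangChain code: `run_map` and `toy_retrieve` are hypothetical names, and the toy retriever uses simple word overlap instead of embeddings (the real retriever ranks all documents rather than filtering them).

```python
def run_map(steps, value):
    """Toy stand-in for RunnableMap: call every function on the same input."""
    return {key: func(value) for key, func in steps.items()}


TEXTS = ["harrison worked at kensho", "bears like to eat honey"]


def toy_retrieve(query):
    # Hypothetical retriever: keep texts that share at least one word with the query.
    words = set(query.lower().replace("?", "").split())
    return [text for text in TEXTS if words & set(text.split())]


mapped = run_map(
    {
        "context": lambda x: toy_retrieve(x["question"]),
        "question": lambda x: x["question"],
    },
    {"question": "where did harrison work?"},
)
print(mapped)
```

Every function receives the same input dict, and the results are collected under their keys, which is the shape the prompt template expects.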


In [None]:
chain.invoke({"question": "where did harrison work?"})

'Harrison worked at Kensho.'

In [None]:
# display RunnableMap's output
inputs = RunnableMap({
    "context": lambda x: retriever.invoke(x["question"]),
    "question": lambda x: x["question"]
})

In [None]:
inputs.invoke({"question": "where did harrison work?"})

{'context': [Document(metadata={}, page_content='harrison worked at kensho'),
  Document(metadata={}, page_content='bears like to eat honey')],
 'question': 'where did harrison work?'}

## Bind

In [None]:
functions = [
    {
      "name": "weather_search",
      "description": "Search for weather given an airport code",
      "parameters": {
        "type": "object",
        "properties": {
          "airport_code": {
            "type": "string",
            "description": "The airport code to get the weather for"
          },
        },
        "required": ["airport_code"]
      }
    }
  ]

In [None]:
prompt = ChatPromptTemplate.from_messages(
    [
        ("human", "{input}")
    ]
)
model = ChatOpenAI(temperature=0).bind(functions=functions)

In [None]:
runnable = prompt | model

In [None]:
runnable.invoke({"input": "what is the weather in sf"})

AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{"airport_code":"SFO"}', 'name': 'weather_search'}, 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 16, 'prompt_tokens': 64, 'total_tokens': 80, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'id': 'chatcmpl-CbO9gajKCzMBjE8tU4grndghaPtZq', 'service_tier': 'default', 'finish_reason': 'function_call', 'logprobs': None}, id='run--d2da2426-679d-4509-ae62-ae79ac4b9b07-0', usage_metadata={'input_tokens': 64, 'output_tokens': 16, 'total_tokens': 80, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})

In [None]:
functions = [
    {
      "name": "weather_search",
      "description": "Search for weather given an airport code",
      "parameters": {
        "type": "object",
        "properties": {
          "airport_code": {
            "type": "string",
            "description": "The airport code to get the weather for"
          },
        },
        "required": ["airport_code"]
      }
    },
        {
      "name": "sports_search",
      "description": "Search for news of recent sport events",
      "parameters": {
        "type": "object",
        "properties": {
          "team_name": {
            "type": "string",
            "description": "The sports team to search for"
          },
        },
        "required": ["team_name"]
      }
    }
  ]

In [None]:
model = model.bind(functions=functions)

In [None]:
runnable = prompt | model

In [None]:
runnable.invoke({"input": "how did the patriots do yesterday?"})

AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{"team_name":"New England Patriots"}', 'name': 'sports_search'}, 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 18, 'prompt_tokens': 99, 'total_tokens': 117, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'id': 'chatcmpl-CbOFNr62vY6KZoxXSCMPYqgEvpAGN', 'service_tier': 'default', 'finish_reason': 'function_call', 'logprobs': None}, id='run--00855c95-d620-4d89-b97e-4fbe1687bcc8-0', usage_metadata={'input_tokens': 99, 'output_tokens': 18, 'total_tokens': 117, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})

In this section, we use ```bind``` to attach fixed call arguments to the LLM, in our case the list of available functions. The bound model passes those functions to the API on every invocation, so we don't need to supply them each time. This differs from ```invoke```, which actually runs the model on a specific input; ```bind``` only returns a new runnable with the extra arguments stored, which makes the same configuration easy to reuse across chains. Calling ```bind``` again with the same keyword, as we did with the two-function list, replaces the earlier value.
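A close standard-library analogy is `functools.partial`, which also stores keyword arguments now and makes the call later. The sketch below is only an analogy: `toy_model` is a hypothetical function that reports what it received, and the function specs are reduced to their names.

```python
from functools import partial


def toy_model(prompt, functions=None, temperature=1.0):
    """Hypothetical model call that just reports what it received."""
    names = [f["name"] for f in (functions or [])]
    return {"prompt": prompt, "functions": names, "temperature": temperature}


weather = {"name": "weather_search"}
sports = {"name": "sports_search"}

# "Binding" stores keyword arguments now; the call happens later.
bound = partial(toy_model, functions=[weather])
first = bound("what is the weather in sf")

# Binding again with the same keyword replaces the earlier value.
rebound = partial(bound, functions=[weather, sports])
second = rebound("how did the patriots do yesterday?")

print(first["functions"], second["functions"])
```

The original `toy_model` is left untouched; each binding produces a new callable with its own stored arguments.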

## Fallbacks

In [None]:
from langchain_openai import OpenAI
import json

In [None]:
simple_model = OpenAI(
    temperature=0,
    max_tokens=1000,
    model="gpt-4o-mini"
)
simple_chain = simple_model | json.loads

In [None]:
challenge = "write three poems in a json blob, where each poem is a json blob of a title, author, and first line"

In [None]:
simple_model.invoke(challenge)

'. \n\n```json\n{\n  "poems": [\n    {\n      "title": "The Road Not Taken",\n      "author": "Robert Frost",\n      "first_line": "Two roads diverged in a yellow wood,"\n    },\n    {\n      "title": "Still I Rise",\n      "author": "Maya Angelou",\n      "first_line": "You may write me down in history"\n    },\n    {\n      "title": "The Waste Land",\n      "author": "T.S. Eliot",\n      "first_line": "April is the cruellest month,"\n    }\n  ]\n}\n```'

<p style="background-color:#F5C780; padding:15px"><b>Note:</b> The next line is expected to fail.</p>

In [None]:
simple_chain.invoke(challenge)

JSONDecodeError: Expecting value: line 1 column 1 (char 0)
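The error can be reproduced offline with a shortened copy of the completion shown earlier. The text starts with a stray `.` and wraps the JSON in a Markdown fence, so `json.loads` rejects it at the first character. `extract_json` is a hypothetical helper, shown only to illustrate one possible workaround.

```python
import json

# Shortened copy of the completion: stray text plus a Markdown code fence.
fence = "`" * 3
raw = ". \n\n" + fence + 'json\n{"poems": [{"title": "The Road Not Taken"}]}\n' + fence

try:
    json.loads(raw)
    failed = False
except json.JSONDecodeError as err:
    failed = True
    print(f"JSONDecodeError: {err}")


def extract_json(text):
    """Hypothetical helper: parse the span from the first '{' to the last '}'."""
    start, end = text.index("{"), text.rindex("}")
    return json.loads(text[start : end + 1])


parsed = extract_json(raw)
print(parsed)
```

Slicing between the outermost braces is a blunt heuristic; it assumes the reply contains exactly one JSON object.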

In [None]:
model = ChatOpenAI(temperature=0)
chain = model | StrOutputParser() | json.loads

In [None]:
chain.invoke(challenge)

{'poem1': {'title': 'The Rose',
  'author': 'Emily Dickinson',
  'firstLine': 'A rose by any other name would smell as sweet'},
 'poem2': {'title': 'The Road Not Taken',
  'author': 'Robert Frost',
  'firstLine': 'Two roads diverged in a yellow wood'},
 'poem3': {'title': 'Hope is the Thing with Feathers',
  'author': 'Emily Dickinson',
  'firstLine': 'Hope is the thing with feathers that perches in the soul'}}

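In LangChain, both ```ChatOpenAI``` and the completion-style ```OpenAI``` class implement the **Runnable interface**, so we can chain multiple steps (e.g., model → string parser → ```json.loads```) and call ```.invoke()``` on the whole chain. ```simple_chain``` fails for a different reason: its model output begins with stray text and wraps the JSON in a Markdown code fence, so ```json.loads``` raises a ```JSONDecodeError```. The ```ChatOpenAI``` chain returned bare JSON in this run, which parses cleanly.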

To address this, we can use a **fallback chain**: if ```simple_chain``` raises an exception, the same input is automatically passed to ```chain``` as a backup. This lets the pipeline still produce a result when the first chain fails.
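The fallback behaviour described above can be sketched with plain functions. This is a toy model of the idea, not LangChain's implementation: the `with_fallbacks` function, `strict_parse`, and `lenient_parse` here are hypothetical.

```python
import json


def with_fallbacks(primary, fallbacks):
    """Toy stand-in: try each callable in order and return the first success."""

    def run(value):
        last_error = None
        for candidate in [primary, *fallbacks]:
            try:
                return candidate(value)
            except Exception as err:  # any exception triggers the next candidate
                last_error = err
        raise last_error

    return run


def strict_parse(text):
    return json.loads(text)


def lenient_parse(text):
    # Hypothetical backup: keep only the outermost {...} span before parsing.
    return json.loads(text[text.index("{") : text.rindex("}") + 1])


safe_parse = with_fallbacks(strict_parse, [lenient_parse])
clean = safe_parse('{"title": "Still I Rise"}')
messy = safe_parse('Sure! {"title": "Still I Rise"} Hope that helps.')
print(clean, messy)
```

Each fallback receives the original input rather than the failed step's partial output, and the last error is re-raised only when every candidate fails.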

In [None]:
final_chain = simple_chain.with_fallbacks([chain])

In [None]:
final_chain.invoke(challenge)

{'poem1': {'title': 'The Rose',
  'author': 'Emily Dickinson',
  'firstLine': 'A rose by any other name would smell as sweet'},
 'poem2': {'title': 'The Road Not Taken',
  'author': 'Robert Frost',
  'firstLine': 'Two roads diverged in a yellow wood'},
 'poem3': {'title': 'Hope is the Thing with Feathers',
  'author': 'Emily Dickinson',
  'firstLine': 'Hope is the thing with feathers that perches in the soul'}}

## Interface

In [None]:
prompt = ChatPromptTemplate.from_template(
    "Tell me a short joke about {topic}"
)
model = ChatOpenAI()
output_parser = StrOutputParser()

chain = prompt | model | output_parser

In [None]:
chain.invoke({"topic": "bears"})

'Why do bears have hairy coats?\n\nFur protection!'

In [None]:
# batch lets us process multiple inputs in one call instead of invoking the chain separately for each item
chain.batch([{"topic": "bears"}, {"topic": "frogs"}])

['Why do bears have hairy coats?\n\nFur protection!',
 'Why are frogs so happy? Because they eat whatever bugs them!']

In [None]:
# chain.stream lets us receive the output incrementally as it’s generated instead of waiting for the entire chain to finish
for t in chain.stream({"topic": "bears"}):
    print(t)


Why
 do
 bears
 have
 hairy
 coats
?
 

F
ur
 protection
!




In [None]:
# `ainvoke` runs the chain asynchronously, allowing other tasks to run while waiting, but still returns the final result only when the chain finishes
response = await chain.ainvoke({"topic": "bears"})
response

'Why did the bear bring a flashlight to the party? \nHe heard it was going to be a "beary" good time!'
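How the four entry points relate can be sketched with a toy class. `ToyChain` is hypothetical and returns canned text instead of calling a model. The real `batch` may run inputs concurrently, whereas this sketch loops over them.

```python
import asyncio


class ToyChain:
    """Toy stand-in exposing invoke, batch, stream, and ainvoke."""

    def invoke(self, inputs):
        return f"A joke about {inputs['topic']}!"

    def batch(self, inputs_list):
        # One call, many inputs; results come back in input order.
        return [self.invoke(inputs) for inputs in inputs_list]

    def stream(self, inputs):
        # Yield the answer piece by piece instead of all at once.
        for word in self.invoke(inputs).split(" "):
            yield word

    async def ainvoke(self, inputs):
        # Hand control back to the event loop before producing the result.
        await asyncio.sleep(0)
        return self.invoke(inputs)


toy = ToyChain()
batched = toy.batch([{"topic": "bears"}, {"topic": "frogs"}])
chunks = list(toy.stream({"topic": "bears"}))


async def main():
    # Two asynchronous calls scheduled together with asyncio.gather.
    return await asyncio.gather(
        toy.ainvoke({"topic": "bears"}), toy.ainvoke({"topic": "frogs"})
    )


gathered = asyncio.run(main())
print(batched, chunks, gathered)
```

Inside a notebook an event loop is already running, so use `await main()` there instead of `asyncio.run(main())`.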