In [68]:
from dotenv import load_dotenv
import os
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings

load_dotenv()

# Connection settings for the chat deployment. Each variable is read from the
# environment (populated from .env above); a missing one raises KeyError.
azure_endpoint = os.environ["AZURE_OPENAI_RESOURCE"]
chat_deployment = os.environ["AZURE_OPENAI_NAME"]
chat_api_version = os.environ["AZURE_OPENAI_VERSION"]
azure_api_key = os.environ["AZURE_OPENAI_KEY"]

# Deterministic chat model (temperature=0) with a 4000-token completion cap.
chat_model = AzureChatOpenAI(
    azure_endpoint=azure_endpoint,
    azure_deployment=chat_deployment,
    api_version=chat_api_version,
    api_key=azure_api_key,
    max_tokens=4000,
    temperature=0,
)

# The embeddings client shares the same Azure resource endpoint and key but
# uses its own deployment name and API version.
embedding_model = AzureOpenAIEmbeddings(
    azure_deployment=os.environ["AZURE_EMBEDDINGS_NAME"],
    azure_endpoint=azure_endpoint,
    api_version=os.environ["AZURE_EMBEDDINGS_VERSION"],
    api_key=azure_api_key,
)

# What is LCEL?
* LangChain Expression Language (LCEL) lets you chain together runnable components using the pipe `|` operator.
* When runnables are chained, the output of each element's `.invoke()` is passed as the input to the next one.
* A sequence of chained runnables is a `RunnableSequence`, which is itself a Runnable and can therefore be chained further.

# Why does LangChain recommend using LCEL?

* Best for streaming: you get the best possible time-to-first-token, and output arrives in incremental chunks at the same rate the service provider (API) emits tokens.
* Sync and async support.
* Parallel execution: steps of a chain that can run in parallel are parallelized automatically.
* More concise code that mirrors the structure of the LangChain chain, making it easier to understand.


# RAG example with LCEL

In [69]:
from langchain_community.vectorstores import DocArrayInMemorySearch
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# Tiny in-memory knowledge base: two sentences embedded with the Azure model.
vectorstore = DocArrayInMemorySearch.from_texts(
    [
        "pablo used to work in a bakery last summer",
        "during winter pablo studies",
    ],
    embedding=embedding_model,
)
retriever = vectorstore.as_retriever()

# The model is instructed to answer only from the retrieved context.
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
output_parser = StrOutputParser()

# Fan the raw question string out to two branches: the retriever fills
# `context`, and RunnablePassthrough forwards the question unchanged.
# The retrieved documents go into {context} as-is (no formatting step).
setup_and_retrieval = RunnableParallel(
    context=retriever,
    question=RunnablePassthrough(),
)
chain = (
    setup_and_retrieval
    | prompt
    | chat_model
    | output_parser
)

chain.invoke("Where was Pablo working last summer?")


'Pablo was working in a bakery last summer.'

To get just the text of the final answer from an LCEL chain, append a `StrOutputParser`; otherwise the chain returns the full `AIMessage`, including all the response metadata. Adding `StrOutputParser` is equivalent to keeping only the `.content` of the message returned by `.invoke()`.

In [70]:
# Same pipeline as above but without the StrOutputParser, so the chain returns
# the model's full message object (content plus response metadata).
chain = (
    setup_and_retrieval
    | prompt
    | chat_model
)

chain.invoke("Where was Pablo working last summer?")

AIMessage(content='Pablo was working in a bakery last summer.', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 53, 'total_tokens': 63}, 'model_name': 'gpt-4', 'system_fingerprint': 'fp_2f57f81c11', 'prompt_filter_results': [{'prompt_index': 0, 'content_filter_results': {'hate': {'filtered': False, 'severity': 'safe'}, 'self_harm': {'filtered': False, 'severity': 'safe'}, 'sexual': {'filtered': False, 'severity': 'safe'}, 'violence': {'filtered': False, 'severity': 'safe'}}}], 'finish_reason': 'stop', 'logprobs': None, 'content_filter_results': {'hate': {'filtered': False, 'severity': 'safe'}, 'self_harm': {'filtered': False, 'severity': 'safe'}, 'sexual': {'filtered': False, 'severity': 'safe'}, 'violence': {'filtered': False, 'severity': 'safe'}}}, id='run-557e0166-fd97-43c8-8401-b5fd1825703b-0', usage_metadata={'input_tokens': 53, 'output_tokens': 10, 'total_tokens': 63})

# Streaming

Streaming works with the same chain: we just call `.astream()` instead of `.invoke()`.

In [27]:
chain = setup_and_retrieval | prompt |  chat_model | output_parser

async for event in chain.astream("Tell me a story of 2 paragraphs"):
    print(event, end="")

Pablo spent his last summer enveloped in the warm, comforting aroma of freshly baked bread and pastries. Each morning, as the sun began to peek over the horizon, he would tie his apron and prepare for the day's work at the local bakery. His hands, dusted with flour, moved with practiced ease as he kneaded dough and shaped loaves. The bakery was a bustling hub in the small town, and Pablo took pride in his craft, knowing that his efforts brought joy and satisfaction to the community. The regulars knew him by name, and their friendly banter was the soundtrack to his summer days.

As the seasons turned and the chill of winter set in, Pablo traded the warmth of the ovens for the quiet of the library. He was a diligent student, his mind as nimble as his baker's fingers, now turning pages and scribbling notes. Winter was a time for study, for burying himself in textbooks and lectures, preparing for a future that he hoped would be as fulfilling as his time at the bakery. The memories of summe

We can also use the `.astream_log()` method, which streams all the intermediate steps of a chain as JSON Patch operations. This lets us debug the chain or expose more information about the intermediate steps, such as the retrieved context.

In [28]:
async for chunk in chain.astream_log(
    "Where was Pablo working last summer?", include_names=["Docs"]
):
    print("-" * 40)
    print(chunk)

----------------------------------------
RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': '840b382c-b91a-4cc9-96b9-f19ead866d56',
            'logs': {},
            'name': 'RunnableSequence',
            'streamed_output': [],
            'type': 'chain'}})
----------------------------------------
RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': ''},
 {'op': 'replace', 'path': '/final_output', 'value': ''})
----------------------------------------
RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': 'P'},
 {'op': 'replace', 'path': '/final_output', 'value': 'P'})
----------------------------------------
RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': 'ablo'},
 {'op': 'replace', 'path': '/final_output', 'value': 'Pablo'})
----------------------------------------
RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': ' was'},
 {'op': 'replace', 'path': '/final_output', 'value': 'Pablo 

# Parallelism
With `RunnableParallel`, LCEL chains can run in parallel whenever the input of one chain doesn't depend on the output of the other.

In [29]:
from langchain_core.runnables import RunnableParallel

# Two branches running the same RAG chain. RunnableParallel feeds the same
# input to both and returns a dict keyed by branch name.
chain = setup_and_retrieval | prompt | chat_model | output_parser
parallel_chain = RunnableParallel(
    {"question_rag_1": chain, "question_rag_2": chain}
)

In [31]:
%%time
chain.invoke("During which season Pablo works? Give me a short answer")

CPU times: user 33.8 ms, sys: 7.42 ms, total: 41.2 ms
Wall time: 1.02 s


'Pablo works during the summer.'

In [32]:
%%time
chain.invoke("During which season Pablo studies? Give me a short answer")

CPU times: user 44.5 ms, sys: 4.02 ms, total: 48.5 ms
Wall time: 958 ms


'Pablo studies during winter.'

In [33]:
%%time
parallel_chain.invoke("During which season Pablo studies? Give me a short answer")

CPU times: user 89.3 ms, sys: 5.07 ms, total: 94.3 ms
Wall time: 941 ms


{'question_rag_1': 'Winter.', 'question_rag_2': 'Pablo studies during winter.'}

## Parallel + batch
We can also combine parallel execution with `.batch()` to process several inputs in one call.

In [35]:
%%time
parallel_chain.batch(["During which season Pablo studies? Give me a short answer", "During which season Pablo works? Give me a short answer"])

CPU times: user 185 ms, sys: 13.9 ms, total: 199 ms
Wall time: 1.23 s


[{'question_rag_1': 'Pablo studies during winter.',
  'question_rag_2': 'Pablo studies during winter.'},
 {'question_rag_1': 'Pablo works during the summer.',
  'question_rag_2': 'Pablo used to work in the summer.'}]

# Streaming and output parsers

We can stream parsed JSON objects directly, without JSON parsing errors caused by incomplete JSON: `JsonOutputParser` yields progressively more complete partial objects as the model generates them.

In [25]:
from langchain_core.output_parsers import JsonOutputParser

chain = (
    chat_model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
async for text in chain.astream(
    "Output a list of the spanish cities Madrid, Valencia, Sevilla and Santiago de Compostela and their more popular food in JSON format. Use a dict with an outer key of 'cities' which contains a list of countries. Each country should have the key `name` and `food`"
):
    print(text, flush=True)

{}
{'cities': []}
{'cities': [{}]}
{'cities': [{'name': ''}]}
{'cities': [{'name': 'Mad'}]}
{'cities': [{'name': 'Madrid'}]}
{'cities': [{'name': 'Madrid', 'food': ''}]}
{'cities': [{'name': 'Madrid', 'food': 'C'}]}
{'cities': [{'name': 'Madrid', 'food': 'Coc'}]}
{'cities': [{'name': 'Madrid', 'food': 'Cocido'}]}
{'cities': [{'name': 'Madrid', 'food': 'Cocido Mad'}]}
{'cities': [{'name': 'Madrid', 'food': 'Cocido Madr'}]}
{'cities': [{'name': 'Madrid', 'food': 'Cocido Madrile'}]}
{'cities': [{'name': 'Madrid', 'food': 'Cocido Madrileño'}]}
{'cities': [{'name': 'Madrid', 'food': 'Cocido Madrileño'}, {}]}
{'cities': [{'name': 'Madrid', 'food': 'Cocido Madrileño'}, {'name': ''}]}
{'cities': [{'name': 'Madrid', 'food': 'Cocido Madrileño'}, {'name': 'Val'}]}
{'cities': [{'name': 'Madrid', 'food': 'Cocido Madrileño'}, {'name': 'Valencia'}]}
{'cities': [{'name': 'Madrid', 'food': 'Cocido Madrileño'}, {'name': 'Valencia', 'food': ''}]}
{'cities': [{'name': 'Madrid', 'food': 'Cocido Madrileño'}

To add a custom step to the chain without breaking streaming, we have to use a generator function (one that uses `yield`). A generator processes the input stream chunk by chunk instead of waiting for the complete input.

In [23]:
async def _extract_city_names_streaming(input_stream):
    """A function that operates on input streams."""
    food_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "cities" not in input:
            continue

        cities = input["cities"]

        if not isinstance(cities, list):
            continue

        for city in cities:
            name = city.get("name")
            if not name:
                continue
            if name not in food_so_far:
                yield name
                food_so_far.add(name)


chain = chat_model | JsonOutputParser() | _extract_city_names_streaming

async for text in chain.astream(
    "Output a list of the spanish cities Madrid, Valencia, Sevilla and Santiago de Compostela and their more popular food in JSON format. Use a dict with an outer key of 'cities' which contains a list of countries. Each country should have the key `name` and `food`"
):
    print(text, end="|", flush=True)

Mad|Madrid|Val|Valencia|Se|Sevilla|S|Santiago|Santiago de|Santiago de Com|Santiago de Compost|Santiago de Compostela|

# Graphical representation of LCEL chains

In [71]:
# Rebuild the RAG chain from the components created in the RAG example above.
# `chain` has been reassigned by the streaming examples since then, but the
# retriever, prompt and parser are unchanged. Reusing them avoids embedding the
# documents a second time.
chain = setup_and_retrieval | prompt | chat_model | output_parser

# Build the graph once and render it twice: as a PNG written to
# test_graph.png, and as ASCII art in the cell output.
rag_graph = chain.get_graph()
rag_graph.draw_mermaid_png(output_file_path="test_graph.png")
rag_graph.print_ascii()

            +---------------------------------+         
            | Parallel<context,question>Input |         
            +---------------------------------+         
                    **               ***                
                 ***                    ***             
               **                          **           
+----------------------+               +-------------+  
| VectorStoreRetriever |               | Passthrough |  
+----------------------+               +-------------+  
                    **               ***                
                      ***         ***                   
                         **     **                      
           +----------------------------------+         
           | Parallel<context,question>Output |         
           +----------------------------------+         
                             *                          
                             *                          
                             * 

# Using @chain decorator
We can turn plain Python functions into LCEL Runnables using the `@chain` decorator.

In [72]:
from langchain_core.runnables import chain

prompt1 = ChatPromptTemplate.from_template("Answer with yes or no to if the weather is usually good in {city}")
prompt2 = ChatPromptTemplate.from_template("If the text is 'yes' answer: 'Apply suncream' otherwise answer: 'Take an umbrella'. Text: {answer_1}")


@chain
def custom_chain(city):
    """Two-step advice chain built from plain Python calls.

    Asks the model whether `city` usually has good weather (yes/no). The text
    of that reply is then fed into prompt2 to get a suncream-or-umbrella
    recommendation, which is returned as a string.
    """
    weather_reply = chat_model.invoke(prompt1.invoke({"city": city}))
    weather_text = StrOutputParser().invoke(weather_reply)
    return (prompt2 | chat_model | StrOutputParser()).invoke({"answer_1": weather_text})


custom_chain.invoke("London")

'Take an umbrella.'

In [73]:
# Wrap the custom chain so its string output lands under the `context` key
# that prompt_final expects.
prev_chain_output = RunnableParallel(
    {"context": custom_chain}
)
prompt_final = ChatPromptTemplate.from_template("Based on the context info, make a very short joke about the weather: {context}")

chain_2 = prev_chain_output | prompt_final | chat_model | StrOutputParser()
chain_2.invoke("Madrid")

"Why did the cloud apply suncream? Because it didn't want to get sunburned at the evaporate!"

# Chaining multiple chains

In [57]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt1 = ChatPromptTemplate.from_template("what is the capital of {country}?")
prompt2 = ChatPromptTemplate.from_template(
    "What is the most popular and traditional food in {capital}? Answer in one short sentence"
)

# Step 1: country -> the model's answer naming its capital, as a string.
# NOTE(review): prompt1 does not constrain the format, so this may be a full
# sentence rather than just the city name.
chain1 = prompt1 | chat_model | StrOutputParser()

# Step 2: the dict literal is coerced into a parallel step. The input dict
# goes to chain1, and its answer fills {capital} in prompt2.
chain2 = {"capital": chain1} | prompt2 | chat_model | StrOutputParser()

chain2.invoke({"country": "Indonesia"})

'The most popular and traditional food in Jakarta is Nasi Goreng, which is Indonesian fried rice.'