## LangChain Expression Language (LCEL)
LangChain Expression Language, or LCEL, is a declarative way to chain LangChain components.

In [1]:
import os
# Disable pip version check
os.environ['PIP_DISABLE_PIP_VERSION_CHECK'] = '1'
import warnings
warnings.filterwarnings('ignore')

In [2]:
from dotenv import load_dotenv
import google.generativeai as genai
from IPython.display import Markdown, display

# Load GOOGLE_API_KEY from a local .env file into the environment
load_dotenv()
my_api_key = os.getenv("GOOGLE_API_KEY")
genai.configure(api_key=my_api_key)



In [19]:
from langchain_google_genai.chat_models import ChatGoogleGenerativeAI
model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")  # "chat-bison@001"

#### Chaining Runnables

In [15]:
#### Using The pipe operator: |

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")

chain = prompt | model | StrOutputParser()
chain.invoke({"topic": "Marriage"})

"Why did the married man get a vasectomy? \n\nHe wanted to make sure his wife wouldn't be able to blame him for anything anymore! \n"

In [16]:
#### Using Coercion
# We can combine this chain with more runnables to create another chain.
# This may involve some input/output formatting using other types of runnables,
# depending on the required inputs and outputs of the chain components.
# Here the dict on the left of `|` is automatically coerced to a RunnableParallel.
analysis_prompt = ChatPromptTemplate.from_template("is this a funny joke? {joke}")

composed_chain = {"joke": chain} | analysis_prompt | model | StrOutputParser()

composed_chain.invoke({"topic": "Marriage"})


'That\'s a pretty good one! It\'s a classic dad joke - simple, silly, and relies on a pun.  It\'s funny because it plays on the word "high" and the unexpected image of someone using a ladder to reach the high tide. \n\nI\'d love to hear another joke! 😄 \n'

In [17]:
### Using the Pipe Method

from langchain_core.runnables import RunnableParallel

composed_chain_with_pipe = (
    RunnableParallel({"joke": chain})
    .pipe(analysis_prompt)
    .pipe(model)
    .pipe(StrOutputParser())
)

composed_chain_with_pipe.invoke({"topic": "Marriage"})

'Yes, this is a funny joke! Here\'s why:\n\n* **Wordplay:** The joke uses the phrase "You\'re not listening!" in a clever way. It plays on the common complaint of one partner feeling unheard, but then twists it by having the husband realize his wife was actually right all along.\n* **Unexpected Twist:** The joke sets up the expectation that the husband is annoyed by his wife\'s complaint. The punchline then subverts this expectation, creating a humorous surprise.\n* **Relatability:**  Many people can relate to the experience of feeling unheard in a relationship, making the joke resonate with a wider audience.\n\nOverall, the joke is funny because it\'s clever, unexpected, and relatable.  The wink face at the end adds to the playful tone. \n'

#### Streaming runnables
Streaming is critical in making applications based on LLMs feel responsive to end-users.
Important LangChain primitives such as chat models, output parsers, prompts, retrievers, and agents implement the LangChain <b>Runnable interface</b>.
This interface provides two general approaches to streaming content:

<b>sync stream and async astream</b>: a default implementation of streaming that streams the final output from the chain.

<b>async astream_events and async astream_log</b>: these stream both intermediate steps and the final output from the chain.

In [18]:
chunks = []
for chunk in model.stream("what color is the sea?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

The| sea is not a single color, but rather appears in a range of shades depending| on various factors:

* **Depth:**  Shallow water appears turquoise or green| due to sunlight reflecting off the seafloor and suspended particles. Deeper water appears blue because red and yellow wavelengths of light are absorbed by water molecules, leaving the blue| wavelengths to be reflected back.
* **Sunlight:** The angle of the sun affects the color of the sea. At sunrise and sunset, the light travels through| more atmosphere, scattering the blue wavelengths and leaving reds and oranges to be reflected.
* **Turbidity:** The presence of sediment, algae, and other particles can make the water appear green, brown, or even red.
* **|Sky:** The color of the sky can also affect the perception of the sea's color. A clear blue sky will make the sea appear bluer, while a cloudy sky will make it appear grayer.

**So, the "|color" of the sea is actually a complex interplay of these factors, and can vary greatly

In [20]:
chunks = []
async for chunk in model.astream("what color is the sky?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

The| sky is **blue** during the day due to a phenomenon called **Rayleigh| scattering**. 

Here's why:

* **Sunlight:** Sunlight is| made up of all the colors of the rainbow.
* **Scattering:** When sunlight enters the Earth's atmosphere, it collides with gas molecules.|
* **Wavelengths:** Blue light has a shorter wavelength than other colors, so it gets scattered more easily by the molecules in the air.
* **|Our Eyes:** Our eyes are most sensitive to blue light, so we see the scattered blue light as the color of the sky.

However, the sky can appear other colors at different times of day:

* **Sunrise and Sunset:**| The sky appears red and orange because the sunlight has to travel through more atmosphere at these times, scattering away most of the blue light.
* **Clouds:** Clouds can appear white or gray because they reflect sunlight.
* **Night:**| The sky appears black at night because there is no sunlight to scatter. 
|

#### InputStream
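What if you want to stream JSON from the output as it is being generated? If you relied on `json.loads` to parse the partial JSON, parsing would fail because the partial JSON would not be valid JSON. `JsonOutputParser` handles this by operating on the input stream and attempting to complete the partial JSON into a valid state.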
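
The failure mode is easy to reproduce with the standard library alone. In this sketch, `partial` is a made-up example of what a model might have emitted halfway through its answer.

```python
import json

# A hypothetical half-finished model response, cut off mid-number
partial = '{"countries": [{"name": "France", "population": 67'
complete = partial + ".39}]}"

try:
    json.loads(partial)
    partial_parsed = True
except json.JSONDecodeError as err:
    partial_parsed = False
    print("partial JSON rejected:", err.msg)

# Only the finished document parses
parsed = json.loads(complete)
print(parsed["countries"][0])
```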

In [21]:
from langchain_core.output_parsers import JsonOutputParser

chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, flush=True)

{'countries': [{}]}
{'countries': [{'name': 'France', 'population': 67.39}]}
{'countries': [{'name': 'France', 'population': 67.39}, {'name': 'Spain', 'population': 47.35}, {}]}
{'countries': [{'name': 'France', 'population': 67.39}, {'name': 'Spain', 'population': 47.35}, {'name': 'Japan', 'population': 125.8}]}


In [24]:
from langchain_core.output_parsers import (
    JsonOutputParser,
)


# A function that operates on finalized inputs
# rather than on an input stream
def _extract_country_names(inputs):
    """A function that does not operate on input streams and so breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, end="|", flush=True)

['France', 'Spain', 'Japan']|

In [23]:
from langchain_core.output_parsers import JsonOutputParser


async def _extract_country_names_streaming(input_stream):
    """A function that operates on input streams."""
    country_names_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "countries" not in input:
            continue

        countries = input["countries"]

        if not isinstance(countries, list):
            continue

        for country in countries:
            name = country.get("name")
            if not name:
                continue
            if name not in country_names_so_far:
                yield name
                country_names_so_far.add(name)


chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(text, end="|", flush=True)

France|Spain|Japan|

#### Parallelizing Runnables 
RunnableParallels are useful for parallelizing operations, but can also be useful for manipulating the output of one Runnable to match the input format of the next Runnable in a sequence.

In [27]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_google_genai import GoogleGenerativeAIEmbeddings

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho"], embedding=GoogleGenerativeAIEmbeddings(model="models/text-embedding-004")
)
retriever = vectorstore.as_retriever()
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""

# The prompt expects input with keys for "context" and "question"
prompt = ChatPromptTemplate.from_template(template)


retrieval_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

retrieval_chain.invoke("where did harrison work?")

'Harrison worked at Kensho. \n'

In [59]:
### Parallelizing Steps
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel

joke_chain = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
poem_chain = (
    ChatPromptTemplate.from_template("write a 2-line poem about {topic}") | model
)

map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)

result = map_chain.invoke({"topic": "bear"})

joke = result['joke']
poem = result['poem']
print(joke.content)
print(poem.content) 

Why did the bear say no to dessert? 

Because he was stuffed! 🐻 

A furry giant, strong and bold,
In the forest, stories unfold. 



#### @chain decorator
We can also turn an arbitrary function into a chain by adding a `@chain` decorator. This is functionally equivalent to wrapping the function in a `RunnableLambda` constructor.

In [60]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import chain

prompt1 = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
prompt2 = ChatPromptTemplate.from_template("What is the subject of this joke: {joke}")


@chain
def custom_chain(text):
    prompt_val1 = prompt1.invoke({"topic": text})
    output1 = model.invoke(prompt_val1)
    parsed_output1 = StrOutputParser().invoke(output1)
    chain2 = prompt2 | model | StrOutputParser()
    return chain2.invoke({"joke": parsed_output1})


custom_chain.invoke("bears")

'The subject of the joke is **poker**. \n\nThe joke uses a pun, playing on the word "bear" and its homophone "bare" to create a humorous situation about why poker wouldn\'t be a good game to play in the forest. \n'

#### Automatic coercion in chains

In [64]:
prompt = ChatPromptTemplate.from_template("tell me a story about {topic}")


### Limiting the length of the content: the lambda is automatically coerced to a RunnableLambda
chain_with_coerced_function = prompt | model | (lambda x: x.content[:5])

result = chain_with_coerced_function.invoke({"topic": "bears"})
print(result)


The s


#### How to inspect runnables

In [None]:
# grandalf is needed to render the graph with print_ascii()
!pip install grandalf

In [67]:
retrieval_chain.get_graph()

Graph(nodes={'40844a25c1b1464895b050582948c09a': Node(id='40844a25c1b1464895b050582948c09a', data=<class 'pydantic.v1.main.RunnableParallel<context,question>Input'>), '06aec70321e242e5a83b0287991832e4': Node(id='06aec70321e242e5a83b0287991832e4', data=<class 'pydantic.v1.main.RunnableParallel<context,question>Output'>), '793a9605c7bf4bd48f2480f1542c50a5': Node(id='793a9605c7bf4bd48f2480f1542c50a5', data=VectorStoreRetriever(tags=['FAISS', 'GoogleGenerativeAIEmbeddings'], vectorstore=<langchain_community.vectorstores.faiss.FAISS object at 0x0000020C10D08E10>)), 'fe605b09c1b14c1b8c745464391cb2fe': Node(id='fe605b09c1b14c1b8c745464391cb2fe', data=RunnablePassthrough()), '452458793be042958e554c4dc3eb61ee': Node(id='452458793be042958e554c4dc3eb61ee', data=ChatPromptTemplate(input_variables=['context', 'question'], messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['context', 'question'], template='Answer the question based only on the following context:\n{context}\n

In [71]:
retrieval_chain.get_graph().print_ascii()

            +---------------------------------+         
            | Parallel<context,question>Input |         
            +---------------------------------+         
                    **               ***                
                 ***                    ***             
               **                          **           
+----------------------+               +-------------+  
| VectorStoreRetriever |               | Passthrough |  
+----------------------+               +-------------+  
                    **               ***                
                      ***         ***                   
                         **     **                      
           +----------------------------------+         
           | Parallel<context,question>Output |         
           +----------------------------------+         
                             *                          
                             *                          
                             * 

In [73]:
retrieval_chain.get_prompts()

[ChatPromptTemplate(input_variables=['context', 'question'], messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['context', 'question'], template='Answer the question based only on the following context:\n{context}\n\nQuestion: {question}\n'))])]