### LCEL Deepdive

In [1]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv

load_dotenv()

True

In [2]:
prompt = ChatPromptTemplate.from_template("tell me a short joke about {topic}")
model = ChatOpenAI()
output_parser = StrOutputParser()

chain = prompt | model | output_parser

chain.invoke({"topic": "ice cream"})

'Why did the ice cream truck break down? Because it had too many "scoops" of ice cream!'

Every part in our chain implements the Runnable interface from langchain
and thus has an 'invoke' method.

In [3]:
print(prompt.invoke({"topic": "ice cream"}))

messages=[HumanMessage(content='tell me a short joke about ice cream', additional_kwargs={}, response_metadata={})]


In [4]:
from langchain_core.messages.human import HumanMessage

messages = [HumanMessage(content='tell me a short joke about ice cream')]
model.invoke(messages)

AIMessage(content='Why did the ice cream truck break down?\nIt had too many "scoops"!', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 19, 'prompt_tokens': 15, 'total_tokens': 34, 'completion_tokens_details': {'reasoning_tokens': 0}}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-cf1b6163-8386-4cbb-952d-37a16eda31a5-0', usage_metadata={'input_tokens': 15, 'output_tokens': 19, 'total_tokens': 34})

### What is this "|" in Python?

In [5]:
from abc import ABC, abstractmethod

class CRunnable(ABC):
    def __init__(self):
        self.next = None

    @abstractmethod
    def process(self, data):
        """
        This method must be implemented by subclasses to define
        data processing behavior.
        """
        pass

    def invoke(self, data):
        processed_data = self.process(data)
        if self.next is not None:
            return self.next.invoke(processed_data)
        return processed_data

    # NOTE: Here we override the | operator to create a new CRunnableSequence
    # object that chains the current object with another object.
    # Example: runnable1 | runnable2 | runnable3
    # Expands to: CRunnableSequence(CRunnableSequence(runnable1, runnable2), runnable3)
    def __or__(self, other):
        return CRunnableSequence(self, other)
    
    # This design allows for chaining without explicitly setting the next variable.
    # If you want to use the next variable for chaining, you would need to add a
    #  method to set it, like so:
    def set_next(self, next_runnable):
        self.next = next_runnable
        return next_runnable

class CRunnableSequence(CRunnable):
    def __init__(self, first, second):
        super().__init__()
        self.first = first
        self.second = second

    def process(self, data):
        return data

    def invoke(self, data):
        first_result = self.first.invoke(data)
        return self.second.invoke(first_result)



In [6]:
class AddTen(CRunnable):
    def process(self, data):
        print("AddTen: ", data)
        return data + 10

class MultiplyByTwo(CRunnable):
    def process(self, data):
        print("Multiply by 2: ", data)
        return data * 2

class ConvertToString(CRunnable):
    def process(self, data):
        print("Convert to string: ", data)
        return f"Result: {data}"

In [7]:
a = AddTen()
b = MultiplyByTwo()
c = ConvertToString()

chain = a | b | c

In [8]:
result = chain.invoke(10)
print(result)

AddTen:  10
Multiply by 2:  20
Convert to string:  40
Result: 40


### Runnables from LangChain

In [9]:
from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel

In [10]:
chain = RunnablePassthrough() | RunnablePassthrough () | RunnablePassthrough ()
chain.invoke("hello")

'hello'

In [11]:
def input_to_upper(input: str):
    output = input.upper()
    return output

In [12]:
chain = RunnablePassthrough() | RunnableLambda(input_to_upper) | RunnablePassthrough()
chain.invoke("hello")

'HELLO'

In [13]:
chain = RunnableParallel({"x": RunnablePassthrough(), "y": RunnablePassthrough()})

In [14]:
chain.invoke("hello")

{'x': 'hello', 'y': 'hello'}

In [15]:
chain.invoke({"input": "hello", "input2": "goodbye"})

{'x': {'input': 'hello', 'input2': 'goodbye'},
 'y': {'input': 'hello', 'input2': 'goodbye'}}

In [16]:
chain = RunnableParallel({"x": RunnablePassthrough(), "y": lambda z: z["input2"]})

In [17]:
chain.invoke({"input": "hello", "input2": "goodbye"})

{'x': {'input': 'hello', 'input2': 'goodbye'}, 'y': 'goodbye'}

### Nested chains - now it gets more complicated!

In [18]:
def find_keys_to_uppercase(input: dict):
    output = input.get("input", "not found").upper()
    return output

In [19]:
chain = RunnableParallel({"x": RunnablePassthrough() | RunnableLambda(find_keys_to_uppercase), "y": lambda z: z["input2"]})

In [20]:
chain.invoke({"input": "hello", "input2": "goodbye"})

{'x': 'HELLO', 'y': 'goodbye'}

In [21]:
chain = RunnableParallel({"x": RunnablePassthrough()})

def assign_func(input):
    return 100

def multiply(input):
    return input * 10

In [22]:
chain.invoke({"input": "hello", "input2": "goodbye"})

{'x': {'input': 'hello', 'input2': 'goodbye'}}

In [23]:
chain = RunnableParallel({"x": RunnablePassthrough()}).assign(extra=RunnableLambda(assign_func))

In [24]:
result = chain.invoke({"input": "hello", "input2": "goodbye"})
print(result)

{'x': {'input': 'hello', 'input2': 'goodbye'}, 'extra': 100}


### Combine multiple chains (incl. coercion)

In [25]:
def extractor(input: dict):
    return input.get("extra", "Key not found")

def cupper(upper: str):
    return str(upper).upper()

new_chain = RunnableLambda(extractor) | RunnableLambda(cupper)

In [26]:
new_chain.invoke({"extra": "test"})

'TEST'

In [27]:
final_chain = chain | new_chain
final_chain.invoke({"input": "hello", "input2": "goodbye"})

'100'

### Real Work example

In [29]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings


vectorstore = FAISS.from_texts(
    ["Cats love thuna"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template=template)

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


In [31]:

rag_chain = (
    # The below is equivalent to: RunnableParallel({"context": retriever | format_docs, "question": RunnablePassthrough()})
    # So the input question: "What does cats like to eat" , is passed to the retriever and the RunnablePassthrough.
    # The retriever will return matching docs that are then formatted by the format_docs function which
    # will be substituted, by the prompt Runnable, into the template as the context.
    RunnableParallel({"context": retriever | format_docs, "question": RunnablePassthrough()})
    #{"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | ChatOpenAI()
    | StrOutputParser()
)

In [32]:
rag_chain.invoke("What do cats like to eat?")

'Tuna'