## LCEL and the Chain Interface

### What is LCEL?

The **LangChain Expression Language (LCEL)** is a declarative approach for efficiently describing and constructing chains. It simplifies the process of chaining components by focusing on _what_ needs to happen rather than _how_ it should be implemented.

---

### When to Use LCEL vs. the Chain Interface

While simple applications may only require a single LLM, more complex workflows often involve integrating LLMs with other components. LangChain provides two key frameworks for building such workflows:

1. **Chain Interface**: The traditional, procedural method for constructing chains.
2. **LCEL**: A modern, declarative alternative designed for simplicity and flexibility.

For new applications, **LCEL** is the recommended choice due to its optimized execution and ease of use. However, the **Chain interface** can still be incorporated within LCEL, enabling a hybrid approach that leverages the strengths of both frameworks.

# Advantages of Using LCEL

The **LangChain Expression Language (LCEL)** offers several key advantages that enhance efficiency, flexibility, and observability when building and managing chains:

---

### 1. **Asynchronous, Batch, and Streaming Support**
LCEL chains natively support:
- **Synchronous** and **Asynchronous** execution.
- **Batch processing** for handling multiple inputs simultaneously.
- **Streaming** for real-time, incremental output generation.

This versatility enables developers to:
- Start with a simple synchronous prototype.
- Seamlessly transition to an asynchronous, streaming-based interface as application demands grow.

---

### 2. **Built-in Fallback Mechanism**
LCEL makes it easy to integrate fallbacks within chains, ensuring:
- Robust error handling.
- Graceful recovery from failures without disrupting the chain's overall flow.

---

### 3. **Optimized Parallel Processing**
LCEL chains are designed for **parallel execution** of their components. This is especially beneficial for LLM-based workflows that involve:
- Lengthy API calls.
- High latency operations.

Parallelism significantly reduces processing time, making LCEL ideal for performance-critical applications.

---

### 4. **Seamless Integration with LangSmith**
LCEL automatically logs every step of chain execution in **LangSmith**, offering:
- Maximum **observability** for monitoring and troubleshooting.
- Enhanced **debuggability** for complex workflows.

This integration ensures transparency and simplifies the process of identifying and resolving issues within chains.

!pip install langchain==0.0.321

In [7]:
import os
from rich import print as pp
# os.environ["OPENAI_API_KEY"] = "--------------------------------------"

## Describe the chain with "LCEL".
#### The chain that connects "prompt template → model" is written as " prompt | model ".

In [8]:
from langchain.prompts import ChatPromptTemplate
# from langchain.chat_models import ChatOpenAI
from langchain_ollama import ChatOllama
model = ChatOllama(model='llama3.2:1b')

prompt = ChatPromptTemplate.from_template("Tell a joke about {topic}")
chain = prompt | model

In [9]:
pp(chain)

## Streaming responses

In [10]:
for s in chain.stream({"topic": "Programming"}):
    print(s.content, end="", flush=True)

A programmer walked into a library and asked the librarian, "Do you have any books on computer science?" The librarian replied, "It's a bit of a crash course, but I think we can help with that."

# LangChain Components and the `Runnable` Protocol

LangChain components follow the **Runnable protocol**, which defines a standardized interface for creating and executing custom chains in a consistent and reusable manner.

---

## Standard Interface

The `Runnable` protocol provides both synchronous and asynchronous methods for interacting with components:

### Synchronous Methods
- **`stream`**: Streams chunks of the response back in real-time.
- **`invoke`**: Executes the chain with a given input and returns the result.
- **`batch`**: Processes a list of inputs through the chain and returns a list of outputs.

### Asynchronous Methods
- **`astream`**: Asynchronously streams chunks of the response in real-time.
- **`ainvoke`**: Asynchronously invokes the chain with a single input.
- **`abatch`**: Asynchronously processes a batch of inputs and returns corresponding outputs.
- **`astream_log`**: Streams intermediate steps along with the final response for enhanced debugging.

---

## Input and Output Types

The types of input and output vary depending on the specific component in use. 

### Input Types
- **Prompt**: Accepts a dictionary.
- **Retriever**: Takes a single string.
- **LLM / ChatModel**: Accepts a single string, a list of messages, or a `PromptValue`.
- **Tool**: May accept a single string or a dictionary, depending on the tool's implementation.
- **OutputParser**: Works with outputs from an LLM or ChatModel.

### Output Types
- **LLM**: Produces a string.
- **ChatModel**: Outputs a chat message object.
- **Prompt**: Returns a `PromptValue`.
- **Retriever**: Provides a list of documents.
- **Tool**: Output depends on the tool's functionality.
- **OutputParser**: Output varies depending on the parser's implementation.

---

## Inspecting Input and Output Schemas

You can validate the expected input and output types of a component using its schema definitions, which follow the **Pydantic** framework:

- **`input_schema`**: Defines the schema for input types.
- **`output_schema`**: Defines the schema for output types.

These schemas ensure compatibility and provide a clear structure for integrating components into workflows.

In [11]:
chain.input_schema.schema()

{'properties': {'topic': {'title': 'Topic', 'type': 'string'}},
 'required': ['topic'],
 'title': 'PromptInput',
 'type': 'object'}

In [12]:
prompt.input_schema.schema()

{'properties': {'topic': {'title': 'Topic', 'type': 'string'}},
 'required': ['topic'],
 'title': 'PromptInput',
 'type': 'object'}

In [13]:
model.input_schema.schema()

{'$defs': {'AIMessage': {'additionalProperties': True,
   'description': 'Message from an AI.\n\nAIMessage is returned from a chat model as a response to a prompt.\n\nThis message represents the output of the model and consists of both\nthe raw output as returned by the model together standardized fields\n(e.g., tool calls, usage metadata) added by the LangChain framework.',
   'properties': {'content': {'anyOf': [{'type': 'string'},
      {'items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]},
       'type': 'array'}],
     'title': 'Content'},
    'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'},
    'response_metadata': {'title': 'Response Metadata', 'type': 'object'},
    'type': {'const': 'ai',
     'default': 'ai',
     'enum': ['ai'],
     'title': 'Type',
     'type': 'string'},
    'name': {'anyOf': [{'type': 'string'}, {'type': 'null'}],
     'default': None,
     'title': 'Name'},
    'id': {'anyOf': [{'type': 'string'}, {'type': 'null'}],
     'd

In [14]:
chain.output_schema.schema()

{'$defs': {'AIMessage': {'additionalProperties': True,
   'description': 'Message from an AI.\n\nAIMessage is returned from a chat model as a response to a prompt.\n\nThis message represents the output of the model and consists of both\nthe raw output as returned by the model together standardized fields\n(e.g., tool calls, usage metadata) added by the LangChain framework.',
   'properties': {'content': {'anyOf': [{'type': 'string'},
      {'items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]},
       'type': 'array'}],
     'title': 'Content'},
    'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'},
    'response_metadata': {'title': 'Response Metadata', 'type': 'object'},
    'type': {'const': 'ai',
     'default': 'ai',
     'enum': ['ai'],
     'title': 'Type',
     'type': 'string'},
    'name': {'anyOf': [{'type': 'string'}, {'type': 'null'}],
     'default': None,
     'title': 'Name'},
    'id': {'anyOf': [{'type': 'string'}, {'type': 'null'}],
     'd

In [20]:
pp(chain.invoke({"topic": "Football"}))

In [21]:
pp(chain.batch([{"topic": "Love"}, {"topic": "Romance"}]))

In [22]:
pp(chain.batch([{"topic": "Coding"}, {"topic": "Travelling"}], config={"max_concurrency": 5}))

### Async Stream

In [23]:
async for s in chain.astream({"topic": "Satellites"}):
    print(s.content, end="", flush=True)

Why did the satellite go to therapy?

Because it had a lot of "orbital" issues.

### Async Invoke

In [24]:
async for s in chain.astream({"topic": "Food"}):
    print(s.content, end="", flush=True)

Why was the pizza in a bad mood?

Because it was feeling crusty.

### Async Batch

In [28]:
c =await chain.abatch([{"topic": "Food"}, {"topic": "Sweets"}])
pp(c)

### Async Stream with intermediate steps

Useful for displaying progress to the user, working with intermediate results, and debugging chains. You can stream all steps (default) or include or exclude steps by name, tags, or metadata.

In [18]:
!pip install faiss-cpu tiktoken



Execute the Retriever chain and output intermediate steps using astream_log()

In [29]:
from langchain.embeddings import OpenAIEmbeddings
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from langchain.vectorstores import FAISS

template = """Please answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(["Sonu is the creator of AI Anytime Youtube Channel"], embedding=OpenAIEmbeddings())
retriever = vectorstore.as_retriever()

retrieval_chain = (
    {"context": retriever.with_config(run_name='Docs'), "question": RunnablePassthrough()}
    | prompt 
    | model 
    | StrOutputParser()
)

async for chunk in retrieval_chain.astream_log("Who is the creator of AI Anytime?", include_names=['Docs']):
    print("-"*40)
    print(chunk)

  vectorstore = FAISS.from_texts(["Sonu is the creator of AI Anytime Youtube Channel"], embedding=OpenAIEmbeddings())


----------------------------------------
RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': 'ce998db5-012a-4db9-bd20-5fc3b0e67a77',
            'logs': {},
            'name': 'RunnableSequence',
            'streamed_output': [],
            'type': 'chain'}})
----------------------------------------
RunLogPatch({'op': 'add',
  'path': '/logs/Docs',
  'value': {'end_time': None,
            'final_output': None,
            'id': 'b73a75fd-5805-42fc-bbaf-9dadf4537a97',
            'metadata': {'ls_embedding_provider': 'OpenAIEmbeddings',
                         'ls_retriever_name': 'vectorstore',
                         'ls_vector_store_provider': 'FAISS'},
            'name': 'Docs',
            'start_time': '2025-01-13T16:33:24.017+00:00',
            'streamed_output': [],
            'streamed_output_str': [],
            'tags': ['map:key:context', 'FAISS', 'OpenAIEmbeddings'],
            'type': 'retriever'}})
--------------------

## Parallel Processing

"RunnableParallel" allows each element to run in parallel.

In [30]:
from langchain.schema.runnable import RunnableParallel
chain1 = ChatPromptTemplate.from_template("Tell a joke about {topic}") | model
chain2 = ChatPromptTemplate.from_template("Write a short (2 line) poem about {topic}") | model
combined = RunnableParallel(joke=chain1, poem=chain2)

In [34]:
%%time

pp(chain1.batch([{"topic": "AI"}, {"topic": "Math"}]))

CPU times: user 112 ms, sys: 11.4 ms, total: 123 ms
Wall time: 1.22 s


In [35]:
%%time

pp(chain2.batch([{"topic": "Science"}, {"topic": "Mango"}]))

CPU times: user 27.8 ms, sys: 2.81 ms, total: 30.6 ms
Wall time: 366 ms


In [36]:
%%time

# combined
pp(combined.batch([{"topic": "AI"}, {"topic": "Mountains"}]))

CPU times: user 69.1 ms, sys: 7.15 ms, total: 76.3 ms
Wall time: 863 ms
