## Demo: Mastering LangChain Expression Language (LCEL)
This series of demos is designed to accompany the Generative AI Engineering workshop. We will move beyond single LLM calls to explore Reasoning Dataflows.

Key Objectives:
* Runnables: Understanding the atomic unit of LCEL.
* The Pipe Operator (|): Visualizing the dataflow from prompt to parser.
* RAG Integration: Connecting models to external knowledge streams.
* Parallelism & Control: Implementing ensemble-style intelligence and cost-aware routing.

In [1]:
# Install necessary libraries
# !pip install langchain langchain-openai

import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# Initialize the Model as a Transform Function
model = ChatOpenAI(model="gpt-4o-mini")
print("Setup Complete. Model Initialized.")

Setup Complete. Model Initialized.


### The Basic Pipe Flow
The Pipe Flow Concept
In LCEL, we treat reasoning as flowing data. Instead of nested function calls, we use the pipe operator (|) to create a clear, traceable pipeline.

In [2]:
# 1. Define the Prompt (Computational Object) [cite: 287]
prompt = ChatPromptTemplate.from_template(
    "Tell me a brief professional joke about {topic}."
)

# 2. Define the Output Parser (Intelligence Filter) [cite: 301]
parser = StrOutputParser()

# 3. Construct the Chain using Pipe Flow
# Flow: Input -> Prompt -> Model -> Parser
chain = prompt | model | parser

# Execute the pipeline
response = chain.invoke({"topic": "AI Engineering"})
print(f"Chain Output: {response}")


Chain Output: Why did the AI engineer bring a ladder to work?

Because they heard the job required some high-level thinking!


### Parallel Reasoning Pipelines
**Parallelism and Ensemble Intelligence:**
Real-world problems often require multi-step reasoning. LCEL allows us to run multiple reasoning paths simultaneouslyâ€”for example, generating an answer while a separate path critiques the reasoning or verifies facts.

In [3]:
# Define two different reasoning paths
joke_chain = ChatPromptTemplate.from_template("Joke about {topic}") | model | parser
fact_chain = (
    ChatPromptTemplate.from_template("One real fact about {topic}") | model | parser
)

# Combine them into a Parallel Pipeline
map_chain = RunnableParallel(joke=joke_chain, fact=fact_chain)

# Execute both simultaneously
results = map_chain.invoke({"topic": "OpenAI"})
print(f"Parallel Result 1 (Joke): {results['joke']}")
print(f"Parallel Result 2 (Fact): {results['fact']}")

Parallel Result 1 (Joke): Why did OpenAI go to therapy? 

Because it had too many unresolved parameters!
Parallel Result 2 (Fact): OpenAI was founded in December 2015 with the mission to ensure that artificial general intelligence (AGI) benefits all of humanity.


### Cost-Aware Routing
**Cost-Aware Execution:**
Not every task requires the most powerful model. With LCEL, we can implement Conditional Logic to route simple tasks to smaller models (like GPT-4o-mini) and complex reasoning to larger ones, optimizing the system economically.

RunnableBranch allows the pipeline to adapt dynamically, routing simple queries to efficient routes and complex tasks to deeper reasoning paths. This is essential for Cost-Aware Execution, ensuring intelligence is economically optimized by routing simpler steps to smaller models.

In [4]:
from langchain_core.runnables import RunnableBranch

# To fix the ValueError, we must ensure the 'model' receives a string or message.
# We wrap the model in a small chain that extracts the 'question' from the input dict.
model_path = (lambda x: x["question"]) | model

# Define the routing logic
# LCEL treats reasoning as flowing data; each step reshapes meaning [cite: 273, 274, 276]
branch = RunnableBranch(
    (lambda x: len(x["question"]) < 20, model_path),  # Route simple queries
    model_path,  # Default route for complex tasks
)

# Build the full pipeline
# User input flows through as a 'Computational Object' [cite: 286, 287]
full_chain = {"question": RunnablePassthrough()} | branch | parser

# Test routing
try:
    simple_res = full_chain.invoke("Hi!")
    complex_res = full_chain.invoke(
        "Explain the structural implications of parallel reasoning pipelines in LCEL."
    )

    print(f"Simple Route Output: {simple_res}")
    print(f"Complex Route Output: {complex_res}")
except Exception as e:
    print(f"Error: {e}")

Simple Route Output: Hello! How can I assist you today?
Complex Route Output: Parallel reasoning pipelines in the context of LCEL (Logic-based Communication and Expression Language) and similar systems introduce both opportunities and challenges that can affect the structure and organization of the reasoning process. Here are some structural implications of utilizing parallel reasoning pipelines:

1. **Modularity**: Parallel reasoning allows for the decomposition of complex problems into smaller, more manageable sub-problems. Each pipeline can independently address a specific aspect of reasoning, promoting a modular architecture that enhances maintainability and comprehension.

2. **Increased Throughput**: By processing multiple reasoning tasks simultaneously, parallel pipelines can significantly increase the overall throughput of the system. This is particularly beneficial in environments requiring real-time decision-making or where large datasets need to be processed quickly.

3. **R