In [None]:
# =============================================================================
# LCEL - LangChain Expression Language (LangChain 1.0+)
# =============================================================================
"""
=====================================================================
LangChain Expression Language (LCEL) - The Core of LangChain
=====================================================================

LCEL is a declarative way to compose chains using the pipe operator (|).
It's the foundation for building complex LLM applications in LangChain.

Why LCEL?
---------
1. Declarative syntax - Easy to read and understand
2. Streaming support - Built-in, works automatically
3. Async support - Same code works sync and async
4. Batch processing - Process multiple inputs efficiently
5. Parallel execution - Run independent tasks concurrently
6. Retries & fallbacks - Built-in error handling

Core Concept: Runnables
-----------------------
Everything in LCEL is a Runnable - an object with these methods:
- .invoke(input)     - Process single input
- .batch([inputs])   - Process multiple inputs
- .stream(input)     - Stream output tokens
- .ainvoke(input)    - Async version of invoke
- .abatch([inputs])  - Async version of batch
- .astream(input)    - Async version of stream

The Pipe Operator (|)
---------------------
    prompt | model | parser
    
    Input → [Prompt] → [Model] → [Parser] → Output
    
Each component transforms input and passes output to the next.

Updated for LangChain 1.0+ (2025-2026)
"""

import os
from dotenv import load_dotenv

load_dotenv()

# Re-export each provider key so SDKs that read os.environ can find it.
# os.environ only accepts str values, so a key that is not set anywhere must be
# skipped: assigning the None returned by os.getenv() raises TypeError.
_missing_keys = []
for _key_name in ("OPENAI_API_KEY", "GROQ_API_KEY", "GOOGLE_API_KEY"):
    _key_value = os.getenv(_key_name)
    if _key_value is None:
        _missing_keys.append(_key_name)
    else:
        os.environ[_key_name] = _key_value

# Report only the key NAMES that are absent; never print key values.
if _missing_keys:
    print(f"Missing API keys (examples that need them will fail): {', '.join(_missing_keys)}")

print("‚úÖ Environment configured for LCEL examples")

In [None]:
# =============================================================================
# Basic LCEL Chain: Prompt | Model | Parser
# =============================================================================
"""
The Classic LCEL Pattern
------------------------
Most LLM applications follow this pattern:

1. Prompt Template - Format user input into a prompt
2. Model - Send prompt to LLM, get response
3. Output Parser - Extract/format the response
"""

from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Chat model (reused by the later cells in this notebook)
model = init_chat_model("gpt-4o-mini", model_provider="openai", temperature=0.7)

# Template with two input variables: {topic} and {num_sentences}
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant that explains concepts simply."),
        ("human", "Explain {topic} in {num_sentences} sentences."),
    ]
)

# Parser that turns the model's message into a plain string
parser = StrOutputParser()

# Compose the three steps left-to-right with the pipe operator
chain = prompt | model | parser

print("=" * 60, "Basic LCEL Chain: prompt | model | parser", "=" * 60, sep="\n")

# Run the chain once; the dict keys fill the template variables
result = chain.invoke({"topic": "machine learning", "num_sentences": 3})

print(f"\nüìù Result:\n{result}")
print(f"\nüìä Result type: {type(result).__name__}")

In [None]:
# =============================================================================
# RunnablePassthrough - Passing Data Through
# =============================================================================
"""
RunnablePassthrough
-------------------
Passes input through unchanged. Useful for:
- Preserving original input alongside transformations
- Building complex data flows

RunnablePassthrough.assign()
----------------------------
Adds new keys to the input dict while preserving existing ones.
"""

from langchain_core.runnables import RunnablePassthrough

print("=" * 60, "RunnablePassthrough Examples", "=" * 60, sep="\n")

# Example 1: a bare passthrough hands its input back untouched
passthrough = RunnablePassthrough()
result = passthrough.invoke({"name": "Alice", "age": 30})
print(f"\n1Ô∏è‚É£ Simple passthrough:")
print(f"   Input: {{'name': 'Alice', 'age': 30}}")
print(f"   Output: {result}")

# Example 2: assign() keeps the incoming keys and adds computed ones;
# each callable receives the whole input dict
passthrough_with_assign = RunnablePassthrough.assign(
    greeting=lambda person: f"Hello, {person['name']}!",
    is_adult=lambda person: person['age'] >= 18,
)

result = passthrough_with_assign.invoke({"name": "Alice", "age": 30})
print(f"\n2Ô∏è‚É£ Passthrough with assign:")
print(f"   Input: {{'name': 'Alice', 'age': 30}}")
print(f"   Output: {result}")

# Example 3: Using passthrough in a chain
# This pattern is common in RAG - pass through the question while adding context
def get_context(query):
    """Return a fixed, canned context sentence that quotes ``query``."""
    context_text = f"Context for '{query}': Python is a programming language."
    return context_text

# The shared `prompt` expects {topic} and {num_sentences}, so it cannot consume
# the {"question", "context"} dict produced by assign(). Use a dedicated prompt
# whose variables match what this chain actually passes along.
rag_prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer the question using only the provided context.\n\nContext: {context}"),
    ("human", "{question}")
])

# assign() keeps "question" and adds "context" before the prompt is formatted
rag_chain = RunnablePassthrough.assign(
    context=lambda x: get_context(x["question"])
) | rag_prompt | model | parser

# Usage (not executed here): rag_chain.invoke({"question": "What is Python?"})

In [None]:
# =============================================================================
# RunnableParallel - Running Tasks Concurrently
# =============================================================================
"""
RunnableParallel
----------------
Runs multiple runnables in parallel and combines their outputs.
Great for:
- Generating multiple perspectives
- Fetching data from multiple sources
- Running independent LLM calls concurrently

Syntax:
-------
RunnableParallel({"key1": runnable1, "key2": runnable2})
# or
{"key1": runnable1, "key2": runnable2}  # A plain dict is auto-converted when piped (|) with another Runnable
"""

from langchain_core.runnables import RunnableParallel

print("=" * 60, "RunnableParallel Examples", "=" * 60, sep="\n")

# One prompt per perspective; only the system instruction differs
positive_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "Focus only on POSITIVE aspects. Be enthusiastic."),
        ("human", "What do you think about {topic}?"),
    ]
)

negative_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "Focus only on CHALLENGES and concerns. Be critical."),
        ("human", "What do you think about {topic}?"),
    ]
)

neutral_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "Provide a balanced, factual analysis."),
        ("human", "What do you think about {topic}?"),
    ]
)

# Each keyword becomes a key in the output dict; every branch receives the same input
parallel_chain = RunnableParallel(
    positive=positive_prompt | model | parser,
    negative=negative_prompt | model | parser,
    neutral=neutral_prompt | model | parser,
)

# A single invoke fans out to all three branches
print("\nüîÑ Running 3 LLM calls in parallel...")
results = parallel_chain.invoke({"topic": "artificial intelligence"})

# Show the first 150 characters of each perspective
for heading, key in (
    ("\n‚úÖ Positive View:", "positive"),
    ("\n‚ö†Ô∏è Concerns:", "negative"),
    ("\n‚öñÔ∏è Balanced View:", "neutral"),
):
    print(heading)
    print(f"   {results[key][:150]}...")

In [None]:
# =============================================================================
# RunnableLambda - Custom Functions in Chains
# =============================================================================
"""
RunnableLambda
--------------
Wraps any Python function as a Runnable.
Allows custom logic in your chains.

Use cases:
- Data transformation
- Validation
- Logging
- Calling external APIs
"""

from langchain_core.runnables import RunnableLambda

# Section banner: rule, title, rule — one line each
print("=" * 60, "RunnableLambda Examples", "=" * 60, sep="\n")

# Example 1: Simple transformation
def uppercase(text: str) -> str:
    """Return ``text`` converted to upper case."""
    shouted = text.upper()
    return shouted

# Wrap the plain function so it can be invoked like any other Runnable
uppercase_runnable = RunnableLambda(uppercase)
print(f"\n1Ô∏è‚É£ Simple function:")
print(f"   Input: 'hello world'")
uppercased_output = uppercase_runnable.invoke('hello world')
print(f"   Output: {uppercased_output}")

# Example 2: Complex transformation
def extract_and_format(data: dict, current_year: "int | None" = None) -> dict:
    """Build a small summary record from a person dict.

    Args:
        data: Mapping with a ``"name"`` string and an integer ``"age"``.
        current_year: Year to subtract the age from. Defaults to the current
            calendar year, so the result does not go stale the way a
            hard-coded year would.

    Returns:
        A dict with ``"formatted_name"`` (title-cased name), ``"year_born"``
        (``current_year - age``; an estimate, since the birthday within the
        year is not taken into account) and ``"original"`` (the input dict
        itself, not a copy).

    Raises:
        KeyError: If ``"name"`` or ``"age"`` is missing from ``data``.
    """
    if current_year is None:
        # Local import keeps this helper self-contained within the cell
        from datetime import date

        current_year = date.today().year
    return {
        "formatted_name": data["name"].title(),
        "year_born": current_year - data["age"],
        "original": data,
    }

# Wrap the dict -> dict helper and run it on a sample record
transform = RunnableLambda(extract_and_format)
result = transform.invoke({"name": "john doe", "age": 30})
print(f"\n2Ô∏è‚É£ Complex transformation:")
print(f"   Output: {result}")

# Example 3: Using lambda in a chain
# First lambda normalises the input into the variables the prompt needs;
# last lambda decorates the parsed string answer.
chain_with_lambda = (
    RunnableLambda(
        lambda payload: {"topic": payload["topic"].lower(), "num_sentences": 2}
    )
    | prompt
    | model
    | parser
    | RunnableLambda(lambda answer: f"üìö {answer}")
)

print(f"\n3Ô∏è‚É£ Lambda in chain:")
result = chain_with_lambda.invoke({"topic": "PYTHON"})
print(f"   {result[:150]}...")

In [None]:
# =============================================================================
# Batch Processing - Multiple Inputs at Once
# =============================================================================
"""
.batch() Method
---------------
Process multiple inputs efficiently.
- Same chain, multiple inputs
- Automatic parallelization
- Better throughput than sequential .invoke()

Options:
- max_concurrency: Limit parallel requests
- return_exceptions: Return errors instead of raising
"""

print("=" * 60, "Batch Processing Example", "=" * 60, sep="\n")

# Same prompt -> model -> parser pipeline as the basic example
simple_chain = prompt | model | parser

# One input dict per topic, each asking for two sentences
topics = [
    {"topic": subject, "num_sentences": 2}
    for subject in ("quantum computing", "blockchain", "neural networks")
]

print(f"\nüîÑ Processing {len(topics)} topics in batch...")

# max_concurrency caps how many requests are in flight at once
results = simple_chain.batch(topics, config={"max_concurrency": 3})

# results come back in the same order as the inputs
for position, (topic, result) in enumerate(zip(topics, results), start=1):
    print(f"\nüìù Topic {position}: {topic['topic']}")
    print(f"   {result[:100]}...")

In [None]:
# =============================================================================
# Fallbacks - Handling Errors Gracefully
# =============================================================================
"""
.with_fallbacks() Method
------------------------
Provides backup chains if the primary fails.

Use cases:
- Fallback to cheaper model if premium fails
- Fallback to different provider
- Graceful degradation
"""

from langchain_core.runnables import RunnableLambda

# Section banner: rule, title, rule — one line each
print("=" * 60, "Fallback Example", "=" * 60, sep="\n")

# Simulate a failing chain
def sometimes_fails(x):
    """Simulate a flaky step: raise when a single random draw is below 0.5.

    Returns ``"Success: <x>"`` otherwise. Uses the global ``random`` state.
    """
    import random

    roll = random.random()
    if roll >= 0.5:
        return f"Success: {x}"
    raise Exception("Random failure!")

def always_works(x):
    """Return ``"Fallback result: <x>"``; never raises on its own."""
    fallback_text = f"Fallback result: {x}"
    return fallback_text

# Flaky primary step and its dependable backup, both wrapped as Runnables
primary = RunnableLambda(sometimes_fails)
fallback = RunnableLambda(always_works)

# If `primary` raises, the same input is handed to `fallback`
robust_chain = primary.with_fallbacks([fallback])

print("\nüîÑ Running chain with fallback 5 times:")
for run_number in range(1, 6):
    result = robust_chain.invoke(f"test_{run_number - 1}")
    print(f"   Run {run_number}: {result}")

# Real-world variant, printed as text only (nothing below is executed)
print("\n" + "=" * 60, "Model Fallback Pattern", "=" * 60, sep="\n")

print("""
# Primary: Use GPT-4o
primary_model = init_chat_model("gpt-4o", model_provider="openai")

# Fallback: Use GPT-4o-mini (cheaper, faster)
fallback_model = init_chat_model("gpt-4o-mini", model_provider="openai")

# Chain with fallback
robust_model = primary_model.with_fallbacks([fallback_model])

# If GPT-4o fails (rate limit, error), automatically tries GPT-4o-mini
""")

In [None]:
# =============================================================================
# RunnableBranch - Conditional Logic
# =============================================================================
"""
RunnableBranch
--------------
Routes to different chains based on conditions.
Like an if-elif-else for chains.

Structure:
RunnableBranch(
    (condition1, chain1),
    (condition2, chain2),
    default_chain
)
"""

from langchain_core.runnables import RunnableBranch

print("=" * 60, "RunnableBranch - Conditional Routing", "=" * 60, sep="\n")

# One prompt per question category; all take a single {question} variable
math_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a math tutor. Show step-by-step solutions."),
        ("human", "{question}"),
    ]
)

code_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a coding expert. Provide code examples."),
        ("human", "{question}"),
    ]
)

general_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        ("human", "{question}"),
    ]
)

# Each category gets its own prompt -> model -> parser pipeline
math_chain = math_prompt | model | parser
code_chain = code_prompt | model | parser
general_chain = general_prompt | model | parser

# Route based on question content
def is_math_question(x):
    """Heuristically decide whether ``x["question"]`` is a math question.

    Matching is case-insensitive and deliberately stricter than a raw
    substring test, which misfired on everyday text: "-" and "/" matched
    "step-by-step" or "and/or", and "sum" matched "summary".

    A question counts as math when either:
    - it contains a word starting with "calculat", "solv", "math" or
      "equation", or the whole word "sum"/"sums"; or
    - an arithmetic operator (+, -, *, /) sits between two digits,
      e.g. "3 + 4" or "10/2".

    Args:
        x: Mapping with a ``"question"`` string.

    Returns:
        bool: True if the question looks like math, else False.
    """
    import re  # local import keeps this helper self-contained within the cell

    question_text = x["question"].lower()
    # Word-anchored stems, so "resolve" or "summary" no longer trigger a match
    if re.search(r"\b(?:calculat|solv|math|equation)\w*|\bsums?\b", question_text):
        return True
    # Operators only count when they join two numbers
    return re.search(r"\d\s*[-+*/]\s*\d", question_text) is not None

def is_code_question(x):
    """Return True when ``x["question"]`` contains a programming keyword.

    The check is a case-insensitive substring match.
    """
    question_text = x["question"].lower()
    for keyword in ("code", "python", "javascript", "function", "program", "debug"):
        if keyword in question_text:
            return True
    return False

# Conditions are tried in order; the last positional argument is the default
router = RunnableBranch(
    (is_math_question, math_chain),
    (is_code_question, code_chain),
    general_chain,
)

# One sample question per expected route
questions = [
    {"question": text}
    for text in (
        "Calculate 15% of 200",
        "Write a Python function to reverse a string",
        "What is the capital of France?",
    )
]

for q in questions:
    result = router.invoke(q)
    # Re-apply the predicates in the router's order to label the route taken
    if is_math_question(q):
        route = "MATH"
    elif is_code_question(q):
        route = "CODE"
    else:
        route = "GENERAL"
    print(f"\n‚ùì Question: {q['question']}")
    print(f"üîÄ Route: {route}")
    print(f"üí¨ Answer: {result[:100]}...")

In [None]:
# =============================================================================
# Summary: LCEL in LangChain 1.0+
# =============================================================================
"""
=====================================================================
KEY TAKEAWAYS - LangChain Expression Language
=====================================================================

1. PIPE OPERATOR (|):
   ------------------
   chain = prompt | model | parser
   result = chain.invoke({"input": "..."})

2. CORE RUNNABLES:
   ----------------
   - RunnablePassthrough - Pass data through unchanged
   - RunnablePassthrough.assign() - Add new keys
   - RunnableParallel - Run multiple chains concurrently
   - RunnableLambda - Wrap any function
   - RunnableBranch - Conditional routing

3. EXECUTION METHODS:
   ------------------
   chain.invoke(input)       # Single input
   chain.batch([inputs])     # Multiple inputs
   chain.stream(input)       # Stream output
   chain.ainvoke(input)      # Async single
   chain.abatch([inputs])    # Async batch
   chain.astream(input)      # Async stream

4. ERROR HANDLING:
   ----------------
   chain.with_fallbacks([backup_chain])
   chain.with_retry(stop_after_attempt=3)

5. COMMON PATTERNS:
   -----------------
   # Basic chain
   prompt | model | parser
   
   # Parallel processing
   RunnableParallel({"a": chain1, "b": chain2})
   
   # Add context
   RunnablePassthrough.assign(context=get_context) | prompt | model
   
   # Conditional routing
   RunnableBranch((condition, chain), default)

Common Imports:
---------------
from langchain_core.runnables import (
    RunnablePassthrough,
    RunnableParallel,
    RunnableLambda,
    RunnableBranch,
)
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

=====================================================================
"""

# Closing banner followed by a recap of the patterns covered in this notebook
print("=" * 60, "LCEL Module Complete!", "=" * 60, sep="\n")
print("""
LCEL is the foundation of LangChain. Master these patterns:

1. prompt | model | parser (basic chain)
2. RunnableParallel for concurrent processing
3. RunnablePassthrough.assign() for adding data
4. RunnableLambda for custom functions
5. RunnableBranch for routing logic
6. .with_fallbacks() for error handling

Next: 8-streaming.ipynb - Streaming and async patterns
""")