In [11]:
# LangChain Expression Language (LCEL)
 # It is syntax within the LangChain framework to build and orchestrate chains of operations involving language models, tools, and other components.

In [10]:
# LangChain Expression Language (LCEL)
    # - Superfast development of chains.
    # - Advanced features such as streaming, async, parallel execution, and more.
    # - Easy integration with LangSmith and LangServe.
    # - Optimization
        # - Parallel Execution: Run multiple components or inputs concurrently.
        # - Async Support: Chains can be executed asynchronously.
        # Streaming: Supports streaming outputs for faster response times.

# Ex: 

    # chain = prompt | model | output_parser
    # result = chain.invoke({"topic": "Artificial Intelligence"})

| Type              | Description                                      | Use Case Example                          |
|-------------------|--------------------------------------------------|-------------------------------------------|
| Linear            | Simple left-to-right chaining                    | Prompt → Model → Parser                   |
| Sequential        | Multi-step with intermediate logic               | Multi-turn conversations                  |
| Parallel          | Run multiple chains at once                      | Summarize + Extract keywords              |
| Branching         | Conditional logic to choose chain                | Route to math or QA chain                 |
| Map-Reduce        | Process many inputs and combine results          | Document summarization                    |
| Streaming         | Output tokens as they are generated              | Chatbots, real-time apps                  |
| Tool-Using        | Use external tools in the chain                  | Search, calculator, API calls             |


In [None]:
### 01 Linear Type of LCEL

# A Linear LCEL chain is a series of steps where:

    # Each component takes the output of the previous one as its input.
    # The flow is strictly left-to-right.
    # There’s no branching, looping, or parallelism.

In [61]:
# ## Transitioning from Old class to New Pipe Base Operator

# ## 1. Understanding `Runnables`
# - `Runnables` are self-contained units of work.
# - Can be executed in isolation or combined for complex operations.
# - Provides flexibility in execution (sync, async, parallel).

# ## 2. `RunnableParallel`
# - Executes tasks concurrently.
# - Useful for performance enhancement in scenarios where tasks can run independently.
# - Syntax example:
#     ```python
#     from some_module import RunnableParallel
#     ```

# ## 3. `RunnablePassthrough`
# - A simple `Runnable` that passes inputs directly to outputs without modification.
# - Helpful for debugging or chaining in pipelines.
# - Example use case:
#     ```python
#     from some_module import RunnablePassthrough
#     passthrough = RunnablePassthrough()
#     result = passthrough.run(input_data)
#     ```

# ## 4. `RunnableLambda`
# - Allows quick, inline definitions of small, custom functions.
# - Example:
#     ```python
#     from some_module import RunnableLambda
#     lambda_op = RunnableLambda(lambda x: x * 2)
#     result = lambda_op.run(5)  # Output: 10
#     ```

# ## 5. Assign Functions
# - Used to assign values or parameters during execution.
# - Useful in data pipelines to update intermediate values.

# ## 6. Performance Improvement (Inference Speed)
# - Focus on optimizing the inference speed by leveraging parallel execution.
# - Use `RunnableParallel` or batching techniques.
# - Consider optimizing data pipelines by removing unnecessary steps.

# ## 7. Async Invoke
# - Executes operations asynchronously, improving the overall throughput of the system.
# - Syntax example:
#     ```python
#     async def async_operation():
#         result = await some_async_function()
#     ```

# ## 8. Batch Support
# - Handles multiple inputs at once to improve performance.
# - Can be combined with `RunnableParallel` for parallel batch execution.

# ## 9. Async Batch Execution
# - Combines asynchronous execution with batch processing for high-performance tasks.
# - Reduces overall execution time for larger datasets.

# ## 10. Using `Itemgetter` with `LCEL`
# - `Itemgetter` is used to extract specific items from collections.
# - When combined with `LCEL` (LangChain Execution Layer), it can streamline complex operations.

# ## 11. Bind Tools
# - `Bind` tools help to connect different steps in the pipeline.
# - Ensures smooth data flow between various `Runnable` components.

# ## 12. Stream Support
# - Keep your pipelines more responsive by incorporating stream support for data.
# - This allows continuous data processing and near real-time outputs.

In [None]:
from utils import read_json
from utils import (get_model_openAI,get_model_groq,get_openAI_embeddings)

In [None]:
keys = read_json("keys.json")

In [None]:

# Initialize the OpenAI chat model
llm = get_model_openAI(model ="gpt-4")

In [13]:
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

prompt = PromptTemplate.from_template("Tell me a joke about {topic}")
model = get_model_openAI(model ="gpt-4")
parser = StrOutputParser()

chain = prompt | model | parser
result = chain.invoke({"topic": "cats"})

print(result)

Why don't cats play poker in the jungle?

Because there's too many cheetahs!


In [23]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import OpenAI

prompt_template = "Tell me a {adjective} joke"

prompt = PromptTemplate(
    input_variables=["adjective"], 
    template=prompt_template
)

chain = prompt | llm | StrOutputParser()

chain.invoke("your adjective here")

'Why don\'t some adjectives ever go to dinner?\n\nBecause they\'re too "superficial".'

In [41]:
from langchain.chat_models import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser


prompt = ChatPromptTemplate.from_template(
    "Tell me a short joke about {topic}"
)
output_parser = StrOutputParser()
chain = (
    {"topic": RunnablePassthrough()} 
    | prompt
    | model
    | output_parser
)

chain.invoke("ice cream")

'Why did the ice cream truck break down?\n\nBecause it had too many sundaes!'

In [27]:

# Use Case: Smart Q&A with Conditional Rephrasing  where 'intermediate logic' is needed
# Let’s say you’re building a Q&A system where:

    # 1.If the user’s question is too short or vague, you rephrase it to make it clearer.
    # 2.Then you retrieve documents based on the (possibly rephrased) question.
    # 3.Finally, you generate an answer using an LLM.

# This requires intermediate logic to:
    
    # Check the length or clarity of the question.
    # Decide whether to rephrase it or not.

from langchain_core.runnables import RunnableLambda, RunnableSequence


# Step 1: Intermediate logic to rephrase if needed
def maybe_rephrase(input):
    question = input["question"]
    if len(question.split()) < 5:
        # Rephrase short questions
        return {"question": f"Can you elaborate on: {question}?"}
    return {"question": question}

rephrase_logic = RunnableLambda(maybe_rephrase)

# Step 2: Document retrieval (mocked here)
retriever = RunnableLambda(lambda x: {"docs": ["doc1", "doc2"], "question": x["question"]})

# Step 3: Answer generation (mocked)
answer_generator = RunnableLambda(lambda x: f"Answering based on: {x['docs']} for question: {x['question']}")

# Full chain
chain = RunnableSequence(rephrase_logic,retriever,answer_generator)

# Run it
result = chain.invoke({"question": "Python?"})
print(result)


Answering based on: ['doc1', 'doc2'] for question: Can you elaborate on: Python??


In [None]:
# 02 : Sequential LCEL
    # Use Sequential LCEL when your task involves multiple stages, intermediate logic, or complex data flow.
    # Allows intermediate logic and transformations in between steps


# When to Use Sequential LCEL
    # When you need intermediate processing between steps
    # When steps have multiple inputs or outputs
    # When you want clear structure and traceability


In [14]:
# In LangChain, a RunnableSequence is a powerful composition operator that allows you to chain together multiple runnables—components like 
# prompt templates, models, and output parsers—so that the output of one becomes the input of the next.

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)

# Create a sequence
sequence = runnable_1 | runnable_2

# Run it
result = sequence.invoke(1)  # Output: 4

print(result)

4


In [15]:
from dotenv import load_dotenv
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain.schema.runnable import RunnableLambda, RunnableSequence


# Initialize the OpenAI chat model
model  = get_model_openAI(model = "gpt-3.5-turbo")

# Define prompt templates (no need for separate Runnable chains)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a comedian who tells jokes about {topic}."),
        ("human", "Tell me {joke_count} jokes."),
    ]
)

# Create the combined chain using LangChain Expression Language (LCEL)
chain = prompt_template | model | StrOutputParser()
# chain = prompt_template | model

# Run the chain
result = chain.invoke({"topic": "lawyers", "joke_count": 3})

# Create individual runnables (steps in the chain)
format_prompt = RunnableLambda(lambda x: prompt_template.format_prompt(**x))
invoke_model = RunnableLambda(lambda x: model.invoke(x.to_messages()))
parse_output = RunnableLambda(lambda x: x.content)

# Create the RunnableSequence (equivalent to the LCEL chain)
chain = RunnableSequence(first=format_prompt, middle=[invoke_model], last=parse_output)

# Run the chain
response = chain.invoke({"topic": "lawyers", "joke_count": 3})

# Output
print(response)


1. Why did the lawyer go to the bank? To get his case settled!

2. How many lawyer jokes are there? Only three. The rest are true stories.

3. Why did the lawyer wear a suit to the job interview? Because he wanted to make a good case for himself!


In [None]:
# In LangChain, a Parallel Chain (also known as RunnableParallel) is a type of chain that allows you to run multiple Runnables simultaneously, 
# rather than sequentially. 

# This is useful when you want to perform independent tasks at the same time and then combine their results.

# What is a Parallel Chain?
    # It executes multiple Runnables in parallel.
    # Each Runnable receives the same input.
    # The outputs are returned as a dictionary, mapping each Runnable to its result.

# Why Use It?
    # To speed up workflows by running tasks concurrently.
    # To gather multiple perspectives or outputs from different models/tools.
    # To fan out a single input to multiple processing paths.

# Real-World Use Case
    # Imagine you want to:
        # Summarize a document.
        # Extract keywords.
        # Translate it.

In [16]:

from langchain_core.runnables import RunnableLambda, RunnableParallel

# Define two simple Runnables
r1 = RunnableLambda(lambda x: f"Uppercase: {x.upper()}")
r2 = RunnableLambda(lambda x: f"Reversed: {x[::-1]}")

# Create a parallel chain
parallel_chain = RunnableParallel({
    "upper": r1,
    "reverse": r2
})

# Run it
result = parallel_chain.invoke("langchain")
print(result)


{'upper': 'Uppercase: LANGCHAIN', 'reverse': 'Reversed: niahcgnal'}


In [18]:
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import SimpleJsonOutputParser

prompt = PromptTemplate.from_template(
    "In JSON format, give me a list of {topic} and their names in French, Spanish, and Cat Language."
)

parser = SimpleJsonOutputParser()

chain = prompt | model | parser

# Streaming output
async for chunk in chain.astream({'topic': 'colors'}):
    print(chunk)


In [19]:
from dotenv import load_dotenv
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnableParallel, RunnableLambda
from langchain_openai import ChatOpenAI

# Define prompt template
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are an expert product reviewer."),
        ("human", "List the main features of the product {product_name}."),
    ]
)

# Define pros analysis step
def analyze_pros(features):
    pros_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are an expert product reviewer."),
            (
                "human",
                "Given these features: {features}, list the pros of these features.",
            ),
        ]
    )
    return pros_template.format_prompt(features=features)


# Define cons analysis step
def analyze_cons(features):
    cons_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are an expert product reviewer."),
            (
                "human",
                "Given these features: {features}, list the cons of these features.",
            ),
        ]
    )
    return cons_template.format_prompt(features=features)


# Combine pros and cons into a final review
def combine_pros_cons(pros, cons):
    return f"Pros:\n{pros}\n\nCons:\n{cons}"


# Simplify branches with LCEL
pros_branch_chain = (
    RunnableLambda(lambda x: analyze_pros(x)) | model | StrOutputParser()
)

cons_branch_chain = (
    RunnableLambda(lambda x: analyze_cons(x)) | model | StrOutputParser()
)

# Create the combined chain using LangChain Expression Language (LCEL)
chain = (
    prompt_template
    | model
    | StrOutputParser()
    | RunnableParallel(branches={"pros": pros_branch_chain, "cons": cons_branch_chain})
    | RunnableLambda(lambda x: combine_pros_cons(x["branches"]["pros"], x["branches"]["cons"]))
)

# Run the chain
result = chain.invoke({"product_name": "MacBook Pro"})

# Output
print(result)

Pros:
Based on the features provided, here are the pros of the MacBook Pro:

1. **Retina display**: Provides a high-quality viewing experience with vibrant colors and sharp detail, making it ideal for visual tasks like graphic design or watching movies.
   
2. **Powerful performance**: Ensures fast and smooth operation, allowing for multitasking and handling demanding applications with ease.

3. **Touch Bar**: Offers a convenient way to access shortcuts and controls, enhancing productivity and customization options for different tasks.

4. **Thunderbolt 3 ports**: Enables high-speed data transfer and connectivity with a wide range of peripherals, providing flexibility for various usage scenarios.

5. **macOS operating system**: Known for its user-friendly interface and seamless integration with other Apple devices, along with a wide selection of built-in apps for productivity and creativity.

6. **Long battery life**: Provides up to 10 hours of usage on a single charge, allowing for ex

In [28]:
# Chain Type — Branching:
# In LCEL (LangChain Expression Language), Branching refers to a pattern where the execution path splits based on \
# some condition or logic, allowing different chains or components to be run depending on the input.

# What is Branching in LCEL?
    # Branching allows you to:
        # Route inputs to different chains based on conditions
        # Handle different types of tasks (e.g., math vs. text)
        # Customize behavior dynamically

# Prompt Templates for Different Feedback Types:


In [33]:
from langchain_core.runnables import RunnableLambda, RunnableBranch

# Define condition functions
def is_math(input):
    return "calculate" in input["question"].lower()

def is_greeting(input):
    return input["question"].lower() in ["hi", "hello", "hey"]

def is_time_question(input):
    return "time" in input["question"].lower()

# Define mock chains
math_chain = RunnableLambda(lambda x: "🧮 Using calculator...")
greeting_chain = RunnableLambda(lambda x: "👋 Hello! How can I help you?")
time_chain = RunnableLambda(lambda x: "⏰ It's always a good time to learn!")
qa_chain = RunnableLambda(lambda x: "💬 Using language model for general Q&A...")

# Branching logic
branch = RunnableBranch(
    (is_math, math_chain),
    (is_greeting, greeting_chain),
    (is_time_question, time_chain),
    qa_chain  # default
)

# Example run
result = branch.invoke({"question": "What time is it?"})
print(result)


⏰ It's always a good time to learn!


In [None]:
from dotenv import load_dotenv
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnableBranch
from langchain_openai import ChatOpenAI


# Define prompt templates for different feedback types
positive_feedback_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        ("human",
         "Generate a thank you note for this positive feedback: {feedback}."),
    ]
)

negative_feedback_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        ("human",
         "Generate a response addressing this negative feedback: {feedback}."),
    ]
)

neutral_feedback_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        (
            "human",
            "Generate a request for more details for this neutral feedback: {feedback}.",
        ),
    ]
)

escalate_feedback_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        (
            "human",
            "Generate a message to escalate this feedback to a human agent: {feedback}.",
        ),
    ]
)

# Define the feedback classification template
classification_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        ("human",
         "Classify the sentiment of this feedback as positive, negative, neutral, or escalate: {feedback}."),
    ]
)

# Define the runnable branches for handling feedback
branches = RunnableBranch(
    (
        lambda x: "positive" in x,
        positive_feedback_template | model | StrOutputParser()  # Positive feedback chain
    ),
    (
        lambda x: "negative" in x,
        negative_feedback_template | model | StrOutputParser()  # Negative feedback chain
    ),
    (
        lambda x: "neutral" in x,
        neutral_feedback_template | model | StrOutputParser()  # Neutral feedback chain
    ),
    escalate_feedback_template | model | StrOutputParser()
)

# Create the classification chain
classification_chain = classification_template | model | StrOutputParser()

# Combine classification and response generation into one chain
chain = classification_chain | branches

# Run the chain with an example review
# Good review - "The product is excellent. I really enjoyed using it and found it very helpful."
# Bad review - "The product is terrible. It broke after just one use and the quality is very poor."
# Neutral review - "The product is okay. It works as expected but nothing exceptional."
# Default - "I'm not sure about the product yet. Can you tell me more about its features and benefits?"

review = "The product is terrible. It broke after just one use and the quality is very poor."
result = chain.invoke({"feedback": review})

# Output the result
print(result)

In [None]:
# What is Map-Reduce in LCEL?
    # Map Phase: Apply a chain to each item in a list (e.g., each document).
    # Reduce Phase: Combine the outputs into a single result (e.g., a summary or answer).

# the Map-Reduce pattern is used to process multiple inputs in parallel and then combine the results into a final output. 
# This is especially useful for tasks like:
    # Document summarization
    # Multi-document question answering
    # Aggregating results from multiple sources


# Use Case: Multi-Document Summarization
    # You have a list of documents, and you want to:
        # Summarize each document individually (Map phase)
        # Combine those summaries into a single final summary (Reduce phase)

In [21]:
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# Create two different analysis prompts
sentiment_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a sentiment analysis expert. Analyze the emotional tone."),
    ("user", "What's the sentiment of: {text}")
])

summary_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a summarization expert."),
    ("user", "Summarize in one sentence: {text}")
])

# Use RunnableParallel to run both analyses simultaneously
analysis_chain = RunnableParallel(
    {
        "sentiment": sentiment_prompt | model | StrOutputParser(),
        "summary": summary_prompt | model  | StrOutputParser(),
        "original": RunnablePassthrough()  # Pass through the original input
    }
)

# Test it
sample_text = {"text": "The product exceeded my expectations. Great quality!"}
results = analysis_chain.invoke(sample_text)

print("Sentiment:", results["sentiment"])
print("Summary:", results["summary"])
print("Original:", results["original"]["text"])

Sentiment: The sentiment of the sentence is positive. The use of words such as "exceeded my expectations" and "great quality" convey a sense of satisfaction and delight, indicating a positive emotional tone towards the product.
Summary: The product surpassed expectations and is of excellent quality.
Original: The product exceeded my expectations. Great quality!


In [45]:
from langchain_core.runnables import RunnablePassthrough
from langchain.prompts import PromptTemplate
from langchain_community.llms import OpenAI

USER_INPUT = "colorful socks"

prompt_template_product = "What is a good name for a company that makes {product}?"
prompt_template_business = "Give me the best business model idea for my company named: {company}"

chain = (
  PromptTemplate.from_template(prompt_template_product)
  | model
  | {'company': RunnablePassthrough()}
  | PromptTemplate.from_template(prompt_template_business)
  | model
)

business_model_output = chain.invoke({'product': 'colorful socks'})

business_model_output.content

'One potential business model idea for Rainbow Threads could be to offer a subscription-based service where customers receive a monthly curated box of colorful and unique threads for their crafting projects. Customers could choose from different subscription tiers based on their needs and budget, and each box could include a variety of high-quality threads in different colors and textures. This model would not only generate recurring revenue for Rainbow Threads but also provide customers with a convenient and exciting way to discover new threads for their projects. Additionally, Rainbow Threads could offer add-on products such as exclusive patterns or tutorials to further enhance the value of the subscription service.'

In [None]:
# In the code you shared, RunnablePassthrough() is a utility from LangChain that passes its input through unchanged.
# It’s used when you want to retain or forward a piece of data in a chain without modifying it.

In [55]:
from langchain_core.runnables import RunnableLambda

def add_five(x):
    return x + 5

def multiply_by_two(x):
    return x * 2

# wrap the functions with RunnableLambda
add_five = RunnableLambda(add_five)
multiply_by_two = RunnableLambda(multiply_by_two)

chain = add_five | multiply_by_two

chain.invoke(3)

16

In [51]:
class Runnable:
  def __init__(self, func):
    self.func = func

  def __or__(self, other):
    def chained_func(*args, **kwargs):
      # self.func is on the left, other is on the right
      return other(self.func(*args, **kwargs))
    return Runnable(chained_func)

  def __call__(self, *args, **kwargs):
    return self.func(*args, **kwargs)

def add_ten(x):
  return x + 10

def divide_by_two(x):
  return x / 2

runnable_add_ten = Runnable(add_ten)
runnable_divide_by_two = Runnable(divide_by_two)
chain = runnable_add_ten | runnable_divide_by_two
result = chain(8) # (8+10) / 2 = 9.0 should be the answer

result

9.0

In [56]:
def extract_fact(x):
    if "\n\n" in x:
        return "\n".join(x.split("\n\n")[1:])
    else:
        return x
    
get_fact = RunnableLambda(extract_fact)

chain = prompt | model | output_parser | get_fact

chain.invoke({"topic": "Artificial Intelligence"})

"Why did the AI break up with his girlfriend? Because he couldn't handle her emotional baggage!"