# **LangChain Expression Language (LCEL):**


LangChain Expression Language (LCEL) is a tool introduced in LangChain to simplify the process of creating, organizing, and managing complex workflows or "chains" in language model applications. **LCEL provides a structured way to define, execute, and control sequences of tasks, enhancing flexibility and allowing for more sophisticated branching, merging, and parallel processing.**<br><br>

**LCEL describes by:**<br>
* Simplifying the creation of complex chains with minimal code.
* Allowing **modular components** that can be reused across different workflows.
* Supporting **parallel** and **asynchronous processing**, this optimizes performance.
* Enabling **easy debugging** and **intuitive syntax** for readability.





In [None]:
# Install the Dependencies:


!pip install langchain -qU
!pip install langchain_community -qU
!pip install langchain_google_genai -qU
!pip install boto3 -qU
!pip install "pinecone[grpc]" -qU
!pip install langchain_chroma -qU

## **Basic Functions:**

* StrOutputParser
* RunnablePassthrough
* RunnableLambda
* RunnableParallel
* Pipe (**|**) Operator
* assign
* ainvoke
* batch, abatch
* stream


In [2]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough , RunnableLambda

In [3]:
# StrOutputParser:


chain = StrOutputParser()

input = "Hello All"
chain.invoke(input)

'Hello All'

In [4]:
# RunnablePassthrough:

chain = RunnablePassthrough()
chain.invoke("Hello All")

'Hello All'

In [5]:
chain = RunnablePassthrough() | RunnablePassthrough() | RunnablePassthrough()
chain.invoke("Hello All")

'Hello All'

In [6]:
# RunnableLambda:

def string_upper(input):
  return input.upper()

chain = RunnableLambda(string_upper)
chain.invoke("Hello All")

'HELLO ALL'

In [7]:
# RunnableLambda:

chain = RunnablePassthrough() | RunnableLambda(string_upper)
chain.invoke("Hello All")

'HELLO ALL'

In [10]:
# RunnableParallel:

chain = RunnableParallel({'x': RunnablePassthrough(), 'y': RunnablePassthrough()})
chain.invoke("Dibyendu")

{'x': 'Dibyendu', 'y': 'Dibyendu'}

In [11]:
# RunnableParallel + RunnableLambda:

chain = RunnableParallel({
    'x': RunnablePassthrough() | RunnableLambda(string_upper),
    'y': RunnablePassthrough()
})

chain.invoke("Dibyendu")

{'x': 'DIBYENDU', 'y': 'Dibyendu'}

In [12]:
# RunnableParallel:

chain = RunnableParallel({'x': RunnablePassthrough(), 'y': RunnablePassthrough()})
chain.invoke(
    {
        'fname': 'Dibyendu',
        'lname': 'Biswas'
    }
)

{'x': {'fname': 'Dibyendu', 'lname': 'Biswas'},
 'y': {'fname': 'Dibyendu', 'lname': 'Biswas'}}

In [13]:
# RunnableParallel + RunnablePassthrough:

chain = RunnableParallel(
  {'x': RunnablePassthrough(), 'Blog': lambda x:x['Blog']}
)

chain.invoke({'fname':'Dibyendu', 'lname': 'Biswas', 'Blog': 'dibyendubiswas1998'})

{'x': {'fname': 'Dibyendu', 'lname': 'Biswas', 'Blog': 'dibyendubiswas1998'},
 'Blog': 'dibyendubiswas1998'}

In [14]:
# RunnableParallel + RunnableLambda + RunnablePassthrough:

def fetch_website(input: dict):
    output = input.get('Website','Not found')
    return output


chain = RunnableParallel(
    {'Website': RunnablePassthrough() | RunnableLambda(fetch_website), 'Blog': lambda x:x['Blog']}
)
chain.invoke({'fname':'Dibyendu', 'lname': 'Biswas', 'Blog': 'dibyendubiswas1998'})

{'Website': 'Not found', 'Blog': 'dibyendubiswas1998'}

In [15]:
chain.invoke({'fname':'Dibyendu', 'lname': 'Biswas', 'Blog': 'dibyendubiswas1998', 'Website': 'abc.com'})

{'Website': 'abc.com', 'Blog': 'dibyendubiswas1998'}

In [17]:
# assign:

chain = RunnableParallel({'X': RunnablePassthrough()}).assign(Y=lambda x: x['X'].upper())
chain.invoke("hello all !!")

{'X': 'hello all !!', 'Y': 'HELLO ALL !!'}

In [19]:
# assign:

def extra_func(input):
    return 'dibyendubiswas1998@gail.com'

chain = RunnableParallel({'Name': RunnablePassthrough()}).assign(Email=RunnableLambda(extra_func))
chain.invoke("Dibyendu Biswas")

{'Name': 'Dibyendu Biswas', 'Email': 'dibyendubiswas1998@gail.com'}

## **Using LLM, Retriever, etc:**

### **Load LLM, Embedding Model:**

In [None]:
# Get Embeddings:

import boto3
from langchain.llms.bedrock import Bedrock
from langchain.embeddings import BedrockEmbeddings
from google.colab import userdata
from langchain_google_genai import ChatGoogleGenerativeAI


AWS_REGION = ''
AWS_ACCESS_KEY = ''
AWS_SECRET_KEY = ''

def get_embeddings():
  try:
    bedrock_client = boto3.client(
          service_name = "bedrock-runtime",
          region_name = AWS_REGION,
          aws_access_key_id = AWS_ACCESS_KEY,
          aws_secret_access_key = AWS_SECRET_KEY,
      )
    # The model_id was incorrect. Changing it to amazon.titan-text-embed-v1
    bedrock_embedding = BedrockEmbeddings(model_id="amazon.titan-embed-text-v1", client=bedrock_client)
    return bedrock_embedding

  except Exception as ex:
    return ex


# Get LLM:
GEMINI_API_KEY = userdata.get("GEMINI_API_KEY")

llm = ChatGoogleGenerativeAI(
    model="gemini-pro",
    google_api_key=GEMINI_API_KEY,
    temperature=0.1,
    max_tokens=1024,
    max_length=1024,
)

embedding_model = get_embeddings()

### **Document Loading:**

In [None]:
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
data = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=300)
all_splits = text_splitter.split_documents(data)

### **Vector Store:**

In [25]:
from langchain_chroma import Chroma

vectorstore = Chroma.from_documents(documents=all_splits, embedding=embedding_model)
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})

#### **Example 01:**

In [58]:
from operator import itemgetter
from langchain.prompts import ChatPromptTemplate, PromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnableLambda, RunnablePassthrough
from IPython.display import display, Markdown
import time

template = """Your are a helpful AI assistant, your name is Lili, designed by Dibyendu Biswas, an AI/ML Engineer.
Answer the question based only on the following context: {context}

Question: {question}
"""
prompt = PromptTemplate.from_template(template)


retrieveal_chain = (
    RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
    | prompt
    | llm
    | StrOutputParser()
    )

# def generate_response(query):
#   start_time = time.time()
#   response = retrieveal_chain.invoke(query)
#   display(Markdown(response))
#   print(f"Time taken: {round(time.time() - start_time, 4)} seconds")

In [59]:
# Synchronous Call:

start_time = time.time()
response = retrieveal_chain.invoke("What are the approaches to Task Decomposition?")
print(f"Time taken: {round(time.time() - start_time, 4)} seconds")
display(Markdown(response))

Time taken: 1.4034 seconds


1. Chain of thought (CoT)
2. Tree of Thoughts
3. LLM with simple prompting
4. Task-specific instructions
5. Human inputs
6. LLM+P

In [60]:
# Batch output (Synchronous Class):

start_time = time.time()

batch_output = retrieveal_chain.batch([
                        "what is Memory?",
                        "Tell me something about Agent"
                       ])

print('Time taken:',time.time() - start_time)

Time taken: 2.6368277072906494


In [61]:
batch_output

['Memory can be defined as the processes used to acquire, store, retain, and later retrieve information.',
 'In a LLM-powered autonomous agent system, LLM functions as the agent’s brain, complemented by several key components:\n- Memory stream: is a long-term memory module (external database) that records a comprehensive list of agents’ experience in natural language.\n- Retrieval model: surfaces the context to inform the agent’s behavior, according to relevance, recency and importance.\n- Reflection mechanism: synthesizes memories into higher level inferences over time and guides the agent’s future behavior.\n- Planning & Reacting: translate the reflections and the environment information into actions.']

In [64]:

start_time = time.time()

batch_output = await retrieveal_chain.abatch([
                        "what is memory?",
                        "Tell me something about Agent"
                       ])

print('Time taken:',time.time() - start_time)

Time taken: 2.6153481006622314


In [65]:
batch_output

['Memory can be defined as the processes used to acquire, store, retain, and later retrieve information.',
 'In a LLM-powered autonomous agent system, LLM functions as the agent’s brain, complemented by several key components:\n- Memory stream: is a long-term memory module (external database) that records a comprehensive list of agents’ experience in natural language.\n- Retrieval model: surfaces the context to inform the agent’s behavior, according to relevance, recency and importance.\n- Reflection mechanism: synthesizes memories into higher level inferences over time and guides the agent’s future behavior.\n- Planning & Reacting: translate the reflections and the environment information into actions.']

#### **Example 02:**

In [67]:
from operator import itemgetter

template = """Your are a helpful AI assistant, your name is Lili, designed by Dibyendu Biswas, an AI/ML Engineer.
Answer the question based only on the following context: {context}

Question: {question}
Answer in the following language: {language}
"""
prompt = PromptTemplate.from_template(template)


retrieval_chain = (
    RunnableParallel({"context": itemgetter('question') | retriever,
                       "question": itemgetter('question'),
                       "language": itemgetter('language')
                       })
    | prompt
    | llm
    | StrOutputParser()
    )

In [69]:
start_time = time.time()
response = retrieval_chain.invoke(
    {
        'question': "What is Chain of Thoughts?",
        'language': "Bengali"
    })

print('Time taken:',time.time() - start_time)
print(response)

Time taken: 3.876643419265747
চিন্তার শৃঙ্খল হল একটি মানক প্রম্পটিং কৌশল যা জটিল কাজগুলিতে মডেলের কর্মক্ষমতা বাড়ানোর জন্য ব্যবহৃত হয়। মডেলকে "ধাপে ধাপে চিন্তা করার" নির্দেশ দেওয়া হয় যাতে কঠিন কাজগুলিকে ছোট এবং সহজ ধাপে বিভক্ত করতে আরও পরীক্ষার সময়ের গণনা ব্যবহার করা যায়। CoT বড় কাজগুলিকে একাধিক পরিচালনাযোগ্য কাজে রূপান্তর করে এবং মডেলের চিন্তা প্রক্রিয়ার একটি ব্যাখ্যা দেয়।


In [None]:
# Asynchronous Calls:

start_time = time.time()
response = retrieval_chain.ainvoke(
    {
        'question': "What is Chain of Thoughts?",
        'language': "English"
    })

print('Time taken:',time.time() - start_time)

#### **Example 03:**

In [73]:
template = 'Hi! I am learning {skill}. Can you suggest me top 5 things to learn?\n'

prompt = PromptTemplate.from_template(template=template)

chain = prompt | llm

In [74]:
for s in chain.stream({'skill':'Big Data'}):
    print(s.content,end='')

**Top 5 Essential Concepts for Big Data Learning:**

1. **Data Management and Storage:** Understand different data storage technologies (e.g., Hadoop Distributed File System, NoSQL databases) and data management techniques (e.g., data ingestion, data cleansing, data governance).

2. **Data Processing and Analytics:** Learn about data processing frameworks (e.g., Apache Spark, Apache Flink) and analytical techniques (e.g., machine learning, statistical analysis) for extracting insights from large datasets.

3. **Data Visualization and Communication:** Develop skills in visualizing and communicating data effectively through tools like Tableau, Power BI, and Jupyter Notebooks. This helps stakeholders understand complex data patterns and make informed decisions.

4. **Cloud Computing and Big Data Platforms:** Familiarize yourself with cloud computing platforms (e.g., AWS, Azure, GCP) and their offerings for big data storage, processing, and analytics.

5. **Data Security and Privacy:** Und