In [70]:
from IPython.display import clear_output
# Clear whatever output this cell has displayed so far.
clear_output(wait=True)

In [71]:
from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel

In [73]:
# RunnablePassthrough returns its input unchanged, so this prints "abcd".
chain = RunnablePassthrough()
print(chain.invoke("abcd"))

abcd


In [74]:
def output_length(input: str):
    """Return the number of characters in ``input``."""
    return len(input)

# Wrapping the plain function in RunnableLambda gives it the Runnable
# interface, so it can be called through .invoke().
chain = RunnableLambda(output_length)
print(chain.invoke("input to output"))

15


In [37]:
def sum_dict_values(item: dict):
    """Return ``item["a"] + item["b"]``.

    Deliberately NOT named ``sum``: a notebook-level ``sum`` would shadow the
    builtin for every later cell, and ``analyze_sentiment`` in this notebook
    calls ``sum(1 for ...)`` and would fail when handed a generator.

    Args:
        item: Mapping that provides the keys ``"a"`` and ``"b"``.

    Returns:
        The result of adding the two values.

    Raises:
        KeyError: If either key is missing.
    """
    return item["a"] + item["b"]

chain = RunnableLambda(sum_dict_values)
chain.invoke({"a":1,"b":2})

3

In [56]:
def sum_values(a: int, b: int) -> int:
    """Add ``a`` and ``b`` and return the total."""
    total = a + b
    return total

# Wrap the function to accept a dict and unpack it:
# sum_values takes two positional arguments, while invoke() passes a single
# input, so the lambda reads the "a" and "b" keys and forwards them.
chain = RunnableLambda(lambda item: sum_values(item["a"], item["b"]))

# Invoke with dictionary input
result = chain.invoke({"a": 1, "b": 2})
print(result)

3


In [60]:
# Both branches receive the same input string: "text1" echoes it back and
# "length" reports its character count; the result is a dict keyed by branch name.
chain = RunnableParallel(text1 = RunnablePassthrough(), length = RunnableLambda(output_length))
chain.invoke("start-tech academy")

{'text1': 'start-tech academy', 'length': 18}

In [4]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Get the API key (os.getenv returns None when the variable is not defined)
OPENAI_KEY = os.getenv("OPENAI_API_KEY")

In [5]:
import os

# Fail early with an actionable message: os.environ only accepts str values,
# so assigning a missing key (None) would otherwise raise an opaque TypeError.
if not OPENAI_KEY:
    raise RuntimeError(
        "OPENAI_API_KEY is not set; add it to your .env file or export it "
        "before running this notebook."
    )

os.environ['OPENAI_API_KEY'] = OPENAI_KEY

In [6]:
from langchain_openai import ChatOpenAI

# Chat model instance reused by the LLM-backed chains later in this notebook.
chatgpt = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

In [7]:
from langchain_core.prompts import ChatPromptTemplate

# create a prompt template to accept user queries
# (the template is only the {query} placeholder, so the query text is sent as-is)
prompt_txt = "{query}"
prompt_template = ChatPromptTemplate.from_template(prompt_txt)

# the chain has been formatted for better readability
# you could also write this as llmchain = prompt_template | chatgpt
llmchain = (prompt_template
              |
           chatgpt)

In [8]:
# The input dict key must match the {query} placeholder of the prompt template;
# the model reply is a message object whose text is in .content.
response = llmchain.invoke({'query' : 'Explain Generative AI in 1 line'})
print(response.content)

Generative AI refers to algorithms that can create new content, such as text, images, or music, by learning patterns from existing data.


### RunnableLambda



In [15]:
from langchain_core.runnables import RunnableLambda

def format_user_data(user_info):
    """Render a user record as one pipe-separated line.

    Args:
        user_info: Mapping with the keys ``name``, ``role`` and ``location``.

    Returns:
        A string of the form
        ``"User: <name> | Role: <role> | Location: <location>"``.

    Raises:
        KeyError: If any of the three keys is missing.
    """
    name = user_info['name']
    role = user_info['role']
    location = user_info['location']
    return f"User: {name} | Role: {role} | Location: {location}"

# Convert function to Runnable
formatter = RunnableLambda(format_user_data)

# Execute the runnable
# (the dict must provide the name, role and location keys the formatter reads)
user_data = {"name": "Sourav", "role": "Solution Architect", "location": "Bengaluru"}
result = formatter.invoke(user_data)
print(result)

User: Sourav | Role: Solution Architect | Location: Bengaluru


### RunnablePassthrough - Preserving Input Context


In [22]:
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.prompts import ChatPromptTemplate

def extract_keywords(text):
    """Return up to three lowercase words from ``text`` longer than four characters.

    Words are whitespace-separated tokens kept in their original order;
    punctuation attached to a token is not stripped.
    """
    long_words = []
    for token in text.lower().split():
        if len(token) > 4:
            long_words.append(token)
    # Keep only the first three matches.
    return long_words[:3]

# Create components
prompt = ChatPromptTemplate.from_template(
    "Analyze this text: {original_text}\nKeywords found: {keywords}"
)

# Build chain that preserves original input and adds keywords:
# both dict entries receive the invoked string, and their results fill the
# {original_text} and {keywords} placeholders of the prompt.
chain = {
    "original_text": RunnablePassthrough(),
    "keywords": RunnableLambda(extract_keywords)
} | prompt | chatgpt

result = chain.invoke("Building scalable data platforms requires careful architecture planning")
print(result.content)

The text "Building scalable data platforms requires careful architecture planning" emphasizes the importance of strategic design in the development of data platforms that can grow and adapt to increasing demands. 

### Analysis:

1. **Keywords**:
   - **Building**: This suggests an active process of creation and development, indicating that constructing data platforms is not a passive task but requires effort and expertise.
   - **Scalable**: This is a critical term in technology and data management, referring to the ability of a system to handle growth, whether in terms of data volume, user load, or functionality. Scalability is essential for ensuring that a platform can evolve without requiring a complete redesign.
   - **Platforms**: This term indicates a foundational technology or framework that supports various applications or services. In the context of data, it implies a comprehensive system that manages data storage, processing, and analysis.

2. **Key Themes**:
   - **Architec

### Sequential Chaining with Pipe Operator (|)

In [23]:
from langchain_core.runnables import RunnableLambda

def validate_input(data):
    """Return ``data`` stripped of surrounding whitespace.

    Raises:
        ValueError: If ``data`` is not a string or is empty after stripping.
    """
    if isinstance(data, str):
        cleaned = data.strip()
        if cleaned:
            return cleaned
    raise ValueError("Input must be a non-empty string")

def extract_metrics(text):
    """Compute basic size metrics for ``text``.

    Returns:
        A dict with the original text, its character count, its
        whitespace-separated word count, and a sentence count obtained by
        counting ``.``, ``!`` and ``?`` characters.
    """
    sentence_total = 0
    for mark in ('.', '!', '?'):
        sentence_total += text.count(mark)

    words = text.split()
    return {
        "original_text": text,
        "character_count": len(text),
        "word_count": len(words),
        "sentence_count": sentence_total,
    }

def generate_summary(metrics):
    """Format the metrics produced by ``extract_metrics`` as a text report.

    Args:
        metrics: Mapping with ``original_text``, ``character_count``,
            ``word_count`` and ``sentence_count`` keys.

    Returns:
        A multi-line report that starts and ends with a newline. The original
        text is previewed at 50 characters; an ellipsis is appended only when
        the text was actually cut, so short inputs are not shown as truncated.
    """
    original = metrics['original_text']
    preview = original[:50] + "..." if len(original) > 50 else original

    # max(..., 1) avoids dividing by zero when no sentence terminator was found.
    avg_words = metrics['word_count'] / max(metrics['sentence_count'], 1)

    # The leading and trailing empty entries reproduce the blank first and
    # last lines of the report.
    report_lines = [
        "",
        "Text Analysis Summary:",
        f"- Characters: {metrics['character_count']}",
        f"- Words: {metrics['word_count']}  ",
        f"- Sentences: {metrics['sentence_count']}",
        f"- Avg words per sentence: {avg_words:.1f}",
        f'Original: "{preview}"',
        "",
    ]
    return "\n".join(report_lines)

# Create sequential chain
# (each step's return value is the next step's input: str -> dict -> str)
analysis_chain = (
    RunnableLambda(validate_input) |
    RunnableLambda(extract_metrics) |
    RunnableLambda(generate_summary)
)

# Execute the chain
sample_text = "Data engineering is crucial for ML success. It involves building robust pipelines. Quality data leads to better models."
result = analysis_chain.invoke(sample_text)
print(result)


Text Analysis Summary:
- Characters: 119
- Words: 18  
- Sentences: 3
- Avg words per sentence: 6.0
Original: "Data engineering is crucial for ML success. It inv..."



### Parallel Execution with RunnableParallel


In [24]:
from langchain_core.runnables import RunnableLambda, RunnableParallel
import re

def analyze_technical_content(text):
    """Find known technical terms in ``text`` and report their density.

    Matching is a case-insensitive substring test, so ``"pipeline"`` also
    matches ``"pipelines"``.

    Args:
        text: The text to analyse.

    Returns:
        A dict with ``technical_terms`` (the matched terms, in the order of
        the internal term list) and ``technical_density`` (matched terms per
        whitespace-separated word, as a percentage). The density is ``0.0``
        for empty or whitespace-only text instead of dividing by zero.
    """
    tech_terms = ['data', 'platform', 'architecture', 'pipeline', 'model', 'algorithm']

    # Lower-case once instead of once per term; the terms are already lowercase.
    lowered = text.lower()
    found_terms = [term for term in tech_terms if term in lowered]

    word_count = len(text.split())
    density = len(found_terms) / word_count * 100 if word_count else 0.0

    return {
        "technical_terms": found_terms,
        "technical_density": density
    }

def analyze_sentiment(text):
    """Classify ``text`` as "Positive", "Negative" or "Neutral".

    Counts how many entries of a fixed positive list and a fixed negative
    list occur in the lower-cased text (substring match, each entry counted
    at most once) and returns the label of the larger count; a tie is
    "Neutral".
    """
    positive_words = ['good', 'great', 'excellent', 'success', 'efficient', 'robust']
    negative_words = ['bad', 'poor', 'failed', 'problem', 'issue', 'difficult']

    lowered = text.lower()
    pos_hits = len([word for word in positive_words if word in lowered])
    neg_hits = len([word for word in negative_words if word in lowered])

    if pos_hits == neg_hits:
        return "Neutral"
    return "Positive" if pos_hits > neg_hits else "Negative"

def extract_entities(text):
    """Extract potential entities (simplified).

    A candidate entity is a run of one or more whitespace-separated
    capitalised words (an uppercase ASCII letter followed by lowercase ASCII
    letters).

    Args:
        text: The text to scan.

    Returns:
        The distinct matches as a list, in order of first appearance. The
        order is deterministic, unlike a round-trip through ``set``, whose
        string ordering can change between interpreter runs.
    """
    # Simple pattern matching for demonstration
    matches = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', text)
    # dict.fromkeys removes duplicates while preserving insertion order.
    return list(dict.fromkeys(matches))

# Create parallel analysis pipeline
# (each keyword below becomes a key of the result dict; every branch is given
# the same input string)
parallel_analyzer = RunnableParallel(
    technical_analysis=RunnableLambda(analyze_technical_content),
    sentiment=RunnableLambda(analyze_sentiment),
    entities=RunnableLambda(extract_entities),
    word_count=RunnableLambda(lambda x: len(x.split()))
)

# Test the parallel execution
sample_text = "Databricks provides an excellent platform for building robust data pipelines. The architecture supports efficient model training and deployment."

result = parallel_analyzer.invoke(sample_text)
print("Parallel Analysis Results:")
for key, value in result.items():
    print(f"  {key}: {value}")


Parallel Analysis Results:
  technical_analysis: {'technical_terms': ['data', 'platform', 'architecture', 'pipeline', 'model'], 'technical_density': 27.77777777777778}
  sentiment: Positive
  entities: ['The', 'Databricks']
  word_count: 18


### Conditional Logic with RunnableBranch

In [25]:
from langchain_core.runnables import RunnableLambda, RunnableBranch

def is_data_engineering_query(text):
    """Return True if ``text`` contains a data-engineering keyword.

    Matching is a case-insensitive substring test.
    """
    lowered = text.lower()
    for keyword in ('pipeline', 'etl', 'data warehouse', 'spark', 'kafka', 'airflow'):
        if keyword in lowered:
            return True
    return False

def is_ml_query(text):
    """Return True if ``text`` contains a machine-learning keyword.

    Matching is a case-insensitive substring test.
    """
    lowered = text.lower()
    for keyword in ('model', 'algorithm', 'training', 'prediction', 'classification', 'regression'):
        if keyword in lowered:
            return True
    return False

def is_cloud_query(text):
    """Return True if ``text`` contains a cloud-platform keyword.

    Matching is a case-insensitive substring test.
    """
    lowered = text.lower()
    for keyword in ('aws', 'azure', 'gcp', 'cloud', 'kubernetes', 'docker'):
        if keyword in lowered:
            return True
    return False

def handle_data_engineering(text):
    """Build the canned reply for a data-engineering query, quoting ``text``."""
    reply = f"🔧 Data Engineering Response: This query about '{text}' relates to building and maintaining data pipelines and infrastructure."
    return reply

def handle_ml_query(text):
    """Build the canned reply for a machine-learning query, quoting ``text``."""
    reply = f"🤖 ML Response: This query about '{text}' involves machine learning models and algorithms."
    return reply

def handle_cloud_query(text):
    """Build the canned reply for a cloud-platform query, quoting ``text``."""
    reply = f"☁️ Cloud Response: This query about '{text}' concerns cloud platforms and services."
    return reply

def handle_general_query(text):
    """Build the fallback reply used when no specialised handler applies."""
    reply = f"💡 General Response: This is a general query about '{text}'. Please provide more specific context."
    return reply

# Create conditional routing
# Each (condition, handler) pair is tried in the order listed and the first
# condition that returns True wins -- which is why the "Azure ML" query below
# gets the ML response ('model' matches before the cloud check is reached).
query_router = RunnableBranch(
    (RunnableLambda(is_data_engineering_query), RunnableLambda(handle_data_engineering)),
    (RunnableLambda(is_ml_query), RunnableLambda(handle_ml_query)),
    (RunnableLambda(is_cloud_query), RunnableLambda(handle_cloud_query)),
    RunnableLambda(handle_general_query)  # Default case
)

# Test different types of queries
test_queries = [
    "How do I optimize Spark pipelines for better performance?",
    "What's the best algorithm for classification problems?",
    "How to deploy models on Azure ML?",
    "What's the weather like today?"
]

for query in test_queries:
    result = query_router.invoke(query)
    print(f"Query: {query}")
    print(f"Response: {result}\n")

Query: How do I optimize Spark pipelines for better performance?
Response: 🔧 Data Engineering Response: This query about 'How do I optimize Spark pipelines for better performance?' relates to building and maintaining data pipelines and infrastructure.

Query: What's the best algorithm for classification problems?
Response: 🤖 ML Response: This query about 'What's the best algorithm for classification problems?' involves machine learning models and algorithms.

Query: How to deploy models on Azure ML?
Response: 🤖 ML Response: This query about 'How to deploy models on Azure ML?' involves machine learning models and algorithms.

Query: What's the weather like today?
Response: 💡 General Response: This is a general query about 'What's the weather like today?'. Please provide more specific context.



### Streaming Responses


In [26]:
from langchain_core.runnables import RunnableLambda
import time

def streaming_processor(text):
    """Yield a growing upper-cased transcript of ``text``, one word at a time.

    The n-th value yielded is ``"Processed: "`` followed by the first n
    whitespace-separated words of ``text`` in upper case, joined by single
    spaces. A 0.1 second pause precedes every yield to simulate work.
    """
    shouted = [token.upper() for token in text.split()]
    for upto in range(1, len(shouted) + 1):
        # Simulate processing time
        time.sleep(0.1)
        # Yield intermediate result
        yield f"Processed: {' '.join(shouted[:upto])}"

# Create streaming runnable
# (streaming_processor is a generator function)
streaming_runnable = RunnableLambda(streaming_processor)

# Stream the results: each value yielded by the generator arrives as one chunk
print("Streaming processing:")
for chunk in streaming_runnable.stream("building scalable data solutions"):
    print(f"  {chunk}")

Streaming processing:
  Processed: BUILDING
  Processed: BUILDING SCALABLE
  Processed: BUILDING SCALABLE DATA
  Processed: BUILDING SCALABLE DATA SOLUTIONS


### Batch Processing

In [27]:
from langchain_core.runnables import RunnableLambda

def analyze_query_complexity(query):
    """Classify a user query by its word count.

    Args:
        query: The query text.

    Returns:
        A dict with:
        - ``query``: the query itself, or its first 30 characters followed by
          ``"..."`` when it is longer than 30 characters;
        - ``word_count``: number of whitespace-separated words;
        - ``complexity``: ``"Simple"`` (up to 3 words), ``"Medium"`` (up to
          10 words) or ``"Complex"`` (more than 10 words).
    """
    word_count = len(query.split())

    if word_count <= 3:
        complexity = "Simple"
    elif word_count <= 10:
        complexity = "Medium"
    else:
        complexity = "Complex"

    # Parenthesised so it is obvious the condition selects between the
    # truncated preview and the untouched query.
    preview = (query[:30] + "...") if len(query) > 30 else query

    return {
        "query": preview,
        "word_count": word_count,
        "complexity": complexity
    }

# Create analyzer
complexity_analyzer = RunnableLambda(analyze_query_complexity)

# Process multiple queries at once
queries = [
    "Hello",
    "How do I setup Databricks?",
    "What are the best practices for designing a data lake architecture that can handle both batch and streaming data processing workloads?",
    "Spark optimization tips"
]

# Batch process all queries
# (.batch takes the list of inputs and returns one result per input, in input order)
results = complexity_analyzer.batch(queries)

print("Batch Analysis Results:")
for i, result in enumerate(results, 1):
    print(f"  Query {i}: {result}")


Batch Analysis Results:
  Query 1: {'query': 'Hello', 'word_count': 1, 'complexity': 'Simple'}
  Query 2: {'query': 'How do I setup Databricks?', 'word_count': 5, 'complexity': 'Medium'}
  Query 3: {'query': 'What are the best practices fo...', 'word_count': 21, 'complexity': 'Complex'}
  Query 4: {'query': 'Spark optimization tips', 'word_count': 3, 'complexity': 'Simple'}


## Examples with llm

### RunnableLambda
RunnableLambda wraps any Python function to make it compatible with LangChain chains. This is perfect for integrating custom business logic with AI.

#### In the code below, clean_user_input() takes a dict as its input argument

In [99]:
from langchain_core.output_parsers import StrOutputParser
def clean_user_input(text):
    """Normalise the ``topic`` entry of the input mapping.

    Despite its name, ``text`` is a dict here: its ``"topic"`` value is
    stripped of surrounding whitespace and lower-cased.

    Returns:
        A new dict ``{"topic": <cleaned value>}``.
    """
    raw_topic = text["topic"]
    return {"topic": raw_topic.strip().lower()}

# Convert function to Runnable
input_cleaner = RunnableLambda(clean_user_input)
print(input_cleaner.invoke({"topic": "  APACHE SPARK  "}))

# Create a simple AI chain
# (the cleaner returns a {"topic": ...} dict, which fills the {topic} placeholder)
prompt = ChatPromptTemplate.from_template("Explain this concept clearly: {topic}")
simple_chain = input_cleaner | prompt | chatgpt | StrOutputParser()

# Test the chain
user_input = "  APACHE SPARK  "
result = simple_chain.invoke({"topic": user_input})
print(f"AI Response: {result}")

{'topic': 'apache spark'}
AI Response: Apache Spark is an open-source, distributed computing system that is designed for big data processing and analytics. It provides a fast and general-purpose cluster computing framework for large-scale data processing tasks. Spark is known for its speed and ease of use, as well as its ability to handle a wide range of workloads, including batch processing, real-time streaming, machine learning, and graph processing.

Spark uses a concept called Resilient Distributed Datasets (RDDs) to store and process data across multiple nodes in a cluster. RDDs are fault-tolerant, immutable collections of objects that can be operated on in parallel. Spark also provides a rich set of APIs in multiple programming languages, such as Scala, Java, Python, and R, making it accessible to a wide range of developers.

One of the key features of Apache Spark is its ability to perform in-memory processing, which allows it to achieve high performance by caching data in memor

#### In the code below, clean_user_input() takes a str as its input argument

In [None]:

from langchain_core.output_parsers import StrOutputParser
def clean_user_input(text):
    """Return ``text`` with surrounding whitespace removed, in lower case."""
    trimmed = text.strip()
    return trimmed.lower()

# Convert function to Runnable
input_cleaner = RunnableLambda(clean_user_input)
print(input_cleaner.invoke( " APACHE SPARK  "))

# Create a simple AI chain
# (the cleaner now returns a plain string, so the dict below is what maps it
# onto the {topic} placeholder of the prompt)
prompt = ChatPromptTemplate.from_template("Explain this concept clearly: {topic}")
simple_chain = {"topic": input_cleaner
                } | prompt | chatgpt | StrOutputParser()

# Test the chain
user_input = "  APACHE SPARK  "
result = simple_chain.invoke(user_input)
print(f"AI Response: {result}")

apache spark
AI Response: Apache Spark is an open-source, distributed computing system that is designed for big data processing and analytics. It provides a fast and general-purpose cluster computing framework for large-scale data processing. Spark allows users to write applications in Java, Scala, Python, and R, and provides high-level APIs in these languages for ease of use.

One of the key features of Apache Spark is its ability to perform in-memory processing, which allows it to process data much faster than traditional disk-based systems. Spark also supports a wide range of data processing tasks, including batch processing, real-time streaming, machine learning, and graph processing.

Spark is built around the concept of Resilient Distributed Datasets (RDDs), which are fault-tolerant collections of data that can be operated on in parallel across a cluster of machines. Spark automatically distributes the data and computation across the cluster, making it easy to scale up and down a

### RunnablePassthrough - Preserving Original Context
RunnablePassthrough keeps the original input available while simultaneously processing it through other functions.


In [107]:
def extract_keywords(text):
    """Return the known technical terms that occur in ``text``.

    Matching is a case-insensitive substring test, so ``"pipeline"`` also
    matches ``"pipelines"``. Terms are returned in the order of the internal
    term list, not in the order they appear in the text.
    """
    technical_terms = ['data', 'spark', 'pipeline', 'model', 'cloud', 'architecture']
    # No term contains whitespace, so searching the lower-cased text directly
    # finds exactly the same terms as searching a whitespace-normalised copy.
    haystack = text.lower()
    return [term for term in technical_terms if term in haystack]

# Create prompt that uses both original text and extracted keywords
analysis_prompt = ChatPromptTemplate.from_template("""
Original query: "{original_text}"
Technical terms found: {keywords}

Provide a focused technical explanation based on these key terms.
""")

# Chain that preserves original input and adds keyword analysis:
# both dict entries receive the invoked query string; their results fill the
# {original_text} and {keywords} placeholders.
context_chain = {
    "original_text": RunnablePassthrough(),
    "keywords": RunnableLambda(extract_keywords)
} | analysis_prompt | chatgpt | StrOutputParser()

# Test with technical query
tech_query = "How do I build data pipelines using Spark for cloud architecture?"
result = context_chain.invoke(tech_query)
print(f"Original: {tech_query}")
print(f"Keywords found: {extract_keywords(tech_query)}")
# Only the first 600 characters of the model's answer are shown.
print(f"AI Analysis: {result[:600]}...")


Original: How do I build data pipelines using Spark for cloud architecture?
Keywords found: ['data', 'spark', 'pipeline', 'cloud', 'architecture']
AI Analysis: To build data pipelines using Spark for cloud architecture, you can leverage the capabilities of Spark's distributed processing framework to efficiently process and analyze large volumes of data in a cloud environment. 

First, you would need to design a pipeline that outlines the flow of data from source to destination, including any transformations or processing steps along the way. Spark provides a high-level API for building data pipelines, allowing you to easily define and execute complex data processing workflows.

Next, you can deploy your Spark application on a cloud platform such as A...


### Sequential Chaining with Pipe Operator (|)
The pipe operator creates sequential workflows where data flows from left to right through multiple processing steps.

In [106]:
def categorize_query(text):
    """Assign a technical query to a category.

    The rules are checked in order and the first one with a keyword found in
    the lower-cased text (substring match) wins; "General" is returned when
    none matches.
    """
    lowered = text.lower()
    rules = (
        ("Data Engineering", ('spark', 'hadoop', 'etl', 'pipeline')),
        ("Machine Learning", ('model', 'algorithm', 'training')),
        ("Cloud Platform", ('azure', 'aws', 'cloud')),
    )
    for label, keywords in rules:
        for keyword in keywords:
            if keyword in lowered:
                return label
    return "General"

def create_expert_prompt(data):
    """Build the role-specific prompt for a categorised question.

    Args:
        data: Mapping with a ``category`` key (one of "Data Engineering",
            "Machine Learning", "Cloud Platform", "General") and a
            ``question`` key.

    Returns:
        The prompt string for that category with the question appended.

    Raises:
        KeyError: If a key is missing from ``data`` or the category is not
            one of the four listed above.
    """
    topic = data['category']
    query = data['question']

    prompt_by_category = {
        "Data Engineering": f"As a data engineering expert, provide technical guidance for: {query}",
        "Machine Learning": f"As an ML specialist, explain the concepts related to: {query}",
        "Cloud Platform": f"As a cloud architect, describe the solution for: {query}",
        "General": f"Provide a comprehensive answer to: {query}",
    }
    return prompt_by_category[topic]

# Build sequential expert chain:
# 1) wrap the raw question in a dict together with its category,
# 2) turn that dict into a category-specific prompt string,
# 3) send the prompt to the model, 4) reduce the reply to plain text.
expert_chain = (
    RunnableLambda(lambda x: {"question": x, "category": categorize_query(x)}) |
    RunnableLambda(create_expert_prompt) |
    chatgpt |
    StrOutputParser()
)

# Test different question types
test_questions = [
    "How to optimize Spark job performance?",
    "What's the best algorithm for classification?",
    "How to deploy on Azure Kubernetes Service?"
]

for question in test_questions:
    # Recomputed here only for display; the chain categorises the question itself.
    category = categorize_query(question)
    result = expert_chain.invoke(question)
    print(f"Question: {question}")
    print(f"Category: {category}")
    # Only the first 150 characters of the answer are printed.
    print(f"Expert Response: {result[:150]}...\n")


Question: How to optimize Spark job performance?
Category: Data Engineering
Expert Response: 1. Partitioning: Ensure that your data is properly partitioned to distribute the workload evenly across the cluster. Use the `repartition()` or `coale...

Question: What's the best algorithm for classification?
Category: Machine Learning
Expert Response: The best algorithm for classification can vary depending on the specific characteristics of the dataset and the problem at hand. Some commonly used al...

Question: How to deploy on Azure Kubernetes Service?
Category: Cloud Platform
Expert Response: To deploy on Azure Kubernetes Service (AKS) as a cloud architect, you would follow these steps:

1. Create an Azure account: If you don't already have...



In [108]:
# Another way to create the chain

# Build sequential expert chain
# Here the first step is a dict instead of a lambda: "question" passes the
# input through unchanged and "category" is computed from the same input,
# producing the same {"question", "category"} dict for create_expert_prompt.
expert_chain = (
    {"question": RunnablePassthrough(),"category": RunnableLambda(categorize_query)} |
    RunnableLambda(create_expert_prompt) |
    chatgpt |
    StrOutputParser()
)

# Test different question types
test_questions = [
    "How to optimize Spark job performance?",
    "What's the best algorithm for classification?",
    "How to deploy on Azure Kubernetes Service?"
]

for question in test_questions:
    # Recomputed here only for display; the chain categorises the question itself.
    category = categorize_query(question)
    result = expert_chain.invoke(question)
    print(f"Question: {question}")
    print(f"Category: {category}")
    # Only the first 150 characters of the answer are printed.
    print(f"Expert Response: {result[:150]}...\n")

Question: How to optimize Spark job performance?
Category: Data Engineering
Expert Response: 1. Partitioning: Ensure that your data is properly partitioned before running a Spark job. This helps in distributing the workload evenly across the c...

Question: What's the best algorithm for classification?
Category: Machine Learning
Expert Response: The best algorithm for classification depends on various factors such as the nature of the data, the size of the dataset, the complexity of the proble...

Question: How to deploy on Azure Kubernetes Service?
Category: Cloud Platform
Expert Response: To deploy on Azure Kubernetes Service (AKS) as a cloud architect, you would follow these steps:

1. Create an Azure account: If you don't already have...



### Parallel Processing with RunnableParallel
RunnableParallel executes multiple operations simultaneously on the same input, perfect for extracting different types of information concurrently.

In [110]:
def count_words(text):
    """Return the number of whitespace-separated words in ``text``."""
    words = text.split()
    return len(words)

# Create different analysis prompts
# (both templates read a {text} placeholder, so the chains built from them
# must be invoked with a dict that has a "text" key)
sentiment_prompt = ChatPromptTemplate.from_template(
    "Analyze the sentiment of this text (positive/negative/neutral): {text}"
)

summary_prompt = ChatPromptTemplate.from_template(
    "Provide a one-sentence summary of: {text}"
)

# Create parallel analysis chains
sentiment_chain = sentiment_prompt | chatgpt | StrOutputParser()
summary_chain = summary_prompt | chatgpt | StrOutputParser()

In [114]:
sample_text = "Databricks provides excellent tools for building scalable data pipelines and machine learning solutions efficiently."
# count_words calls .split() on its argument, so it must be given the plain
# string; passing {"text": sample_text} instead would fail because a dict has
# no .split() method.

RunnableLambda(count_words).invoke( sample_text)


14

In [None]:
# Combine into parallel processing
# Every branch receives the same {"text": ...} dict. The two prompt-based
# chains read the "text" key through their {text} placeholder, but
# count_words expects a plain string (it calls .split()), so the word_count
# branch extracts the string from the dict before counting.
parallel_analyzer = RunnableParallel(
    word_count=RunnableLambda(lambda payload: count_words(payload["text"])),
    sentiment=sentiment_chain,
    summary=summary_chain
)

# Test parallel processing
sample_text = "Databricks provides excellent tools for building scalable data pipelines and machine learning solutions efficiently."
results = parallel_analyzer.invoke({"text": sample_text})

print("=== PARALLEL ANALYSIS RESULTS ===")
for analysis_type, result in results.items():
    print(f"{analysis_type.title()}: {result}")