# LangChain Runnables: A Comprehensive Guide

## Overview
LangChain Runnables are the fundamental building blocks that enable you to create complex, composable AI workflows. This notebook explores the core runnable types and demonstrates how to build sophisticated data processing pipelines.

## What You'll Learn:
- **RunnablePassthrough**: Preserving input context
- **RunnableLambda**: Converting functions to runnables
- **RunnableParallel**: Concurrent execution patterns
- **Sequential Chaining**: Building step-by-step workflows
- **Conditional Logic**: Dynamic routing based on content
- **Streaming & Batch Processing**: Handling different execution modes

## Key Concepts:
- **Composability**: Combining simple runnables to create complex workflows
- **Type Safety**: Ensuring data flows correctly between components
- **Performance**: Parallel execution for independent operations
- **Flexibility**: Adapting to different input types and processing needs


In [None]:
# Clear any previous output from the notebook for a clean start
from IPython.display import clear_output
clear_output(wait=True)

In [None]:
# Import the core runnable classes from LangChain
# RunnablePassthrough: Passes input through unchanged
# RunnableLambda: Wraps any Python function to make it a runnable
# RunnableParallel: Executes multiple runnables in parallel
from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel

## 1. RunnablePassthrough: Basic Input Handling

**RunnablePassthrough** is the simplest runnable that takes an input and returns it unchanged. This is useful when you want to preserve the original input in a chain while also performing other operations on it.


In [None]:
# Create a RunnablePassthrough instance
# This will simply return whatever input it receives
chain = RunnablePassthrough()

# Test with a simple string - it returns the same string
print(chain.invoke("abcd"))

abcd


## 2. RunnableLambda: Converting Functions to Runnables

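**RunnableLambda** converts a Python function into a runnable that can be chained with other components. The wrapped function receives the chain's input as a single argument, so functions with several parameters need a small adapter (shown later in this section). This makes it straightforward to integrate custom business logic into your LangChain workflows.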


In [None]:
# Define a custom function that calculates the length of input text
# (the parameter is named text so it does not shadow Python's built-in input)
def output_length(text: str) -> int:
    """Calculate the length of the input string"""
    return len(text)

# Convert the function to a runnable using RunnableLambda
chain = RunnableLambda(output_length)

# Test the chain - it should return the length of the input string
print(chain.invoke("input to output"))  # Should print 15

15


In [None]:
# Define a function that works with dictionary input
# (named add_values rather than sum so it does not shadow Python's built-in sum,
# which later cells rely on)
def add_values(item: dict):
    """Add two numbers from a dictionary"""
    return item["a"] + item["b"]

# Convert to runnable and test with dictionary input
chain = RunnableLambda(add_values)
chain.invoke({"a": 1, "b": 2})  # Should return 3

3

In [None]:
# Alternative approach: Define a function with separate parameters
def sum_values(a: int, b: int) -> int:
    """Add two numbers passed as separate parameters"""
    return a + b

# Use lambda to adapt the function to work with dictionary input
# This demonstrates how to bridge different function signatures
chain = RunnableLambda(lambda item: sum_values(item["a"], item["b"]))

# Test the adapted function
result = chain.invoke({"a": 1, "b": 2})
print(result)  # Should print 3

3


## 3. RunnableParallel: Concurrent Processing

**RunnableParallel** executes multiple runnables concurrently on the same input, collecting their outputs into a dictionary. This is perfect for scenarios where you need to extract different types of information from the same input simultaneously.


In [None]:
# Create a parallel chain that:
# 1. Preserves the original text using RunnablePassthrough
# 2. Calculates the length using our custom function
chain = RunnableParallel(
    text1=RunnablePassthrough(),      # Keep original input
    length=RunnableLambda(output_length)  # Calculate length
)

# Test with a sample text - returns both original text and its length
chain.invoke("start-tech academy")

{'text1': 'start-tech academy', 'length': 18}

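## 4. Connecting Runnables to an LLM

The remaining examples send data to a chat model, so the next cells load the OpenAI API key, create the model, and build a first prompt-to-model chain.

In [None]: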
# Load environment variables and OpenAI API key
import os
from dotenv import load_dotenv

# Load environment variables from .env file
# Make sure you have a .env file with OPENAI_API_KEY=your_api_key_here
load_dotenv()

# Get the OpenAI API key from environment variables
OPENAI_KEY = os.getenv("OPENAI_API_KEY")

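In [None]:
# Make the key available to the OpenAI client through the environment
# Fail early if it is missing: assigning None to os.environ raises a TypeError
import os

if not OPENAI_KEY:
    raise ValueError("OPENAI_API_KEY is not set; add it to your .env file")

os.environ['OPENAI_API_KEY'] = OPENAI_KEY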

In [None]:
# Initialize the OpenAI chat model
from langchain_openai import ChatOpenAI

# Create a ChatOpenAI instance with specific configuration
# gpt-4o-mini: A cost-effective model for most tasks
# temperature=0: Makes responses as repeatable as possible (not strictly deterministic)
chatgpt = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

In [None]:
# Create a prompt template and chain it with the LLM
from langchain_core.prompts import ChatPromptTemplate

# Create a simple prompt template that accepts user queries
prompt_txt = "{query}"
prompt_template = ChatPromptTemplate.from_template(prompt_txt)

# Create a chain by connecting the prompt template to the LLM
# The pipe operator (|) creates a sequential chain
# Data flows: input → prompt_template → chatgpt → output
llmchain = (prompt_template
              |
           chatgpt)

In [None]:
# Test the chain with a sample query
response = llmchain.invoke({'query': 'Explain Generative AI in 1 line'})

# Print the AI's response content
print(response.content)

Generative AI refers to algorithms that can create new content, such as text, images, or music, by learning patterns from existing data.


## 5. Advanced RunnableLambda Patterns

### Custom Data Processing with RunnableLambda

This section demonstrates how to create more sophisticated data processing pipelines using RunnableLambda to handle complex data transformations before sending to the LLM.



In [None]:
# Import RunnableLambda for custom function wrapping
from langchain_core.runnables import RunnableLambda

# Define a custom function to format user data
def format_user_data(user_info):
    """Process user information for downstream components"""
    # Create a formatted string with user details
    return f"User: {user_info['name']} | Role: {user_info['role']} | Location: {user_info['location']}"

# Convert the function to a Runnable for use in chains
formatter = RunnableLambda(format_user_data)

# Test the formatter with sample user data
user_data = {"name": "Sourav", "role": "Solution Architect", "location": "Bengaluru"}
result = formatter.invoke(user_data)
print(result)

User: Sourav | Role: Solution Architect | Location: Bengaluru


## 6. RunnablePassthrough - Preserving Input Context

### Complex Data Processing with Context Preservation

This example shows how to use RunnablePassthrough to maintain the original input while simultaneously processing it through other functions. This is crucial for maintaining context in complex AI workflows.


In [None]:
# Import necessary components for context preservation
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.prompts import ChatPromptTemplate

# Define a function to extract keywords from text
def extract_keywords(text):
    """Simple keyword extraction that filters words longer than 4 characters"""
    words = text.lower().split()
    keywords = [word for word in words if len(word) > 4]
    return keywords[:3]  # Return the first 3 matching words (no ranking is applied)

# Create a prompt template that uses both original text and keywords
prompt = ChatPromptTemplate.from_template(
    "Analyze this text: {original_text}\nKeywords found: {keywords}"
)

# Build a chain that preserves original input and adds keyword analysis
# This demonstrates parallel processing: 
# - RunnablePassthrough keeps the original text
# - RunnableLambda processes the text to extract keywords
chain = {
    "original_text": RunnablePassthrough(),      # Preserve original input
    "keywords": RunnableLambda(extract_keywords)    # Extract keywords
} | prompt | chatgpt

# Test the chain with a technical text
sample_text = "Building scalable data platforms requires careful architecture planning"
result = chain.invoke(sample_text)
print(result.content)

The text "Building scalable data platforms requires careful architecture planning" emphasizes the importance of strategic design in the development of data platforms that can grow and adapt to increasing demands. 

### Analysis:

1. **Keywords**:
   - **Building**: This suggests an active process of creation and development, indicating that constructing data platforms is not a passive task but requires effort and expertise.
   - **Scalable**: This is a critical term in technology and data management, referring to the ability of a system to handle growth, whether in terms of data volume, user load, or functionality. Scalability is essential for ensuring that a platform can evolve without requiring a complete redesign.
   - **Platforms**: This term indicates a foundational technology or framework that supports various applications or services. In the context of data, it implies a comprehensive system that manages data storage, processing, and analysis.

2. **Key Themes**:
   - **Architec

## 7. Sequential Chaining with Pipe Operator (|)

### Building Multi-Step Data Processing Pipelines

Sequential chaining allows you to build complex workflows where data flows through multiple processing steps in order. Each step receives the output of the previous step as its input, creating a powerful data transformation pipeline.

In [None]:
# Import RunnableLambda for creating processing steps
from langchain_core.runnables import RunnableLambda

# Step 1: Input validation and cleaning
def validate_input(data):
    """Validate and clean input data"""
    if not isinstance(data, str) or len(data.strip()) == 0:
        raise ValueError("Input must be a non-empty string")
    return data.strip()

# Step 2: Extract various text metrics
def extract_metrics(text):
    """Extract comprehensive text metrics from the input"""
    return {
        "original_text": text,
        "character_count": len(text),
        "word_count": len(text.split()),
        "sentence_count": text.count('.') + text.count('!') + text.count('?')
    }

# Step 3: Generate a formatted summary report
def generate_summary(metrics):
    """Generate a comprehensive summary report from metrics"""
    avg_words_per_sentence = metrics['word_count'] / max(metrics['sentence_count'], 1)
    return f"""
Text Analysis Summary:
- Characters: {metrics['character_count']}
- Words: {metrics['word_count']}  
- Sentences: {metrics['sentence_count']}
- Avg words per sentence: {avg_words_per_sentence:.1f}
Original: "{metrics['original_text'][:50]}..."
"""

# Create a sequential processing chain
# Data flows: input → validate → extract_metrics → generate_summary → output
analysis_chain = (
    RunnableLambda(validate_input) |     # Step 1: Clean input
    RunnableLambda(extract_metrics) |    # Step 2: Extract metrics
    RunnableLambda(generate_summary)     # Step 3: Generate report
)

# Test the sequential chain with sample text
sample_text = "Data engineering is crucial for ML success. It involves building robust pipelines. Quality data leads to better models."
result = analysis_chain.invoke(sample_text)
print(result)


Text Analysis Summary:
- Characters: 119
- Words: 18  
- Sentences: 3
- Avg words per sentence: 6.0
Original: "Data engineering is crucial for ML success. It inv..."



## 8. Advanced Parallel Execution with RunnableParallel

### Concurrent Multi-Analysis Pipeline

This advanced example runs several independent analyses on the same input through a single RunnableParallel. For synchronous calls the branches run in a thread pool, so the speed-up over sequential processing is largest when the branches wait on I/O, such as LLM or API calls. Small CPU-bound functions like the ones below benefit mainly from the cleaner structure.


In [None]:
# Import required modules for parallel processing
from langchain_core.runnables import RunnableLambda, RunnableParallel
import re

# Analysis Function 1: Technical Content Analysis
def analyze_technical_content(text):
    """Analyze technical aspects of text and calculate technical density"""
    tech_terms = ['data', 'platform', 'architecture', 'pipeline', 'model', 'algorithm']
    found_terms = [term for term in tech_terms if term.lower() in text.lower()]
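    # Percentage of distinct technical terms relative to word count; max() avoids division by zero on empty text
    technical_density = len(found_terms) / max(len(text.split()), 1) * 100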
    return {
        "technical_terms": found_terms,
        "technical_density": technical_density
    }

# Analysis Function 2: Sentiment Analysis
def analyze_sentiment(text):
    """Simple rule-based sentiment analysis"""
    positive_words = ['good', 'great', 'excellent', 'success', 'efficient', 'robust']
    negative_words = ['bad', 'poor', 'failed', 'problem', 'issue', 'difficult']
    
    # Count positive and negative words
    pos_count = sum(1 for word in positive_words if word in text.lower())
    neg_count = sum(1 for word in negative_words if word in text.lower())
    
    # Determine overall sentiment
    if pos_count > neg_count:
        return "Positive"
    elif neg_count > pos_count:
        return "Negative"
    else:
        return "Neutral"

# Analysis Function 3: Entity Extraction
def extract_entities(text):
    """Extract potential named entities using simple pattern matching"""
    # Find capitalized words (potential proper nouns/entities)
    entities = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', text)
    return list(set(entities))  # Remove duplicates

# Create a parallel analysis pipeline that runs all analyses simultaneously
# All functions receive the same input text and execute concurrently
parallel_analyzer = RunnableParallel(
    technical_analysis=RunnableLambda(analyze_technical_content),  # Technical analysis
    sentiment=RunnableLambda(analyze_sentiment),                   # Sentiment analysis
    entities=RunnableLambda(extract_entities),                     # Entity extraction
    word_count=RunnableLambda(lambda x: len(x.split()))            # Simple word count
)

# Test the parallel execution with sample text
sample_text = "Databricks provides an excellent platform for building robust data pipelines. The architecture supports efficient model training and deployment."

# Execute all analyses in parallel
result = parallel_analyzer.invoke(sample_text)

# Display results from all parallel analyses
print("Parallel Analysis Results:")
for key, value in result.items():
    print(f"  {key}: {value}")


Parallel Analysis Results:
  technical_analysis: {'technical_terms': ['data', 'platform', 'architecture', 'pipeline', 'model'], 'technical_density': 27.77777777777778}
  sentiment: Positive
  entities: ['The', 'Databricks']
  word_count: 18


## 9. Conditional Logic with RunnableBranch

### Intelligent Query Routing System

RunnableBranch enables dynamic routing of inputs based on conditions. This example demonstrates how to create an intelligent system that routes different types of technical queries to specialized handlers, similar to how a support system might route tickets to different departments.

In [None]:
# Import required modules for conditional logic
from langchain_core.runnables import RunnableLambda, RunnableBranch

# Condition Functions: These determine which branch to take
def is_data_engineering_query(text):
    """Check if query is about data engineering topics"""
    de_keywords = ['pipeline', 'etl', 'data warehouse', 'spark', 'kafka', 'airflow']
    return any(keyword in text.lower() for keyword in de_keywords)

def is_ml_query(text):
    """Check if query is about machine learning topics"""
    ml_keywords = ['model', 'algorithm', 'training', 'prediction', 'classification', 'regression']
    return any(keyword in text.lower() for keyword in ml_keywords)

def is_cloud_query(text):
    """Check if query is about cloud platforms and services"""
    cloud_keywords = ['aws', 'azure', 'gcp', 'cloud', 'kubernetes', 'docker']
    return any(keyword in text.lower() for keyword in cloud_keywords)

# Handler Functions: These process queries based on their type
def handle_data_engineering(text):
    """Specialized handler for data engineering queries"""
    return f"🔧 Data Engineering Response: This query about '{text}' relates to building and maintaining data pipelines and infrastructure."

def handle_ml_query(text):
    """Specialized handler for ML queries"""
    return f"🤖 ML Response: This query about '{text}' involves machine learning models and algorithms."

def handle_cloud_query(text):
    """Specialized handler for cloud queries"""
    return f"☁️ Cloud Response: This query about '{text}' concerns cloud platforms and services."

def handle_general_query(text):
    """Default handler for general queries that don't match specific categories"""
    return f"💡 General Response: This is a general query about '{text}'. Please provide more specific context."

# Create conditional routing system using RunnableBranch
# Structure: (condition, handler) pairs, with default handler at the end
# Conditions are checked in order and the first one that returns True wins
query_router = RunnableBranch(
    (RunnableLambda(is_data_engineering_query), RunnableLambda(handle_data_engineering)),
    (RunnableLambda(is_ml_query), RunnableLambda(handle_ml_query)),
    (RunnableLambda(is_cloud_query), RunnableLambda(handle_cloud_query)),
    RunnableLambda(handle_general_query)  # Default case when no conditions match
)

# Test the routing system with different types of queries
test_queries = [
    "How do I optimize Spark pipelines for better performance?",  # Data Engineering
    "What's the best algorithm for classification problems?",      # Machine Learning
    "How to deploy models on Azure ML?",                          # Matches ML and cloud keywords; the ML branch is checked first, so it wins
    "What's the weather like today?"                              # General (no specific keywords)
]

# Execute routing for each test query
for query in test_queries:
    result = query_router.invoke(query)
    print(f"Query: {query}")
    print(f"Response: {result}\n")

Query: How do I optimize Spark pipelines for better performance?
Response: 🔧 Data Engineering Response: This query about 'How do I optimize Spark pipelines for better performance?' relates to building and maintaining data pipelines and infrastructure.

Query: What's the best algorithm for classification problems?
Response: 🤖 ML Response: This query about 'What's the best algorithm for classification problems?' involves machine learning models and algorithms.

Query: How to deploy models on Azure ML?
Response: 🤖 ML Response: This query about 'How to deploy models on Azure ML?' involves machine learning models and algorithms.

Query: What's the weather like today?
Response: 💡 General Response: This is a general query about 'What's the weather like today?'. Please provide more specific context.



## 10. Streaming Processing

### Real-time Data Processing with Streaming

Streaming allows you to process data incrementally and yield results as they become available. This is particularly useful for long-running processes or when you want to provide progressive updates to users.


In [None]:
# Import required modules for streaming
from langchain_core.runnables import RunnableLambda
import time

def streaming_processor(text):
    """Process text word by word and yield incremental results"""
    words = text.split()
    processed_words = []
    
    # Process each word incrementally
    for word in words:
        processed_words.append(word.upper())
        
        # Simulate processing time (remove in production)
        time.sleep(0.1)
        
        # Yield intermediate result after processing each word
        yield f"Processed: {' '.join(processed_words)}"

# Create a streaming runnable from the generator function
streaming_runnable = RunnableLambda(streaming_processor)

# Execute streaming processing - results are yielded incrementally
print("Streaming processing:")
for chunk in streaming_runnable.stream("building scalable data solutions"):
    print(f"  {chunk}")

Streaming processing:
  Processed: BUILDING
  Processed: BUILDING SCALABLE
  Processed: BUILDING SCALABLE DATA
  Processed: BUILDING SCALABLE DATA SOLUTIONS


## 11. Batch Processing

### Efficient Processing of Multiple Inputs

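Batch processing runs a runnable over a list of inputs in a single call and returns the results in input order. By default, `batch()` executes the invocations concurrently in a thread pool, so it is most beneficial when each call is I/O-bound (for example, an LLM request). For small pure-Python functions like the one below, the main benefit is convenience.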

In [None]:
# Import RunnableLambda for batch processing
from langchain_core.runnables import RunnableLambda

def analyze_query_complexity(query):
    """Analyze the complexity of a user query based on word count"""
    word_count = len(query.split())
    
    # Categorize complexity based on word count
    if word_count <= 3:
        complexity = "Simple"
    elif word_count <= 10:
        complexity = "Medium"
    else:
        complexity = "Complex"
    
    return {
        "query": query[:30] + "..." if len(query) > 30 else query,  # Truncate long queries
        "word_count": word_count,
        "complexity": complexity
    }

# Create analyzer runnable
complexity_analyzer = RunnableLambda(analyze_query_complexity)

# Prepare multiple queries for batch processing
queries = [
    "Hello",                                    # Simple query
    "How do I setup Databricks?",              # Medium query
    "What are the best practices for designing a data lake architecture that can handle both batch and streaming data processing workloads?",  # Complex query
    "Spark optimization tips"                   # Simple query
]

# Batch process all queries in one call
# Results are returned in the same order as the inputs
results = complexity_analyzer.batch(queries)

# Display results from batch processing
print("Batch Analysis Results:")
for i, result in enumerate(results, 1):
    print(f"  Query {i}: {result}")


Batch Analysis Results:
  Query 1: {'query': 'Hello', 'word_count': 1, 'complexity': 'Simple'}
  Query 2: {'query': 'How do I setup Databricks?', 'word_count': 5, 'complexity': 'Medium'}
  Query 3: {'query': 'What are the best practices fo...', 'word_count': 21, 'complexity': 'Complex'}
  Query 4: {'query': 'Spark optimization tips', 'word_count': 3, 'complexity': 'Simple'}


## 12. Advanced Examples with LLM Integration

### Building Production-Ready AI Workflows

This section demonstrates how to combine all the concepts we've learned to build sophisticated AI workflows that integrate custom processing with Large Language Models. These examples show real-world patterns for building production-ready AI applications.

### Example 1: Custom Data Processing with LLM Integration

This example demonstrates how to clean and format user input before sending it to the LLM. Data preprocessing is crucial for getting consistent, high-quality responses from AI models.

**Note:** In this approach, the `clean_user_input()` function expects a dictionary input with a "topic" key.

In [None]:
# Import string output parser for clean text responses
from langchain_core.output_parsers import StrOutputParser

# Function that processes dictionary input
def clean_user_input(data):
    """Clean and format user input for better LLM processing"""
    clean_text = data["topic"].strip().lower()  # Extract and clean the topic
    return {"topic": clean_text}

# Convert function to Runnable
input_cleaner = RunnableLambda(clean_user_input)

# Test the cleaner with dictionary input
print("Cleaned output:", input_cleaner.invoke({"topic": "  APACHE SPARK  "}))

# Create a complete AI chain with input cleaning
prompt = ChatPromptTemplate.from_template("Explain this concept clearly: {topic}")
simple_chain = input_cleaner | prompt | chatgpt | StrOutputParser()

# Test the complete chain with dictionary input
user_input = "  APACHE SPARK  "
result = simple_chain.invoke({"topic": user_input})
print(f"AI Response: {result}")

Cleaned output: {'topic': 'apache spark'}
AI Response: Apache Spark is an open-source, distributed computing system that is designed for big data processing and analytics. It provides a fast and general-purpose cluster computing framework for large-scale data processing tasks. Spark is known for its speed and ease of use, as well as its ability to handle a wide range of workloads, including batch processing, real-time streaming, machine learning, and graph processing.

Spark uses a concept called Resilient Distributed Datasets (RDDs) to store and process data across multiple nodes in a cluster. RDDs are fault-tolerant, immutable collections of objects that can be operated on in parallel. Spark also provides a rich set of APIs in multiple programming languages, such as Scala, Java, Python, and R, making it accessible to a wide range of developers.

One of the key features of Apache Spark is its ability to perform in-memory processing, which allows it to achieve high performance by caching data in memor

#### Approach 2: Direct String Input Processing

This approach processes string input directly, which is more common when dealing with simple text preprocessing.

In [None]:

# Import string output parser for clean text responses
from langchain_core.output_parsers import StrOutputParser

# Function that processes string input directly
def clean_user_input(text):
    """Clean and format user input for better LLM processing"""
    return text.strip().lower()  # Remove whitespace and convert to lowercase

# Convert function to Runnable
input_cleaner = RunnableLambda(clean_user_input)

# Test the cleaner with string input
print("Cleaned output:", input_cleaner.invoke("  APACHE SPARK  "))

# Create a complete AI chain with input cleaning
# Note: We use dictionary syntax to map the cleaned input to the "topic" key
prompt = ChatPromptTemplate.from_template("Explain this concept clearly: {topic}")
simple_chain = {"topic": input_cleaner} | prompt | chatgpt | StrOutputParser()

# Test the complete chain with string input
user_input = "  APACHE SPARK  "
result = simple_chain.invoke(user_input)
print(f"AI Response: {result}")

Cleaned output: apache spark
AI Response: Apache Spark is an open-source, distributed computing system that is designed for big data processing and analytics. It provides a fast and general-purpose cluster computing framework for large-scale data processing. Spark allows users to write applications in Java, Scala, Python, and R, and provides high-level APIs in these languages for ease of use.

One of the key features of Apache Spark is its ability to perform in-memory processing, which allows it to process data much faster than traditional disk-based systems. Spark also supports a wide range of data processing tasks, including batch processing, real-time streaming, machine learning, and graph processing.

Spark is built around the concept of Resilient Distributed Datasets (RDDs), which are fault-tolerant collections of data that can be operated on in parallel across a cluster of machines. Spark automatically distributes the data and computation across the cluster, making it easy to scale up and down a

### Example 2: Context Preservation with RunnablePassthrough

This example demonstrates how to preserve the original input context while simultaneously processing it through other functions. This pattern is essential for maintaining context in complex AI workflows where you need both the original data and processed insights.


In [None]:
# Function to extract technical keywords from text
def extract_keywords(text):
    """Extract key technical terms from text (substring match, so 'pipeline' also matches 'pipelines')"""
    technical_terms = ['data', 'spark', 'pipeline', 'model', 'cloud', 'architecture']
    text_lower = text.lower()
    found_terms = [term for term in technical_terms if term in text_lower]
    return found_terms

# Create a prompt template that uses both original text and extracted keywords
analysis_prompt = ChatPromptTemplate.from_template("""
Original query: "{original_text}"
Technical terms found: {keywords}

Provide a focused technical explanation based on these key terms.
""")

# Create a chain that preserves original input and adds keyword analysis
# RunnablePassthrough preserves the original text
# RunnableLambda extracts keywords from the same text
context_chain = {
    "original_text": RunnablePassthrough(),      # Preserve original input
    "keywords": RunnableLambda(extract_keywords)    # Extract keywords
} | analysis_prompt | chatgpt | StrOutputParser()

# Test the context preservation chain
tech_query = "How do I build data pipelines using Spark for cloud architecture?"
result = context_chain.invoke(tech_query)

# Display the results
print(f"Original: {tech_query}")
print(f"Keywords found: {extract_keywords(tech_query)}")
print(f"AI Analysis: {result[:600]}...")


Original: How do I build data pipelines using Spark for cloud architecture?
Keywords found: ['data', 'spark', 'pipeline', 'cloud', 'architecture']
AI Analysis: To build data pipelines using Spark for cloud architecture, you can leverage the capabilities of Spark's distributed processing framework to efficiently process and analyze large volumes of data in a cloud environment. 

First, you would need to design a pipeline that outlines the flow of data from source to destination, including any transformations or processing steps along the way. Spark provides a high-level API for building data pipelines, allowing you to easily define and execute complex data processing workflows.

Next, you can deploy your Spark application on a cloud platform such as A...


### Example 3: Sequential Chaining with Pipe Operator (|)
The pipe operator creates sequential workflows where data flows from left to right through multiple processing steps. Here it connects a categorization step, a prompt-building step, and the LLM.

In [None]:
# Import necessary modules for sequential chaining
from langchain_core.output_parsers import StrOutputParser

def categorize_query(text):
    """Categorize technical queries based on keywords"""
    text_lower = text.lower()
    if any(word in text_lower for word in ['spark', 'hadoop', 'etl', 'pipeline']):
        return "Data Engineering"
    elif any(word in text_lower for word in ['model', 'algorithm', 'training']):
        return "Machine Learning"
    elif any(word in text_lower for word in ['azure', 'aws', 'cloud']):
        return "Cloud Platform"
    return "General"

def create_expert_prompt(data):
    """Create specialized prompts based on category"""
    category = data['category']
    question = data['question']
    
    expert_prompts = {
        "Data Engineering": f"As a data engineering expert, provide technical guidance for: {question}",
        "Machine Learning": f"As an ML specialist, explain the concepts related to: {question}",
        "Cloud Platform": f"As a cloud architect, describe the solution for: {question}",
        "General": f"Provide a comprehensive answer to: {question}"
    }
    
    return expert_prompts[category]

# Build sequential expert chain
expert_chain = (
    RunnableLambda(lambda x: {"question": x, "category": categorize_query(x)}) |
    RunnableLambda(create_expert_prompt) |
    chatgpt |
    StrOutputParser()
)

# Test different question types
test_questions = [
    "How to optimize Spark job performance?",
    "What's the best algorithm for classification?",
    "How to deploy on Azure Kubernetes Service?"
]

for question in test_questions:
    category = categorize_query(question)
    result = expert_chain.invoke(question)
    print(f"Question: {question}")
    print(f"Category: {category}")
    print(f"Expert Response: {result[:150]}...\n")


Question: How to optimize Spark job performance?
Category: Data Engineering
Expert Response: 1. Partitioning: Ensure that your data is properly partitioned to distribute the workload evenly across the cluster. Use the `repartition()` or `coale...

Question: What's the best algorithm for classification?
Category: Machine Learning
Expert Response: The best algorithm for classification can vary depending on the specific characteristics of the dataset and the problem at hand. Some commonly used al...

Question: How to deploy on Azure Kubernetes Service?
Category: Cloud Platform
Expert Response: To deploy on Azure Kubernetes Service (AKS) as a cloud architect, you would follow these steps:

1. Create an Azure account: If you don't already have...



In [108]:
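# Another way to create the chain: a dict in a pipe is coerced to a
# RunnableParallel, so both keys are computed from the same input question

# Build sequential expert chain
expert_chain = (
    {"question": RunnablePassthrough(), "category": RunnableLambda(categorize_query)} |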
    RunnableLambda(create_expert_prompt) |
    chatgpt |
    StrOutputParser()
)

# Test different question types
test_questions = [
    "How to optimize Spark job performance?",
    "What's the best algorithm for classification?",
    "How to deploy on Azure Kubernetes Service?"
]

for question in test_questions:
    category = categorize_query(question)
    result = expert_chain.invoke(question)
    print(f"Question: {question}")
    print(f"Category: {category}")
    print(f"Expert Response: {result[:150]}...\n")

Question: How to optimize Spark job performance?
Category: Data Engineering
Expert Response: 1. Partitioning: Ensure that your data is properly partitioned before running a Spark job. This helps in distributing the workload evenly across the c...

Question: What's the best algorithm for classification?
Category: Machine Learning
Expert Response: The best algorithm for classification depends on various factors such as the nature of the data, the size of the dataset, the complexity of the proble...

Question: How to deploy on Azure Kubernetes Service?
Category: Cloud Platform
Expert Response: To deploy on Azure Kubernetes Service (AKS) as a cloud architect, you would follow these steps:

1. Create an Azure account: If you don't already have...



### Parallel Processing with RunnableParallel
RunnableParallel runs several runnables on the same input and returns a dict with one result per key, which makes it a good fit for extracting different types of information concurrently.

In [110]:
def count_words(text):
    """Simple word count function"""
    return len(text.split())

# Create different analysis prompts
sentiment_prompt = ChatPromptTemplate.from_template(
    "Analyze the sentiment of this text (positive/negative/neutral): {text}"
)

summary_prompt = ChatPromptTemplate.from_template(
    "Provide a one-sentence summary of: {text}"
)

# Create parallel analysis chains
sentiment_chain = sentiment_prompt | chatgpt | StrOutputParser()
summary_chain = summary_prompt | chatgpt | StrOutputParser()

In [114]:
sample_text = "Databricks provides excellent tools for building scalable data pipelines and machine learning solutions efficiently."
# count_words expects a plain string, so pass the text itself
# rather than a dict such as {"text": sample_text}
RunnableLambda(count_words).invoke(sample_text)


14

### Understand RunnableLambda
https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.RunnableLambda.html

In [2]:
# Import from langchain_core.runnables, the same path used at the top of this notebook
from langchain_core.runnables import RunnablePassthrough, RunnableLambda

In [3]:
# Named add_three to avoid shadowing Python's built-in sum()
def add_three(a):
    return a + 3

print(RunnableLambda(add_three).invoke(4))

7


In [4]:
def add_one(x: int) -> int:
    return x + 1

print(RunnableLambda(add_one).invoke(1))

2


In [5]:
print(type(RunnableLambda(add_one)))

<class 'langchain_core.runnables.base.RunnableLambda'>


In [6]:
for elem in RunnableLambda(add_one).stream(2):
    print(elem)

3


In [7]:
for elem in RunnableLambda(add_one).batch([1,2,3]):
    print(elem)

2
3
4


### Understand RunnablePassthrough
https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.passthrough.RunnablePassthrough.html

In [8]:
from langchain_core.runnables import (
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
)

runnable = RunnableParallel(
    origin=RunnablePassthrough(),
    modified=lambda x: x+1
)

print(runnable.invoke(1))

{'origin': 1, 'modified': 2}


In [9]:
def fake_llm(prompt: str) -> str:  # Fake LLM for the example
    return prompt + " completion"

In [11]:
# Create a chain that combines a fake LLM with text reversal
# Step 1: RunnableLambda(fake_llm) - Wraps our fake LLM function
# Step 2: {'parsed': lambda text: text[::-1]} - Creates a dictionary with a key 'parsed'
#         that contains a lambda function to reverse the text
# The pipe operator (|) chains these operations together
chain = RunnableLambda(fake_llm) | {'parsed': lambda text: text[::-1]}

# Invoke the chain with input "hello"
# This will: 1) Pass "hello" to fake_llm -> returns "hello completion"
#           2) Pass "hello completion" to the lambda function -> reverses it to "noitelpmoc olleh"
#           3) Return result as {'parsed': 'noitelpmoc olleh'}
chain.invoke("hello")

{'parsed': 'noitelpmoc olleh'}

In [12]:
chain = RunnableLambda(fake_llm) | {
    'original': RunnablePassthrough(), # Original LLM output
    'parsed': lambda text: text[::-1] # Reverse the output
}

print(chain.invoke('hello'))

{'original': 'hello completion', 'parsed': 'noitelpmoc olleh'}


#### Explanation of the code above

1. `RunnableLambda(fake_llm)` converts the `fake_llm` function into a LangChain Runnable, which makes it usable in LangChain pipelines.
2. The pipe operator (`|`) passes the output of `RunnableLambda(fake_llm)` into the next processing step.
3. That next step is a dict, so the output of `fake_llm` is passed into both of its entries:
    * `'original': RunnablePassthrough()`, which simply forwards the input as is.
    * `'parsed': lambda text: text[::-1]`, which reverses the string.

Tracing the input `'hello'` through the chain:

1. `'hello'` is passed into `fake_llm`.
    * `fake_llm('hello')` returns `"hello completion"`.
2. `"hello completion"` is passed into the dict step, which does the following:
    * `'original'`: keeps `"hello completion"` as is.
    * `'parsed'`: reverses `"hello completion"` to `"noitelpmoc olleh"`.

In [13]:
RunnableLambda(fake_llm).invoke("hello")

'hello completion'

In [14]:
# Use assign with RunnablePassthrough
# In some cases it is useful to pass the input through while adding keys to the output. The assign method does this.

# Redefined for this example: this version ignores the prompt and always returns "completion"
def fake_llm(prompt: str) -> str:
    return "completion"

chain = {
    'llm1':  fake_llm,
    'llm2':  fake_llm,
} | RunnablePassthrough.assign(
    total_chars=lambda inputs: len(inputs['llm1'] + inputs['llm2'])
)

In [15]:
chain.invoke('hello')

{'llm1': 'completion', 'llm2': 'completion', 'total_chars': 20}

## 13. Summary and Key Takeaways

### What We've Learned

This notebook covered the fundamental concepts of LangChain Runnables and how to build sophisticated AI workflows:

#### Core Runnable Types:
1. **RunnablePassthrough**: Preserves input unchanged - useful for maintaining context
2. **RunnableLambda**: Wraps Python functions for custom processing
3. **RunnableParallel**: Executes multiple operations simultaneously
4. **RunnableBranch**: Enables conditional logic and routing

#### Key Patterns:
- **Sequential Processing**: Using the pipe operator (`|`) to chain operations
- **Parallel Processing**: Executing multiple analyses simultaneously
- **Context Preservation**: Maintaining original input while processing
- **Conditional Routing**: Directing inputs to specialized handlers

#### Best Practices:
- Use parallel processing for independent operations to improve performance
- Preserve context when building complex workflows
- Implement proper input validation and cleaning
- Design modular, reusable components
- Combine custom logic with LLM capabilities effectively

#### Real-world Applications:
- Content analysis and categorization
- Technical query routing systems
- Data preprocessing pipelines
- Multi-stage AI workflows
- Batch processing of large datasets

### Next Steps

Now that you understand the fundamentals of LangChain Runnables, you can:
- Build more complex multi-agent systems
- Implement sophisticated data processing pipelines
- Create intelligent routing systems
- Develop production-ready AI applications

**Remember**: The power of LangChain Runnables lies in their composability - you can combine these simple building blocks to create highly sophisticated AI workflows that solve real-world problems.
