In [10]:
from dotenv import load_dotenv, find_dotenv
from langchain_core.prompts import ChatPromptTemplate, ChatMessagePromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

load_dotenv()

system_template = ChatMessagePromptTemplate.from_template(
    "You are a comedian specialized in {style} humor.",
    role="system"
)

message_template= ChatMessagePromptTemplate.from_template(
     template="tell a joke about: {topic}",
     role="user"
)

chat_prompt = ChatPromptTemplate.from_messages([system_template, message_template])
model = ChatOpenAI(model="gpt-4-turbo", temperature=1)
output_parser = StrOutputParser()

chain = chat_prompt | model | output_parser
input = {
    "style" : "pirate commedian",
    "topic" : "fish"
}

chain.invoke(input)



'Why did the pirate bring a fish to the poker game?\n\nBecause he heard they were playing for goldfish! Arrr, trying to plunder the sea one scale at a time!'

In [16]:
from abc import ABC, abstractmethod
from typing import TypeVar, Generic

T = TypeVar('T')
U = TypeVar('U')

class CRunnable(ABC, Generic[T, U]):
    @abstractmethod
    def invoke(self, data: T) -> U:
        """Abstract method that must be implemented by subclasses"""

class Chain(CRunnable):
    def __init__(self, first: CRunnable, second: CRunnable):
        self.first = first
        self.second = second

    def invoke(self, data):
        first_result = self.first.invoke(data)
        return self.second.invoke(first_result)

class Pipe:
    def __init__(self, runnable: CRunnable):
        self._runnable = runnable
    
    def __or__(self, other: 'Pipe') -> 'Pipe':
        # other is a Pipe, so we need to access its _runnable
        # Create a Chain with both wrapped CRunnables
        chained = Chain(self._runnable, other._runnable)
        # Wrap the chain in a new Pipe
        return Pipe(chained)
    
    def invoke(self, data):
        return self._runnable.invoke(data)

class AddTen(CRunnable):
    def invoke(self, data: int) -> int:
        print("AddTen:", data)
        return data + 10

class MultiplyByTwo(CRunnable):
    def invoke(self, data: int) -> int:
        print("Multiply by 2:", data)
        return data * 2

class ConvertToString(CRunnable):
    def invoke(self, data: int) -> str:
        print("Convert to string:", data)
        return f"Result: {data}"

if __name__ == "__main__":
    # Each CRunnable is wrapped in a Pipe
    a = Pipe(AddTen())
    b = Pipe(MultiplyByTwo())
    c = Pipe(ConvertToString())

    chain = a | b | c  # Now pipe operators work on Pipe objects
    result = chain.invoke(10)
    print(result)

AddTen: 10
Multiply by 2: 20
Convert to string: 40
Result: 40


In [26]:
from langchain.chat_models import ChatOpenAI
from langchain_core.runnables import RunnableParallel
from langchain.schema.runnable import RunnableConfig
from langchain.callbacks.base import BaseCallbackHandler
from langchain_core.tracers.stdout import ConsoleCallbackHandler

# Custom callback handler to demonstrate metadata usage
class MetadataTracker(BaseCallbackHandler):
    def on_chain_start(self, serialized, inputs, **kwargs):
        # Access metadata from the config
        metadata = kwargs.get("metadata", {})
        print(f"Chain started with metadata: {metadata}")
        print(f"User ID: {metadata.get('user_id')}")
        print(f"Request ID: {metadata.get('request_id')}")

    def on_chain_end(self, outputs, **kwargs):
        metadata = kwargs.get("metadata", {})
        print(f"Chain completed for request: {metadata.get('request_id')}")

# Initialize components
model = ChatOpenAI()
metadata_tracker = MetadataTracker()

# Example 1: Tracking User Sessions
def process_user_request(user_id: str, request_id: str, query: str):
    config = RunnableConfig(
        metadata={
            "user_id": user_id,
            "request_id": request_id,
            "session_start": "2024-03-06T10:00:00",
            "query_type": "analysis",
            "priority": "high"
        },
        callbacks=[metadata_tracker]
    )

    parallel_chain = RunnableParallel({
        "analysis": lambda x: "Analysis result",
        "summary": lambda x: "Summary result"
    })

    return parallel_chain.invoke({"query": query}, config=config)

# Example 2: Cost Tracking
def process_with_cost_tracking(query: str, department: str):
    config = RunnableConfig(
        metadata={
            "department": department,
            "cost_center": "ML-Team",
            "project_code": "AI-2024",
            "billing_type": "internal"
        },
        callbacks=[metadata_tracker]
    )
    
    chain = RunnableParallel({
        "result": lambda x: "Processing result"
    })
    
    return chain.invoke({"query": query}, config=config)

# Example 3: Debugging and Tracing
def debug_chain_execution(query: str):
    config = RunnableConfig(
        metadata={
            "debug_level": "verbose",
            "trace_id": "trace-123",
            "component_version": "v1.2.3",
            "environment": "development"
        },
        callbacks=[metadata_tracker]
    )
    
    chain = RunnableParallel({
        "result": lambda x: "Debug result"
    })
    
    return chain.invoke({"query": query}, config=config)

# Example 4: A/B Testing
def ab_test_chain(query: str, test_variant: str):
    config = RunnableConfig(
        metadata={
            "test_id": "ab_test_001",
            "variant": test_variant,
            "test_group": "experimental",
            "feature_flags": {"new_model": True}
        },
        callbacks=[metadata_tracker]
    )
    
    chain = RunnableParallel({
        "result": lambda x: f"Result for variant {test_variant}"
    })
    
    return chain.invoke({"query": query}, config=config)

# Usage examples
if __name__ == "__main__":
    # Track user session
    result1 = process_user_request(
        user_id="user123",
        request_id="req456",
        query="analyze this"
    )

    # Track costs
    result2 = process_with_cost_tracking(
        query="process this",
        department="Research"
    )

    # Debug execution
    result3 = debug_chain_execution(
        query="debug this"
    )

    # A/B testing
    result4 = ab_test_chain(
        query="test this",
        test_variant="B"
    )

Chain started with metadata: {'user_id': 'user123', 'request_id': 'req456', 'session_start': '2024-03-06T10:00:00', 'query_type': 'analysis', 'priority': 'high'}
User ID: user123
Request ID: req456
Chain started with metadata: {'user_id': 'user123', 'request_id': 'req456', 'session_start': '2024-03-06T10:00:00', 'query_type': 'analysis', 'priority': 'high'}
User ID: user123
Request ID: req456
Chain completed for request: None
Chain started with metadata: {'user_id': 'user123', 'request_id': 'req456', 'session_start': '2024-03-06T10:00:00', 'query_type': 'analysis', 'priority': 'high'}
User ID: user123
Request ID: req456
Chain completed for request: None
Chain completed for request: None
Chain started with metadata: {'department': 'Research', 'cost_center': 'ML-Team', 'project_code': 'AI-2024', 'billing_type': 'internal'}
User ID: None
Request ID: None
Chain started with metadata: {'department': 'Research', 'cost_center': 'ML-Team', 'project_code': 'AI-2024', 'billing_type': 'internal'}

In [None]:
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
from langchain.chat_models import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

def word_by_word(input_stream: Iterator) -> Iterator[str]:
    for chunk in input_stream:
        text = str(chunk)
        for word in text.split():
            print(f"Yielding: {word}")  # Let's see what's being yielded
            yield word

word_streamer = RunnableGenerator(word_by_word)

# Method 1: Using invoke (consumes all yields and joins them)
result = word_streamer.invoke("Hello world")
print("invoke result:", result)

# Method 2: Using stream (gets iterator)
for word in word_streamer.stream("Hello world"):
    print("stream word:", word)


word: Hello
word: world
word: how
word: are
word: you
Helloworldhowareyou
