##### TransformChain - A chain that applies a Python function (not LLM) to transform input data.

##### Input → Custom Function → Transformed Output

In [1]:
import warnings
warnings.filterwarnings(action="ignore")

In [2]:
from typing import Dict, Any, TypedDict
from langchain_huggingface import HuggingFaceEndpoint, ChatHuggingFace
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, chain
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv

load_dotenv()

True

In [3]:
llm = HuggingFaceEndpoint(
    repo_id="Qwen/Qwen2.5-7B-Instruct",
    task="text-generation",
    temperature=0.8, 
    top_p=0.95,
    max_new_tokens=512,
)
chat_model = ChatHuggingFace(llm=llm)

In [9]:
# ────────────────────────────────────────────────
# Modern style: using RunnableLambda + @chain decorator
# ────────────────────────────────────────────────

# Option 1: Clean input (most readable style today)
# @chain
def clean_user_input(inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Clean and preprocess user input"""
    text = inputs["user_input"].strip().lower()
    text = " ".join(text.split())  # normalize whitespace
    
    words = text.split()
    keywords = [w for w in words if len(w) > 3][:5]
    
    return {
        "cleaned_input": text,
        "word_count": len(words),
        "potential_keywords": ", ".join(keywords),
        "original_input": text,           # sometimes useful to keep
    }

In [10]:
# ────────────────────────────────────────────────
# Analysis prompt (modern style — often use ChatPromptTemplate)
# ────────────────────────────────────────────────

analysis_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a concise intent & sentiment analyzer.
        Always respond strictly in this format:

        1. Sentiment (positive/negative/neutral)
        2. Main Intent (question/complaint/request/feedback/other)
        3. Urgency Level (low/medium/high)
        4. Suggested Response Category (support/sales/returns/technical/other)"""),
    ("user", """Text: {cleaned_input}
        Word count: {word_count}
        Keywords: {potential_keywords}""")
])

In [11]:
# Most common modern composition style
analysis_chain = (analysis_prompt | chat_model | StrOutputParser())

In [15]:
# ────────────────────────────────────────────────
# Combined pipeline (most popular patterns in 2024/2025)
# ────────────────────────────────────────────────

# Style A — clean & recommended (RunnableLambda + pipe)
clean_and_analyze = (
    RunnableLambda(clean_user_input)
    | analysis_chain
)

# Style B — even more compact with @chain
@chain
def clean_and_analyze_compact(user_input: str) -> str:
    cleaned = clean_user_input.invoke({"user_input": user_input})
    return analysis_chain.invoke(cleaned)

In [16]:
# ────────────────────────────────────────────────
# Usage examples
# ────────────────────────────────────────────────
test_inputs = [
    " HELLO, I need HELP with my ORDER! It's been 5 DAYS!!! ",
    "Can you please tell me about return policy? Thanks.",
    "This product is terrible. Worst experience ever. DO NOT BUY!"
]

print("\n" + "="*65)
print("MODERN LANGCHAIN PIPELINE (2025 style)")
print("="*65)

for i, text in enumerate(test_inputs, 1):
    print(f"\n── Test {i} ──")
    print(f"Input : {text!r}")
    
    # Option A
    result = clean_and_analyze.invoke({"user_input": text})
    print("\nAnalysis:")
    print(result)
    
    # Option B (more direct)
    # result = clean_and_analyze_compact.invoke(text)
    # print(result)


MODERN LANGCHAIN PIPELINE (2025 style)

── Test 1 ──
Input : " HELLO, I need HELP with my ORDER! It's been 5 DAYS!!! "

Analysis:
1. Sentiment: neutral
2. Main Intent: request
3. Urgency Level: high
4. Suggested Response Category: support

── Test 2 ──
Input : 'Can you please tell me about return policy? Thanks.'

Analysis:
1. Positive
2. Request
3. Low
4. Returns

── Test 3 ──
Input : 'This product is terrible. Worst experience ever. DO NOT BUY!'

Analysis:
1. Sentiment (negative)
2. Main Intent (complaint)
3. Urgency Level (high)
4. Suggested Response Category (returns)


In [None]:
from typing import Dict, Any, List
from langchain_core.runnables import RunnableLambda, RunnablePassthrough, chain
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Replace with your actual LLM
# from langchain_openai import ChatOpenAI
# llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm = chat_model


# ────────────────────────────────────────────────
# 1. Clean user input
# ────────────────────────────────────────────────
@chain
def clean_user_input(inputs: Dict[str, Any]) -> Dict[str, Any]:
    text = inputs["user_input"].strip().lower()
    text = " ".join(text.split())
    words = text.split()
    keywords = [w for w in words if len(w) > 3][:5]
    
    return {
        "cleaned_input": text,
        "word_count": len(words),
        "potential_keywords": ", ".join(keywords),
        "original_query": inputs["user_input"]
    }


# ────────────────────────────────────────────────
# 2. Format retrieved documents (formerly format_api_response)
# ────────────────────────────────────────────────
@chain
def format_retrieved_docs(inputs: Dict[str, Any]) -> Dict[str, Any]:
    """
    Expecting: inputs["retrieved_docs"] = list of dicts or strings
    Formats them into clean, readable context for the LLM
    """

    print("\n\n\n", inputs, "\n\n\n")
    
    raw_docs = inputs["retrieved_docs"]
    
    if not raw_docs:
        formatted_context = "No relevant information retrieved."
    else:
        formatted_parts = []
        for i, doc in enumerate(raw_docs[:5], 1):  # Limit to top 5
            if isinstance(doc, dict):
                content = doc.get("content") or doc.get("text") or str(doc)
                source = doc.get("source", f"document_{i}")
                score = doc.get("score", doc.get("relevance_score", "N/A"))
            else:
                content = str(doc)
                source = f"document_{i}"
                score = "N/A"
            
            preview = content[:600] + "..." if len(content) > 600 else content
            part = f"""[Document {i}]
Source: {source}
Relevance: {score}

{preview}

"""
            formatted_parts.append(part)
        
        formatted_context = "\n".join(formatted_parts)
    
    return {
        "context": formatted_context,
        "doc_count": len(raw_docs),
        "sources_used": ", ".join(
            [d.get("source", f"doc_{i}") for i, d in enumerate(raw_docs[:5], 1)]
            if raw_docs else ["none"]
        )
    }


# ────────────────────────────────────────────────
# 3. Final RAG + Analysis Chain
# ────────────────────────────────────────────────
rag_analysis_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are an expert customer support analyst.
Use the provided context (retrieved knowledge) and user query to give a precise, helpful response.

Guidelines:
- Base your answer primarily on the retrieved context if relevant.
- If context is irrelevant or empty, say so clearly.
- Always include: sentiment, intent, urgency, and suggested action.
- Be professional and concise.
"""),
    ("human", """User Query: {cleaned_input}
Original: {original_query}
   
Analysis required:
1. Sentiment (positive/negative/neutral)
2. Main Intent (question/complaint/request/feedback/other)
3. Urgency Level (low/medium/high)
4. Suggested Response Category (support/returns/billing/technical/sales/other)
5. Brief Recommended Reply (2-3 sentences max)

If no useful context, note: "No relevant knowledge found." """)
])

final_chain = (
    # Step 1: Clean input
    clean_user_input
    # Step 2: Simulate retrieval (replace this lambda with real retriever)
    | RunnableLambda(lambda x: {
        **x,
        # MOCK RETRIEVAL — replace with actual vectorstore.as_retriever().invoke(...)
        "retrieved_docs": [
            {"content": "Our return policy allows returns within 30 days of purchase with original receipt. Refunds are processed within 5-7 business days.", "source": "returns_policy.md", "score": 0.92},
            {"content": "Orders typically ship within 1-2 business days. Delays may occur during holidays.", "source": "shipping_faq.html", "score": 0.85},
        ] if any(kw in x["cleaned_input"] for kw in ["return", "order", "ship", "delay"]) else []
    })
    # Step 3: Format retrieved docs
    | format_retrieved_docs
    # Step 4: Generate final analysis
    | rag_analysis_prompt
    | llm
    | StrOutputParser()
)


# ────────────────────────────────────────────────
# Test the full pipeline
# ────────────────────────────────────────────────
test_inputs = [
    "HELLO, I need HELP with my ORDER! It's been 5 DAYS!!! ",
    "Can you please tell me about return policy? Thanks.",
    "This product is terrible. Worst experience ever. DO NOT BUY!",
    "What are your opening hours on weekends?"
]

print("\n" + "="*70)
print("FULL RAG + FORMATTING + ANALYSIS PIPELINE (2025 LangChain Style)")
print("="*70)

try:
    for i, user_input in enumerate(test_inputs, 1):
        print(f"\n{'─'*30} TEST CASE {i} {'─'*30}")
        print(f"Input: {user_input!r}")
        
        result = final_chain.invoke({"user_input": user_input})
        
        print("\nFinal Analysis:")
        print(result)
        print("─" * 70)
except Exception as e:
    print(e)