<a href="https://colab.research.google.com/github/d0k7/prospect-to-lead-workflow/blob/main/Analytos_ai.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Securely input your key in Colab

from getpass import getpass
import os

OPENAI_KEY = getpass("Enter your OpenAI API key (will not be shown): ")
os.environ["OPENAI_API_KEY"] = OPENAI_KEY


Enter your OpenAI API key (will not be shown): ··········


In [3]:
# Section 0 — Notes
print("Notebook sections:\n1) Install deps\n2) Create files\n3) Run workflow\n4) Inspect outputs\n5) Run tests\n6) Swap to live example\n7) Submission checklist\n")


Notebook sections:
1) Install deps
2) Create files
3) Run workflow
4) Inspect outputs
5) Run tests
6) Swap to live example
7) Submission checklist



In [4]:
# Section 1 — Install deps
!pip install -q pydantic python-dotenv rich pytest


In [5]:
!pip install openai



In [6]:
%%bash
set -euo pipefail
echo "Writing project files..."

# requirements
cat > requirements.txt <<'REQ'
pydantic
python-dotenv
rich
pytest
REQ

# workflow.json (mock mode)
cat > workflow.json <<'JSON'
{
  "workflow_name": "OutboundLeadGeneration",
  "mode": "live",
  "description": "Demo pipeline: prospect_search -> enrichment -> scoring -> outreach_content -> outreach_executor -> response_tracker -> feedback_trainer (mock)",
  "steps": [
    {
      "id": "prospect_search",
      "agent": "ProspectSearchAgent",
      "inputs": {
        "icp": {
          "industry": "SaaS",
          "location": "USA",
          "employee_count": {"min": 100, "max": 1000}
        },
        "signals": ["recent_funding", "hiring_for_sales"]
      },
      "instructions": "Search for companies matching ICP (mock)."
    },
    {
      "id": "enrichment",
      "agent": "DataEnrichmentAgent",
      "inputs": {"leads": "{{prospect_search.output.leads}}"},
      "instructions": "Enrich leads with domain, title, and tech stack (mock)."
    },
    {
      "id": "scoring",
      "agent": "ScoringAgent",
      "inputs": {"leads": "{{enrichment.output.enriched_leads}}"},
      "instructions": "Score and rank leads (mock)."
    },
    {
      "id": "outreach_content",
      "agent": "OutreachContentAgent",
      "inputs": {"ranked_leads": "{{scoring.output.ranked_leads}}", "persona": "SDR", "tone": "friendly"},
      "instructions": "Generate outreach messages (mock)."
    },
    {
      "id": "outreach_executor",
      "agent": "OutreachExecutorAgent",
      "inputs": {"messages": "{{outreach_content.output.messages}}"},
      "instructions": "Send or queue outreach (mock)."
    },
    {
      "id": "response_tracker",
      "agent": "ResponseTrackerAgent",
      "inputs": {"sent": "{{outreach_executor.output.sent}}"},
      "instructions": "Check responses and collect metrics (mock)."
    },
    {
      "id": "feedback_trainer",
      "agent": "FeedbackTrainerAgent",
      "inputs": {"metrics": "{{response_tracker.output.metrics}}", "messages": "{{outreach_content.output.messages}}"},
      "instructions": "Analyze and propose improvements (mock)."
    }
  ]
}
JSON

# Create agents directory
mkdir -p agents

# agents/base_agent.py
cat > agents/base_agent.py <<'PY'
"""Base Agent abstraction for the LangGraph pipeline."""

from abc import ABC, abstractmethod
from typing import Dict, Any
import logging

logger = logging.getLogger("langgraph")
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s %(levelname)s [%(name)s] %(message)s")
handler.setFormatter(formatter)
if not logger.handlers:
    logger.addHandler(handler)
logger.setLevel(logging.INFO)

class BaseAgent(ABC):
    """Abstract base class for all agents.

    Each agent receives a config (the step dict from workflow.json) and inputs (resolved).
    The agent returns a structured JSON-serializable dict as output.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.step_id = config.get("id", "unknown")

    @abstractmethod
    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Run agent logic and return a dict."""
        raise NotImplementedError
PY

# agents/prospect_search.py
cat > agents/prospect_search.py <<'PY'
"""Prospect search (mock)"""

from .base_agent import BaseAgent
from typing import Dict, Any, List
import random

class ProspectSearchAgent(BaseAgent):
    """Mock prospect search; in production replace with Clay/Apollo calls."""

    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        icp = inputs.get("icp", {})
        signals = inputs.get("signals", ["recent_funding"])
        companies = ["Acme SaaS", "Nimbus Analytics", "PulseSoft", "BrightLayer"]
        leads: List[Dict[str, Any]] = []
        for i, comp in enumerate(companies):
            leads.append({
                "company": comp,
                "contact_name": f"{comp.split()[0]} Founder",
                "email": f"founder@{comp.replace(' ','').lower()}.com",
                "linkedin": f"https://linkedin.com/in/{comp.replace(' ','').lower()}",
                "signal": random.choice(signals)
            })
        return {"leads": leads}
PY

# agents/data_enrichment.py
cat > agents/data_enrichment.py <<'PY'
"""Data enrichment (mock)"""

from .base_agent import BaseAgent
from typing import Dict, Any, List
import random

class DataEnrichmentAgent(BaseAgent):
    """Mock enrichment: add domain, title, tech_stack."""

    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        leads = inputs.get("leads", [])
        enriched: List[Dict[str, Any]] = []
        for lead in leads:
            domain = lead.get("company", "").replace(" ", "").lower() + ".com"
            lead_enriched = {**lead,
                             "domain": domain,
                             "title": random.choice(["CEO", "Head of GTM", "VP Sales"]),
                             "tech_stack": random.sample(["aws", "gcp", "stripe", "segment", "postgres"], 2)}
            enriched.append(lead_enriched)
        return {"enriched_leads": enriched}
PY

# agents/scoring.py
cat > agents/scoring.py <<'PY'
"""Scoring agent (mock)"""

from .base_agent import BaseAgent
from typing import Dict, Any, List

class ScoringAgent(BaseAgent):
    """Simple rule-based scoring for demo."""

    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        leads = inputs.get("leads", [])
        ranked: List[Dict[str, Any]] = []
        for lead in leads:
            score = 0
            if lead.get("signal") == "recent_funding":
                score += 40
            tech_stack = lead.get("tech_stack", [])
            score += min(20, 5 * len(tech_stack))
            if len(lead.get("company", "")) < 20:
                score += 10
            lead["score"] = score
            ranked.append(lead)
        ranked.sort(key=lambda x: x["score"], reverse=True)
        return {"ranked_leads": ranked}
PY

# agents/outreach_content.py
cat > agents/outreach_content.py <<'PY'
"""Outreach content generator (mock). Replace with OpenAI integration for live mode."""

from .base_agent import BaseAgent
from typing import Dict, Any, List

class OutreachContentAgent(BaseAgent):
    """Generates email subject + body from ranked leads."""
    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        ranked = inputs.get("ranked_leads", [])
        persona = inputs.get("persona", "SDR")
        messages: List[Dict[str, Any]] = []
        for lead in ranked:
            subject = f"Quick question about {lead['company']}"
            body = (
                f"Hi {lead['contact_name']},\n\n"
                f"I saw {lead['company']} is {lead.get('signal')} and using {', '.join(lead.get('tech_stack', []))}. "
                "We help teams like yours reduce CAC and improve activation. Would you be open to a quick 15-min chat next week?\n\n"
                f"Best,\n{persona}"
            )
            messages.append({"lead_company": lead["company"], "email_subject": subject, "email_body": body})
        return {"messages": messages}
PY

# agents/outreach_executor.py
cat > agents/outreach_executor.py <<'PY'
"""Outreach executor (mock): pretend to send emails / queue them."""

from .base_agent import BaseAgent
from typing import Dict, Any, List
import time

class OutreachExecutorAgent(BaseAgent):
    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        messages = inputs.get("messages", [])
        sent: List[Dict[str, Any]] = []
        for msg in messages:
            # simulate send latency
            time.sleep(0.02)
            sent.append({"to_company": msg["lead_company"], "status": "queued", "subject": msg["email_subject"]})
        return {"sent": sent}
PY

# agents/response_tracker.py
cat > agents/response_tracker.py <<'PY'
"""Response tracker (mock): generate simple metrics."""

from .base_agent import BaseAgent
from typing import Dict, Any
import random

class ResponseTrackerAgent(BaseAgent):
    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        sent = inputs.get("sent", [])
        opens = random.randint(10, 50)
        clicks = random.randint(0, 10)
        replies = random.randint(0, 5)
        return {"metrics": {"opens": opens, "clicks": clicks, "replies": replies}}
PY

# agents/feedback_trainer.py
cat > agents/feedback_trainer.py <<'PY'
"""Feedback trainer (mock): produce recommendations from metrics."""

from .base_agent import BaseAgent
from typing import Dict, Any, List

class FeedbackTrainerAgent(BaseAgent):
    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        metrics = inputs.get("metrics", {})
        messages = inputs.get("messages", [])
        recs: List[str] = []
        opens = metrics.get("opens", 0)
        replies = metrics.get("replies", 0)
        if opens < 20:
            recs.append("Subject lines are weak — test 3 new variants focused on outcomes.")
        if replies < 2:
            recs.append("Messages may be too generic — add specific case study + CTA variation.")
        if not recs:
            recs.append("Performance looks good — continue current cadence.")
        # In prod: write to Google Sheets or trigger human approval flow.
        return {"recommendations": recs}
PY

# langgraph_builder.py (high-quality)
cat > langgraph_builder.py <<'PY'
"""LangGraph builder & runner.

- Validates the workflow (lightweight)
- Resolves {{step.output.field}} templates across steps
- Dynamically instantiates agents from agents/ package
- Executes steps sequentially and saves demo_output.json
"""

from typing import Any, Dict, Optional
from pathlib import Path
import json
import re
import importlib
import os
from pydantic import BaseModel, Field, ValidationError
from rich.console import Console

console = Console()
TEMPLATE_RE = re.compile(r"\{\{([^}]+)\}\}")

class StepModel(BaseModel):
    id: str
    agent: str
    inputs: Optional[Dict[str, Any]] = Field(default_factory=dict)
    instructions: Optional[str] = None

class WorkflowModel(BaseModel):
    workflow_name: str
    mode: Optional[str] = "mock"
    description: Optional[str] = None
    steps: list[StepModel]

def load_workflow(path: str = "workflow.json") -> WorkflowModel:
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"workflow.json not found at {p.resolve()}")
    raw = json.loads(p.read_text())
    return WorkflowModel(**raw)

def camel_to_snake(name: str) -> str:
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    s2 = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1)
    return s2.replace('__', '_').lower()

def resolve_template_string(s: str, context: Dict[str, Any]) -> str:
    def repl(m):
        ref = m.group(1).strip()
        parts = ref.split('.')
        cur = context
        try:
            for p in parts:
                if isinstance(cur, list) and p.isdigit():
                    cur = cur[int(p)]
                else:
                    cur = cur[p]
            # if complex, return JSON string to be parsed later
            return json.dumps(cur) if isinstance(cur, (dict, list)) else str(cur)
        except Exception:
            return m.group(0)
    return TEMPLATE_RE.sub(repl, s)

def deep_resolve(obj, context):
    if isinstance(obj, str):
        return resolve_template_string(obj, context)
    if isinstance(obj, dict):
        return {k: deep_resolve(v, context) for k, v in obj.items()}
    if isinstance(obj, list):
        return [deep_resolve(v, context) for v in obj]
    return obj

def instantiate_agent(agent_name: str, config: Dict[str, Any]):
    module_slug = camel_to_snake(agent_name).replace('_agent','')
    module_path = f"agents.{module_slug}"
    mod = importlib.import_module(module_path)
    cls = getattr(mod, agent_name)
    return cls(config)

def run_workflow(workflow: WorkflowModel, save_output: bool = True) -> Dict[str, Any]:
    mode = workflow.mode or os.getenv("WORKFLOW_MODE", "mock")
    console.print(f"[bold]Workflow:[/bold] {workflow.workflow_name}  [bold]Mode:[/bold] {mode}\n")
    context: Dict[str, Any] = {"config": workflow.dict()}
    outputs: Dict[str, Any] = {}

    for step in workflow.steps:
        sid = step.id
        agent_name = step.agent
        console.rule(f"[blue]Step: {sid} -> {agent_name}")
        raw_inputs = step.inputs or {}
        resolution_context = {**context, **outputs}
        resolved_inputs = deep_resolve(raw_inputs, resolution_context)

        # parse JSON strings back to objects where applicable
        def try_parse(v):
            if isinstance(v, str):
                try:
                    return json.loads(v)
                except Exception:
                    return v
            return v
        if isinstance(resolved_inputs, dict):
            resolved_inputs = {k: try_parse(v) for k, v in resolved_inputs.items()}

        console.print(f"[yellow]Resolved inputs for {sid}:[/yellow]")
        console.print_json(json.dumps(resolved_inputs, default=str))

        try:
            agent = instantiate_agent(agent_name, step.dict())
        except Exception as e:
            console.print(f"[red]Agent import failed for {agent_name}: {e}[/red]")
            raise

        out = agent.run(resolved_inputs)
        if not isinstance(out, dict):
            raise TypeError(f"Agent {agent_name} must return a dict; got {type(out)}")
        outputs[sid] = {"output": out}
        console.print(f"[green]Output from {sid}:[/green]")
        console.print_json(json.dumps(out, default=str))

    if save_output:
        Path("demo_output.json").write_text(json.dumps(outputs, indent=2, default=str))
        console.print(f"[bold]Saved outputs to demo_output.json[/bold]")

    return outputs

def main():
    wf = load_workflow()
    run_workflow(wf)

if __name__ == "__main__":
    main()
PY

# README
cat > README.md <<'MD'
# Prospect-to-Lead LangGraph Demo (Mock)

High-quality, modular mock pipeline you can run in Colab or locally.

## Quick start
1. Install: pip install -r requirements.txt
2. Run: python langgraph_builder.py
3. Output: demo_output.json

## How to convert to live
- Implement the real API calls inside individual agents (e.g., agents/outreach_content.py -> OpenAI)
- Use environment variables or secret manager for keys
- Add tests and CI
MD

# tests/test_pipeline.py (basic smoke tests)
mkdir -p tests
cat > tests/test_pipeline.py <<'PYT'
import json
from pathlib import Path
from langgraph_builder import load_workflow, run_workflow

def test_workflow_load_and_run():
    wf = load_workflow("workflow.json")
    outputs = run_workflow(wf, save_output=False)
    assert isinstance(outputs, dict)
    # ensure expected step keys exist
    assert "prospect_search" in outputs
    assert "enrichment" in outputs
PYT

echo "Files written."


Writing project files...
Files written.


In [7]:
# Section 3 (Python-style)
# Run pipeline
!python3 langgraph_builder.py

# Print a short snippet of the output file
from pathlib import Path
out_path = Path("demo_output.json")
if out_path.exists():
    print("------ demo_output.json snippet ------")
    print(out_path.read_text()[:1000])
else:
    print("demo_output.json not found. Make sure Section 2 (file creation) ran successfully.")


[1mWorkflow:[0m OutboundLeadGeneration  [1mMode:[0m live

/content/langgraph_builder.py:81: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  context: Dict[str, Any] = {"config": workflow.dict()}
[92m───────────────── [0m[34mStep: prospect_search -> ProspectSearchAgent[0m[92m ─────────────────[0m
[33mResolved inputs for prospect_search:[0m
[1m{[0m
  [1;34m"icp"[0m: [1m{[0m
    [1;34m"industry"[0m: [32m"SaaS"[0m,
    [1;34m"location"[0m: [32m"USA"[0m,
    [1;34m"employee_count"[0m: [1m{[0m
      [1;34m"min"[0m: [1;36m100[0m,
      [1;34m"max"[0m: [1;36m1000[0m
    [1m}[0m
  [1m}[0m,
  [1;34m"signals"[0m: [1m[[0m
    [32m"recent_funding"[0m,
    [32m"hiring_for_sales"[0m
  [1m][0m
[1m}[0m
/content/langgraph_builder.py:107: PydanticDeprecatedSince20: The `dict` method is

In [8]:
# Section 4 — Inspect outputs
import json
from rich.console import Console
from rich.table import Table
console = Console()

with open("demo_output.json") as f:
    outputs = json.load(f)

# Print simple metric summary if present
metrics = outputs.get("response_tracker", {}).get("output", {}).get("metrics", {})
table = Table(title="Campaign Metrics")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="magenta")
if metrics:
    for k, v in metrics.items():
        table.add_row(k, str(v))
else:
    table.add_row("metrics", "not found")

console.print(table)
# Print recommendations
recs = outputs.get("feedback_trainer", {}).get("output", {}).get("recommendations", [])
console.print("\n[bold]Recommendations:[/bold]")
for r in recs:
    console.print(f"- {r}")


In [9]:
%%bash
set -e
cat > tests/conftest.py <<'PY'
import os, sys
from pathlib import Path

# Ensure project root is on sys.path so tests can import modules like langgraph_builder
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
PY

# Run pytest
pytest -q


.                                                                        [100%]
1 passed in 0.20s


In [10]:
# Section 5 — Run tests
!pytest -q


[32m.[0m[32m                                                                        [100%][0m
[32m[32m[1m1 passed[0m[32m in 0.19s[0m[0m


In [11]:
# install official client
!pip install -q openai


In [12]:
# Paste and run this entire cell in Colab (Python cell).
from pathlib import Path
import textwrap, os, subprocess, json, sys

# 1) Robust runner: catch step errors and continue (save error in outputs)
langgraph_code = textwrap.dedent("""
from typing import Any, Dict
from pathlib import Path
import json, re, importlib, os
from pydantic import BaseModel, Field
from rich.console import Console

console = Console()
TEMPL_RE = re.compile(r"\\{\\{([^}]+)\\}\\}")

class StepModel(BaseModel):
    id: str
    agent: str
    inputs: Dict[str, Any] = Field(default_factory=dict)
    instructions: str = None

class WorkflowModel(BaseModel):
    workflow_name: str
    mode: str = "mock"
    description: str = None
    steps: list[StepModel]

def load_workflow(path: str = "workflow.json") -> WorkflowModel:
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"workflow.json not found at {p.resolve()}")
    raw = json.loads(p.read_text())
    return WorkflowModel(**raw)

def resolve_template_string(s: str, context: Dict[str, Any]):
    def repl(m):
        ref = m.group(1).strip()
        parts = ref.split('.')
        cur = context
        try:
            for p in parts:
                if isinstance(cur, list) and p.isdigit():
                    cur = cur[int(p)]
                else:
                    cur = cur[p]
            return json.dumps(cur) if isinstance(cur, (dict, list)) else str(cur)
        except Exception:
            return m.group(0)
    return TEMPL_RE.sub(repl, s)

def deep_resolve(obj, context):
    if isinstance(obj, str):
        return resolve_template_string(obj, context)
    if isinstance(obj, dict):
        return {k: deep_resolve(v, context) for k, v in obj.items()}
    if isinstance(obj, list):
        return [deep_resolve(v, context) for v in obj]
    return obj

def camel_to_snake(name: str) -> str:
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\\1_\\2', name)
    s2 = re.sub('([a-z0-9])([A-Z])', r'\\1_\\2', s1)
    return s2.replace('__','_').lower()

def instantiate_agent(agent_name: str, config: Dict[str, Any]):
    module_slug = camel_to_snake(agent_name).replace('_agent','')
    module_path = f"agents.{module_slug}"
    mod = importlib.import_module(module_path)
    cls = getattr(mod, agent_name)
    return cls(config)

def run_workflow(workflow: WorkflowModel, save_output: bool = True) -> Dict[str, Any]:
    mode = workflow.mode or os.getenv("WORKFLOW_MODE","mock")
    console.print(f"[bold]Workflow:[/bold] {workflow.workflow_name}  [bold]Mode:[/bold] {mode}\\n")
    context = {"config": workflow.model_dump()}
    outputs = {}
    for step in workflow.steps:
        sid = step.id
        agent_name = step.agent
        console.rule(f"[blue]Step: {sid} -> {agent_name}")
        raw_inputs = step.inputs or {}
        resolved_inputs = deep_resolve(raw_inputs, {**context, **outputs})
        # parse JSON strings
        def try_parse(v):
            if isinstance(v, str):
                try:
                    return json.loads(v)
                except Exception:
                    return v
            return v
        if isinstance(resolved_inputs, dict):
            resolved_inputs = {k: try_parse(v) for k, v in resolved_inputs.items()}
        console.print(f"[yellow]Resolved inputs for {sid}:[/yellow]")
        console.print_json(json.dumps(resolved_inputs, default=str))
        try:
            agent = instantiate_agent(agent_name, step.model_dump())
        except Exception as e:
            console.print(f"[red]Agent import failed for {agent_name}: {e}[/red]")
            outputs[sid] = {"output": {"error": f"import_error: {e}"}}
            continue
        try:
            out = agent.run(resolved_inputs)
            outputs[sid] = {"output": out}
            console.print(f"[green]Output from {sid}:[/green]")
            console.print_json(json.dumps(out, default=str))
        except Exception as e:
            # Log the error for this step and continue the next steps
            console.print(f"[red]Step {sid} failed: {e}[/red]")
            outputs[sid] = {"output": {"error": str(e)}}
            continue

    if save_output:
        Path("demo_output.json").write_text(json.dumps(outputs, indent=2, default=str))
        console.print(f"[bold]Saved outputs to demo_output.json[/bold]")
    return outputs

def main():
    wf = load_workflow()
    run_workflow(wf)

if __name__ == "__main__":
    main()
""")

Path("langgraph_builder.py").write_text(langgraph_code, encoding="utf-8")
print("Patched langgraph_builder.py -> per-step errors will no longer abort the whole run.")

# 2) Hardened outreach_content.py: batching, small sleep, cheap model, fallback per lead
outreach_code = textwrap.dedent('''\
"""Hardened OutreachContentAgent: batching + fallback-first behavior."""

from typing import Dict, Any, List
import os, time, json, re, logging
from agents.base_agent import BaseAgent

logger = logging.getLogger("langgraph")

try:
    from openai import OpenAI
    try:
        from openai.error import RateLimitError
    except Exception:
        RateLimitError = Exception
except Exception:
    OpenAI = None
    RateLimitError = Exception

class OutreachContentAgent(BaseAgent):
    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.api_key = os.getenv("OPENAI_API_KEY")
        self.client = None
        if OpenAI is not None and self.api_key:
            try:
                self.client = OpenAI(api_key=self.api_key)
            except Exception as e:
                logger.warning(f"OpenAI client init failed: {e}; will fallback.")
                self.client = None
        # prefer low-cost model by default
        self.model = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
        self.max_tokens = int(os.getenv("OPENAI_MAX_TOKENS", "120"))
        self.batch_sleep = float(os.getenv("OUTREACH_BATCH_SLEEP", "0.6"))  # seconds between calls
        self.max_retries = 1  # keep retries small to avoid cost/quota pressure

    def _build_prompt(self, lead: Dict[str, Any], persona: str, tone: str) -> str:
        return (
            f"You are an outbound SDR writing a short cold email (subject + one short paragraph body) "
            f"for the {persona} persona. Tone: {tone}.\\n\\n"
            f"Lead company: {lead.get('company')}\\n"
            f"Contact name: {lead.get('contact_name')}\\n"
            f"Relevant signals: {lead.get('signal')}\\n"
            f"Tech stack: {', '.join(lead.get('tech_stack', []))}\\n\\n"
            "Requirements:\\n"
            "- Subject: 3-6 words emphasizing outcome.\\n"
            "- Body: <= 80 words, short social proof clause, single CTA proposing a 15-min call.\\n"
            "- Output JSON ONLY with keys: subject, body.\\n\\n"
            "Now produce the JSON only (no extra text)."
        )

    def _extract_json(self, raw: str) -> Dict[str, str]:
        try:
            return json.loads(raw)
        except Exception:
            m = re.search(r"(\\{.*\\})", raw, flags=re.S)
            if m:
                try:
                    return json.loads(m.group(1))
                except Exception:
                    pass
        return {}

    def _call_openai_once(self, prompt: str) -> str:
        if self.client is None:
            raise RuntimeError("OpenAI client not configured.")
        resp = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role":"system","content":"You are a helpful assistant that outputs JSON."},
                {"role":"user","content":prompt}
            ],
            max_tokens=self.max_tokens,
            temperature=0.2,
            timeout=30
        )
        # parse common shapes:
        if hasattr(resp, "choices") and len(resp.choices) > 0:
            choice = resp.choices[0]
            if hasattr(choice, "message") and getattr(choice.message, "content", None):
                return choice.message.content
            if getattr(choice, "text", None):
                return choice.text
        if getattr(resp, "output_text", None):
            return resp.output_text
        if getattr(resp, "text", None):
            return resp.text
        return str(resp)

    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        ranked = inputs.get("ranked_leads", [])
        persona = inputs.get("persona", "SDR")
        tone = inputs.get("tone", "friendly")
        messages: List[Dict[str, str]] = []
        # Process one lead at a time to minimize concurrency & quota pressure
        for lead in ranked:
            subject, body = None, None
            # If no client, use deterministic fallback immediately
            if not self.client:
                subject = f"Quick question about {lead.get('company')}"
                body = (
                    f"Hi {lead.get('contact_name')},\\n\\n"
                    f"I noticed {lead.get('company')} is {lead.get('signal')} and using {', '.join(lead.get('tech_stack', []))}. "
                    "Would you be open to a quick 15-min chat?\\n\\nBest,\\n" + persona
                )
            else:
                prompt = self._build_prompt(lead, persona, tone)
                try:
                    raw = self._call_openai_once(prompt)
                    parsed = self._extract_json(raw)
                    subject = parsed.get("subject") or f"Quick question about {lead.get('company')}"
                    body = parsed.get("body") or (
                        f"Hi {lead.get('contact_name')},\\n\\nQuick note: would you be open to a 15-min chat?\\n\\nBest,\\n{persona}"
                    )
                    # sleep between calls to be polite and avoid burst rate limiting
                    time.sleep(self.batch_sleep)
                except Exception as e:
                    # If OpenAI returns 429 or any error, log and fallback for this lead
                    logger.error(f"OpenAI generation failed for {lead.get('company')}: {e}")
                    subject = f"Quick question about {lead.get('company')}"
                    body = (
                        f"Hi {lead.get('contact_name')},\\n\\n"
                        f"I noticed {lead.get('company')} is {lead.get('signal')} and using {', '.join(lead.get('tech_stack', []))}. "
                        "Would you be open to a quick 15-min chat?\\n\\nBest,\\n" + persona
                    )
            messages.append({"lead_company": lead.get("company"), "email_subject": subject.strip(), "email_body": body.strip()})
        return {"messages": messages}
''')

Path("agents/outreach_content.py").write_text(outreach_code, encoding="utf-8")
print("Wrote hardened agents/outreach_content.py (batching + fallback).")

# 3) Set safer runtime env to reduce chance of 429s
os.environ["OPENAI_MODEL"] = os.environ.get("OPENAI_MODEL", "gpt-3.5-turbo")
os.environ["OPENAI_MAX_TOKENS"] = os.environ.get("OPENAI_MAX_TOKENS", "120")
os.environ["OUTREACH_BATCH_SLEEP"] = os.environ.get("OUTREACH_BATCH_SLEEP", "0.6")
os.environ["WORKFLOW_MODE"] = "live"  # you want live

print("Environment set: OPENAI_MODEL=", os.environ["OPENAI_MODEL"],
      "OPENAI_MAX_TOKENS=", os.environ["OPENAI_MAX_TOKENS"],
      "OUTREACH_BATCH_SLEEP=", os.environ["OUTREACH_BATCH_SLEEP"],
      "WORKFLOW_MODE=", os.environ["WORKFLOW_MODE"])

# 4) Run the pipeline (uses your session's OPENAI_API_KEY)
print("\\nRunning langgraph_builder.py ... (this will attempt live generation but will fallback on failures)\\n")
proc = subprocess.run([sys.executable, "langgraph_builder.py"], text=True, capture_output=True, env=os.environ)

print("=== STDOUT ===")
print(proc.stdout[:10000])
print("=== STDERR ===")
print(proc.stderr[:10000])

# 5) Print demo_output.json if produced
demo = Path("demo_output.json")
if demo.exists():
    print("\\n--- demo_output.json ---\\n")
    print(demo.read_text()[:2000])
else:
    print("\\nNo demo_output.json produced. Check stderr above.")


Patched langgraph_builder.py -> per-step errors will no longer abort the whole run.
Wrote hardened agents/outreach_content.py (batching + fallback).
Environment set: OPENAI_MODEL= gpt-3.5-turbo OPENAI_MAX_TOKENS= 120 OUTREACH_BATCH_SLEEP= 0.6 WORKFLOW_MODE= live
\nRunning langgraph_builder.py ... (this will attempt live generation but will fallback on failures)\n
=== STDOUT ===
Workflow: OutboundLeadGeneration  Mode: live

───────────────── Step: prospect_search -> ProspectSearchAgent ─────────────────
Resolved inputs for prospect_search:
{
  "icp": {
    "industry": "SaaS",
    "location": "USA",
    "employee_count": {
      "min": 100,
      "max": 1000
    }
  },
  "signals": [
    "recent_funding",
    "hiring_for_sales"
  ]
}
Output from prospect_search:
{
  "leads": [
    {
      "company": "Acme SaaS",
      "contact_name": "Acme Founder",
      "email": "founder@acmesaas.com",
      "linkedin": "https://linkedin.com/in/acmesaas",
      "signal": "hiring_for_sales"
    },
    {

In [13]:
# Pretty-print the generated messages for demo (Colab Python cell)
import json
from rich.table import Table
from rich.console import Console
from pathlib import Path

console = Console()
p = Path("demo_output.json")
if not p.exists():
    print("demo_output.json not found. Run the pipeline first.")
else:
    data = json.load(p.open())
    msgs = data.get("outreach_content", {}).get("output", {}).get("messages", [])
    t = Table("Company", "Subject", "Body (truncated)")
    for m in msgs:
        body = m.get("email_body", "").replace("\n", " ")
        t.add_row(m.get("lead_company",""), m.get("email_subject",""), body[:140] + ("..." if len(body)>140 else ""))
    console.print(t)


In [14]:
# Decrease cost + throttle calls and re-run pipeline
import os, subprocess
os.environ["OPENAI_MODEL"] = "gpt-3.5-turbo"      # cheaper
os.environ["OPENAI_MAX_TOKENS"] = "80"           # smaller responses
os.environ["OUTREACH_BATCH_SLEEP"] = "1.0"       # 1 second between leads
os.environ["WORKFLOW_MODE"] = "live"
print("Settings applied. Re-running pipeline now (this will attempt live gen but throttle requests).")

proc = subprocess.run(["python3","langgraph_builder.py"], text=True, capture_output=True, env=os.environ)
print("=== STDOUT ===")
print(proc.stdout[:8000])
print("=== STDERR ===")
print(proc.stderr[:8000])


Settings applied. Re-running pipeline now (this will attempt live gen but throttle requests).
=== STDOUT ===
Workflow: OutboundLeadGeneration  Mode: live

───────────────── Step: prospect_search -> ProspectSearchAgent ─────────────────
Resolved inputs for prospect_search:
{
  "icp": {
    "industry": "SaaS",
    "location": "USA",
    "employee_count": {
      "min": 100,
      "max": 1000
    }
  },
  "signals": [
    "recent_funding",
    "hiring_for_sales"
  ]
}
Output from prospect_search:
{
  "leads": [
    {
      "company": "Acme SaaS",
      "contact_name": "Acme Founder",
      "email": "founder@acmesaas.com",
      "linkedin": "https://linkedin.com/in/acmesaas",
      "signal": "hiring_for_sales"
    },
    {
      "company": "Nimbus Analytics",
      "contact_name": "Nimbus Founder",
      "email": "founder@nimbusanalytics.com",
      "linkedin": "https://linkedin.com/in/nimbusanalytics",
      "signal": "recent_funding"
    },
    {
      "company": "PulseSoft",
      "cont

In [15]:
# Overwrite outreach_content with jittered backoff implementation
from pathlib import Path
code = r'''
"""OutreachContentAgent with exponential backoff + jitter."""

from typing import Dict, Any, List
import os, time, json, re, random, logging
from agents.base_agent import BaseAgent

logger = logging.getLogger("langgraph")

try:
    from openai import OpenAI
    try:
        from openai.error import RateLimitError
    except Exception:
        RateLimitError = Exception
except Exception:
    OpenAI = None
    RateLimitError = Exception

class OutreachContentAgent(BaseAgent):
    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.api_key = os.getenv("OPENAI_API_KEY")
        self.client = None
        if OpenAI is not None and self.api_key:
            try:
                self.client = OpenAI(api_key=self.api_key)
            except Exception as e:
                logger.warning(f"OpenAI client init failed: {e}; will fallback.")
                self.client = None
        self.model = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
        self.max_tokens = int(os.getenv("OPENAI_MAX_TOKENS", "120"))
        self.batch_sleep = float(os.getenv("OUTREACH_BATCH_SLEEP", "0.6"))
        self.max_attempts = int(os.getenv("OUTREACH_MAX_ATTEMPTS", "3"))
        self.base_backoff = float(os.getenv("OUTREACH_BASE_BACKOFF", "1.0"))

    def _build_prompt(self, lead: Dict[str, Any], persona: str, tone: str) -> str:
        return (
            f"You are an outbound SDR writing a short cold email (subject + one short paragraph body) "
            f"for the {persona} persona. Tone: {tone}.\\n\\n"
            f"Lead company: {lead.get('company')}\\n"
            f"Contact name: {lead.get('contact_name')}\\n"
            f"Relevant signals: {lead.get('signal')}\\n"
            f"Tech stack: {', '.join(lead.get('tech_stack', []))}\\n\\n"
            "Requirements:\\n- Subject: 3-6 words emphasizing outcome.\\n"
            "- Body: <= 80 words, short social proof clause, single CTA.\\n"
            "- Output JSON ONLY with keys: subject, body.\\n\\n"
            "Now produce the JSON only."
        )

    def _extract_json(self, raw: str) -> Dict[str, str]:
        try:
            return json.loads(raw)
        except Exception:
            m = re.search(r"(\\{.*\\})", raw, flags=re.S)
            if m:
                try:
                    return json.loads(m.group(1))
                except Exception:
                    pass
        return {}

    def _call_with_backoff(self, prompt: str) -> str:
        if self.client is None:
            raise RuntimeError("OpenAI client not configured.")
        last_exc = None
        for attempt in range(1, self.max_attempts + 1):
            try:
                resp = self.client.chat.completions.create(
                    model=self.model,
                    messages=[{"role":"system","content":"You are a helpful assistant that outputs JSON."},
                              {"role":"user","content":prompt}],
                    max_tokens=self.max_tokens,
                    temperature=0.2,
                    timeout=30
                )
                if hasattr(resp, "choices") and len(resp.choices) > 0:
                    choice = resp.choices[0]
                    if hasattr(choice, "message") and getattr(choice.message, "content", None):
                        return choice.message.content
                    if getattr(choice, "text", None):
                        return choice.text
                if getattr(resp, "output_text", None):
                    return resp.output_text
                if getattr(resp, "text", None):
                    return resp.text
                return str(resp)
            except Exception as e:
                last_exc = e
                # if fatal quota error, stop retrying
                if isinstance(e, RateLimitError) or getattr(e, "http_status", None) == 429 or getattr(e, "code", None) == "insufficient_quota":
                    logger.error(f"Fatal OpenAI rate/quota error: {e}")
                    break
                backoff = self.base_backoff * (2 ** (attempt - 1))
                jitter = random.uniform(0, backoff * 0.5)
                wait = backoff + jitter
                logger.warning(f"OpenAI call failed attempt {attempt}: {e}. Backing off {wait:.2f}s.")
                time.sleep(wait)
        raise last_exc

    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        ranked = inputs.get("ranked_leads", [])
        persona = inputs.get("persona", "SDR")
        tone = inputs.get("tone", "friendly")
        messages: List[Dict[str, str]] = []
        for lead in ranked:
            if not self.client:
                # deterministic fallback
                subject = f"Quick question about {lead.get('company')}"
                body = (f"Hi {lead.get('contact_name')},\\n\\n"
                        f"I noticed {lead.get('company')} is {lead.get('signal')} and using {', '.join(lead.get('tech_stack', []))}. "
                        "Would you be open to a quick 15-min chat?\\n\\nBest,\\n" + persona)
            else:
                prompt = self._build_prompt(lead, persona, tone)
                try:
                    raw = self._call_with_backoff(prompt)
                    parsed = self._extract_json(raw)
                    subject = parsed.get("subject") or f"Quick question about {lead.get('company')}"
                    body = parsed.get("body") or (f"Hi {lead.get('contact_name')},\\n\\nQuick note: would you be open to a 15-min chat?\\n\\nBest,\\n{persona}")
                except Exception as e:
                    logger.error(f"OpenAI generation failed for {lead.get('company')}: {e}")
                    subject = f"Quick question about {lead.get('company')}"
                    body = (f"Hi {lead.get('contact_name')},\\n\\n"
                            f"I noticed {lead.get('company')} is {lead.get('signal')} and using {', '.join(lead.get('tech_stack', []))}. "
                            "Would you be open to a quick 15-min chat?\\n\\nBest,\\n" + persona)
            messages.append({"lead_company": lead.get("company"), "email_subject": subject.strip(), "email_body": body.strip()})
            time.sleep(self.batch_sleep)
        return {"messages": messages}
'''
Path("agents/outreach_content.py").write_text(code, encoding="utf-8")
print("Wrote outreach_content.py with jittered backoff. Re-run the pipeline when ready.")


Wrote outreach_content.py with jittered backoff. Re-run the pipeline when ready.


In [16]:
# Run pipeline in live mode
import os
os.environ["WORKFLOW_MODE"] = "live"  # ensure langgraph_builder sees live
!python3 langgraph_builder.py


[1mWorkflow:[0m OutboundLeadGeneration  [1mMode:[0m live

[92m───────────────── [0m[34mStep: prospect_search -> ProspectSearchAgent[0m[92m ─────────────────[0m
[33mResolved inputs for prospect_search:[0m
[1m{[0m
  [1;34m"icp"[0m: [1m{[0m
    [1;34m"industry"[0m: [32m"SaaS"[0m,
    [1;34m"location"[0m: [32m"USA"[0m,
    [1;34m"employee_count"[0m: [1m{[0m
      [1;34m"min"[0m: [1;36m100[0m,
      [1;34m"max"[0m: [1;36m1000[0m
    [1m}[0m
  [1m}[0m,
  [1;34m"signals"[0m: [1m[[0m
    [32m"recent_funding"[0m,
    [32m"hiring_for_sales"[0m
  [1m][0m
[1m}[0m
[32mOutput from prospect_search:[0m
[1m{[0m
  [1;34m"leads"[0m: [1m[[0m
    [1m{[0m
      [1;34m"company"[0m: [32m"Acme SaaS"[0m,
      [1;34m"contact_name"[0m: [32m"Acme Founder"[0m,
      [1;34m"email"[0m: [32m"founder@acmesaas.com"[0m,
      [1;34m"linkedin"[0m: [32m"https://linkedin.com/in/acmesaas"[0m,
      [1;34m"signal"[0m: [32m"hiring_for_sales"[0