<a href="https://colab.research.google.com/github/MeirKaD/data-enrichment-agent/blob/main/data_enrichment_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# AI-Powered Data Enrichment Agent

## Overview

This notebook demonstrates building an intelligent data enrichment agent using:

- **LangGraph**: Agentic workflow orchestration
- **Bright Data MCP**: Model Context Protocol integration for web data access
- **Claude Sonnet 4**: Advanced reasoning and structured extraction

### What This Agent Does

1. Takes a research topic and JSON schema as input
2. Autonomously searches the web using Bright Data's SERP API
3. Scrapes relevant websites with anti-bot bypass
4. Extracts and structures data matching your schema
5. Returns JSON output shaped by your schema (the notebook does not separately validate the result against it)

---

## Setup

### Install Dependencies

In [1]:
!pip install -q langgraph langchain-openai langchain-mcp-adapters python-dotenv nest-asyncio


### Configure Environment Variables

**For Google Colab:**
```python
from google.colab import userdata
import os
os.environ['BRIGHT_DATA_API_KEY'] = userdata.get('BRIGHT_DATA_API_KEY')
os.environ['ANTHROPIC_API_KEY'] = userdata.get('ANTHROPIC_API_KEY')
```

**For local Jupyter:**
Put both keys in a `.env` file. The cell below detects Colab automatically and otherwise loads the keys with `python-dotenv`.

In [2]:
import os
import sys

# Check if running in Colab
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    from google.colab import userdata
    os.environ['BRIGHT_DATA_API_KEY'] = userdata.get('BRIGHT_DATA_API_KEY')
    os.environ['ANTHROPIC_API_KEY'] = userdata.get('ANTHROPIC_API_KEY')
    print("✓ Loaded keys from Colab secrets")
else:
    from dotenv import load_dotenv
    load_dotenv()
    print("✓ Loaded keys from .env file")

# Verify keys are set
assert os.getenv("BRIGHT_DATA_API_KEY"), "BRIGHT_DATA_API_KEY not set"
assert os.getenv("ANTHROPIC_API_KEY"), "ANTHROPIC_API_KEY not set"

print("✓ Environment configured")

✓ Loaded keys from Colab secrets
✓ Environment configured


### Configure Async Event Loop (Required for Colab)

In [3]:
import nest_asyncio
nest_asyncio.apply()

print("✓ Async event loop configured")

✓ Async event loop configured


### Suppress Verbose Warnings

In [4]:
import logging
import warnings

# Suppress MCP notification validation warnings
logging.getLogger().addFilter(
    lambda record: "Failed to validate notification" not in record.getMessage()
)
warnings.filterwarnings("ignore", message=".*Failed to validate notification.*")
warnings.filterwarnings("ignore", category=DeprecationWarning)

print("✓ Logging configured")

✓ Logging configured
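A caveat about the filter above: per the Python `logging` docs, a filter attached to a logger is consulted only for records logged on that logger, not for records propagated up from descendant loggers. The root-logger filter therefore works only if the MCP warning is logged on the root logger itself. If it arrives through a named child logger, a handler-level filter is needed. The sketch below demonstrates the difference with an in-memory handler; the logger names and `ListHandler` class are hypothetical, chosen only for the demo.

```python
import logging


class ListHandler(logging.Handler):
    """Collects log messages in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def drop_notification_noise(record: logging.LogRecord) -> bool:
    # Returning False tells logging to discard the record.
    return "Failed to validate notification" not in record.getMessage()


parent = logging.getLogger("notebook_demo")
child = logging.getLogger("notebook_demo.mcp")  # hypothetical child logger
handler = ListHandler()
parent.addHandler(handler)
parent.setLevel(logging.WARNING)
parent.propagate = False  # keep the demo out of the root handlers

# A logger-level filter on the parent is NOT consulted for records
# propagated from the child, so this warning still reaches the handler.
parent.addFilter(drop_notification_noise)
child.warning("Failed to validate notification: example")

# A handler-level filter sees every record the handler is asked to emit.
handler.addFilter(drop_notification_noise)
child.warning("Failed to validate notification: example")  # dropped
child.warning("Some other warning")  # kept

print(handler.messages)
```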


---

## Agent Implementation

### 1. Define Agent State

The state tracks:
- Research topic
- Target extraction schema
- Conversation messages
- Extracted information
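Because `messages` is annotated with the `add_messages` reducer, each node returns only its *new* messages and LangGraph merges them into the existing list instead of overwriting it; `info` has no reducer, so each update replaces it. The real reducer also converts message formats and supports removals. The sketch below is a simplified, hypothetical stand-in (plain `Msg` objects, no LangChain imports) that shows only the core idea: update messages whose ID already exists, append the rest.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class Msg:
    role: str
    content: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


def append_or_replace(left: list, right: list) -> list:
    """Simplified merge: replace messages with a known id, append new ones."""
    merged = list(left)
    position = {m.id: i for i, m in enumerate(merged)}
    for msg in right:
        if msg.id in position:
            merged[position[msg.id]] = msg  # same id: update in place
        else:
            position[msg.id] = len(merged)
            merged.append(msg)  # new id: append to the history
    return merged


history = [Msg("human", "hi", id="1")]
history = append_or_replace(history, [Msg("ai", "hello", id="2")])
history = append_or_replace(history, [Msg("ai", "hello again", id="2")])
print([m.content for m in history])
```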

In [5]:
import json
import asyncio
from dataclasses import dataclass, field
from typing import Any, Annotated, List, Optional

from langchain_core.messages import AIMessage, HumanMessage, BaseMessage
from langgraph.graph.message import add_messages

@dataclass
class AgentState:
    """State for the enrichment agent."""
    topic: str
    extraction_schema: dict[str, Any]
    messages: Annotated[List[BaseMessage], add_messages] = field(default_factory=list)
    info: Optional[dict[str, Any]] = None

print("✓ Agent state defined")

✓ Agent state defined


### 2. System Prompt

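The prompt describes the available tools and the expected workflow. The tools actually exposed depend on how the Bright Data MCP server is configured; the runs below report four.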

In [6]:
SYSTEM_PROMPT = """You are a research agent. Your task is to gather information about a topic and extract structured data.

You have access to these tools:
- search_engine: Search the web for information (Google/Bing/Yandex)
- scrape_as_markdown: Get content from a specific URL with bot detection bypass
- web_data_* tools: Fast, reliable structured data extraction from major platforms
- submit_info: Call this when you have gathered all the required information

Research topic: {topic}

Required information schema:
{schema}

Search for relevant information, scrape important pages, then call submit_info with the extracted data."""

print("✓ System prompt configured")

✓ System prompt configured
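`SYSTEM_PROMPT` is filled in with `str.format`, and the schema is inserted as a JSON string. The braces inside that JSON need no escaping, because `format` only parses placeholders in the template itself and inserts argument values verbatim. Literal braces written directly into the template would have to be doubled. A minimal sketch with a shortened, hypothetical template:

```python
import json

TEMPLATE = "Research topic: {topic}\n\nRequired information schema:\n{schema}"

schema = {
    "type": "object",
    "properties": {"name": {"type": "string"}},
    "required": ["name"],
}

# Braces inside the JSON value are copied as-is; only {topic} and {schema}
# in the template are treated as placeholders.
rendered = TEMPLATE.format(topic="Example Co", schema=json.dumps(schema, indent=2))
print(rendered)

# Literal braces in the template itself must be written as {{ and }}.
print("{{literal}}".format())
```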


### 3. Create the Agent Graph

This builds the LangGraph workflow with:
- MCP client connection to Bright Data
- Claude LLM integration
- Tool execution node
- Conditional routing logic
- Error reporting that lists `ExceptionGroup` sub-exceptions when the MCP connection fails
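Stripped of LangGraph types, the conditional routing in `route` reduces to a small pure function over the extracted `info` and the tool calls on the last AI message. The sketch uses a hypothetical name, `next_step`, and plain dicts for tool calls. Note that a text-only reply (no tool calls) sends control back to the agent, so a model that keeps answering in prose loops until LangGraph's recursion limit stops the run.

```python
from typing import Any, Optional


def next_step(info: Optional[dict], last_tool_calls: Optional[list]) -> str:
    """Mirror of the notebook's routing rule, using plain data."""
    if info:
        return "__end__"  # structured data already extracted
    if last_tool_calls:
        if any(tc["name"] == "submit_info" for tc in last_tool_calls):
            return "__end__"  # the model is submitting its answer
        return "tools"  # run search/scrape tools, then return to the agent
    return "agent"  # no tool calls: ask the model again


calls: list[dict[str, Any]] = [{"name": "search_engine", "args": {"query": "Stripe"}}]
print(next_step(None, calls))
```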

In [7]:
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph
from langgraph.prebuilt import ToolNode
from langchain_mcp_adapters.client import MultiServerMCPClient

async def create_agent():
    """Create the enrichment agent graph."""

    try:
        # Configure MCP client
        print("Connecting to Bright Data MCP...")
        client = MultiServerMCPClient({
            "bright_data": {
                "url": f"https://mcp.brightdata.com/sse?token={os.getenv('BRIGHT_DATA_API_KEY')}",
                "transport": "sse",
            }
        })

        # Get available tools from MCP
        print("Fetching available tools...")
        tools = await client.get_tools()
        print(f"✓ Connected to Bright Data MCP ({len(tools)} tools available)")

    except Exception as e:
        print(f"\n❌ Error connecting to MCP:")
        print(f"Error type: {type(e).__name__}")
        print(f"Error message: {str(e)}")

        # Try to extract more details from ExceptionGroup
        if hasattr(e, 'exceptions'):
            print(f"\nSub-exceptions ({len(e.exceptions)} total):")
            for i, sub_exc in enumerate(e.exceptions, 1):
                print(f"  {i}. {type(sub_exc).__name__}: {str(sub_exc)}")

        raise

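    # Initialize the model. Claude is reached through Anthropic's
    # OpenAI-compatible endpoint, so the OpenAI chat client is pointed at the
    # Anthropic base URL and authenticated with the Anthropic API key.
    llm = ChatOpenAI(
        openai_api_key=os.getenv("ANTHROPIC_API_KEY"),
        openai_api_base="https://api.anthropic.com/v1",
        model_name="claude-sonnet-4-20250514"
    )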

    async def call_model(state: AgentState) -> dict:
        """Call the LLM to decide next action."""
        prompt = SYSTEM_PROMPT.format(
            topic=state.topic,
            schema=json.dumps(state.extraction_schema, indent=2)
        )

        # Build messages: system prompt as first human message, then conversation
        messages = [HumanMessage(content=prompt)] + list(state.messages)

        # Create dynamic submit_info tool based on schema
        info_tool = {
            "name": "submit_info",
            "description": "Submit the extracted information when done researching. Call this with the structured data matching the required schema.",
            "parameters": state.extraction_schema,
        }

        # Bind all tools including the dynamic info tool
        model = llm.bind_tools(list(tools) + [info_tool])

        response = await model.ainvoke(messages)

        # Check if submitting info
        info = None
        if hasattr(response, 'tool_calls') and response.tool_calls:
            for tc in response.tool_calls:
                if tc["name"] == "submit_info":
                    info = tc["args"]
                    break

        return {"messages": [response], "info": info}

    def route(state: AgentState) -> str:
        """Route to next node based on last message."""
        # If we have extracted info, we're done
        if state.info:
            return "__end__"

        # Check the last message
        if not state.messages:
            return "agent"

        last_msg = state.messages[-1]

        if isinstance(last_msg, AIMessage) and hasattr(last_msg, 'tool_calls') and last_msg.tool_calls:
            # Check if it's a submit_info call
            for tc in last_msg.tool_calls:
                if tc["name"] == "submit_info":
                    return "__end__"
            # Otherwise, execute the tools
            return "tools"

        return "agent"

    # Build graph
    graph = StateGraph(AgentState)
    graph.add_node("agent", call_model)
    graph.add_node("tools", ToolNode(tools))
    graph.add_edge("__start__", "agent")
    graph.add_conditional_edges("agent", route)
    graph.add_edge("tools", "agent")

    return graph.compile()

print("✓ Agent graph builder defined")

✓ Agent graph builder defined
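The `except` block in `create_agent` prints only one level of sub-exceptions. The MCP client runs its transport in `anyio` task groups, which can wrap the real cause in nested exception groups. A recursive walk surfaces the innermost errors; `iter_leaf_exceptions` and `describe` are hypothetical helpers, and the check relies only on the `.exceptions` attribute that `ExceptionGroup` (Python 3.11+) exposes.

```python
import sys


def iter_leaf_exceptions(exc: BaseException):
    """Yield the innermost exceptions, descending through nested groups."""
    subs = getattr(exc, "exceptions", None)
    if subs:
        for sub in subs:
            yield from iter_leaf_exceptions(sub)
    else:
        yield exc


def describe(exc: BaseException) -> list:
    return [f"{type(e).__name__}: {e}" for e in iter_leaf_exceptions(exc)]


if sys.version_info >= (3, 11):
    err = ExceptionGroup(
        "outer",
        [ConnectionError("refused"), ExceptionGroup("inner", [TimeoutError("timed out")])],
    )
    for line in describe(err):
        print(line)
```

Inside the notebook's `except` block, `for line in describe(e): print(f"  {line}")` would replace the single-level loop.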


### 4. Enrichment Function

A thin async wrapper that builds the agent, runs it on one topic, and returns the extracted `info` dict.

In [8]:
async def enrich(topic: str, schema: dict) -> dict:
    """Run the enrichment agent."""
    print(f"\n{'='*60}")
    print(f"Starting enrichment for: {topic}")
    print(f"{'='*60}\n")

    agent = await create_agent()
    result = await agent.ainvoke({
        "topic": topic,
        "extraction_schema": schema,
    })
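    # The state always contains an `info` key (default None), so .get's
    # default never applies; fall back to {} explicitly.
    return result.get("info") or {}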

print("✓ Enrichment function ready")

✓ Enrichment function ready


---

## Demo: Extract Company Information

### Define Extraction Schema

Specify the fields to extract as a JSON Schema object. `required` lists the fields the model is told are mandatory.

In [9]:
company_schema = {
    "type": "object",
    "properties": {
        "company_name": {"type": "string"},
        "industry": {"type": "string"},
        "headquarters": {"type": "string"},
        "founded": {"type": "string"},
        "key_products": {"type": "array", "items": {"type": "string"}},
    },
    "required": ["company_name", "industry"]
}

print("Schema defined:")
print(json.dumps(company_schema, indent=2))

Schema defined:
{
  "type": "object",
  "properties": {
    "company_name": {
      "type": "string"
    },
    "industry": {
      "type": "string"
    },
    "headquarters": {
      "type": "string"
    },
    "founded": {
      "type": "string"
    },
    "key_products": {
      "type": "array",
      "items": {
        "type": "string"
      }
    }
  },
  "required": [
    "company_name",
    "industry"
  ]
}


### Run the Agent

Run the agent on a topic. Top-level `await` works here because Jupyter and Colab execute cells inside a running event loop.

In [10]:
result = await enrich("Stripe payments company", company_schema)

print("\n" + "="*60)
print("EXTRACTED INFORMATION")
print("="*60)
print(json.dumps(result, indent=2))


Starting enrichment for: Stripe payments company

Connecting to Bright Data MCP...
Fetching available tools...
✓ Connected to Bright Data MCP (4 tools available)

EXTRACTED INFORMATION
{
  "company_name": "Stripe, Inc.",
  "industry": "Financial services and software as a service (SaaS) / Financial technology (Fintech)",
  "headquarters": "South San Francisco, California, United States and Dublin, Ireland (dual headquarters)",
  "founded": "2010",
  "key_products": [
    "Payments",
    "Billing",
    "Connect",
    "Terminal",
    "Radar",
    "Sigma",
    "Atlas",
    "Issuing",
    "Link",
    "Tax",
    "Revenue Recognition",
    "Financial Accounts",
    "Data Pipeline",
    "Identity",
    "Climate",
    "Financial Connections",
    "Checkout",
    "Payment Links",
    "Elements"
  ]
}
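The schema only becomes the parameter spec of the `submit_info` tool; nothing in `enrich` checks the returned dict against it. The `jsonschema` package is the usual choice for full validation. The sketch below is a hypothetical stdlib-only helper, `schema_problems`, that covers just the keywords these examples use (`type` for string/array/object, `properties`, `required`, `items`) and ignores everything else.

```python
from typing import Any

# JSON Schema type names used in this notebook, mapped to Python types.
_TYPES = {"string": str, "array": list, "object": dict}


def schema_problems(data: Any, schema: dict, path: str = "$") -> list:
    """Return human-readable mismatches between data and a simple schema."""
    expected = schema.get("type")
    if expected in _TYPES and not isinstance(data, _TYPES[expected]):
        return [f"{path}: expected {expected}, got {type(data).__name__}"]
    problems = []
    if expected == "object":
        for key in schema.get("required", []):
            if key not in data:
                problems.append(f"{path}: missing required key '{key}'")
        for key, sub in schema.get("properties", {}).items():
            if key in data:
                problems.extend(schema_problems(data[key], sub, f"{path}.{key}"))
    elif expected == "array" and "items" in schema:
        for i, item in enumerate(data):
            problems.extend(schema_problems(item, schema["items"], f"{path}[{i}]"))
    return problems


schema = {
    "type": "object",
    "properties": {
        "company_name": {"type": "string"},
        "industry": {"type": "string"},
        "key_products": {"type": "array", "items": {"type": "string"}},
    },
    "required": ["company_name", "industry"],
}
print(schema_problems({"company_name": "Stripe, Inc.", "key_products": "Payments"}, schema))
```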


---

## Additional Examples

### Example 2: Competitor Analysis

In [11]:
competitor_schema = {
    "type": "object",
    "properties": {
        "competitors": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "market_position": {"type": "string"},
                    "key_differentiator": {"type": "string"}
                }
            }
        }
    },
    "required": ["competitors"]
}

result = await enrich("Stripe competitors in payment processing", competitor_schema)

print("\nCompetitor Analysis:")
print(json.dumps(result, indent=2))


Starting enrichment for: Stripe competitors in payment processing

Connecting to Bright Data MCP...
Fetching available tools...
✓ Connected to Bright Data MCP (4 tools available)

Competitor Analysis:
{
  "competitors": [
    {
      "name": "PayPal",
      "market_position": "Global market leader with 43.4% of the online payment processing market share, serves small to enterprise-level businesses across various industries",
      "key_differentiator": "Highest consumer trust and recognition, robust fraud protection policies, supports multiple currencies and payment methods including PayPal Credit and installments"
    },
    {
      "name": "Square",
      "market_position": "Strong position in small to medium businesses, particularly in retail and restaurants, with integrated POS systems and ~2% overall payment processing market share",
      "key_differentiator": "Combines payment processing with comprehensive business management tools including inventory management, employee sched

### Example 3: Product Feature Extraction

In [12]:
product_schema = {
    "type": "object",
    "properties": {
        "product_name": {"type": "string"},
        "category": {"type": "string"},
        "key_features": {"type": "array", "items": {"type": "string"}},
        "pricing_model": {"type": "string"},
        "target_audience": {"type": "string"}
    },
    "required": ["product_name", "category"]
}

result = await enrich("Claude AI by Anthropic", product_schema)

print("\nProduct Information:")
print(json.dumps(result, indent=2))


Starting enrichment for: Claude AI by Anthropic

Connecting to Bright Data MCP...
Fetching available tools...
✓ Connected to Bright Data MCP (4 tools available)

Product Information:
{
  "product_name": "Claude AI",
  "category": "Generative AI Chatbot and Large Language Model Family",
  "key_features": [
    "Large context windows up to 1 million tokens",
    "Extended thinking mode with controllable reasoning budgets",
    "Multimodal capabilities (text, images, and audio processing)",
    "Constitutional AI safety framework",
    "Tool use and agentic capabilities",
    "Computer use for desktop automation",
    "Code execution in sandboxed environments",
    "Long-form content generation",
    "Real-time web search capabilities",
    "Vision analysis and PDF support",
    "Multiple model variants (Haiku, Sonnet, Opus)",
    "Enterprise-grade security and compliance",
    "Integration with AWS Bedrock and Google Vertex AI"
  ],
  "pricing_model": "Freemium with multiple tiers: Free

---

## Key Advantages

### 1. **Autonomous Research**
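The agent decides on its own what to search for and which pages to scrape.

### 2. **Schema-Driven Extraction**
Define your data structure once as a JSON Schema. It becomes the parameter spec of the `submit_info` tool, which steers every run toward the same JSON layout.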

### 3. **Enterprise-Grade Infrastructure**
- Bright Data's global proxy network
- Anti-bot detection bypass
- Geo-targeting capabilities
- 99.99% uptime SLA

---

## Next Steps

- **Scale**: Process batches of topics concurrently
- **Customize**: Add domain-specific extraction logic
- **Integrate**: Connect to your data pipeline
- **Extend**: Add more MCP tools for LinkedIn, GitHub, etc.
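The **Scale** step could look like the sketch below, assuming the notebook's `enrich(topic, schema)` coroutine is passed in as `enrich_fn`. `enrich_many` and `fake_enrich` are hypothetical names; the stub stands in for real agent runs. Since each `enrich` call opens its own MCP connection, an `asyncio.Semaphore` caps how many run at once, and `return_exceptions=True` keeps one failed topic from discarding the other results. Topics are used as dict keys, so they should be unique.

```python
import asyncio

active = 0
peak = 0


async def enrich_many(topics, schema, enrich_fn, max_concurrency=3):
    """Run enrich_fn over many topics with bounded concurrency."""
    sem = asyncio.Semaphore(max_concurrency)

    async def run_one(topic):
        async with sem:  # at most max_concurrency runs at a time
            return await enrich_fn(topic, schema)

    results = await asyncio.gather(*(run_one(t) for t in topics), return_exceptions=True)
    return dict(zip(topics, results))  # failures appear as exception objects


async def fake_enrich(topic, schema):
    """Hypothetical stub that tracks how many calls overlap."""
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.01)
    active -= 1
    return {"company_name": topic}


topics = ["Stripe", "Adyen", "PayPal", "Square", "Checkout.com"]
results = asyncio.run(enrich_many(topics, {}, fake_enrich, max_concurrency=2))
print(results, "peak concurrency:", peak)
```

In the notebook, call it with `await enrich_many(topics, company_schema, enrich)` instead of `asyncio.run`.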

---

## Resources

- [Bright Data MCP Documentation](https://docs.brightdata.com/mcp)
- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/)
- [Claude API Documentation](https://docs.anthropic.com/)

---