# Company Enrichment with LangChain Agents on Databricks

This notebook demonstrates how to build an intelligent company enrichment pipeline using LangChain agents with Nimble tools on Databricks. The agent automatically searches for and extracts company information (address, funding, employees, investors, founders) and stores it in a Delta table.

**Key Features:**
- ü§ñ LangChain agent with Nimble Search & Extract tools
- üîÑ Two-step search strategy (fast search ‚Üí targeted extraction)
- üíæ Delta table storage with automatic updates
- üìä Structured output with Pydantic models
- üß† Summarization middleware for efficient token usage

## Step 1: Install Dependencies

Install required packages and restart Python to ensure all libraries are loaded properly.

In [None]:
%pip install langchain langchain-community databricks-langchain langchain-nimble --quiet
dbutils.library.restartPython()

## Step 2: Create Delta Table with Sample Companies

Set up a Delta table with sample companies to enrich. The table includes fields for company metadata and enrichment status tracking.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType
from delta.tables import DeltaTable

# Configuration - Update with your schema and table name
TABLE_NAME = "main.default.company_enrichment_demo"

# Initialize Spark
spark = SparkSession.builder.getOrCreate()

# Sample companies to enrich
companies_data = [
    ("Anthropic", "anthropic.com"),
    ("OpenAI", "openai.com"),
    ("Databricks", "databricks.com"),
    ("Nimble", "nimbleway.com"),
    ("Scale AI", "scale.com"),
    ("Cursor", "cursor.com"),
    ("Linear", "linear.app"),
]

# Define schema - storing investors/founders as JSON strings
schema = StructType([
    StructField("company_name", StringType(), False),
    StructField("website", StringType(), False),
    StructField("address", StringType(), True),
    StructField("funding", StringType(), True),
    StructField("employees", StringType(), True),
    StructField("investors", StringType(), True),
    StructField("founders", StringType(), True),
    StructField("enrichment_status", StringType(), True)
])

# Create initial DataFrame with just company name and website
df = spark.createDataFrame(companies_data, ["company_name", "website"])

# Add remaining columns with NULL values
for field in schema.fields[2:]:
    if field.name == "enrichment_status":
        df = df.withColumn(field.name, lit("pending"))
    else:
        df = df.withColumn(field.name, lit(None).cast(StringType()))

# Create or replace Delta table
df.write.format("delta").mode("overwrite").saveAsTable(TABLE_NAME)

print(f"‚úÖ Created table: {TABLE_NAME}")
print(f"üìä Initial row count: {df.count()}")

display(spark.table(TABLE_NAME))

## Step 3: Initialize LangChain Agent

Create an agent with Nimble tools and summarization middleware. The agent uses a two-step strategy: fast search for quick results, then targeted extraction for missing data.

**Important:** Get your Nimble API key at [nimbleway.com](https://app.nimbleway.com/account_settings/api_keys)

In [None]:
import os
import getpass
from typing import List
from pydantic import BaseModel, Field
from databricks_langchain import ChatDatabricks
from langchain.agents import create_agent
from langchain_nimble import NimbleExtractTool, NimbleSearchTool
from langchain.agents.middleware import SummarizationMiddleware

# Securely set API key
if not os.environ.get("NIMBLE_API_KEY"):
    os.environ["NIMBLE_API_KEY"] = getpass.getpass("NIMBLE_API_KEY:\n")

llm_model = ChatDatabricks(endpoint="databricks-claude-sonnet-4-5")

# Agent prompt with two-step strategy
prompt_template = """
You are a company enrichment agent. Use this two-step approach:

**Step 1: Fast Search**
Use search_tool with deep_search=false to get quick snippets and URLs.
Extract as much information as possible from the snippets.

**Step 2: Targeted Extraction (if needed)**
If information is missing, use extract_tool on 1-2 relevant URLs from the search results.
Focus on official company websites, LinkedIn, or Crunchbase.

**Required Information:**
- address: Full headquarters address
- funding: Total funding (e.g., "$100M Series B")
- employees: Count or range (e.g., "500-1000")
- investors: List of investor names
- founders: List of founder names

Return "Not found" for missing strings, empty list [] for missing arrays.
"""

# Define structured output
class CompanyInfo(BaseModel):
    """Company enrichment information"""
    address: str = Field(description="Company headquarters address")
    funding: str = Field(description="Total funding raised")
    employees: str = Field(description="Employee count or range")
    investors: List[str] = Field(description="List of investors")
    founders: List[str] = Field(description="List of founders")

# Initialize agent
agent = create_agent(
    model=llm_model,
    tools=[NimbleSearchTool(), NimbleExtractTool()],
    system_prompt=prompt_template,
    response_format=CompanyInfo,
    middleware=[
        SummarizationMiddleware(
            model=llm_model,
            trigger=("tokens", 20000),
            keep=("messages", 10),
        ),
    ]
)

print("‚úÖ Agent initialized with Nimble tools")

## Step 4: Define Enrichment Function

Create an async function that uses the agent to enrich company data and returns structured results compatible with Delta table storage.

In [None]:
import json

async def enrich_company(company_name: str, website: str) -> dict:
    """Use agent to enrich company data with structured output"""
    try:
        query = f"Find address, funding, employees, investors, and founders for {company_name} (website: {website})"
        
        # Stream agent execution
        async for step in agent.astream(
            {"messages": [{"role": "user", "content": query}]},
            stream_mode="values",
        ):
            pass  # Process streaming steps silently
            
        # Extract structured response
        structured = step["structured_response"]
        result = structured.model_dump()
        
        # Convert lists to JSON strings for Delta table storage
        result = {
            "address": result.get("address", "Not found"),
            "funding": result.get("funding", "Not found"),
            "employees": result.get("employees", "Not found"),
            "investors": json.dumps(result.get("investors", [])),
            "founders": json.dumps(result.get("founders", []))
        }
        
        return result
        
    except Exception as e:
        print(f"‚ùå Error enriching {company_name}: {str(e)}")
        return {
            "address": "Error",
            "funding": "Error",
            "employees": "Error",
            "investors": "[]",
            "founders": "[]"
        }

print("‚úÖ Enrichment function ready")

## Step 5: Run Enrichment Process

Process all pending companies, enrich their data using the agent, and update the Delta table with results. The final display shows all enriched company information.

In [None]:
from pyspark.sql.functions import col, lit

# Load Delta table
delta_table = DeltaTable.forName(spark, TABLE_NAME)

# Get pending companies
pending = spark.table(TABLE_NAME).filter(col("enrichment_status") == "pending").collect()

print(f"üöÄ Enriching {len(pending)} companies...\n")

success_count = 0
error_count = 0

for idx, row in enumerate(pending, 1):
    company = row.company_name
    website = row.website
    
    print(f"[{idx}/{len(pending)}] Processing {company}...")
    
    try:
        # Enrich with agent
        data = await enrich_company(company, website)
        
        # Update Delta table
        delta_table.update(
            condition=col("company_name") == company,
            set={
                "address": lit(data["address"]),
                "funding": lit(data["funding"]),
                "employees": lit(data["employees"]),
                "investors": lit(data["investors"]),
                "founders": lit(data["founders"]),
                "enrichment_status": lit("completed" if data["address"] != "Error" else "failed")
            }
        )
        
        if data["address"] != "Error":
            print(f"   ‚úÖ {data['address']}\n")
            success_count += 1
        else:
            print(f"   ‚ö†Ô∏è  Failed - marked as failed\n")
            error_count += 1
            
    except Exception as e:
        print(f"   ‚ùå Unexpected error: {str(e)}\n")
        error_count += 1
        # Mark as failed but continue processing
        delta_table.update(
            condition=col("company_name") == company,
            set={"enrichment_status": lit("failed")}
        )

print("=" * 60)
print(f"üéâ Enrichment Complete!")
print(f"   ‚úÖ Success: {success_count}")
print(f"   ‚ùå Failed: {error_count}")
print(f"\nüìä View results:")
display(spark.table(TABLE_NAME))