# DSPy Pipeline for Structuring Unstructured Data

This notebook demonstrates how to use DSPy to extract structured entities and relationships from web content.

## Key Concepts
1. **Entity Extraction**: Using Pydantic models to force structured LLM outputs
2. **Confidence-based Deduplication**: Retry deduplication until the LLM's self-reported confidence meets a threshold or the retry limit is reached
3. **Mermaid Graph Generation**: Visualizing relationships with strict entity validation

---

## 1. Setup & Installation

In [None]:
# Install the third-party packages imported by the next cell (-q keeps pip output quiet).
# NOTE(review): `!pip` runs whichever pip the shell finds first; if the kernel uses a
# different environment, `%pip install ...` may be needed instead — confirm.
!pip install -q dspy-ai pydantic requests beautifulsoup4 html2text pandas

In [None]:
# Import libraries
import os
import csv
import re
import dspy
import requests
import pandas as pd
from bs4 import BeautifulSoup
import html2text
from pydantic import BaseModel, Field
from typing import List, Dict, Tuple, Optional, Set
from IPython.display import display, Markdown

print("Libraries imported successfully!")

## 2. Configure Longcat API

Get your free API key from [Longcat API Platform](https://api.longcat.chat)

In [None]:
# Read the Longcat API key from the LONGCAT_API_KEY environment variable so the
# secret does not have to be stored in the notebook. The placeholder is only
# used when the variable is unset (replace it to hard-code a key instead).
LONGCAT_API_KEY = os.environ.get("LONGCAT_API_KEY", "your-api-key-here")

if LONGCAT_API_KEY == "your-api-key-here":
    # Surface the misconfiguration now rather than as an opaque error on the first LLM call.
    print("⚠ LONGCAT_API_KEY is not set - API calls are expected to fail until a real key is provided")

# Configure DSPy with Longcat API
lm = dspy.LM(
    model="openai/LongCat-Flash-Chat",
    api_base="https://api.longcat.chat/openai/v1",
    api_key=LONGCAT_API_KEY,
    temperature=0.7,
    max_tokens=4096,
)
dspy.configure(lm=lm)
print("✓ DSPy configured with Longcat API")

## 3. Define Pydantic Models & DSPy Signatures

These enforce structured outputs from the LLM - no more regex parsing!

In [None]:
# Pydantic Models for Structured Output
class EntityWithAttr(BaseModel):
    """An entity with its semantic type."""
    # NOTE(review): the docstring and Field descriptions become part of the model's
    # schema; presumably DSPy forwards that schema to the LLM, in which case rewording
    # them changes the prompt — confirm before editing.
    # Name of the entity as reported by the extractor.
    entity: str = Field(description="The named entity")
    # Free-text category label; it is a plain string, not a fixed vocabulary.
    attr_type: str = Field(description="Semantic type (e.g., Concept, Process, Drug)")

class Triple(BaseModel):
    """A relationship triple between entities."""
    # Read as "source --relation--> target"; triples_to_mermaid draws the edge in
    # that direction.
    # NOTE(review): the docstring/descriptions are schema text that is presumably
    # shown to the LLM — confirm before rewording.
    source: str = Field(description="Source entity")
    relation: str = Field(description="Relationship type")
    target: str = Field(description="Target entity")

# DSPy Signatures
class ExtractEntities(dspy.Signature):
    """Extract named entities from text with their semantic types."""
    # NOTE(review): DSPy is understood to send a Signature's docstring to the model as
    # the task instruction, so treat the docstring and `desc` strings as prompt text
    # rather than documentation — confirm before rewording.
    # Input: the text to analyse (extract_entities passes one chunk per call).
    paragraph: str = dspy.InputField(desc="Text to extract entities from")
    # Output: typed as a list of EntityWithAttr (entity name + semantic type).
    entities: List[EntityWithAttr] = dspy.OutputField(desc="Extracted entities")

class ExtractTriples(dspy.Signature):
    """Extract relationship triples using only provided entities."""
    # NOTE(review): the docstring and `desc` strings are presumably sent to the model
    # as prompt text — confirm before rewording.
    # Input: the text to analyse (extract_triples passes one chunk per call).
    paragraph: str = dspy.InputField(desc="Text to extract relationships from")
    # Input: names the model is asked to restrict itself to; extract_triples also
    # filters the returned triples against this list, so the instruction is not trusted.
    entity_list: List[str] = dspy.InputField(desc="Valid entities to use")
    # Output: typed as a list of Triple (source, relation, target).
    triples: List[Triple] = dspy.OutputField(desc="Relationship triples")

class DeduplicateEntities(dspy.Signature):
    """Deduplicate entities by grouping synonyms/variants."""
    # NOTE(review): the docstring and `desc` strings are presumably sent to the model
    # as prompt text — confirm before rewording.
    # Input: candidate entity names.
    items: List[str] = dspy.InputField(desc="Entities to deduplicate")
    # Output: canonical names only. No item -> canonical mapping is returned, so
    # deduplicate_with_confidence can only map an input whose lower-cased, stripped
    # form equals a canonical name.
    deduplicated: List[str] = dspy.OutputField(desc="Canonical entity names")
    # Output: the model's self-reported confidence; deduplicate_with_confidence compares
    # it against its target_confidence threshold.
    confidence: float = dspy.OutputField(desc="Confidence score 0-1")

print("✓ Models and signatures defined")

## 4. URL Scraping Module

In [None]:
def _clean_scraped_text(text: str) -> str:
    """Normalise html2text output into blank-line separated paragraphs."""
    # Drop bracketed fragments first so their removal cannot leave double spaces behind.
    text = re.sub(r'\[.*?\]', '', text)
    text = re.sub(r' {2,}', ' ', text)

    # Keep blank separator lines plus lines longer than 20 characters; shorter lines
    # are discarded.
    kept = []
    for line in text.split('\n'):
        stripped = line.strip()
        if not stripped:
            kept.append('')
        elif len(stripped) > 20:
            kept.append(line)

    # Collapse blank runs last: dropping short lines can leave several blank lines in a
    # row, which would otherwise yield paragraphs that start with stray newlines when
    # the text is later split on '\n\n'.
    return re.sub(r'\n{3,}', '\n\n', '\n'.join(kept)).strip()


def scrape_url(url: str, timeout: int = 30) -> Optional[str]:
    """Scrape and clean text content from a URL.

    Args:
        url: Page to download.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        The cleaned text of the first matching content container (``article``,
        ``main``, ``.content``, ``#content``, ``.article-body``, falling back to
        ``body``). The string may be empty if no line survives cleaning.
        ``None`` is returned when the request or parsing raises, or when the
        document has no body.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    }

    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')

        # Remove non-content elements
        for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
            element.decompose()

        # Find main content: the first selector that matches wins. select_one accepts
        # tag names as well as class/id selectors, so one call covers every entry.
        main_content = None
        for selector in ['article', 'main', '.content', '#content', '.article-body']:
            main_content = soup.select_one(selector)
            if main_content:
                break

        if not main_content:
            main_content = soup.find('body')

        if not main_content:
            return None

        # Convert to text
        h = html2text.HTML2Text()
        h.ignore_links = True
        h.ignore_images = True
        h.body_width = 0

        return _clean_scraped_text(h.handle(str(main_content)))

    except Exception as e:
        # Best-effort by design: a page that cannot be fetched or parsed is reported
        # and skipped by the caller instead of aborting the run.
        print(f"Error scraping {url}: {e}")
        return None

def chunk_text(text: str, max_size: int = 2500) -> List[str]:
    """Split text into chunks for processing.

    Paragraphs (separated by a blank line) are packed greedily into chunks of at
    most ``max_size`` characters, counting the two-character separator after
    each paragraph. A single paragraph longer than ``max_size`` is not split; it
    becomes a chunk of its own.

    Args:
        text: Text to split.
        max_size: Character budget per chunk.

    Returns:
        Non-empty, stripped chunks in document order. Empty or whitespace-only
        input yields an empty list.
    """
    chunks: List[str] = []
    current = ""

    for para in text.split('\n\n'):
        if len(current) + len(para) + 2 <= max_size:
            current += para + "\n\n"
            continue
        # Budget exceeded: flush what has been collected, unless it is only separators.
        if current.strip():
            chunks.append(current.strip())
        current = para + "\n\n"

    # Flush the tail; the strip() check keeps empty input from producing [''].
    if current.strip():
        chunks.append(current.strip())

    return chunks

print("✓ Scraping functions defined")

## 5. Entity Extraction & Deduplication

In [None]:
# Initialize predictors (using dspy.Predict - current API)
# One module-level predictor per signature; extract_entities, extract_triples and
# deduplicate_with_confidence below call these globals directly.
# NOTE(review): presumably they use whatever LM was registered via dspy.configure — confirm.
entity_predictor = dspy.Predict(ExtractEntities)
triple_predictor = dspy.Predict(ExtractTriples)
dedup_predictor = dspy.Predict(DeduplicateEntities)

def extract_entities(text: str) -> List[EntityWithAttr]:
    """Extract typed entities from one piece of text.

    Args:
        text: Text sent to the entity predictor.

    Returns:
        The entities reported by the model. An empty list is returned when the
        text is blank, when the model reports nothing, or when the call raises
        (the error is printed).
    """
    # Nothing can be extracted from blank text, so skip the LLM round-trip.
    if not text or not text.strip():
        return []
    try:
        result = entity_predictor(paragraph=text)
        # Callers extend a list with this value, so never hand back None.
        return list(result.entities or [])
    except Exception as e:
        # Best-effort: one failed call should not abort the caller's loop.
        print(f"Entity extraction error: {e}")
        return []

def extract_triples(text: str, entity_list: List[str]) -> List[Triple]:
    """Extract relationship triples from text.

    Args:
        text: Text sent to the triple predictor.
        entity_list: Names the triples are allowed to reference.

    Returns:
        Triples whose source and target both match an entry of ``entity_list``
        (compared lower-cased and stripped). An empty list is returned when
        there are no entities, the text is blank, or the call raises (the error
        is printed).
    """
    try:
        valid_entities = {e.lower().strip() for e in entity_list}
        # Every triple must reference two known entities, so without entities (or
        # text) the result is necessarily empty - skip the LLM call.
        if not valid_entities or not text.strip():
            return []

        result = triple_predictor(paragraph=text, entity_list=entity_list)

        # Filter to valid entities only: the model is not trusted to honour the list.
        return [
            t for t in (result.triples or [])
            if t.source.lower().strip() in valid_entities
            and t.target.lower().strip() in valid_entities
        ]
    except Exception as e:
        # Best-effort: one failed call should not abort the caller's loop.
        print(f"Triple extraction error: {e}")
        return []

def deduplicate_with_confidence(items: List[str], target_confidence: float = 0.85, max_retries: int = 3):
    """
    Deduplicate entity names, retrying until the model's confidence >= target.

    Args:
        items: Raw entity names, possibly with repeats.
        target_confidence: Self-reported confidence at which retrying stops.
        max_retries: Maximum number of predictor calls.

    Returns:
        A ``(names, mapping)`` tuple. ``names`` is the canonical list from the
        most confident attempt; ``mapping`` maps each distinct input spelling to
        a canonical name. The model returns only the canonical list, so an input
        is mapped to a canonical name only when the two are equal after
        lower-casing and stripping; any other input maps to itself. If every
        attempt fails or returns nothing, the distinct inputs are returned
        unchanged with an identity mapping. Empty input returns ``([], {})``.
    """
    if not items:
        return [], {}

    # Remove exact duplicates first (case/whitespace-insensitive), keeping the first spelling.
    seen = set()
    unique_items = []
    for item in items:
        key = item.lower().strip()
        if key not in seen:
            seen.add(key)
            unique_items.append(item)

    best_names = None
    best_confidence = 0.0

    for attempt in range(max_retries):
        # DSPy's LM layer caches responses by default (per its documentation), so
        # repeating an identical request would likely return the first answer again
        # and make the retry loop a no-op. Retries therefore override the sampling
        # temperature, which changes the request; the first attempt uses the
        # configured LM settings unchanged.
        call_kwargs = {}
        if attempt:
            call_kwargs["config"] = {"temperature": min(1.0, round(0.7 + 0.1 * attempt, 2))}
        try:
            pred = dedup_predictor(items=unique_items, **call_kwargs)
            confidence = float(pred.confidence)
            names = list(pred.deduplicated or [])
        except Exception as e:
            print(f"  Dedup attempt {attempt+1} failed: {e}")
            continue

        # An empty answer for non-empty input would silently discard every entity.
        if not names:
            print(f"  Dedup attempt {attempt+1} failed: model returned no entities")
            continue

        if confidence > best_confidence:
            best_confidence = confidence
            best_names = names

        if confidence >= target_confidence:
            print(f"  ✓ Deduplication confidence: {confidence:.2f}")
            break

    if best_names is None:
        return unique_items, {item: item for item in unique_items}

    # Build mapping: exact (normalised) matches go to the canonical spelling,
    # everything else maps to itself.
    dedup_lower = {d.lower().strip(): d for d in best_names}
    mapping = {item: dedup_lower.get(item.lower().strip(), item) for item in unique_items}

    return best_names, mapping

print("✓ Extraction and deduplication functions defined")

## 6. Mermaid Diagram Generation

In [None]:
def clean_label(label: str, max_len: int = 40) -> str:
    """Clean label for Mermaid syntax.

    Removes quote/bracket/pipe/backslash characters and ``#``, spells ``&`` as
    ``and``, collapses hyphen runs and whitespace, trims the result and
    truncates it to ``max_len`` characters (ending in ``...``).

    Args:
        label: Raw text for a node, edge or title.
        max_len: Maximum length of the returned label.

    Returns:
        The sanitised single-line label.
    """
    label = re.sub(r'["\'\[\]{}()<>|\\]', '', label)
    label = label.replace('&', 'and').replace('#', '')
    # Collapse whole hyphen runs: a single str.replace('--', '-') pass turns '---'
    # into '--', which still looks like Mermaid link syntax.
    label = re.sub(r'-{2,}', '-', label)
    # Labels are emitted on one line, so fold newlines/tabs into spaces, and trim
    # before measuring so padding does not trigger truncation.
    label = re.sub(r'\s+', ' ', label).strip()
    if len(label) > max_len:
        return label[:max_len - 3] + "..."
    return label

def clean_node_id(name: str) -> str:
    """Convert name to valid Mermaid node ID.

    The name is lower-cased and every run of characters outside ``a-z0-9``
    becomes a single underscore. IDs that do not start with a letter, or that
    equal a Mermaid flowchart keyword, are prefixed with ``n_``.

    Args:
        name: Display name of the entity.

    Returns:
        The node ID, or ``'unknown'`` when nothing usable remains (for example
        for names without ASCII letters or digits).
    """
    # Mermaid's flowchart docs warn that a lower-case `end` node breaks the graph;
    # the other keywords are prefixed defensively.
    reserved = {
        'end', 'graph', 'subgraph', 'style', 'class', 'classdef',
        'click', 'default', 'direction', 'linkstyle',
    }
    node_id = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_')
    if not node_id:
        return 'unknown'
    if not node_id[0].isalpha() or node_id in reserved:
        node_id = 'n_' + node_id
    return node_id

def triples_to_mermaid(triples: List[Triple], entities: List[EntityWithAttr], title: str = "") -> str:
    """
    Convert triples to a Mermaid ``graph TD`` definition.

    Only names present in ``entities`` are allowed as nodes; triples that
    reference anything else, and self-loops, are skipped.

    Args:
        triples: Candidate edges.
        entities: Allowed entities; their spelling is used for node labels.
        title: Optional diagram title, emitted as a front-matter block.

    Returns:
        The Mermaid source, with one line per distinct edge in first-seen order.
    """
    # Normalised name -> display spelling; key membership doubles as the whitelist.
    entity_display = {e.entity.strip().lower(): e.entity for e in entities}

    lines = []
    if title:
        lines.extend(["---", f"title: {clean_label(title, 60)}", "---"])
    lines.append("graph TD")

    edges = []
    for t in triples:
        src_key = t.source.strip().lower()
        dst_key = t.target.strip().lower()

        if src_key not in entity_display or dst_key not in entity_display:
            continue
        if src_key == dst_key:
            continue

        src_disp = entity_display[src_key]
        dst_disp = entity_display[dst_key]
        src_node = f'{clean_node_id(src_disp)}["{clean_label(src_disp)}"]'
        dst_node = f'{clean_node_id(dst_disp)}["{clean_label(dst_disp)}"]'

        # A relation that cleans down to nothing would produce "--  -->", which is
        # not a labelled link; fall back to a plain arrow instead.
        relation = clean_label(t.relation)
        arrow = f"-- {relation} -->" if relation else "-->"
        edges.append(f"    {src_node} {arrow} {dst_node}")

    # dict.fromkeys removes repeated edge lines while keeping their order.
    lines.extend(dict.fromkeys(edges))
    return "\n".join(lines)

print("✓ Mermaid generation functions defined")

## 7. Define URLs to Process

In [None]:
# Pages to run through the pipeline. Order matters: the pipeline appends one result per
# URL in this order, and the i-th result (1-based) is written to mermaid_<i>.md.
ASSIGNMENT_URLS = [
    "https://en.wikipedia.org/wiki/Sustainable_agriculture",
    "https://www.nature.com/articles/d41586-025-03353-5",
    "https://www.sciencedirect.com/science/article/pii/S1043661820315152",
    "https://www.ncbi.nlm.nih.gov/pmc/articles/PMC10457221/",
    "https://www.fao.org/3/y4671e/y4671e06.htm",
    "https://www.medscape.com/viewarticle/time-reconsider-tramadol-chronic-pain-2025a1000ria",
    "https://www.sciencedirect.com/science/article/pii/S0378378220307088",
    "https://www.frontiersin.org/news/2025/09/01/rectangle-telescope-finding-habitable-planets",
    "https://www.medscape.com/viewarticle/second-dose-boosts-shingles-protection-adults-aged-65-years-2025a1000ro7",
    "https://www.theguardian.com/global-development/2025/oct/13/astro-ambassadors-stargazers-himalayas-hanle-ladakh-india",
]

print(f"✓ {len(ASSIGNMENT_URLS)} URLs to process")

## 8. Run the Full Pipeline

In [None]:
# Process all URLs
# Per-document caps on how many chunks are sent to the LLM (limits API usage).
MAX_ENTITY_CHUNKS = 5
MAX_TRIPLE_CHUNKS = 3

# One dict per URL, in ASSIGNMENT_URLS order: {'url', 'entities', 'triples'}.
# Failed URLs still get an entry (with empty lists) so positions stay aligned.
all_results = []

for i, url in enumerate(ASSIGNMENT_URLS, 1):
    print(f"\n{'='*60}")
    print(f"Processing URL {i}/{len(ASSIGNMENT_URLS)}: {url[:60]}...")
    print(f"{'='*60}")

    try:
        # Scrape
        content = scrape_url(url)
        if not content or len(content) < 100:
            print("  ✗ Could not scrape content")
            all_results.append({'url': url, 'entities': [], 'triples': []})
            continue
        print(f"  ✓ Scraped {len(content)} chars")

        # Chunk
        chunks = chunk_text(content)[:MAX_ENTITY_CHUNKS]
        print(f"  ✓ Processing {len(chunks)} chunks")

        # Extract entities
        all_entities = []
        for chunk in chunks:
            all_entities.extend(extract_entities(chunk))
        print(f"  ✓ Extracted {len(all_entities)} raw entities")

        # Deduplicate
        entity_names = [e.entity for e in all_entities]
        dedup_names, mapping = deduplicate_with_confidence(entity_names)

        # Tally the semantic types reported for each canonical name. Names are compared
        # lower-cased and stripped, because the mapping is keyed by the first spelling of
        # each entity only and case variants would otherwise lose their votes.
        canon_by_key = {raw.lower().strip(): canon for raw, canon in mapping.items()}
        type_votes = {}
        for e in all_entities:
            canon = canon_by_key.get(e.entity.lower().strip(), e.entity)
            votes = type_votes.setdefault(canon.lower().strip(), {})
            votes[e.attr_type] = votes.get(e.attr_type, 0) + 1

        # Create deduplicated entity objects with the most-voted type
        # ("Concept" when a canonical name received no votes).
        dedup_entities = []
        for name in dedup_names:
            votes = type_votes.get(name.lower().strip())
            if votes:
                best_type = max(votes.items(), key=lambda x: x[1])[0]
            else:
                best_type = "Concept"
            dedup_entities.append(EntityWithAttr(entity=name, attr_type=best_type))
        print(f"  ✓ Deduplicated to {len(dedup_entities)} entities")

        # Extract triples, restricted to the deduplicated names
        all_triples = []
        for chunk in chunks[:MAX_TRIPLE_CHUNKS]:  # Limit for API
            all_triples.extend(extract_triples(chunk, dedup_names))

        # Drop repeated triples (compared case- and whitespace-insensitively)
        mapped_triples = []
        seen = set()
        for t in all_triples:
            key = (t.source.lower().strip(), t.relation.lower().strip(), t.target.lower().strip())
            if key not in seen:
                seen.add(key)
                mapped_triples.append(t)
        print(f"  ✓ Extracted {len(mapped_triples)} unique triples")

        all_results.append({
            'url': url,
            'entities': dedup_entities,
            'triples': mapped_triples
        })

    except Exception as e:
        # Top-level boundary: report the failure and move on to the next URL.
        print(f"  ✗ Error: {e}")
        all_results.append({'url': url, 'entities': [], 'triples': []})

print(f"\n{'='*60}")
print("PROCESSING COMPLETE")
print(f"{'='*60}")

## 9. Generate Mermaid Diagrams

In [None]:
# Generate and save Mermaid diagrams: one mermaid_<i>.md per result, numbered from 1
for i, result in enumerate(all_results, 1):
    filename = f"mermaid_{i}.md"

    if result['entities'] and result['triples']:
        # Title = last path segment of the URL. rstrip('/') keeps URLs that end in a
        # slash from yielding an empty segment and falling back to "Document <i>".
        title = result['url'].rstrip('/').split('/')[-1][:50] or f"Document {i}"
        mermaid_content = triples_to_mermaid(result['triples'], result['entities'], title)
    elif result['entities']:
        # No relationships: list up to 10 entities as unconnected nodes in a subgraph.
        node_lines = [
            f'        {clean_node_id(e.entity)}["{clean_label(e.entity)}"]'
            for e in result['entities'][:10]
        ]
        mermaid_content = "\n".join(
            ["graph TD", "    subgraph Entities", *node_lines, "    end"]
        )
    else:
        mermaid_content = "graph TD\n    note[No entities extracted]"

    # NOTE(review): the file is written with the platform default encoding, which the
    # preview cell also uses when reading it back; switch both to UTF-8 together if
    # labels with characters outside the default encoding must be supported.
    with open(filename, 'w') as f:
        f.write(f"```mermaid\n{mermaid_content}\n```\n")

    print(f"✓ Saved {filename}")

print("\nAll Mermaid diagrams saved!")

## 10. Generate CSV Output

In [None]:
# Generate tags.csv: one row per (URL, entity), skipping entities repeated within a URL
TAG_COLUMNS = ['link', 'tag', 'tag_type']

rows = []
for result in all_results:
    url = result['url']
    seen_tags = set()  # lower-cased, stripped names already emitted for this URL

    for entity in result['entities']:
        tag_key = entity.entity.lower().strip()
        if tag_key in seen_tags:
            continue
        seen_tags.add(tag_key)
        rows.append({
            'link': url,
            'tag': entity.entity,
            'tag_type': entity.attr_type
        })

# Create DataFrame and save. Passing the columns explicitly keeps the header row in
# tags.csv even when no entities were extracted at all.
df = pd.DataFrame(rows, columns=TAG_COLUMNS)
df.to_csv('tags.csv', index=False)

print(f"✓ Saved tags.csv with {len(rows)} entries")
print("\nPreview:")
display(df.head(20))

## 11. Preview Mermaid Diagrams

In [None]:
# Preview: load the first saved diagram file and render it as Markdown
with open('mermaid_1.md', 'r') as f:
    content = f.read()

# The file is only needed for the read above; display and hints run after it is closed.
display(Markdown(content))
print("\nNote: Mermaid rendering requires a compatible viewer.")
print("Copy the content to https://mermaid.live to visualize.")

## 12. Summary Statistics

In [None]:
# Totals across all processed URLs
total_urls = len(all_results)

print("="*60)
print("PIPELINE SUMMARY")
print("="*60)
print(f"Total URLs processed: {total_urls}")
print(f"URLs with entities: {sum(1 for r in all_results if r['entities'])}")
print(f"Total entities: {sum(len(r['entities']) for r in all_results)}")
print(f"Total relationships: {sum(len(r['triples']) for r in all_results)}")
print("\nOutputs:")
# One diagram file exists per result, so derive the range from the result count
# instead of assuming exactly ten URLs.
if total_urls == 1:
    print("  - mermaid_1.md")
elif total_urls > 1:
    print(f"  - mermaid_1.md through mermaid_{total_urls}.md")
print("  - tags.csv")

# Entity type distribution
type_counts = {}
for r in all_results:
    for e in r['entities']:
        type_counts[e.attr_type] = type_counts.get(e.attr_type, 0) + 1

# Show the ten most frequent types, most common first
print("\nEntity Types Distribution:")
for t, c in sorted(type_counts.items(), key=lambda x: -x[1])[:10]:
    print(f"  {t}: {c}")

---

## Key Takeaways

1. **Pydantic + DSPy** = Structured outputs without regex parsing
2. **Confidence loops** = Self-correcting LLM calls for critical tasks
3. **Entity validation** = Prevents garbage nodes in knowledge graphs

This pipeline demonstrates production-ready patterns for converting unstructured text into structured, queryable data.