# Newsletter Processing Pipeline Development

This notebook contains the development and testing of the Arrgh! Aggregated Research Repository newsletter processing pipeline.

## Pipeline Overview

1. **Configuration Setup**: Load environment variables and settings
2. **HTML Processing**: Parse and clean newsletter HTML content
3. **Entity Extraction**: Use an LLM to extract entities of six types
4. **Graph Operations**: Connect to Neo4j and manage graph database
5. **Entity Resolution**: Match extracted entities to existing nodes
6. **Fact Extraction**: Extract relationships and temporal facts
7. **Graph Updates**: Update Neo4j with new nodes and relationships
8. **Summary Generation**: Create human-readable summaries
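The eight stages can be read as functions that each take and return a shared state dict, which is also how the `ExtractionState` TypedDict later in the notebook is shaped. A minimal stdlib sketch, assuming hypothetical stand-in stages (`clean`, `extract`) in place of the real HTML, LLM and Neo4j steps:

```python
from typing import Any, Callable, Dict, List

State = Dict[str, Any]
Stage = Callable[[State], State]

def run_stages(state: State, stages: List[Stage]) -> State:
    """Run stages in order; stop at the first failure and record it in state['errors']."""
    state.setdefault("errors", [])
    for stage in stages:
        state["current_step"] = stage.__name__
        try:
            state = stage(state)
        except Exception as exc:  # record the failure instead of raising
            state["errors"].append(f"{stage.__name__}: {exc}")
            break
    return state

# Hypothetical stand-ins for the HTML-processing and entity-extraction stages
def clean(state: State) -> State:
    state["cleaned_text"] = " ".join(state["html"].split())
    return state

def extract(state: State) -> State:
    # Toy "extraction": treat capitalized words as entity candidates
    state["extracted_entities"] = [w for w in state["cleaned_text"].split() if w[:1].isupper()]
    return state

result = run_stages({"html": "OpenAI  released\n GPT-5"}, [clean, extract])
print(result["extracted_entities"])  # ['OpenAI', 'GPT-5']
```

Stopping at the first failing stage keeps later stages from running on incomplete state; a LangGraph `StateGraph` would add conditional routing on top of the same idea.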

## Entity Types
- **Organization**: Companies, institutions, government bodies
- **Person**: Individuals mentioned in content
- **Product**: Software, hardware, services
- **Event**: Conferences, announcements, launches
- **Location**: Geographic locations
- **Topic**: Subject areas, technologies
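Because the entity type later becomes a Neo4j node label, and labels cannot be passed as query parameters, it helps to treat the six types as a closed set. A minimal sketch using a stdlib `Enum`; the `normalize_entity_type` helper and its case-insensitive matching are illustrative assumptions, not part of the notebook:

```python
from enum import Enum
from typing import Optional

class EntityType(str, Enum):
    ORGANIZATION = "Organization"
    PERSON = "Person"
    PRODUCT = "Product"
    EVENT = "Event"
    LOCATION = "Location"
    TOPIC = "Topic"

def normalize_entity_type(raw: str) -> Optional[EntityType]:
    """Map an LLM-provided type string to a known label, or None if it is unknown."""
    cleaned = raw.strip().lower()
    for member in EntityType:
        if member.value.lower() == cleaned:
            return member
    return None

print(normalize_entity_type(" person ").value)  # Person
print(normalize_entity_type("Company"))         # None
```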

## 1. Configuration Setup

In [None]:
# Import necessary libraries
import os
import sys
from pathlib import Path
from typing import List, Dict, Any, Optional, TypedDict
from datetime import datetime, timezone
import json

# Add src directory to path for imports
src_path = Path("..") / "src"
sys.path.insert(0, str(src_path))

# Load environment variables
from dotenv import load_dotenv
load_dotenv(Path("..") / ".env")

# Data processing
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
import html2text

# LLM and AI
from openai import OpenAI
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END

# Graph database
from neo4j import GraphDatabase

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Set up plotting style
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

print("✅ All imports successful!")

In [None]:
# Configuration from environment variables
class Config:
    # LLM Configuration
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
    LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4-turbo")
    LLM_TEMPERATURE = float(os.getenv("LLM_TEMPERATURE", "0.1"))
    LLM_MAX_TOKENS = int(os.getenv("LLM_MAX_TOKENS", "2000"))
    
    # Neo4j Configuration
    NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
    NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
    NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
    NEO4J_DATABASE = os.getenv("NEO4J_DATABASE", "neo4j")
    
    # Processing Configuration
    MAX_ENTITIES_PER_NEWSLETTER = int(os.getenv("MAX_ENTITIES_PER_NEWSLETTER", "100"))
    FACT_EXTRACTION_BATCH_SIZE = int(os.getenv("FACT_EXTRACTION_BATCH_SIZE", "10"))
    PROCESSING_TIMEOUT = int(os.getenv("PROCESSING_TIMEOUT", "300"))
    ENTITY_CONFIDENCE_THRESHOLD = float(os.getenv("ENTITY_CONFIDENCE_THRESHOLD", "0.7"))
    FACT_CONFIDENCE_THRESHOLD = float(os.getenv("FACT_CONFIDENCE_THRESHOLD", "0.8"))
    
    # Feature Flags
    ENABLE_DEBUG_MODE = os.getenv("ENABLE_DEBUG_MODE", "false").lower() == "true"
    
config = Config()

print("🔧 Configuration loaded:")
print(f"  LLM Model: {config.LLM_MODEL}")
print(f"  Neo4j URI: {config.NEO4J_URI}")
print(f"  Max Entities: {config.MAX_ENTITIES_PER_NEWSLETTER}")
print(f"  Debug Mode: {config.ENABLE_DEBUG_MODE}")
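The `Config` class reads each variable once, when the class body executes, and does not validate the values. A minimal alternative sketch, assuming a frozen dataclass built from any mapping (`os.environ` in the notebook, a plain dict in tests); the field subset, the range check, and the wider set of accepted boolean spellings are illustrative assumptions:

```python
import os
from dataclasses import dataclass
from typing import Mapping

def _parse_bool(value: str) -> bool:
    # Accepts more spellings than the notebook's `== "true"` check (an assumption)
    return value.strip().lower() in {"1", "true", "yes", "on"}

@dataclass(frozen=True)
class PipelineConfig:
    llm_model: str
    llm_temperature: float
    entity_confidence_threshold: float
    enable_debug_mode: bool

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "PipelineConfig":
        cfg = cls(
            llm_model=env.get("LLM_MODEL", "gpt-4-turbo"),
            llm_temperature=float(env.get("LLM_TEMPERATURE", "0.1")),
            entity_confidence_threshold=float(env.get("ENTITY_CONFIDENCE_THRESHOLD", "0.7")),
            enable_debug_mode=_parse_bool(env.get("ENABLE_DEBUG_MODE", "false")),
        )
        # Fail fast on values that would silently break confidence filtering
        if not 0.0 <= cfg.entity_confidence_threshold <= 1.0:
            raise ValueError("ENTITY_CONFIDENCE_THRESHOLD must be in [0, 1]")
        return cfg

cfg = PipelineConfig.from_env({"LLM_TEMPERATURE": "0.2", "ENABLE_DEBUG_MODE": "yes"})
print(cfg.llm_temperature, cfg.enable_debug_mode)  # 0.2 True
```

Freezing the dataclass means a stage cannot accidentally mutate shared settings mid-run.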

## 2. Data Models and Types

In [None]:
# Data models for the processing pipeline
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from datetime import datetime

class Entity(BaseModel):
    """Represents an extracted entity from newsletter content."""
    name: str
    type: str  # Organization, Person, Product, Event, Location, Topic
    aliases: List[str] = Field(default_factory=list)
    confidence: float = Field(ge=0.0, le=1.0)
    context: Optional[str] = None
    properties: Dict[str, Any] = Field(default_factory=dict)

class Fact(BaseModel):
    """Represents a relationship or fact between entities."""
    subject_entity: str
    predicate: str  # relationship type
    object_entity: str
    confidence: float = Field(ge=0.0, le=1.0)
    temporal_context: Optional[str] = None
    date_mentioned: Optional[datetime] = None
    source_context: Optional[str] = None

class Newsletter(BaseModel):
    """Represents a newsletter to be processed."""
    html_content: str
    subject: str
    sender: str
    received_date: Optional[datetime] = None
    newsletter_id: Optional[str] = None

class ExtractionState(TypedDict):
    """State for the LangGraph workflow."""
    # Input
    newsletter: Newsletter
    
    # Processing stages
    cleaned_text: str
    extracted_entities: List[Entity]
    resolved_entities: List[Entity]
    extracted_facts: List[Fact]
    
    # Results
    neo4j_updates: Dict[str, Any]
    processing_metrics: Dict[str, Any]
    text_summary: str
    errors: List[str]
    
    # Metadata
    processing_start_time: datetime
    current_step: str

print("📊 Data models defined successfully!")
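`ExtractionState` is a `TypedDict`, so nothing fills in its keys automatically; each workflow step would see only what the initial dict contains. A minimal sketch of a factory that returns a fully populated initial state, assuming a stdlib-only stand-in where the newsletter is typed as `Any` (in the notebook it would be the `Newsletter` model); `InitialState` and `make_initial_state` are hypothetical names:

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, TypedDict

class InitialState(TypedDict):
    newsletter: Any
    cleaned_text: str
    extracted_entities: List[Any]
    resolved_entities: List[Any]
    extracted_facts: List[Any]
    neo4j_updates: Dict[str, Any]
    processing_metrics: Dict[str, Any]
    text_summary: str
    errors: List[str]
    processing_start_time: datetime
    current_step: str

def make_initial_state(newsletter: Any) -> InitialState:
    """Populate every key (with fresh lists/dicts) so later steps never hit a KeyError."""
    return InitialState(
        newsletter=newsletter,
        cleaned_text="",
        extracted_entities=[],
        resolved_entities=[],
        extracted_facts=[],
        neo4j_updates={},
        processing_metrics={},
        text_summary="",
        errors=[],
        processing_start_time=datetime.now(timezone.utc),
        current_step="start",
    )
```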

## 3. Sample Newsletter Data

In [None]:
# Sample newsletter HTML for testing
sample_newsletter_html = """
<!DOCTYPE html>
<html>
<head>
    <title>AI Weekly Newsletter #245</title>
</head>
<body>
    <h1>AI Weekly Newsletter #245</h1>
    <p>Welcome to this week's AI updates!</p>
    
    <h2>🚀 Major Announcements</h2>
    <p><strong>OpenAI</strong> announced the release of <strong>GPT-5</strong> at their developer conference in <strong>San Francisco</strong>. 
    CEO <strong>Sam Altman</strong> presented the new capabilities during the <strong>OpenAI DevDay 2024</strong> event.</p>
    
    <h2>🏢 Company Updates</h2>
    <p><strong>Microsoft</strong> expanded their <strong>Azure AI</strong> services with new enterprise features. 
    The announcement was made by <strong>Satya Nadella</strong> during the <strong>Microsoft Build 2024</strong> conference.</p>
    
    <h2>🎯 Industry News</h2>
    <p><strong>Google</strong> launched their new <strong>Gemini Pro</strong> model, focusing on <strong>AI Safety</strong> and 
    <strong>Responsible AI</strong> development. The launch event was held at <strong>Google I/O 2024</strong> in <strong>Mountain View</strong>.</p>
    
    <h2>🎓 Educational Content</h2>
    <p><strong>Stanford University</strong> announced a new <strong>AI Safety</strong> course taught by renowned professor 
    <strong>Fei-Fei Li</strong>. The course will cover <strong>Machine Learning</strong> ethics and <strong>AI Alignment</strong>.</p>
    
    <h2>💡 Research Highlights</h2>
    <p>New research on <strong>Quantum Computing</strong> applications in <strong>AI</strong> was published by researchers at 
    <strong>MIT</strong> and <strong>IBM Research</strong>. The paper explores <strong>Quantum Machine Learning</strong> algorithms.</p>
    
    <p>Thanks for reading! See you next week.</p>
    <p>Best regards,<br>The AI Weekly Team</p>
</body>
</html>
"""

# Create sample newsletter object
sample_newsletter = Newsletter(
    html_content=sample_newsletter_html,
    subject="AI Weekly Newsletter #245",
    sender="ai-weekly@example.com",
    received_date=datetime.now(timezone.utc),
    newsletter_id="ai-weekly-245"
)

print("📧 Sample newsletter created:")
print(f"  Subject: {sample_newsletter.subject}")
print(f"  Sender: {sample_newsletter.sender}")
print(f"  Content length: {len(sample_newsletter.html_content)} characters")
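`newsletter_id` is optional on the model, but the graph code keys newsletters on it and the schema puts a uniqueness constraint on `Newsletter.id`. One option, sketched here as an assumption rather than the pipeline's actual scheme, is to derive a stable ID from the sender, subject and received date so that reprocessing the same email maps to the same node (`derive_newsletter_id` is hypothetical):

```python
import hashlib
from datetime import datetime, timezone
from typing import Optional

def derive_newsletter_id(sender: str, subject: str, received: Optional[datetime]) -> str:
    """Hypothetical helper: stable 16-hex-char ID from fields that identify one email."""
    received_part = received.isoformat() if received else ""
    # Normalize the sender so case and stray whitespace do not change the ID
    key = "\n".join([sender.strip().lower(), subject.strip(), received_part])
    return hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]

when = datetime(2024, 5, 1, 9, 30, tzinfo=timezone.utc)
a = derive_newsletter_id("ai-weekly@example.com", "AI Weekly Newsletter #245", when)
b = derive_newsletter_id("AI-Weekly@example.com ", "AI Weekly Newsletter #245", when)
print(a == b, len(a))  # True 16
```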

## 4. HTML Processing Functions

In [None]:
def clean_html_content(html_content: str) -> str:
    """Clean and extract text from HTML content."""
    try:
        # Parse HTML with BeautifulSoup
        soup = BeautifulSoup(html_content, 'html.parser')
        
        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.decompose()
        
        # Convert to text using html2text for better formatting
        h = html2text.HTML2Text()
        h.ignore_links = False
        h.ignore_images = True
        h.body_width = 0  # Don't wrap lines
        
        # Get text content
        text_content = h.handle(str(soup))
        
        # Collapse all whitespace, including line breaks, into single spaces
        cleaned_text = ' '.join(text_content.split())
        
        return cleaned_text
    
    except Exception as e:
        print(f"❌ Error cleaning HTML: {e}")
        return ""

def extract_text_sections(html_content: str) -> Dict[str, Any]:
    """Extract different sections of the newsletter."""
    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        
        sections = {
            'title': '',
            'headers': [],
            'paragraphs': [],
            'links': []
        }
        
        # Extract title
        title_tag = soup.find('title') or soup.find('h1')
        if title_tag:
            sections['title'] = title_tag.get_text().strip()
        
        # Extract headers
        for header in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
            sections['headers'].append(header.get_text().strip())
        
        # Extract paragraphs
        for para in soup.find_all('p'):
            text = para.get_text().strip()
            if text:
                sections['paragraphs'].append(text)
        
        # Extract links
        for link in soup.find_all('a', href=True):
            sections['links'].append({
                'text': link.get_text().strip(),
                'url': link['href']
            })
        
        return sections
    
    except Exception as e:
        print(f"❌ Error extracting sections: {e}")
        return {}

# Test HTML processing
cleaned_text = clean_html_content(sample_newsletter.html_content)
sections = extract_text_sections(sample_newsletter.html_content)

print("🧹 HTML Processing Results:")
print(f"  Cleaned text length: {len(cleaned_text)} characters")
print(f"  Title: {sections.get('title', 'N/A')}")
print(f"  Headers found: {len(sections.get('headers', []))}")
print(f"  Paragraphs found: {len(sections.get('paragraphs', []))}")
print(f"  Links found: {len(sections.get('links', []))}")

# Show first 200 characters of cleaned text
print(f"\nFirst 200 characters of cleaned text:")
print(f"'{cleaned_text[:200]}...'")
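`clean_html_content` relies on BeautifulSoup to drop `<script>`/`<style>` and on html2text to render the rest. To show the same idea without third-party packages, here is a minimal sketch on the stdlib `html.parser.HTMLParser`; it is a simplified illustration (no Markdown output, no link handling), not a drop-in replacement:

```python
from html.parser import HTMLParser
from typing import List

class TextExtractor(HTMLParser):
    """Collect visible text, skipping anything inside <script> or <style>."""
    SKIP = {"script", "style"}

    def __init__(self) -> None:
        super().__init__()  # convert_charrefs=True by default, so &amp; becomes &
        self._skip_depth = 0
        self.chunks: List[str] = []

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self.SKIP and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        if not self._skip_depth:
            self.chunks.append(data)

def html_to_text(html: str) -> str:
    parser = TextExtractor()
    parser.feed(html)
    parser.close()
    # Join with spaces so adjacent block elements do not run together,
    # then collapse whitespace the same way clean_html_content does
    return " ".join(" ".join(parser.chunks).split())

sample = "<h1>AI Weekly</h1><style>p{color:red}</style><p><strong>OpenAI</strong> news</p>"
print(html_to_text(sample))  # AI Weekly OpenAI news
```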

## 5. LLM Integration for Entity Extraction

In [None]:
# Initialize OpenAI client
if config.OPENAI_API_KEY:
    openai_client = OpenAI(api_key=config.OPENAI_API_KEY)
    llm = ChatOpenAI(
        model=config.LLM_MODEL,
        temperature=config.LLM_TEMPERATURE,
        max_tokens=config.LLM_MAX_TOKENS,
        openai_api_key=config.OPENAI_API_KEY
    )
    print("✅ OpenAI client initialized")
else:
    print("⚠️ OpenAI API key not found. Please set OPENAI_API_KEY in your .env file")
    llm = None

In [None]:
# Entity extraction prompt template
ENTITY_EXTRACTION_PROMPT = """
You are an expert at extracting structured information from newsletter content. 
Extract entities from the following newsletter text and classify them into these categories:

**Entity Types:**
- **Organization**: Companies, institutions, government bodies
- **Person**: Individuals mentioned in content
- **Product**: Software, hardware, services, models
- **Event**: Conferences, announcements, launches
- **Location**: Geographic locations (cities, countries, regions)
- **Topic**: Subject areas, technologies, fields of study

**Instructions:**
1. Extract entities with high confidence (>0.7)
2. Provide alternative names/aliases if mentioned
3. Include context where the entity was mentioned
4. Rate confidence from 0.0 to 1.0
5. Return results as valid JSON

**Newsletter Content:**
{content}

**Required JSON Format:**
```json
{{
  "entities": [
    {{
      "name": "Entity Name",
      "type": "Organization|Person|Product|Event|Location|Topic",
      "aliases": ["Alternative Name 1", "Alternative Name 2"],
      "confidence": 0.95,
      "context": "The sentence or phrase where this entity was mentioned",
      "properties": {{
        "additional_info": "any relevant details"
      }}
    }}
  ]
}}
```

Return only valid JSON, no additional text.
"""

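# The six supported entity types; anything else the LLM returns is dropped
ALLOWED_ENTITY_TYPES = {"Organization", "Person", "Product", "Event", "Location", "Topic"}
_FENCE = "`" * 3  # a Markdown code fence

def _strip_code_fences(text: str) -> str:
    """Remove a surrounding Markdown code fence (optionally tagged json) if the model added one."""
    text = text.strip()
    if text.startswith(_FENCE):
        # Drop the opening fence line, then everything from the closing fence onward
        text = text.split("\n", 1)[1] if "\n" in text else ""
        text = text.rsplit(_FENCE, 1)[0]
    return text.strip()

def extract_entities_with_llm(content: str) -> List[Entity]:
    """Extract entities from content using the LLM."""
    if not llm:
        print("⚠️ LLM not initialized")
        return []
    
    try:
        # Create prompt
        prompt = ENTITY_EXTRACTION_PROMPT.format(content=content)
        
        # Get LLM response
        response = llm.invoke(prompt)
        
        # Parse JSON response (json is imported in the first cell)
        result = json.loads(_strip_code_fences(response.content))
        
        # Convert to Entity objects, keeping supported types above the confidence threshold
        entities = []
        for entity_data in result.get('entities', []):
            if entity_data.get('type') not in ALLOWED_ENTITY_TYPES:
                continue
            entity = Entity(
                name=entity_data['name'],
                type=entity_data['type'],
                aliases=entity_data.get('aliases', []),
                confidence=entity_data['confidence'],
                context=entity_data.get('context'),
                properties=entity_data.get('properties', {})
            )
            if entity.confidence >= config.ENTITY_CONFIDENCE_THRESHOLD:
                entities.append(entity)
        
        # Respect the configured per-newsletter cap
        return entities[:config.MAX_ENTITIES_PER_NEWSLETTER]
    
    except Exception as e:
        print(f"❌ Error extracting entities: {e}")
        return []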

# Test entity extraction (only if LLM is available)
if llm:
    print("🔍 Testing entity extraction...")
    extracted_entities = extract_entities_with_llm(cleaned_text[:1000])  # Test with first 1000 chars
    
    if extracted_entities:
        print(f"✅ Extracted {len(extracted_entities)} entities:")
        for i, entity in enumerate(extracted_entities[:5]):  # Show first 5
            print(f"  {i+1}. {entity.name} ({entity.type}) - Confidence: {entity.confidence}")
    else:
        print("⚠️ No entities extracted")
else:
    print("⚠️ Skipping entity extraction test - LLM not available")
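The LLM can return the same entity more than once (for example, "AI Safety" appears in two sections of the sample). Before writing to Neo4j, a cheap first step toward the Entity Resolution stage is to collapse duplicates within one newsletter. A minimal sketch using plain dicts instead of the `Entity` model; keying on `(type, case-folded name)` and keeping the highest confidence are assumptions:

```python
from typing import Any, Dict, List, Tuple

def dedupe_entities(entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Merge entities sharing (type, case-folded name); keep max confidence, union aliases."""
    merged: Dict[Tuple[str, str], Dict[str, Any]] = {}
    for ent in entities:
        key = (ent["type"], ent["name"].casefold())
        if key not in merged:
            # Copy so the caller's dicts and alias lists are not mutated
            merged[key] = {**ent, "aliases": list(ent.get("aliases", []))}
            continue
        kept = merged[key]
        kept["confidence"] = max(kept["confidence"], ent["confidence"])
        for alias in ent.get("aliases", []):
            if alias not in kept["aliases"]:
                kept["aliases"].append(alias)
    return list(merged.values())

raw = [
    {"name": "AI Safety", "type": "Topic", "confidence": 0.8, "aliases": []},
    {"name": "ai safety", "type": "Topic", "confidence": 0.9, "aliases": ["Safety"]},
    {"name": "Google", "type": "Organization", "confidence": 0.95},
]
for ent in dedupe_entities(raw):
    print(ent["name"], ent["confidence"], ent["aliases"])
# AI Safety 0.9 ['Safety']
# Google 0.95 []
```

The first spelling seen is kept as the canonical name; a fuller resolver would also compare against nodes already in the graph.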

## 6. Neo4j Graph Database Integration

In [None]:
# Neo4j Connection Manager
class Neo4jConnection:
    def __init__(self, uri: str, user: str, password: str, database: Optional[str] = None):
        self.uri = uri
        self.user = user
        self.password = password
        self.database = database  # None means the server's default database
        self.driver = None
        
    def connect(self):
        """Establish connection to Neo4j."""
        try:
            self.driver = GraphDatabase.driver(
                self.uri, 
                auth=(self.user, self.password)
            )
            # Test connection against the configured database
            with self.driver.session(database=self.database) as session:
                result = session.run("RETURN 1 as test")
                result.single()
            print("✅ Neo4j connection established")
            return True
        except Exception as e:
            print(f"❌ Neo4j connection failed: {e}")
            # Drop the half-initialized driver so later checks see "no connection"
            if self.driver:
                self.driver.close()
                self.driver = None
            return False
    
    def close(self):
        """Close Neo4j connection."""
        if self.driver:
            self.driver.close()
            self.driver = None
            print("🔌 Neo4j connection closed")
    
    def execute_query(self, query: str, parameters: Optional[dict] = None):
        """Execute a Cypher query; return a list of record dicts, or None on failure."""
        if not self.driver:
            print("❌ No Neo4j connection")
            return None
        
        try:
            with self.driver.session(database=self.database) as session:
                result = session.run(query, parameters or {})
                return result.data()
        except Exception as e:
            print(f"❌ Query execution failed: {e}")
            return None

# Initialize Neo4j connection
neo4j_conn = Neo4jConnection(
    uri=config.NEO4J_URI,
    user=config.NEO4J_USER,
    password=config.NEO4J_PASSWORD,
    database=config.NEO4J_DATABASE
)

# Test connection
connection_success = neo4j_conn.connect()
if not connection_success:
    print("⚠️ Neo4j connection failed. Please check your configuration.")
    print("   Make sure Neo4j is running and credentials are correct.")

In [None]:
# Graph database schema setup
def setup_graph_constraints_and_indexes():
    """Create constraints and indexes for optimal graph performance."""
    if not neo4j_conn.driver:
        print("❌ No Neo4j connection available")
        return
    
    constraints_and_indexes = [
        # Unique constraints
        "CREATE CONSTRAINT unique_org_name IF NOT EXISTS FOR (o:Organization) REQUIRE o.name IS UNIQUE",
        "CREATE CONSTRAINT unique_person_name IF NOT EXISTS FOR (p:Person) REQUIRE p.name IS UNIQUE",
        "CREATE CONSTRAINT unique_product_name IF NOT EXISTS FOR (pr:Product) REQUIRE pr.name IS UNIQUE",
        "CREATE CONSTRAINT unique_event_name IF NOT EXISTS FOR (e:Event) REQUIRE e.name IS UNIQUE",
        "CREATE CONSTRAINT unique_location_name IF NOT EXISTS FOR (l:Location) REQUIRE l.name IS UNIQUE",
        "CREATE CONSTRAINT unique_topic_name IF NOT EXISTS FOR (t:Topic) REQUIRE t.name IS UNIQUE",
        "CREATE CONSTRAINT unique_newsletter_id IF NOT EXISTS FOR (n:Newsletter) REQUIRE n.id IS UNIQUE",
        
        # Performance indexes (the confidence and last_seen indexes cover Organization nodes only)
        "CREATE INDEX newsletter_date_idx IF NOT EXISTS FOR (n:Newsletter) ON (n.received_date)",
        "CREATE INDEX entity_confidence_idx IF NOT EXISTS FOR (e:Organization) ON (e.confidence)",
        "CREATE INDEX entity_last_seen_idx IF NOT EXISTS FOR (e:Organization) ON (e.last_seen)",
    ]
    
    print("🔧 Setting up graph constraints and indexes...")
    
    for constraint in constraints_and_indexes:
        kind, name = constraint.split()[1], constraint.split()[2]
        # execute_query catches exceptions itself and returns None on failure
        if neo4j_conn.execute_query(constraint) is None:
            print(f"  ⚠️ {kind} {name}: failed (see error above)")
        else:
            print(f"  ✅ {kind} {name}")
    
    print("📊 Graph schema setup complete!")

# Set up schema (only if connected)
if connection_success:
    setup_graph_constraints_and_indexes()
else:
    print("⚠️ Skipping schema setup - no Neo4j connection")
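The uniqueness constraints above are written out once per label, while the `confidence` and `last_seen` indexes exist only for `Organization`. Generating the statements from the list of entity types keeps every label covered. This sketch only builds the Cypher strings (same `IF NOT EXISTS` syntax as above); the naming scheme is an assumption and differs from the hand-written names, so use one set or the other rather than both:

```python
from typing import List

ENTITY_LABELS = ["Organization", "Person", "Product", "Event", "Location", "Topic"]

def build_schema_statements(labels: List[str]) -> List[str]:
    """Return one uniqueness constraint and two property indexes per entity label."""
    statements = []
    for label in labels:
        key = label.lower()
        statements.append(
            f"CREATE CONSTRAINT unique_{key}_name IF NOT EXISTS "
            f"FOR (n:{label}) REQUIRE n.name IS UNIQUE"
        )
        for prop in ("confidence", "last_seen"):
            statements.append(
                f"CREATE INDEX {key}_{prop}_idx IF NOT EXISTS FOR (n:{label}) ON (n.{prop})"
            )
    return statements

stmts = build_schema_statements(ENTITY_LABELS)
print(len(stmts))  # 18
print(stmts[0])
```

Each statement could then be passed to `neo4j_conn.execute_query(...)` in the same loop used by `setup_graph_constraints_and_indexes`.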

## 7. Graph Operations Functions

In [None]:
# Graph operations for entity management
class GraphOperations:
    # Node labels cannot be passed as Cypher parameters and are interpolated into
    # the query text, so only these six entity types are accepted
    ENTITY_TYPES = {"Organization", "Person", "Product", "Event", "Location", "Topic"}
    
    def __init__(self, neo4j_connection):
        self.neo4j_conn = neo4j_connection
    
    def _check_label(self, entity_type: str) -> str:
        """Reject unknown entity types before they reach a query string."""
        if entity_type not in self.ENTITY_TYPES:
            raise ValueError(f"Unsupported entity type: {entity_type!r}")
        return entity_type
    
    def create_or_update_entity(self, entity: Entity) -> Optional[dict]:
        """Create or update an entity node; the result's 'operation' is 'created' or 'updated'."""
        label = self._check_label(entity.type)
        # A single `now` lets the RETURN clause tell a freshly created node from a matched one
        query = f"""
        WITH datetime() AS now
        MERGE (e:{label} {{name: $name}})
        ON CREATE SET 
            e.created_at = now,
            e.last_seen = now,
            e.confidence = $confidence,
            e.aliases = $aliases,
            e.mention_count = 1,
            e.properties = $properties
        ON MATCH SET
            e.last_seen = now,
            e.mention_count = coalesce(e.mention_count, 0) + 1,
            e.aliases = coalesce(e.aliases, []) + [a IN $aliases WHERE NOT a IN coalesce(e.aliases, [])],
            e.confidence = CASE 
                WHEN $confidence > e.confidence THEN $confidence 
                ELSE e.confidence 
            END
        RETURN e, 
               CASE WHEN e.created_at = now THEN 'created' ELSE 'updated' END AS operation
        """
        
        parameters = {
            'name': entity.name,
            'confidence': entity.confidence,
            'aliases': entity.aliases,
            # Neo4j property values cannot be maps, so store the dict as a JSON string
            'properties': json.dumps(entity.properties)
        }
        
        result = self.neo4j_conn.execute_query(query, parameters)
        return result[0] if result else None
    
    def create_newsletter_node(self, newsletter: Newsletter) -> Optional[dict]:
        """Create (or refresh) the newsletter node; MERGE keeps notebook re-runs idempotent."""
        query = """
        MERGE (n:Newsletter {id: $newsletter_id})
        ON CREATE SET n.created_at = datetime()
        SET n.subject = $subject,
            n.sender = $sender,
            n.received_date = $received_date,
            n.content_length = $content_length
        RETURN n
        """
        
        parameters = {
            'newsletter_id': newsletter.newsletter_id,
            'subject': newsletter.subject,
            'sender': newsletter.sender,
            'received_date': newsletter.received_date,
            'content_length': len(newsletter.html_content)
        }
        
        result = self.neo4j_conn.execute_query(query, parameters)
        return result[0] if result else None
    
    def link_entity_to_newsletter(self, entity_name: str, entity_type: str, newsletter_id: str, context: Optional[str] = None) -> Optional[dict]:
        """Create (or refresh) a MENTIONED_IN relationship between entity and newsletter."""
        label = self._check_label(entity_type)
        query = f"""
        MATCH (e:{label} {{name: $entity_name}})
        MATCH (n:Newsletter {{id: $newsletter_id}})
        MERGE (e)-[r:MENTIONED_IN]->(n)
        SET r.date = datetime(),
            r.context = $context
        RETURN r
        """
        
        parameters = {
            'entity_name': entity_name,
            'newsletter_id': newsletter_id,
            'context': context
        }
        
        result = self.neo4j_conn.execute_query(query, parameters)
        return result[0] if result else None
    
    def find_similar_entities(self, entity_name: str, entity_type: str, similarity_threshold: float = 0.8) -> List[dict]:
        """Find candidate entities whose name or aliases overlap (case-insensitive substring match).

        Note: similarity_threshold is not used by this query yet.
        """
        label = self._check_label(entity_type)
        query = f"""
        MATCH (e:{label})
        WITH e, toLower($search_term) AS term
        WHERE toLower(e.name) CONTAINS term
           OR ANY(alias IN coalesce(e.aliases, []) WHERE toLower(alias) CONTAINS term)
           OR term CONTAINS toLower(e.name)
        RETURN e, 
               e.mention_count AS popularity,
               e.confidence AS confidence
        ORDER BY popularity DESC, confidence DESC
        LIMIT 10
        """
        
        parameters = {'search_term': entity_name}
        result = self.neo4j_conn.execute_query(query, parameters)
        return result or []
    
    def get_graph_stats(self) -> dict:
        """Get basic statistics about the graph."""
        stats_query = """
        CALL {
            MATCH (o:Organization) RETURN count(o) as organizations
        }
        CALL {
            MATCH (p:Person) RETURN count(p) as people
        }
        CALL {
            MATCH (pr:Product) RETURN count(pr) as products
        }
        CALL {
            MATCH (e:Event) RETURN count(e) as events
        }
        CALL {
            MATCH (l:Location) RETURN count(l) as locations
        }
        CALL {
            MATCH (t:Topic) RETURN count(t) as topics
        }
        CALL {
            MATCH (n:Newsletter) RETURN count(n) as newsletters
        }
        CALL {
            MATCH ()-[r]->() RETURN count(r) as relationships
        }
        RETURN organizations, people, products, events, locations, topics, newsletters, relationships
        """
        
        result = self.neo4j_conn.execute_query(stats_query)
        return result[0] if result else {}

# Initialize graph operations
if connection_success:
    graph_ops = GraphOperations(neo4j_conn)
    
    # Get initial graph statistics
    stats = graph_ops.get_graph_stats()
    print("📊 Current graph statistics:")
    for key, value in stats.items():
        print(f"  {key.capitalize()}: {value}")
else:
    print("⚠️ Graph operations not available - no Neo4j connection")
    graph_ops = None
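`find_similar_entities` takes a `similarity_threshold`, but the Cypher query only does substring matching, so the threshold has no effect. One way to use it, sketched here as an assumption, is to re-rank the returned candidates in Python with the stdlib `difflib.SequenceMatcher` ratio and drop those below the threshold (`best_match` is a hypothetical helper):

```python
from difflib import SequenceMatcher
from typing import List, Optional, Tuple

def best_match(name: str, candidates: List[str], threshold: float = 0.8) -> Optional[Tuple[str, float]]:
    """Return (candidate, score) for the closest case-insensitive match, or None below threshold."""
    best: Optional[Tuple[str, float]] = None
    for cand in candidates:
        score = SequenceMatcher(None, name.casefold(), cand.casefold()).ratio()
        if score >= threshold and (best is None or score > best[1]):
            best = (cand, score)
    return best

match = best_match("Open AI", ["OpenAI", "OpenText"])
print(match[0] if match else None)              # OpenAI
print(best_match("Microsoft", ["Google"]))      # None
```

In the notebook, the candidates would be the names from the query rows, e.g. `[row['e']['name'] for row in graph_ops.find_similar_entities(...)]`.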

## 8. Complete Processing Pipeline Test

In [None]:
# Complete processing pipeline function
def process_newsletter_pipeline(newsletter: Newsletter) -> dict:
    """Process a newsletter through the complete pipeline."""
    start_time = datetime.now()
    results = {
        'status': 'success',
        'processing_time': 0,
        'entities_extracted': 0,
        'entities_new': 0,
        'entities_updated': 0,
        'newsletter_node_id': newsletter.newsletter_id,
        'summary': {},
        'text_summary': '',
        'errors': []
    }
    
    try:
        print(f"🚀 Starting newsletter processing: {newsletter.subject}")
        
        # Step 1: Clean HTML content
        print("  1️⃣ Cleaning HTML content...")
        cleaned_text = clean_html_content(newsletter.html_content)
        if not cleaned_text:
            results['errors'].append("Failed to clean HTML content")
            results['status'] = 'error'
            return results
        
        # Step 2: Extract entities with LLM
        print("  2️⃣ Extracting entities with LLM...")
        if llm:
            entities = extract_entities_with_llm(cleaned_text)
            results['entities_extracted'] = len(entities)
            print(f"    ✅ Extracted {len(entities)} entities")
        else:
            entities = []
            results['errors'].append("LLM not available for entity extraction")
        
        # Step 3: Create newsletter node in graph
        print("  3️⃣ Creating newsletter node...")
        if graph_ops:
            newsletter_node = graph_ops.create_newsletter_node(newsletter)
            if newsletter_node:
                print("    ✅ Newsletter node created")
            else:
                results['errors'].append("Failed to create newsletter node")
        else:
            results['errors'].append("Graph operations not available")
        
        # Step 4: Process entities and create/update nodes
        print("  4️⃣ Processing entities in graph...")
        entity_summary = {'Organization': 0, 'Person': 0, 'Product': 0, 'Event': 0, 'Location': 0, 'Topic': 0}
        new_entities = 0
        updated_entities = 0
        
        if graph_ops and entities:
            for entity in entities:
                try:
                    # Create or update entity
                    result = graph_ops.create_or_update_entity(entity)
                    if result:
                        if result.get('operation') == 'created':
                            new_entities += 1
                        else:
                            updated_entities += 1
                        
                        # Link to newsletter
                        graph_ops.link_entity_to_newsletter(
                            entity.name, 
                            entity.type, 
                            newsletter.newsletter_id, 
                            entity.context
                        )
                        
                        # Update summary (tolerates types outside the six defaults)
                        entity_summary[entity.type] = entity_summary.get(entity.type, 0) + 1
                        
                except Exception as e:
                    results['errors'].append(f"Error processing entity {entity.name}: {e}")
            
            print(f"    ✅ Created {new_entities} new entities, updated {updated_entities} existing entities")
        
        # Step 5: Generate summary
        print("  5️⃣ Generating summary...")
        results['entities_new'] = new_entities
        results['entities_updated'] = updated_entities
        results['summary'] = entity_summary
        
        # Create text summary
        entity_mentions = []
        for entity_type, count in entity_summary.items():
            if count > 0:
                entity_mentions.append(f"{count} {entity_type.lower()}{'s' if count > 1 else ''}")
        
        processing_time = (datetime.now() - start_time).total_seconds()
        results['processing_time'] = processing_time
        
        results['text_summary'] = f"""Processed newsletter '{newsletter.subject}' from {newsletter.sender}. 
Extracted {results['entities_extracted']} entities ({', '.join(entity_mentions) if entity_mentions else 'none'}). 
Created {new_entities} new entities, updated {updated_entities} existing entities. 
Processing completed in {processing_time:.2f} seconds."""
        
        print(f"✅ Processing completed in {processing_time:.2f} seconds")
        
        return results
    
    except Exception as e:
        results['status'] = 'error'
        results['errors'].append(f"Pipeline error: {e}")
        results['processing_time'] = (datetime.now() - start_time).total_seconds()
        print(f"❌ Processing failed: {e}")
        return results

# Test the complete pipeline
print("🔄 Testing complete newsletter processing pipeline...")
pipeline_results = process_newsletter_pipeline(sample_newsletter)

print("\n📊 Pipeline Results:")
print(f"  Status: {pipeline_results['status']}")
print(f"  Processing time: {pipeline_results['processing_time']:.2f} seconds")
print(f"  Entities extracted: {pipeline_results['entities_extracted']}")
print(f"  New entities: {pipeline_results['entities_new']}")
print(f"  Updated entities: {pipeline_results['entities_updated']}")
print(f"  Entity summary: {pipeline_results['summary']}")

if pipeline_results['errors']:
    print(f"  Errors: {pipeline_results['errors']}")

print(f"\n📝 Text Summary:")
print(pipeline_results['text_summary'])

## 9. Results Analysis and Visualization

In [None]:
# Analyze and visualize results
if graph_ops and pipeline_results['status'] == 'success':
    # Get updated graph statistics
    updated_stats = graph_ops.get_graph_stats()
    
    print("📊 Updated graph statistics:")
    for key, value in updated_stats.items():
        print(f"  {key.capitalize()}: {value}")
    
    # Create visualization of entity types
    entity_data = pipeline_results['summary']
    entity_types = list(entity_data.keys())
    entity_counts = list(entity_data.values())
    
    if sum(entity_counts) > 0:
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # Bar chart of entity types
        bars = ax1.bar(entity_types, entity_counts, color=sns.color_palette("husl", len(entity_types)))
        ax1.set_title('Extracted Entities by Type')
        ax1.set_xlabel('Entity Type')
        ax1.set_ylabel('Count')
        ax1.tick_params(axis='x', rotation=45)
        
        # Add value labels on bars
        for bar, count in zip(bars, entity_counts):
            if count > 0:
                ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.1, 
                        str(count), ha='center', va='bottom')
        
        # Pie chart of entity distribution
        non_zero_types = [t for t, c in zip(entity_types, entity_counts) if c > 0]
        non_zero_counts = [c for c in entity_counts if c > 0]
        
        if non_zero_counts:
            ax2.pie(non_zero_counts, labels=non_zero_types, autopct='%1.1f%%', startangle=90)
            ax2.set_title('Entity Distribution')
        else:
            ax2.text(0.5, 0.5, 'No entities extracted', ha='center', va='center', transform=ax2.transAxes)
        
        plt.tight_layout()
        plt.show()
    else:
        print("📊 No entities to visualize")
else:
    print("📊 Visualization skipped - no successful results or graph connection")

## 10. Next Steps and Production Preparation

### Development Summary

This notebook has successfully implemented and tested the core components of the newsletter processing pipeline:

✅ **Completed Components:**
- Configuration management with environment variables
- HTML processing and text extraction
- LLM integration for entity extraction
- Neo4j graph database connection and operations
- Entity creation and relationship management
- Complete processing pipeline
- Results analysis and visualization

### Next Development Steps:

1. **Fact Extraction**: Implement relationship extraction between entities
2. **Temporal Processing**: Add time-based relationship handling
3. **Entity Resolution**: Improve fuzzy matching and disambiguation
4. **Batch Processing**: Handle multiple newsletters efficiently
5. **Error Handling**: Add comprehensive error recovery
6. **Performance Optimization**: Optimize LLM calls and graph operations

### Production Migration:

1. **Extract Code to Modules**: Move notebook functions to `src/` directory
2. **FastAPI Integration**: Create REST endpoint using developed logic
3. **Testing**: Implement comprehensive test suite
4. **Deployment**: Configure for Cloud Run deployment
5. **Monitoring**: Add logging and metrics collection
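For Monitoring, the `print` calls could become standard `logging` records with per-stage timings. A minimal sketch using only the stdlib; the `timed_stage` decorator, the logger name and the `STAGE_TIMINGS` dict are hypothetical:

```python
import functools
import logging
import time
from typing import Any, Callable, Dict

logger = logging.getLogger("newsletter_pipeline")
STAGE_TIMINGS: Dict[str, float] = {}

def timed_stage(func: Callable[..., Any]) -> Callable[..., Any]:
    """Log how long the wrapped stage took (even if it raises) and record it in STAGE_TIMINGS."""
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            STAGE_TIMINGS[func.__name__] = elapsed
            logger.info("stage=%s duration_s=%.3f", func.__name__, elapsed)
    return wrapper

@timed_stage
def clean_stage(text: str) -> str:
    return " ".join(text.split())

logging.basicConfig(level=logging.INFO)
print(clean_stage("  hello   world "))  # hello world
```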

### Key Insights:

- The pipeline processes the sample newsletter end-to-end, from HTML cleaning through graph writes
- Entity extraction quality depends heavily on LLM prompt engineering
- Graph operations appear adequate for entity management at this scale; larger graphs still need benchmarking
- Real-time processing should be feasible for typical newsletter volumes, but this has not yet been measured
- Error handling is critical for production reliability

In [None]:
# Clean up connections
if neo4j_conn:
    neo4j_conn.close()

print("🧹 Cleanup completed")
print("\n🎉 Newsletter processing pipeline development complete!")
print("\n📋 Ready for production migration:")
print("  1. Extract functions to src/ modules")
print("  2. Create FastAPI endpoints")
print("  3. Add comprehensive testing")
print("  4. Configure deployment")
print("  5. Set up monitoring")