# 🏗️ Full Pipeline Walkthrough: Multi-Document Extraction


**What you'll learn:**
- Complete custom pipeline with Neo4j GraphRAG
- Page-by-page PDF processing with Gemini vision
- Robust error handling (JSON repair, timeouts, failures)
- Basic entity resolution across multiple documents
- Working with pre-loaded data

**Important:** Extracting all four documents takes over an hour. We'll walk through the code first, then run queries against pre-loaded data (Part 4).

---

## 📊 What This Pipeline Does

This notebook shows a pipeline that:
1. **Loads PDFs page-by-page** - Each page processed individually
2. **Extracts entities with Gemini vision** - Sees tables, logos, visual layout
3. **Creates lexical graph** - Document→Chunk→Entity relationships
4. **Handles errors gracefully** - JSON repair, timeouts, skip failures
5. **Writes to Neo4j** - Complete knowledge graph with all connections

**Documents processed:** 4 pharmaceutical pipeline reports (~150 pages total)



## 📦 Step 1: Install Dependencies



In [None]:
%%capture
%pip install google-genai pymupdf python-dotenv "neo4j-graphrag[experimental]" json-repair neo4j


## 🔐 Step 2: Configure Credentials



In [1]:
from dotenv import load_dotenv
import os

load_dotenv()

# Replace with provided credentials
NEO4J_URI = os.getenv('NEO4J_URI', 'neo4j+s://SHARED-INSTANCE.databases.neo4j.io')
NEO4J_USERNAME = os.getenv('NEO4J_USERNAME', 'neo4j')
NEO4J_PASSWORD = os.getenv('NEO4J_PASSWORD', 'shared-password-here')
NEO4J_DATABASE = os.getenv('NEO4J_DATABASE', 'neo4j')

# Gemini API key (only needed if you want to run extraction)
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', 'your-key-here')

print("✓ Credentials configured")
print(f"  Connecting to: {NEO4J_URI.split('@')[-1] if '@' in NEO4J_URI else NEO4J_URI}")


✓ Credentials configured
  Connecting to: neo4j://127.0.0.1:7687


---

# 🔧 Pipeline Components

The following cells define the custom components for our pipeline.  
**You can read through the code to understand how it works.**


## 🎯 Step 3: Define Schema

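We reuse the pharmaceutical schema from the previous notebooks: four node types, four relationship types, and the patterns that say which node types each relationship may connect.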


In [2]:
from neo4j_graphrag.experimental.components.schema import SchemaBuilder, NodeType, RelationshipType

# Node types
node_types = [
    NodeType(label="Molecule", description="Drug or therapeutic molecule"),
    NodeType(label="Company", description="Pharmaceutical company"),
    NodeType(label="Target", description="Biological target (protein, pathway, etc)"),
    NodeType(label="Disease", description="Disease or medical condition"),
]

# Relationship types
relationship_types = [
    RelationshipType(label="TREATS", description="Molecule treats disease"),
    RelationshipType(label="TARGETS", description="Molecule targets biological target"),
    RelationshipType(label="ASSOCIATED", description="Disease associated with target"),
    RelationshipType(label="IN_PIPELINE", description="Company has molecule in pipeline"),
]

# Expected patterns
patterns = [
    ("Molecule", "TREATS", "Disease"),
    ("Company", "IN_PIPELINE", "Molecule"),
    ("Molecule", "TARGETS", "Target"),
    ("Disease", "ASSOCIATED", "Target"),
]

print("✓ Schema components defined")


✓ Schema components defined
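
In this notebook the patterns reach the model only as prompt text (see `_build_schema_text` in Step 5); nothing in the code shown enforces them afterwards. The sketch below is one possible post-extraction check, not part of the workshop pipeline. The helper `filter_by_patterns` and the sample nodes and relationships are hypothetical; they assume the JSON shape that the extraction prompt asks for.

```python
# Allowed (start label, relationship type, end label) triples, as in the schema
patterns = [
    ("Molecule", "TREATS", "Disease"),
    ("Company", "IN_PIPELINE", "Molecule"),
    ("Molecule", "TARGETS", "Target"),
    ("Disease", "ASSOCIATED", "Target"),
]

# Hypothetical extraction result for one page (illustrative values only)
nodes = [
    {"id": "0", "label": "Molecule", "properties": {"name": "Skyrizi"}},
    {"id": "1", "label": "Disease", "properties": {"name": "Psoriasis"}},
    {"id": "2", "label": "Company", "properties": {"name": "AbbVie"}},
]
relationships = [
    {"type": "TREATS", "start_node_id": "0", "end_node_id": "1"},
    {"type": "IN_PIPELINE", "start_node_id": "2", "end_node_id": "0"},
    # Company -TREATS-> Disease is not an allowed pattern
    {"type": "TREATS", "start_node_id": "2", "end_node_id": "1"},
]

def filter_by_patterns(nodes, relationships, patterns):
    """Keep only relationships whose (start label, type, end label) is allowed."""
    label_by_id = {node["id"]: node["label"] for node in nodes}
    allowed = set(patterns)
    kept = []
    for rel in relationships:
        triple = (
            label_by_id.get(rel["start_node_id"]),  # None if the node id is unknown
            rel["type"],
            label_by_id.get(rel["end_node_id"]),
        )
        if triple in allowed:
            kept.append(rel)
    return kept

kept = filter_by_patterns(nodes, relationships, patterns)
print(f"Kept {len(kept)} of {len(relationships)} relationships")
```

A relationship that points at an unknown node id is dropped as well, because its label lookup yields `None` and no pattern contains `None`.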


## 📄 Step 4: Page-by-Page PDF Loader

Custom component that loads PDFs and processes each page individually.

**Why page-by-page?**
- Better extraction quality (focused context per page)
- More granular error handling
- Easier to track progress
- Can parallelize in production
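
The last bullet can be illustrated with the standard library alone. This is a minimal sketch, not part of the workshop pipeline (which processes pages one after another): `extract_page` is a hypothetical stand-in for a per-page extraction call, and `max_concurrency` is an assumed knob for staying under API rate limits.

```python
import asyncio

async def extract_page(page_number: int) -> dict:
    """Hypothetical stand-in for a per-page extraction call (e.g. an LLM request)."""
    await asyncio.sleep(0.01)  # simulate network latency
    return {"page": page_number, "nodes": []}

async def extract_all_pages(page_numbers, max_concurrency: int = 4) -> list:
    """Run extract_page for every page with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(page_number: int) -> dict:
        async with semaphore:
            try:
                return await extract_page(page_number)
            except Exception as exc:
                # Record the failure and keep going instead of aborting the batch
                return {"page": page_number, "error": type(exc).__name__}

    # gather returns results in input order, so they line up with page_numbers
    return await asyncio.gather(*(guarded(n) for n in page_numbers))

results = asyncio.run(extract_all_pages(range(1, 11)))
print(len(results), "pages processed")
```

In a notebook cell, where an event loop is already running, use `await extract_all_pages(...)` instead of `asyncio.run(...)`.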


In [3]:
import fitz  # PyMuPDF
from neo4j_graphrag.experimental.components.pdf_loader import PdfLoader
from neo4j_graphrag.experimental.components.types import TextChunk, TextChunks, DocumentInfo
from neo4j_graphrag.experimental.pipeline.component import DataModel
from pydantic import validate_call
from pathlib import Path
from typing import Union

class PageByPageResult(DataModel):
    """Result containing chunks and document info."""
    chunks: TextChunks
    document_info: DocumentInfo

class PageByPagePdfLoader(PdfLoader):
    """Custom PDF loader that processes each page separately."""
    
    @validate_call
    async def run(self, filepath: Union[str, Path]) -> PageByPageResult:
        filepath = str(filepath)
        doc = fitz.open(filepath)
        chunks = []
        
        # Process each page
        for page_num in range(len(doc)):
            page = doc[page_num]
            text = page.get_text()  # Plain text
            
            # Extract single page as PDF bytes
            page_pdf = fitz.open()
            page_pdf.insert_pdf(doc, from_page=page_num, to_page=page_num)
            pdf_bytes = page_pdf.tobytes()
            page_pdf.close()
            
            # Create chunk with both text and PDF bytes
            chunks.append(TextChunk(
                text=text,
                index=page_num,
                metadata={
                    'pdf_bytes': pdf_bytes,  # For Gemini vision
                    'page_number': page_num + 1,
                    'source_file': os.path.basename(filepath)
                }
            ))
        
        doc.close()
        
        document_info = DocumentInfo(
            path=filepath,
            document_type="pdf"
        )
        
        return PageByPageResult(
            chunks=TextChunks(chunks=chunks),
            document_info=document_info
        )

print("✓ PageByPagePdfLoader defined")


✓ PageByPagePdfLoader defined


## 🛡️ Step 5: Robust Gemini PDF Extractor

This extractor adds three safeguards, marked `FIX 1` to `FIX 3` in the code:

1. **JSON Repair** - Fixes malformed JSON from Gemini (sometimes happens with complex outputs)
2. **Timeouts** - Prevents hanging on slow API calls (60s limit per page)
3. **OnError.IGNORE** - Continues processing even if individual pages fail
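
The first two safeguards can be tried in isolation. The sketch below uses only the standard library and makes some assumptions: `call_with_timeout`, `parse_json_with_repair`, `slow_call`, `fast_call`, and `strip_trailing_commas` are hypothetical names for this demo, and the toy repair function merely stands in for `json_repair.repair_json`. Note that `asyncio.wait_for` can only interrupt a coroutine at an `await` point, so the guarded call has to be genuinely asynchronous.

```python
import asyncio
import json

async def call_with_timeout(make_call, timeout_s: float, fallback):
    """Await make_call(); give up after timeout_s seconds and return fallback."""
    try:
        return await asyncio.wait_for(make_call(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return fallback

def parse_json_with_repair(text: str, repair):
    """Try strict parsing first; on failure, repair the text and parse again."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return json.loads(repair(text))

async def slow_call() -> str:
    await asyncio.sleep(1.0)  # a real await point, so the timeout can fire
    return '{"nodes": []}'

async def fast_call() -> str:
    return '{"nodes": [{"id": "0"}]}'

def strip_trailing_commas(text: str) -> str:
    """Toy repair for the demo only; far less capable than a real repair library."""
    return text.replace(",]", "]").replace(",}", "}")

EMPTY_GRAPH = '{"nodes": [], "relationships": []}'

timed_out = asyncio.run(call_with_timeout(slow_call, 0.05, EMPTY_GRAPH))
on_time = asyncio.run(call_with_timeout(fast_call, 0.05, EMPTY_GRAPH))
repaired = parse_json_with_repair('{"nodes": [1, 2,]}', strip_trailing_commas)

print("timed out, got fallback:", timed_out == EMPTY_GRAPH)
print("fast call nodes:", json.loads(on_time)["nodes"])
print("repaired:", repaired)
```

Trying strict parsing first keeps well-formed responses untouched; the repair step only runs on the rare malformed one. In a notebook cell, replace `asyncio.run(...)` with `await ...`.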


In [4]:
from typing import Any, Optional
import json
import asyncio
from google import genai
from google.genai import types
from json_repair import repair_json
from neo4j_graphrag.experimental.components.entity_relation_extractor import EntityRelationExtractor, OnError
from neo4j_graphrag.experimental.components.types import Neo4jGraph, Neo4jNode, Neo4jRelationship, LexicalGraphConfig
from neo4j_graphrag.experimental.components.schema import GraphSchema
from neo4j_graphrag.experimental.components.lexical_graph import LexicalGraphBuilder

class GeminiPdfExtractor(EntityRelationExtractor):
    """Robust Gemini extractor with error handling."""
    
    def __init__(self, gemini_client, create_lexical_graph: bool = True, on_error: OnError = OnError.IGNORE):
        super().__init__(create_lexical_graph=create_lexical_graph, on_error=on_error)
        self.client = gemini_client
    
    def _build_schema_text(self, schema: GraphSchema) -> str:
        """Convert schema to prompt text."""
        node_labels = [node.label for node in schema.node_types]
        rel_types = [rel.label for rel in schema.relationship_types]
        
        schema_lines = []
        schema_lines.append("Node Labels: " + ", ".join(node_labels))
        schema_lines.append("Relationship Types: " + ", ".join(rel_types))
        
        if schema.patterns:
            schema_lines.append("Patterns:")
            for start, rel, end in schema.patterns:
                schema_lines.append(f"- ({start})-[{rel}]->({end})")
        
        return "\n".join(schema_lines)
    
    async def extract_for_chunk(self, schema: GraphSchema, chunk: TextChunk, pdf_bytes: bytes) -> Neo4jGraph:
        """Extract entities from a single page with error handling."""
        schema_text = self._build_schema_text(schema) if schema else ""
        
        prompt_template = f"""You are a medical researcher extracting information from pharmaceutical documents.

Extract entities and relationships from this PDF page.

Return JSON:
{{"nodes": [{{"id": "0", "label": "entity_type", "properties": {{"name": "entity_name"}}}}],
  "relationships": [{{"type": "REL_TYPE", "start_node_id": "0", "end_node_id": "1", "properties": {{"details": "description"}}}}]}}

Schema:
{schema_text}
"""
        
        try:
            # FIX 2: Add timeout (60 seconds).
            # Use the SDK's async client (client.aio) so the request yields to the
            # event loop; a blocking call here could not be interrupted by wait_for.
            async def call_gemini():
                return await self.client.aio.models.generate_content(
                    model="gemini-2.5-flash",
                    contents=[
                        types.Part.from_bytes(data=pdf_bytes, mime_type='application/pdf'),
                        prompt_template
                    ],
                    config=types.GenerateContentConfig(response_mime_type="application/json")
                )
            
            response = await asyncio.wait_for(call_gemini(), timeout=60.0)
            
            # FIX 1: JSON repair for malformed JSON
            try:
                graph_data = json.loads(response.text)
            except json.JSONDecodeError:
                print("    ⚠ JSON error, attempting repair...")
                repaired_json = repair_json(response.text)
                graph_data = json.loads(repaired_json)
                print("    ✓ JSON repaired")
            
            # Convert to Neo4j graph objects
            nodes = [
                Neo4jNode(
                    id=node['id'],
                    label=node['label'],
                    properties=node.get('properties', {})
                )
                for node in graph_data.get('nodes', [])
            ]
            
            relationships = [
                Neo4jRelationship(
                    type=rel['type'],
                    start_node_id=rel['start_node_id'],
                    end_node_id=rel['end_node_id'],
                    properties=rel.get('properties', {})
                )
                for rel in graph_data.get('relationships', [])
            ]
            
            return Neo4jGraph(nodes=nodes, relationships=relationships)
            
        except asyncio.TimeoutError:
            print(f"    ✗ Timeout - skipping page")
            return Neo4jGraph(nodes=[], relationships=[])
        except Exception as e:
            print(f"    ✗ Error: {type(e).__name__} - skipping page")
            return Neo4jGraph(nodes=[], relationships=[])
    
    @validate_call
    async def run(
        self, 
        chunks: TextChunks, 
        document_info: Optional[DocumentInfo] = None,
        lexical_graph_config: Optional[LexicalGraphConfig] = None,
        schema: Optional[GraphSchema] = None, 
        **kwargs: Any
    ) -> Neo4jGraph:
        """Process all chunks."""
        # Create lexical graph (Document→Chunk relationships)
        lexical_graph_builder = None
        lexical_graph = None
        
        if self.create_lexical_graph:
            config = lexical_graph_config or LexicalGraphConfig()
            lexical_graph_builder = LexicalGraphBuilder(config=config)
            lexical_graph_result = await lexical_graph_builder.run(
                text_chunks=chunks, 
                document_info=document_info
            )
            lexical_graph = lexical_graph_result.graph
        
        schema = schema or GraphSchema(node_types=())
        chunk_graphs = []
        
        # Process each page
        for chunk in chunks.chunks:
            pdf_bytes = chunk.metadata.pop('pdf_bytes')
            page_num = chunk.metadata.get('page_number', '?')
            
            print(f"  Processing page {page_num}...")
            
            try:
                chunk_graph = await self.extract_for_chunk(schema, chunk, pdf_bytes)
                self.update_ids(chunk_graph, chunk)
                
                if lexical_graph_builder:
                    await lexical_graph_builder.process_chunk_extracted_entities(
                        chunk_graph, chunk
                    )
                
                chunk_graphs.append(chunk_graph)
            except Exception as e:
                # FIX 3: Continue on error, but report what went wrong
                print(f"    ✗ Failed ({type(e).__name__}), continuing...")
                continue
        
        # Combine all graphs
        if lexical_graph:
            graph = lexical_graph.model_copy(deep=True)
        else:
            graph = Neo4jGraph()
        
        for chunk_graph in chunk_graphs:
            graph.nodes.extend(chunk_graph.nodes)
            graph.relationships.extend(chunk_graph.relationships)
        
        return graph

print("✓ GeminiPdfExtractor defined")


✓ GeminiPdfExtractor defined


---

# 🚀 Pipeline Setup (Commented Out - 1+ Hour Runtime)

The following cells show how to build and run the pipeline.  
**These are commented out** since extraction takes over an hour.


## Step 6: Build Pipeline (Commented Out)

This shows how to connect all components together.


In [None]:
# COMMENTED OUT - This code works but takes 1+ hour to run
#
# import neo4j
# from neo4j_graphrag.experimental.pipeline import Pipeline
# from neo4j_graphrag.experimental.components.kg_writer import Neo4jWriter
#
# # Initialize components
# gemini_client = genai.Client(api_key=GEMINI_API_KEY)
# driver = neo4j.GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
#
# pdf_loader = PageByPagePdfLoader()
# schema_builder = SchemaBuilder()
# extractor = GeminiPdfExtractor(gemini_client, create_lexical_graph=True)
# writer = Neo4jWriter(driver=driver, neo4j_database=NEO4J_DATABASE)
#
# # Build pipeline
# pipe = Pipeline()
# pipe.add_component(pdf_loader, "pdf_loader")
# pipe.add_component(schema_builder, "schema")
# pipe.add_component(extractor, "extractor")
# pipe.add_component(writer, "writer")
#
# # Connect components
# pipe.connect("pdf_loader", "extractor", {"chunks": "pdf_loader.chunks", "document_info": "pdf_loader.document_info"})
# pipe.connect("schema", "extractor", {"schema": "schema"})
# pipe.connect("extractor", "writer", {"graph": "extractor"})
#
# print("✓ Pipeline built")

print("⚠️  Pipeline code commented out (takes 1+ hour to run)")


## Step 7: Process Documents (Commented Out)

This is how you would run the pipeline on all documents.


In [None]:
# COMMENTED OUT - This code works but takes 1+ hour
#
# documents = [
#     "/workshop-data/AbbVie Long-Term Guidance and Pipeline Update.pdf",
#     "/workshop-data/BMY-2024-Q1-Results-Investor-Presentation-with-Appendix.pdf",
#     "/workshop-data/JNJ-Pipeline-2Q2024.pdf",
#     "/workshop-data/ph-rd-pipeline-2025-07-24-update-20250725.pdf",
# ]
#
# async def process_document(filepath):
#     print(f"\n{'='*60}")
#     print(f"Processing: {os.path.basename(filepath)}")
#     print('='*60)
#     
#     pipe_inputs = {
#         "pdf_loader": {"filepath": filepath},
#         "schema": {
#             "node_types": node_types,
#             "relationship_types": relationship_types,
#             "patterns": patterns,
#         }
#     }
#     
#     result = await pipe.run(pipe_inputs)
#     print(f"✓ Completed: {os.path.basename(filepath)}")
#     return result
#
# # Process all documents
# import time
# results = []
# start_time = time.time()
#
# for doc_path in documents:
#     if os.path.exists(doc_path):
#         try:
#             result = await process_document(doc_path)
#             results.append(result)
#         except Exception as e:
#             print(f"✗ Error processing {doc_path}: {e}")
#
# elapsed = time.time() - start_time
# print(f"\n✅ All documents processed in {elapsed/60:.1f} minutes")

print("⚠️  Extraction code commented out")
print("💡 Extraction would process ~150 pages across 4 documents")


---

# 🗄️ Accessing Pre-Loaded Data

Instead of running the 1+ hour extraction, we'll connect to a database with pre-loaded data.

**Two options:**

### Option 1: Shared Aura Instance (Recommended)
- Instructor provides credentials
- Read-only access
- Instant access to full dataset

### Option 2: Load from Dump File
- Download Neo4j dump file from: `[LINK TO BE PROVIDED]`
- Load into your own Neo4j instance
- Full control for experimentation

**For this workshop, use the shared Aura instance** (credentials in Step 2).


## Step 8: Connect to Neo4j with Pre-Loaded Data


In [5]:
import neo4j

# Connect to Neo4j (using credentials from Step 2)
driver = neo4j.GraphDatabase.driver(
    NEO4J_URI,
    auth=(NEO4J_USERNAME, NEO4J_PASSWORD)
)

# Test connection
try:
    driver.verify_connectivity()
    print(f"✓ Connected to Neo4j")
    print(f"  URI: {NEO4J_URI}")
    print(f"  Database: {NEO4J_DATABASE}")
except Exception as e:
    print(f"❌ Connection failed: {e}")
    print("Check your credentials in Step 2")


✓ Connected to Neo4j
  URI: neo4j://127.0.0.1:7687
  Database: neo4j


## Step 9: Explore the Graph

Let's see what's in the pre-loaded database!


In [6]:
def run_query(query):
    """Helper to run Cypher queries."""
    with driver.session(database=NEO4J_DATABASE) as session:
        result = session.run(query)
        return result.data()

# Get graph statistics
print("📊 Graph Statistics:\n")

# Nodes by type
for label in ["Molecule", "Company", "Target", "Disease", "Document", "Chunk"]:
    count = run_query(f"MATCH (n:{label}) RETURN count(n) as count")[0]['count']
    print(f"  {label}: {count:,}")

# Relationships by type
print("\nRelationships:")
for rel_type in ["TREATS", "TARGETS", "ASSOCIATED", "IN_PIPELINE", "HAS_CHUNK", "NEXT_CHUNK", "FROM_CHUNK"]:
    count = run_query(f"MATCH ()-[r:{rel_type}]->() RETURN count(r) as count")[0]['count']
    if count > 0:
        print(f"  {rel_type}: {count:,}")

print("\n✓ Rich knowledge graph with ~150 pages extracted!")




📊 Graph Statistics:

  Molecule: 360
  Company: 61
  Target: 164
  Disease: 308
  Document: 4
  Chunk: 84

Relationships:
  TREATS: 499
  TARGETS: 203
  ASSOCIATED: 157
  IN_PIPELINE: 335
  NEXT_CHUNK: 80
  FROM_CHUNK: 1,320

✓ Rich knowledge graph with ~150 pages extracted!


## Step 10: Sample Queries

Let's run some interesting queries on the full dataset!


In [7]:
# Query 1: Top companies by pipeline size
print("🏢 Top Companies by Pipeline Size:\n")
query = """
MATCH (c:Company)-[:IN_PIPELINE]->(m:Molecule)
RETURN c.name as company, count(m) as molecules
ORDER BY molecules DESC
LIMIT 5
"""
results = run_query(query)
for r in results:
    print(f"  {r['company']}: {r['molecules']} molecules")

print()


🏢 Top Companies by Pipeline Size:

  Bristol Myers Squibb: 100 molecules
  Bayer: 59 molecules
  AbbVie: 59 molecules
  J&J: 53 molecules
  Calibr: 2 molecules



In [8]:
# Query 2: Most targeted diseases
print("🎯 Most Targeted Diseases:\n")
query = """
MATCH (m:Molecule)-[:TREATS]->(d:Disease)
RETURN d.name as disease, count(m) as molecules
ORDER BY molecules DESC
LIMIT 5
"""
results = run_query(query)
for r in results:
    print(f"  {r['disease']}: {r['molecules']} molecules")

print()


🎯 Most Targeted Diseases:

  Solid Tumors: 19 molecules
  Advanced Solid Tumors: 16 molecules
  NSCLC: 11 molecules
  Oncology: 10 molecules
  Hematology: 10 molecules
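
Counts like these depend on how consistently names were extracted across the four documents, which is where the basic entity resolution from the learning goals comes in. The sketch below shows one simple approach, name normalization, under stated assumptions: `normalize_name` and `group_duplicates` are hypothetical helpers, the sample names are illustrative rather than taken from the workshop database, and this is not necessarily how the pre-loaded data was resolved.

```python
import re
from collections import defaultdict

def normalize_name(name: str) -> str:
    """Lowercase, turn punctuation into spaces, and collapse whitespace."""
    cleaned = re.sub(r"[^a-z0-9]+", " ", name.lower())
    return cleaned.strip()

def group_duplicates(names):
    """Group raw names that share the same normalized form."""
    groups = defaultdict(list)
    for name in names:
        groups[normalize_name(name)].append(name)
    return dict(groups)

# Illustrative spellings only (an assumption for this demo)
extracted = [
    "Bristol Myers Squibb",
    "Bristol-Myers Squibb",
    "bristol myers squibb ",
    "AbbVie",
    "Abbvie",
]

groups = group_duplicates(extracted)
for key, variants in groups.items():
    print(f"{key}: {variants}")
```

Normalization only merges spelling variants. Abbreviations ("UC") or broader and narrower terms ("Solid Tumors" and "Advanced Solid Tumors") need a synonym table or a similarity-based method.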



In [9]:
# Query 3: Molecules with their context (company and indication)
print("💊 Sample Molecules with Context:\n")
query = """
MATCH (c:Company)-[:IN_PIPELINE]->(m:Molecule)-[:TREATS]->(d:Disease)
RETURN m.name as molecule, c.name as company, d.name as disease
LIMIT 10
"""
results = run_query(query)
for r in results:
    print(f"  {r['molecule']} ({r['company']}) → treats {r['disease']}")

print()


💊 Sample Molecules with Context:

  Skyrizi (AbbVie) → treats Psoriasis/Psoriatic Arthritis
  Skyrizi (AbbVie) → treats Inflammatory Bowel Disease
  Skyrizi (AbbVie) → treats Psoriatic Arthritis
  Skyrizi (AbbVie) → treats Crohn's Disease
  Skyrizi (AbbVie) → treats Psoriasis
  Skyrizi (AbbVie) → treats UC
  Rinvoq (AbbVie) → treats Inflammatory Bowel Disease
  Rinvoq (AbbVie) → treats Rheumatology
  Rinvoq (AbbVie) → treats Dermatology
  Rinvoq (AbbVie) → treats Rheumatoid Arthritis



---

## 🎓 What You've Learned

Congratulations! You've seen:

✅ **Production-ready pipeline** with robust error handling  
✅ **Page-by-page processing** for better extraction quality  
✅ **Gemini vision** for understanding complex PDF layouts  
✅ **Lexical graph** connecting Documents→Chunks→Entities  
✅ **Multi-document extraction** across 4 pharmaceutical reports  
✅ **Complete knowledge graph** ready for advanced queries  

## 🔑 Key Takeaways

1. **Custom components** give you full control over the pipeline
2. **Error handling** (JSON repair, timeouts) is essential for production
3. **Lexical graph** provides traceable entity-to-source connections
4. **Page-by-page** processing enables better context and parallelization
5. **Pre-loading data** speeds up development and workshops
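
As an illustration of the third takeaway, the sketch below traces an entity back to the pages it came from. It rests on assumptions that are not confirmed by this notebook's output: that Chunk nodes carry the `page_number` metadata set by the loader, that Document nodes carry a `path` property, and that each Chunk is directly linked to its Document. `find_sources` and `fake_execute` are hypothetical names; the fake executor only exists so the example runs without a database.

```python
PROVENANCE_QUERY = """
MATCH (e {name: $name})-[:FROM_CHUNK]-(c:Chunk)--(d:Document)
RETURN d.path AS document, c.page_number AS page
ORDER BY document, page
"""

def find_sources(execute, name: str):
    """Return (document, page) pairs for an entity name.

    execute is any callable taking (query, params) and returning a list of
    dict rows, so the same function works with a real session or a stub.
    """
    rows = execute(PROVENANCE_QUERY, {"name": name})
    return [(row["document"], row["page"]) for row in rows]

def fake_execute(query: str, params: dict):
    """Stand-in for a database call; returns made-up rows for the demo."""
    return [{"document": "example-report.pdf", "page": 3}]

sources = find_sources(fake_execute, "Skyrizi")
print(sources)
```

With the notebook's driver, `execute` could be `lambda q, p: session.run(q, p).data()` inside a `driver.session(...)` block. Passing the name as a `$name` parameter, rather than formatting it into the query string, avoids quoting problems with names such as "Crohn's Disease".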

---

## 💡 Next Steps

In **Part 4**, you'll see:
- Agentic GraphRAG patterns
- Text2Cypher with MCP
- Building conversational agents
- Advanced query patterns

**Next Notebook:** Agentic GraphRAG →


---

## 🔧 Cleanup


In [None]:
# Close driver connection
driver.close()
print("✓ Connection closed")
