# Week 2: arXiv API Integration & PDF Processing

**What We're Building This Week:**

Week 2 focuses on implementing the core data ingestion pipeline that will automatically fetch, process, and store arXiv papers. This is the foundation that feeds our RAG system with fresh academic content.

## Week 2 Focus Areas

### 🎯 Core Objectives
- **arXiv API Integration**: Build a robust client with rate limiting and retry logic
- **PDF Processing Pipeline**: Download and parse scientific PDFs with structured content extraction
- **Database Storage**: Persist paper metadata and content in PostgreSQL
- **Error Handling**: Implement comprehensive error handling and graceful degradation
- **Automation Ready**: Prepare components for Airflow orchestration
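The "rate limiting and retry logic" objective combines two ideas: keep a minimum gap between requests, and retry transient failures with growing delays. The sketch below shows both, assuming a fixed minimum interval. arXiv's API terms ask clients to make no more than one request every three seconds. `RateLimiter` and `call_with_retry` are hypothetical names for illustration. They are not the project's `src.services.arxiv` API.

```python
import asyncio
import random
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class RateLimiter:
    """Enforce a minimum interval between consecutive requests (hypothetical helper)."""

    def __init__(self, min_interval_s: float) -> None:
        self.min_interval_s = min_interval_s
        self._last_call = float("-inf")  # no request made yet
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        # Hold the lock so concurrent tasks queue up instead of bypassing the delay
        async with self._lock:
            elapsed = time.monotonic() - self._last_call
            if elapsed < self.min_interval_s:
                await asyncio.sleep(self.min_interval_s - elapsed)
            self._last_call = time.monotonic()


async def call_with_retry(
    func: Callable[[], Awaitable[T]],
    limiter: RateLimiter,
    max_attempts: int = 3,
    base_delay_s: float = 1.0,
) -> T:
    """Call `func` under `limiter`, retrying failures with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        await limiter.wait()
        try:
            return await func()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Back off 1x, 2x, 4x ... base_delay_s, plus up to 10% random jitter
            delay = base_delay_s * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay * 0.1))
    raise AssertionError("unreachable")
```

Retrying on every `Exception` keeps the sketch short. A real client would more likely retry only transient failures such as timeouts or HTTP 503, and fail fast on errors like a malformed query.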

### 🔧 What We'll Test In This Notebook
1. **arXiv API Client** - Fetch cs.AI papers with proper rate limiting
2. **PDF Download System** - Download and cache PDFs with error handling  
3. **Docling PDF Parser** - Extract structured content (sections, tables, figures)
4. **Database Integration** - Store and retrieve papers from PostgreSQL
5. **Complete Pipeline** - End-to-end processing from arXiv to database
6. **Production Readiness** - Error handling, logging, and performance metrics


### 📊 Success Metrics
- arXiv API calls succeed with proper rate limiting
- PDF download and caching works reliably  
- Docling extracts structured content from scientific PDFs
- Database stores complete paper metadata
- Pipeline handles errors gracefully and continues processing
- All components ready for Airflow automation (Week 2+)
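A pipeline "handles errors gracefully and continues processing" when it catches failures per paper instead of per batch. The sketch below assumes that design. `process_batch`, `PaperStub`, and the `download`/`parse` callables are hypothetical stand-ins. The result keys mirror the ones printed by the pipeline tests in this notebook.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class PaperStub:
    """Hypothetical stand-in for a fetched paper; only the ID is used here."""
    arxiv_id: str


def process_batch(papers: List[PaperStub],
                  download: Callable[[PaperStub], str],
                  parse: Callable[[str], Any]) -> Dict[str, Any]:
    """Download and parse each paper, recording failures instead of aborting."""
    start = time.perf_counter()
    results: Dict[str, Any] = {"papers_fetched": len(papers), "pdfs_downloaded": 0,
                               "pdfs_parsed": 0, "errors": []}
    for paper in papers:
        try:
            pdf_path = download(paper)
            results["pdfs_downloaded"] += 1
            parse(pdf_path)
            results["pdfs_parsed"] += 1
        except Exception as exc:
            # One bad paper is recorded and skipped; the loop moves on to the next
            results["errors"].append(f"{paper.arxiv_id}: {type(exc).__name__}: {exc}")
    results["processing_time"] = time.perf_counter() - start
    return results


def success_rate(part: int, whole: int) -> float:
    """Percentage of `whole`, returning 0.0 instead of dividing by zero."""
    return 100.0 * part / whole if whole else 0.0
```

With this shape, a failed download or parse shows up as an entry in `errors` and a lower success rate. It does not stop the batch.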

---

## Week 2 Component Status
| Component | Purpose | Status |
|-----------|---------|--------|
| **arXiv API Client** | Fetch cs.AI papers with rate limiting | ✅ Complete |
| **PDF Downloader** | Download and cache PDFs locally | ✅ Complete |
| **Docling Parser** | Extract structured content from PDFs | ✅ Complete |
| **Metadata Fetcher** | Orchestrate complete pipeline | ✅ Complete |
| **Database Storage** | Store papers in PostgreSQL | ⚠️ Needs volume refresh |
| **Airflow DAGs** | Automate daily ingestion | ⚠️ Needs container update |


## ⚠️ IMPORTANT: Week 2 Database Schema Update

**NEW USERS OR SCHEMA CONFLICTS**: If you're starting Week 2 fresh or experiencing database schema conflicts, use this clean start approach:

### Fresh Start (Recommended for Week 2)
```bash
# Complete clean slate - removes all data but ensures correct schema
docker compose down -v

# Build fresh containers with latest code
docker compose up --build -d
```

**When to use this:**
- You are running Week 2 for the first time
- You see schema errors or missing-column errors
- You want to start with a clean database
- Your Week 1 data is not important

**Note**: This destroys existing data but ensures you have the correct Week 2 schema with all new columns for PDF processing and arXiv metadata.

---

## Prerequisites Check

**Before starting:**
1. Week 1 infrastructure completed
2. UV environment activated
3. Docker Desktop running

**Why fresh containers?** Week 2 includes new Airflow dependencies and code changes that require rebuilding images rather than using cached layers.

In [None]:
# Check if Fresh Containers are Built and All Services Healthy
import subprocess
import requests
from pathlib import Path

print("WEEK 2 CONTAINER & SERVICE HEALTH CHECK")
print("=" * 50)

# Find project root (works from the repo root or from notebooks/week2)
current_dir = Path.cwd()
if current_dir.name == "week2" and current_dir.parent.name == "notebooks":
    project_root = current_dir.parent.parent
elif (current_dir / "compose.yml").exists():
    project_root = current_dir
else:
    # Raise instead of calling exit(), which would shut down the Jupyter kernel
    raise RuntimeError("✗ Could not find project root (no compose.yml found)")

print(f"Project root: {project_root}")

# Step 1: Check if containers are built and running
print("\n1. Checking container status...")
try:
    # List only running services, one per line; the default table format
    # always prints a header row, so its output is never empty
    result = subprocess.run(
        ["docker", "compose", "ps", "--services", "--filter", "status=running"],
        cwd=str(project_root),
        capture_output=True,
        text=True,
        timeout=10
    )
except (OSError, subprocess.TimeoutExpired) as e:
    raise RuntimeError(f"✗ Error checking containers: {e}. "
                       "Please run the build commands from the markdown cell above") from e

if result.returncode == 0 and result.stdout.strip():
    print("✓ Running services:")
    for line in result.stdout.strip().split('\n'):
        print(f"   {line}")
else:
    raise RuntimeError("✗ No containers running or docker compose failed. "
                       "Please run the build commands from the markdown cell above")

# Step 2: Check all service health
print("\n2. Checking service health...")
services_to_test = {
    "FastAPI": "http://localhost:8000/api/v1/health",
    # Same URL as FastAPI: PostgreSQL is only checked indirectly through the API
    "PostgreSQL (via API)": "http://localhost:8000/api/v1/health",
    "Ollama": "http://localhost:11434/api/version",
    "OpenSearch": "http://localhost:9200/_cluster/health",
    "Airflow": "http://localhost:8080/health"
}

all_healthy = True
for service_name, url in services_to_test.items():
    try:
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            print(f"✓ {service_name}: Healthy")
        else:
            print(f"✗ {service_name}: HTTP {response.status_code}")
            all_healthy = False
    except requests.exceptions.ConnectionError:
        print(f"✗ {service_name}: Not accessible")
        all_healthy = False
    except Exception as e:
        print(f"✗ {service_name}: {type(e).__name__}")
        all_healthy = False

print("\n" + "=" * 50)
if all_healthy:
    print("✓ ALL SERVICES HEALTHY! Ready for Week 2 development.")
else:
    print("✗ Some services need attention.")
    print("If you just rebuilt containers, wait 1-2 minutes and run this cell again.")
    print("Airflow and OpenSearch take longest to start up.")

In [None]:
# Environment Check
import sys
from pathlib import Path

print(f"Python Version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print(f"Environment: {sys.executable}")

# Find project root
current_dir = Path.cwd()
if current_dir.name == "week2" and current_dir.parent.name == "notebooks":
    project_root = current_dir.parent.parent
elif (current_dir / "compose.yml").exists():
    project_root = current_dir
else:
    project_root = None

if project_root and (project_root / "compose.yml").exists():
    print(f"✓ Project root: {project_root}")
    # Add project to Python path
    sys.path.insert(0, str(project_root))
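else:
    # Raise instead of calling exit(), which would shut down the Jupyter kernel
    raise RuntimeError("✗ Missing compose.yml - run the notebook from the project root or notebooks/week2")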

## Service Health Verification

Ensure all services from Week 1 are still running correctly:

### 🔗 Service Access Points
- **FastAPI**: http://localhost:8000/docs (API documentation)
- **PostgreSQL**: via API or `docker exec -it rag-postgres psql -U rag_user -d rag_db`
- **OpenSearch**: http://localhost:9200/_cluster/health
- **Ollama**: http://localhost:11434 (LLM service)
- **Airflow**: http://localhost:8080 (Username: `admin`, Password: `admin`)
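Airflow and OpenSearch can take a minute or two to become healthy after a rebuild. Instead of re-running a health check by hand, you could poll each URL until it answers. This is a minimal sketch that uses the standard-library `urllib` rather than `requests`. The helper names `http_ok` and `wait_until_healthy` are hypothetical.

```python
import time
import urllib.request
from typing import Callable


def http_ok(url: str, timeout_s: float = 5.0) -> bool:
    """Return True if `url` answers with HTTP 200, False on any connection/HTTP error."""
    try:
        with urllib.request.urlopen(url, timeout=timeout_s) as resp:
            return resp.status == 200
    except OSError:  # URLError, HTTPError and socket timeouts all subclass OSError
        return False


def wait_until_healthy(check: Callable[[], bool], timeout_s: float = 120.0,
                       interval_s: float = 5.0) -> bool:
    """Call `check` every `interval_s` seconds until it succeeds or `timeout_s` runs out."""
    deadline = time.monotonic() + timeout_s
    while True:
        if check():
            return True
        if time.monotonic() + interval_s > deadline:
            return False
        time.sleep(interval_s)
```

For example, `wait_until_healthy(lambda: http_ok("http://localhost:8080/health"), timeout_s=120)` would block until the Airflow endpoint listed above responds, or return `False` after two minutes.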

In [None]:
# Test Service Connectivity
import requests

services_to_test = {
    "FastAPI": "http://localhost:8000/api/v1/health",
    # Same URL as FastAPI: PostgreSQL is only checked indirectly through the API
    "PostgreSQL (via API)": "http://localhost:8000/api/v1/health",
    "Ollama": "http://localhost:11434/api/version",
    "OpenSearch": "http://localhost:9200/_cluster/health",
    "Airflow": "http://localhost:8080/health"
}

print("WEEK 2 PREREQUISITE CHECK")
print("=" * 50)

all_healthy = True

for service_name, url in services_to_test.items():
    try:
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            print(f"✓ {service_name}: Healthy")
        else:
            print(f"✗ {service_name}: HTTP {response.status_code}")
            all_healthy = False
    except requests.exceptions.ConnectionError:
        print(f"✗ {service_name}: Not accessible")
        all_healthy = False
    except Exception as e:
        print(f"✗ {service_name}: {type(e).__name__}")
        all_healthy = False

print()
if all_healthy:
    print("All services healthy! Ready for Week 2 development.")
else:
    print("Some services need attention. Check Week 1 notebook.")

## 1. arXiv API Client Testing

Test the arXiv API client with rate limiting and retry logic:
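Underneath, an arXiv client sends a `search_query` to the public API and receives an Atom feed. The sketch below builds such a query and extracts the fields the tests print. It uses the documented `cat:` and `submittedDate:[... TO ...]` query syntax and the `sortBy`/`sortOrder` parameters. `build_query_url` and `parse_feed` are hypothetical helpers. They do not reproduce the internals of `make_arxiv_client`.

```python
import urllib.parse
import xml.etree.ElementTree as ET

ARXIV_API = "http://export.arxiv.org/api/query"
NS = {"atom": "http://www.w3.org/2005/Atom"}


def build_query_url(category="cs.AI", max_results=10, from_date=None, to_date=None,
                    sort_by="submittedDate", sort_order="descending"):
    """Build an arXiv API query URL; dates are YYYYMMDD strings."""
    query = f"cat:{category}"
    if from_date and to_date:
        # submittedDate ranges take YYYYMMDDHHMM bounds; cover whole days
        query += f" AND submittedDate:[{from_date}0000 TO {to_date}2359]"
    params = {"search_query": query, "start": 0, "max_results": max_results,
              "sortBy": sort_by, "sortOrder": sort_order}
    return f"{ARXIV_API}?{urllib.parse.urlencode(params)}"


def parse_feed(xml_text):
    """Extract basic metadata from each <entry> of an arXiv Atom feed."""
    root = ET.fromstring(xml_text)
    papers = []
    for entry in root.findall("atom:entry", NS):
        # <id> looks like http://arxiv.org/abs/2508.01234v1
        abs_url = entry.findtext("atom:id", default="", namespaces=NS)
        pdf_links = [link.get("href") for link in entry.findall("atom:link", NS)
                     if link.get("title") == "pdf"]
        title = entry.findtext("atom:title", default="", namespaces=NS)
        papers.append({
            "arxiv_id": abs_url.rsplit("/abs/", 1)[-1],
            "title": " ".join(title.split()),  # collapse line breaks inside titles
            "authors": [a.findtext("atom:name", default="", namespaces=NS)
                        for a in entry.findall("atom:author", NS)],
            "categories": [c.get("term") for c in entry.findall("atom:category", NS)],
            "published_date": entry.findtext("atom:published", default="", namespaces=NS),
            "pdf_url": pdf_links[0] if pdf_links else None,
        })
    return papers
```

For example, `build_query_url(max_results=5, from_date="20250808", to_date="20250809")` produces the same kind of one-day query as the date-filtering test below. The resulting URL would still need to be fetched through a rate limiter.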

In [None]:
# Test arXiv API Client
# Note: the top-level `await` used in the following cells works in Jupyter/IPython, not in plain scripts

# Import our arXiv client
from src.services.arxiv.factory import make_arxiv_client

print("TESTING ARXIV API CLIENT")
print("=" * 40)

# Create client
arxiv_client = make_arxiv_client()
print(f"✓ Client created: {arxiv_client.base_url}")
print(f"   Rate limit: {arxiv_client.rate_limit_delay}s")
print(f"   Max results: {arxiv_client.max_results}")
print(f"   Category: {arxiv_client.search_category}")
print()

In [None]:
# Test Paper Fetching
async def test_paper_fetching():
    """Test fetching papers from arXiv with rate limiting."""
    
    print("Test 1: Fetch Recent cs.AI Papers")
    try:
        papers = await arxiv_client.fetch_papers(
            max_results=2, 
            sort_by="submittedDate",
            sort_order="descending"
        )
        
        print(f"✓ Fetched {len(papers)} papers")
        
        if papers:
            for i, paper in enumerate(papers[:2], 1):
                print(f"   {i}. [{paper.arxiv_id}] {paper.title[:60]}...")
                print(f"      Authors: {', '.join(paper.authors[:2])}{'...' if len(paper.authors) > 2 else ''}")
                print(f"      Categories: {', '.join(paper.categories)}")
                print(f"      Published: {paper.published_date}")
                print()
        
        return papers
        
    except Exception as e:
        print(f"✗ Error fetching papers: {e}")
        if "503" in str(e):
            print("   arXiv API temporarily unavailable (HTTP 503); this happens occasionally")
            print("   The error was caught and handled; try again in a few minutes")
        return []

# Run the test
papers = await test_paper_fetching()

In [None]:
# Test Date Filtering
async def test_date_filtering():
    """Test date range filtering functionality."""
    
    print("Test 2: Date Range Filtering")
    
    # Use a fixed one-day window (YYYYMMDD) so the results are reproducible
    from_date = "20250808"
    to_date = "20250809"
    try:
        date_papers = await arxiv_client.fetch_papers(
            max_results=5,
            from_date=from_date,
            to_date=to_date
        )
        
        print(f"✓ Date filtering test: {len(date_papers)} papers from {from_date}-{to_date}")
        
        if date_papers:
            for i, paper in enumerate(date_papers, 1):
                print(f"   {i}. [{paper.arxiv_id}] {paper.title[:60]}...")
                print(f"      Authors: {', '.join(paper.authors[:2])}{'...' if len(paper.authors) > 2 else ''}")
                print(f"      Categories: {', '.join(paper.categories)}")
                print(f"      Published: {paper.published_date}")
                print()
        
        return date_papers
        
    except Exception as e:
        print(f"✗ Date filtering error: {e}")
        return []

# Run date filtering test
date_papers = await test_date_filtering()

## 2. PDF Download and Caching

Test PDF download functionality with caching:
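Caching means a PDF is fetched over the network only on a cache miss. Writing through a temporary file keeps a crash from leaving a truncated PDF in the cache. The sketch below assumes a flat cache directory keyed by arXiv ID and the `https://arxiv.org/pdf/<id>` URL pattern. `download_pdf_cached` and the injected `fetch` callable are hypothetical, and the notebook's `arxiv_client.download_pdf` may work differently.

```python
import os
import tempfile
from pathlib import Path
from typing import Callable


def cached_pdf_path(cache_dir: Path, arxiv_id: str) -> Path:
    # Old-style IDs such as "cs/0112017" contain a slash, so flatten them
    return cache_dir / f"{arxiv_id.replace('/', '_')}.pdf"


def download_pdf_cached(arxiv_id: str, cache_dir: Path,
                        fetch: Callable[[str], bytes]) -> Path:
    """Return the cached PDF, calling `fetch` only when it is not cached yet."""
    cache_dir.mkdir(parents=True, exist_ok=True)
    target = cached_pdf_path(cache_dir, arxiv_id)
    if target.exists() and target.stat().st_size > 0:
        return target  # cache hit: no network request
    data = fetch(f"https://arxiv.org/pdf/{arxiv_id}")
    if not data.startswith(b"%PDF"):
        raise ValueError(f"{arxiv_id}: response is not a PDF")
    # Write to a temp file in the same directory, then atomically rename it
    fd, tmp_name = tempfile.mkstemp(dir=str(cache_dir), suffix=".part")
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(data)
        os.replace(tmp_name, target)
    except BaseException:
        Path(tmp_name).unlink(missing_ok=True)  # never leave partial files behind
        raise
    return target
```

Checking the `%PDF` magic bytes is a cheap guard against caching an HTML error page under a `.pdf` name.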

In [None]:
# Test PDF Download
async def test_pdf_download(test_papers):
    """Test PDF downloading with caching."""

    print("Test 3: PDF Download & Caching")
    
    if not test_papers:
        print("No papers available for PDF download test")
        return None
    
    # Test with first paper
    test_paper = test_papers[0]
    print(f"Testing PDF download for: {test_paper.arxiv_id}")
    print(f"Title: {test_paper.title[:60]}...")
    
    try:
        # Download PDF 
        pdf_path = await arxiv_client.download_pdf(test_paper)
        
        if pdf_path and pdf_path.exists():
            size_mb = pdf_path.stat().st_size / (1024 * 1024)
            print(f"✓ PDF downloaded: {pdf_path.name} ({size_mb:.2f} MB)")
            
            return pdf_path
        else:
            print("✗ PDF download failed")
            return None
            
    except Exception as e:
        print(f"✗ PDF download error: {e}")
        return None

# Run PDF download test (fall back to Test 1's papers if the date window returned none)
pdf_path = await test_pdf_download((date_papers or papers)[:1])

## 3. Docling PDF Processing

Test PDF parsing with Docling for structured content extraction:
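Docling's `DocumentConverter().convert(path)` returns a result whose `document.export_to_markdown()` yields Markdown with `#` headings. One simple way to turn that output into title/content sections, like the `pdf_content.sections` printed below, is to split on heading lines. This is a sketch under that assumption. The `Section` dataclass and `split_markdown_sections` are hypothetical, and the project's parser may instead walk Docling's structured document items.

```python
import re
from dataclasses import dataclass
from typing import List, Optional

# A Markdown ATX heading: 1-6 '#' characters, whitespace, then the title text
HEADING = re.compile(r"^(#{1,6})\s+(.*\S)\s*$")


@dataclass
class Section:
    """Hypothetical section record with the same title/content fields used below."""
    title: str
    content: str


def _flush(sections: List[Section], title: Optional[str], body: List[str]) -> None:
    text = "\n".join(body).strip()
    # Keep every headed section; keep text before the first heading only if non-empty
    if title is not None or text:
        sections.append(Section(title or "Preamble", text))


def split_markdown_sections(markdown: str) -> List[Section]:
    """Split Markdown into sections at each heading line.

    Simplification: '#' lines inside fenced code blocks are also treated as headings.
    """
    sections: List[Section] = []
    title: Optional[str] = None
    body: List[str] = []
    for line in markdown.splitlines():
        match = HEADING.match(line)
        if match:
            _flush(sections, title, body)
            title, body = match.group(2), []
        else:
            body.append(line)
    _flush(sections, title, body)
    return sections
```

Splitting the exported Markdown is easy to inspect and debug. Docling's richer document model (tables, figures, reading order) is the better source when more than headings and text are needed.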

In [None]:
# Test PDF Parsing with Docling
from src.services.pdf_parser.factory import make_pdf_parser_service
from src.config import get_settings
from pathlib import Path

print("Test 4: PDF Parsing with Docling")
print("=" * 40)

# Create PDF parser
pdf_parser = make_pdf_parser_service()
settings = get_settings()
print("PDF parser service created")
print(f"Config: max {settings.pdf_parser.max_pages} pages, max {settings.pdf_parser.max_file_size_mb} MB per file")

# Test parsing with actual PDF files
cache_dir = Path("data/arxiv_pdfs")
if cache_dir.exists():
    pdf_files = list(cache_dir.glob("*.pdf"))
    print(f"\nFound {len(pdf_files)} PDF files to test parsing")
    
    if pdf_files:
        # Test parsing the first PDF
        test_pdf = pdf_files[0]
        print(f"Testing PDF parsing with: {test_pdf.name}")
        
        try:
            pdf_content = await pdf_parser.parse_pdf(test_pdf)
            
            if pdf_content:
                print("✓ PDF parsing successful!")
                print(f"  Sections: {len(pdf_content.sections)}")
                print(f"  Raw text length: {len(pdf_content.raw_text)} characters")
                print(f"  Parser used: {pdf_content.parser_used}")
                
                # Show first section as example
                if pdf_content.sections:
                    first_section = pdf_content.sections[0]
                    print(f"  First section: '{first_section.title}' ({len(first_section.content)} chars)")
            else:
                print("✗ PDF parsing returned no content (possibly a Docling compatibility issue)")
                print("This can happen: Docling cannot parse every PDF")
                
        except Exception as e:
            print(f"✗ PDF parsing error: {e}")
            print("The error was caught, so the rest of the notebook can still run")
    else:
        print("No PDF files available for parsing test")
else:
    print("No PDF cache directory found")

## 4. Database Storage Testing

Test storing papers in PostgreSQL database:
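The test below stores papers with `paper_repo.upsert(...)` to avoid duplicates. In SQL terms, an upsert keyed on `arxiv_id` is `INSERT ... ON CONFLICT (arxiv_id) DO UPDATE`. PostgreSQL supports this syntax, and SQLite 3.24+ shares it. The sketch runs against in-memory SQLite so it is self-contained. The `papers` table and its columns are a hypothetical simplification, not the project's schema, and `PaperRepository` may implement the upsert differently (for example through SQLAlchemy).

```python
import json
import sqlite3

# The conflict target (arxiv_id) must have a UNIQUE constraint for ON CONFLICT to apply
UPSERT_SQL = """
INSERT INTO papers (arxiv_id, title, authors, published_date)
VALUES (:arxiv_id, :title, :authors, :published_date)
ON CONFLICT (arxiv_id) DO UPDATE SET
    title = excluded.title,
    authors = excluded.authors,
    published_date = excluded.published_date
"""


def upsert_paper(conn: sqlite3.Connection, paper: dict) -> None:
    # Lists are stored as JSON text here; PostgreSQL could use JSONB or text[] instead
    row = dict(paper, authors=json.dumps(paper["authors"]))
    conn.execute(UPSERT_SQL, row)
    conn.commit()


# Usage: an in-memory database with a hypothetical, minimal papers table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE papers (id INTEGER PRIMARY KEY, arxiv_id TEXT UNIQUE NOT NULL,"
             " title TEXT, authors TEXT, published_date TEXT)")
```

Running the upsert twice for the same `arxiv_id` updates the existing row instead of inserting a second one. This is why re-running the ingestion for an overlapping date window does not create duplicates.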

In [None]:
# Test Database Storage
from src.db.factory import make_database
from src.repositories.paper import PaperRepository
from src.schemas.arxiv.paper import PaperCreate
from dateutil import parser as date_parser

print("Test 5: Database Storage")
print("=" * 40)

# Create database connection
database = make_database()
print("✓ Database connection created")

if papers:
    test_paper = papers[0]
    print(f"Storing paper: {test_paper.arxiv_id}")
    
    try:
        with database.get_session() as session:
            paper_repo = PaperRepository(session)
            
            # Convert to database format
            published_date = date_parser.parse(test_paper.published_date) if isinstance(test_paper.published_date, str) else test_paper.published_date
            
            paper_create = PaperCreate(
                arxiv_id=test_paper.arxiv_id,
                title=test_paper.title,
                authors=test_paper.authors,
                abstract=test_paper.abstract,
                categories=test_paper.categories,
                published_date=published_date,
                pdf_url=test_paper.pdf_url
            )
            
            # Store paper (upsert to avoid duplicates)
            stored_paper = paper_repo.upsert(paper_create)
            
            if stored_paper:
                print(f"✓ Paper stored with database ID: {stored_paper.id}")
                print(f"   arXiv ID: {stored_paper.arxiv_id}")
                print(f"   Title: {stored_paper.title[:50]}...")
                print(f"   Authors: {len(stored_paper.authors)} authors")
                print(f"   Categories: {', '.join(stored_paper.categories)}")
                
                # Test retrieval
                retrieved_paper = paper_repo.get_by_arxiv_id(test_paper.arxiv_id)
                if retrieved_paper:
                    print("✓ Paper retrieval test passed")
                else:
                    print("✗ Paper retrieval failed")
            else:
                print("✗ Paper storage failed")
                
    except Exception as e:
        print(f"✗ Database error: {e}")
else:
    print("No papers available for database storage test")

In [None]:
# Test Complete Pipeline
from src.services.metadata_fetcher import make_metadata_fetcher

print("Test 6: Complete Metadata Fetcher Pipeline")
print("=" * 50)

# Create metadata fetcher
metadata_fetcher = make_metadata_fetcher(arxiv_client, pdf_parser)
print("✓ Metadata fetcher service created")

# Test with small batch
print("Running small batch test (2 papers, no PDF processing for speed)...")

try:
    with database.get_session() as session:
        results = await metadata_fetcher.fetch_and_process_papers(
            max_results=2,  
            process_pdfs=False,  
            store_to_db=True,
            db_session=session
        )
    
    print("\nPIPELINE RESULTS:")
    print(f"   Papers fetched: {results.get('papers_fetched', 0)}")
    print(f"   PDFs downloaded: {results.get('pdfs_downloaded', 0)}")
    print(f"   PDFs parsed: {results.get('pdfs_parsed', 0)}")
    print(f"   Papers stored: {results.get('papers_stored', 0)}")
    print(f"   Processing time: {results.get('processing_time', 0):.1f}s")
    print(f"   Errors: {len(results.get('errors', []))}")
    
    if results.get('errors'):
        print("\nErrors encountered:")
        for error in results.get('errors', [])[:3]:  # Show first 3 errors
            print(f"   - {error}")
    
    if results.get('papers_fetched', 0) > 0:
        print("\n✓ Pipeline test successful!")
    else:
        print("\nNo papers fetched - may be arXiv API unavailability")
        
except Exception as e:
    print(f"✗ Pipeline error: {e}")

In [None]:
# Test Airflow DAGs
import subprocess

print("Test 7: Airflow DAG Status")
print("=" * 40)

print("  Airflow UI Access:")
print("   URL: http://localhost:8080")
print("   Username: admin")
print("   Password: admin")
print()

# Check DAG status using docker exec
try:
    result = subprocess.run(
        ["docker", "exec", "rag-airflow", "airflow", "dags", "list"],
        capture_output=True,
        text=True,
        timeout=10
    )
    
    if result.returncode == 0:
        # Table rows are '|'-separated; the first such row is the header
        rows = [[part.strip() for part in line.split('|')]
                for line in result.stdout.strip().split('\n') if '|' in line]
        header = rows[0] if rows else []
        # Find the paused column by name instead of assuming its position
        # (it is 'paused' or 'is_paused' depending on the Airflow version)
        paused_idx = next((i for i, col in enumerate(header) if col.endswith('paused')), None)
        
        print("Available DAGs:")
        for parts in rows[1:]:
            dag_id = parts[0]
            if 'arxiv' not in dag_id.lower() and 'hello' not in dag_id.lower():
                continue
            if paused_idx is not None and paused_idx < len(parts):
                status = "Paused" if parts[paused_idx] == "True" else "Active"
            else:
                status = "Unknown"
            print(f"   - {dag_id}: {status}")
        
        # Check for import errors (reported as a table on stdout; check stderr as well)
        error_result = subprocess.run(
            ["docker", "exec", "rag-airflow", "airflow", "dags", "list-import-errors"],
            capture_output=True,
            text=True,
            timeout=10
        )
        import_output = error_result.stdout + error_result.stderr
        error_rows = [line for line in error_result.stdout.split('\n')
                      if '|' in line and 'filepath' not in line]
        
        if "docling" in import_output.lower():
            print("\nKnown Issue: Docling not installed in Airflow container")
            print("   - This is expected for Week 2")
            print("   - DAG structure is complete, runtime needs container fix")
            print("   - Solution: Add docling to Airflow container startup")
        elif error_result.returncode == 0 and not error_rows:
            print("\n✓ No DAG import errors found")
        else:
            print("\n✗ DAG import errors (or the command failed):")
            print(import_output.strip()[:1000])
        
    else:
        print(f"✗ Could not list DAGs: {result.stderr}")
        
except Exception as e:
    print(f"✗ Airflow test error: {e}")

print("\n  To view DAGs graphically:")
print("   1. Open http://localhost:8080 in your browser")
print("   2. Login with admin/admin")
print("   3. Click on 'arxiv_paper_ingestion' DAG to see the workflow")

In [None]:
# Test Complete Pipeline with PDF Processing
print("Test 8: Complete Pipeline with PDF Processing")
print("=" * 50)

# Reuse metadata fetcher from Test 6
print("✓ Using metadata fetcher service from previous test")

# Test with small batch including PDF processing
print("Running enhanced test (3 papers with PDF processing)...")

try:
    with database.get_session() as session:
        results = await metadata_fetcher.fetch_and_process_papers(
            max_results=3,  # Small batch
            from_date="20250813",  # Fixed date window (YYYYMMDD)
            to_date="20250814",
            process_pdfs=True,  
            store_to_db=True,
            db_session=session
        )
    
    print("\nENHANCED PIPELINE RESULTS:")
    print(f"   Papers fetched: {results.get('papers_fetched', 0)}")
    print(f"   PDFs downloaded: {results.get('pdfs_downloaded', 0)}")
    print(f"   PDFs parsed: {results.get('pdfs_parsed', 0)}")
    print(f"   Papers stored: {results.get('papers_stored', 0)}")
    print(f"   Processing time: {results.get('processing_time', 0):.1f}s")
    print(f"   Errors: {len(results.get('errors', []))}")
    
    # Show success rates (guarding against missing keys and division by zero)
    fetched = results.get('papers_fetched', 0)
    downloaded = results.get('pdfs_downloaded', 0)
    if fetched > 0:
        download_rate = downloaded / fetched * 100
        parse_rate = results.get('pdfs_parsed', 0) / downloaded * 100 if downloaded > 0 else 0
        print(f"   Download success rate: {download_rate:.1f}%")
        print(f"   Parse success rate: {parse_rate:.1f}%")
    
    if results.get('errors'):
        print("\nErrors encountered (showing graceful error handling):")
        for error in results.get('errors', [])[:3]:  # Show first 3 errors
            print(f"   - {error}")
    
    if results.get('papers_fetched', 0) > 0:
        print("\n✓ Enhanced pipeline test successful!")
        if results.get('errors'):
            print("✓ System continued processing despite PDF failures")
    else:
        print("\n! No papers fetched - may be arXiv API unavailability")
        
except Exception as e:
    print(f"✗ Pipeline error: {e}")