# Workflow 1: ArXiv Ingestion Pipeline - Local Testing

In [2]:
import sys
from pathlib import Path
import asyncio
from datetime import datetime
from typing import List, Dict, Any, Optional
import json
from dotenv import load_dotenv

load_dotenv()

project_root = Path.cwd()
sys.path.insert(0, str(project_root.parent))

from notebook_setup import *


from src.services.arxiv.client import ArxivClient
from src.services.arxiv.metadata_extractor import MetadataExtractor
from src.services.pdf_parser.factory import make_pdf_parser_service
from src.services.pdf_parser.docling_utils import (
    serialize_docling_document, deserialize_docling_document,
    extract_full_text, get_document_metadata
)
from src.database import get_sync_session
from src.models.paper import Paper
from src.config import get_settings


# Application settings (storage paths, arXiv categories, size limits) shared by every step below.
settings = get_settings()
# Hand-off store between cells: each step writes its output under its own key
# ('fetch_papers', 'parse_pdfs', 'extract_metadata', 'persist_db').
workflow_data: Dict[str, Any] = dict()
print("✓ Setup complete")

✅ Environment configured for host machine:
   - Database: postgresql://researchmind:password@localhost:5433/researchmind
   - Redis: redis://localhost:6379/0
   - Qdrant: localhost:6333
   - Neo4j: bolt://localhost:7687


  from .autonotebook import tqdm as notebook_tqdm


✓ Setup complete


## Step 1: Fetch ArXiv Papers

In [19]:
CATEGORIES = settings.arxiv_categories[:2]
MAX_RESULTS = 3

client = ArxivClient()
papers = []

for category in CATEGORIES:
    try:
        category_papers = await client.search_papers(
            query=f"cat:{category}",
            max_results=MAX_RESULTS,
            start=3100,
            sort_by="submittedDate",
            sort_order="ascending"

        )
        papers.extend(category_papers)
        print(f"✓ Fetched {len(category_papers)} papers from {category}")
    except Exception as e:
        print(f"✗ Error fetching {category}: {e}")

workflow_data['fetch_papers'] = papers
print(f"\nTotal: {len(papers)} papers")
if papers:
    print(f"Sample: {papers[0].get('title', 'N/A')[:80]}...")

papers

[ 2025-11-02 23:40:26,535 ] [researchmind] | Module: client |Function: _make_request | Line: 50 - INFO - Fetching arXiv data: http://export.arxiv.org/api/query?search_query=cat%3Acs.AI&start=3100&max_results=3&sortBy=submittedDate&sortOrder=ascending
[ 2025-11-02 23:40:26,896 ] [researchmind] | Module: client |Function: search_papers | Line: 178 - INFO - Found 3 papers for query: cat:cs.AI
✓ Fetched 3 papers from cs.AI

Total: 3 papers
Sample: Fuzzy Inference Systems Optimization...


[{'arxiv_id': '1110.3385v1',
  'title': 'Fuzzy Inference Systems Optimization',
  'abstract': 'This paper compares various optimization methods for fuzzy inference system optimization. The optimization methods compared are genetic algorithm, particle swarm optimization and simulated annealing. When these techniques were implemented it was observed that the performance of each technique within the fuzzy inference system classification was context dependent.',
  'authors': ['Pretesh Patel', 'Tshilidzi Marwala'],
  'categories': ['cs.AI'],
  'primary_category': 'cs.AI',
  'published': '2011-10-15T05:39:34Z',
  'updated': '2011-10-15T05:39:34Z',
  'pdf_url': 'http://arxiv.org/pdf/1110.3385v1',
  'arxiv_url': 'http://arxiv.org/abs/1110.3385v1',
  'doi': '',
  'journal_ref': ''},
 {'arxiv_id': '1110.3672v1',
  'title': 'Reasoning about Actions with Temporal Answer Sets',
  'abstract': 'In this paper we combine Answer Set Programming (ASP) with Dynamic Linear Time Temporal Logic (DLTL) to def

## Step 2: Parse PDFs

In [20]:
papers = workflow_data.get('fetch_papers', [])
download_dir = Path(settings.papers_storage_path)
download_dir.mkdir(exist_ok=True, parents=True)

arxiv_client = ArxivClient()
pdf_parser = make_pdf_parser_service()
parsed_papers = []

for i, paper in enumerate(papers, 1):
    try:
        arxiv_id = paper.get('arxiv_id')
        pdf_url = paper.get('pdf_url')
        
        if not arxiv_id or not pdf_url:
            parsed_papers.append(paper)
            continue
        
        print(f"[{i}/{len(papers)}] {arxiv_id}... ", end="")
        
        pdf_path = await arxiv_client.download_pdf(
            pdf_url=pdf_url,
            download_path=download_dir / f"{arxiv_id.replace('/', '_')}.pdf",
            max_file_size_mb=settings.pdf_parser_max_file_size_mb
        )
        
        if not pdf_path or not pdf_path.exists():
            print("download failed")
            parsed_papers.append(paper)
            continue
        
        docling_doc = await pdf_parser.parse_pdf(pdf_path)
        
        if docling_doc:
            paper_with_content = {**paper}
            paper_with_content['docling_document'] = serialize_docling_document(docling_doc)
            paper_with_content['docling_document_raw'] = docling_doc
            paper_with_content['_temp_full_text'] = extract_full_text(docling_doc)
            paper_with_content['is_processed'] = True
            
            doc_meta = get_document_metadata(docling_doc)
            print(f"✓ ({doc_meta.get('text_count', 0)} elements)")
            parsed_papers.append(paper_with_content)
        else:
            print("parse failed")
            parsed_papers.append(paper)
        
        pdf_path.unlink(missing_ok=True)
        
    except Exception as e:
        print(f"✗ {e}")
        parsed_papers.append(paper)

workflow_data['parse_pdfs'] = parsed_papers
print(f"\n✓ Parsed {sum(1 for p in parsed_papers if p.get('docling_document'))}/{len(parsed_papers)} papers")

[1/3] 1110.3385v1... [ 2025-11-02 23:40:33,219 ] [researchmind] | Module: client |Function: download_pdf | Line: 272 - INFO - Downloading PDF from: http://arxiv.org/pdf/1110.3385v1
[ 2025-11-02 23:40:34,016 ] [researchmind] | Module: client |Function: download_pdf | Line: 296 - INFO - Downloaded PDF (0.3MB) to: data/papers/1110.3385v1.pdf
[ 2025-11-02 23:40:34,022 ] [docling.datamodel.document] | Module: document |Function: _guess_format | Line: 328 - INFO - detected formats: [<InputFormat.PDF: 'pdf'>]
[ 2025-11-02 23:40:34,065 ] [docling.document_converter] | Module: document_converter |Function: _convert | Line: 318 - INFO - Going to convert document batch...
[ 2025-11-02 23:40:34,066 ] [docling.document_converter] | Module: document_converter |Function: _get_pipeline | Line: 363 - INFO - Initializing pipeline for StandardPdfPipeline with options hash 4b58af54f11ef1b41b300238c4ddafc5
[ 2025-11-02 23:40:34,186 ] [docling.models.factories.base_factory] | Module: base_factory |Function:

## Step 3: Extract Metadata

In [21]:
papers = workflow_data.get('parse_pdfs', [])
extractor = MetadataExtractor()

for i, paper in enumerate(papers, 1):
    try:
        arxiv_id = paper.get('arxiv_id', 'unknown')
        print(f"[{i}/{len(papers)}] {arxiv_id}... ", end="")
        
        if paper.get('docling_document'):
            doc = deserialize_docling_document(paper['docling_document'])
            
            if not paper.get('_temp_full_text'):
                paper['_temp_full_text'] = extract_full_text(doc)
            
            paper['content'] = paper['_temp_full_text']
        
        metadata = await extractor.extract_metadata(paper)
        paper.update(metadata)
        
        paper.pop('_temp_full_text', None)
        paper.pop('content', None)
        
        print(f"✓ (words: {paper.get('word_count', 'N/A')})")
        
    except Exception as e:
        print(f"✗ {e}")

workflow_data['extract_metadata'] = papers
print(f"\n✓ Metadata extraction complete")

[1/3] 1110.3385v1... [ 2025-11-02 23:40:56,390 ] [researchmind] | Module: metadata_extractor |Function: extract_metadata | Line: 236 - INFO - Extracting metadata for paper: 1110.3385v1


[ 2025-11-02 23:41:01,992 ] [httpx] | Module: _client |Function: _send_single_request | Line: 1025 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
[ 2025-11-02 23:41:01,998 ] [researchmind] | Module: metadata_extractor |Function: extract_metadata | Line: 263 - INFO - Extracted metadata: 0 metrics, 2 authors
✓ (words: 8475)
[2/3] 1110.3672v1... [ 2025-11-02 23:41:02,015 ] [researchmind] | Module: metadata_extractor |Function: extract_metadata | Line: 236 - INFO - Extracting metadata for paper: 1110.3672v1
[ 2025-11-02 23:41:14,634 ] [httpx] | Module: _client |Function: _send_single_request | Line: 1025 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
[ 2025-11-02 23:41:14,637 ] [researchmind] | Module: metadata_extractor |Function: extract_metadata | Line: 263 - INFO - Extracted metadata: 0 metrics, 3 authors
✓ (words: 20386)
[3/3] 1110.3888v2... [ 2025-11-02 23:41:14,648 ] [researchmind] | Module: metadata

## Step 4: Persist to Database

In [22]:


def normalize_paper(data: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize a raw paper record into column values for the ``Paper`` model.

    Text fields are stripped of ASCII control characters (except newline, tab
    and carriage return). ISO-8601 timestamps are parsed into datetimes, and
    missing required fields get placeholder defaults.

    Args:
        data: Paper record produced by the earlier pipeline steps.

    Returns:
        A new dict containing only the keys whose value is not ``None``.
    """
    from datetime import timezone

    def to_dt(value):
        """Parse an ISO-8601 string; pass non-strings through; None on failure."""
        if not value:
            return None
        if not isinstance(value, str):
            return value
        # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11,
        # so rewrite just that suffix as an explicit UTC offset.
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            return None

    def sanitize(text):
        """Strip control characters and surrounding whitespace; empty result -> None."""
        if not text or not isinstance(text, str):
            return text
        return ''.join(c for c in text if ord(c) >= 32 or c in '\n\t\r').strip() or None

    normalized = {
        "arxiv_id": data.get("arxiv_id"),
        "arxiv_url": sanitize(data.get("arxiv_url")) or "",
        "pdf_url": sanitize(data.get("pdf_url")) or "",
        "title": sanitize(data.get("title")) or "Untitled",
        "abstract": sanitize(data.get("abstract")) or "",
        "authors": data.get("authors") or [],
        # The fallback is timezone-aware so it matches parsed "...Z" timestamps
        # instead of mixing naive and aware datetimes in the same column.
        "published_date": to_dt(data.get("published")) or datetime.now(timezone.utc),
        "updated_date": to_dt(data.get("updated")),
        "primary_category": sanitize(data.get("primary_category")) or "unknown",
        "categories": data.get("categories") or [],
        "is_processed": data.get("is_processed", False),
        "is_embedded": data.get("is_embedded", False),
        "docling_document": data.get("docling_document"),
        "word_count": data.get("word_count"),
        "metrics": data.get("metrics"),
    }
    return {k: v for k, v in normalized.items() if v is not None}

papers = workflow_data.get('extract_metadata', [])
persisted, updated, skipped = 0, 0, 0

with get_sync_session() as session:
    for i, raw in enumerate(papers, 1):
        try:
            data = normalize_paper(raw)
            arxiv_id = data.get("arxiv_id")

            if not arxiv_id:
                skipped += 1
                continue

            existing = session.query(Paper).filter_by(arxiv_id=arxiv_id).first()
            is_update = existing is not None

            if is_update:
                print(f"[{i}] → {arxiv_id} (updating)")
                for key, value in data.items():
                    if hasattr(existing, key):
                        setattr(existing, key, value)
            else:
                print(f"[{i}] + {arxiv_id} (new)")
                filtered = {k: v for k, v in data.items() if hasattr(Paper, k) and v is not None}
                session.add(Paper(**filtered))

            # Commit each paper on its own. A failure's rollback then discards
            # only that paper, and updates to existing rows are saved even when
            # no new papers were added.
            session.commit()
        except Exception as e:
            print(f"[{i}] ✗ {raw.get('arxiv_id', 'unknown')}: {e}")
            session.rollback()
            skipped += 1
            continue

        # Count only once the commit has succeeded.
        if is_update:
            updated += 1
        else:
            persisted += 1

workflow_data['persist_db'] = {
    "persisted": persisted,
    # "skipped" includes updated rows as well; the summary cell reads it as
    # the "updated/skipped" total.
    "skipped": updated + skipped,
    "updated": updated,
}
print(f"\n✓ Complete: {persisted} new, {updated} updated, {skipped} skipped")

[1] + 1110.3385v1 (new)
[2] + 1110.3672v1 (new)
[3] + 1110.3888v2 (new)

✓ Complete: 3 new, 0 updated/skipped


## Test: Adding Citations and References

In [3]:
import asyncio
from src.services.arxiv.citation_extractor import CitationExtractor

test_ids = ["1110.3385v1", "1110.3672v1", "1110.3888v2"]

async def run_extraction(ids):
    extractor = CitationExtractor()
    try:
        results = []
        for aid in ids:
            res = await extractor.get_citations_and_references(aid)
            results.append(res)
            print(f"- {aid} | source={res.get('source')} | "
                  f"citations={res.get('citation_count')} | "
                  f"references={res.get('reference_count')} | "
                  f"s2_id={res.get('s2_paper_id')}")
        return results
    finally:
        await extractor.close()

results = await run_extraction(test_ids)
print("\nSample result keys:", results[0].keys() if results else [])

[ 2025-11-03 22:26:31,885 ] [httpx] | Module: _client |Function: _send_single_request | Line: 1740 - INFO - HTTP Request: GET https://api.semanticscholar.org/graph/v1/paper/arXiv:1110.3385?fields=paperId%2CcitationCount%2CreferenceCount%2CinfluentialCitationCount%2Ctitle%2CexternalIds "HTTP/1.1 200 OK"
[ 2025-11-03 22:26:32,211 ] [httpx] | Module: _client |Function: _send_single_request | Line: 1740 - INFO - HTTP Request: GET https://api.semanticscholar.org/graph/v1/paper/9023e74478f7299fcc4780aa21595dc5d9b76919/references?fields=citedPaper.title%2CcitedPaper.authors%2CcitedPaper.year%2CcitedPaper.externalIds%2CcitedPaper.paperId&limit=500&offset=0 "HTTP/1.1 200 OK"
[ 2025-11-03 22:26:32,261 ] [httpx] | Module: _client |Function: _send_single_request | Line: 1740 - INFO - HTTP Request: GET https://api.semanticscholar.org/graph/v1/paper/9023e74478f7299fcc4780aa21595dc5d9b76919/citations?fields=citingPaper.title%2CcitingPaper.authors%2CcitingPaper.year%2CcitingPaper.externalIds%2CcitingP

## Summary

In [7]:
# Counts are read from whatever the earlier cells stored; missing steps show as 0.
persist_stats = workflow_data.get('persist_db', {})
parsed_count = sum(1 for p in workflow_data.get('parse_pdfs', []) if p.get('docling_document'))

print("=" * 60)
print("WORKFLOW 1 SUMMARY")
print("=" * 60)
print(f"Fetched:         {len(workflow_data.get('fetch_papers', []))} papers")
print(f"Parsed:          {parsed_count} papers")
print(f"Persisted:       {persist_stats.get('persisted', 0)} new")
# The persist step's "skipped" count covers updated rows, records without an
# arXiv ID, and failed records, so it is labelled as a combined total.
print(f"Updated/skipped: {persist_stats.get('skipped', 0)} existing or skipped")
print("=" * 60)

WORKFLOW 1 SUMMARY
Fetched:  10 papers
Parsed:   10 papers
Persisted: 0 new
Updated:   10 existing
