In [53]:
# Setup and Installation
!pip install arxiv requests psutil

shell-init: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory
shell-init: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory
The folder you are executing pip from can no longer be found.


In [54]:
from pathlib import Path
import os

BASE_DIR = Path("/content/arxiv_scraper")
BASE_DIR.mkdir(exist_ok=True)
os.chdir(BASE_DIR)

# Create subdirectories
for dir_name in ["lib", "data", "logs", "data/papers", "data/temp", "data/statistics"]:
    (BASE_DIR / dir_name).mkdir(parents=True, exist_ok=True)

print("Directory structure created")

Directory structure created


In [55]:
%%writefile config.py
from pathlib import Path

# Project paths
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / "data"
PAPERS_DIR = DATA_DIR / "papers"
TEMP_DIR = DATA_DIR / "temp"
LOGS_DIR = BASE_DIR / "logs"
STATS_DIR = DATA_DIR / "statistics"

# Create directories if they don't exist
for directory in [PAPERS_DIR, TEMP_DIR, LOGS_DIR, STATS_DIR]:
    directory.mkdir(parents=True, exist_ok=True)

# arXiv settings
ARXIV_ID_RANGES = [
    ("2312", 15844, 16343), # Take only 500 papers for report
    # ("2312", 15844, 99999),
    # ("2401", 0, 3095),
]

# API settings
SEMANTIC_SCHOLAR_API_BASE = "https://api.semanticscholar.org/graph/v1"
REQUEST_DELAY = 3  # Delay between reference API calls
MAX_RETRIES = 7
RETRY_DELAY = 3.0


# arXiv API settings
ARXIV_MAX_RESULTS = 100
ARXIV_DELAY = 3.0
ARXIV_BATCH_SIZE = 100

# Download settings
PAPER_DOWNLOAD_BATCH_SIZE = 5  # Concurrent papers to download (main parallel)
REFERENCE_BATCH_SIZE = 2  # Concurrent reference API calls

# Figure extensions to remove
FIGURE_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.pdf', '.eps', '.ps', '.gif', '.svg', '.bmp']

# File size limits
MAX_FILE_SIZE = 100 * 1024 * 1024

# Stats settings
STATS_SAVE_INTERVAL = 10

# Cleanup settings
DELETE_RAW_AFTER_EXTRACT = True
DELETE_TEMP_AFTER_COPY = True




# Semantic Scholar fields
SEMANTIC_SCHOLAR_FIELDS = "references,references.title,references.authors,references.year,references.paperId,references.externalIds"
SEMANTIC_KEY_API = "FkKyqq5p9R96KizMlPWBwa0YtPjCvyq24ddno82O"

Writing config.py


In [56]:
%%writefile lib/statistics_tracker.py
import psutil
import time
import json
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
import logging
import config

logger = logging.getLogger(__name__)


class StatisticsTracker:
    """Track and report statistics for the scraping process."""

    def __init__(self):
        self.stats = {
            'start_time': None,
            'end_time': None,
            'total_duration': 0,

            # Paper statistics
            'papers': {
                'total_identified': 0,
                'total_attempted': 0,
                'successfully_downloaded': 0,
                'failed_downloads': 0,
                'download_success_rate': 0,
            },

            # Version statistics
            'versions': {
                'total_versions': 0,
                'successfully_processed': 0,
                'failed_versions': 0,
            },

            # File size statistics
            'file_sizes': {
                'raw_sizes': [],  # Size before removing figures
                'extracted_sizes': [],  # Size after removing figures
                'total_raw_size': 0,
                'total_extracted_size': 0,
                'average_raw_size': 0,
                'average_extracted_size': 0,
                'space_saved_by_figure_removal': 0,
                'figures_removed': 0,
            },

            # Reference statistics
            'references': {
                'papers_with_references': 0,
                'papers_without_references': 0,
                'total_references': 0,
                'average_references_per_paper': 0,
                'reference_success_rate': 0,
            },

            # Performance metrics
            'performance': {
                'phase_times': {},  # Time for each phase
                'paper_processing_times': [],
                'average_time_per_paper': 0,
                'fastest_paper_time': float('inf'),
                'slowest_paper_time': 0,
            },

            # Memory statistics
            'memory': {
                'initial_ram_mb': 0,
                'peak_ram_mb': 0,
                'average_ram_mb': 0,
                'ram_samples': [],
                'initial_disk_gb': 0,
                'final_disk_gb': 0,
                'disk_space_used_gb': 0,
            },

            # Errors
            'errors': {
                'total_errors': 0,
                'error_types': {},
            }
        }

        self.process = psutil.Process()
        self.current_phase = None
        self.phase_start_time = None
        self.papers_processed_since_save = 0
        self.save_interval = config.STATS_SAVE_INTERVAL

    def start(self):
        """Start tracking."""
        self.stats['start_time'] = datetime.now()
        self.stats['memory']['initial_ram_mb'] = self.process.memory_info().rss / (1024 * 1024)
        self.stats['memory']['initial_disk_gb'] = self._get_disk_usage()
        logger.info("Statistics tracking started")

        # Try to load existing stats if resuming
        self.load_checkpoint()

    def end(self):
        """End tracking and calculate final statistics."""
        self.stats['end_time'] = datetime.now()
        self.stats['total_duration'] = (self.stats['end_time'] - self.stats['start_time']).total_seconds()

        # Calculate final memory stats
        self.stats['memory']['final_disk_gb'] = self._get_disk_usage()
        self.stats['memory']['disk_space_used_gb'] = (
            self.stats['memory']['final_disk_gb'] - self.stats['memory']['initial_disk_gb']
        )

        if self.stats['memory']['ram_samples']:
            self.stats['memory']['average_ram_mb'] = sum(self.stats['memory']['ram_samples']) / len(self.stats['memory']['ram_samples'])

        # Calculate success rates
        if self.stats['papers']['total_attempted'] > 0:
            self.stats['papers']['download_success_rate'] = (
                self.stats['papers']['successfully_downloaded'] / self.stats['papers']['total_attempted']
            ) * 100

        # Calculate reference success rate
        total_ref_attempts = (
            self.stats['references']['papers_with_references'] +
            self.stats['references']['papers_without_references']
        )
        if total_ref_attempts > 0:
            self.stats['references']['reference_success_rate'] = (
                self.stats['references']['papers_with_references'] / total_ref_attempts
            ) * 100

        # Calculate average references per paper
        if self.stats['references']['papers_with_references'] > 0:
            self.stats['references']['average_references_per_paper'] = (
                self.stats['references']['total_references'] /
                self.stats['references']['papers_with_references']
            )

        # Calculate file size averages
        if self.stats['file_sizes']['raw_sizes']:
            self.stats['file_sizes']['average_raw_size'] = (
                sum(self.stats['file_sizes']['raw_sizes']) / len(self.stats['file_sizes']['raw_sizes'])
            )
            self.stats['file_sizes']['total_raw_size'] = sum(self.stats['file_sizes']['raw_sizes'])

        if self.stats['file_sizes']['extracted_sizes']:
            self.stats['file_sizes']['average_extracted_size'] = (
                sum(self.stats['file_sizes']['extracted_sizes']) / len(self.stats['file_sizes']['extracted_sizes'])
            )
            self.stats['file_sizes']['total_extracted_size'] = sum(self.stats['file_sizes']['extracted_sizes'])

        self.stats['file_sizes']['space_saved_by_figure_removal'] = (
            self.stats['file_sizes']['total_raw_size'] - self.stats['file_sizes']['total_extracted_size']
        )

        # Calculate average time per paper
        if self.stats['performance']['paper_processing_times']:
            self.stats['performance']['average_time_per_paper'] = (
                sum(self.stats['performance']['paper_processing_times']) /
                len(self.stats['performance']['paper_processing_times'])
            )
            self.stats['performance']['fastest_paper_time'] = min(self.stats['performance']['paper_processing_times'])
            self.stats['performance']['slowest_paper_time'] = max(self.stats['performance']['paper_processing_times'])

        logger.info("Statistics tracking completed")

    def start_phase(self, phase_name: str):
        """Start tracking a processing phase."""
        self.current_phase = phase_name
        self.phase_start_time = time.time()
        logger.info(f"Started phase: {phase_name}")

    def end_phase(self):
        """End tracking current phase."""
        if self.current_phase and self.phase_start_time:
            duration = time.time() - self.phase_start_time
            self.stats['performance']['phase_times'][self.current_phase] = duration
            logger.info(f"Completed phase '{self.current_phase}' in {duration:.2f}s")
            self.current_phase = None
            self.phase_start_time = None

    def record_paper_identified(self):
        """Record a paper was identified."""
        self.stats['papers']['total_identified'] += 1

    def record_paper_attempt(self):
        """Record a paper download attempt."""
        self.stats['papers']['total_attempted'] += 1

    def record_paper_success(self, processing_time: float):
        """Record successful paper processing."""
        self.stats['papers']['successfully_downloaded'] += 1
        self.stats['performance']['paper_processing_times'].append(processing_time)

    def record_paper_failure(self):
        """Record failed paper processing."""
        self.stats['papers']['failed_downloads'] += 1

    def record_version(self, success: bool = True):
        """Record version processing."""
        self.stats['versions']['total_versions'] += 1
        if success:
            self.stats['versions']['successfully_processed'] += 1
        else:
            self.stats['versions']['failed_versions'] += 1

    def record_file_sizes(self, raw_size: int, extracted_size: int, figures_removed: int = 0):
        """Record file sizes before and after figure removal."""
        self.stats['file_sizes']['raw_sizes'].append(raw_size)
        self.stats['file_sizes']['extracted_sizes'].append(extracted_size)
        self.stats['file_sizes']['figures_removed'] += figures_removed

    def record_references(self, reference_count: int):
        """Record reference extraction."""
        if reference_count > 0:
            self.stats['references']['papers_with_references'] += 1
            self.stats['references']['total_references'] += reference_count
        else:
            self.stats['references']['papers_without_references'] += 1

    def record_error(self, error_type: str):
        """Record an error."""
        self.stats['errors']['total_errors'] += 1
        if error_type not in self.stats['errors']['error_types']:
            self.stats['errors']['error_types'][error_type] = 0
        self.stats['errors']['error_types'][error_type] += 1

    def sample_memory(self):
        """Sample current memory usage."""
        current_ram_mb = self.process.memory_info().rss / (1024 * 1024)
        self.stats['memory']['ram_samples'].append(current_ram_mb)

        if current_ram_mb > self.stats['memory']['peak_ram_mb']:
            self.stats['memory']['peak_ram_mb'] = current_ram_mb

    def _get_disk_usage(self) -> float:
        """Get current disk usage of data directory in GB."""
        total_size = 0
        try:
            for path in config.DATA_DIR.rglob('*'):
                if path.is_file():
                    total_size += path.stat().st_size
        except Exception as e:
            logger.warning(f"Error calculating disk usage: {e}")
        return total_size / (1024 ** 3)  # Convert to GB

    def get_summary(self) -> Dict[str, Any]:
        """Get statistics summary."""
        return self.stats

    def save_to_file(self, output_path: Optional[Path] = None):
        """Save statistics to JSON file."""
        if output_path is None:
            output_path = config.DATA_DIR / 'statistics.json'

        # Convert datetime objects to strings
        stats_copy = self._serialize_stats(self.stats)

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(stats_copy, f, indent=2)

        logger.info(f"Statistics saved to {output_path}")

    def save_checkpoint(self):
        """Save incremental checkpoint."""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        checkpoint_file = config.STATS_DIR / f'checkpoint_{timestamp}.json'

        # Also save to latest checkpoint
        latest_file = config.STATS_DIR / 'checkpoint_latest.json'

        stats_copy = self._serialize_stats(self.stats)

        for file_path in [checkpoint_file, latest_file]:
            try:
                with open(file_path, 'w', encoding='utf-8') as f:
                    json.dump(stats_copy, f, indent=2)
            except Exception as e:
                logger.error(f"Error saving checkpoint to {file_path}: {e}")

        logger.info(f"Checkpoint saved to {checkpoint_file}")
        self.papers_processed_since_save = 0

    def load_checkpoint(self):
        """Load latest checkpoint if exists."""
        latest_file = config.STATS_DIR / 'checkpoint_latest.json'

        if latest_file.exists():
            try:
                with open(latest_file, 'r', encoding='utf-8') as f:
                    loaded_stats = json.load(f)

                # Merge loaded stats with current stats
                # Convert string dates back to datetime
                if 'start_time' in loaded_stats and loaded_stats['start_time']:
                    self.stats['start_time'] = datetime.fromisoformat(loaded_stats['start_time'])

                # Merge other stats
                for key in loaded_stats:
                    if key not in ['start_time', 'end_time']:
                        if isinstance(loaded_stats[key], dict):
                            self.stats[key].update(loaded_stats[key])
                        else:
                            self.stats[key] = loaded_stats[key]

                logger.info(f"Resumed from checkpoint: {latest_file}")
            except Exception as e:
                logger.warning(f"Could not load checkpoint: {e}")

    def maybe_save_checkpoint(self):
        """Save checkpoint if interval reached."""
        self.papers_processed_since_save += 1

        if self.papers_processed_since_save >= self.save_interval:
            self.save_checkpoint()

    def _serialize_stats(self, obj):
        """Serialize datetime objects for JSON."""
        if isinstance(obj, datetime):
            return obj.isoformat()
        elif isinstance(obj, dict):
            return {k: self._serialize_stats(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._serialize_stats(item) for item in obj]
        return obj

    def print_summary(self):
        """Print formatted statistics summary."""
        print("\n" + "=" * 80)
        print("SCRAPING STATISTICS REPORT")
        print("=" * 80)

        # Time information
        print(f"\nTIME INFORMATION:")
        print(f"  Start Time: {self.stats['start_time']}")
        print(f"  End Time: {self.stats['end_time']}")
        print(f"  Total Duration: {timedelta(seconds=int(self.stats['total_duration']))}")

        # Phase times
        print(f"\nPHASE DURATIONS:")
        for phase, duration in self.stats['performance']['phase_times'].items():
            print(f"  {phase}: {timedelta(seconds=int(duration))}")

        # Paper statistics
        print(f"\nPAPER STATISTICS:")
        print(f"  Total Identified: {self.stats['papers']['total_identified']}")
        print(f"  Total Attempted: {self.stats['papers']['total_attempted']}")
        print(f"  Successfully Downloaded: {self.stats['papers']['successfully_downloaded']}")
        print(f"  Failed Downloads: {self.stats['papers']['failed_downloads']}")
        print(f"  Success Rate: {self.stats['papers']['download_success_rate']:.2f}%")

        # Version statistics
        print(f"\nVERSION STATISTICS:")
        print(f"  Total Versions: {self.stats['versions']['total_versions']}")
        print(f"  Successfully Processed: {self.stats['versions']['successfully_processed']}")
        print(f"  Failed Versions: {self.stats['versions']['failed_versions']}")

        # Performance
        print(f"\nPERFORMANCE METRICS:")
        if self.stats['performance']['paper_processing_times']:
            print(f"  Average Time per Paper: {self.stats['performance']['average_time_per_paper']:.2f}s")
            print(f"  Fastest Paper: {self.stats['performance']['fastest_paper_time']:.2f}s")
            print(f"  Slowest Paper: {self.stats['performance']['slowest_paper_time']:.2f}s")

        # File sizes
        print(f"\nFILE SIZE STATISTICS:")
        print(f"  Average Size Before Figure Removal: {self._format_bytes(self.stats['file_sizes']['average_raw_size'])}")
        print(f"  Average Size After Figure Removal: {self._format_bytes(self.stats['file_sizes']['average_extracted_size'])}")
        print(f"  Total Raw Size: {self._format_bytes(self.stats['file_sizes']['total_raw_size'])}")
        print(f"  Total Extracted Size: {self._format_bytes(self.stats['file_sizes']['total_extracted_size'])}")
        print(f"  Space Saved: {self._format_bytes(self.stats['file_sizes']['space_saved_by_figure_removal'])}")
        print(f"  Figures Removed: {self.stats['file_sizes']['figures_removed']}")

        # References
        print(f"\nREFERENCE STATISTICS:")
        print(f"  Papers with References: {self.stats['references']['papers_with_references']}")
        print(f"  Papers without References: {self.stats['references']['papers_without_references']}")
        print(f"  Total References: {self.stats['references']['total_references']}")
        print(f"  Average References per Paper: {self.stats['references']['average_references_per_paper']:.2f}")
        print(f"  Reference Success Rate: {self.stats['references']['reference_success_rate']:.2f}%")

        # Memory
        print(f"\nMEMORY USAGE:")
        print(f"  Initial RAM: {self.stats['memory']['initial_ram_mb']:.2f} MB")
        print(f"  Peak RAM: {self.stats['memory']['peak_ram_mb']:.2f} MB")
        print(f"  Average RAM: {self.stats['memory']['average_ram_mb']:.2f} MB")
        print(f"  Initial Disk Usage: {self.stats['memory']['initial_disk_gb']:.2f} GB")
        print(f"  Final Disk Usage: {self.stats['memory']['final_disk_gb']:.2f} GB")
        print(f"  Disk Space Used: {self.stats['memory']['disk_space_used_gb']:.2f} GB")

        # Errors
        if self.stats['errors']['total_errors'] > 0:
            print(f"\nERROR STATISTICS:")
            print(f"  Total Errors: {self.stats['errors']['total_errors']}")
            print(f"  Error Types:")
            for error_type, count in self.stats['errors']['error_types'].items():
                print(f"    {error_type}: {count}")

        print("\n" + "=" * 80)

    def _format_bytes(self, bytes_value: float) -> str:
        """Format bytes to human readable format."""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} TB"

Writing lib/statistics_tracker.py


In [57]:
%%writefile lib/paper_organizer.py
import logging
import json
import shutil
from pathlib import Path
from typing import Dict
from datetime import datetime
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
import config

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(config.LOGS_DIR / 'organizer.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class PaperOrganizer:
    """Organize papers in the new required structure."""

    def __init__(self):
        self.papers_dir = config.PAPERS_DIR
        self.temp_dir = config.TEMP_DIR

    def format_arxiv_id(self, arxiv_id: str) -> str:
        """Convert arxiv ID '2312.12345' -> '2312-12345'"""
        return arxiv_id.replace('.', '-')

    def get_paper_dir(self, arxiv_id: str) -> Path:
        """Get the directory for a paper."""
        formatted_id = self.format_arxiv_id(arxiv_id)
        return self.papers_dir / formatted_id

    def create_paper_structure(self, arxiv_id: str) -> Dict[str, Path]:
        """Create the directory structure for a paper."""
        paper_dir = self.get_paper_dir(arxiv_id)
        tex_dir = paper_dir / "tex"

        paper_dir.mkdir(parents=True, exist_ok=True)
        tex_dir.mkdir(exist_ok=True)

        return {
            'paper_dir': paper_dir,
            'tex_dir': tex_dir,
            'metadata_file': paper_dir / 'metadata.json',
            'references_file': paper_dir / 'references.json',
        }

    def save_metadata(self, arxiv_id: str, paper_data: Dict):
        """Save paper metadata in ISO format."""
        paths = self.create_paper_structure(arxiv_id)

        # format dates to ISO format
        published = paper_data.get('published', '')
        updated = paper_data.get('updated', '')

        # parse and convert to ISO format
        try:
            if isinstance(published, str):
                pub_date = datetime.fromisoformat(published.replace('Z', '+00:00'))
                published_iso = pub_date.date().isoformat()
            else:
                published_iso = str(published)
        except:
            published_iso = str(published)

        # get all revision dates from version_dates if available
        revised_dates = []

        metadata = {
            'title': paper_data.get('title', 'Unknown'),
            'authors': paper_data.get('authors', []),
            'submission_date': published_iso,
            'revised_dates': revised_dates,
            'categories': paper_data.get('categories', []),
            'abstract': paper_data.get('abstract', ''),
            'arxiv_id': arxiv_id,
            'versions': paper_data.get('versions', []),
            'comments': paper_data.get('comments', ''),
            'license': paper_data.get('license', ''),
        }

        with open(paths['metadata_file'], 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)

        logger.info(f"Saved metadata for {arxiv_id}")

    def save_references(self, arxiv_id: str, ref_data: Dict):
        """
        Save references in required format with yyyymm-id keys.

        Expected format:
        {
            "1706-03762": {
                "paper_title": "...",
                "authors": [...],
                "submission_date": "2017-06-12",
                "semantic_scholar_id": "..."
            }
        }
        """
        paths = self.create_paper_structure(arxiv_id)

        references = {}

        if 'references' in ref_data:
            for ref in ref_data['references']:
                # try to extract arXiv ID from externalIds
                ref_arxiv_id = None

                if ref.get('externalIds'):
                    external_ids = ref['externalIds']
                    if 'ArXiv' in external_ids:
                        ref_arxiv_id = external_ids['ArXiv']

                # only include references that have arXiv IDs
                # otherwise skip
                if ref_arxiv_id:
                    formatted_ref_id = self.format_arxiv_id(ref_arxiv_id)

                    # extract authors, only get names
                    authors = []
                    if ref.get('authors'):
                        authors = [author.get('name', 'Unknown') for author in ref['authors']]

                    # parse submission date from year
                    submission_date = ''
                    if ref.get('year'):
                        submission_date = f"{ref['year']}-01-01"

                    references[formatted_ref_id] = {
                        'paper_title': ref.get('title', 'Unknown'),
                        'authors': authors,
                        'submission_date': submission_date,
                        'semantic_scholar_id': ref.get('paperId', '')
                    }

        with open(paths['references_file'], 'w', encoding='utf-8') as f:
            json.dump(references, f, indent=2, ensure_ascii=False)

        logger.info(f"Saved {len(references)} arXiv references for {arxiv_id}")

    def copy_source_files(self, arxiv_id: str, version: str, source_dir: Path):
        """
        Copy .tex files to tex folder and .bib files to paper folder.
        Delete temp directory after copying.
        """
        paths = self.create_paper_structure(arxiv_id)
        tex_dir = paths['tex_dir']
        paper_dir = paths['paper_dir']

        if not source_dir.exists():
            logger.warning(f"Source directory does not exist: {source_dir}")
            return

        tex_count = 0
        bib_count = 0

        for file_path in source_dir.rglob('*'):
            if file_path.is_file():
                file_ext = file_path.suffix.lower()

                if file_ext == '.tex':
                    # Copy .tex files to tex folder, maintaining structure
                    rel_path = file_path.relative_to(source_dir)
                    dest_path = tex_dir / version / rel_path
                    dest_path.parent.mkdir(parents=True, exist_ok=True)

                    try:
                        shutil.copy2(file_path, dest_path)
                        tex_count += 1
                    except Exception as e:
                        logger.debug(f"Could not copy {file_path.name}: {e}")

                elif file_ext == '.bib':
                    dest_path = tex_dir / version / file_path.name
                    dest_path.parent.mkdir(parents=True, exist_ok=True)

                    try:
                        shutil.copy2(file_path, dest_path)
                        bib_count += 1
                    except Exception as e:
                        logger.debug(f"Could not copy {file_path.name}: {e}")

        logger.info(f"Copied {tex_count} .tex files and {bib_count} .bib files for {arxiv_id}")

        # Delete temp directory if configured
        if config.DELETE_TEMP_AFTER_COPY:
            try:
                shutil.rmtree(source_dir)
                logger.debug(f"Deleted temp directory: {source_dir}")
            except Exception as e:
                logger.debug(f"Could not delete temp directory: {e}")


    def is_paper_complete(self, arxiv_id: str) -> bool:
        """Check if a paper has been fully processed."""
        paths = self.create_paper_structure(arxiv_id)

        # check if all required files exist
        has_metadata = paths['metadata_file'].exists()
        has_tex = paths['tex_dir'].exists() and any(paths['tex_dir'].rglob('*.tex'))
        has_references = paths['references_file'].exists()

        return has_metadata and has_tex and has_references

Writing lib/paper_organizer.py


In [58]:
%%writefile lib/source_identifier.py
import logging
import time
import arxiv
from typing import List, Dict, Optional
import json
import config

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(config.LOGS_DIR / 'arxiv_harvester.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class ArxivHarvester:
    """Harvest metadata from arXiv using the arxiv Python API."""

    def __init__(self, id_ranges: List[tuple], checkpoint_interval: int = 10):
        """
        Initialize the harvester.
        ----------
        Params:
            id_ranges: List of (YYMM, start_id, end_id) tuples
                      e.g., [("2312", 15844, 99999), ("2401", 0, 3095)]
            checkpoint_interval: Save checkpoint every N batches
        """
        self.id_ranges = id_ranges
        self.checkpoint_interval = checkpoint_interval
        self.client = arxiv.Client(
            page_size=config.ARXIV_MAX_RESULTS,
            delay_seconds=config.ARXIV_DELAY,
            num_retries=config.MAX_RETRIES
        )
        self.checkpoint_file = config.DATA_DIR / 'harvest_checkpoint.json'
        self.failed_batches_file = config.DATA_DIR / 'failed_batches.json'

    def generate_arxiv_ids(self, yymm: str, start_id: int, end_id: int) -> List[str]:
        """
        Generate arXiv IDs for a given range.
        ----------
        Params:
            yymm: Year-month string (e.g., "2312")
            start_id: Starting ID number
            end_id: Ending ID number

        Returns:
            List of arXiv IDs (e.g., ["2312.15844", "2312.15845", ...])
        """
        return [f"{yymm}.{str(i).zfill(5)}" for i in range(start_id, end_id + 1)]

    def fetch_paper_batch(self, arxiv_ids: List[str], yymm: str) -> tuple[List[Dict], Optional[str]]:
        """
        Fetch metadata for a batch of arXiv IDs.
        ----------
        Params:
            arxiv_ids: List of arXiv IDs
            yymm: Year-month string for error tracking

        Returns:
            Tuple of (list of paper metadata dictionaries, error_type if failed)
        """
        papers = []
        error_type = None

        try:
            search = arxiv.Search(id_list=arxiv_ids)
            results = self.client.results(search)

            for paper in results:
                try:
                    paper_data = self._parse_paper(paper)
                    if paper_data:
                        papers.append(paper_data)
                except Exception as e:
                    logger.error(f"Error parsing paper: {e}")
                    continue

        except Exception as e:
            error_msg = str(e).lower()
            if '429' in error_msg or 'rate limit' in error_msg or 'too many requests' in error_msg:
                error_type = '429_rate_limit'
                logger.error(f"Rate limit error (429): {e}")
            else:
                error_type = 'other_error'
                logger.error(f"Error fetching batch: {e}")

        return papers, error_type

    def _parse_paper(self, paper: arxiv.Result) -> Optional[Dict]:
        """
        Parse an arxiv.Result object into our metadata format.
        ----------
        Params:
            paper: arxiv.Result object

        Returns:
            Dictionary with paper metadata
        """
        try:
            # Extract base arXiv ID (without version)
            arxiv_id = paper.entry_id.split('/')[-1].split('v')[0]

            # Get version from entry_id - the API returns the latest version
            entry_with_version = paper.entry_id.split('/')[-1]
            if 'v' in entry_with_version:
                latest_version = int(entry_with_version.split('v')[-1])
                versions = [f"{arxiv_id}v{i}" for i in range(1, latest_version + 1)]
            else:
                versions = [f"{arxiv_id}v1"]

            # Format dates
            published = paper.published.date().isoformat()
            updated = paper.updated.date().isoformat()

            # Extract authors
            authors = [author.name for author in paper.authors]

            # Extract categories
            categories = [paper.primary_category] + paper.categories
            categories = list(set(categories))  # Remove duplicates

            paper_data = {
                'arxiv_id': arxiv_id,
                'title': paper.title.strip(),
                'authors': authors,
                'categories': categories,
                'abstract': paper.summary.strip().replace('\n', ' '),
                'published': published,
                'updated': updated,
                'comments': paper.comment if paper.comment else '',
                'license': paper.journal_ref if paper.journal_ref else '',
                'versions': versions,
                'pdf_url': paper.pdf_url,
                'entry_id': paper.entry_id
            }

            return paper_data

        except Exception as e:
            logger.error(f"Error parsing paper: {e}")
            return None

    def harvest_all_papers(self) -> List[Dict]:
        """
        Harvest all papers across the specified ID ranges and Date ranges.

        Returns:
            List of paper metadata dictionaries
        """

        all_papers = []
        processed_ranges = {}

        # Process each range
        for yymm, start_id, end_id in self.id_ranges:
            range_key = f"{yymm}_{start_id}_{end_id}"

            logger.info(f"Processing range: {yymm}.{start_id} to {yymm}.{end_id}")

            # Generate all IDs
            all_ids = self.generate_arxiv_ids(yymm, start_id, end_id)
            logger.info(f"Generated {len(all_ids)} potential IDs")

            # Split into batches
            batch_size = config.ARXIV_BATCH_SIZE
            batches = [all_ids[i:min(i + batch_size, len(all_ids))] for i in range(0, len(all_ids), batch_size)]

            # Resume from last batch if exists
            start_batch = 0

            papers_in_range = []
            consecutive_no_paper_count = 0

            # Process batches sequentially (to respect rate limits)
            for i in range(start_batch, len(batches)):
                batch = batches[i]
                logger.info(f"Batch {i+1}/{len(batches)}: fetching {len(batch)} IDs")

                papers, error = self.fetch_paper_batch(batch, yymm)

                if papers:
                    papers_in_range.extend(papers)
                    all_papers.extend(papers)
                    consecutive_no_paper_count = 0
                    logger.info(f"Found {len(papers)} papers (range: {len(papers_in_range)}, total: {len(all_papers)})")
                else:
                    consecutive_no_paper_count += 1
                    logger.info(f"No papers found in this batch (consecutive: {consecutive_no_paper_count})")

                    # If too many consecutive empty batches, assume no more papers
                    if consecutive_no_paper_count >= 2:
                        logger.info("No more papers found in recent batches, stopping early for this range.")
                        break

                # Rate limiting
                time.sleep(config.ARXIV_DELAY)

            # Mark range as complete
            logger.info(f"Completed {yymm}: found {len(papers_in_range)} papers\n")
            processed_ranges[range_key] = True

        logger.info(f"="*80)
        logger.info(f"Harvesting complete! Total papers: {len(all_papers)}")
        logger.info(f"="*80)

        return all_papers

Writing lib/source_identifier.py


In [59]:
%%writefile lib/reference_extractor.py
import requests
import logging
import json
import time
from pathlib import Path
from typing import Dict, Optional
import config

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(config.LOGS_DIR / 'references.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class ReferenceExtractor:
    """Extract reference information using Semantic Scholar API."""

    def __init__(self, stats_tracker=None):
        self.api_base = config.SEMANTIC_SCHOLAR_API_BASE
        self.max_retries = config.MAX_RETRIES
        self.retry_delay = config.RETRY_DELAY
        self.stats = stats_tracker

    def fetch_references(self, arxiv_id: str) -> Optional[Dict]:
        """
        Fetch reference information for a paper from Semantic Scholar.
        --------
        Params:
            arxiv_id: arXiv identifier of the paper

        Returns:
            Dictionary with reference data or None if not found/error
        """
        url = f"{self.api_base}/paper/arXiv:{arxiv_id}"
        # add key to headers
        headers = {
            'x-api-key': config.SEMANTIC_KEY_API
        }
        params = {
            'fields': config.SEMANTIC_SCHOLAR_FIELDS
        }

        for attempt in range(self.max_retries):
            try:
                logger.info(f"Fetching references for arXiv:{arxiv_id} (attempt {attempt + 1})")

                response = requests.get(
                    url,
                    headers=headers,
                    params=params,
                    timeout=30
                )

                if response.status_code == 200:
                    data = response.json()
                    logger.info(f"Successfully fetched references for {arxiv_id}")
                    return data

                elif response.status_code == 404:
                    logger.warning(f"Paper not found in Semantic Scholar: {arxiv_id}")
                    if self.stats:
                        self.stats.record_error("Reference not found (404)")
                    return None

                elif response.status_code == 429:
                    wait_time = self.retry_delay
                    logger.warning(f"Rate limited. Waiting {wait_time}s...")
                    time.sleep(wait_time)
                    continue

                else:
                    logger.error(f"Error {response.status_code}: {response.text[:200]}")
                    if attempt < self.max_retries - 1:
                        time.sleep(self.retry_delay)
                        continue
                    if self.stats:
                        self.stats.record_error(f"HTTP {response.status_code}")
                    return None

            except requests.exceptions.RequestException as e:
                logger.error(f"Request error for {arxiv_id}: {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
                    continue
                if self.stats:
                    self.stats.record_error(f"Request error: {type(e).__name__}")
                return None

        return None

Writing lib/reference_extractor.py


In [60]:
%%writefile lib/source_downloader.py
import tarfile
import gzip
import logging
import shutil
from pathlib import Path
from typing import List, Optional, Tuple
import config
import requests

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(config.LOGS_DIR / 'downloader.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class SourceDownloader:
    """Download and extract arXiv paper sources with figure removal."""

    def __init__(self, stats_tracker=None, paper_organizer=None):
        self.temp_dir = config.TEMP_DIR
        self.figure_extensions = config.FIGURE_EXTENSIONS
        self.stats = stats_tracker
        self.organizer = paper_organizer

    def download_paper_source(self, arxiv_id: str, version: str) -> Optional[Path]:
        """
        Download source files for a specific paper version.
        --------
        Params:
            arxiv_id: arXiv identifier (e.g., "2312.15844")
            version: Version string (e.g., "2312.15844v1")

        Returns:
            Path to the downloaded .tar.gz file, or None on failure.
        """
        try:
            # Create temporary directory for this version
            version_temp_dir = self.temp_dir / arxiv_id / version
            tar_path = version_temp_dir / f"{version}.tar.gz"

            logger.info(f"Downloading source for {version}")
            version_temp_dir.mkdir(parents=True, exist_ok=True)

            # Download source for the specific version
            # Construct the source URL manually for the specific version
            source_url = f"https://arxiv.org/e-print/{version}"

            try:
                response = requests.get(source_url, timeout=30)
                response.raise_for_status()

                # Save the downloaded content
                with open(tar_path, 'wb') as f:
                    f.write(response.content)

                logger.info(f"Downloaded {version}")
                return tar_path

            except requests.exceptions.RequestException as e:
                logger.error(f"Error downloading from {source_url}: {e}")
                if self.stats:
                    self.stats.record_error(f"Download request error: {type(e).__name__}")
                return None

        except StopIteration:
            logger.warning(f"Paper {version} not found in arXiv")
            if self.stats:
                self.stats.record_error("Paper not found")
            return None
        except Exception as e:
            logger.error(f"Error downloading {version}: {e}")
            if self.stats:
                self.stats.record_error(f"Download error: {type(e).__name__}")
            return None

    def get_directory_size(self, directory: Path) -> int:
        """
        Get total size of directory in bytes.
        --------
        Params:
            directory: Path to the directory

        Returns:
            Total size in bytes
        """
        total_size = 0
        try:
            for path in directory.rglob('*'):
                if path.is_file():
                    total_size += path.stat().st_size
        except Exception as e:
            logger.error(f"Error calculating directory size: {e}")
        return total_size

    def remove_figures_from_directory(self, directory: Path) -> int:
        """
        Remove figure files from a directory.
        --------
        Params:
            directory: Path to the directory

        Returns:
            Number of figure files removed
        """
        removed_count = 0

        try:
            for file_path in directory.rglob('*'):
                if file_path.is_file():
                    if any(file_path.suffix.lower() == ext for ext in self.figure_extensions):
                        try:
                            file_path.unlink()
                            removed_count += 1
                            logger.debug(f"Removed figure: {file_path.name}")
                        except Exception as e:
                            logger.debug(f"Could not remove {file_path.name}: {e}")

            if removed_count > 0:
                logger.info(f"Removed {removed_count} figure files from {directory.name}")

        except Exception as e:
            logger.error(f"Error removing figures from {directory}: {e}")
            if self.stats:
                self.stats.record_error(f"Figure removal error: {type(e).__name__}")

        return removed_count

    def extract_nested_archives(self, directory: Path, depth: int = 0, max_depth: int = 3):
        """
        Recursively extract nested tar/tar.gz files.
        --------
        Params:
            directory: Directory to search for archives
            depth: Current recursion depth
            max_depth: Maximum recursion depth to prevent infinite loops

        Returns:
            None
        """
        if depth >= max_depth:
            logger.warning(f"Maximum extraction depth ({max_depth}) reached")
            return

        archive_extensions = ['.tar', '.tar.gz', '.tgz', '.gz']

        for file_path in list(directory.rglob('*')):
            if not file_path.is_file():
                continue

            # Check if it's an archive
            is_archive = any(
                str(file_path).endswith(ext) for ext in archive_extensions
            )

            if not is_archive:
                continue

            try:
                extract_subdir = file_path.parent / file_path.stem
                extract_subdir.mkdir(exist_ok=True)

                logger.debug(f"Extracting nested archive: {file_path.name}")

                # Try different extraction methods
                extracted = False

                # Method 1: tar.gz
                if str(file_path).endswith(('.tar.gz', '.tgz')):
                    try:
                        with tarfile.open(file_path, 'r:gz') as tar:
                            tar.extractall(path=extract_subdir)
                        extracted = True
                    except Exception:
                        pass

                # Method 2: plain tar
                if not extracted and str(file_path).endswith('.tar'):
                    try:
                        with tarfile.open(file_path, 'r:') as tar:
                            tar.extractall(path=extract_subdir)
                        extracted = True
                    except Exception:
                        pass

                # Method 3: plain gzip (decompress to single file)
                if not extracted and str(file_path).endswith('.gz'):
                    try:
                        output_file = extract_subdir.parent / file_path.stem
                        with gzip.open(file_path, 'rb') as f_in:
                            with open(output_file, 'wb') as f_out:
                                shutil.copyfileobj(f_in, f_out)
                        extracted = True
                        # If it's a .tar after decompression, extract it
                        if output_file.suffix == '.tar':
                            try:
                                with tarfile.open(output_file, 'r:') as tar:
                                    tar.extractall(path=extract_subdir)
                                output_file.unlink()
                            except Exception:
                                pass
                    except Exception:
                        pass

                if extracted:
                    # Remove the archive file after successful extraction
                    file_path.unlink()
                    logger.debug(f"Extracted and removed: {file_path.name}")

                    # Recursively extract nested archives
                    self.extract_nested_archives(extract_subdir, depth + 1, max_depth)
                else:
                    logger.debug(f"Could not extract: {file_path.name}")

            except Exception as e:
                logger.debug(f"Error extracting nested archive {file_path.name}: {e}")

    def extract_tarball(self, tar_path: Path, arxiv_id: str, version: str) -> Tuple[Optional[Path], int, int, int]:
        """
        Extract a .tar.gz file and remove figures.
        --------
        Params:
            tar_path: Path to the .tar.gz file
            arxiv_id: arXiv identifier
            version: Version string

        Returns:
            Tuple of (extract_dir, raw_size, extracted_size, figures_removed)
        """
        if not tar_path or not tar_path.exists():
            logger.warning(f"Tar file not found: {tar_path}")
            return None, 0, 0, 0

        extract_dir = self.temp_dir / arxiv_id / version / "extracted"
        try:
            extract_dir.mkdir(parents=True, exist_ok=True)

            # Get raw size
            raw_size = tar_path.stat().st_size

            logger.info(f"Extracting {tar_path.name} ({raw_size} bytes)")

            extracted = False

            # Try multiple extraction methods
            # Method 1: tar.gz
            try:
                with tarfile.open(tar_path, 'r:gz') as tar:
                    tar.extractall(path=extract_dir)
                extracted = True
                logger.debug("Extracted as tar.gz")
            except (tarfile.ReadError, gzip.BadGzipFile) as e:
                logger.debug(f"Not a gzipped tar: {e}")

            # Method 2: plain tar
            if not extracted:
                try:
                    with tarfile.open(tar_path, 'r:') as tar:
                        tar.extractall(path=extract_dir)
                    extracted = True
                    logger.debug("Extracted as plain tar")
                except tarfile.ReadError as e:
                    logger.debug(f"Not a plain tar: {e}")

            # Method 3: gzip only (decompress then check if tar)
            if not extracted:
                try:
                    decompressed_path = extract_dir / "decompressed.tar"
                    with gzip.open(tar_path, 'rb') as f_in:
                        with open(decompressed_path, 'wb') as f_out:
                            shutil.copyfileobj(f_in, f_out)

                    # Try to open as tar
                    try:
                        with tarfile.open(decompressed_path, 'r:') as tar:
                            tar.extractall(path=extract_dir)
                        decompressed_path.unlink()
                        extracted = True
                        logger.debug("Extracted as gzip -> tar")
                    except tarfile.ReadError:
                        # It's just a gzipped file, not a tar
                        extracted = True
                        logger.debug("Extracted as plain gzip")

                except (gzip.BadGzipFile, OSError) as e:
                    logger.debug(f"Not gzipped: {e}")

            # Method 4: Copy as-is (might be a single .tex file)
            if not extracted:
                logger.warning(f"Could not extract {tar_path.name}, copying as-is")
                shutil.copy(tar_path, extract_dir / tar_path.name)
                extracted = True

            # Extract any nested archives (tar/tar.gz inside)
            logger.debug("Checking for nested archives...")
            self.extract_nested_archives(extract_dir)

            # Remove figures
            figures_removed = self.remove_figures_from_directory(extract_dir)

            # Delete raw file if configured
            if config.DELETE_RAW_AFTER_EXTRACT:
                try:
                    tar_path.unlink()
                    logger.debug(f"Deleted raw file: {tar_path.name}")
                except Exception as e:
                    logger.debug(f"Could not delete raw file: {e}")

            # Get size after figure removal (only .tex and .bib files)
            extracted_size = 0
            tex_bib_files = []
            for file_path in extract_dir.rglob('*'):
                if file_path.is_file():
                    if file_path.suffix.lower() in ['.tex', '.bib', '.sty', '.cls', '.bst']:
                        extracted_size += file_path.stat().st_size
                        tex_bib_files.append(file_path.name)

            if tex_bib_files:
                logger.info(f"Found {len(tex_bib_files)} LaTeX files")
            else:
                logger.warning(f"No LaTeX files found in {version}")

            logger.info(f"Extracted {version}: {raw_size} -> {extracted_size} bytes ({figures_removed} figures removed)")

            return extract_dir, raw_size, extracted_size, figures_removed

        except Exception as e:
            logger.error(f"Error extracting {tar_path}: {e}")
            if self.stats:
                self.stats.record_error(f"Extraction error: {type(e).__name__}")
            return None, 0, 0, 0

    def download_and_extract_version(self, arxiv_id: str, version: str) -> bool:
        """
        Download and extract a single version.
        --------
        Params:
            arxiv_id: arXiv identifier
            version: Version string

        Returns:
            True on success, False on failure
        """
        try:
            # Download
            tar_path = self.download_paper_source(arxiv_id, version)

            if not tar_path:
                if self.stats:
                    self.stats.record_version(success=False)
                return False

            # Extract
            extract_dir, raw_size, extracted_size, figures_removed = self.extract_tarball(
                tar_path, arxiv_id, version
            )

            if extract_dir:
                # Copy extracted files to organized structure
                if self.organizer:
                    try:
                        self.organizer.copy_source_files(arxiv_id, version, extract_dir)
                        logger.info(f"Success: Organized {version}")
                    except Exception as e:
                        logger.error(f"Error organizing files: {e}")

                if self.stats:
                    self.stats.record_file_sizes(raw_size, extracted_size, figures_removed)
                    self.stats.record_version(success=True)

                # Clean up temp directory if configured
                if config.DELETE_TEMP_AFTER_COPY:
                    try:
                        shutil.rmtree(extract_dir)
                        logger.debug(f"Cleaned up temp dir for {version}")
                    except Exception as e:
                        logger.debug(f"Could not clean up temp: {e}")

                return True
            else:
                if self.stats:
                    self.stats.record_version(success=False)
                return False

        except Exception as e:
            logger.error(f"Error processing {version}: {e}")
            if self.stats:
                self.stats.record_error(f"Processing error: {type(e).__name__}")
                self.stats.record_version(success=False)
            return False

Writing lib/source_downloader.py


In [61]:
%%writefile lib/metadata_updater.py
"""
Script to update metadata.json with actual file modification dates from .tex files
and remove papers that don't have any LaTeX source files.
"""

import json
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional
import shutil
import config

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(config.LOGS_DIR / 'update_metadata.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class MetadataUpdater:
    """Update metadata.json files with actual file dates and clean up empty papers."""

    def __init__(self, papers_dir: Path):
        self.papers_dir = papers_dir
        self.stats = {
            'total_papers': 0,
            'updated_papers': 0,
            'deleted_papers': 0,
            'papers_without_tex': 0,
            'errors': 0
        }

    def get_tex_file_dates(self, tex_dir: Path) -> Dict[str, str]:
        """
        Get modification dates of .tex files in each version directory.

        Args:
            tex_dir: Path to the tex folder (contains v1/, v2/, etc.)

        Returns:
            Dictionary mapping version to date (e.g., {"v1": "2023-12-15", "v2": "2023-12-18"})
        """
        version_dates = {}

        # Find all version directories (v1, v2, v3, etc.)
        version_dirs = sorted([d for d in tex_dir.iterdir() if d.is_dir()])

        if not version_dirs:
            logger.debug(f"No version directories found in {tex_dir}")
            return version_dates

        for version_dir in version_dirs:
            version_name = version_dir.name  # e.g., "v1", "v2"

            # Find all .tex files in this version
            tex_files = list(version_dir.glob('*.tex'))

            if not tex_files:
                logger.debug(f"No .tex files in {version_dir}")
                continue

            # Get the latest modification time among all .tex files
            latest_mtime = max(f.stat().st_mtime for f in tex_files)

            # Convert to ISO date format
            date = datetime.fromtimestamp(latest_mtime).date().isoformat()
            version_dates[version_name] = date

            logger.debug(f"  {version_name}: {date} (from {len(tex_files)} .tex files)")

        return version_dates

    def update_metadata(self, paper_dir: Path) -> bool:
        """
        Update metadata.json with actual file dates.

        Args:
            paper_dir: Path to the paper directory

        Returns:
            True if paper has .tex files, False if should be deleted
        """
        metadata_file = paper_dir / 'metadata.json'
        tex_dir = paper_dir / 'tex'

        # Check if metadata exists
        if not metadata_file.exists():
            logger.warning(f"No metadata.json in {paper_dir.name}")
            return False

        # If there is no folder or files, return empty
        if not tex_dir.exists() or not any(tex_dir.iterdir()):
            logger.info(f"No tex directory or files in {paper_dir.name} - marking for deletion")
            return False

        # Get version dates from actual files
        version_dates = self.get_tex_file_dates(tex_dir)

        if not version_dates:
            logger.info(f"No .tex files found in {paper_dir.name} - marking for deletion")
            return False

        # Load metadata
        try:
            with open(metadata_file, 'r', encoding='utf-8') as f:
                metadata = json.load(f)
        except Exception as e:
            logger.error(f"Error loading metadata from {paper_dir.name}: {e}")
            self.stats['errors'] += 1
            return True  # Keep the paper even if we can't read metadata

        # Update revised_dates (all dates except v1)
        sorted_versions = sorted(version_dates.keys())
        print(sorted_versions)
        sorted_versions = sorted(version_dates.keys(), key=lambda x: (x[1:]))
        if len(sorted_versions) > 1:
            metadata['revised_dates'] = [version_dates[v] for v in sorted_versions[1:]]
        else:
            metadata['revised_dates'] = []

        # Save updated metadata
        try:
            with open(metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

            # Log changes if dates were updated
            logger.info(f"✓ Updated {paper_dir.name}:")
            logger.info(f"    Versions: {len(version_dates)}")
            logger.info(f"    Dates: {version_dates}")
            self.stats['updated_papers'] += 1

            return True

        except Exception as e:
            logger.error(f"Error saving metadata for {paper_dir.name}: {e}")
            self.stats['errors'] += 1
            return True  # Keep the paper even if we can't save

    def delete_paper(self, paper_dir: Path):
        """Delete a paper directory that has no .tex files."""
        try:
            shutil.rmtree(paper_dir)
            logger.info(f"✗ Deleted {paper_dir.name} (no .tex files)")
            self.stats['deleted_papers'] += 1
        except Exception as e:
            logger.error(f"Error deleting {paper_dir.name}: {e}")
            self.stats['errors'] += 1

    def process_all_papers(self, delete_empty: bool = True):
        """
        Process all papers in the papers directory.

        Args:
            delete_empty: If True, delete papers without .tex files
        """
        if not self.papers_dir.exists():
            logger.error(f"Papers directory not found: {self.papers_dir}")
            return

        # Get all paper directories
        paper_dirs = sorted([d for d in self.papers_dir.iterdir() if d.is_dir()])
        self.stats['total_papers'] = len(paper_dirs)

        logger.info("="*80)
        logger.info(f"Processing {len(paper_dirs)} papers")
        logger.info(f"Delete empty papers: {delete_empty}")
        logger.info("="*80)

        papers_to_delete = []

        for i, paper_dir in enumerate(paper_dirs, 1):
            logger.info(f"\n[{i}/{len(paper_dirs)}] Processing {paper_dir.name}")

            has_tex = self.update_metadata(paper_dir)

            if not has_tex:
                papers_to_delete.append(paper_dir)
                self.stats['papers_without_tex'] += 1

        # Delete papers without .tex files
        if delete_empty and papers_to_delete:
            logger.info(f"\n{'='*80}")
            logger.info(f"Deleting {len(papers_to_delete)} papers without .tex files")
            logger.info(f"{'='*80}")

            for paper_dir in papers_to_delete:
                self.delete_paper(paper_dir)
        elif papers_to_delete:
            logger.info(f"\n{'='*80}")
            logger.info(f"Found {len(papers_to_delete)} papers without .tex files (not deleted)")
            logger.info(f"{'='*80}")
            for paper_dir in papers_to_delete:
                logger.info(f"  - {paper_dir.name}")

    def print_summary(self):
        """Print summary statistics."""
        logger.info(f"\n{'='*80}")
        logger.info("SUMMARY")
        logger.info(f"{'='*80}")
        logger.info(f"Total papers processed:       {self.stats['total_papers']}")
        logger.info(f"Papers with updated dates:    {self.stats['updated_papers']}")
        logger.info(f"Papers without .tex files:    {self.stats['papers_without_tex']}")
        logger.info(f"Papers deleted:               {self.stats['deleted_papers']}")
        logger.info(f"Errors:                       {self.stats['errors']}")
        logger.info(f"{'='*80}")

        remaining_papers = self.stats['total_papers'] - self.stats['deleted_papers']
        logger.info(f"\nRemaining papers: {remaining_papers}")

        if self.stats['deleted_papers'] > 0:
            deletion_rate = (self.stats['deleted_papers'] / self.stats['total_papers']) * 100
            logger.info(f"Deletion rate: {deletion_rate:.1f}%")

Writing lib/metadata_updater.py


In [62]:
%%writefile main.py
import logging
import json
import time
import threading
import concurrent.futures

from lib.source_identifier import ArxivHarvester
from lib.source_downloader import SourceDownloader
from lib.reference_extractor import ReferenceExtractor
from lib.statistics_tracker import StatisticsTracker
from lib.paper_organizer import PaperOrganizer
from lib.metadata_updater import MetadataUpdater
import config

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(config.LOGS_DIR / 'main.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def memory_monitor(stats, stop_event):
    """Background thread to monitor memory usage."""
    while not stop_event.is_set():
        stats.sample_memory()
        time.sleep(5)


def download_paper_sources(paper, downloader, organizer, stats):
    """
    Download all versions of a single paper.
    ----------
    Params
        paper: Paper metadata dictionary
        downloader: SourceDownloader instance
        organizer: PaperOrganizer instance
        stats: StatisticsTracker instance

    Returns:
        Tuple of (arxiv_id, success, processing_time)
    """
    arxiv_id = paper['arxiv_id']
    versions = paper['versions']

    start_time = time.time()

    try:
        logger.info(f"Downloading {arxiv_id} ({len(versions)} versions)")

        # Save metadata
        organizer.save_metadata(arxiv_id, paper)

        # Download and extract all versions
        for version in versions:
            success = downloader.download_and_extract_version(arxiv_id, version)
            if not success:
                logger.warning(f"Warning: Failed to download version {version['version']} of {arxiv_id}")

        processing_time = time.time() - start_time
        logger.info(f"Success: Downloaded {arxiv_id} in {processing_time:.2f}s")

        return arxiv_id, True, processing_time

    except Exception as e:
        logger.error(f"Error: Error downloading {arxiv_id}: {e}")
        import traceback
        traceback.print_exc()

        processing_time = time.time() - start_time
        stats.record_error(f"Download error: {type(e).__name__}")

        return arxiv_id, False, processing_time


def fetch_paper_references(arxiv_id, ref_extractor, organizer, stats):
    """
    Fetch references for a single paper.
    ----------
    Params
        arxiv_id: arXiv ID
        ref_extractor: ReferenceExtractor instance
        organizer: PaperOrganizer instance
        stats: StatisticsTracker instance

    Returns:
        Tuple of (arxiv_id, success, ref_count)
    """
    try:
        logger.info(f"Fetching references for {arxiv_id}")

        ref_data = ref_extractor.fetch_references(arxiv_id)

        if ref_data:
            organizer.save_references(arxiv_id, ref_data)
            ref_count = len(ref_data.get('references', []))
            stats.record_references(ref_count)
            logger.info(f"Success: Found {ref_count} references for {arxiv_id}")
            return arxiv_id, True, ref_count
        else:
            stats.record_references(0)
            logger.warning(f"Error: No references found for {arxiv_id}")
            return arxiv_id, False, 0

    except Exception as e:
        logger.error(f"Error: Error fetching references for {arxiv_id}: {e}")
        stats.record_error(f"Reference error: {type(e).__name__}")
        stats.record_references(0)
        return arxiv_id, False, 0


def main():
    """Main execution pipeline with optimized speed."""
    stats = StatisticsTracker()
    organizer = PaperOrganizer()
    stats.start()

    stop_monitoring = threading.Event()
    monitor_thread = threading.Thread(target=memory_monitor, args=(stats, stop_monitoring))
    monitor_thread.start()

    try:
        logger.info("=" * 80)
        logger.info("Starting arXiv Data Scraping")
        logger.info("=" * 80)

        # Step 1: Harvest metadata using arXiv API
        stats.start_phase("Metadata Harvesting (arXiv API)")
        logger.info("\n--- STEP 1: Harvesting metadata via arXiv API ---")

        paper_list_file = config.DATA_DIR / 'paper_list.json'

        if paper_list_file.exists():
            logger.info(f"Loading cached paper list from {paper_list_file}")
            with open(paper_list_file, 'r', encoding='utf-8') as f:
                papers = json.load(f)
            logger.info(f"Loaded {len(papers)} papers from cache")
        else:
            harvester = ArxivHarvester(
                id_ranges=config.ARXIV_ID_RANGES,
                checkpoint_interval=10  # Save every 10 batches
            )

            # Harvest papers with checkpoint support
            papers = harvester.harvest_all_papers()

            # Record identified papers
            for _ in papers:
                stats.record_paper_identified()

        # Save paper list
        with open(paper_list_file, 'w', encoding='utf-8') as f:
            json.dump(papers, f, indent=2, default=str)

        logger.info(f"Saved {len(papers)} papers to {paper_list_file}")

        stats.end_phase()
        stats.save_checkpoint()

        if not papers:
            logger.warning("No papers found. Exiting.")
            return

        logger.info(f"\n{'='*80}")
        logger.info(f"Found {len(papers)} papers to process")
        logger.info(f"{'='*80}\n")

        # Filter out already complete papers
        papers_to_process = []
        for paper in papers:
            arxiv_id = paper['arxiv_id']
            if organizer.is_paper_complete(arxiv_id):
                logger.info(f"Success: {arxiv_id} already complete, skipping")
                stats.record_paper_success(0)
            else:
                papers_to_process.append(paper)

        logger.info(f"\n{len(papers_to_process)} papers need processing\n")

        if not papers_to_process:
            logger.info("All papers already processed!")
            return

        # Step 2: Download all source papers concurrently
        stats.start_phase("Source Download (Parallel)")
        logger.info("\n--- STEP 2: Downloading all source papers (parallel) ---")
        logger.info(f"Max concurrent downloads: {config.PAPER_DOWNLOAD_BATCH_SIZE}")

        downloader = SourceDownloader(stats_tracker=stats, paper_organizer=organizer)

        download_results = {}
        downloaded_papers = []
        failed_downloads = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=config.PAPER_DOWNLOAD_BATCH_SIZE) as executor:
            # Submit all download tasks
            future_to_paper = {
                executor.submit(download_paper_sources, paper, downloader, organizer, stats): paper
                for paper in papers_to_process
            }

            completed = 0
            total = len(papers_to_process)

            # Process completed downloads
            for future in concurrent.futures.as_completed(future_to_paper):
                paper = future_to_paper[future]
                completed += 1

                try:
                    arxiv_id, success, processing_time = future.result()
                    download_results[arxiv_id] = success

                    if success:
                        downloaded_papers.append(arxiv_id)
                        stats.record_paper_attempt()
                        logger.info(f"[{completed}/{total}] Success: {arxiv_id} - {processing_time:.2f}s")
                    else:
                        failed_downloads.append(arxiv_id)
                        stats.record_paper_attempt()
                        stats.record_paper_failure()
                        logger.warning(f"[{completed}/{total}] ✗ {arxiv_id} - Failed")

                    # Save checkpoint periodically
                    if completed % config.STATS_SAVE_INTERVAL == 0:
                        stats.save_checkpoint()
                        logger.info(f"--- Checkpoint: {completed}/{total} downloads completed ---")

                except Exception as e:
                    logger.error(f"[{completed}/{total}] ✗ Exception processing {paper['arxiv_id']}: {e}")
                    failed_downloads.append(paper['arxiv_id'])
                    stats.record_paper_attempt()
                    stats.record_paper_failure()

        stats.end_phase()
        stats.save_checkpoint()

        logger.info(f"\n{'='*80}")
        logger.info(f"Download phase completed!")
        logger.info(f"  Successful: {len(downloaded_papers)}/{total}")
        logger.info(f"  Failed: {len(failed_downloads)}/{total}")
        logger.info(f"{'='*80}\n")

        # Step 3: Fetch references for all downloaded papers
        stats.start_phase("Reference Extraction (Parallel)")
        logger.info("\n--- STEP 3: Fetching references for all papers (parallel) ---")
        logger.info(f"Max concurrent API calls: {config.REFERENCE_BATCH_SIZE}")

        ref_extractor = ReferenceExtractor(stats_tracker=stats)

        reference_results = {}
        papers_with_refs = []
        papers_without_refs = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=config.REFERENCE_BATCH_SIZE) as executor:
            # Submit all reference fetching tasks
            future_to_arxiv_id = {
                executor.submit(fetch_paper_references, arxiv_id, ref_extractor, organizer, stats): arxiv_id
                for arxiv_id in downloaded_papers
            }

            completed = 0
            total = len(downloaded_papers)

            # Process completed reference fetches
            for future in concurrent.futures.as_completed(future_to_arxiv_id):
                arxiv_id = future_to_arxiv_id[future]
                completed += 1

                try:
                    arxiv_id_result, success, ref_count = future.result()
                    reference_results[arxiv_id_result] = (success, ref_count)

                    if success:
                        papers_with_refs.append(arxiv_id_result)
                        logger.info(f"[{completed}/{total}] Success: {arxiv_id_result} - {ref_count} refs")
                    else:
                        papers_without_refs.append(arxiv_id_result)
                        logger.warning(f"[{completed}/{total}] ✗ {arxiv_id_result} - No refs")

                    # Save checkpoint periodically
                    if completed % config.STATS_SAVE_INTERVAL == 0:
                        stats.save_checkpoint()
                        logger.info(f"--- Checkpoint: {completed}/{total} references fetched ---")

                    # Rate limiting - delay between API calls
                    time.sleep(config.REQUEST_DELAY)

                except Exception as e:
                    logger.error(f"[{completed}/{total}] ✗ Exception fetching refs for {arxiv_id}: {e}")
                    papers_without_refs.append(arxiv_id)

        stats.end_phase()
        stats.save_checkpoint()

        logger.info(f"\n{'='*80}")
        logger.info(f"Reference extraction completed!")
        logger.info(f"  With references: {len(papers_with_refs)}/{total}")
        logger.info(f"  Without references: {len(papers_without_refs)}/{total}")
        logger.info(f"{'='*80}\n")

        # Mark all successfully processed papers as complete
        for arxiv_id in downloaded_papers:
            if download_results.get(arxiv_id):
                # Mark as complete regardless of reference success
                # some papers legitimately have no references in Semantic Scholar
                # because they are very new or not indexed
                stats.record_paper_success(0)  # Time already recorded during download

        # Update metadata for all papers
        updater = MetadataUpdater(papers_dir=config.PAPERS_DIR)
        updater.process_all_papers(delete_empty=False)

        # Cleanup temp directory
        logger.info("\n--- Cleaning up temporary files ---")
        try:
            if config.TEMP_DIR.exists():
                import shutil
                shutil.rmtree(config.TEMP_DIR)
                config.TEMP_DIR.mkdir(parents=True, exist_ok=True)
                logger.info("Success: Cleaned up temp directory")
        except Exception as e:
            logger.warning(f"Could not clean temp directory: {e}")

        logger.info("\n" + "=" * 80)
        logger.info("Pipeline completed!")
        logger.info("=" * 80)
        logger.info(f"\nSummary:")
        logger.info(f"  Total papers: {len(papers)}")
        logger.info(f"  Already complete: {len(papers) - len(papers_to_process)}")
        logger.info(f"  Downloaded: {len(downloaded_papers)}")
        logger.info(f"  Failed downloads: {len(failed_downloads)}")
        logger.info(f"  With references: {len(papers_with_refs)}")
        logger.info(f"  Without references: {len(papers_without_refs)}")

    finally:
        stop_monitoring.set()
        monitor_thread.join()
        stats.end()
        stats.save_to_file(config.DATA_DIR / 'statistics_final.json')
        stats.save_checkpoint()
        stats.print_summary()
        logger.info(f"\nStatistics: {config.DATA_DIR / 'statistics_final.json'}")
        logger.info(f"Papers: {config.PAPERS_DIR}")


if __name__ == "__main__":
    main()

Writing main.py


In [63]:
!python main.py

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
2025-11-15 11:16:44,235 - INFO - [81/496] Success: 2312.15894 - 21 refs
2025-11-15 11:16:45,698 - INFO - Fetching references for arXiv:2312.16103 (attempt 6)
2025-11-15 11:16:47,235 - INFO - [82/496] Success: 2312.15868 - 0 refs
2025-11-15 11:16:47,441 - INFO - Fetching references for arXiv:2312.16143 (attempt 2)
2025-11-15 11:16:47,694 - INFO - Successfully fetched references for 2312.16143
2025-11-15 11:16:47,694 - INFO - Saved 0 arXiv references for 2312.16143
2025-11-15 11:16:47,695 - INFO - Success: Found 0 references for 2312.16143
2025-11-15 11:16:47,695 - INFO - Fetching references for 2312.16077
2025-11-15 11:16:47,695 - INFO - Fetching references for arXiv:2312.16077 (attempt 1)
2025-11-15 11:16:48,051 - INFO - Successfully fetched references for 2312.16077
2025-11-15 11:16:48,052 - INFO - Saved 6 arXiv references for 2312.16077
2025-11-15 11:16:48,052 - INFO - Success: Found 16 references for 2312.16077
2025-11

In [64]:
!zip -r scrape_data.zip /content/arxiv_scraper/data/papers

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  adding: content/arxiv_scraper/data/papers/2312-16145/tex/2312.16145v1/sec/1_intro.tex (deflated 54%)
  adding: content/arxiv_scraper/data/papers/2312-16145/tex/2312.16145v1/sec/2_related_short.tex (deflated 54%)
  adding: content/arxiv_scraper/data/papers/2312-16145/tex/2312.16145v1/sec/X_suppl.tex (deflated 68%)
  adding: content/arxiv_scraper/data/papers/2312-16145/tex/2312.16145v1/sec/5_conclusion.tex (deflated 38%)
  adding: content/arxiv_scraper/data/papers/2312-16145/tex/2312.16145v1/sec/4_experiments.tex (deflated 63%)
  adding: content/arxiv_scraper/data/papers/2312-16145/tex/2312.16145v1/sec/3_method.tex (deflated 62%)
  adding: content/arxiv_scraper/data/papers/2312-16145/tex/2312.16145v1/sec/0_abstract.tex (deflated 49%)
  adding: content/arxiv_scraper/data/papers/2312-16145/tex/2312.16145v1/preamble.tex (deflated 44%)
  adding: content/arxiv_scraper/data/papers/2312-16145/tex/2312.16145v1/main.tex (deflated 

In [65]:
# %rm -rf /content/arxiv_scraper