# <center> **Milestone 1 — Module 1: Metadata and Paper**

## **Download and Import libraries**

In [None]:
# 1. Install dependencies
!pip -q install arxiv requests filetype orjson
!apt install unzip

  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m81.5/81.5 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for sgmllib3k (setup.py) ... [?25l[?25hdone
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
unzip is already the newest version (6.0-26ubuntu3.2).
0 upgraded, 0 newly installed, 0 to remove and 41 not upgraded.


In [None]:
# 2. Imports & constants
from __future__ import annotations
import os, re, tarfile, shutil, json, orjson, time, threading, urllib, atexit
from pathlib import Path
from typing import Dict, List, Optional, Any, Iterable, Tuple
from dateutil import parser as dtparser
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests, arxiv, gzip, filetype, io, struct, hashlib, unicodedata
import xml.etree.ElementTree as ET
import math
import signal
import sys

try:
    import psutil
except Exception:
    psutil = None

try:
    import resource
    _HAS_RESOURCE = True
except Exception:
    _HAS_RESOURCE = False

# Colab-only: mount Google Drive at /content/drive so outputs written there persist.
# force_remount=True requests a fresh mount even when one is already present.
from google.colab import drive, runtime
drive.mount('/content/drive', force_remount = True)

Mounted at /content/drive


In [None]:
# Prerequisite: add a shortcut to the project folder in "My Drive" before running,
# so that the path below resolves. Relative paths used later (e.g. WORK_DIR)
# are resolved against this directory.
%cd '/content/drive/MyDrive/[Project] Milestone 1'

/content/drive/MyDrive/[Project] Milestone 1


## **Config and constants**

In [None]:
# Crawl range: first and last arXiv paper, each given as ("YYYY-MM", sequence number).
START_MONTH, START_ID = "2022-11", 13747
# Alternative end point reaching into the following month (currently disabled).
# END_MONTH, END_ID = "2022-12", 11475
END_MONTH, END_ID = "2022-11", 15247

# Root folder for all outputs; relative, so it resolves against the current working directory.
WORK_DIR = Path("workdir")

# Characters never allowed inside a single path component: '/' and NUL.
_FORBIDDEN = set('/\0')

# Inclusive code point ranges treated as control characters by is_control().
_CTRL_RANGES = [(0x00, 0x1F), (0x7F, 0x9F)]

# Inclusive code point range of Unicode surrogates.
_SURR_MIN, _SURR_MAX = 0xD800, 0xDFFF

# Maximum length (in characters) of one sanitized filename component.
_MAX_COMP_LEN = 200

## **RAM measurement**

Function `get_rss_bytes()` returns the resident memory (RSS) usage in bytes for the current process, optionally including child processes.

1. **Uses `psutil` (preferred):** retrieves RSS for the main process and all child processes.  
2. **Fallback `/proc/self/status`:** parses `VmRSS` from Linux’s process status file.  
3. **Fallback `resource.getrusage`:** reads maximum RSS usage from the OS (adjusting units for macOS/Linux).  
4. **Fallback `tracemalloc`:** returns Python heap memory if system methods fail.  
5. Returns `0` if all detection methods fail.

In [None]:
def get_rss_bytes(include_children: bool = True):
    """
    Report the resident set size (RSS) of the current process, in bytes.

    Several strategies are attempted in order and the first one that works
    wins: psutil, /proc/self/status, resource.getrusage, tracemalloc.
    Any failure inside a strategy is swallowed and the next one is tried.

    Args:
        include_children: when psutil is usable, also add the RSS of every
            (recursive) child process. Ignored by the other strategies.

    Returns:
        An int number of bytes, or 0 when every strategy failed.
        Note that the resource fallback reports the *peak* RSS and the
        tracemalloc fallback reports only the traced Python heap.
    """

    # Strategy 1: psutil (the only strategy that can account for children)
    try:
        me = psutil.Process(os.getpid())
        total = me.memory_info().rss
        if include_children:
            for child in me.children(recursive=True):
                try:
                    total += child.memory_info().rss
                except Exception:
                    continue  # child may have exited or be inaccessible
        return int(total)
    except Exception:
        pass

    # Strategy 2: parse the VmRSS line of /proc/self/status (Linux only)
    try:
        if sys.platform.startswith("linux"):
            with open("/proc/self/status") as status_file:
                for entry in status_file:
                    if entry.startswith("VmRSS:"):
                        # The value on this line is expressed in kB
                        return int(entry.split()[1]) * 1024
    except Exception:
        pass

    # Strategy 3: resource.getrusage (Unix); bytes on macOS, kilobytes elsewhere
    try:
        max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        unit = 1 if sys.platform == "darwin" else 1024
        return int(max_rss) * unit
    except Exception:
        pass

    # Strategy 4: tracemalloc (Python heap only)
    try:
        import tracemalloc
        if not tracemalloc.is_tracing():
            tracemalloc.start()
        traced_now, _ = tracemalloc.get_traced_memory()
        return int(traced_now)
    except Exception:
        pass

    # Nothing worked
    return 0

Function `display_bytes()` converts a byte count into a human-readable string with appropriate size units.

1. Returns `"0 B"` for zero or negative values.  
2. Selects the correct unit (KB, MB, GB, …) based on log base 1024.  
3. Shows both the raw byte value and a scaled, formatted value when applicable.  
4. Falls back to plain bytes when no scaling is needed.

In [None]:
def display_bytes(n: int):
    """
    Format a byte count as a human-readable string.

    Args:
        n: number of bytes.

    Returns:
        "0 B" for zero or negative input, "<n> B" below 1024, otherwise
        "<n> B (<scaled> <unit>)" with the scaled value shown to two decimals.
        Units are powers of 1024 and are capped at PB.
    """
    # Human-readable units
    units = ["B", "KB", "MB", "GB", "TB", "PB"]

    # Handle zero or negative values
    if n <= 0:
        return "0 B"

    # Pick the unit with integer arithmetic: each unit step is 2**10, so the
    # index is floor(log2(n)) // 10. This avoids floating-point log rounding,
    # which can land just below an exact power of 1024.
    whole = int(n)
    exponent = (whole.bit_length() - 1) // 10 if whole > 0 else 0
    i = min(exponent, len(units) - 1)

    # Format with readable scaling
    if i > 0:
        return f"{n} B ({n / (1024 ** i):.2f} {units[i]})"

    # Bytes only (no scaling)
    return f"{n} B"

Function `disk_usage_for()` returns total, used, and free disk space for the given path (or nearest existing parent).

1. Chooses the closest existing directory: the path itself, its parent, or `/` as fallback.  
2. Uses `shutil.disk_usage()` to retrieve disk statistics.  
3. Returns a dictionary containing total, used, and free bytes.

In [None]:
def disk_usage_for(path: Path):
    """
    Return disk statistics for the filesystem holding `path`.

    When `path` does not exist yet, the nearest existing ancestor is measured
    instead, so a not-yet-created output folder is still attributed to the
    location it will live in. The root "/" is used only when no ancestor
    exists at all.

    Args:
        path: file or directory to measure (it does not have to exist).

    Returns:
        dict with integer byte counts under "disk_total", "disk_used" and
        "disk_free", as reported by shutil.disk_usage().
    """
    # Walk upwards until something exists
    base = path
    while not base.exists():
        parent = base.parent
        if parent == base:
            # Reached the top of the path without finding anything
            base = Path("/")
            break
        base = parent

    # Get disk usage statistics
    du = shutil.disk_usage(base)

    # Return usage summary
    return {
        "disk_total": du.total,
        "disk_used": du.used,
        "disk_free": du.free,
    }

Function `get_folder_size()` computes the total size (in bytes) of all files inside a directory recursively.

1. Walks through all files under the given path using `rglob()`.  
2. Sums the file sizes obtained via `stat().st_size`.  
3. Ignores directories and only counts actual files.  
4. Returns the total size in bytes.

In [None]:
def get_folder_size(path: Path) -> int:
    """
    Total size, in bytes, of every regular file found recursively under `path`.

    Directories themselves contribute nothing; only entries for which
    is_file() is true are counted.
    """
    total_bytes = 0
    for entry in path.rglob('*'):
        if entry.is_file():
            total_bytes += entry.stat().st_size
    return total_bytes

Class `MemoryTracker` tracks the process’s resident memory (RSS) usage, recording both average and peak values.

1. Initializes tracking state using `psutil.Process`.  
2. `sample()` records the current RSS and updates running totals and peak.  
3. `avg_rss_bytes()` returns the average RSS across all samples.  
4. `peak_rss_bytes()` returns the peak RSS using OS-reported values when available (fallback: observed peak or average).

In [None]:
class MemoryTracker:
    """
    Tracks process RSS (bytes) average and peak.

    Samples are taken only when sample() is called; the tracker keeps a
    running sum, a sample count and the largest value observed.
    """

    def __init__(self, sample_interval_sec: float = 0.0):
        # sample_interval_sec is accepted for API compatibility only; it is not used.
        # psutil is None when its import failed, in which case sample() falls
        # back to get_rss_bytes().
        self.proc = psutil.Process(os.getpid()) if psutil is not None else None
        self.count = 0
        self.sum_rss = 0
        self.peak_rss = 0  # observed peak in bytes

    def sample(self):
        """Record the current RSS and update the count, running sum and observed peak."""
        if self.proc is not None:
            rss = self.proc.memory_info().rss
        else:
            rss = get_rss_bytes(include_children=False)
        self.count += 1
        self.sum_rss += rss
        if rss > self.peak_rss:
            self.peak_rss = rss

    def avg_rss_bytes(self) -> int:
        """Return the mean of all sampled RSS values, or 0 before the first sample."""
        return int(self.sum_rss / self.count) if self.count else 0

    def peak_rss_bytes(self) -> int:
        """
        Return the peak RSS in bytes.

        The OS-reported peak (ru_maxrss) is preferred; the observed peak from
        sample() is used when the resource module is unavailable or reports 0.
        """
        observed = max(self.peak_rss, self.avg_rss_bytes())

        try:
            import resource
        except ImportError:
            # No resource module on this platform: observed values only
            return observed

        ru_maxrss = getattr(resource.getrusage(resource.RUSAGE_SELF), "ru_maxrss", 0)
        if not ru_maxrss:
            return observed

        # ru_maxrss is in bytes on macOS and in kilobytes on Linux. Decide by
        # platform rather than by magnitude: a Linux peak of 4 GiB or more
        # would otherwise be mistaken for a byte count.
        if sys.platform == "darwin":
            return int(ru_maxrss)
        return int(ru_maxrss) * 1024

Class `DiskTracker` tracks disk usage growth under a given root path, recording peak usage and cumulative bytes written.

1. Initializes baseline disk usage (`start_used_bytes`).  
2. `sample()` reads current disk usage and updates:  
   - peak disk usage seen,  
   - cumulative bytes written (only positive deltas),  
   - last recorded usage value.  
3. Ignores errors to avoid interrupting main program execution.

In [None]:
class DiskTracker:
    """
    Follows the "used" byte count of the disk holding `root`.

    Keeps the baseline taken at construction, the highest value seen, the most
    recent value, and the sum of all positive changes between samples.
    """

    def __init__(self, root: Path):
        self.root = root
        # One reading seeds the baseline, the last value and the peak
        baseline = disk_usage_for(root)['disk_used']
        self.start_used_bytes = baseline
        self.last_used_bytes = baseline
        self.peak_used_bytes = baseline
        self.bytes_written = 0  # cumulative positive deltas only

    def sample(self):
        """Take one reading and fold it into the peak / written / last counters."""
        try:
            used_now = disk_usage_for(self.root)['disk_used']

            # Highest usage seen so far
            self.peak_used_bytes = max(self.peak_used_bytes, used_now)

            # Growth since the previous sample counts as written; shrinkage is ignored
            self.bytes_written += max(used_now - self.last_used_bytes, 0)

            self.last_used_bytes = used_now
        except Exception:
            # Disk tracking is best-effort and must never interrupt the caller
            pass

## **Utility functions**

Function `ensure_dir(p: Path)` ensures that the directory at path `p` exists.

In [None]:
def ensure_dir(p: Path):
    """
    Make sure directory `p` exists.

    Missing parent directories are created too, and an already existing
    directory is not an error.
    """
    p.mkdir(exist_ok=True, parents=True)

In [None]:
# Output folders under WORK_DIR (presumably: crawled metadata/references, and downloaded papers)
metadatas_dir = WORK_DIR / "metadata and reference"
papers_dir  = WORK_DIR / "paper"

# Create both folders up front; ensure_dir() is a no-op when they already exist
ensure_dir(metadatas_dir)
ensure_dir(papers_dir)

Function `month_to_yymm()` converts a month string to `YYMM` format.

In [None]:
def month_to_yymm(month_str: str):
    """
    Convert a month string such as "2022-11" into its YYMM form ("2211").

    "-01" is appended so the text can be parsed as a complete date; the
    result is returned as a 4-character string.
    """
    first_day = dtparser.parse(month_str + "-01")
    return first_day.strftime("%y%m")

Function `base_id_to_tuple()` parses an arXiv base ID into a tuple `(yymm, number)`, i.e. the 4-digit year-month prefix and the sequence number.

In [None]:
def base_id_to_tuple(base_id: str):
    """
    Split an arXiv base ID into its two numeric parts.

    Args:
        base_id: ID without version suffix, shaped NNNN.NNNN or NNNN.NNNNN
            (4-digit prefix, dot, 4- or 5-digit sequence number).

    Returns:
        (prefix, sequence) as ints, e.g. "2211.13747" -> (2211, 13747).

    Raises:
        ValueError: if `base_id` does not have that exact shape.
    """
    parsed = re.fullmatch(r"(\d{4})\.(\d{4,5})", base_id)
    if parsed is None:
        raise ValueError(f"Bad arXiv base id format: {base_id}")

    prefix_text, sequence_text = parsed.groups()
    return int(prefix_text), int(sequence_text)

Function `within_range()` checks whether a given `(yyyymm, iid)` pair falls within a specified range.

In [None]:
def within_range(yyyymm: int, iid: int, start_yyyymm: int, start_id: int, end_yyyymm: int, end_id: int):
    """
    Tell whether the (yyyymm, iid) pair lies inside an inclusive range.

    The range runs from (start_yyyymm, start_id) to (end_yyyymm, end_id).
    Pairs are ordered by month first and by ID within the same month, which
    is exactly how Python compares tuples.

    Returns:
        True when the pair is inside the range (both ends included), else False.
    """
    lower_bound = (start_yyyymm, start_id)
    upper_bound = (end_yyyymm, end_id)
    return lower_bound <= (yyyymm, iid) <= upper_bound

Function `_atomic_write_json()` safely writes JSON data by writing to a temporary file first, then replacing the original file to avoid corruption.

In [None]:
def _atomic_write_json(path: Path, data: Any, indent: int = 2):
    """
    Function `_atomic_write_json()` safely writes JSON data using an atomic write pattern.
    """

    # Ensure destination directory exists
    path.parent.mkdir(parents=True, exist_ok=True)

    # Write data to a temporary file
    tmp = path.with_suffix(path.suffix + ".tmp")
    with tmp.open("w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=indent)

    # Atomically replace target file
    tmp.replace(path)

Function `save_state()` safely saves a state dictionary to a JSON file.

In [None]:
def save_state(path: Path, state: Dict[str, Any]):
    """
    Persist the `state` dictionary as JSON at `path`.

    Thin wrapper that delegates to `_atomic_write_json()` with its default
    indentation.
    """
    _atomic_write_json(path=path, data=state)

Function `detect_with_filetype()` detects a file’s type and MIME information using the `filetype` library.

1. Uses `filetype.guess()` to identify the file’s format.  
2. Returns the file extension and MIME type if detected.  
3. Returns `(None, "unknown")` if the file type cannot be determined.

In [None]:
def detect_with_filetype(path: Path):
    """
    Guess the type of the file at `path` with the `filetype` library.

    Returns:
        (extension, "<mime> (by filetype)") when filetype.guess() recognizes
        the file, otherwise (None, "unknown").
    """
    guess = filetype.guess(path)

    # Nothing recognized
    if not guess:
        return None, "unknown"

    return guess.extension, f"{guess.mime} (by filetype)"

## **Crawling metadata**

Function `build_id_list()` creates a list of arXiv IDs to crawl within the given range.

1. Starts from `start_yymm` and iterates until `end_yymm`.  
2. Generates IDs in the format `"yymm.xxxxx"`.  
3. Handles same-month and multi-month ranges differently.  
4. Rolls over the year correctly when reaching December.  
5. Returns the complete list of formatted arXiv IDs.


In [None]:
def build_id_list(start_yymm: int, start_id: int, end_yymm: int, end_id: int,
                  max_id_per_month: int = 30000):
    """
    Build the list of candidate arXiv base IDs between two end points.

    Args:
        start_yymm: first month as an int in YYMM form (e.g. 2211).
        start_id: first sequence number within the first month.
        end_yymm: last month as an int in YYMM form.
        end_id: last sequence number within the last month (inclusive).
        max_id_per_month: highest sequence number generated for every month
            except the last one. Real months hold fewer papers; the surplus
            IDs simply do not exist on arXiv.

    Returns:
        List of strings formatted "<yymm>.<5-digit id>", in ascending order.
        Empty when the start lies after the end.
    """
    id_list = []
    yymm = start_yymm

    while yymm <= end_yymm:
        # The first month starts at start_id, every other month at 1
        first = start_id if yymm == start_yymm else 1
        # The last month stops at end_id, every other month at the cap
        last = end_id if yymm == end_yymm else max_id_per_month
        id_list.extend(f"{yymm}.{i:05d}" for i in range(first, last + 1))

        # Increment month (December rolls over to January of the next year)
        if yymm % 100 == 12:
            yymm += 88
        yymm += 1

    return id_list

Function `load_metadata_checkpoint_state()` loads the most recent checkpoint and determines where to resume the crawl.

1. Initializes default values (IDs, timers).  
2. Returns defaults if the checkpoint folder doesn’t exist.  
3. Finds the latest `metadata_checkpoint_*.json` file.  
4. Loads its state and extracts the last crawled ID and total time.  
5. If the crawl already reached the end, returns `None`s.  
6. Otherwise, returns the next starting point and updated state info.

In [None]:
def load_metadata_checkpoint_state(checkpoint_folder: Optional[Path], end_yymm: int, end_id: int):
    """
    Function `load_metadata_checkpoint_state()` loads the latest crawling checkpoint to resume progress.

    Checkpoints are the files "metadata_checkpoint_<n>.json" in `checkpoint_folder`.
    The highest-numbered one provides the next checkpoint number and the
    accumulated time. The resume position comes from the newest checkpoint
    whose "last_id" is a well-formed base ID. Checkpoints written before any
    paper succeeded store a null "last_id" and are skipped for that purpose.

    Args:
        checkpoint_folder: folder holding the checkpoint files (may be None).
        end_yymm: last month of the crawl as an int in YYMM form.
        end_id: last sequence number of the crawl.

    Returns:
        A 5-tuple (start_yymm, id_num, checkpoint_id, total_time, last_yymm):
        - (None, 1, checkpoint_id, total_time, None) when there is nothing to
          resume from; checkpoint_id is 1 and total_time is 0 when no
          checkpoint file exists at all.
        - (None, None, None, None, None) when the crawl already reached the end.
        - otherwise the month and ID to continue from, the next checkpoint
          number, the accumulated time, and the month of the last crawled ID.
    """
    id_num = 1
    checkpoint_id = 1
    total_time = 0

    # Skip if no checkpoint folder
    if not checkpoint_folder or not checkpoint_folder.exists():
        return None, id_num, checkpoint_id, total_time, None

    # Collect checkpoint files together with their numeric suffix
    numbered = []
    for candidate in checkpoint_folder.glob("metadata_checkpoint_*.json"):
        try:
            number = int(candidate.stem.split('_')[-1])
        except ValueError:
            continue  # not one of our numbered checkpoints
        numbered.append((number, candidate))
    if not numbered:
        return None, id_num, checkpoint_id, total_time, None

    numbered.sort(key=lambda item: item[0], reverse=True)
    latest_number, latest = numbered[0]
    print(f"Resuming from checkpoint: {latest}")
    checkpoint_id = latest_number + 1

    # Newest checkpoint first; fall back to older ones for the resume position
    last_position = None
    for rank, (_, checkpoint_file) in enumerate(numbered):
        state = orjson.loads(checkpoint_file.read_bytes())
        if rank == 0:
            total_time = state.get('total_time', 0)

        last_crawled_id = state.get('last_id')
        if not isinstance(last_crawled_id, str):
            continue  # written before any paper succeeded
        try:
            last_position = base_id_to_tuple(last_crawled_id)
        except ValueError:
            continue  # malformed ID, try an older checkpoint
        break

    if last_position is None:
        print("No checkpoint records a crawled ID yet; starting from the requested range.")
        return None, id_num, checkpoint_id, total_time, None

    last_yymm, last_id_num = last_position

    # Stop if already reached (or passed) the end
    if (last_yymm, last_id_num) >= (end_yymm, end_id):
        print("Crawling already completed up to the end ID.")
        return None, None, None, None, None

    # Continue right after the last crawled ID
    return last_yymm, last_id_num + 1, checkpoint_id, total_time, last_yymm

Function `save_metadata_checkpoint()` safely saves a progress checkpoint for each batch.

1. Builds a new state dictionary with the latest ID and timing info.  
2. Uses `save_state()` to perform an atomic JSON write.  
3. Names the file `metadata_checkpoint_<id>` in the checkpoint folder.  
4. Prints a confirmation message after saving.

In [None]:
def save_metadata_checkpoint(checkpoint_folder: Path,
                             checkpoint_id: int,
                             last_success_id: Optional[str],
                             section_time: float,
                             total_time: float,
                             mem_tracker: MemoryTracker,
                             disk_start: dict,
                             note: str = ""):
    """
    Write checkpoint number `checkpoint_id` into `checkpoint_folder`.

    The checkpoint file "metadata_checkpoint_<checkpoint_id>.json" records the
    last successfully crawled ID (None if there is none yet), the timing
    values, formatted peak/average RSS from `mem_tracker`, the growth of used
    disk space relative to `disk_start['disk_used']`, and a free-text note.
    """

    # Used disk space gained since the reference reading
    used_now = disk_usage_for(checkpoint_folder)['disk_used']
    disk_added = used_now - disk_start['disk_used']

    # Memory figures, already formatted for humans
    peak_text = display_bytes(mem_tracker.peak_rss_bytes())
    avg_text = display_bytes(mem_tracker.avg_rss_bytes())

    # Assemble the checkpoint content
    state = {
        "last_id": last_success_id,
        "section_time": section_time,
        "total_time": total_time,
        "peak_rss": peak_text,
        "avg_rss": avg_text,
        "disk_added": display_bytes(disk_added),
        "note": note,
    }

    # Write it through the atomic JSON writer
    save_state(checkpoint_folder / f"metadata_checkpoint_{checkpoint_id}.json", state)

    # Log confirmation
    print(f"[checkpoint] Saved checkpoint #{checkpoint_id}.")

Function `fetch_metadata_worker()`  fetches metadata for one arXiv paper (all versions) and safely saves it to `metadata.json`.

1. Extracts the base arXiv ID and version count.  
2. Skips if `metadata.json` already exists.  
3. Builds a metadata dictionary with title, authors, etc.  
4. Queries all versions and merges authors, journal references (publication venues), and revision dates.  
5. Removes duplicates and saves using an atomic JSON write.  
6. Returns `1` on success, `0` on failure.

In [None]:
def fetch_metadata_worker(result: arxiv.Result,
                          client: arxiv.Client,
                          checkpoint_folder: Path,
                          query_delay: float = 3.0):
    """
    Function `fetch_metadata_worker()` downloads and saves metadata for a single arXiv paper.

    The version number at the end of the result's short ID (e.g. "v3") tells
    how many versions exist. All of them are queried in one search, and their
    authors, journal references and update dates are merged into one record.
    The record is written to "<checkpoint_folder>/metadata.json".

    Args:
        result: search result for the paper (normally its latest version).
        client: arXiv client used to query the individual versions.
        checkpoint_folder: per-paper folder that receives metadata.json.
        query_delay: seconds to sleep before the per-version query.

    Returns:
        1 when metadata.json exists afterwards (written now or already
        present), 0 when anything failed. Failures are logged, not raised.
    """

    try:
        # Extract arXiv ID and version count
        arxiv_id = result.get_short_id()
        v_pos = arxiv_id.rfind('v')
        version_counts = int(arxiv_id[v_pos + 1:])
        base_id = arxiv_id[:v_pos]

        # Path to metadata file
        meta_path = checkpoint_folder / "metadata.json"

        # Skip if already exists
        if meta_path.exists():
            print(f"Metadata already exists for {base_id}")
            return 1

        print(f"Creating metadata for {base_id}...")

        # Init base metadata dict
        metadata = {
            "title": result.title,
            "authors": [a.name for a in result.authors],
            "publication_venue": [],
            "submission_date": result.published.strftime("%Y-%m-%d"),
            "revised_dates": [],
        }

        # Query all paper versions
        sub_search = arxiv.Search(
            id_list=[f"{base_id}v{ver}" for ver in range(1, version_counts + 1)]
        )
        time.sleep(query_delay)

        # Merge metadata from each version
        for sub_result in client.results(sub_search):
            metadata["authors"].extend(a.name for a in sub_result.authors)
            if sub_result.journal_ref:
                metadata["publication_venue"].append(sub_result.journal_ref)
            metadata["revised_dates"].append(sub_result.updated.strftime("%Y-%m-%d"))

        # Deduplicate authors and venues while keeping first-seen order.
        # A set would shuffle the author order differently on every run.
        metadata["authors"] = list(dict.fromkeys(metadata["authors"]))
        metadata["publication_venue"] = list(dict.fromkeys(metadata["publication_venue"]))

        # Save metadata atomically
        save_state(meta_path, metadata)
        print(f"Fetched metadata for arXiv ID {base_id}")
        return 1

    except Exception as e:
        # Log failure
        print(f"[WARN] Failed to fetch metadata for {result.get_short_id()}: {e}")
        return 0

Function `crawl_arXiv_metadata()` crawls and saves arXiv metadata for a range of paper IDs, handling multiple versions and checkpoints safely.

1. Converts month strings to numeric `YYMM` values.  
2. Loads or resumes from the latest checkpoint if available.  
3. Builds a list of arXiv IDs to crawl within the specified range.  
4. Iterates through ID batches using an `arxiv.Client`.  
5. Fetches metadata sequentially, one ID at a time, with `fetch_metadata_worker()`.  
6. Skips already existing metadata files to prevent overwriting.  
7. Saves progress after each batch via `save_metadata_checkpoint()`.  
8. Automatically continues to the next month if the current list is empty.

In [None]:
def crawl_arXiv_metadata(start_month: str, start_id: int,
                         end_month: str, end_id: int,
                         batch_size: int = 100,
                         delay: float = 0.125,
                         query_delay: float = 3,
                         retries: int = 3,
                         folder: Optional[Path] = Path('./')):
    """
    Crash-resilient single-thread metadata crawler with checkpointing.

    Candidate IDs between (start_month, start_id) and (end_month, end_id) are
    processed in batches. Each paper gets its own sub-folder of `folder`
    (the ID with "." replaced by "-") holding metadata.json. A checkpoint is
    written after every batch and once more when the function exits. While
    the crawl runs, SIGINT/SIGTERM also write a checkpoint before the signal
    is re-delivered with its default action. The previous signal handlers and
    the atexit hook are removed again when the function exits.

    Args:
        start_month, end_month: month strings such as "2022-11".
        start_id, end_id: first and last sequence number (inclusive).
        batch_size: number of IDs per batch, also used as the client page size.
        delay: seconds slept between IDs (8x this value between batches).
        query_delay: seconds slept before each per-version query.
        retries: retry count handed to the arXiv client.
        folder: output and checkpoint folder.

    Returns:
        None. Returns immediately when the checkpoints show that the crawl
        already reached the end.
    """

    # Ensure output folder exists
    ensure_dir(folder)

    # Convert month strings to YYMM
    start_yymm = int(month_to_yymm(start_month))
    end_yymm = int(month_to_yymm(end_month))

    # Resume from last checkpoint if available
    resume = load_metadata_checkpoint_state(folder, end_yymm, end_id)
    if resume == (None, None, None, None, None):
        return
    if resume[0] is not None:
        start_yymm, id_num, checkpoint_id, total_time, _ = resume
    else:
        # No resume position: start at the requested ID, but keep whatever
        # checkpoint numbering and time the loader reports (1 and 0 when
        # there is no checkpoint at all).
        id_num = start_id
        _, _, checkpoint_id, total_time, _ = resume

    # Build ID list and initialize client
    id_list = build_id_list(start_yymm, id_num, end_yymm, end_id)
    client = arxiv.Client(page_size=batch_size, delay_seconds=delay, num_retries=retries)

    # Initialize memory/disk tracking
    mem = MemoryTracker()
    process_start = time.time()
    disk_start = disk_usage_for(folder)
    last_success_id: Optional[str] = None

    # Final checkpoint snapshot (atexit/signal); reads the live loop variables
    def final_snapshot(note="atexit/signal snapshot"):
        try:
            section_time = time.time() - process_start
            mem.sample()
            save_metadata_checkpoint(folder, checkpoint_id, last_success_id,
                                     section_time, total_time, mem, disk_start, note=note)
        except Exception:
            pass

    atexit.register(final_snapshot)

    # Trap termination signals and force final snapshot
    def signal_handler(signum, frame):
        print(f"[signal] Caught {signum}, flushing checkpoint...")
        final_snapshot(note=f"signal {signum}")
        signal.signal(signum, signal.SIG_DFL)
        os.kill(os.getpid(), signum)

    # Remember the handlers being replaced so they can be put back afterwards
    previous_handlers = {}
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            previous_handlers[sig] = signal.signal(sig, signal_handler)
        except Exception:
            pass  # e.g. not running in the main thread

    curr_batch_start = 0
    yymm = start_yymm

    # Main crawl loop
    try:
        while curr_batch_start < len(id_list):

            # Slice a batch
            batch_ids = id_list[curr_batch_start:curr_batch_start + batch_size]
            curr_batch_start += batch_size

            successful_crawls = 0
            time.sleep(delay * 8)
            section_start = time.time()

            # Process IDs one-by-one
            for full_id in batch_ids:
                mem.sample()

                base_id = full_id
                meta_folder = folder / base_id.replace(".", "-")
                meta_path = meta_folder / "metadata.json"

                # Skip if metadata already exists; it still counts as progress,
                # so a batch of skips does not produce a checkpoint without an ID
                if meta_path.exists():
                    print(f"Metadata exists for {base_id}, skipping.")
                    successful_crawls += 1
                    last_success_id = base_id
                    continue

                # Fetch metadata
                try:
                    result = next(iter(client.results(arxiv.Search(id_list=[base_id]))), None)
                    if result is None:
                        print(f"[WARN] No arXiv entry found for {base_id}")
                    elif fetch_metadata_worker(result, client, meta_folder, query_delay):
                        successful_crawls += 1
                        last_success_id = base_id
                except Exception as e:
                    print(f"[WARN] Unexpected error for {base_id}: {e}")

                time.sleep(delay)

            # End-of-batch timing and cumulative time
            section_time = time.time() - section_start
            if successful_crawls > 0:
                total_time += section_time

            # Save checkpoint for the batch
            mem.sample()
            save_metadata_checkpoint(folder, checkpoint_id, last_success_id,
                                     section_time, total_time, mem, disk_start, note="end-of-batch")
            checkpoint_id += 1

            # A batch without any success is taken to mean that the month has
            # no more papers: jump to the first ID of the next month
            if successful_crawls == 0 and yymm < end_yymm:
                print("Metadata list is empty. Moving to next month...")
                curr_batch_start = 0
                if yymm % 100 == 12:
                    yymm += 88  # next year
                yymm += 1
                id_list = id_list[id_list.index(f"{yymm}.00001"):]

    # Always write final snapshot on any exit path, then undo the global hooks
    finally:
        final_snapshot(note="final")
        atexit.unregister(final_snapshot)
        for sig, previous in previous_handlers.items():
            try:
                # signal.signal() reports None for handlers not installed from Python
                signal.signal(sig, signal.SIG_DFL if previous is None else previous)
            except Exception:
                pass

Function `parse_one_metadata()` returns a list of arXiv version IDs extracted from a single `metadata.json` file.

1. Loads the metadata using `orjson` for fast parsing.  
2. Recovers the base arXiv ID from the parent directory name (converting `-` back to `.`).  
3. Reads the `revised_dates` field to determine the number of versions.  
4. Constructs version IDs like `1234.56789v1`, `1234.56789v2`, …  
5. Returns an empty list if parsing fails or required fields are missing.

In [None]:
def parse_one_metadata(meta_file: Path):
    """
    List the version IDs described by one metadata.json file.

    The base ID is taken from the name of the folder containing the file
    (with "-" turned back into "."), and the number of versions is the length
    of the file's "revised_dates" list.

    Returns:
        ["<id>v1", "<id>v2", ...], or [] when the folder name is empty or
        anything goes wrong (the error is printed as a warning).
    """

    try:
        # Read and decode the JSON document
        with open(str(meta_file), "rb") as handle:
            payload = orjson.loads(handle.read())

        # The second-to-last path segment is the per-paper folder
        segments = str(meta_file).replace('\\', '/').split('/')
        arxiv_id = segments[-2].replace('-', '.')
        if not arxiv_id:
            return []

        # One version per recorded revision date
        version_total = len(payload.get("revised_dates"))
        versions = []
        for number in range(1, version_total + 1):
            versions.append(f"{arxiv_id}v{number}")
        return versions

    except Exception as e:
        # Any failure yields an empty result
        print(f"[WARN] Failed to parse metadata file: {meta_file} with error: {e}")
        return []

Function `get_version_list_from_metadata()` reads all `metadata.json` files in subfolders to build a list of arXiv version IDs.

1. Iterates through subdirectories inside `metadata_dir`.  
2. Opens each `metadata.json` if it exists and extracts the paper’s base ID.  
3. Counts how many versions (`revised_dates`) each paper has.  
4. Appends all version IDs like `1234.56789v1`, `1234.56789v2`, etc.  
5. Prints the total number of versions and returns the list.

In [None]:
def get_version_list_from_metadata(metadata_dir: Path, workers: int = None):
    """
    Gather every arXiv version ID recorded under `metadata_dir`.

    Each direct sub-folder (symlinks are not followed) that contains a
    metadata.json is parsed with `parse_one_metadata()` on a thread pool.

    Args:
        metadata_dir: folder whose sub-folders hold metadata.json files.
        workers: thread count; defaults to min(32, 4 x CPU count).

    Returns:
        Sorted list of version IDs (empty when no metadata.json was found).
    """

    # Locate the metadata.json files
    meta_files = []
    for entry in os.scandir(metadata_dir):
        if not entry.is_dir(follow_symlinks=False):
            continue
        candidate = Path(entry.path) / "metadata.json"
        if candidate.is_file():
            print(str(candidate))
            meta_files.append(candidate)

    # Nothing to parse
    if not meta_files:
        print("Total versions to download: 0")
        return []

    # Default pool size when the caller gave none
    pool_size = workers if workers is not None else min(32, (os.cpu_count() or 4) * 4)

    # Parse the files concurrently and pool the results
    version_list = []
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        pending = [pool.submit(parse_one_metadata, mp) for mp in meta_files]
        for finished in as_completed(pending):
            version_list += finished.result()

    # Sort and report total count
    version_list.sort()
    print(f"Total versions to download: {len(version_list)}")
    return version_list

## **Extraction**

Function `is_control()` checks whether a character is a control character based on predefined Unicode ranges.

1. Gets the Unicode code point of the character.  
2. Compares it against all control code ranges in `_CTRL_RANGES`.  
3. Returns `True` if the character falls within any of those ranges, otherwise `False`.

In [None]:
def is_control(ch: str) -> bool:
    """
    Function `is_control()` tells whether a single character is a control character.

    A character qualifies when its code point falls inside any of the
    inclusive (low, high) ranges listed in `_CTRL_RANGES`.
    """
    code_point = ord(ch)

    for low, high in _CTRL_RANGES:
        if low <= code_point <= high:
            return True
    return False

Function `santitize_component()` cleans and normalizes a filename component to make it safe for file systems.

1. Normalizes text to NFC form for consistent Unicode representation.  
2. Replaces forbidden characters (path separators, NULs) with underscores.  
3. Replaces control and surrogate characters with underscores for safety.  
4. Ensures the component isn’t empty, `"."`, or `".."`; replaces with `"_"` if so.  
5. Replaces a leading hyphen with an underscore so the name cannot be mistaken for a command-line option.  
6. Truncates overly long names while preserving extensions if possible.  
7. Returns the sanitized string.

In [None]:
def santitize_component(comp: str) -> str:
    """
    Function `santitize_component()` cleans and normalizes a filename component to ensure it is safe for use.

    Steps, in order:
    1. Normalize to Unicode NFC.
    2. Replace forbidden characters (`_FORBIDDEN`), control characters and
       surrogate code points with "_", then strip surrounding whitespace.
    3. Replace an empty, "." or ".." result with "_".
    4. Replace a leading "-" with "_".
    5. Shorten the name to at most `_MAX_COMP_LEN` characters, keeping the
       extension when it fits.

    Args:
        comp: one path component (no directory separators expected).

    Returns:
        The sanitized component, never longer than `_MAX_COMP_LEN`.
    """

    # Normalize Unicode to NFC
    comp = unicodedata.normalize('NFC', comp)

    # Replace forbidden, control and surrogate characters in one pass
    cleaned = []
    for c in comp:
        if c in _FORBIDDEN or is_control(c) or _SURR_MIN <= ord(c) <= _SURR_MAX:
            cleaned.append('_')
        else:
            cleaned.append(c)
    comp = ''.join(cleaned).strip()

    # Replace empty or dot-only names
    if comp in ('', '.', '..'):
        comp = '_'

    # Prevent leading '-' for tool safety
    if comp.startswith('-'):
        comp = '_' + comp[1:]

    # Truncate long names while preserving the extension where possible.
    # First try everything after the first dot as the extension (keeps
    # compound extensions such as ".tar.gz"), then only the part after the
    # last dot. If neither leaves room for a non-empty stem, cut the name
    # at the limit. A negative slice index must never be used here: it would
    # trim from the end and let the result exceed the limit.
    if len(comp) > _MAX_COMP_LEN:
        shortened = None
        for split in (comp.partition, comp.rpartition):
            stem, dot, ext = split('.')
            room = _MAX_COMP_LEN - 1 - len(ext)
            if dot and stem and room > 0:
                shortened = f"{stem[:room]}.{ext}"
                break
        comp = shortened if shortened is not None else comp[:_MAX_COMP_LEN]

    return comp

Function `sanitize_relative_path()` sanitizes a POSIX-style relative path to remove unsafe components and ensure filesystem safety.

1. Splits the path by `/` and removes empty segments.  
2. Sanitizes each component using `santitize_component()`.  
3. Replaces invalid components (`""`, `"."`, `".."`) with `"_"`.  
4. Reassembles the cleaned components into a safe relative path.  
5. Prints a log message if the sanitized output differs from the original.  
6. Returns the sanitized relative path.

In [None]:
def sanitize_relative_path(posix_path: str) -> str:
    """
    Sanitize a '/'-separated relative path component by component.

    Empty segments (leading, trailing or doubled slashes) are dropped, every
    remaining segment goes through `santitize_component()`, and any segment
    that still comes back as '', '.' or '..' is replaced by '_'. A log line is
    printed whenever the result differs from the input.

    Args:
        posix_path: Relative path using '/' as separator.

    Returns:
        The sanitized relative path ('' when the input has no usable segment).
    """
    unsafe_names = ('', '.', '..')
    safe_segments = []

    for segment in posix_path.split('/'):
        # Skip empty parts produced by leading/trailing/duplicate slashes
        if segment == '':
            continue
        cleaned = santitize_component(segment)
        safe_segments.append('_' if cleaned in unsafe_names else cleaned)

    clean_rel = '/'.join(safe_segments)

    # Report any rewrite so renamed entries can be traced back
    if clean_rel != posix_path:
        print(f"[sanitize] {posix_path!r} -> {clean_rel!r}")

    return clean_rel

Function `dedupe_path()` generates a unique file path by appending a short hash if the target already exists.

1. Returns the target path directly if it doesn’t exist.  
2. Computes an 8-character SHA-1 hash from `original_hint`.  
3. Appends the hash to the filename (e.g., `file__abcd1234.txt`).  
4. If that name also exists, adds an incrementing number (`_1`, `_2`, …).  
5. Returns the first non-existing candidate path.

In [None]:
def dedupe_path(target: Path, original_hint: str) -> Path:
    """
    Return a path that does not exist yet, derived from `target`.

    If `target` is free it is returned unchanged. Otherwise the first 8 hex
    digits of the SHA-1 of `original_hint` are appended to the stem
    (`name__abcd1234.ext`); if that is taken too, `_1`, `_2`, ... are appended
    until a free name is found.

    Args:
        target: Desired output path.
        original_hint: Text hashed to build the disambiguating suffix.

    Returns:
        `target` itself, or the first non-existing sibling candidate.
    """
    if target.exists():
        digest = hashlib.sha1(original_hint.encode('utf-8', 'surrogatepass')).hexdigest()
        hashed_stem = f"{target.stem}__{digest[:8]}"
        extension = target.suffix

        candidate = target.with_name(hashed_stem + extension)
        attempt = 0
        # Keep numbering until the candidate is unused
        while candidate.exists():
            attempt += 1
            candidate = target.with_name(f"{hashed_stem}_{attempt}{extension}")
        return candidate

    return target

Function `read_gzip_original_name()` extracts and sanitizes the original filename from a GZIP header if available.

1. Opens the file in binary mode and verifies the GZIP magic header.  
2. Checks compression method and flag bits for validity.  
3. Skips the optional EXTRA field if present (the NAME field precedes COMMENT and HCRC in the header, so those never need to be skipped).  
4. Reads the `NAME` field (original filename) if the flag is set.  
5. Decodes the name safely with Latin-1, sanitizes it, and keeps only the basename.  
6. Returns the sanitized filename or `None` if unavailable or invalid.

In [None]:
def read_gzip_original_name(path: Path) -> Optional[str]:
    """
    Recover the original file name stored in a gzip header, if any.

    Reads the fixed 10-byte header, checks the magic bytes, the compression
    method (must be 8) and the reserved flag bits, skips the EXTRA field when
    its flag is set, then reads the zero-terminated NAME field when its flag
    is set. The name is decoded as Latin-1, reduced to its base name and passed
    through `santitize_component()`.

    Args:
        path: File to inspect.

    Returns:
        The sanitized base name, or None when the file is not a usable gzip
        stream or carries no NAME field.
    """
    FLAG_EXTRA, FLAG_NAME, FLAG_RESERVED = 0x04, 0x08, 0xE0

    with path.open("rb") as fh:
        # Fixed header: ID1 ID2 CM FLG MTIME(4) XFL OS
        header = fh.read(10)
        if len(header) < 10 or header[:2] != b"\x1f\x8b":
            return None

        method, flags = header[2], header[3]
        if method != 8 or flags & FLAG_RESERVED:
            return None

        # EXTRA: 2-byte little-endian length followed by that many bytes
        if flags & FLAG_EXTRA:
            length_field = fh.read(2)
            if len(length_field) != 2:
                return None
            (extra_len,) = struct.unpack("<H", length_field)
            fh.seek(extra_len, io.SEEK_CUR)

        # Without the NAME flag there is nothing to recover
        if not flags & FLAG_NAME:
            return None

        # NAME: bytes up to (not including) the first NUL, or EOF
        collected = bytearray()
        for byte in iter(lambda: fh.read(1), b""):
            if byte == b"\x00":
                break
            collected.extend(byte)

        try:
            decoded = collected.decode("latin-1", errors="replace")
            sanitized = santitize_component(os.path.basename(decoded))
        except Exception:
            sanitized = None

    return os.path.basename(sanitized) if sanitized else None

Function `safe_join()` joins multiple path components safely while preventing directory traversal.

1. Resolves the absolute path of the base directory.  
2. Sequentially joins all provided path components.  
3. Resolves the final path to eliminate symlinks or `..` segments.  
4. Verifies that the final path stays inside the base directory.  
5. Raises a `ValueError` if a path traversal attempt is detected.  
6. Returns the validated and resolved path.

In [None]:
def safe_join(base: Path, *paths: str) -> Path:
    """
    Join `paths` onto `base` and guarantee the result stays inside `base`.

    Both `base` and the joined path are resolved (symlinks and '..' segments
    collapsed) before the containment check, so traversal through '..',
    absolute components or symlinks is detected.

    Args:
        base: Directory the result must stay within.
        *paths: Components appended to `base` in order.

    Returns:
        The resolved, validated path (equal to the resolved `base` when no
        components are given).

    Raises:
        ValueError: If the resolved path falls outside `base`.
    """

    # Resolve base path
    base = Path(base).resolve()

    # Join additional path components and resolve the final absolute path
    final = base.joinpath(*(str(p) for p in paths)).resolve()

    # Containment is checked on path components rather than on a string
    # prefix: the former `startswith(str(base) + os.sep)` test rejected every
    # child when `base` was the filesystem root ("/" + "/" == "//").
    try:
        final.relative_to(base)
    except ValueError:
        raise ValueError(f"Blocked path traversal attempt: {final}") from None

    return final

Function `safe_extract_tar()` securely extracts files from a TAR archive while preventing path traversal and handling unsafe entries.

1. Iterates through all archive members (or a provided subset).  
2. Skips unsafe file types — symlinks, device files, and FIFOs.  
3. Sanitizes each member’s relative path to prevent directory traversal.  
4. Ensures extraction paths stay inside the destination directory with `safe_join()`.  
5. Uses `dedupe_path()` to avoid filename collisions.  
6. Creates directories as needed and preserves permissions (best-effort).  
7. Streams file extraction safely in chunks to avoid memory issues.  
8. Skips members with no extractable content.

In [None]:
def safe_extract_tar(tar: tarfile.TarFile, dest: Path, members: Optional[Iterable[tarfile.TarInfo]] = None):
    """
    Extract members of an open tar archive into `dest` without letting any
    entry escape that directory.

    For every member: hard links, symlinks, character/block devices and FIFOs
    are skipped; the member name is sanitized with `sanitize_relative_path()`;
    the target is validated with `safe_join()`; file collisions are resolved
    with `dedupe_path()`; file content is streamed in 1 MiB chunks.

    Args:
        tar: Open archive to read from.
        dest: Destination directory.
        members: Optional subset of members. When None, the whole archive is
            extracted; an explicitly empty selection extracts nothing.

    Raises:
        ValueError: Propagated from `safe_join()` if a sanitized path still
            resolves outside `dest`.
    """
    # Only fall back to the whole archive when no selection was given at all;
    # `members or tar` would also do so for an explicitly empty selection.
    selected = tar if members is None else members

    for m in selected:
        # Skip unsafe types
        if m.islnk() or m.issym() or m.ischr() or m.isblk() or m.isfifo():
            continue

        # Sanitize relative path (tar uses POSIX '/')
        clean_rel = sanitize_relative_path(m.name)
        if clean_rel == '':
            continue

        # Join & ensure containment
        target_path = safe_join(dest, clean_rel)

        if m.isdir():
            # An already-existing directory is reused as-is (it is routinely
            # created earlier as the parent of a file). De-duplicating it would
            # leave a stray empty "name__hash" directory behind. Only a
            # non-directory occupying the name forces a new name.
            if target_path.exists() and not target_path.is_dir():
                target_path = dedupe_path(target_path, m.name)
            target_path.mkdir(parents=True, exist_ok=True)
            # Best-effort permissions: drop setuid/setgid/sticky bits and keep
            # the directory usable by the owner so that its children can still
            # be written afterwards.
            try:
                os.chmod(target_path, (m.mode & 0o777) | 0o700)
            except OSError:
                pass
            continue

        # De-duplicate on collision (regular files only)
        target_path = dedupe_path(target_path, m.name)

        # Ensure parent exists
        target_path.parent.mkdir(parents=True, exist_ok=True)

        # Extract regular file
        src = tar.extractfile(m)
        if src is None:
            continue  # skip specials with no extractable content
        with src, open(target_path, "wb") as out:
            # Stream copy in chunks to keep memory usage flat
            while True:
                chunk = src.read(1024 * 1024)
                if not chunk:
                    break
                out.write(chunk)

        # Preserve plain permission bits only (best-effort); special bits from
        # an untrusted archive are never restored.
        try:
            os.chmod(target_path, m.mode & 0o777)
        except OSError:
            pass

Function `extract_tar_archive()` extracts any TAR-based archive (`.tar`, `.tar.gz`, `.tar.xz`, etc.) safely into a target directory.

1. Ensures the output directory exists.  
2. Opens the archive in auto-detect mode (`r:*`) to handle various compression formats.  
3. Uses `safe_extract_tar()` to securely extract all members and prevent path traversal.  
4. Collects and returns a list of successfully extracted file paths.  
5. Skips unsafe or invalid entries during extraction.

In [None]:
def extract_tar_archive(path: Path, out_dir: Path) -> list[Path]:
    """
    Extract a TAR-based archive (any compression `tarfile` auto-detects) into
    `out_dir` and report the regular files that are present afterwards.

    Extraction itself is delegated to `safe_extract_tar()`. The returned list
    is rebuilt from the member names, so it contains only regular-file members
    whose sanitized path exists as a file under `out_dir`; directories and
    members that the extractor skips (links, devices, FIFOs) are not reported.

    NOTE: a file that `safe_extract_tar()` had to rename because of a name
    collision is reported under its un-renamed path.

    Args:
        path: Archive to open.
        out_dir: Destination directory (created if missing).

    Returns:
        Resolved paths of the extracted regular files, without duplicates, in
        archive order.
    """

    # Ensure output directory exists
    out_dir.mkdir(parents=True, exist_ok=True)
    extracted: list[Path] = []
    seen = set()

    # Open archive and extract safely
    with tarfile.open(str(path), mode="r:*") as tar:
        members = tar.getmembers()
        safe_extract_tar(tar, out_dir, members)

        # Collect extracted files
        for m in members:
            # Directories and skipped special members are not "extracted files"
            if not m.isfile():
                continue
            clean_rel = sanitize_relative_path(m.name)
            if not clean_rel:
                continue
            target = (out_dir / clean_rel).resolve()
            if target.is_file() and target not in seen:
                seen.add(target)
                extracted.append(target)

    return extracted

Function `extract_plain_gzip()` extracts a single GZIP (`.gz`) file safely into the target directory.

1. Ensures the output directory exists.  
2. Attempts to recover the original filename from the GZIP header.  
3. If unavailable, derives the name by removing the `.gz` extension or appending `.out`.  
4. Sanitizes and deduplicates the output filename for filesystem safety.  
5. Decompresses the GZIP content in chunks (1 MB each) to avoid memory overload.  
6. Returns the full path of the extracted file.

In [None]:
def extract_plain_gzip(path: Path, out_dir: Path) -> Path:
    """
    Decompress a single gzip file into `out_dir`.

    The output name is taken from the gzip header when available; otherwise
    the trailing '.gz' is removed from the input name, or '.out' is appended
    when there is no '.gz' suffix to remove. The name is sanitized, validated
    against `out_dir` and de-duplicated before writing.

    Args:
        path: Gzip file to decompress.
        out_dir: Destination directory (created if missing).

    Returns:
        Path of the decompressed file.

    Raises:
        Exception: Any error raised while decompressing is re-raised after the
            partially written output file has been removed.
    """

    # Ensure output directory exists
    out_dir.mkdir(parents=True, exist_ok=True)

    # Determine output filename from header or input
    header_name = read_gzip_original_name(path)
    if header_name:
        out_name = header_name
    elif path.name.lower().endswith(".gz") and len(path.name) > 3:
        out_name = path.name[:-3]
    else:
        out_name = path.name + ".out"

    # Sanitize and deduplicate output path
    out_name = santitize_component(os.path.basename(out_name)) or "_"
    out_path = safe_join(out_dir, out_name)
    out_path = dedupe_path(out_path, out_name)

    # Decompress in 1 MiB chunks. On failure the truncated output is removed so
    # that later stages never pick up a half-written file.
    try:
        with gzip.open(path, "rb") as gz, open(out_path, "wb") as out:
            shutil.copyfileobj(gz, out, 1024 * 1024)
    except Exception:
        try:
            out_path.unlink()
        except OSError:
            pass
        raise

    return out_path

Function `extract_until_done()` recursively extracts nested archives (`.tar`, `.tar.gz`, `.tgz`, `.gz`, etc.) into a single output directory.

1. Starts with the given file and tracks extraction depth.  
2. Detects and extracts TAR archives with `extract_tar_archive()`.  
3. Detects and extracts GZIP files with `extract_plain_gzip()`.  
4. Collects newly extracted files and checks if they need further extraction.  
5. Repeats recursively until no more archives remain or `max_depth` is reached.  
6. Stops and warns if the recursion depth limit is exceeded.  
7. Returns a list of all extracted files.

In [None]:
def extract_until_done(path: Path, out_dir: Path, max_depth: int = 5):
    """
    Repeatedly extract nested archives into `out_dir`, one layer per pass.

    Each pass handles the current batch of files: TAR archives (as detected by
    `tarfile.is_tarfile`) go through `extract_tar_archive()`, files with a
    '.gz' suffix go through `extract_plain_gzip()`, anything else is ignored.
    Newly produced files whose suffix is '.gz', '.xz', '.tar' or '.tgz'
    (case-insensitive) form the next batch. A failure on one file is printed
    and does not stop the others.

    Args:
        path: First file to extract.
        out_dir: Directory receiving every extracted file.
        max_depth: Maximum number of passes.

    Returns:
        List of all files produced by the extraction passes.
    """
    archive_suffixes = {".gz", ".xz", ".tar", ".tgz"}

    extracted_files = []
    current_files = [path]
    depth = 0

    # Process files layer by layer
    while current_files and depth < max_depth:
        next_files = []
        for f in current_files:
            try:
                # Handle TAR archives (compressed or not)
                if tarfile.is_tarfile(f):
                    print(f"[{depth}] Extracting TAR archive: {f}")
                    new_files = extract_tar_archive(f, out_dir)
                    extracted_files.extend(new_files)
                    next_files.extend(new_files)

                # Handle GZIP files; suffix compared case-insensitively, as
                # extract_plain_gzip() does when deriving the output name
                elif f.suffix.lower() == ".gz":
                    print(f"[{depth}] Extracting GZIP file: {f}")
                    new_file = extract_plain_gzip(f, out_dir)
                    extracted_files.append(new_file)
                    next_files.append(new_file)

                # Anything else is not an archive this function handles

            # Report the failure and keep going with the remaining files
            except Exception as e:
                print(f"Failed to extract {f.name}: {e}")

        # Keep only new files that may need another extraction pass
        current_files = [nf for nf in next_files if nf.suffix.lower() in archive_suffixes]
        depth += 1

    # The loop ends either because nothing is left or because the depth limit
    # was hit; only the latter leaves pending archives behind. Testing
    # `depth >= max_depth` alone reported a stop even when work was complete.
    if current_files:
        print("Stopped: maximum extraction depth reached.")
    else:
        print("Done: no more archives to extract.")

    return extracted_files

## **Delete**
Function `delete_non_tex_files()` cleans a directory by keeping only `.tex` and `.bib` files (or custom extensions).

1. Defaults to keeping `.tex` and `.bib` files if no extensions are provided.  
2. Verifies the target directory exists before proceeding.  
3. Recursively scans all files under `root`.  
4. Deletes any file whose extension isn’t in `keep_exts`.  
5. Tracks and reports the number of deleted and retained files.  
6. Prints a summary of the cleanup operation.

In [None]:
def delete_non_tex_files(root: Path, keep_exts=None):
    """
    Remove every file under `root` whose extension is not in `keep_exts`.

    The walk is recursive and the extension comparison uses the lower-cased
    file suffix. Files that cannot be deleted are silently left in place and
    are counted neither as kept nor as deleted. A one-line summary is printed
    at the end.

    Args:
        root: Directory to clean.
        keep_exts: Collection of suffixes (with leading dot) to keep;
            defaults to {".tex", ".bib"}.
    """
    wanted = {".tex", ".bib"} if keep_exts is None else keep_exts

    # Nothing to do for a missing directory
    if not root.exists():
        print(f"⚠️ Directory {root} does not exist.")
        return

    kept = 0
    removed = 0

    for entry in root.rglob("*"):
        if not entry.is_file():
            continue

        if entry.suffix.lower() in wanted:
            kept += 1
            continue

        # Best-effort deletion: a failure leaves the file untouched
        try:
            entry.unlink()
        except Exception:
            continue
        removed += 1

    print(f"Cleaned '{root.name}': kept {kept}, deleted {removed} other files.")

Function `delete_empty_dirs()` recursively removes all empty directories inside the given root path.

1. Scans all subdirectories under `root` in reverse order (deepest first).  
2. Checks whether each directory is empty.  
3. Deletes empty folders safely using `rmdir()`.  
4. Counts and reports the number of removed directories.  
5. Prints a summary message if any were deleted.


In [None]:
def delete_empty_dirs(root: Path):
    """
    Remove every empty directory below `root` (`root` itself is never removed).

    Entries are visited in reverse sorted order, so a directory is examined
    after everything it contains; a parent that becomes empty once its empty
    children are gone is removed in the same run. Directories that cannot be
    removed are skipped. A summary line is printed only when at least one
    directory was deleted.

    Args:
        root: Directory whose subtree is cleaned.
    """
    removed = 0

    # Children sort after their parent, so the reversed order is deepest-first
    ordered = sorted(root.rglob("*"))
    ordered.reverse()

    for entry in ordered:
        if not entry.is_dir():
            continue
        # Any entry at all means the directory is not empty
        if next(entry.iterdir(), None) is not None:
            continue
        try:
            entry.rmdir()
        except Exception:
            continue
        removed += 1

    if removed:
        print(f"Deleted {removed} empty directories in '{root.name}'")

Function `clean_and_flatten_subdirs()` cleans and flattens subdirectories in a paper directory while preserving relevant `.tex` and `.bib` files.

1. Skips execution if the target directory doesn’t exist.  
2. Collects `.tex` and `.bib` files in the parent directory for comparison.  
3. For each subdirectory:
   - Deletes it if it has no `.tex`/`.bib` files.  
   - Deletes it if its contents match the parent’s `.tex`/`.bib` files.  
   - Otherwise, moves differing `.tex`/`.bib` files to the parent before deleting the subdirectory.  
4. Prints a summary showing how many files were moved and how many subdirectories were deleted.

In [None]:
def clean_and_flatten_subdirs(paper_dir: Path):
    """
    Flatten a paper folder: pull `.tex`/`.bib` files out of its subdirectories
    into `paper_dir`, then delete the subdirectories.

    For every `.tex`/`.bib` file found (recursively) in a subdirectory:
      - if no file of that name exists in `paper_dir`, it is moved there;
      - if a file of that name exists with byte-identical content, the copy in
        the subdirectory is dropped as a duplicate;
      - if a file of that name exists with different content, the file is
        moved under a disambiguated name `stem__<8 hex digits><suffix>` so that
        no content is lost. (Comparing names only, as before, silently deleted
        such files together with their subdirectory.)

    Every subdirectory is removed afterwards, and a summary is printed.

    Args:
        paper_dir: Folder of one paper; nothing happens if it is missing or
            not a directory.
    """
    import filecmp  # local import: not part of the notebook's import cell

    # Skip if directory doesn't exist or invalid
    if not paper_dir.exists() or not paper_dir.is_dir():
        return

    tex_bib_exts = {".tex", ".bib"}
    moved, deleted = 0, 0

    # Iterate through subdirectories (snapshot first: the tree is modified below)
    for subdir in [d for d in paper_dir.iterdir() if d.is_dir()]:
        wanted = sorted(
            p for p in subdir.rglob("*")
            if p.is_file() and p.suffix.lower() in tex_bib_exts
        )

        for f in wanted:
            dest = paper_dir / f.name

            if dest.exists():
                # True duplicate: identical bytes, nothing to preserve
                if dest.is_file() and filecmp.cmp(str(f), str(dest), shallow=False):
                    continue

                # Same name, different content: keep it under a unique name
                # derived from its path relative to the paper folder
                rel = f.relative_to(paper_dir).as_posix()
                digest = hashlib.sha1(rel.encode('utf-8', 'surrogatepass')).hexdigest()[:8]
                dest = paper_dir / f"{f.stem}__{digest}{f.suffix}"
                attempt = 1
                while dest.exists():
                    dest = paper_dir / f"{f.stem}__{digest}_{attempt}{f.suffix}"
                    attempt += 1

            shutil.move(str(f), str(dest))
            moved += 1

        # Everything worth keeping has been moved out; drop the subdirectory
        shutil.rmtree(subdir, ignore_errors=True)
        deleted += 1

    # Print cleanup summary
    print(f"Cleaned {paper_dir.name}: moved {moved} files, deleted {deleted} subdirectories.")

## **Crawling paper**

Function `is_gzip_valid()` checks whether a `.gz` file exists and is a valid, readable GZIP archive.

1. Returns `False` if the file doesn’t exist or is empty.  
2. Tries to read the file in chunks to verify integrity.  
3. Returns `True` if all reads succeed without error.  
4. Catches GZIP or I/O errors and returns `False` if invalid.

In [None]:
def is_gzip_valid(path: Path):
    """
    Check that `path` is an existing, non-empty gzip file that decompresses
    cleanly from start to end.

    The whole stream is read in 1 MiB chunks, so header, body and trailer
    (CRC/length) problems are all detected.

    Args:
        path: File to verify.

    Returns:
        True if the file decompresses without error, False otherwise.
    """
    import zlib  # local import: only needed for the zlib.error type below

    # Skip if missing or empty
    if not path.exists() or path.stat().st_size == 0:
        return False

    try:
        # Verify GZIP integrity by reading fully
        with gzip.open(str(path), "rb") as gz:
            while gz.read(1024 * 1024):
                pass

    # Invalid or corrupted GZIP. A damaged deflate stream raises zlib.error,
    # which is not an OSError subclass and therefore must be listed explicitly.
    except (OSError, EOFError, zlib.error):
        return False

    return True

Function `is_tar_gz_valid()` checks whether a `.tar.gz` file exists and is both a valid TAR and valid GZIP archive.

1. Returns `False` if the file doesn’t exist or is empty.  
2. Uses `tarfile.is_tarfile()` to verify TAR structure.  
3. Calls `is_gzip_valid()` to check GZIP integrity.  
4. Returns `True` only if both checks pass.


In [None]:
def is_tar_gz_valid(path: Path):
    """
    Tell whether `path` is a usable `.tar.gz` archive.

    The file must exist, be non-empty, be recognised by `tarfile.is_tarfile()`
    and pass the full-read check of `is_gzip_valid()`.

    Args:
        path: File to verify.

    Returns:
        True only when every check passes, False otherwise.
    """
    # A missing or zero-byte file can never be a valid archive
    if not (path.exists() and path.stat().st_size > 0):
        return False

    # TAR structure first, then GZIP integrity (only evaluated for TAR files)
    recognised_as_tar = tarfile.is_tarfile(str(path))
    return bool(recognised_as_tar and is_gzip_valid(path))

Function `load_src_checkpoint_state()` loads the most recent source-download checkpoint and returns progress information.

1. Returns default values if the checkpoint folder does not exist.  
2. Searches for files matching `src_checkpoint_*.json`.  
3. Selects the checkpoint with the highest numeric suffix.  
4. Loads the JSON content with `orjson`.  
5. Extracts success count, 404 failures, retryable failures, and last success ID.  
6. Computes the next checkpoint ID to use.  
7. Returns a tuple:  
   `(success_count, fail_404_list, fail_retryable_list, last_success_id, checkpoint_id)`

In [None]:
def load_src_checkpoint_state(checkpoint_folder: Path):
    """
    Load the most recent `src_checkpoint_<n>.json` file from `checkpoint_folder`.

    Only files whose trailing `<n>` is an integer are considered; the one with
    the highest number is loaded. The returned `checkpoint_id` is that number
    plus one, i.e. the id the next checkpoint should be written under.

    Args:
        checkpoint_folder: Folder holding the checkpoint files (may be None
            or missing).

    Returns:
        Tuple `(success_count, fail_404_list, fail_retryable_list,
        last_success_id, checkpoint_id)`. When there is no usable checkpoint
        the tuple is `(0, [], [], None, 1)`. When the latest checkpoint is
        empty, the counters are reset but `checkpoint_id` still points past it,
        so the next save does not overwrite checkpoint #1.
    """

    # Return defaults if folder missing
    if not checkpoint_folder or not checkpoint_folder.exists():
        return 0, [], [], None, 1

    # Locate checkpoint files with a numeric suffix; anything else matching the
    # glob (e.g. "src_checkpoint_final.json") is ignored instead of crashing int()
    numbered = []
    for p in checkpoint_folder.glob("src_checkpoint_*.json"):
        try:
            numbered.append((int(p.stem.rsplit('_', 1)[-1]), p))
        except ValueError:
            continue
    if not numbered:
        return 0, [], [], None, 1

    # Pick the checkpoint with the highest numeric suffix
    latest_id, f_ckp = max(numbered, key=lambda item: item[0])
    print(f"Resuming from checkpoint: {f_ckp}")
    checkpoint_id = latest_id + 1

    # Load checkpoint JSON (orjson accepts raw bytes)
    with open(str(f_ckp), 'rb') as f:
        ck = orjson.loads(f.read())

    # Empty or non-object content: start the counters over, keep the next id
    if not ck or not isinstance(ck, dict):
        return 0, [], [], None, checkpoint_id

    # Return parsed state values
    return (
        int(ck.get("success_count", 0)),
        ck.get("fail_404_list", []),
        ck.get("fail_429/5xx_list", []),
        ck.get("last_success_id"),
        checkpoint_id,
    )

Function `save_src_checkpoint()` saves a detailed checkpoint summarizing progress, performance, and resource usage for the source-download process.

1. Computes the total session runtime from the start timestamp.  
2. Builds a checkpoint dictionary containing:  
   - success count and failure lists,  
   - last successfully downloaded ID,  
   - timing statistics (session time, avg time per file),  
   - memory usage (peak & average RSS),  
   - disk usage (peak used & bytes written),  
   - custom note.  
3. Saves the checkpoint atomically using `save_state()`.  
4. Prints a confirmation message after writing the checkpoint file.

In [None]:
def save_src_checkpoint(folder: Path, checkpoint_id: int,
                        success_count: int, fail_404_list: list, fail_retryable_list: list,
                        last_id: Optional[str], process_start_ts: float,
                        total_processed_files: int, mem: MemoryTracker,
                        disk: DiskTracker, note):
    """
    Write `src_checkpoint_<checkpoint_id>.json` into `folder`.

    The checkpoint records the success count, both failure lists, the last
    successful id, the elapsed session time, the average time per processed
    file (0.0 when nothing was processed), memory and disk figures formatted
    with `display_bytes()`, and a free-form note. Writing is delegated to
    `save_state()`.

    Args:
        folder: Destination folder of the checkpoint file.
        checkpoint_id: Number used in the file name.
        success_count: Number of successfully processed items.
        fail_404_list: Items stored under "fail_404_list".
        fail_retryable_list: Items stored under "fail_429/5xx_list".
        last_id: Last successfully processed id, if any.
        process_start_ts: `time.time()` value taken at session start.
        total_processed_files: Divisor for the average time per file.
        mem: Tracker providing `peak_rss_bytes()` and `avg_rss_bytes()`.
        disk: Tracker providing `peak_used_bytes` and `bytes_written`.
        note: Value stored under "Note".
    """
    # Elapsed wall-clock time of this session
    elapsed = time.time() - process_start_ts

    # Average per file; guarded against division by zero
    if total_processed_files > 0:
        per_file = round(elapsed / total_processed_files, 3)
    else:
        per_file = 0.0

    # Key order is kept as-is: it is the order written to the JSON file
    state = {
        "success_count": success_count,
        "fail_404_list": fail_404_list,
        "fail_429/5xx_list": fail_retryable_list,
        "last_success_id": last_id,
        "session_time": round(elapsed, 3),
        "average_time_per_file": per_file,
        "RAM peak": display_bytes(mem.peak_rss_bytes()),
        "Average RAM": display_bytes(mem.avg_rss_bytes()),
        "Max disk used": display_bytes(disk.peak_used_bytes),
        "Disk added": display_bytes(disk.bytes_written),
        "Note": note,
    }

    target = folder / f'src_checkpoint_{checkpoint_id}.json'
    save_state(target, state)

    print(f"[checkpoint] Saved checkpoint #{checkpoint_id}.")

Function `download_source_arXiv()` downloads an arXiv source file from a specified mirror using `urllib`.

1. Constructs the full download URL from protocol, mirror, and version.  
2. Builds the destination file path inside `output_dir`.  
3. Uses `urllib.request.urlretrieve()` to download the file.  
4. Returns the local path of the downloaded file.

In [None]:
def download_source_arXiv(version: str, file_name: str, output_dir: str = '.',
                          http_protocol: str = 'https', arXiv_mirror: str = 'export.arxiv.org/src'):
    """
    Download one arXiv source file from a mirror with `urllib`.

    The URL is `<http_protocol>://<arXiv_mirror>/<version>` and the file is
    written to `<output_dir>/<file_name>`. `output_dir` must already exist.

    Args:
        version: Identifier appended to the mirror URL.
        file_name: Name of the file to create inside `output_dir`.
        output_dir: Existing directory receiving the download.
        http_protocol: URL scheme.
        arXiv_mirror: Host and base path of the mirror.

    Returns:
        The path the file was written to, as returned by `urlretrieve`.

    Raises:
        urllib.error.URLError: (including HTTPError) when the download fails.
        OSError: When the destination cannot be written.
    """
    # `import urllib` alone does not load the `urllib.request` submodule; it
    # only appears as an attribute if some other package imported it first.
    # Importing it here removes that hidden dependency.
    import urllib.request

    # Build download URL and destination path
    url = f"{http_protocol}://{arXiv_mirror}/{version}"
    dest = f"{output_dir}/{file_name}"

    # Download via urllib and return written file path
    written_path, _ = urllib.request.urlretrieve(url, dest)
    return written_path

Function `crawl_arXiv_paper_src()` downloads, extracts, cleans, and checkpoints arXiv paper source files in a fully resumable and fault-tolerant pipeline.

1. **Initialization & resume**  
   - Ensures output folder exists.  
   - Loads previous checkpoint (success, failures, last ID).  
   - Resumes from `last_success_id` if available.  
   - Initializes `MemoryTracker`, `DiskTracker`, and runtime timestamp.

2. **Resilient shutdown handling**  
   - Registers `atexit` and signal handlers (SIGINT/SIGTERM).  
   - On interruption, saves a full checkpoint snapshot before exiting.

3. **Main processing loop**  
   - Iterates through remaining IDs plus retryable ones.  
   - For each ID, determines and continues its current processing stage using its per-file checkpoint JSON (`file_state`):  
     - **2** → fully processed → skip  
     - **1** → extracted, needs cleanup → cleanup  
     - **0** → downloaded, needs extraction → extract  
     - **-1** → new ID → download → extract → cleanup  

4. **Per-ID workflow**  
   - Creates required directories.  
   - Downloads source via `download_source_arXiv()`.  
   - Detects correct file extension with `detect_with_filetype()`.  
   - Extracts nested archives via `extract_until_done()`.  
   - Cleans folders (`clean_and_flatten_subdirs`, `delete_non_tex_files`, `delete_empty_dirs`).  
   - Records per-file process time, sizes, and state transitions.  

5. **Resource tracking**  
   - Regular memory and disk usage sampling.  
   - Tracks cumulative process statistics.

6. **Failure handling**  
   - 404 errors → added to `fail_404_list`.  
   - All other download/process errors → added to retryable list.

7. **Finalization**  
   - Saves a final aggregate checkpoint at the end of the run.  
   - Ensures a final snapshot is always written even on exceptions.

In [None]:
def _src_http_status(exc: BaseException) -> Optional[int]:
    """Return the HTTP status code carried by a urllib or requests HTTP error, else None."""
    import urllib.error  # local import so the submodule is guaranteed to be loaded

    if isinstance(exc, urllib.error.HTTPError):
        return exc.code
    if isinstance(exc, requests.exceptions.HTTPError):
        # `response` may be None on a requests HTTPError
        return getattr(exc.response, 'status_code', None)
    return None


def _src_download_with_retries(version_id: str, extracted_folder: Path,
                               query_delay: float, retries: int) -> None:
    """Download `<id>.tar.gz` into `extracted_folder`, retrying non-404 failures up to `retries` times."""
    max_attempts = max(0, retries) + 1
    for attempt in range(1, max_attempts + 1):
        try:
            download_source_arXiv(version_id, f'{version_id}.tar.gz', str(extracted_folder))
            return
        except Exception as exc:
            # A 404 is permanent, and after the last attempt there is nothing left to retry
            if _src_http_status(exc) == 404 or attempt == max_attempts:
                raise
            print(f'Download attempt {attempt}/{max_attempts} for {version_id} failed: {exc}. Retrying...')
            time.sleep(query_delay)


def _src_fix_archive_extension(version_id: str, extracted_folder: Path) -> Tuple[Path, str]:
    """Rename the freshly downloaded `<id>.tar.gz` to match its detected type; return (path, extension)."""
    downloaded = extracted_folder / f'{version_id}.tar.gz'
    suitable_ext, _ = detect_with_filetype(downloaded)
    # A gzip stream that wraps a tarball keeps the `.tar.gz` name; a bare gzip becomes `.gz`;
    # anything else takes the extension reported by the detector.
    if suitable_ext == 'gz' and tarfile.is_tarfile(str(downloaded)):
        suitable_ext = 'tar.gz'
    archive_path = extracted_folder / f'{version_id}.{suitable_ext}'
    if archive_path != downloaded:
        os.rename(str(downloaded), str(archive_path))
    return archive_path, suitable_ext


def _src_find_archive(version_id: str, extracted_folder: Path, file_ckp: Dict[str, Any]) -> Path:
    """Locate the downloaded archive of a paper whose checkpoint is in state 0."""
    # Prefer the extension recorded at download time; fall back to a glob on the version id
    recorded = extracted_folder / f"{version_id}.{file_ckp.get('extension')}"
    if recorded.exists():
        return recorded
    candidates = sorted(extracted_folder.glob(f'{version_id}.*'))
    if not candidates:
        raise FileNotFoundError(f'No downloaded archive found for {version_id} in {extracted_folder}')
    return candidates[0]


def _src_extract_stage(archive_path: Path, extracted_folder: Path, status_file: Path,
                       file_ckp: Dict[str, Any], mem, disk, stage_start: float) -> float:
    """Extract the archive, delete it, persist state 1; return the timestamp the next stage starts from."""
    extract_until_done(archive_path, extracted_folder)
    mem.sample()
    disk.sample()
    archive_path.unlink(missing_ok=True)
    disk.sample()
    stage_end = time.time()
    file_ckp['process_time'] += stage_end - stage_start
    file_ckp['file_state'] = 1
    save_state(status_file, file_ckp)
    disk.sample()
    return stage_end


def _src_cleanup_stage(extracted_folder: Path, status_file: Path,
                       file_ckp: Dict[str, Any], mem, disk, stage_start: float) -> None:
    """Flatten and prune the extracted folder, then persist state 2 with the final folder size."""
    clean_and_flatten_subdirs(extracted_folder)
    delete_non_tex_files(extracted_folder)
    delete_empty_dirs(extracted_folder)
    mem.sample()
    disk.sample()
    file_ckp['process_time'] += time.time() - stage_start
    file_ckp['file_state'] = 2
    file_ckp['processed_size'] = get_folder_size(extracted_folder)
    save_state(status_file, file_ckp)
    disk.sample()


def crawl_arXiv_paper_src(version_list: List[str],
                          output_folder: Path,
                          query_delay: float = 3,
                          retries: int = 3):
    """Download, extract and clean the source of every arXiv version in `version_list`.

    Each version has a per-file checkpoint JSON whose `file_state` records progress
    (0 = downloaded, 1 = extracted, 2 = cleaned), so an interrupted run resumes at
    the stage it stopped at. An aggregate checkpoint is saved at the end of the
    run and, best-effort, on SIGINT/SIGTERM and on interpreter exit.

    Args:
        version_list: Versioned arXiv ids (e.g. "2211.14749v1") in processing order.
        output_folder: Root folder that receives one sub-folder per paper.
        query_delay: Seconds to wait before each new download and between download retries.
        retries: Extra download attempts after a failed one. HTTP 404 is never retried.

    Returns:
        None. Results are written to disk; failed ids are stored in the aggregate
        checkpoint as either 404 or retryable.
    """
    ensure_dir(output_folder)

    # Resume aggregate stats
    _, fail_404_list, fail_retryable_list, last_success_id, checkpoint_id = load_src_checkpoint_state(output_folder)
    curr_success_count = 0
    curr_fail_404_count = 0
    curr_fail_retryable_count = 0

    # Resume from last id; a checkpoint from a different id range must not abort the run
    last_idx = 0
    if last_success_id:
        try:
            last_idx = version_list.index(last_success_id)
        except ValueError:
            print(f'Checkpointed id {last_success_id} is not in the current version list; starting from the beginning.')

    mem = MemoryTracker()
    disk = DiskTracker(output_folder)
    process_start = time.time()

    # Best-effort final snapshot (reads the live counters of the enclosing scope)
    def final_snapshot(note="atexit/signal snapshot"):
        try:
            mem.sample()
            disk.sample()
            curr_total_done = curr_success_count + curr_fail_404_count + curr_fail_retryable_count
            save_src_checkpoint(output_folder, checkpoint_id, curr_success_count, fail_404_list, fail_retryable_list,
                                last_success_id, process_start, curr_total_done, mem, disk, note)
        except Exception as e:
            # Never let a failed snapshot mask the real exit reason, but do report it
            print(f'[checkpoint] Could not write snapshot ({note}): {e}')

    atexit.register(final_snapshot)

    def signal_handler(signum, frame):
        print(f"[signal] Caught {signum}, flushing checkpoint...")
        final_snapshot(note=f"signal {signum}")
        # Re-raise default behavior after saving
        signal.signal(signum, signal.SIG_DFL)
        os.kill(os.getpid(), signum)

    # Remember the handlers we replace so they can be restored when the run ends
    previous_handlers = {}
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            previous_handlers[sig] = signal.signal(sig, signal_handler)
        except Exception:
            # signal.signal() only works in the main thread; run without handlers otherwise
            pass

    # Remaining ids plus earlier retryable failures, de-duplicated so that no id is
    # attempted (and counted) twice in one run. Already-finished ids are skipped
    # through their per-file checkpoint.
    run_list = list(dict.fromkeys(version_list[last_idx:] + fail_retryable_list))
    try:
        for version_id in run_list:
            mem.sample()  # periodic memory sample
            disk.sample()

            # Prepare per-id folder
            id_folder = output_folder / version_id.split('v')[0].replace('.', '-')
            extracted_folder = id_folder / 'tex' / version_id.replace('.', '-')
            status_file = id_folder / f'{version_id}.json'

            try:
                if status_file.exists():
                    file_ckp = orjson.loads(status_file.read_bytes())
                    state = file_ckp['file_state']

                    if state == 2:
                        print(f'Source of {version_id} is already processed. Proceed to next version...')
                    elif state == 1:
                        # Extracted earlier: only the cleanup is left
                        _src_cleanup_stage(extracted_folder, status_file, file_ckp, mem, disk, time.time())
                    elif state == 0:
                        # Downloaded earlier: extract, then clean up
                        stage_start = time.time()
                        archive_path = _src_find_archive(version_id, extracted_folder, file_ckp)
                        stage_start = _src_extract_stage(archive_path, extracted_folder, status_file,
                                                         file_ckp, mem, disk, stage_start)
                        _src_cleanup_stage(extracted_folder, status_file, file_ckp, mem, disk, stage_start)
                    else:
                        # Unrecognized state: leave the paper untouched
                        continue

                    last_success_id = version_id
                    disk.sample()

                else:
                    file_ckp = {
                        'file_state': -1,
                        'extension': None,
                        'process_time': 0,
                        'file_size': 0,
                        'processed_size': 0,
                    }
                    time.sleep(query_delay)
                    ensure_dir(extracted_folder)
                    print(f'Start downloading source for {version_id}...')
                    mem.sample()
                    disk.sample()
                    # Per-file timer: starts at this download, not at the start of the whole run
                    download_start = time.time()
                    _src_download_with_retries(version_id, extracted_folder, query_delay, retries)
                    disk.sample()
                    last_success_id = version_id
                    print(f'Finished downloading source for {version_id}...')

                    file_ckp['file_state'] = 0
                    archive_path, suitable_ext = _src_fix_archive_extension(version_id, extracted_folder)
                    file_ckp['extension'] = suitable_ext
                    file_ckp['file_size'] = archive_path.stat().st_size
                    stage_start = time.time()
                    file_ckp['process_time'] = stage_start - download_start
                    # Counted as soon as the download succeeds
                    curr_success_count += 1
                    save_state(status_file, file_ckp)
                    disk.sample()

                    stage_start = _src_extract_stage(archive_path, extracted_folder, status_file,
                                                     file_ckp, mem, disk, stage_start)
                    _src_cleanup_stage(extracted_folder, status_file, file_ckp, mem, disk, stage_start)

                # Reached only when the paper is fully processed: it no longer needs a retry
                if version_id in fail_retryable_list:
                    fail_retryable_list.remove(version_id)

            except Exception as e:
                if isinstance(e, requests.exceptions.HTTPError):
                    print(e.response)
                else:
                    print(e)

                if _src_http_status(e) == 404:
                    curr_fail_404_count += 1
                    if version_id not in fail_404_list:
                        fail_404_list.append(version_id)
                    # A missing paper is a permanent failure, not a retryable one
                    if version_id in fail_retryable_list:
                        fail_retryable_list.remove(version_id)
                else:
                    curr_fail_retryable_count += 1
                    if version_id not in fail_retryable_list:
                        fail_retryable_list.append(version_id)

        mem.sample()
        disk.sample()
        curr_total_done = curr_success_count + curr_fail_404_count + curr_fail_retryable_count
        save_src_checkpoint(output_folder, checkpoint_id, curr_success_count, fail_404_list, fail_retryable_list,
                            last_success_id, process_start, curr_total_done, mem, disk, 'end-of-process')
    finally:
        # Final snapshot on any exit path
        final_snapshot(note="final")
        # The run is over: stop hooking interpreter exit and give the signals back
        atexit.unregister(final_snapshot)
        for sig, previous in previous_handlers.items():
            if previous is None:
                continue  # previous handler was not installed from Python; nothing to restore
            try:
                signal.signal(sig, previous)
            except Exception:
                pass

### **Run program**

In [None]:
# Stage 1: crawl metadata for the configured id range
crawl_arXiv_metadata(START_MONTH, START_ID, END_MONTH, END_ID, folder=metadatas_dir)

# Stage 2: build the list of versions from the crawled metadata
version_list = get_version_list_from_metadata(metadatas_dir)
print(f"Total versions to download: {len(version_list)}")

# Stage 3: download, extract and clean the source of every listed version
crawl_arXiv_paper_src(
    version_list=version_list,
    output_folder=papers_dir,
    query_delay=3,
    retries=3,
)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Cleaned '2211-14748v1': kept 3, deleted 6 other files.
Start downloading source for 2211.14749v1...
Finished downloading source for 2211.14749v1...
[0] Extracting GZIP file: workdir/paper/2211-14749/tex/2211-14749v1/2211.14749v1.gz
Done: no more archives to extract.
Cleaned 2211-14749v1: moved 0 files, deleted 0 subdirectories.
Cleaned '2211-14749v1': kept 1, deleted 0 other files.
Start downloading source for 2211.14750v1...
Finished downloading source for 2211.14750v1...
[0] Extracting TAR archive: workdir/paper/2211-14750/tex/2211-14750v1/2211.14750v1.tar.gz
Done: no more archives to extract.
Cleaned 2211-14750v1: moved 0 files, deleted 1 subdirectories.
Cleaned '2211-14750v1': kept 2, deleted 7 other files.
Start downloading source for 2211.14751v1...
Finished downloading source for 2211.14751v1...
[0] Extracting TAR archive: workdir/paper/2211-14751/tex/2211-14751v1/2211.14751v1.tar.gz
Done: no more archives to extra