Library

In [2]:
import arxiv
import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from collections import deque

In [None]:
# Serializes updates to `stats`, which is modified from worker threads.
download_lock = Lock()

# Per-run outcome counters; download_arxiv_range resets both to zero
# at the start of every run.
stats = dict(downloaded=0, failed=0)

def get_source_all_versions(arxiv_id, save_dir="./sources"):
    """Download the source archive of every version of one arXiv paper.

    The latest version number is taken from the ``vN`` suffix of the paper's
    ``entry_id`` (defaulting to 1 when no suffix is present). Versions
    ``v1`` .. ``vN`` are then looked up and downloaded one at a time into
    ``save_dir`` as ``<arxiv_id>v<k>.tar.gz``.

    Args:
        arxiv_id: Paper identifier without a version suffix (a ``v<k>``
            suffix is appended here), e.g. ``"2305.09595"``.
        save_dir: Directory that receives the archives; created if missing.

    Returns:
        True only when every version from v1 up to the latest one was
        downloaded. False when the paper lookup failed or when at least one
        version could not be downloaded.
    """
    os.makedirs(save_dir, exist_ok=True)

    client = arxiv.Client()

    try:
        # next() raises StopIteration when the search yields nothing; that is
        # an Exception subclass, so it is reported by the handler below too.
        base_paper = next(client.results(arxiv.Search(id_list=[arxiv_id])))
        version_match = re.search(r'v(\d+)$', base_paper.entry_id)
        latest_version = int(version_match.group(1)) if version_match else 1
    except Exception as e:
        print(f"ERROR: No source found for {arxiv_id}: {e}")
        return False

    versions_downloaded = 0
    for v in range(1, latest_version + 1):
        versioned_id = f"{arxiv_id}v{v}"

        # Best effort per version: one failing version must not prevent the
        # remaining versions from being attempted.
        try:
            paper_version = next(client.results(arxiv.Search(id_list=[versioned_id])))

            file_name = f"{versioned_id}.tar.gz"
            print(f"\tDownloading: {file_name}...")

            paper_version.download_source(dirpath=save_dir, filename=file_name)
        except Exception as e:
            print(f"ERROR when downloading {versioned_id}: {e}")
        else:
            versions_downloaded += 1

    if versions_downloaded == latest_version:
        print(f"Successfully downloaded: {versions_downloaded} / {latest_version} versions for {arxiv_id}.")
        return True

    if versions_downloaded == 0:
        print(f"ERROR: Found v{latest_version} but could not download any versions.")
    else:
        # Previously this case returned False without saying why.
        print(f"ERROR: Only downloaded {versions_downloaded} / {latest_version} versions for {arxiv_id}.")
    return False

def download_single_paper(arxiv_id, save_dir):
    """Download all versions of one paper and record the outcome in ``stats``.

    Args:
        arxiv_id: Paper identifier, e.g. ``"2305.09595"``.
        save_dir: Directory passed through to ``get_source_all_versions``.

    Returns:
        A ``(arxiv_id, success)`` tuple. A paper that is skipped because
        ``save_dir/arxiv_id`` already exists is reported as successful and
        does not touch the counters.
    """
    # NOTE(review): get_source_all_versions writes "<id>vN.tar.gz" files
    # straight into save_dir, so this path presumably comes from a separate
    # step (e.g. extraction) -- confirm the skip check is still meaningful.
    if os.path.exists(os.path.join(save_dir, arxiv_id)):
        print(f"Skipped {arxiv_id}: already exists.")
        return (arxiv_id, True)

    success = get_source_all_versions(arxiv_id, save_dir)
    counter_key = "downloaded" if success else "failed"

    # Worker threads share `stats`, so the increment happens under the lock.
    with download_lock:
        stats[counter_key] += 1

    return (arxiv_id, success)

def _month_prefix(month):
    """Convert a 'YYYY-MM' string into the 'YYMM' prefix used in arXiv IDs."""
    year_part, month_part = month.split('-')
    return f"{int(year_part) % 100:02d}{int(month_part):02d}"


def _download_id_batch(arxiv_ids, save_dir, max_parallels, label=""):
    """Download a fixed list of arXiv IDs in parallel, printing progress as each one finishes."""
    total = len(arxiv_ids)

    with ThreadPoolExecutor(max_workers=max_parallels) as executor:
        future_to_id = {
            executor.submit(download_single_paper, arxiv_id, save_dir): arxiv_id
            for arxiv_id in arxiv_ids
        }

        for completed, future in enumerate(as_completed(future_to_id), start=1):
            arxiv_id = future_to_id[future]
            try:
                paper_id, _ = future.result()
                print(f"[{label}{completed}/{total}] {paper_id}")
            except Exception as e:
                print(f"[{label}{completed}/{total}] {arxiv_id} - Exception: {e}")
                with download_lock:
                    stats["failed"] += 1


def _download_until_failures(prefix, start_id, save_dir, max_parallels, max_consecutive_failures):
    """Download IDs upward from start_id until enough consecutive IDs (in ID order) have failed."""
    from concurrent.futures import FIRST_COMPLETED, wait

    next_id_to_submit = start_id
    next_id_to_process = start_id
    failed_consecutive = 0
    completed_count = 0
    active_futures = {}   # future -> (arxiv_id, numeric id)
    results_buffer = {}   # numeric id -> (paper_id, success), waiting for its turn

    with ThreadPoolExecutor(max_workers=max_parallels) as executor:
        while failed_consecutive < max_consecutive_failures:
            # Top up the window. Counting buffered results as well keeps the
            # look-ahead past the oldest unfinished ID bounded by max_parallels.
            while len(active_futures) + len(results_buffer) < max_parallels:
                arxiv_id = f"{prefix}.{next_id_to_submit:05d}"
                future = executor.submit(download_single_paper, arxiv_id, save_dir)
                active_futures[future] = (arxiv_id, next_id_to_submit)
                next_id_to_submit += 1

            # Refill as soon as any download finishes instead of draining the
            # whole batch first.
            done, _ = wait(list(active_futures), return_when=FIRST_COMPLETED)
            for done_future in done:
                arxiv_id, id_num = active_futures.pop(done_future)
                try:
                    results_buffer[id_num] = done_future.result()
                except Exception as e:
                    print(f"X {arxiv_id} - Exception: {e}")
                    results_buffer[id_num] = (arxiv_id, False)

            # Results are consumed strictly in ID order so that "consecutive
            # failures" refers to neighbouring IDs, not to completion order.
            while (next_id_to_process in results_buffer
                   and failed_consecutive < max_consecutive_failures):
                paper_id, success = results_buffer.pop(next_id_to_process)
                completed_count += 1
                print(f"[Phase 1: {completed_count}] {paper_id}")
                failed_consecutive = 0 if success else failed_consecutive + 1
                next_id_to_process += 1

        print(f"\nReached {max_consecutive_failures} consecutive failures.")
        # cancel() only stops futures that have not started; downloads that
        # are already running finish when the executor context exits.
        for remaining_future in active_futures:
            remaining_future.cancel()


def download_arxiv_range(start_month, start_id, end_month, end_id, save_dir="./sources", max_parallels=20):
    """
    Download arxiv papers in parallel with sliding window approach

    When both months map to the same arXiv ID prefix, IDs ``start_id`` through
    ``end_id`` of that month are downloaded. Otherwise the download runs in
    two phases: phase 1 walks upward from ``start_id`` in the start month
    until 3 consecutive IDs fail (taken as the end of that month), and
    phase 2 downloads IDs 1 through ``end_id`` of the end month.

    NOTE: months strictly between ``start_month`` and ``end_month`` are not
    downloaded; only the two named months are visited.

    The module-level ``stats`` counters are reset at the start of each call.

    Args:
        start_month: Starting month in format 'YYYY-MM'
        start_id: Starting paper ID number
        end_month: Ending month in format 'YYYY-MM'
        end_id: Ending paper ID number
        save_dir: Directory to save downloaded sources
        max_parallels: Number of parallel download threads (default: 20)

    Raises:
        ValueError: If a month string is not of the form 'YYYY-MM'.
    """
    # Keep start_month / end_month intact: rebinding them to the bare month
    # made "2023-05" and "2024-05" compare equal and printed "05" in messages.
    start_prefix = _month_prefix(start_month)
    end_prefix = _month_prefix(end_month)

    stats["downloaded"] = 0
    stats["failed"] = 0

    max_consecutive_failures = 3

    if start_prefix == end_prefix:
        print(f"Phase: Downloading all from {start_month} ({start_id} → {end_id})...")

        arxiv_ids = [f"{start_prefix}.{current_id:05d}" for current_id in range(start_id, end_id + 1)]
        _download_id_batch(arxiv_ids, save_dir, max_parallels)

        print(f"Finished {start_month}.\n")
    else:
        # Phase 1: Download from start_month with sliding window
        print(f"Phase 1: Downloading from {start_month} starting at ID {start_id}...")

        _download_until_failures(
            start_prefix, start_id, save_dir, max_parallels, max_consecutive_failures
        )

        print(f"Completed {start_month}. No more papers found after {max_consecutive_failures} consecutive failures.\n")

        # Phase 2: Download from end_month
        print(f"Phase 2: Downloading from {end_month} starting at ID 1, going forward to ID {end_id}...")

        phase2_ids = [f"{end_prefix}.{current_id:05d}" for current_id in range(1, end_id + 1)]
        _download_id_batch(phase2_ids, save_dir, max_parallels, label="Phase 2: ")

        print(f"Reached end ID {end_id} in {end_month}.")

    # Summary is printed for both the single-month and the two-phase path.
    print(f"\n{'='*50}")
    print("Download complete!")
    print(f"Successfully downloaded: {stats['downloaded']} papers")
    print(f"Failed: {stats['failed']} papers")
    print(f"Files saved to: {os.path.abspath(save_dir)}")

# Fetch IDs 9595 through 14596 of May 2023 into ./sources, using the
# default max_parallels of 20 worker threads.
download_arxiv_range("2023-05", 9595, "2023-05", 14596, save_dir="./sources")

Phase: Downloading all from 05 (9595 → 14596)...
	Downloading: 2305.09595v1.tar.gz...
	Downloading: 2305.09605v1.tar.gz...
	Downloading: 2305.09600v1.tar.gz...
	Downloading: 2305.09603v1.tar.gz...
	Downloading: 2305.09606v1.tar.gz...
	Downloading: 2305.09607v1.tar.gz...
	Downloading: 2305.09613v1.tar.gz...
	Downloading: 2305.09598v1.tar.gz...
	Downloading: 2305.09614v1.tar.gz...
	Downloading: 2305.09599v1.tar.gz...
	Downloading: 2305.09601v1.tar.gz...
	Downloading: 2305.09609v1.tar.gz...
	Downloading: 2305.09596v1.tar.gz...
	Downloading: 2305.09611v1.tar.gz...
	Downloading: 2305.09602v1.tar.gz...
	Downloading: 2305.09608v1.tar.gz...
	Downloading: 2305.09604v1.tar.gz...
	Downloading: 2305.09612v1.tar.gz...
	Downloading: 2305.09610v1.tar.gz...
	Downloading: 2305.09597v1.tar.gz...
Successfully downloaded: 1 / 1 versions for 2305.09609.
Successfully downloaded: 1 / 1 versions for 2305.09613.
Successfully downloaded: 1 / 1 versions for 2305.09604.
Successfully downloaded: 1 / 1 versions for