In [1]:
import aiohttp
import requests
import asyncio
from pathlib import Path
from asyncio import Semaphore
import numpy as np

# Desktop-Chrome User-Agent string; passed as the 'User-Agent' header of the
# client sessions created below.
ua = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
    'AppleWebKit/537.36 (KHTML, like Gecko) '
    'Chrome/80.0.3987.149 Safari/537.36'
)

from aiohttp_socks import ProxyConnector
# Local SOCKS5 endpoints; each one gets its own connector and client session.
proxies = [
    'socks5://127.0.0.1:1984',
    'socks5://127.0.0.1:1986',
]

# The trailing None yields one extra session that is given no proxy connector.
connectors = [*map(ProxyConnector.from_url, proxies), None]

# One session per connector, all sharing the same User-Agent header.
sesss = [
    aiohttp.ClientSession(headers={'User-Agent': ua}, connector=connector, trust_env=True)
    for connector in connectors
]

import json
import re
import os

from tqdm import tqdm

import zstd

In [2]:
# Values accepted by gen_params() for its `subject` argument.
subjects = [
    "Experiment-HEP",
    "Phenomenology-HEP",
    "Experiment-Nucl",
    "Instrumentation",
    "Theory-Nucl",
    "Astrophysics",
    "Lattice",
    "Theory-HEP",
    "Other",
    "General Physics",
    "Computing",
    "Accelerators",
    "Data Analysis and Statistics",
    "Gravitation and Cosmology",
    "Quantum Physics",
    "Condensed Matter",
    "Math and Math Physics",
]

# Values accepted by gen_params() for its `document_type` argument.
document_types = [
    "published",
    "article",
    "conference paper",
    "review",
    "introductory",
    "lectures",
    "book chapter",
    "note",
    "thesis",
    "book",
    "report",
    "proceedings",
]

# Values accepted by gen_params() for its `arxiv_category` argument.
arxiv_categories = [
    "hep-ex",
    "hep-ph",
    "nucl-ex",
    "physics.ins-det",
    "nucl-th",
    "hep-lat",
    "astro-ph.CO",
    "astro-ph.HE",
    "astro-ph.IM",
    "physics.data-an",
    "hep-th",
    "physics.acc-ph",
    "quant-ph",
    "physics.atom-ph",
    "gr-qc",
    "cs.LG",
    "astro-ph.SR",
    "astro-ph.GA",
    "physics.comp-ph",
    "stat.ML",
]

In [3]:
import functools
def async_retry(timeout: float = 1, backoff: float = 2, max_retry: int = 5):
    """Decorator factory that retries a coroutine function when it raises.

    Args:
        timeout: seconds to sleep before the first retry.
        backoff: factor the sleep time is multiplied by after every failed attempt.
        max_retry: total number of attempts before giving up.

    Returns:
        A decorator. The wrapped coroutine returns the first successful result.

    Raises:
        Exception: ``'Too many retries'`` once every attempt has failed. The last
            underlying error is attached as ``__cause__`` so the real failure
            reason is not lost.
    """
    def inner(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            delay = timeout
            last_error = None
            funcname = func.__name__
            for attempt in range(1, max_retry + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    if attempt == max_retry:
                        # No attempt follows: do not claim a retry and do not sleep.
                        print(f'\033[31m Caught {e} in {funcname}, giving up after {max_retry} attempts\033[0m')
                        break
                    print(f'\033[33m Caught {e} in {funcname}, retrying in {delay} seconds...\033[0m')
                    await asyncio.sleep(delay)
                    delay *= backoff
            # Chain the last error so the traceback shows what actually went wrong.
            raise Exception('Too many retries') from last_error
        return wrapper
    return inner



In [4]:
def gen_params(subject=None, document_type=None, arxiv_category=None):
    """Return ``(url, params)`` for a query against the INSPIRE literature endpoint.

    The base parameters ask for the most recent records, 10 per page, starting
    at page 1. Each truthy filter argument is checked (via ``assert``) against
    the matching module-level list and then added to the parameters:

    * ``subject``        -> ``params['subject']``
    * ``document_type``  -> ``params['doc_type']``
    * ``arxiv_category`` -> ``params['arxiv_categories']``

    A fresh ``params`` dict is built on every call.
    """
    params = dict(sort='mostrecent', size=10, page=1)

    if subject:
        assert subject in subjects
        params.update(subject=subject)

    if document_type:
        assert document_type in document_types
        params.update(doc_type=document_type)

    if arxiv_category:
        assert arxiv_category in arxiv_categories
        params.update(arxiv_categories=arxiv_category)

    return 'https://inspirehep.net/api/literature', params

In [5]:
# url, params = gen_params(subject='Experiment-HEP', document_type='thesis')
# path = Path('/tmp/data/' + '_'.join([f'{k}={v}' for k,v in params.items()]))

In [6]:
# Root output directory for the scraped records (a relative path).
path = Path('..', '..', '..', 'data', 'inspirehep')
# path = Path('/tmp/data/inspirehep')

In [7]:
# Top-level metadata fields that cleanse() removes from every record.
del_keys = [
    'references',
    'referenced_authors_bais',
    'authors',
    '$schema',
    'external_system_identifiers',
    'documents',
]

def to(s:str):
    """Build an extractor that pulls key ``s`` out of each item of a sequence.

    The returned callable takes an iterable of containers and returns a list of
    ``item[s]`` for every item that contains ``s``, in the original order.
    Items lacking ``s`` are skipped.
    """
    def extract(items):
        collected = []
        for item in items:
            if s in item:
                collected.append(item[s])
        return collected
    return extract

def _supervisor_pairs(X):
    """Return (full_name, recid) for every entry of X that carries both keys."""
    pairs = []
    for entry in X:
        if 'full_name' in entry and 'recid' in entry:
            pairs.append((entry['full_name'], entry['recid']))
    return pairs


def _first_author_pair(X):
    """Return (full_name, recid) read from mapping X; an absent key gives None."""
    return X.get('full_name'), X.get('recid')


# Field name -> reducer applied by cleanse() to flatten that field's value.
xest = dict(
    keywords=to('value'),
    inspire_categories=to('term'),
    supervisors=_supervisor_pairs,
    urls=to('value'),
    first_author=_first_author_pair,
    titles=to('title'),
    abstracts=to('value'),
    imprints=to('date'),
    report_numbers=to('value'),
)

def cleanse(data:dict):
    """Strip and flatten one record's metadata dict, in place.

    Every key listed in ``del_keys`` is removed if present, then every field
    named in ``xest`` that is present is replaced by the result of its reducer.
    The same (mutated) dict is returned for convenience.
    """
    for unwanted in del_keys:
        data.pop(unwanted, None)

    for field, reducer in xest.items():
        if field not in data:
            continue
        data[field] = reducer(data[field])

    return data

In [8]:
async def fetch(sess, url, params):
    """GET ``url`` with query ``params`` through ``sess`` and decode the JSON body.

    Args:
        sess: object whose ``get(url, params=...)`` returns an async context
            manager yielding a response with ``status`` and ``json()``.
        url: request URL.
        params: query parameters passed through to ``sess.get``.

    Returns:
        The decoded JSON payload, or ``None`` when the server answers HTTP 400.

    Raises:
        RuntimeError: for any status other than 200 or 400. An explicit raise
            is used instead of ``assert`` so the check survives ``python -O``
            and the error message names the offending status.
    """
    async with sess.get(url, params=params) as resp:
        if resp.status == 400:
            return None
        if resp.status != 200:
            raise RuntimeError(f'Unexpected HTTP status {resp.status} for {url}')
        return await resp.json()

@async_retry()
async def load_balanced_fetch(sesss:list[aiohttp.ClientSession], p, q:asyncio.Queue, counter:list, pbar:tqdm, sema):
    """Fetch one queued request through the least-busy session.

    Args:
        sesss: candidate sessions; ``counter[i]`` tracks requests in flight on ``sesss[i]``.
        p: ``(url, params)`` tuple taken from ``q``.
        q: work queue; when ``p`` is a first page, the remaining pages of the
            same query are appended to it.
        counter: mutable per-session in-flight counts, same length as ``sesss``.
        pbar: progress bar whose ``total`` is grown by the number of pages queued here.
        sema: async context manager limiting how many requests run at once.

    Returns:
        The decoded response, or ``None`` when ``fetch`` returned ``None``.

    Any exception propagates to the ``async_retry`` wrapper, which re-runs the
    whole coroutine.
    """
    async with sema:
        url, params = p
        idx = int(np.argmin(counter))
        counter[idx] += 1
        try:
            r = await fetch(sesss[idx], url, params)
        finally:
            # Release the slot even when the request fails; otherwise every
            # retried request would permanently inflate this session's load.
            counter[idx] -= 1

        if r is not None and params['page'] == 1:
            # The first page reports the total hit count; fan out the other pages.
            total_hits = r['hits']['total']
            n_pages = int(np.ceil(total_hits / params['size']))
            queued = 0
            for i in range(2, n_pages+1):
                q.put_nowait((url, {**params, 'page': i}))
                queued += 1
            pbar.total += queued
            pbar.refresh()
    # Mark the queue item as processed on every non-raising path, including
    # the ``None`` result.
    q.task_done()
    return r


In [9]:
async def fetch_all(sesss:list[aiohttp.ClientSession], q:asyncio.Queue, counter:list, sema:Semaphore, max_tasks:int=64):
    """Drain ``q`` concurrently and yield one cleansed metadata dict per hit.

    Args:
        sesss: sessions handed to ``load_balanced_fetch``.
        q: queue of ``(url, params)`` requests; workers may append follow-up pages.
        counter: per-session in-flight counters shared with the workers.
        sema: concurrency limiter passed through to the workers.
        max_tasks: upper bound on simultaneously scheduled worker tasks.

    Yields:
        ``cleanse(hit['metadata'])`` with the hit's ``id`` stored under ``'id'``.
        Hits without ``'metadata'`` are reported with ``print`` and skipped.

    Raises:
        Whatever a worker task raised. Still-pending tasks are cancelled first
        so they do not keep running after the generator has stopped.
    """
    tasks = set()
    with tqdm(position=0, desc='requests', total=q.qsize()) as pbar:

        def top_up():
            # Schedule queued requests until the cap is reached or the queue is empty.
            while len(tasks) < max_tasks and not q.empty():
                p = q.get_nowait()
                tasks.add(asyncio.create_task(load_balanced_fetch(sesss,p,q,counter,pbar,sema)))

        top_up()
        try:
            while tasks:
                finished, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
                for future in finished:
                    tasks.remove(future)
                    top_up()
                    data = await future
                    if data is not None:
                        for d in data['hits']['hits']:
                            if 'metadata' not in d:
                                print(d.keys())
                                continue
                            meta = cleanse(d['metadata'])
                            meta['id'] = d.get('id')
                            yield meta
                    # Count every completed request, including those that
                    # returned None, so the bar can reach its total.
                    pbar.update()
        finally:
            # Empty on normal completion; on error or early close this stops
            # the workers that are still in flight.
            for pending in tasks:
                pending.cancel()


def _write_chunk(path:Path, index:int, records:list) -> None:
    """Write ``records`` as compact JSON, zstd-compressed at level 22, to ``<path>/<index>.json.zst``."""
    payload = json.dumps(records, separators=(',', ':')).encode('utf-8')
    with open(path / f'{index}.json.zst', 'wb') as f:
        f.write(zstd.compress(payload, 22))


async def save_all(sesss:list[aiohttp.ClientSession], q:asyncio.Queue, sema:Semaphore, counter:list, path:Path, chunk_size=1000):
    """Stream every record produced by ``fetch_all`` into compressed chunk files.

    Args:
        sesss, q, counter, sema: forwarded to ``fetch_all``.
        path: output directory, created if missing. Chunks are written as
            ``0.json.zst``, ``1.json.zst``, ...
        chunk_size: number of records per chunk file; the last chunk may be smaller.

    Records whose ``'id'`` is missing are printed and skipped. A record whose
    ``'id'`` was already seen during this call is dropped as a duplicate.
    """
    path.mkdir(parents=True, exist_ok=True)
    buffer = []
    i = 0
    seen = set()
    async for data in fetch_all(sesss, q, counter, sema):
        _id = data.get('id')
        if _id is None:
            print(data, 'wtf?')
            continue
        if _id in seen:
            continue
        buffer.append(data)
        seen.add(_id)
        if len(buffer) >= chunk_size:
            # Serialise/compress/write in a worker thread so the event loop is
            # not held up by this step while requests are in flight.
            # NOTE(review): the gain depends on zstd.compress releasing the GIL — confirm.
            await asyncio.to_thread(_write_chunk, path, i, buffer)
            buffer = []
            i += 1
    if buffer:
        await asyncio.to_thread(_write_chunk, path, i, buffer)

In [10]:
# NOTE(review): this list is identical to the `subjects` list defined in the
# constants cell near the top of the notebook; candidate for removal.
subjects = [
    "Experiment-HEP",
    "Phenomenology-HEP",
    "Experiment-Nucl",
    "Instrumentation",
    "Theory-Nucl",
    "Astrophysics",
    "Lattice",
    "Theory-HEP",
    "Other",
    "General Physics",
    "Computing",
    "Accelerators",
    "Data Analysis and Statistics",
    "Gravitation and Cosmology",
    "Quantum Physics",
    "Condensed Matter",
    "Math and Math Physics",
]


In [11]:
# Seed the work queue with one first-page request per (subject, date bucket)
# for document type 'thesis'.
url, _ = gen_params()
q = asyncio.Queue()
for s in subjects:
    # One catch-all bucket up to 1974, then one bucket per year 1975..2023.
    date_ranges = ['1500--1974']
    for y in range(1975, 2024):
        date_ranges.append(f'{y}--{y}')

    for date_range in date_ranges:
        url, params = gen_params(subject=s, document_type='thesis')
        params['earliest_date'] = date_range
        q.put_nowait((url, params))


In [12]:
await save_all(sesss, q, Semaphore(24), [0,0,0], path/'thesis', chunk_size=8192)

requests: 100%|██████████| 6421/6421 [13:45<00:00,  7.78it/s]  


In [13]:
# Seed the work queue with one first-page request per (subject, date bucket)
# for document type 'published'.
url, _ = gen_params()
q = asyncio.Queue()
for s in subjects:
    # One catch-all bucket up to 1974, then one bucket per year 1975..2023.
    date_ranges = ['1500--1974']
    for y in range(1975, 2024):
        date_ranges.append(f'{y}--{y}')

    for date_range in date_ranges:
        url, params = gen_params(subject=s, document_type='published')
        params['earliest_date'] = date_range
        q.put_nowait((url, params))


In [14]:
await save_all(sesss, q, Semaphore(24), [0,0,0], path/'published', chunk_size=8192)

requests:   4%|▍         | 4430/104883 [06:18<18:23:05,  1.52it/s] 

[33m Caught Response payload is not completed in load_balanced_fetch, retrying in 1 seconds...[0m


requests:   4%|▍         | 4435/104883 [06:18<9:18:16,  3.00it/s] 

[33m Caught Response payload is not completed in load_balanced_fetch, retrying in 1 seconds...[0m


requests:   5%|▌         | 5335/104883 [08:48<38:26:28,  1.39s/it] 

[33m Caught Response payload is not completed in load_balanced_fetch, retrying in 1 seconds...[0m


requests:   5%|▌         | 5433/104883 [08:50<45:39, 36.30it/s]   

[33m Caught Response payload is not completed in load_balanced_fetch, retrying in 1 seconds...[0m


requests:  12%|█▏        | 13070/104883 [16:16<20:42:43,  1.23it/s]

[33m Caught Server disconnected in load_balanced_fetch, retrying in 1 seconds...[0m


requests:  23%|██▎       | 24380/104883 [31:55<10:51:56,  2.06it/s] 

[33m Caught Server disconnected in load_balanced_fetch, retrying in 1 seconds...[0m
[33m Caught Server disconnected in load_balanced_fetch, retrying in 1 seconds...[0m
[33m Caught Server disconnected in load_balanced_fetch, retrying in 1 seconds...[0m
[33m Caught Server disconnected in load_balanced_fetch, retrying in 1 seconds...[0m
[33m Caught Server disconnected in load_balanced_fetch, retrying in 1 seconds...[0m
[33m Caught Server disconnected in load_balanced_fetch, retrying in 1 seconds...[0m


requests:  24%|██▍       | 25389/104883 [33:28<12:06:16,  1.82it/s]

[33m Caught [Errno 32] Broken pipe in load_balanced_fetch, retrying in 1 seconds...[0m
[33m Caught Server disconnected in load_balanced_fetch, retrying in 1 seconds...[0m
[33m Caught Server disconnected in load_balanced_fetch, retrying in 1 seconds...[0m
[33m Caught [Errno 32] Broken pipe in load_balanced_fetch, retrying in 1 seconds...[0m
[33m Caught [Errno 32] Broken pipe in load_balanced_fetch, retrying in 1 seconds...[0m
[33m Caught Server disconnected in load_balanced_fetch, retrying in 1 seconds...[0m


requests:  43%|████▎     | 45012/104883 [49:07<26:12:06,  1.58s/it]

[33m Caught Server disconnected in load_balanced_fetch, retrying in 1 seconds...[0m
[33m Caught Server disconnected in load_balanced_fetch, retrying in 1 seconds...[0m
[33m Caught Server disconnected in load_balanced_fetch, retrying in 1 seconds...[0m


requests:  51%|█████▏    | 53820/104883 [1:00:25<9:31:31,  1.49it/s] 

[33m Caught Response payload is not completed in load_balanced_fetch, retrying in 1 seconds...[0m


requests: 100%|█████████▉| 104810/104883 [1:36:38<00:04, 18.08it/s]  
