In [0]:
import apache_beam as beam
import arxiv 

from apache_beam.dataframe.convert import to_dataframe
from datetime import datetime

In [0]:
# Exact-phrase search terms sent to the arXiv API, one query per entry.
# Each entry is wrapped in literal double quotes so it is submitted as a quoted
# phrase. Entries are unique: a repeated keyword would issue the same query
# twice and duplicate every matching row in the output.
query_keywords = [
    '"image segmentation"',
    '"self-supervised learning"',
    '"representation learning"',
    '"image generation"',
    '"object detection"',
    '"transfer learning"',
    '"transformers"',
    '"adversarial training"',  # closing quote was missing in the original
    '"generative adversarial networks"',
    '"model compressions"',
    '"few-shot learning"',
    '"natural language"',
    '"graph"',
    '"colorization"',
    '"depth estimation"',
    '"point cloud"',
    '"structured data"',
    '"optical flow"',
    '"reinforcement learning"',
    '"super resolution"',
    '"attention"',
    '"tabular"',
    '"unsupervised learning"',
    '"semi-supervised learning"',
    '"explainable"',
    '"radiance field"',
    '"decision tree"',
    '"time series"',
    '"molecule"',
    '"physics"',
    '"graphics"',
    '"ray tracing"',
    '"photogrammetry"',  # spelling fixed (was "photogrametry")
]

In [0]:
import typing


class ArxivEntries(typing.NamedTuple):
    """Record type declared as the output type of the arXiv query step.

    One instance describes a single paper. The field names mirror the keyword
    arguments of the `beam.Row` yielded by `query_with_keywords`.
    """

    # Category labels of the paper (populated from the result's `categories`).
    terms: typing.List[str]
    # Title of the paper (populated from the result's `title`).
    titles: str
    # Abstract text of the paper (populated from the result's `summary`).
    abstracts: str

In [0]:
# Module-level arXiv API client shared by every call to `query_with_keywords`.
# Configured with `num_retries=20` and `page_size=500`.
client = arxiv.Client(num_retries=20, page_size=500)


def query_with_keywords(query):
    """Search arXiv for `query` and yield one `beam.Row` per accepted paper.

    The search requests up to 20000 results ordered by last-updated date and
    is executed through the module-level `client`. A result is kept only when
    its primary category is one of "cs.CV", "stat.ML" or "cs.LG".

    Args:
        query: Search string passed unchanged to `arxiv.Search`.

    Yields:
        `beam.Row` with fields `terms` (the result's categories), `titles`
        (its title) and `abstracts` (its summary).
    """
    accepted_categories = ["cs.CV", "stat.ML", "cs.LG"]

    search = arxiv.Search(
        query=query,
        max_results=20000,
        sort_by=arxiv.SortCriterion.LastUpdatedDate,
    )

    for paper in client.results(search):
        # Skip anything whose primary category is outside the accepted set.
        if paper.primary_category not in accepted_categories:
            continue
        yield beam.Row(
            terms=paper.categories,
            titles=paper.title,
            abstracts=paper.summary,
        )

In [0]:
%%writefile setup.py

import setuptools


# Pinned dependencies declared for the `gather_arxiv_data` package. This file
# is the one referenced by the pipeline's `setup_file` option.
INSTALL_REQUIRES = [
    "apache_beam==2.32.0",
    "pandas==1.3.2",
    "arxiv==1.4.2",
    "google_cloud_storage==1.42.1",
]


# Package declaration: name and version are given inline, packages are
# discovered automatically with `find_packages()`.
setuptools.setup(
    name="gather_arxiv_data",
    version="0.1.0",
    install_requires=INSTALL_REQUIRES,
    packages=setuptools.find_packages(),
    include_package_data=True,
)

In [0]:
# Bucket that receives the temp, staging and output files, and the project the
# job runs in.
gcs_bucket_name = "arxiv-data-nlp"
gcp_project = "####" # Specify this.

# Timestamp suffix for the job name, and the bucket URI reused by the
# location options below.
job_timestamp = datetime.utcnow().strftime("%y%m%d-%H%M%S")
bucket_uri = f"gs://{gcs_bucket_name}"

# Option name -> value, in the order they are emitted on the argument list.
pipeline_options = {
    "job_name": f"arxiv-data-{job_timestamp}",
    "num_workers": "4",
    "runner": "DataflowRunner",
    "setup_file": "./setup.py",
    "project": gcp_project,
    "region": "us-central1",
    "gcs_location": bucket_uri,
    "temp_location": f"{bucket_uri}/temp",
    "staging_location": f"{bucket_uri}/staging",
    "save_main_session": "True",
}

# Flatten the options into ["--name", value, "--name", value, ...].
pipeline_args = [
    token
    for option, value in pipeline_options.items()
    for token in (f"--{option}", value)
]

In [0]:
# Output path prefix for the CSV files written by the pipeline.
output_prefix = f"gs://{gcs_bucket_name}/arxiv/sample.csv"

# Build and run the pipeline: one element per keyword, expanded into paper
# records by `query_with_keywords`, then written out as CSV through the Beam
# DataFrame API. Leaving the `with` block runs the pipeline.
with beam.Pipeline(argv=pipeline_args) as pipeline:
    records = (
        pipeline
        | beam.Create(query_keywords)
        | beam.FlatMap(query_with_keywords).with_output_types(ArxivEntries)
    )
    to_dataframe(records).to_csv(output_prefix, index=False)

In [0]:
# Recursively list what is stored under the bucket's arxiv/ prefix.
!gsutil ls -R gs://{gcs_bucket_name}/arxiv/

In [0]:
!gsutil cp gs://arxiv-data-nlp/arxiv/sample.csv-00000-of-00020 .

In [0]:
import pandas as pd


# Load the locally copied CSV shard and preview its first rows.
shard_path = "sample.csv-00000-of-00020"
df = pd.read_csv(shard_path)
df.head()

## Acknowledgements

* [Lukas Schwab](https://github.com/lukasschwab)
* [Robert Bradshaw](https://www.linkedin.com/in/robert-bradshaw-1b48a07/)