# Building a Data Lake in MongoDB

In [2]:
import polars as pl

def _arxiv_column(name):
    """Return `name` lower-cased, dashes turned into underscores, prefixed with ``arxiv_``."""
    return "arxiv_" + name.replace("-", "_").lower()


# Read the arXiv metadata snapshot, parse the `update_date` strings
# (YYYY-MM-DD) into datetimes, then put every column under the `arxiv_` prefix.
update_date_parsed = pl.col("update_date").str.to_datetime(r"%Y-%m-%d")
df = (
    pl.read_parquet("arxiv-metadata-oai-snapshot.parquet")
    .with_columns(update_date_parsed)
    .rename(_arxiv_column)
)

In [5]:
from tqdm import tqdm

# Bulk-load the arXiv metadata frame into the `papers` collection in batches.
# NOTE(review): `db` is not defined in any visible cell — presumably a pymongo
# Database handle created earlier; confirm it exists on a fresh kernel.
total = df.height
batch = 1000

# Reset the collection that is actually written to below, so re-running this
# cell replaces the data instead of appending a second copy of every paper.
# Be aware this also discards any fields added to `papers` by later cells.
db.drop_collection("papers")
with tqdm(total=total, desc="loading data in mongodb") as pbar:
    for df_slice in df.iter_slices(batch):
        docs = df_slice.to_dicts()
        if docs:  # never hand an empty batch to insert_many
            db.papers.insert_many(docs)
        pbar.update(len(docs))

loading data in mongodb: 100%|██████████| 2700231/2700231 [16:18<00:00, 2760.72it/s]


## Disambiguation of authors and works, integration of new data

In [10]:
from pyalex import Works, Authors, Sources, Institutions, Topics, Publishers, Funders
import pyalex
import os

# Configure pyalex with a contact e-mail taken from the environment rather than
# hard-coded in the notebook. Raises KeyError if OPENALEX_EMAIL is not set.
# NOTE(review): presumably this is what opts requests into OpenAlex's "polite
# pool" — confirm against the pyalex documentation.
pyalex.config.email = os.environ["OPENALEX_EMAIL"]

In [3]:
# Look up a single paper by title to inspect the structure of the work records
# returned by OpenAlex (the full result list is displayed as the cell output).
Works().search_filter(title="Sparsity-certifying Graph Decompositions").get()

[{'id': 'https://openalex.org/W1993757674',
  'doi': 'https://doi.org/10.1007/s00373-008-0834-4',
  'title': 'Sparsity-certifying Graph Decompositions',
  'display_name': 'Sparsity-certifying Graph Decompositions',
  'relevance_score': 739.04486,
  'publication_year': 2009,
  'publication_date': '2009-05-01',
  'ids': {'openalex': 'https://openalex.org/W1993757674',
   'doi': 'https://doi.org/10.1007/s00373-008-0834-4',
   'mag': '1993757674'},
  'language': 'en',
  'primary_location': {'is_oa': False,
   'landing_page_url': 'https://doi.org/10.1007/s00373-008-0834-4',
   'pdf_url': None,
   'source': {'id': 'https://openalex.org/S186126824',
    'display_name': 'Graphs and Combinatorics',
    'issn_l': '0911-0119',
    'issn': ['0911-0119', '1435-5914'],
    'is_oa': False,
    'is_in_doaj': False,
    'is_indexed_in_scopus': True,
    'is_core': True,
    'host_organization': 'https://openalex.org/P4310319900',
    'host_organization_name': 'Springer Science+Business Media',
    'hos

In [15]:
from tqdm import tqdm
from time import sleep

# Bookkeeping for the enrichment pass: `failed` collects the `_id` of every
# paper whose lookup raised; `updates` is not used in the cells visible here.
updates = dict()
failed = list()

# Select papers that have a DOI but no `alex` value yet, fetching only the
# fields needed for the lookup.
# NOTE(review): per MongoDB query semantics a `None` match should cover both
# null and missing `alex` fields — confirm.
pending_filter = {"arxiv_doi": {"$ne": None}, "alex": None}
id_and_doi = {"_id": 1, "arxiv_doi": 1}
cursor = db.papers.find(pending_filter, id_and_doi)

In [17]:
# Enrich every pending paper with its OpenAlex work record, looked up by DOI.
#
# The cursor is iterated directly so the loop ends exactly when the cursor is
# exhausted. A hard-coded iteration count raises StopIteration when fewer
# documents match and silently skips the remainder when more do.
#
# The count below only sizes the progress bar. Its filter must mirror the one
# used to build `cursor`, and it is approximate if `cursor` has already been
# partially consumed by an earlier, interrupted run of this cell.
pending = db.papers.count_documents({"arxiv_doi": {"$ne": None}, "alex": None})

with tqdm(cursor, total=pending, desc="fetching OpenAlex works") as pbar:
    for doc in pbar:
        try:
            data = Works()["https://doi.org/{doi}".format(doi=doc["arxiv_doi"])]
            db.papers.update_one({"_id": doc["_id"]}, {"$set": {"alex": dict(data)}})
        except Exception:
            # Best effort: remember the paper and keep going. The document is
            # left without an `alex` value, so it still matches the pending
            # filter on a later run. The running failure count is shown on the
            # progress bar so a systematic problem does not go unnoticed.
            failed.append(doc["_id"])
            pbar.set_postfix(failed=len(failed), refresh=False)
        finally:
            sleep(0.1)  # max 10 rps

  3%|▎         | 41048/1232385 [10:13:02<296:32:05,  1.12it/s] 


KeyboardInterrupt: 