In [None]:
from dotenv import load_dotenv
import os
import pymongo
import re
import requests
from retry import retry

In [None]:
# Load the variables declared in the .env file into the process environment
load_dotenv()

# --- FOSM settings ---
FOSM_BASE_URL = "https://cluster.elasticsearch.dataesr.ovh"
FOSM_INDEX = "bso-publications"
FOSM_LIMIT = 0  # 0 means "no limit"
FOSM_PAGE_SIZE = 10000  # Maximum is 10000
FOSM_PIT_KEEP_ALIVE = "5m"

# --- OpenAlex settings ---
OA_LIMIT = 0  # 0 means "no limit"
OA_PAGE_SIZE = 200  # Maximum is 200

# --- MongoDB settings ---
MONGO_CHUNK_SIZE = 500
MONGO_COLLECTION = "publications"
MONGO_DB = "bsocoverage"

# --- Values read from the environment (see the .env file) ---
FOSM_AUTHORIZATION = os.environ.get("FOSM_AUTHORIZATION")
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://localhost:27017/")
OA_API_KEY = os.environ.get("OA_API_KEY")

In [None]:
# Connect to MongoDB, then pick the database and the collection
# that every import step of this notebook writes to.
mongo_client = pymongo.MongoClient(MONGO_URI)
mongo_database = mongo_client[MONGO_DB]
mongo_collection = mongo_database[MONGO_COLLECTION]

# 1. Import FOSM

In [None]:
# Number of publications in French OSM matching the import criteria:
# years 2013-2021, country "fr" and a restricted list of genres.
json = {
  "query": { "bool": { "must": [
    { "range": { "year": { "gte": 2013, "lte": 2021 } } },
    { "term": { "bso_country_corrected": "fr" } },
    { "terms": { "genre.keyword": [ "journal-article", "proceedings", "book-chapter", "book", "preprint" ] } },
  ] } },
}
r = requests.get("/".join([FOSM_BASE_URL, FOSM_INDEX, "_count"]), headers={"Authorization": f"Basic {FOSM_AUTHORIZATION}"}, json=json)
# Fail right here on an HTTP error (e.g. wrong credentials) instead of storing
# None in fosm_total_count, which is later used as a divisor for the progress display.
r.raise_for_status()
fosm_total_count = r.json().get("count")
print(fosm_total_count)

In [None]:
@retry(Exception, tries=5, delay=10)
def _import_fosm_page(pit, search_after=None):
    """Fetch one page of FOSM publications and upsert it into MongoDB.

    The whole page (HTTP request + MongoDB bulk write) is retried up to 5 times,
    10 seconds apart. Upserts are keyed on "id", so replaying a page is harmless.

    Args:
        pit: Point In Time id to search against.
        search_after: "sort" values of the last hit of the previous page,
            or None for the first page.

    Returns:
        A tuple (results_count, last_sort, next_pit):
        number of hits in the page, "sort" values of its last hit (None when
        the page is empty) and the PIT id to use for the next request.

    Raises:
        Exception: whatever the request or the bulk write raised, once all
            the tries are exhausted.
    """
    try:
        query = {
            "pit": {"id": pit, "keep_alive": FOSM_PIT_KEEP_ALIVE},
            "query": { "bool": { "must": [
                { "range": { "year": { "gte": 2013, "lte": 2021 } } },
                { "term": { "bso_country_corrected": "fr" } },
                { "terms": { "genre.keyword": [ "journal-article", "proceedings", "book-chapter", "book", "preprint" ] } },
            ] } },
            "size": FOSM_PAGE_SIZE,
            "sort": ["_shard_doc"],
        }
        if search_after:
            query["search_after"] = search_after
            query["track_total_hits"] = False
        r = requests.get("/".join([FOSM_BASE_URL, "_search"]),
                         headers={"Authorization": f"Basic {FOSM_AUTHORIZATION}"}, json=query)
        # Surface HTTP errors explicitly rather than failing later on a missing "hits" key
        r.raise_for_status()
        response = r.json()
        results = response.get("hits", {}).get("hits", [])
        actions = []
        for hit in results:
            source = hit.get("_source") or {}
            doi = source.get("doi")
            hal_id = source.get("hal_id")
            publication_id = doi or hal_id
            if not publication_id:
                # Without any identifier there is no key to upsert on: skip instead of crashing
                print(f"FOSM publication without doi nor hal_id skipped : {hit.get('_id')}")
                continue
            publication_id = publication_id.lower()
            publication = {
                "all_ids": source.get("external_ids"),
                "id": publication_id,
                "is_in_fosm": True,
                "fosm": source,
            }
            if doi:
                publication["doi"] = doi
            if hal_id:
                publication["hal_id"] = hal_id
            actions.append(pymongo.UpdateOne(
                {"id": publication_id}, {"$set": publication}, upsert=True))
        if actions:
            mongo_collection.bulk_write(actions, ordered=False)
        # The last page is empty: there is no last hit to read "sort" from
        last_sort = results[-1].get("sort") if results else None
        # Keep the current PIT id when the response does not carry a new one
        next_pit = response.get("pit_id") or pit
        return len(results), last_sort, next_pit
    except Exception as error:
        print(error)
        raise


def get_fosm_publications(pit, total_results_count=0, search_after=None):
    """Import all matching FOSM publications into MongoDB, page after page.

    Pages are walked iteratively (no recursion), each page being retried on its
    own by _import_fosm_page, so a failure never replays the previous pages.

    Args:
        pit: Point In Time id to start from.
        total_results_count: number of publications already imported.
        search_after: "sort" values to resume after, or None to start from
            the first page.

    Returns:
        The total number of publications read, including total_results_count.

    Raises:
        Exception: when a page still fails after all its retries.
    """
    while True:
        print(f"\"{pit}\", {total_results_count}, {search_after}")
        results_count, last_sort, next_pit = _import_fosm_page(pit, search_after)
        total_results_count += results_count
        # fosm_total_count comes from the count cell; skip the percentage if it is 0 or None
        if fosm_total_count:
            print('{:.0f} %'.format((total_results_count / fosm_total_count) * 100))
        if results_count == 0 or (FOSM_LIMIT != 0 and total_results_count >= FOSM_LIMIT):
            return total_results_count
        if not last_sort:
            # Without sort values the next request would return the first page again, forever
            return total_results_count
        pit, search_after = next_pit, last_sort

In [None]:
# Get Point In Time
r = requests.post("/".join([FOSM_BASE_URL, FOSM_INDEX, f"_pit?keep_alive={FOSM_PIT_KEEP_ALIVE}"]), headers={"Authorization": f"Basic {FOSM_AUTHORIZATION}"})
# Stop here on an HTTP error rather than searching with a None PIT id
r.raise_for_status()
pit = r.json().get("id")
try:
    # Collect all publications (identified by DOI or HAL id) in French OSM
    fosm_publications = get_fosm_publications(pit)
    print(fosm_publications)
finally:
    # Delete Point In Time, even when the import failed, so that it is not left open
    # NOTE(review): the import may receive refreshed PIT ids while paging; only the
    # initial id is known here — confirm that deleting it is sufficient.
    r = requests.delete("/".join([FOSM_BASE_URL, "_pit"]), headers={"Authorization": f"Basic {FOSM_AUTHORIZATION}"}, json={"id": pit})

# 2. Import OpenAlex

In [None]:
# Number of French publications in OpenAlex (2013-2021, paratexts excluded)
r = requests.get(
    "https://api.openalex.org/works",
    params={
        "filter": "institutions.country_code:FR,is_paratext:false,publication_year:2013-2021",
        # requests leaves out parameters whose value is None, so a missing
        # OA_API_KEY is no longer sent as the literal string "None"
        "api_key": OA_API_KEY,
    })
# Fail right here on an HTTP error instead of on a missing "meta" key
r.raise_for_status()
openalex_total_count = r.json().get("meta").get("count")
print(openalex_total_count)

In [None]:
# Landing page of a HAL deposit; group 2 captures the HAL id ("hal-" followed by digits)
_OA_HAL_LANDING_PAGE = re.compile(r"^https:\/\/hal\.(science|archives-ouvertes\.fr|inria\.fr)\/(hal-\d*)")


def _get_openalex_hal_id(publication):
    """Return the HAL id found in the locations of an OpenAlex work.

    The id is the part captured by the landing page pattern, so a trailing
    version suffix or path segment does not produce a distinct id.

    Args:
        publication: one work, as returned in the "results" of the OpenAlex API.

    Returns:
        The HAL id when exactly one distinct id is found, False when there is
        none or more than one (the latter case is also printed).
    """
    hal_ids = set()
    for location in publication.get("locations") or []:
        # "landing_page_url" may be present but null: fall back on an empty string
        match = _OA_HAL_LANDING_PAGE.match(location.get("landing_page_url") or "")
        if match:
            hal_ids.add(match.group(2))
    if len(hal_ids) > 1:
        print(f"More than one hal_id in OpenAlex work : {publication.get('id')}")
        return False
    return hal_ids.pop() if hal_ids else False


@retry(Exception, tries=5, delay=30)
def _import_openalex_page(cursor):
    """Fetch one page of OpenAlex works and upsert it into MongoDB.

    The whole page (HTTP request + MongoDB bulk write) is retried up to 5 times,
    30 seconds apart. Upserts are keyed on "id", so replaying a page is harmless.

    Args:
        cursor: OpenAlex cursor of the page to fetch.

    Returns:
        A tuple (results_count, next_cursor): number of works in the page and
        the cursor of the next page (None when the API gives none).

    Raises:
        Exception: whatever the request or the bulk write raised, once all
            the tries are exhausted.
    """
    r = requests.get(
        "https://api.openalex.org/works",
        # Passing params lets requests URL-encode the cursor and drop api_key when it is None
        params={
            "filter": "institutions.country_code:FR,is_paratext:false,publication_year:2013-2021",
            "per-page": OA_PAGE_SIZE,
            "api_key": OA_API_KEY,
            "cursor": cursor,
        })
    r.raise_for_status()
    response = r.json()
    results = response.get("results") or []
    actions = []
    for work in results:
        open_alex_id = work.get("id")
        doi = False
        if work.get("doi"):
            doi = work.get("doi", "").replace("https://doi.org/", "")
        hal_id = _get_openalex_hal_id(work)
        # Priority: DOI, then HAL id, then OpenAlex id
        publication_id = doi or hal_id or open_alex_id
        if not publication_id:
            # No identifier at all: nothing to key the upsert on
            continue
        publication_id = publication_id.lower()
        all_ids = [{"id_type": k, "id_value": v} for k, v in (work.get("ids") or {}).items()]
        known_types = {entry.get("id_type") for entry in all_ids}
        if open_alex_id and "openalex" not in known_types:
            all_ids.append({"id_type": "openalex", "id_value": open_alex_id})
        if doi and "doi" not in known_types:
            all_ids.append({"id_type": "doi", "id_value": doi})
        if hal_id and "hal_id" not in known_types:
            all_ids.append({"id_type": "hal_id", "id_value": hal_id})
        publication = {
            "all_ids": all_ids,
            "id": publication_id,
            "is_in_openalex": True,
            "openalex": work,
        }
        actions.append(pymongo.UpdateOne(
            {"id": publication_id}, {"$set": publication}, upsert=True))
    if actions:
        mongo_collection.bulk_write(actions, ordered=False)
    next_cursor = (response.get("meta") or {}).get("next_cursor")
    return len(results), next_cursor


def get_openalex_publications(cursor, total_results_count):
    """Import all French OpenAlex works (2013-2021) into MongoDB, page after page.

    Pages are walked iteratively: one recursive call per page of OA_PAGE_SIZE
    works would exceed Python's recursion limit on a large result set. Each page
    is retried on its own by _import_openalex_page.

    Args:
        cursor: OpenAlex cursor to start from.
        total_results_count: number of works already imported.

    Returns:
        The total number of works read, including total_results_count.

    Raises:
        Exception: when a page still fails after all its retries.
    """
    while True:
        print(f"\"{cursor}\", {total_results_count}")
        results_count, next_cursor = _import_openalex_page(cursor)
        total_results_count += results_count
        # openalex_total_count comes from the count cell; skip the percentage if it is 0 or None
        if openalex_total_count:
            print('{:.0f} %'.format((total_results_count / openalex_total_count) * 100))
        limit_reached = OA_LIMIT != 0 and total_results_count >= OA_LIMIT
        if next_cursor is None or results_count == 0 or limit_reached:
            return total_results_count
        cursor = next_cursor

In [None]:
# Import every French publication from OpenAlex, starting with the cursor "*"
# and with nothing counted yet; the returned total is then displayed.
openalex_publications = get_openalex_publications(cursor="*", total_results_count=0)
print(openalex_publications)

# 3. Compute year field

In [None]:
# Fill the "year" field of the publications that do not have one yet:
# the FOSM year wins, the OpenAlex publication year is the fallback.
# NOTE(review): within the lines visible here, actions left over when the loop ends
# (fewer than MONGO_CHUNK_SIZE) are not written — confirm a final bulk_write follows.
missing_year_filter = { "year": { "$exists": False } }
publicationsCount = mongo_collection.count_documents(missing_year_filter)
publications = mongo_collection.find(missing_year_filter)
actions = []
index = 0  # stays at 0 when there is nothing to process
for index, publication in enumerate(publications, start=1):
  year_fosm = publication.get("fosm", {}).get("year")
  year_openalex = publication.get("openalex", {}).get("publication_year")
  year = year_fosm or year_openalex
  if year:
    actions.append(pymongo.UpdateOne({ "id": publication.get("id") }, { "$set": { "year": year } }, upsert=True))
  # Write the pending updates as soon as a full chunk is ready
  if len(actions) == MONGO_CHUNK_SIZE:
    print(f"{index} / {publicationsCount}")
    mongo_collection.bulk_write(actions, ordered=False)
    actions = []