# <center> OpenAlex data for Research Portal Denmark (RPD) </center>


## 1: Use OpenAlex search UI and compare results with current RPD databases
How many records in OpenAlex and in the three current RPD global data sources fulfil the following criteria:
- At least one author is affiliated with a Danish institution.
- Year = 2022
- SDG (UN Sustainable Development Goal) = 2 Zero hunger
- There is Open Access to the publication (any kind of Open Access)

### **<font color='red'>Solution</font>**

**Note:** the answers below were retrieved on 24/1/2024.

Your answers – please send to us before the interview:
- OpenAlex: <font color='red'>1659</font>
- Clarivate: <font color='red'>610</font>
- Digital Science: <font color='red'>254</font>
- Elsevier: <font color='red'>393</font>
For the OpenAlex answer, please cut and paste the search result URL here:  

https://openalex.org/works?page=1&filter=publication_year%3A2022,sustainable_development_goals.id%3Ahttps%3A%2F%2Fmetadata.un.org%2Fsdg%2F2,open_access.is_oa%3Atrue,institutions.country_code%3ADK&sort=cited_by_count%3Adesc&group_by=publication_year,open_access.is_oa,authorships.institutions.lineage,type
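The OpenAlex count can also be reproduced without the web UI by sending the same filters to the API, which reports the total number of matching works in `meta.count`. The sketch below assumes that the API accepts the same filter keys as the UI URL above; `FILTERS`, `build_count_url` and `fetch_count` are illustrative names. Because OpenAlex is updated continuously, the count returned today will likely differ from the figure recorded on 24/1/2024.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

API_URL = "https://api.openalex.org/works"

# Same filters as the web UI URL above (assumed to be valid API filter keys)
FILTERS = {
    "publication_year": "2022",
    "sustainable_development_goals.id": "https://metadata.un.org/sdg/2",
    "open_access.is_oa": "true",
    "institutions.country_code": "DK",
}


def build_count_url(filters):
    """Build a works query that returns the metadata plus a single record."""
    filter_value = ",".join(f"{key}:{value}" for key, value in filters.items())
    # urlencode percent-encodes ':', '/' and ',' inside the filter value
    return f"{API_URL}?{urlencode({'filter': filter_value, 'per_page': 1})}"


def fetch_count(filters):
    """Return meta.count, the total number of works matching the filters."""
    with urlopen(build_count_url(filters), timeout=30) as response:
        return json.load(response)["meta"]["count"]
```

Calling `fetch_count(FILTERS)` performs the request. `per_page=1` keeps the response small, since only the metadata is needed.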

## 2: Programming challenge

### **A. Data Extraction from OpenAlex**
<font color='yellow'>o RPD holds records pertaining to Denmark starting from the year 2011. OpenAlex UI indicates the system currently has 457,600 such records.

o Write a Python script to extract all these records using the OpenAlex API and store them in a SQL/NoSQL database of your choice – please send the script to us before the interview. – Downloading 1000 records or so will suffice for the further tasks.</font>

### **<font color='red'>Solution</font>**

In [None]:
import requests
from pymongo import MongoClient


def extract_and_store(api_url, database_name, collection_name, max_records=1000):
    """Download works from the OpenAlex API with cursor paging and store them in MongoDB."""
    # cursor=* starts cursor paging; each response returns the next cursor in meta.next_cursor
    params_cursor_paging = {'cursor': '*'}

    # Connect to MongoDB on localhost, running on the default port 27017
    client = MongoClient('localhost', 27017)
    # Create or access the database and the collection
    collection = client[database_name][collection_name]

    total_results = 0
    try:
        # Stop when there are no more pages or max_records is reached (1000 by default)
        while params_cursor_paging['cursor'] is not None and total_results < max_records:
            response = requests.get(api_url, params=params_cursor_paging, timeout=60)
            response.raise_for_status()
            data_cursor_paging = response.json()

            data = data_cursor_paging['results']
            # An empty page signals the end; insert_many() raises an error on an empty list
            if not data:
                break
            collection.insert_many(data)

            # Count the records actually received (up to 200 per page)
            total_results += len(data)
            params_cursor_paging['cursor'] = data_cursor_paging['meta']['next_cursor']
    finally:
        client.close()

    return total_results


if __name__ == "__main__":
    # No page=1 parameter: cursor paging replaces page-based paging
    api_url = "https://api.openalex.org/works?filter=publication_year:2011-2024,institutions.country_code:DK&sort=cited_by_count:desc&per_page=200"
    database_name = 'openalex_db'
    collection_name = 'dataset'
    extract_and_store(api_url, database_name, collection_name)



The **extract_and_store** function uses the requests library to call the OpenAlex API. It uses cursor paging, which, unlike basic paging (limited to the first 10,000 results), can walk through all matching records. It then creates (or opens) a MongoDB database and collection and stores the records of each page. I set the page size to 200, the maximum allowed by the API, and limited the retrieved documents to 1000. The following image shows the resulting collection in the Studio 3T UI.

<img src="pics/ing1.jpg"
     alt="Studio 3T image"
     style="float: left; margin-right: 10px;" />

<font color='yellow'>o Every month all the RPD databases are updated with new and changed records.

o Write a Python script for the monthly extraction and storage of new and changed
records. Ideally the script should automate this process, so it can be run periodically to
keep the data up to date – please send the script to us before the interview.</font>

### **<font color='red'>Solution</font>**

According to the OpenAlex API documentation, the **from_updated_date** filter requires a Premium subscription.

<img src="pics/from.jpg"
     alt="documentation"
     style="float: left; margin-right: 10px;" />

I applied for an API key. I then reused the **extract_and_store** function, this time with an API URL that returns the records updated since 20/1/2024.

*In the following cell I do not include my API key.*

In [None]:
api_url = "https://api.openalex.org/works?filter=publication_year:2011-2024,institutions.country_code:DK,from_updated_date:2024-01-20&api_key=<MY_API_KEY>&sort=cited_by_count:desc&per_page=200"
database_name = 'openalex_db'
collection_name = 'updated_data'
extract_and_store(api_url, database_name, collection_name)

As the screenshot below shows, I created another collection named **updated_data**. I use this collection to update the collection named *dataset*, which contains the full data.

<img src="pics/s3t.jpg"
     alt="documentation"
     style="float: left; margin-right: 10px;" />

The following function takes the name of the updated collection as an argument and merges its documents into the full data collection (*dataset*), replacing the older versions of changed records.

In [None]:
from pymongo import MongoClient, ReplaceOne


def update_data(updated_collection, database_name='openalex_db', main_collection='dataset'):
    """Merge the documents of updated_collection into the main collection."""
    client = MongoClient('localhost', 27017)
    database = client[database_name]
    collection1 = database[main_collection]
    collection2 = database[updated_collection]

    try:
        operations = []
        for document in collection2.find():
            # Drop the _id of the staging copy: a replacement must not
            # change the _id of an existing document in collection1
            document.pop('_id', None)
            # Match on the OpenAlex work ID (e.g. "https://openalex.org/W..."),
            # which is unique per work, unlike the title; insert if missing
            operations.append(ReplaceOne({'id': document['id']}, document, upsert=True))

        if operations:
            result = collection1.bulk_write(operations)
            print(f'Merge completed: {result.modified_count} updated, '
                  f'{result.upserted_count} inserted.')
    finally:
        client.close()

To automate this process so that it runs every month, I used **Dagster**, a cloud-native orchestrator for data pipelines.

In [None]:
from datetime import datetime
from dateutil.relativedelta import relativedelta
from Data_extraction.extract_data import extract_and_store
from Data_extraction.update_collection import update_data
from dagster import (
    AssetExecutionContext,
    Definitions,
    ScheduleDefinition,
    asset,
    define_asset_job,
)


@asset
def monthly_updated_data(context: AssetExecutionContext):
    current_datetime = datetime.now()
    previous_month_date = (current_datetime - relativedelta(months=1)).date()

    # Date as string, used to name the collection, e.g. updated_data_2024_02_01
    formatted_date = current_datetime.strftime("%Y_%m_%d")
    # Date in the format expected by the "from_updated_date" filter
    previous_month_date_formatted = previous_month_date.strftime("%Y-%m-%d")

    # Use the current year as the upper bound so that new publication years are included
    api_url = (
        "https://api.openalex.org/works?filter=publication_year:2011-{0},"
        "institutions.country_code:DK,from_updated_date:{1}"
        "&api_key=<MY_KEY>&sort=cited_by_count:desc&per_page=200"
    ).format(current_datetime.year, previous_month_date_formatted)

    database_name = 'openalex_db'
    new_collection_name = 'updated_data_{0}'.format(formatted_date)

    extract_and_store(api_url, database_name, new_collection_name)
    update_data(new_collection_name)
    context.log.info(f"Merged {new_collection_name} into the dataset collection")


updatedata_job = define_asset_job("updatedata_job", selection=[monthly_updated_data])

# The schedule defines the job to run and a cron expression:
# "0 0 1 * *" runs at midnight on the first day of every month
updatedata_schedule = ScheduleDefinition(
    job=updatedata_job,
    cron_schedule="0 0 1 * *",
)

defs = Definitions(
    assets=[monthly_updated_data],
    schedules=[updatedata_schedule],
)


Running `dagster dev -f schedule.py` in the terminal starts the Dagster UI, where the job can be launched manually and the schedule can be switched on.

<img src="pics/dag.jpg"
     alt="documentation"
     style="float: left; margin-right: 10px;" />

### B. Data enhancement
<font color='yellow'> o For example, the RPD maintains its own standard names for Danish research institutions and its own standard grouping of these. See for example the ‘Danish Affiliations’ filter at https://clarivate.forskningsportal.dk/search?facet_content-type_ss=publications . RPD has mapping tables for all the name variants that should be mapped to these standard names and groups.

o Consider and present how you think such an enhancement step could be added to the
data processing pipeline you are designing. </font>

### **<font color='red'>Solution</font>**

For this task, since I don't have access to the RPD mapping tables, I assume the following:  

*The mapping table should contain at least three columns: the first holds the RPD standard name of the institution, the second a list of its name variants, and the third the group the institution belongs to, e.g. Universities, Hospitals, Non-Profit Organizations.*

Then we need a script (a matching algorithm) that, for each record in the OpenAlex dataset, looks up the name of each affiliated institution in the variants column of the mapping table. When a name matches a variant, the algorithm replaces the name in the OpenAlex record with the RPD standard name (and can attach the corresponding group).
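A minimal sketch of such a matching step, assuming a mapping table with the three columns described above. The rows in `MAPPING_TABLE` are hypothetical placeholders, not RPD data, and `normalize`, `build_lookup` and `enrich_work` are illustrative names. Names are normalized (Unicode NFKC, case-folded, whitespace collapsed) before an exact dictionary lookup, so trivial spelling differences still match. Instead of overwriting the OpenAlex `display_name`, this variant adds `rpd_standard_name` and `rpd_group` fields so that the original name stays available for checking; unmatched names are returned so the mapping tables can be extended.

```python
import unicodedata

# HYPOTHETICAL mapping-table rows: (RPD standard name, name variants, group).
# The real RPD tables are not available; these rows are illustrative only.
MAPPING_TABLE = [
    ("Aarhus University", ["Aarhus Universitet", "Aarhus Univ."], "Universities"),
    ("Rigshospitalet", ["Copenhagen University Hospital - Rigshospitalet"], "Hospitals"),
]


def normalize(name):
    """Normalize a name for matching: NFKC, case-folded, single spaces."""
    return " ".join(unicodedata.normalize("NFKC", name).casefold().split())


def build_lookup(mapping_table):
    """Index every normalized variant -> (standard name, group)."""
    lookup = {}
    for standard_name, variants, group in mapping_table:
        for variant in [standard_name, *variants]:
            lookup[normalize(variant)] = (standard_name, group)
    return lookup


def enrich_work(work, lookup):
    """Add RPD names/groups to the institutions of one OpenAlex work.

    Returns the institution names that had no match in the mapping table.
    """
    unmatched = []
    for authorship in work.get("authorships", []):
        for institution in authorship.get("institutions", []):
            name = institution.get("display_name") or ""
            match = lookup.get(normalize(name))
            if match:
                institution["rpd_standard_name"], institution["rpd_group"] = match
            else:
                unmatched.append(name)
    return unmatched


lookup = build_lookup(MAPPING_TABLE)
work = {"authorships": [{"institutions": [
    {"display_name": "aarhus  universitet"},
    {"display_name": "Unknown Lab"},
]}]}
print(enrich_work(work, lookup))  # ['Unknown Lab']
print(work["authorships"][0]["institutions"][0]["rpd_standard_name"])  # Aarhus University
```

Exact lookup on normalized names is cheap and predictable; fuzzy matching could be added later for the names that end up in the unmatched list.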

This step should be added to the pipeline as a transformation step after the data has been ingested from OpenAlex and loaded into MongoDB. The resulting process is an ELT (Extract, Load, Transform) pipeline, as illustrated in the picture below.

<img src="pics/ELT.png"
     alt="documentation"
     style="float: left; margin-right: 10px;" />

### C. Data indexing using Elasticsearch

<font color='yellow'> o One of the end-products of your data processing pipeline will be a search UI based on
Elasticsearch and with a look and feel similar to https://clarivate.forskningsportal.dk/search?facet_content-type_ss=publications

o Consider and present how you think such a search service could be built and kept up to
date – in a robust, efficient, and user-friendly way. </font>

### **<font color='red'>Solution</font>**

For this task I wrote a Python script that indexes the documents stored in MongoDB into Elasticsearch. Since no mapping is defined, Elasticsearch infers the field types from the structure of the documents, a process known as dynamic mapping.

In [None]:
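from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from pymongo import MongoClient

mongo_client = MongoClient('localhost', 27017)
mongo_collection = mongo_client["openalex_db"]["dataset"]

es = Elasticsearch('http://localhost:9200')
print(es.ping())  # True if the cluster is reachable

index_name = "bulk19"

# Create the index only if it does not exist yet; without explicit mappings,
# Elasticsearch infers the field types from the documents (dynamic mapping)
if not es.indices.exists(index=index_name):
    es.indices.create(index=index_name)


def generate_actions():
    """Yield one bulk 'index' action per MongoDB document."""
    # Exclude the MongoDB-specific _id (an ObjectId, not JSON-serializable)
    for document in mongo_collection.find({}, {'_id': False}):
        yield {
            '_index': index_name,
            # Use the OpenAlex work ID as the Elasticsearch ID, so re-running
            # the script overwrites documents instead of duplicating them
            '_id': document['id'],
            '_source': document,
        }


# Index in batches with the bulk helper instead of one request per document
success_count, errors = bulk(es, generate_actions())
print(f'Indexed {success_count} documents')

# Refresh the index to make the documents available for search
es.indices.refresh(index=index_name)

mongo_client.close()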


Once the script has run, the data can be explored in the 'Discover' section of **Kibana**. A screenshot of part of the indexed data is shown below:

<img src="pics/kib.jpg"
     alt="documentation"
     style="float: left; margin-right: 10px;" />

The script above is a basic implementation of an Elasticsearch indexing process. To build a robust, efficient, and user-friendly search service, several further factors have to be considered, as discussed below.

- Mapping  
  First, we have to choose the appropriate field type for each attribute of the documents (e.g. text, keyword, date, long). For example, 'title' or 'display_name' should be mapped as 'text' to allow full-text search, the country code as 'keyword' for exact matching and aggregations, 'publication_year' as a numeric type and 'publication_date' as 'date' to allow sorting and range filtering.

- Autocomplete  
  This functionality suggests completions to the user while typing, based on the text entered so far. Integrating it would make the search UI more user-friendly.

- Synchronization  
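  The pipeline should keep Elasticsearch synchronized with MongoDB, so that Elasticsearch ingests the updated data automatically after each monthly update, for example by adding a re-indexing step to the scheduled Dagster job. Using the OpenAlex work ID as the Elasticsearch document ID makes this idempotent: re-indexed records overwrite their previous versions instead of creating duplicates.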
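The mapping and autocomplete points can be sketched as explicit index settings for the elasticsearch-py 8.x client. The field selection is an assumption based on the OpenAlex work schema, and `INDEX_MAPPINGS`, `autocomplete_query` and `create_index` are illustrative names. `display_name` uses Elasticsearch's `search_as_you_type` field type for autocomplete, and `abstract_inverted_index` is excluded from indexing: it has one key per word of the abstract, so dynamic mapping would create a new field per word and can hit the default limit of 1000 fields per index.

```python
# Illustrative explicit mappings for an OpenAlex works index (assumed fields)
INDEX_MAPPINGS = {
    # Fields not listed here are still mapped dynamically
    "dynamic": True,
    "properties": {
        # Full-text search on titles
        "title": {"type": "text"},
        # Text field plus _2gram/_3gram/_index_prefix sub-fields for autocomplete
        "display_name": {"type": "search_as_you_type"},
        "publication_year": {"type": "integer"},
        "publication_date": {"type": "date"},
        "type": {"type": "keyword"},
        "open_access": {"properties": {"is_oa": {"type": "boolean"}}},
        "authorships": {
            "properties": {
                "institutions": {
                    "properties": {
                        # Full-text name plus an exact 'raw' copy for facets
                        "display_name": {"type": "text",
                                         "fields": {"raw": {"type": "keyword"}}},
                        "country_code": {"type": "keyword"},
                    }
                }
            }
        },
        # One key per abstract word: keep it in _source only, do not index it
        "abstract_inverted_index": {"type": "object", "enabled": False},
    },
}


def autocomplete_query(prefix, size=5):
    """Search-as-you-type query on display_name (multi_match, bool_prefix)."""
    return {
        "size": size,
        "query": {
            "multi_match": {
                "query": prefix,
                "type": "bool_prefix",
                "fields": ["display_name", "display_name._2gram",
                           "display_name._3gram"],
            }
        },
    }


def create_index(es, index_name):
    """Create the index with explicit mappings (es: an Elasticsearch 8.x client)."""
    if not es.indices.exists(index=index_name):
        es.indices.create(index=index_name, mappings=INDEX_MAPPINGS)
```

With a running cluster, `create_index(es, "openalex_works")` followed by `es.search(index="openalex_works", **autocomplete_query("climate"))` should return works whose `display_name` contains words starting with the typed text. The index name here is hypothetical.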


More factors could be considered when building this search service, always keeping in mind the objectives of the end product.

## Optional extra investigation:
### D. Data Exploration (1)