In [1]:
import time
from os.path import join
import os
import pandas as pd
from tqdm import tqdm

#
# # Create empty DataFrames for each ID type
# doi_df = pd.DataFrame(columns=['supported_id', 'openalex_id'])
# pmid_df = pd.DataFrame(columns=['supported_id', 'openalex_id'])
# pmcid_df = pd.DataFrame(columns=['supported_id', 'openalex_id'])
#
# start_time = time.time()
# for root, dirs, files in os.walk(directory_path):
#     for file in tqdm(files):
#         if file.endswith('.csv'):
#             file_path = os.path.join(root, file)
#             current_df = pd.read_csv(file_path)
#
#             # Determine the prefix and concatenate with the corresponding DataFrame
#             df_doi = current_df[current_df['supported_id'].str.startswith('doi:')]
#             doi_df = pd.concat([doi_df, df_doi], ignore_index=True)
#
#             df_pmid = current_df[current_df['supported_id'].str.startswith('pmid:')]
#             pmid_df = pd.concat([pmid_df, df_pmid], ignore_index=True)
#
#             df_pmcid = current_df[current_df['supported_id'].str.startswith('pmcid:')]
#             pmcid_df = pd.concat([pmcid_df, df_pmcid], ignore_index=True)
#
# print('Creating DataFrames for each ID type took: {} minutes'.format((time.time() - start_time)/60))

# Create DataFrames for each ID type
We want to create three lookup tables from the CSV files mapping PIDs to OpenAlex IDs for the resources in OpenAlex. We can do so by creating three different DataFrames, one for each ID type (doi, pmid, pmcid). Each DataFrame will have two columns: the first one will contain the PID of the ID type (doi, pmid, pmcid) and the second one will contain the OpenAlex ID of the bibliographic resource it is associated with. We can later use these DataFrames to create the corresponding tables in the database.
As the task is memory-intensive, we need to create the ID-type DataFrames one at a time. We define a function that takes as input the prefix (doi|pmid|pmcid) used to select rows from the input CSV files, together with the input directory; it returns a single DataFrame in which each row corresponds to a bibliographic resource and stores one PID of the specified ID type along with the OpenAlex ID of the bibliographic resource it is associated with. (Only the ID types that can be associated with a single bibliographic resource are taken into consideration: doi, pmid, pmcid; isbn and issn are excluded!)

In [2]:
# from typing import Literal
#
# def create_id_df(id_type: Literal['doi','pmid','pmcid'], directory_path:str):
#     """
#     Create a DataFrame for a given ID type (doi, pmid, pmcid) from the CSV files in the input directory.
#     :param id_type: the ID type (doi, pmid, pmcid)
#     :param directory_path: the path to the directory containing the CSV files
#     :return: a DataFrame containing the PIDs of the ID type specified as a parameter and the OpenAlex IDs of the
#     bibliographic resources they are associated with
#     """
#     try:
#         assert id_type in ['doi','pmid','pmcid']
#         id_df = pd.DataFrame(columns=['supported_id', 'openalex_id'])
#         for root, dirs, files in os.walk(directory_path):
#             for file in tqdm(files):
#                 if file.endswith('.csv'):
#                     file_path = os.path.join(root, file)
#                     current_df = pd.read_csv(file_path)
#
#                     # Determine the prefix and concatenate with the corresponding DataFrame
#                     df_id = current_df[current_df['supported_id'].str.startswith(id_type)]
#                     id_df = pd.concat([id_df, df_id], ignore_index=True)
#         return id_df
#     except AssertionError:
#         print('The ID type must be one of the following: doi, pmid, pmcid')
#         return None

## Still too slow...
Let's try another approach. In the following function we first read each csv file line by line, then we append relevant rows to a list. Then we create a DataFrame from the list of rows and return the DataFrame.

In [3]:
# import pandas as pd
# import os
# import csv
# from tqdm import tqdm
# from typing import Literal
#
# def create_id_df(id_type: Literal['doi', 'pmid', 'pmcid'], directory_path: str):
#     """
#     Create a DataFrame for a given ID type (doi, pmid, pmcid) from the CSV files in the input directory.
#     :param id_type: the ID type (doi, pmid, pmcid)
#     :param directory_path: the path to the directory containing the CSV files
#     :return: a DataFrame containing the PIDs of the ID type specified as a parameter and the OpenAlex IDs of the
#     bibliographic resources they are associated with
#     """
#     try:
#         assert id_type in ['doi', 'pmid', 'pmcid']
#         id_data = []
#         for root, dirs, files in os.walk(directory_path):
#             for file in tqdm(files):
#                 if file.endswith('.csv'):
#                     file_path = os.path.join(root, file)
#                     with open(file_path, 'r', encoding='utf-8') as csv_file:
#                         reader = csv.DictReader(csv_file, dialect='unix')
#                         for row in reader:
#                             if row['supported_id'].startswith(id_type):
#                                 id_data.append(row)
#
#         id_df = pd.DataFrame(id_data, columns=['supported_id', 'openalex_id'])
#         return id_df
#     except AssertionError:
#         print('The ID type must be one of the following: doi, pmid, pmcid')
#         return None

## Still too slow (x2)...
This still takes forever to run. The RAM saturates pretty soon and, although it is a bit faster at the beginning, it runs into a MemoryError before it even finishes building the list. Let's try to use the multiprocessing functions built into Dask to speed up the process and manage the memory issues.

In [None]:
# import pandas as pd
# import dask.dataframe as dd
# import os
# from tqdm import tqdm
# from typing import Literal
#
# def create_id_df(id_type: Literal['doi', 'pmid', 'pmcid'], directory_path: str):
#     """
#     Create a DataFrame for a given ID type (doi, pmid, pmcid) from the CSV files in the input directory.
#     :param id_type: the ID type (doi, pmid, pmcid)
#     :param directory_path: the path to the directory containing the CSV files
#     :return: a DataFrame containing the PIDs of the ID type specified as a parameter and the OpenAlex IDs of the
#     bibliographic resources they are associated with
#     """
#     try:
#         assert id_type in ['doi', 'pmid', 'pmcid']
#
#         dask_df = dd.DataFrame()
#         for root, dirs, files in os.walk(directory_path):
#             for file in tqdm(files):
#                 if file.endswith('.csv'):
#                     file_path = os.path.join(root, file)
#                     current_df = dd.read_csv(file_path)
#
#                     # Determine the prefix and concatenate with the corresponding DataFrame
#                     df_id = current_df[current_df['supported_id'].str.startswith(id_type)]
#                     dask_df = dd.concat([dask_df, df_id], ignore_index=True)
#
#         id_df = dask_df.compute()  # Convert Dask DataFrame to Pandas DataFrame
#         return id_df
#     except AssertionError:
#         print('The ID type must be one of the following: doi, pmid, pmcid')
#         return None

In [4]:
# Build a single Dask DataFrame from every CSV file matching the glob pattern.
# NOTE(review): the output recorded for this cell is
# "ModuleNotFoundError: No module named 'dask'", so dask was not installed in
# the kernel environment when the notebook was last run.
# NOTE(review): the glob matches files named `oa_work_tables.<x>.<y>.csv`
# directly under D:/, whereas another cell treats `D:/oa_work_tables` as a
# directory of CSV files — confirm which layout is the intended one.
# `blocksize=25e6` is presumably the partition size in bytes (~25 MB) — TODO
# confirm against the Dask documentation.
import dask.dataframe as dd
dask_df = dd.read_csv('D:/oa_work_tables.*.*.csv', blocksize=25e6)

ModuleNotFoundError: No module named 'dask'

In [5]:
# Root folder of the input CSV files. `join` receives a single argument here,
# so no further path component is appended to the string.
OA_WORK_OUTPUT_FOLDER_PATH = join('D:/oa_work_tables')

directory_path = OA_WORK_OUTPUT_FOLDER_PATH  # Path to the directory containing the CSV files

# Build the DOI lookup DataFrame and report the elapsed wall-clock time in
# minutes (seconds from time.time() divided by 60).
# NOTE(review): every `create_id_df` definition visible in this notebook is
# commented out, so on a fresh kernel this call would presumably raise a
# NameError — confirm which implementation is meant to be active.
start_time = time.time()
doi_df = create_id_df('doi', directory_path)
print('Creating the DOI DataFrame took: {} minutes'.format((time.time() - start_time)/60))

0it [00:00, ?it/s]
100%|██████████| 1/1 [00:00<?, ?it/s]
100%|██████████| 1/1 [00:00<00:00, 999.60it/s]
100%|██████████| 1/1 [00:00<00:00, 998.88it/s]
100%|██████████| 1/1 [00:00<00:00, 1002.46it/s]
100%|██████████| 1/1 [00:00<00:00, 1001.03it/s]
100%|██████████| 1/1 [00:00<00:00, 902.19it/s]
100%|██████████| 1/1 [00:00<00:00, 1000.07it/s]
100%|██████████| 1/1 [00:00<00:00, 999.12it/s]
100%|██████████| 1/1 [00:00<00:00, 994.85it/s]
100%|██████████| 1/1 [00:00<00:00, 997.69it/s]
100%|██████████| 1/1 [00:00<00:00, 1000.79it/s]
100%|██████████| 1/1 [00:00<00:00, 997.93it/s]
100%|██████████| 1/1 [00:00<00:00, 999.36it/s]
100%|██████████| 1/1 [00:00<00:00, 1001.51it/s]
100%|██████████| 1/1 [00:00<00:00, 988.29it/s]
100%|██████████| 1/1 [00:00<00:00, 997.93it/s]
100%|██████████| 1/1 [00:00<00:00, 499.68it/s]
100%|██████████| 1/1 [00:00<00:00, 994.15it/s]
100%|██████████| 1/1 [00:00<00:00, 999.60it/s]
100%|██████████| 1/1 [00:00<00:00, 1003.66it/s]
100%|██████████| 1/1 [00:00<00:00, 996.75it/

MemoryError: 

In [None]:
# Build the PMID lookup DataFrame from the same input directory and report the
# elapsed wall-clock time in minutes. `start_time` is reassigned here, so it no
# longer refers to the start of any earlier measurement.
# NOTE(review): relies on `create_id_df` and `directory_path` being defined by
# previously executed cells.
start_time = time.time()
pmid_df = create_id_df('pmid', directory_path)
print('Creating the PMID DataFrame took: {} minutes'.format((time.time() - start_time)/60))

In [None]:
# Build the PMCID lookup DataFrame from the same input directory and report the
# elapsed wall-clock time in minutes. `start_time` is reassigned here, so it no
# longer refers to the start of any earlier measurement.
# NOTE(review): relies on `create_id_df` and `directory_path` being defined by
# previously executed cells.
start_time = time.time()
pmcid_df = create_id_df('pmcid', directory_path)
print('Creating the PMCID DataFrame took: {} minutes'.format((time.time() - start_time)/60))

In [None]:
from sys import getsizeof

# Report the in-memory size and the number of rows of each lookup DataFrame.
# Sizes are the byte counts returned by `getsizeof`, divided by 1,000,000.
# Row counts use `len(df)`: `DataFrame.size` is the total number of cells
# (rows multiplied by columns), which would overstate the number of identifiers.
print('DOI DataFrame size: {} Mb'.format(getsizeof(doi_df)/1000000))
print('Number of DOIs (rows) in DOI DataFrame: {}'.format(len(doi_df)))
print('PMID DataFrame size: {} Mb'.format(getsizeof(pmid_df)/1000000))
print('Number of PMIDs (rows) in PMID DataFrame: {}'.format(len(pmid_df)))
print('PMCID DataFrame size: {} Mb'.format(getsizeof(pmcid_df)/1000000))
# The label below previously read "PCMID"; corrected to match the ID type name.
print('Number of PMCIDs (rows) in PMCID DataFrame: {}'.format(len(pmcid_df)))

## Build the database, create the tables and insert the data

In [None]:
from contextlib import closing
from sqlite3 import connect
import time

DB_PATH = 'oa_ids_tables.db'  # Path to the SQLite database file

# Write each lookup DataFrame to its own table, replacing any existing table of
# the same name, and report how long each write and the whole export took.
start_time = time.time()
# A sqlite3 connection used as a context manager only commits or rolls back the
# transaction; it does not close the connection. `closing(...)` guarantees the
# handle is released (even on error), while the inner `con` context keeps the
# original commit-on-success / rollback-on-exception behaviour.
with closing(connect(DB_PATH)) as con, con:
    doi_table_start = time.time()
    doi_df.to_sql("WorksDOI", con, if_exists="replace", index=False)
    print('Creating the DOI table took: {} seconds'.format(time.time() - doi_table_start))
    pmid_table_start = time.time()
    pmid_df.to_sql("WorksPMID", con, if_exists="replace", index=False)
    print('Creating the PMID table took: {} seconds'.format(time.time() - pmid_table_start))
    pmcid_table_start = time.time()
    pmcid_df.to_sql("WorksPMCID", con, if_exists="replace", index=False)
    print('Creating the PMCID table took: {} seconds'.format(time.time() - pmcid_table_start))

print('Creating the tables in the database took: {} seconds'.format(time.time() - start_time))
