In [1]:
import json
import os
os.chdir("../") # resets notebook directory to repository root folder (DO ONLY ONCE!)
import gzip
import tqdm
import polars as pl
from tqdm import tqdm
from collections import defaultdict
# import pyarrow as pa

def flatten(matrix):
    """Concatenate the rows of a nested iterable into one flat list.

    Args:
        matrix: An iterable of iterables (e.g. a list of lists).

    Returns:
        A new list holding every item of every row, in row order.
    """
    flat = []
    for row in matrix:
        flat.extend(row)
    return flat

def simpleId(text):
    """Return the last '/'-separated segment of a URL-style ID.

    Example: ``"https://openalex.org/W123"`` -> ``"W123"``. A string without
    '/' is returned unchanged.

    Args:
        text: The ID string; may be ``None`` when the source field is null.

    Returns:
        The final path segment, or the sentinel ``'NONE'`` when ``text`` is
        not a string that can be split on '/' (e.g. ``None``).
    """
    try:
        return text.split('/')[-1]
    except (AttributeError, TypeError):
        # Non-string input (None, numbers, bytes): no usable str.split('/').
        return 'NONE'

def flush_buffers(buffers, output_dir, topic_id = None, schema_columns = None):
    """Append buffered rows to per-topic CSV files and empty those buffers.

    Rows for each topic are appended to ``<output_dir>/<topic_id>.csv``. The
    header row is written only when that file does not exist yet, so repeated
    flushes of the same topic extend one file.

    Args:
        buffers: Mapping ``topic_id -> list of row dicts``. Every flushed entry
            is reset to ``[]`` in place.
        output_dir: Directory holding the per-topic CSV files.
        topic_id: Flush only this topic; if ``None``, flush every topic.
        schema_columns: CSV column names, in output order. Defaults to the
            notebook-level ``columns`` list, which must then be defined before
            this function is called.

    Returns:
        The same ``buffers`` object (mutated in place).
    """
    if schema_columns is None:
        schema_columns = columns  # notebook-level global from the export cell
    # Pin every column to string: all row values built by this notebook are
    # strings or None, and an explicit dtype means the schema does not depend
    # on inference over the first rows of each batch.
    schema = {name: pl.Utf8 for name in schema_columns}

    targets = [topic_id] if topic_id is not None else list(buffers)

    for key in targets:
        rows = buffers[key]
        if not rows:
            continue
        df = pl.DataFrame(rows, orient = "row", schema = schema)

        out_path = os.path.join(output_dir, f"{key}.csv")
        write_header = not os.path.isfile(out_path)
        csv_str = df.write_csv(separator=',', include_header=write_header)
        # newline='' writes polars' '\n' terminators verbatim instead of letting
        # text mode translate them (e.g. to '\r\n' on Windows).
        with open(out_path, 'a', encoding='utf-8', newline='') as f:
            f.write(csv_str)
        # Drop the flushed rows from the caller's buffer.
        buffers[key] = []
    return buffers

In [2]:
# Root of the OpenAlex "works" snapshot, relative to the repository root
# (the working directory after the os.chdir in the first cell).
snapshot_subfolder = (
    "data/openalex-snapshot/"
    "data/works/"
)

In [3]:
# Partition folders whose name contains 'updated' (e.g. updated_date=YYYY-MM-DD),
# sorted by name.
listdir = sorted(entry for entry in os.listdir(snapshot_subfolder) if 'updated' in entry)
print(f"Found {len(listdir)} subfolders")

Found 424 subfolders


In [4]:
# Peek at the files inside the first partition folder.
os.listdir(os.path.join(snapshot_subfolder, listdir[0]))

['part_000.gz']

In [5]:
# Every 'part' file of every partition folder. Each folder listing is sorted so
# the processing order (and hence the row order in the output CSVs) does not
# depend on the filesystem's directory-listing order.
files = flatten([
    [f"{snapshot_subfolder}{subfolder}/{name}"
     for name in sorted(os.listdir(snapshot_subfolder + subfolder))
     if 'part' in name]
    for subfolder in listdir
])
print(f"Found {len(files)} files")

Found 893 files


In [6]:
# Show one collected path as a sanity check.
print(f"Example file: {files[0]}")

Example file: data/openalex-snapshot/data/works/updated_date=2023-05-17/part_000.gz


In [7]:
destination_csv_folder = "data/works_by_topic_csv/"
os.makedirs(destination_csv_folder, exist_ok=True)

# Before doing any damage, stop if the destination already holds files:
# flush_buffers appends to existing CSVs, so leftovers would be duplicated.
existing = os.listdir(destination_csv_folder)  # [] if empty
if existing:
    raise RuntimeError(
        f"Destination folder '{destination_csv_folder}' is not empty! "
        f"Found {len(existing)} file(s), e.g., {existing[:3]}. "
        f"Please clean it before running this script to avoid appending duplicates."
    )

columns = ['id', 'date', 'type', 'language', 'journal', 'doi', 'authors', 'topics', 'references', 
           'sdg', 'keywords', 'grants', 'primary_topic']

# Works missing (or with an empty/null value for) any of these are skipped.
REQUIRED_FIELDS = ('primary_topic', 'publication_date', 'authorships')
# Exceptions meaning "this optional field is missing or null in the record".
# Anything else is a genuine bug and is allowed to surface.
FIELD_ERRORS = (KeyError, TypeError, AttributeError)

# Topic buffers: key = topic_id, value = list of rows (as dicts)
buffers = defaultdict(list)
BUFFER_SIZE = 1000  # flush every N rows per topic


def _field(build, default=''):
    """Return ``build()``, or ``default`` if the record lacks the needed data."""
    try:
        return build()
    except FIELD_ERRORS:
        return default


def _authors_field(authorships):
    """Encode authorships as 'authorID_inst1|inst2_T;...' (T/F = is corresponding)."""
    blocks = []
    for a in authorships:
        author_id = simpleId(a['author']['id'])
        institutions = a.get('institutions') or []
        inst_str = '|'.join(simpleId(inst['id']) for inst in institutions if 'id' in inst)
        # Always a single T/F flag; a null is_corresponding counts as F.
        is_corr = 'T' if a.get('is_corresponding') else 'F'
        blocks.append(f"{author_id}_{inst_str}_{is_corr}")
    return ';'.join(blocks)


def _build_row(data):
    """Flatten one parsed work record into a row dict whose keys are `columns`."""
    row = {}
    row['primary_topic'] = simpleId(data['primary_topic']['id'])
    row['id'] = simpleId(data['id'])
    row['date'] = data['publication_date']
    row['type'] = data.get('type', '')
    row['journal'] = _field(lambda: simpleId(data['primary_location']['source']['id']))
    row['authors'] = _field(lambda: _authors_field(data['authorships']))
    # Topics as topicID_score; fall back to the primary topic alone.
    row['topics'] = _field(
        lambda: ';'.join(f"{simpleId(t['id'])}_{t['score']}" for t in data['topics']),
        default=row['primary_topic'],
    )
    row['references'] = _field(
        lambda: ';'.join(simpleId(r) for r in data['referenced_works'])
    )
    # Sustainable Development Goals as sdgID_score
    row['sdg'] = _field(lambda: ';'.join(
        f"{simpleId(sdg['id'])}_{sdg['score']}"
        for sdg in data.get('sustainable_development_goals', [])
        if 'id' in sdg and 'score' in sdg
    ))
    # Keywords as name_score; '_' and ';' in names are blanked so they
    # cannot collide with the field separators.
    row['keywords'] = _field(lambda: ';'.join(
        f"{kwd['display_name'].replace('_',' ').replace(';',' ')}_{kwd['score']}"
        for kwd in data.get('keywords', [])
        if 'display_name' in kwd and 'score' in kwd
    ))
    # Grants as awardID_funderID
    row['grants'] = _field(lambda: ';'.join(
        f"{grant['award_id']}_{simpleId(grant['funder'])}"
        for grant in data.get('grants', [])
        if 'award_id' in grant and 'funder' in grant
    ))
    # DOI: keep only the identifier part, not the full https://doi.org/...
    row['doi'] = _field(lambda: data['doi'].replace('https://doi.org/', ''))
    row['language'] = data.get('language', '')
    return row


for gzfile in tqdm(files):
    # JSON text is UTF-8 (RFC 8259); decode explicitly rather than with the
    # platform's locale-dependent default encoding.
    with gzip.open(gzfile, 'rt', encoding='utf-8') as file:
        for line in file:
            if not line.strip():
                continue  # blank line: nothing to parse
            data = json.loads(line)
            if not all(data.get(k) for k in REQUIRED_FIELDS):
                continue

            row = _build_row(data)
            topic_rows = buffers[row['primary_topic']]
            topic_rows.append(row)

            # Flush if buffer is too large
            if len(topic_rows) >= BUFFER_SIZE:
                buffers = flush_buffers(buffers, destination_csv_folder, row['primary_topic'])

# Final flush of whatever remains in every topic buffer
buffers = flush_buffers(buffers, destination_csv_folder)


 20%|██        | 182/893 [02:06<08:16,  1.43it/s] 


NameError: name 'lines' is not defined