In [None]:
import json
import re
from http.client import responses
from typing import List, Optional, Literal, Dict
from pydantic import BaseModel, HttpUrl, ValidationError, ConfigDict, Field, model_validator

In [None]:
import pandas as pd, requests, urllib.parse, time
from collections import defaultdict
from tqdm import tqdm

def get_oa_work():
    """Download OpenAlex works matching "Blanchot" and group them by short ID.

    Walks the OpenAlex ``/works`` endpoint with cursor pagination, validates
    each result with ``OpenAlexWork`` and stores the dumped model under its
    short identifier (one capital letter followed by digits, extracted from
    the work's ``id``).

    Returns:
        dict[str, list[dict]]: short ID -> every validated work seen with that
        ID, in download order. A list with more than one entry means the same
        work was returned more than once.

    Raises:
        requests.HTTPError: if the API answers with an error status.

    Note:
        ``OpenAlexWork`` is not imported by this cell; it has to be available
        in the kernel before this function is called.
    """
    BASE = "https://api.openalex.org/works"
    filters = "title_and_abstract.search:Blanchot,publication_year:1998-2025"
    per_page = 200
    # Mapping (not a list): works are bucketed by their short ID so that
    # repeated IDs can be detected afterwards.
    records = {}
    cursor = "*"
    invalid_works = []
    # The filter is the same for every page, so it is encoded only once.
    encoded_filters = urllib.parse.quote(filters)

    with tqdm(desc='Downloading', unit='work') as pbar:
        while True:
            encoded_cursor = urllib.parse.quote(cursor)
            url = f"{BASE}?filter={encoded_filters}&per_page={per_page}&cursor={encoded_cursor}"

            # Fail loudly on HTTP errors instead of tripping over a missing
            # 'meta' key later; the timeout keeps a stalled connection from
            # hanging the notebook forever.
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            resp = response.json()

            # The total is only known once the first page has arrived.
            if pbar.total is None:
                pbar.total = resp['meta']['count']
            works = resp['results']

            for work in works:
                try:
                    valid_work = OpenAlexWork.model_validate(work).model_dump()
                except ValidationError as e:
                    invalid_works.append({
                        "work_id": work.get("id"),
                        "error": str(e)
                    })
                    continue

                # str(): the validated id is not necessarily a plain string.
                id_match = re.search(r'[A-Z](\d+)', str(valid_work['id']))
                if id_match is None:
                    # Without a short ID the work cannot be bucketed; record
                    # it as invalid rather than aborting the whole download.
                    invalid_works.append({
                        "work_id": work.get("id"),
                        "error": "no short ID found in work id"
                    })
                    continue
                records.setdefault(id_match.group(), []).append(valid_work)

            pbar.update(len(works))

            next_cursor = resp['meta'].get('next_cursor')
            if not next_cursor:
                break
            cursor = next_cursor
            # Small pause between pages to stay polite towards the API.
            time.sleep(0.1)

    if invalid_works:
        print(f"\nInvalid: {len(invalid_works)}")

    return records

Downloading: 100%|█████████▉| 4076/4079 [00:24<00:00, 167.53work/s]


In [6]:
# UNFINISHED CODE FOR DEDUPING
# Lists every short ID whose bucket holds more than one work.
# `records` is expected to be a mapping of short ID -> list of works; it is
# not defined in this cell and must already exist in the kernel.
# A comprehension is used so that no loop variables leak into the notebook
# namespace (the previous loop variable `id` overwrote the builtin `id`).
duplicate_ids = [
    work_id
    for work_id, works_for_id in records.items()
    if len(works_for_id) > 1
]

print('\n'.join(duplicate_ids))

W4393910644
W2500671005
W1640876992
W181498903
W2313838823
W2242153549
W4210509993
W2019765947


In [7]:
# FILE CREATION
# Serialize before opening the file: mode 'w' truncates the target as soon as
# it is opened, so a failure during serialization would otherwise leave an
# empty or half-written export behind.
serialized_records = json.dumps(records, indent=4)
with open('openalex_blanchot.json', mode='w', encoding='utf-8') as f:
    f.write(serialized_records)

In [10]:
import re
from collections import defaultdict

import pandas as pd, requests, urllib.parse, time
from tqdm import tqdm
from pydantic import ValidationError

from models import OpenAlexWork

def get_oa_work():
    """Download OpenAlex works matching "Blanchot" and return them de-duplicated.

    Walks the OpenAlex ``/works`` endpoint with cursor pagination, validates
    each result with ``OpenAlexWork``, tags the dumped model with a
    ``short_id`` key (one capital letter followed by digits, extracted from
    the work's ``id``) and keeps only the first work seen for each short ID.

    Prints the number of invalid works (if any), the number of valid works
    downloaded and the number of duplicates removed.

    Returns:
        list[dict]: one dumped work per distinct short ID, in the order the
        IDs were first seen.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    BASE = "https://api.openalex.org/works"
    filters = "title_and_abstract.search:Blanchot,publication_year:1998-2025"
    per_page = 200
    cursor = "*"
    invalid_works = []
    # First work seen per short ID; dicts keep insertion order, so this also
    # preserves download order.
    unique_records = {}
    original_count = 0
    # The filter is the same for every page, so it is encoded only once.
    encoded_filters = urllib.parse.quote(filters)

    with tqdm(desc='Downloading', unit='work') as pbar:
        while True:
            encoded_cursor = urllib.parse.quote(cursor)
            url = f"{BASE}?filter={encoded_filters}&per_page={per_page}&cursor={encoded_cursor}"

            # Fail loudly on HTTP errors instead of tripping over a missing
            # 'meta' key later; the timeout keeps a stalled connection from
            # hanging the notebook forever.
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            resp = response.json()

            # The total is only known once the first page has arrived.
            if pbar.total is None:
                pbar.total = resp['meta']['count']
            works = resp['results']

            for work in works:
                try:
                    valid_work = OpenAlexWork.model_validate(work).model_dump()
                except ValidationError as e:
                    invalid_works.append({
                        "work_id": work.get("id"),
                        "error": str(e)
                    })
                    continue

                # str(): the validated id is not necessarily a plain string.
                id_match = re.search(r'[A-Z]\d+', str(valid_work['id']))
                if id_match is None:
                    # A work without a recognisable short ID cannot be
                    # de-duplicated; record it as invalid rather than
                    # aborting the whole download.
                    invalid_works.append({
                        "work_id": work.get("id"),
                        "error": "no short ID found in work id"
                    })
                    continue

                valid_work['short_id'] = id_match.group()
                original_count += 1
                # setdefault keeps the first occurrence and ignores repeats.
                unique_records.setdefault(valid_work['short_id'], valid_work)

            pbar.update(len(works))

            next_cursor = resp['meta'].get('next_cursor')
            if not next_cursor:
                break
            cursor = next_cursor
            # Small pause between pages to stay polite towards the API.
            time.sleep(0.1)

    if invalid_works:
        print(f"\nInvalid: {len(invalid_works)}")

    print(f"\nDownloaded: {original_count}")

    records = list(unique_records.values())
    final_count = len(records)

    print(f"Duplicates removed: {original_count - final_count}")

    return records