In [1]:
import os
#Ensure we are at the base level 
os.chdir("../")
%pwd

'C:\\Users\\Markus\\Documents\\Cambridge_Projects\\GroupProject\\conservation_synthesis'

In [2]:
from sources.data_processing.repositories import CrossrefRepository, OpenAireRepository, CoreRepository, AbstractRepository
from sources.data_processing.queries import ArticleMetadata, Response, KeywordQuery
from data.cleancsv import CleanCSV
import pandas as pd
import asyncio
import aiohttp
import time
import json
import pathlib
from itertools import chain

In [11]:
# Display the current working directory (sanity check that the root-level chdir above took effect).
%pwd

'C:\\Users\\Markus\\Documents\\Cambridge_Projects\\GroupProject\\conservation_synthesis'

In [3]:
# Repository *class* to query; swap in another repository class here
# (e.g. CrossrefRepository). No need to instantiate it just to get the class.
repo = CoreRepository
data = CleanCSV("data/cleaned_references.csv")
# Notebook-global sink: `process_data` appends one result dict per successful query.
responses = []
all_entries = len(data)

In [4]:
def batch_data(repo, start_index, max_index):
    """Yield consecutive index ranges covering ``[start_index, max_index)``.

    Each range holds at most ``repo().max_queries_per_second`` indices, so one
    range corresponds to one rate-limited batch of queries. The last range may
    be shorter.

    Parameters
    ----------
    repo : type
        Repository class; instantiated once to read ``max_queries_per_second``.
    start_index : int
        First index (inclusive).
    max_index : int
        Upper bound (exclusive). Nothing is yielded if ``start_index >= max_index``.

    Yields
    ------
    range
        Successive, non-overlapping index ranges.

    Raises
    ------
    ValueError
        If ``max_queries_per_second`` is less than 1. A non-positive step would
        otherwise make the iteration never terminate.
    """
    batch_size = repo().max_queries_per_second
    if batch_size < 1:
        raise ValueError(
            f"max_queries_per_second must be >= 1, got {batch_size!r}")
    for cur_index in range(start_index, max_index, batch_size):
        yield range(cur_index, min(cur_index + batch_size, max_index))

In [5]:
def get_kwd_query_from_row(data_row):
    """Build a ``KeywordQuery`` from a single row of the cleaned references data.

    Empty fields are passed as ``None``: an empty ``authors`` collection, and an
    empty-string ``pub_title`` or ``doi``. ``title`` is passed through unchanged.
    ``query_id`` is always 0. Callers track the row index separately.
    """
    row_authors = data_row.authors
    row_journal = data_row.pub_title
    row_doi = data_row.doi
    return KeywordQuery(
        query_id=0,
        authors=None if len(row_authors) == 0 else row_authors,
        title=data_row.title,
        journal_name=None if row_journal == "" else row_journal,
        doi=None if row_doi == "" else row_doi,
    )

In [6]:
async def process_data(repo, data, start=0, end=None):
    """Query ``repo`` for rows ``start..end-1`` of ``data`` and collect successful results.

    Rows are processed in batches produced by ``batch_data`` (sized by
    ``repo().max_queries_per_second``). The queries of one batch run
    concurrently, and each batch is padded to take at least one second.

    For every successful query, ``resp.metadata.as_dict()`` is appended to the
    notebook-global ``responses`` list, with an extra ``"index"`` key holding the
    row index. Queries that raise an ``Exception`` are skipped: ``asyncio.gather``
    returns them as values and they are filtered out.

    Parameters
    ----------
    repo : type
        Repository class; a fresh instance is created for every query.
    data : indexable sequence of rows
        Must support ``len`` and integer indexing. Each row is converted with
        ``get_kwd_query_from_row``.
    start : int, default 0
        First row index (inclusive).
    end : int or None, default None
        Last row index (exclusive); ``None`` means ``len(data)``.
    """
    if end is None:
        end = len(data)

    async with aiohttp.ClientSession() as session:
        for _range in batch_data(repo, start, end):
            batch_start = time.monotonic()

            _queries = [get_kwd_query_from_row(data[i]) for i in _range]
            _repos = [repo() for _ in _range]

            results = await asyncio.gather(
                *[_repo.execute_query(_query, session)
                  for _repo, _query in zip(_repos, _queries)],
                return_exceptions=True)

            successful = [(i, resp.metadata.as_dict())
                          for i, resp in zip(_range, results)
                          if not isinstance(resp, Exception)]

            for i, rmeta in successful:
                rmeta["index"] = i
                responses.append(rmeta)

            # Elapsed time is now minus start. Pad short batches up to one second
            # so we stay within the per-second rate limit. `asyncio.sleep` yields to
            # the event loop instead of blocking it the way `time.sleep` would.
            elapsed = time.monotonic() - batch_start
            if elapsed < 1:
                await asyncio.sleep(1 - elapsed)

In [7]:
#Process Data in Batches of 1000
start_ind = 0
end_ind = len(data)
step = 1000

data_dir = pathlib.Path('.') / "data" / f"clean_references_{repo().get_identifier()}"
data_dir.mkdir(exist_ok=True)

while start_ind < end_ind:
    next_end_ind = min(start_ind + step, end_ind)
    await process_data(repo, data, start_ind, next_end_ind)
    
    write_path = data_dir / f"{repo().get_identifier()}_cr_{start_ind}_{next_end_ind}"
    with write_path.open("w") as f:
        json.dump(responses, f)
    
    start_ind += step

In [8]:
# Merge the per-chunk checkpoint files into one JSON list, ordered by row index.
repo_id = repo().get_identifier()
chunk_prefix = f"{repo_id}_cr_"


def _chunk_start(path):
    """Return the start row index encoded in a chunk file name `<id>_cr_<start>_<end>`."""
    return int(path.name[len(chunk_prefix):].split("_")[0])


# Only pick up chunk files. A `_merged` file from a previous run must not be
# re-merged (that would duplicate every record), and it must not end up in
# `all_files`, which the cleanup cell deletes. Sorting makes the order
# deterministic; `iterdir()` order is arbitrary.
all_files = sorted(
    (p for p in data_dir.iterdir()
     if p.is_file() and p.name.startswith(chunk_prefix)),
    key=_chunk_start,
)

all_data = []
for file_path in all_files:
    with file_path.open("r") as f:
        all_data.append(json.load(f))

merged_dicts = list(chain.from_iterable(all_data))
write_path = data_dir / f"{repo_id}_merged"
if all_files:
    with write_path.open("w") as f:
        json.dump(merged_dicts, f)
else:
    # e.g. re-running after cleanup: don't clobber the existing merged file with [].
    print(f"No chunk files found in {data_dir}; leaving {write_path.name} untouched.")

In [18]:
# CLEANUP: remove the per-chunk checkpoint files, keeping only the merged output.
merged_name = f"{repo().get_identifier()}_merged"
for file_path in all_files:
    if file_path.name == merged_name:
        continue  # never delete the merged output, even if it was listed
    try:
        file_path.unlink()
    except FileNotFoundError:
        pass  # already removed, e.g. this cell is being re-run

