In [None]:
# BigQuery project and datasets (note: only output_dataset is referenced below).
project_id = 'elife-data-pipeline'
source_dataset = 'de_dev'
output_dataset = 'de_dev'

# Table-name prefixes; mv_prefix is prepended for the editor_pubmed_ids source
# (presumably a materialized view, per the name - confirm).
output_table_prefix = 'data_science_'
mv_prefix = 'mv_'

# Concurrency for Europe PMC lookups, cap on PMIDs queried, and rows per BQ append.
max_workers = 10
max_manuscripts = 100_000
manuscript_upload_batch_size = 10_000

# Sent to the Europe PMC API as the `email` request parameter.
email = 'd.ecer@elifesciences.org'

In [None]:
import logging
from datetime import datetime
from functools import partial
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Sized, cast

from tqdm.auto import tqdm

import data_science_pipeline.configure_warnings  # pylint: disable=unused-import
import data_science_pipeline.configure_notebook_logging  # pylint: disable=unused-import

from data_science_pipeline.utils.europepmc import (
    EUROPEPMC_MAX_PAGE_SIZE,
    EuropePMCApi,
    europepmc_requests_retry_session
)
from data_science_pipeline.utils.misc import iter_batches
from data_science_pipeline.utils.bq import (
    with_limit_sql,
    is_bq_not_found_exception,
    load_json_list_and_append_to_bq_table_with_auto_schema
)
from data_science_pipeline.utils.jupyter import (
    read_big_query as _read_big_query,
)

In [None]:
logging.basicConfig(level=logging.INFO)

# Notebook-level logger used by the helper functions and the upload loop.
LOGGER = logging.getLogger(__name__)

# To see verbose output from the BigQuery helpers while debugging, uncomment:
# logging.getLogger('data_science_pipeline.utils.bq').setLevel(logging.DEBUG)

In [None]:
# Input tables, fully qualified as `dataset.table` (both read from output_dataset).
editor_parsed_pubmed_links_table_name = (
    f'{output_dataset}.{output_table_prefix}editor_pubmed_links'
)

editor_parsed_pubmed_ids_table_name = (
    f'{output_dataset}.{mv_prefix}{output_table_prefix}editor_pubmed_ids'
)

# Output table name WITHOUT the dataset; the dataset is supplied separately
# wherever this name is used.
external_manuscript_summary_output_table_name = (
    f'{output_table_prefix}external_manuscript_summary'
)

In [None]:
# Bind the notebook's GCP project once so later cells can call read_big_query(sql).
read_big_query = partial(
    _read_big_query,
    project_id=project_id,
)

In [None]:
# PMIDs already imported into the output table. NULL pmids are filtered out:
# `pubmed_id NOT IN (subquery)` is never TRUE once the subquery yields a NULL,
# so a single NULL pmid would otherwise make the "remaining" query return no rows.
existing_pmids_sql = '\n'.join([
    'SELECT pmid FROM `{dataset}.{table}`',
    'WHERE pmid IS NOT NULL'
]).format(table=external_manuscript_summary_output_table_name, dataset=output_dataset)

# Every PMID referenced by editors, from three sources: the pubmed_ids arrays,
# the relevant_pubmed_ids arrays, and the pmids included in parsed search terms.
all_pmids_sql = '\n'.join([
    'SELECT DISTINCT pubmed_id',
    'FROM `{editor_parsed_pubmed_ids_table_name}`',
    'JOIN UNNEST(pubmed_ids) AS pubmed_id',
    '',
    'UNION DISTINCT',
    '',
    'SELECT DISTINCT pubmed_id',
    'FROM `{editor_parsed_pubmed_links_table_name}`',
    'JOIN UNNEST(relevant_pubmed_ids) AS pubmed_id',
    '',
    'UNION DISTINCT',
    '',
    'SELECT DISTINCT pubmed_id',
    'FROM `{editor_parsed_pubmed_links_table_name}`',
    'JOIN UNNEST(parsed_search_term.include.pmid) AS pubmed_id'
]).format(
    editor_parsed_pubmed_ids_table_name=editor_parsed_pubmed_ids_table_name,
    editor_parsed_pubmed_links_table_name=editor_parsed_pubmed_links_table_name
)

# Editor-referenced PMIDs that are not yet in the output table.
remaining_pmids_sql = '\n'.join([
    'SELECT DISTINCT pubmed_id',
    'FROM ({all_pmids_sql})',
    'WHERE pubmed_id NOT IN ({existing_pmids_sql})'
]).format(
    all_pmids_sql=all_pmids_sql,
    existing_pmids_sql=existing_pmids_sql
)

# Query only PMIDs not yet imported. If BigQuery reports a missing table
# (presumably the output table on a first run), fall back to all referenced
# PMIDs. If the missing table is actually one of the editor inputs, the
# fallback query fails with the same error and it propagates.
try:
    remaining_pmids_df = read_big_query(with_limit_sql(
        remaining_pmids_sql,
        limit=max_manuscripts
    ))
except Exception as e:  # pylint: disable=broad-except
    if not is_bq_not_found_exception(e):
        raise
    # print() does not %-format its arguments; pass the name as a separate argument.
    print('table not found:', external_manuscript_summary_output_table_name)
    remaining_pmids_df = read_big_query(with_limit_sql(
        all_pmids_sql,
        limit=max_manuscripts
    ))
print(len(remaining_pmids_df))
remaining_pmids_df.head()

In [None]:
# Display the column dtypes of the PMID query result returned by BigQuery.
remaining_pmids_df.dtypes

In [None]:
# Raw values of the pubmed_id column; these are the PMIDs to look up in Europe PMC.
query_pubmed_ids = remaining_pmids_df.loc[:, 'pubmed_id'].values
len(query_pubmed_ids)

In [None]:
# Chunk the PMIDs for Europe PMC: each chunk becomes one get_summary_by_page_pmids
# call (chunk size presumably at most EUROPEPMC_MAX_PAGE_SIZE - see iter_batches).
query_pubmed_ids_batches = list(
    iter_batches(query_pubmed_ids, EUROPEPMC_MAX_PAGE_SIZE)
)
print('number of batches:', len(query_pubmed_ids_batches))
if len(query_pubmed_ids_batches) > 0:
    print('first batch:', len(cast(Sized, query_pubmed_ids_batches[0])))

In [None]:
def handle_http_error(error: BaseException, data: Optional[dict] = None):
    """Error callback for the Europe PMC client: log the error and move on.

    Args:
        error: The exception reported by the client.
        data: Optional request payload associated with the error, logged verbatim.

    Returns:
        None; the error is only logged as a warning, never re-raised here.
    """
    log_args = (error, data)
    LOGGER.warning('error: %s, data=%s', *log_args)

In [None]:
def add_provenance(manuscript_summary_list: List[dict]) -> List[dict]:
    """Return shallow copies of the summaries with a ``provenance`` entry added.

    Every returned row references the SAME provenance dict, holding
    ``source='europepmc'`` and a single naive-UTC ISO timestamp taken once per call.
    The input dicts are not modified; an existing ``provenance`` key is overwritten
    in the copy.

    Args:
        manuscript_summary_list: Europe PMC summary records to annotate.

    Returns:
        A new list, same length and order as the input.
    """
    provenance = dict(
        source='europepmc',
        imported_timestamp=datetime.utcnow().isoformat(),
    )
    annotated = []
    for summary in manuscript_summary_list:
        row = dict(summary)
        row['provenance'] = provenance
        annotated.append(row)
    return annotated

In [None]:
# Fetch Europe PMC summaries for every PMID batch concurrently and append them
# to the output BigQuery table in chunks of `manuscript_upload_batch_size` rows.
with europepmc_requests_retry_session() as session:
    europepmc_api = EuropePMCApi(
        session,
        on_error=handle_http_error,
        params={'email': email}
    )
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        try:
            # executor.map submits ALL batches immediately and yields results in
            # input order. If anything below raises, the executor's `with` exit
            # still waits for already-submitted requests before the error surfaces.
            retrieved_editor_manuscript_list_batches_it = tqdm(
                executor.map(
                    europepmc_api.get_summary_by_page_pmids,
                    query_pubmed_ids_batches
                ),
                total=len(query_pubmed_ids_batches),
                leave=False
            )
            # Flatten the per-batch result lists into one stream of summaries...
            retrieved_flattened_manuscript_list_it = (
                manuscript_summary
                for manuscript_summary_list in retrieved_editor_manuscript_list_batches_it
                for manuscript_summary in manuscript_summary_list
            )
            # ...then regroup that stream into upload-sized chunks.
            manuscript_upload_batch_list_it = iter_batches(
                retrieved_flattened_manuscript_list_it,
                manuscript_upload_batch_size
            )
            for manuscript_upload_batch in manuscript_upload_batch_list_it:
                # Materialise the chunk so it can be counted and uploaded.
                manuscript_upload_list = list(manuscript_upload_batch)
                LOGGER.info(
                    'writing to: %s (%d rows)',
                    external_manuscript_summary_output_table_name,
                    len(manuscript_upload_list)
                )
                load_json_list_and_append_to_bq_table_with_auto_schema(
                    add_provenance(manuscript_upload_list),
                    project_id=project_id,
                    dataset_name=output_dataset,
                    table_name=external_manuscript_summary_output_table_name
                )
        except Exception:
            # Log with traceback (failure may come from any batch fetch or upload),
            # then re-raise the original exception unchanged.
            LOGGER.exception('error while retrieving or uploading manuscript summaries')
            raise
print('done')