# Parser

> Functions to parse the different pieces of information coming from PubMed results

In [None]:
#| default_exp parser

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| hide
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
#| export
from Bio import Entrez
import sys
import pandas as pd
import xlsxwriter
from datetime import datetime, timedelta, date
from collections import defaultdict, Counter
import  pickle
from fastcore.all import *
from dotenv import load_dotenv


In [None]:
from pubmed_lib.core import *
from pubmed_lib.data import *



In [None]:
#| export
#| hide
# Permissive e-mail pattern (lower-case only) that also accepts obfuscated
# addresses written as "name at host dot tld".
# Raw strings keep the regex escapes (\/, \., \s) intact instead of relying on
# Python passing unknown string escapes through, which newer interpreters warn about.
regex = re.compile((r"([a-z0-9!#$%&'*+\/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+\/=?^_`"
                    r"{|}~-]+)*(@|\sat\s)(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?(\.|"
                    r"\sdot\s))+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?)"))
# Stricter pattern: a plain "user@host.tld" address that must end the string.
reg_email = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
db_name = ''

In [None]:
#| export
def parsePubmedData(pubmeddata):
    """
    Collect the article identifiers listed in a PubmedData section.

    :param pubmeddata: mapping with an 'ArticleIdList' entry; every item of that
        list must expose an ``attributes`` mapping holding 'IdType'
    :return: dict mapping each IdType to the identifier converted with ``str``
    """
    ids = {}
    for article_id in pubmeddata['ArticleIdList']:
        id_type = article_id.attributes['IdType']
        ids[id_type] = str(article_id)
    return ids

In [None]:
#| export
def parseArticle(article_info):
    """
    Extract the paper-level fields from the 'Article' entry of a Medline citation.

    :param article_info: mapping stored under the 'Article' key of a Medline citation
    :return (dict): dictionary with the keys 'abstract', 'autorlist', 'title',
        'journal' and 'published'. 'published' is '' when the publication date
        carries neither a 'Year' nor a 'MedlineDate'; 'abstract' is '' and
        'autorlist' is [] when the corresponding section is missing.
    :raises KeyError: if the title or the journal information is missing
    """
    title = article_info['ArticleTitle']
    journal = article_info['Journal']['Title']
    published_date = article_info['Journal']['JournalIssue']['PubDate']

    # Default keeps `published` bound when PubDate has neither field.
    published = ''
    if 'Year' in published_date:
        published = published_date['Year']
    elif 'MedlineDate' in published_date:
        medline_date = published_date['MedlineDate']
        try:
            # First run of four digits in the free-text date
            published = re.findall(r'\d\d\d\d', medline_date)[0]
        except (IndexError, TypeError):
            # No four-digit run found: fall back to the leading characters
            published = medline_date[:4]

    try:
        abstract = '. '.join(article_info['Abstract']['AbstractText'])
    except (KeyError, TypeError):
        # Missing abstract section, or fragments that cannot be joined as text
        abstract = ''

    try:
        autorlist = article_info['AuthorList']
    except KeyError:
        print('no autors found, jumping next')
        autorlist = []

    return {'abstract': abstract, 'autorlist': autorlist, 'title': title, 'journal': journal,
            'published': published}

In [None]:
#| export
def parse_email(affil_text):
    """
    Pull the first e-mail-looking token out of a free-text string.

    :param affil_text: text to scan
    :return str: the matched address with any leading/trailing '.', ';' or ','
        removed, or '' when nothing matches
    """
    found = re.search(r'[\w.-]+@[\w.-]+', affil_text)
    if found is None:
        return ''
    return found.group().strip('.;,')

In [None]:
#| export
def parseMayorKeys(citationInfo):
    """
    Return the keywords of a citation.

    When the citation has at least one keyword group, only the entries of the
    first group flagged ``MajorTopicYN == 'Y'`` are returned (as strings).
    When it has none, the MeSH descriptors from ``parseMeshKeys`` are used
    instead: major descriptors followed by minor ones.

    :param citationInfo: mapping with a 'KeywordList' entry
    :return: list of keyword strings
    """
    keyword_groups = citationInfo['KeywordList']
    if len(keyword_groups) != 0:
        majors = []
        for keyword in keyword_groups[0]:
            if keyword.attributes['MajorTopicYN'] == 'Y':
                majors.append(str(keyword))
        return majors

    # No author keywords: fall back to the MeSH headings
    major_mesh, minor_mesh = parseMeshKeys(citationInfo)
    major_mesh.extend(minor_mesh)
    return major_mesh

# def parseMayorKeys(citationInfo):
#     keywordList = citationInfo['KeywordList']
#     if len(keywordList) == 0:
#         return []
#     else:
#         return [str(x) for x in keywordList[0] if x.attributes['MajorTopicYN'] == 'Y']

In [None]:
#| export
def parseMeshKeys(citationInfo):
    """
    Split the MeSH descriptors of a citation into major and minor topics.

    :param citationInfo: mapping that may hold a 'MeshHeadingList'; each heading
        must provide a 'DescriptorName' exposing ``attributes['MajorTopicYN']``
    :return: tuple ``(major, minor)`` of descriptor-name string lists, in the
        order the headings appear; both are empty when the citation has no
        'MeshHeadingList'. Descriptors flagged neither 'Y' nor 'N' are skipped.
    """
    if 'MeshHeadingList' not in citationInfo:
        return [], []

    major_keys, minor_keys = [], []
    for heading in citationInfo['MeshHeadingList']:
        descriptor = heading['DescriptorName']
        flag = descriptor.attributes['MajorTopicYN']
        if flag == 'Y':
            major_keys.append(str(descriptor))
        elif flag == 'N':
            minor_keys.append(str(descriptor))
    return major_keys, minor_keys

In [None]:
#| export
# def parseMeshKeys(citationInfo):
#     if 'MeshHeadingList' not in citationInfo.keys():
#         return [], []
#     meshKeys = citationInfo['MeshHeadingList']
#     mayorkeys = [str(x['DescriptorName']) for x in meshKeys if x['DescriptorName'].attributes['MajorTopicYN']=='Y']
#     minorKeys = [str(x['DescriptorName']) for x in meshKeys if x['DescriptorName'].attributes['MajorTopicYN']=='N']
#     return mayorkeys, minorKeys

In [None]:
#| export
def parseKeys(citationInfo):
    """
    Gather both keyword views of a citation.

    :param citationInfo: citation mapping handed unchanged to both parsers
    :return: tuple ``(keywords, mesh_keys)`` where ``keywords`` is the result of
        ``parseMayorKeys`` and ``mesh_keys`` the result of ``parseMeshKeys``
    """
    keywords = parseMayorKeys(citationInfo)
    mesh_keys = parseMeshKeys(citationInfo)
    return keywords, mesh_keys

In [None]:
# Ad-hoc notebook cell (not exported): run one search and fetch the records for the returned ids.
# NOTE(review): `searchpb`, `fetch_details`, `email` and `api_key` are not defined in this
# notebook — presumably they come from the `pubmed_lib.core` star import; confirm.
res = searchpb('Daniel Maturana', email=email, api_key=api_key)
pubs = fetch_details(res['IdList'])

In [None]:
def getParsedArticles(name, years = 3):
    """
    Search by author name and return the parsed articles that pass the year filter.

    :param name: author name; '[Author]' is appended to build the query
    :param years: width of the time window in years
    :return: list of parsed article dicts, or the integer 0 when the search
        returns no ids

    NOTE(review): the search window is anchored at 2021 while the local filter
    is anchored at 2020, and the empty case returns 0 rather than an empty
    list — confirm both are intended before relying on them.
    """
    results = searchpb(name + '[Author]', 100, maxdate = 2021, mindate = 2021 - years)
    id_list = results['IdList']
    if len(id_list) == 0:
        return 0
    papers = fetch_details(id_list)
    print('checking in {} Articles'.format(len(id_list)))

    oldest_allowed = 2020 - years
    parsed = (parse_paperinfo(paperinfo) for paperinfo in papers['PubmedArticle'])
    # Drop anything published before the cut-off year
    articles = [article for article in parsed if int(article['published']) >= oldest_allowed]
    print('Keeping with {} from last {} years'.format(len(articles), years))
    return articles

def getParsedArticlesPeriod(name, maxdate=2020, years = 3, top_n=None, verbose=False):
    """
    Search by author name and keep the parsed articles published within
    ``[maxdate - years, maxdate]``.

    :param name: author name; '[Author]' is appended to build the query
    :param maxdate: last year of the window (inclusive)
    :param years: width of the window in years
    :param top_n: when truthy, keep only the first ``top_n`` articles after
        sorting by 'published' in descending order
    :param verbose: print progress messages when True
    :return: tuple ``(articles, n_found)`` where ``n_found`` is the number of ids
        returned by the search; ``([], 0)`` when the search is empty
    """
    query = name + '[Author]'
    earliest = maxdate - years
    results = searchpb(query, 1000, maxdate = maxdate, mindate = earliest)
    id_list = results['IdList']
    n_papers = len(id_list)
    if n_papers == 0:
        return ([], 0)

    papers = fetch_details(id_list)
    if verbose:
        print('checking in {} Articles'.format(n_papers))

    articles = []
    for paperinfo in papers['PubmedArticle']:
        article = parse_paperinfo(paperinfo)
        year = int(article['published'])
        if earliest <= year <= maxdate:
            articles.append(article)

    if not articles:
        if verbose:
            print('No articles in the time period')
        return ([], n_papers)

    if top_n:
        # Round-trip through pandas to sort by year and truncate
        ranked = pd.DataFrame(articles).sort_values('published', ascending=False)
        articles = ranked.iloc[:top_n].to_dict('records')
    if verbose:
        print('Keeping with {} from last {} years'.format(len(articles), years))
    return (articles, n_papers)


def fetchPubmedArticles(name, start, end, path, db_path = '/Volumes/Users/matu/Documents/Xcode/SFDC/db.pckl'):
    """Search PubMed by author name and cache the parsed results as a CSV file.

    The bookkeeping database loaded from ``db_path`` is consulted first: when it
    already has entries for both ``str(start)`` and ``str(end)`` under ``name``,
    the previously written CSV is read back and returned. Otherwise the articles
    are fetched with ``getParsedArticlesPeriod``, the database entry
    ``db[name][str(start)] = [total, kept]`` is recorded, the results are written
    to ``{path}/{name}_{start}_{start - end}.csv`` and the database is saved.

    :param name: author name, also used as database key and in the CSV file name
    :param start: passed positionally as the second argument of ``getParsedArticlesPeriod``
    :param end: passed positionally as the third argument of ``getParsedArticlesPeriod``
    :param path: directory of the CSV files
    :param db_path: location of the bookkeeping database
    :return (pd.DataFrame): the parsed articles; when there are 10 rows or more,
        only the first 10 after sorting by 'published' in descending order
        (the CSV on disk keeps every row). ``None`` on the ``pubmedData == 0`` branches.
    """
    db = loadDB(db_path)
    if name not in db:
        print('adding new year {} for {}'.format(start, name))
        # NOTE(review): positional call — `start` binds to `maxdate` and `end` to `years`
        # in getParsedArticlesPeriod; confirm the parameter names here mean what callers expect.
        (pubmedData, total) = getParsedArticlesPeriod(name, start, end)
        # NOTE(review): getParsedArticlesPeriod returns a list in first position, so this
        # comparison with 0 looks like it can never be true — confirm intent.
        if pubmedData == 0:
            # This branch returns without calling saveDB, so the update above stays in memory only.
            db.update({name: {str(start):[total, 0]}})
            return 
        else:
            db.update({name: {str(start):[total, len(pubmedData)]}})
    else:
        # NOTE(review): only str(start) keys are written by this function, yet the cache
        # check also requires str(end) — verify against real database contents.
        if (str(start) in db[name]) and (str(end) in db[name]):
            print(f"{name} already in DB with year {start} - {end}, passing")
            # Cached path: the full CSV is returned, without the 10-row truncation applied below
            df = pd.read_csv('{}/{}_{}_{}.csv'.format(path, name, start, start - end))
            return df
        else:
            (pubmedData, total) = getParsedArticlesPeriod(name, start, end)
            # NOTE(review): same never-true comparison as in the branch above.
            if pubmedData == 0:
                db[name].update({str(start):[total, 0]})
                return 
            else:
                db[name].update({str(start):[total, len(pubmedData)]})
    df = pd.DataFrame(pubmedData)
    file_output = '{}/{}_{}_{}.csv'.format(path, name, start, start - end)
    # Persist every fetched row first, then the bookkeeping database
    df.to_csv(file_output)
    saveDB(db, db_path)
    if df.shape[0] >= 10:
        df = df.sort_values('published', ascending=False)
        print('Using the 10 newer papers')
        return df.iloc[:10]
    return df

def retrieveArticles(query='Peter Ihnat[Author]'):
    """
    Run a search and return every fetched article in parsed form.

    :param query: search expression handed to ``searchpb``; the default keeps
        the query this function used when it took no arguments
    :return: list with one ``parse_paperinfo`` result per entry under
        'PubmedArticle' in the fetched details
    """
    results = searchpb(query)
    papers = fetch_details(results['IdList'])
    return [parse_paperinfo(paperinfo) for paperinfo in papers['PubmedArticle']]


In [None]:
#| export 
def parse_paperinfo(
    paperinfo_xml:dict # One parsed article record with 'PubmedData' and 'MedlineCitation' entries
    ):
    """
    Flatten one article record into a single dictionary.

    The article ids from ``parsePubmedData`` are merged with the article fields
    from ``parseArticle``, the keyword lists ('mayorKeys', 'mayorMesh',
    'minorMesh') and the parsed authors ('autorlist').

    Authors flagged ``ValidYN == 'N'`` and authors for which
    ``parse_author_xml`` returns None are left out. An author entry that cannot
    be inspected is reported and skipped on its own, so the remaining authors
    of the article are still parsed.

    :param paperinfo_xml: mapping with the keys 'PubmedData' and 'MedlineCitation'
    :return: dict with the article ids, article fields, keywords and author dicts
    """
    record = parsePubmedData(paperinfo_xml['PubmedData'])
    citation = paperinfo_xml['MedlineCitation']
    article = parseArticle(citation['Article'])
    mayorKeys, (mayorMeshKeys, minorMeshKeys) = parseKeys(citation)
    article['mayorKeys'] = mayorKeys
    article['mayorMesh'] = mayorMeshKeys
    article['minorMesh'] = minorMeshKeys

    authors = []
    for author_xml in article['autorlist']:
        # The try is scoped to one author so a malformed entry does not
        # discard the authors that follow it.
        try:
            if author_xml.attributes['ValidYN'] == 'N':
                continue
            author = parse_author_xml(author_xml)
        except (AttributeError, KeyError, TypeError):
            print('ERROR: parsing author {}'.format(author_xml))
            continue
        if author is not None:
            authors.append(author)

    article['autorlist'] = authors
    record.update(article)
    return record
    

In [None]:
#| export
def parse_author_xml(autor_xml):
    """
    (dict)->dict
    Build a flat author record from one author entry of a PubMed XML article.

    :param autor_xml: mapping with the author fields 'Identifier',
        'AffiliationInfo', 'ForeName', 'LastName' and 'Initials'
    :return: dict with the keys 'Fname', 'Lname', 'emails', 'affiliations',
        'countries', 'identifier', 'name', 'n_papers', 'updated', 'state' and
        'initials'; None for collective authors or when any field cannot be
        read (a message is printed in the latter case)
    """
    # Collective (group) authors carry no personal fields to parse
    if 'CollectiveName' in autor_xml:
        return None

    try:
        # Only the first identifier is kept
        identifiers = autor_xml['Identifier']
        autorID = identifiers[0] if len(identifiers) > 0 else ''

        # All affiliations of the author joined into one string ('' when there are none)
        AFFs = ';'.join(info['Affiliation'] for info in autor_xml['AffiliationInfo'])

        # Name fields are mandatory: a missing one aborts the parse below
        autorFN = autor_xml['ForeName']
        autorLN = autor_xml['LastName']
        autorIN = autor_xml['Initials']
        name = autorFN + ' ' + autorLN

        # Country, state and e-mail are derived from the affiliation text
        country_name, state = find_country(AFFs)
        emails = parse_email(AFFs)
        return {'Fname': autorFN, 'Lname': autorLN, 'emails': emails, 'affiliations': AFFs,
                'countries': country_name, 'identifier': autorID, 'name': name, 'n_papers': 0,
                'updated': date.today().strftime('%d-%m-%Y'), 'state': state, 'initials': autorIN}

    except ValueError:
        print('not possible to get info value error')
        return None
    except OSError as err:
        print("OS Error: {0}".format(err))
        return None
    except Exception:
        # Best-effort parse: any other failure drops this author, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        print('error en parsing')
        return None

In [None]:
#| export
def find_country(location):
    """
    Look for a known country in a free-text location.

    Each entry of ``COUNTRY`` is treated as a sequence of spellings whose first
    item is the value returned; spellings are matched as substrings of the
    lower-cased text, and the first entry with a match wins. For 'brazil' the
    state is additionally looked up with ``find_state``.

    :param location: text to scan
    :return: tuple ``(country, state)``; ``('', '')`` for empty input or when
        nothing matches
    """
    if len(location) == 0:
        return '', ''
    haystack = location.lower()
    for aliases in COUNTRY:
        if not any(alias in haystack for alias in aliases):
            continue
        canonical = aliases[0]
        state = find_state(location) if canonical == 'brazil' else ''
        return canonical, state
    return '', ''

def find_state(location):
    """
    (str)->str
    Return the first entry of ``BR_STATES`` contained in the given text.

    The text is matched as given (no case folding); leading spaces of the
    matched entry are removed before it is returned.

    :param location: affiliation text to scan
    :return: the matched state without leading spaces, or '' when none matches
    """
    matches = (state.lstrip(' ') for state in BR_STATES if state in location)
    return next(matches, '')
