# Helper functions

Run this notebook to import the helper functions used in the tutorial.


In [None]:
import re
import urllib.request
from pathlib import Path
from collections import defaultdict
import os
from metapub.convert import doi2pmid
import requests
from ratelimit import limits, sleep_and_retry
from bs4 import BeautifulSoup
import json
import lxml.etree as ET
import subprocess
import copy
import numpy as np
import pandas as pd

In [None]:
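# Environment variable: NCBI API key made available to this session.
# NOTE(review): a literal key is hard-coded on the next line; consider loading it from an untracked local source instead.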
%env NCBI_API_KEY="6667a919224612da1287d74ff0d3f7b5e208"

# Regex format of DOI links, mutations, blocks, and literature type

# "https://doi.org/" followed by word characters, "/", "." or "-".
doi_pattern = r'https:\/\/doi\.org\/[\w/.-]+'
# Optionally captures a run of non-newline characters that is followed by a
# blank line. Only referenced from commented-out code in recategorize_pokay.
mutation_pattern = r'(.*)?\n\n'
# Captures one blank-line-delimited block: starts at the beginning of the
# string or right after "\n\n" and runs (lazily) to the next "\n\n" or the end.
# recategorize_pokay applies it with re.DOTALL so a block may span several lines.
block_pattern = r'(?:(?<=\n\n)|^)(.+?)(?=\n\n|\Z)'
# Text enclosed in square brackets (brackets excluded via look-arounds).
literature_pattern = r'(?<=\[)(.*?)(?=\])'
# Any "https://" link, up to the next whitespace character.
url_pattern_alone = r'https:\/\/[^\s]+'
# An http(s) link that is followed by optional whitespace and a "[...]" label;
# only the link itself is part of the match (the label is a look-ahead).
url_pattern = r'https?:\/\/[\w\/.%()-]+(?=\s*\[[^\]]*\])'
# An http(s) link followed by at least one whitespace character and a "[...]"
# label; the capture group is the label text between the brackets.
url_and_lit_pattern = r'https?:\/\/[\w\/.%()-]+\s+\[(.*?)\]'

In [None]:
# Categorize entries in Pokay
def recategorize_pokay(directory):
    """Group the references found in a Pokay directory by literature type.

    Every file matched by ``<directory>/*/*`` is read and split into
    blank-line-separated text blocks (``block_pattern``).  Each block is then
    scanned for ``<link> [<label>]`` references (``url_and_lit_pattern``):

    * label contains "Journal publication" -> stored as a publication;
    * label contains "Preprint" -> stored as a publication under the DOI link
      found inside the label (if any), otherwise as a preprint under its own
      link;
    * label contains "Grey literature" -> stored as grey literature;
    * any other label is ignored.

    A block with no labelled link falls back to its first bare ``https://``
    link: a ``doi.org`` link is stored as a preprint, anything else as grey
    literature.

    Args:
        directory: Path of the Pokay data directory.

    Returns:
        A tuple ``(publication_bioc, publication_key, rxiv_bioc, rxiv_key,
        grey_bioc, grey_key)``.  Each ``*_key`` maps a link to the list of
        text blocks that cite it; each ``*_bioc`` maps the same links to
        ``None`` placeholders to be filled in later.
    """
    # Link -> BioC JSON placeholder
    publication_bioc = {}
    grey_bioc = {}
    rxiv_bioc = {}

    # Link -> Pokay text blocks citing that article
    publication_key = defaultdict(list)
    grey_key = defaultdict(list)
    rxiv_key = defaultdict(list)

    def _record(key_map, bioc_map, link, block):
        # Register one citation of `link` and reserve its BioC slot.
        key_map[link].append(block)
        bioc_map[link] = None

    for file in Path(directory).glob('*/*'):
        # The glob may also yield sub-directories, which cannot be read as text.
        if not file.is_file():
            continue

        with open(file, 'r') as f:
            file_contents = f.read()

        for text in re.findall(block_pattern, file_contents, re.DOTALL):
            # Take the link and its label from the SAME match.  Collecting
            # them with two separate regexes and pairing by index can
            # misalign, because the two patterns do not accept exactly the
            # same inputs.  The link is the leading non-whitespace run of the
            # match (the pattern requires whitespace right after it).
            labelled = [
                (re.match(r'\S+', m.group(0)).group(), m.group(1))
                for m in re.finditer(url_and_lit_pattern, text)
            ]

            # No label provided: decide from the format of the first link.
            if not labelled:
                url = re.search(url_pattern_alone, text)
                if url:
                    link = url.group()
                    doi = re.search(doi_pattern, link)
                    if doi:
                        _record(rxiv_key, rxiv_bioc, doi.group(), text)
                    else:
                        _record(grey_key, grey_bioc, link, text)
                continue

            for link, article_type in labelled:
                if "Journal publication" in article_type:
                    _record(publication_key, publication_bioc, link, text)

                elif "Preprint" in article_type:
                    # A DOI link inside the label means the preprint has
                    # since been published; file it under that DOI.
                    published = re.search(doi_pattern, article_type)
                    if published is not None:
                        _record(publication_key, publication_bioc,
                                published.group(), text)
                    else:
                        _record(rxiv_key, rxiv_bioc, link, text)

                elif "Grey literature" in article_type:
                    _record(grey_key, grey_bioc, link, text)

    return publication_bioc, publication_key, rxiv_bioc, rxiv_key, grey_bioc, grey_key

In [None]:
# Obtain BioC JSON file from PMID or PMC with a maximum of 3 API calls per second
@sleep_and_retry
@limits(calls=3, period=1)
def get_pubtator_bioc_json(id):
    """Download the BioC JSON document for one article.

    Args:
        id: PMID or PMC identifier; it is converted with ``str`` and inserted
            into the request URL.

    Returns:
        The response body decoded as UTF-8, or ``None`` when the service
        answers with an empty JSON list (``'[]'``).

    Raises:
        ConnectionError: If the HTTP status code is not 200.
    """
    # API link for BioC
    url = "https://www-ncbi-nlm-nih-gov.ezproxy.lib.ucalgary.ca/research/bionlp/RESTful/pmcoa.cgi/BioC_json/" + str(id) + "/unicode"
    bioc = requests.get(url, allow_redirects=True)

    if bioc.status_code != 200:
        raise ConnectionError('could not download {}\nerror code: {}'.format(url, bioc.status_code))

    # Decode once; an empty list means no document is available for this id.
    payload = bioc.content.decode('utf-8')
    if payload == '[]':
        return None

    return payload

In [None]:
# Example call, executed every time this notebook is run: fetch the BioC JSON
# for id 9005165 (an HTTP request is made) and print the result.
bioc = get_pubtator_bioc_json(9005165)
print(bioc)

In [None]:
# Obtain PMID ID from DOI link with a maximum of 3 API calls per second
@sleep_and_retry
@limits(calls=3, period=1)
def get_pmid(doi):
    """Resolve a DOI link to a PMID through the ID-converter endpoint.

    Args:
        doi: DOI link; everything after ``doi.org/`` is sent to the service
            (a string without that prefix is sent unchanged).

    Returns:
        The ``pmid`` attribute of the first ``record`` element in the XML
        response.

    Raises:
        TypeError: If the response contains no ``record`` element.
        KeyError: If the record carries no ``pmid`` attribute.
    """
    bare_doi = doi.split('doi.org/')[-1]

    # Api link for paper details
    request_url = 'https://www-ncbi-nlm-nih-gov.ezproxy.lib.ucalgary.ca/pmc/utils/idconv/v1.0/?tool=doi2pmid&email=david.yang1@ucalgary.ca&ids=' + bare_doi
    response = requests.get(request_url)

    record = BeautifulSoup(response.content, "xml").find('record')
    return record['pmid']

In [None]:
# Get metadata of Rxiv paper
@sleep_and_retry
@limits(calls=1, period=1)
def get_rxiv_details(doi, is_biorxiv):
    """Fetch the preprint-server metadata record for a DOI.

    Args:
        doi: DOI link; everything after ``doi.org/`` is appended to the API
            URL.
        is_biorxiv: ``True`` to query the bioRxiv API, ``False`` to query the
            medRxiv API.

    Returns:
        The raw response body (``bytes``).

    Raises:
        ConnectionError: If the HTTP status code is not 200.
    """
    doi_part = doi.split('doi.org/')[-1]

    if is_biorxiv:
        api_link = 'https://api.biorxiv.org/details/biorxiv/' + doi_part
    else:
        api_link = 'https://api.medrxiv.org/details/medrxiv/' + doi_part

    preprint_details = requests.get(api_link)

    if preprint_details.status_code != 200:
        raise ConnectionError('could not download {}\nerror code: {}'.format(api_link, preprint_details.status_code))

    return preprint_details.content

In [None]:
# Get PMID of Rxiv paper
@sleep_and_retry
@limits(calls=3, period=1)
def get_rxiv_pmid(doi, is_biorxiv):
    """Find the PMID of a preprint by searching PubMed for its title.

    The title is read from the first entry of the preprint-server metadata
    (``get_rxiv_details``) and used as a title-field search term.

    Args:
        doi: DOI link of the preprint.
        is_biorxiv: ``True`` to read the metadata from bioRxiv, ``False`` for
            medRxiv.

    Returns:
        The first id in the search result's ``idlist``.

    Raises:
        IndexError: If the metadata collection or the search result is empty.
        KeyError: If an expected field is missing from either JSON response.
    """
    # Local import so this cell stays self-contained.
    from urllib.parse import quote

    details = get_rxiv_details(doi, is_biorxiv).decode('utf-8')

    # Load the JSON data
    title = json.loads(details)['collection'][0]['title']

    # Percent-encode the whole title: escaping only spaces lets characters
    # such as "&", "#" or "+" cut off or corrupt the search term.
    encoded_title = quote(title, safe='')
    pubmed_link = "http://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?db=pubmed&retmode=json&retmax=1000&term=" + encoded_title + "&field=title"

    search_result = json.loads(requests.get(pubmed_link).content.decode('utf-8'))
    return search_result['esearchresult']['idlist'][0]

In [None]:
def get_rxiv_published_doi(details):
    """Return the DOI link of the published version of a preprint, if any.

    Args:
        details: JSON text of a preprint-server details response (``str`` or
            ``bytes``); the first entry of its ``collection`` list is read.

    Returns:
        ``"https://doi.org/" + <published DOI>`` when the first entry has a
        usable ``published`` value, otherwise ``None``.  ``None`` is also
        returned when the collection is missing or empty.
    """
    data = json.loads(details)

    collection = data.get("collection") or []
    if not collection:
        return None

    published = collection[0].get("published")

    # NOTE(review): the preprint API is understood to report unpublished
    # preprints with the placeholder "NA" - confirm against the API docs.
    # Treating it (and empty values) as "not published" avoids building a
    # bogus "https://doi.org/NA" link.
    if not isinstance(published, str) or published.strip() in ("", "NA"):
        return None

    return "https://doi.org/" + published

In [None]:
def get_rxiv_jats_xml(details):
    """Download the JATS XML referenced by a preprint details response.

    Args:
        details: JSON text of the details response; the ``jatsxml`` URL of the
            first ``collection`` entry is fetched.

    Returns:
        The downloaded document decoded as UTF-8 text.
    """
    first_entry = json.loads(details)['collection'][0]

    # Grab the JATS XML
    response = requests.get(first_entry['jatsxml'])
    return response.content.decode('utf-8')

In [None]:
def convert_jatsxml_to_html(input_file, output_file):
    """Transform a JATS XML document into HTML with the project stylesheet.

    Args:
        input_file: The JATS XML *content* (``str`` or ``bytes``), not a path.
        output_file: Destination passed to the XSLT result's ``write_output``.

    Raises:
        Whatever lxml raises for malformed XML, a missing stylesheet
        (``../data/other/jats-to-html.xsl``) or a failed transform.
    """
    try:
        dom = ET.fromstring(input_file)
    except ValueError:
        # lxml rejects ``str`` input that carries an XML encoding declaration
        # (e.g. '<?xml version="1.0" encoding="UTF-8"?>'), which is what a
        # decoded download looks like.  Retry with bytes.
        # NOTE(review): assumes the text was originally UTF-8 - confirm.
        if not isinstance(input_file, str):
            raise
        dom = ET.fromstring(input_file.encode('utf-8'))

    # XSL style sheet
    xslt = ET.parse('../data/other/jats-to-html.xsl')
    transform = ET.XSLT(xslt)
    newdom = transform(dom)
    newdom.write_output(output_file)

In [None]:
def command_line_call(call):
    """Run a command line given as a single string (without a shell).

    The string is tokenised with ``shlex.split`` so quoted arguments stay
    together and repeated spaces do not produce empty arguments (a plain
    ``split(" ")`` gets both wrong).  Tokenisation uses POSIX rules.

    Args:
        call: The command and its arguments, e.g. ``'ls -l "my dir"'``.

    Returns:
        None.  The exit status of the command is not checked.
    """
    # Local import so this cell stays self-contained.
    import shlex

    subprocess.run(shlex.split(call))

In [None]:
def get_journal_publication_bioc(dict, isPmidDict = False):
    """Fill a link -> BioC mapping for journal publications, in place.

    For each key (at most the first 1000 are processed) the PMID is resolved
    and the BioC JSON is downloaded with ``get_pubtator_bioc_json``.  If that
    fails for a DOI key, the PMID lookup is retried with ``doi2pmid``.

    Args:
        dict: Mapping whose keys are DOI links (or PMIDs when ``isPmidDict``
            is true).  Values are overwritten with the BioC JSON text.  The
            parameter name shadows the builtin and is kept only for
            compatibility with existing callers.
        isPmidDict: When true, keys are used directly as PMIDs.

    Returns:
        A tuple ``(dict, unk_dict)``: the same mapping with every unresolved
        key removed, and a mapping of those unresolved keys to ``None``.
    """
    unk_dict = {}

    for count, key in enumerate(dict, start=1):
        # Only the first 1000 keys are processed; later keys are left as-is.
        if count > 1000:
            break

        bioc = None

        try:
            # Convert to PMID
            pmid = key if isPmidDict else get_pmid(key)
            bioc = get_pubtator_bioc_json(pmid)
        except Exception:
            # Best effort: any lookup/download failure falls through to the
            # alternative route below.
            bioc = None

        if bioc == '[]':
            bioc = None

        # Alternative PMID lookup.  The previous code passed an undefined
        # name here, so this fallback always failed silently.  It only makes
        # sense for DOI keys.
        if bioc is None and not isPmidDict:
            try:
                # NOTE(review): the bare DOI (link prefix stripped, as in
                # get_pmid) is passed - confirm doi2pmid expects that form.
                pmid = doi2pmid(key.split('doi.org/')[-1])
                bioc = get_pubtator_bioc_json(pmid)
            except Exception:
                bioc = None

            if bioc == '[]':
                bioc = None

        if bioc is None:
            unk_dict[key] = None
            continue

        # Re-assigning an existing key does not resize the mapping, so this
        # is safe while iterating over it.
        dict[key] = bioc

    for key in unk_dict:
        dict.pop(key, None)

    return dict, unk_dict

In [None]:
def get_rxiv_bioc(dict):
    """Fill a preprint-link -> BioC mapping in place, trying several routes.

    For each key (iteration stops once more than 100 keys have been seen) the
    following is attempted, in order, until one yields a BioC document:

    1. PMID from a title search using bioRxiv metadata (``get_rxiv_pmid``).
    2. The same using medRxiv metadata.
    3. PMID straight from the key via ``get_pmid``.
    4. PMID of the published version, taken from the preprint details
       (bioRxiv first; medRxiv when that request fails or reports a status
       other than ``'ok'``).
    5. Download the JATS XML, convert it to HTML under
       ``../data/pokay/processed/html/`` and store the placeholder string
       ``"converting"`` instead of a BioC document.

    All failures inside the individual attempts are swallowed by bare
    ``except`` clauses.

    Args:
        dict: Mapping keyed by preprint links; values are overwritten.  The
            name shadows the builtin.  Presumably this is the ``rxiv_bioc``
            mapping from ``recategorize_pokay`` - verify against the caller.

    Returns:
        A tuple ``(dict, unk_dict)``: the same mapping with the keys recorded
        in ``unk_dict`` removed, and ``unk_dict`` mapping those keys to
        ``None``.
    """
    count = 0
    unk_dict = {}
    for key in dict:
        bioc = None
        converted = False
        biorxiv = True
        details = None
        
        count += 1

        # Cap the number of keys handled per call.
        if count > 100:
            break
        
        # Get PMID as BioRxiv
        try:
            # Convert to PMID
            pmid = get_rxiv_pmid(key, is_biorxiv=True)
            bioc_temp = get_pubtator_bioc_json(pmid)
            if bioc_temp != '[]' and bioc_temp is not None:
                    bioc = bioc_temp
                    converted = True
        except:
            # Any failure simply leaves this route unconverted.
            converted = False
            pass
    
        # Get PMID as MedRxiv
        if converted == False:
            try:
                # Convert to PMID
                pmid = get_rxiv_pmid(key, is_biorxiv=False)
                bioc_temp = get_pubtator_bioc_json(pmid)
                if bioc_temp != '[]' and bioc_temp is not None:
                    bioc = bioc_temp
                    converted = True
            except:
                # Any failure simply leaves this route unconverted.
                converted = False
                pass

        if converted == False:
        # Convert to PMID then BioC using another tool
            try:
                # Convert to DOI then PMID then BioC
                pmid = get_pmid(key)
                bioc_temp = get_pubtator_bioc_json(pmid)
                if bioc_temp != '[]' and bioc_temp is not None:
                    bioc = bioc_temp
                    converted = True
            except:
                # Any failure simply leaves this route unconverted.
                converted = False
                pass
    
        # Successful, goto next key
        if bioc and bioc != "[]":
            dict[key] = bioc
            continue
    
        # Try to get details as BioRxiv
        try: 
            details = get_rxiv_details(key, is_biorxiv=True).decode('utf-8')
    
        except:
            # Request failed: fall back to medRxiv below.
            biorxiv = False
            converted = False
    
        if details:
            # A status other than 'ok' in the first message also triggers the
            # medRxiv fallback.
            status = json.loads(details)
            status = status['messages'][0]['status']
    
            if status != 'ok': 
                biorxiv = False
        
        # Try to get details as MedRxiv
        if biorxiv == False:
            try:
                details = get_rxiv_details(key, is_biorxiv=False).decode('utf-8')
            except:
                # NOTE(review): this `continue` skips the unk_dict bookkeeping
                # below, so the key keeps its current value in `dict` and is
                # not reported as unresolved - confirm this is intended.
                continue

        if converted == False:
        # Convert to PMID then BioC using another tool
            try:
                # Convert to DOI then PMID then BioC
                doi = get_rxiv_published_doi(details)
                pmid = get_pmid(doi)
                bioc_temp = get_pubtator_bioc_json(pmid)
                if bioc_temp != '[]' and bioc_temp is not None:
                    bioc = bioc_temp
                    converted = True
            except:
                # No usable published DOI / PMID / BioC for this preprint.
                converted = False
                pass
    
        # Retrieve JATS XML then convert to HTML for later conversions
        if converted == False or bioc == "[]":
            try:
                jats_xml = get_rxiv_jats_xml(details)
                file_name = key.split('doi.org/')[-1]
                # Replace . in DOI with -
                file_name = file_name.replace(".", "-")
                # Replace / in DOI with _
                file_name = file_name.replace("/", "_")
                output_file = '../data/pokay/processed/html/' + file_name + ".html"
                convert_jatsxml_to_html(jats_xml, output_file)
                # Placeholder marking that an HTML file awaits conversion.
                bioc = "converting"
            except:
                # Download or conversion failed: nothing could be retrieved.
                bioc = None
        
        # Nothing was retrieved by any route: report the key as unresolved
        if bioc is None or bioc == '[]':
            unk_dict[key] = None
            continue
    
        dict[key] = bioc

    # Drop unresolved keys only after the loop, so the mapping is not resized
    # while it is being iterated.
    for key in unk_dict:
        dict.pop(key, None)

    return dict, unk_dict

In [None]:
def get_file_name(key):
    """Build a file-system-friendly name from a link.

    A link matching ``doi_pattern`` is reduced to the part after
    ``doi.org/``; any other link has its ``https://`` prefix removed.  Then
    every "." becomes "-" and every "/" becomes "_".  No file extension is
    appended.

    Args:
        key: DOI link or other URL.

    Returns:
        The derived name as a string.
    """
    if re.search(doi_pattern, key) is not None:
        stem = key.split('doi.org/')[-1]
    else:
        stem = key.split('https://')[-1]

    # "." -> "-" first, then "/" -> "_"
    return stem.replace(".", "-").replace("/", "_")

In [None]:
import re
import json
import random
import copy
from bioc import biocjson
import pandas as pd
import pypdf
import os

In [None]:
# Regular expressions

# One-letter amino-acid change such as "N501Y": group 1 is the reference
# residue, group 2 the position (digits, not starting with 0), group 3 either
# "del" or a residue letter that differs from group 1 (negative look-ahead on \1).
one_letter_aa_change = r'\b([ARNDCQEGHILKMFPSTWYV])([1-9]+\d*)(del|(?!\1)[ARNDCQEGHILKMFPSTWYV])\b'
# Earlier drafts of the three-letter pattern, kept for reference:
# three_letter_aa_change = r'\b(?:ALA|ARG|ASN|ASP|CYS|GLN|GLU|GLY|HIS|ILE|LEU|LYS|MET|PHE|PRO|SER|THR|TRP|TYR|VAL)[1-9]+\d*(?:ALA|ARG|ASN|ASP|CYS|DEL|GLN|GLU|GLY|HIS|ILE|LEU|LYS|MET|PHE|PRO|SER|THR|TRP|TYR|VA|DEL)\b'
# three_letter_aa_change = r'\b((?:ALA|ARG|ASN|ASP|CYS|GLN|GLU|GLY|HIS|ILE|LEU|LYS|MET|PHE|PRO|SER|THR|TRP|TYR|VAL))(([1-9]+\d*)(?!\1)(?:ALA|ARG|ASN|ASP|CYS|DEL|GLN|GLU|GLY|HIS|ILE|LEU|LYS|MET|PHE|PRO|SER|THR|TRP|TYR|VAL)\b'
# Three-letter (upper-case) amino-acid change such as "ASN501TYR": group 1 is
# the reference residue, group 2 the position.  Group 3 sits inside the
# negative look-ahead, so the new residue (or "DEL") is group 4.
three_letter_aa_change = r'\b((?:ALA|ARG|ASN|ASP|CYS|GLN|GLU|GLY|HIS|ILE|LEU|LYS|MET|PHE|PRO|SER|THR|TRP|TYR|VAL))([1-9]+\d*)(?!(\1))(ALA|ARG|ASN|ASP|CYS|DEL|GLN|GLU|GLY|HIS|ILE|LEU|LYS|MET|PHE|PRO|SER|THR|TRP|TYR|VAL)\b'
# Genomic change written as "g." + base + position + base, e.g. "g.A23403G".
genome_change = r'\bg\.[ATGCU][1-9]+\d*[ATGCU]\b'
# Genomic change written as "g." + position + base + ">" + base, e.g. "g.23403A>G".
genome_change_alt =  r'\bg\.[1-9]+\d*[ATGCU]\>[ATGCU]\b'

In [None]:
def check_dictionary(d):
    """Print a short report about a link -> BioC mapping.

    Prints the number of entries, then one line for every key whose value is
    ``None`` and one line for every key whose value is the placeholder string
    ``"converting"``.

    Args:
        d: Mapping to inspect.  Keys may be of any type (e.g. integer PMIDs);
            they are formatted with ``str``.
    """
    print(f"size: {len(d)}")
    for key, value in d.items():
        if value is None:
            print(f"None: {key}")

        if value == "converting":
            print(f"Converting: {key}")

In [None]:
# Remove papers that are in pokay database

def related_paper(paper):
    """Tell whether a paper's DOI is already present in ``pokay_data``.

    The DOI is read from ``paper["passages"][0]['infons']['article-id_doi']``
    and looked up in the global ``pokay_data``, which must be defined by the
    calling notebook.

    Args:
        paper: Mapping describing one paper.

    Returns:
        ``True`` when the DOI is found in ``pokay_data``; ``False`` otherwise,
        including when the DOI cannot be read or the lookup fails.
    """
    try:
        doi = paper["passages"][0]['infons']['article-id_doi']
        return doi in pokay_data
    except Exception:
        # Best effort: a malformed paper (or a failed lookup) counts as
        # "not related".  KeyboardInterrupt/SystemExit are no longer swallowed.
        return False

# filtered_papers_copy = [x for x in filtered_papers if not related_paper(x)]

In [None]:
# Function to grab subsample from data
def subset_sample(original, n):
    """Split a list into a random sample of ``n`` items and the remainder.

    A deep copy of ``original`` is shuffled after seeding the global ``random``
    generator with 42, so the split is reproducible; ``original`` itself is
    left untouched.  The sample is taken from the end of the shuffled copy.

    Args:
        original: List to sample from.
        n: Number of items to draw.

    Returns:
        A tuple ``(remaining, sample)``.

    Raises:
        IndexError: If ``n`` exceeds the number of items.
    """
    pool = copy.deepcopy(original)
    random.seed(42)
    random.shuffle(pool)

    # Draw from the tail, one item at a time.
    picked = [pool.pop() for _ in range(n)]

    return pool, picked

In [None]:
# Function to un-nest data. Example JSON file will contain Key1: {Key 2: {Key3: Val}}
def extract_nested_elements(input_string):
    """Return every top-level ``{...}`` element found in a string.

    The string is scanned once while tracking brace depth; each span from a
    depth-0 ``{`` to its matching ``}`` is returned, nested braces included.
    Inside an element, double-quoted strings (with backslash escapes) are
    skipped so that braces appearing in JSON string values do not disturb the
    depth count.  A stray ``}`` outside any element is ignored.

    Args:
        input_string: Text containing zero or more brace-delimited elements.

    Returns:
        List of substrings, in order of appearance.  An element that is never
        closed is not returned.
    """
    elements = []
    start = 0
    depth = 0
    in_string = False
    escaped = False

    for i, char in enumerate(input_string):
        if in_string:
            # Inside a quoted value: only look for the closing quote.
            if escaped:
                escaped = False
            elif char == '\\':
                escaped = True
            elif char == '"':
                in_string = False
            continue

        if char == '"':
            # Quotes only matter inside an element.
            if depth > 0:
                in_string = True
        elif char == '{':
            if depth == 0:
                start = i
            depth += 1
        elif char == '}' and depth > 0:
            depth -= 1
            if depth == 0:
                elements.append(input_string[start:i + 1])

    return elements

In [None]:
# Extract text from training data (litcovid portion); output is a dictionary keyed by PMID
def litcovid_text_extract(data):
    """Concatenate the passage texts of every paper into one string per PMID.

    Passage texts are joined in order.  After each passage a separator is
    appended: ". " when the text so far ends with an alphanumeric character,
    otherwise a single space.  Passages without usable text contribute no
    text, and no separator is added while the text is still empty.

    Args:
        data: Iterable of paper mappings with ``"passages"`` (a list of
            mappings that may contain ``"text"``) and ``"pmid"`` entries.

    Returns:
        Dict mapping each paper's ``pmid`` to its concatenated text.  Papers
        lacking ``"passages"`` or ``"pmid"`` are skipped.
    """
    out = {}

    for paper in data:
        try:
            passages = paper["passages"]
            pmid = paper["pmid"]
        except (LookupError, TypeError):
            # Malformed entry: skip it.
            continue

        text = ""

        for section in passages:
            try:
                text += section['text']
            except (LookupError, TypeError):
                # Passage without (string) text.
                pass

            # Nothing collected yet: indexing text[-1] would raise.
            if not text:
                continue

            text += ". " if text[-1].isalnum() else " "

        out[pmid] = text

    return out

In [None]:
# Grab file name from DOI
def get_file_name(key):
    """Derive a file-system-friendly name from a DOI link or other URL.

    A ``https://doi.org/...`` link is reduced to the part after ``doi.org/``;
    any other link only loses its ``https://`` prefix.  Afterwards "." is
    replaced by "-" and "/" by "_".  No extension is appended.

    Args:
        key: DOI link or other URL.

    Returns:
        The derived name as a string.
    """
    doi_pattern = r'https:\/\/doi\.org\/[\w/.-]+'

    prefix = 'doi.org/' if re.search(doi_pattern, key) is not None else 'https://'
    stem = key.split(prefix)[-1]

    # One pass for both substitutions: "." -> "-" and "/" -> "_"
    return stem.translate(str.maketrans({".": "-", "/": "_"}))

In [None]:
# text extract of JSON from pubtator API
def pubtator_extract(paper):
    """Extract the plain text of a BioC JSON document returned by the API.

    The first and last characters of ``paper`` are dropped, the remaining
    text is split into top-level ``{...}`` elements and the last element is
    parsed as a BioC collection.  Passage texts are concatenated; after each
    passage ". " is appended when the text so far ends with an alphanumeric
    character, otherwise a single space.

    Args:
        paper: BioC JSON text as returned by the API.

    Returns:
        The concatenated text, or ``None`` when nothing could be parsed or no
        text was found.
    """
    text = ""
    paper = paper[1:-1]

    try:
        bioc_list = extract_nested_elements(paper)
        bioc_collection = biocjson.loads(bioc_list[-1])
    except Exception:
        # No element found or not valid BioC JSON.
        return None

    for document in bioc_collection.documents:
        for passage in document.passages:
            try:
                text += passage.text
            except (TypeError, AttributeError):
                # Passage without string text.
                pass

            # Nothing collected yet: indexing text[-1] would raise.
            if not text:
                continue

            text += ". " if text[-1].isalnum() else " "

    if text == "":
        return None

    return text

In [None]:
# text extract of JSON from conversions of JATS XML
def jats_extract(paper):
    """Extract the plain text of a BioC JSON document converted from JATS XML.

    Parsing is first attempted on ``paper`` without its first and last
    characters, then on ``paper`` unchanged.  Passage texts are concatenated;
    after each passage ". " is appended when the text so far ends with an
    alphanumeric character, otherwise a single space.

    Args:
        paper: BioC JSON text.

    Returns:
        The concatenated text, or ``None`` when neither form parses or no
        text was found.
    """
    text = ""

    try:
        bioc_collection = biocjson.loads(paper[1:-1])
    except Exception:
        try:
            bioc_collection = biocjson.loads(paper)
        except Exception:
            return None

    for document in bioc_collection.documents:
        for passage in document.passages:
            try:
                text += passage.text
            except (TypeError, AttributeError):
                # Passage without string text.
                pass

            # Nothing collected yet: indexing text[-1] would raise.
            if not text:
                continue

            text += ". " if text[-1].isalnum() else " "

    if text == "":
        return None

    return text

In [None]:
# text extract of JSON from conversions of PDF
def pdf_extract(data):
    """Extract the plain text of a BioC JSON document converted from a PDF.

    Passage texts are concatenated; after each passage ". " is appended when
    the text so far ends with an alphanumeric character, otherwise a single
    space.

    Args:
        data: BioC JSON text.

    Returns:
        The concatenated text, or ``None`` when ``data`` does not parse or no
        text was found.
    """
    text = ""

    try:
        # Parse the argument itself.  The previous code read an unrelated
        # name here, so this function could never succeed on its own input.
        bioc_collection = biocjson.loads(data)
    except Exception:
        return None

    for document in bioc_collection.documents:
        for passage in document.passages:
            try:
                text += passage.text
            except (TypeError, AttributeError):
                # Passage without string text.
                pass

            # Nothing collected yet: indexing text[-1] would raise.
            if not text:
                continue

            text += ". " if text[-1].isalnum() else " "

    if text == "":
        return None

    return text

In [None]:
# Extract pokay text from each individual pokay paper
def pokay_text_extract(paper):
    """Extract the text of one Pokay paper, trying each known format in turn.

    For a non-``None`` ``paper`` the pubtator, JATS and PDF extractors are
    tried in that order and the first non-``None`` result is returned.

    NOTE(review): this function reads two names it never defines -
    ``pokay_text`` and ``key``.  They are presumably globals of the calling
    notebook; confirm before reusing the function elsewhere.
      * If ``pokay_text`` is undefined, the ``append`` calls raise NameError,
        which the surrounding bare ``except`` swallows (``text_extracted`` is
        already set by then, so the text is still returned).
      * If ``key`` is undefined, the ``paper is None`` branch raises
        NameError, which is not caught.

    Args:
        paper: BioC JSON text of the paper, or ``None`` to read the text from
            the unconverted PDF named after ``key``.

    Returns:
        The extracted text.  When ``paper`` is not ``None`` and the last
        extractor returns ``None``, ``None`` is returned; when ``paper`` is
        ``None`` and no PDF exists, the empty string is returned.
    """
    text_extracted = False
    text = ""
    
    if paper is not None:
        # Try to extract as pubtator
        try:
            text = pubtator_extract(paper)

            if text is not None:
                text_extracted = True
                pokay_text.append(text)
        except:
            pass

        if text_extracted:
            return text

        # Try to extract as JATS
        try:
            text = jats_extract(paper)

            if text is not None:
                text_extracted = True
                pokay_text.append(text)
        except:
            pass

        if text_extracted:
            return text

        # Try to extract as PDF
        try:
            text = pdf_extract(paper)

            if text is not None:
                text_extracted = True
                pokay_text.append(text)
        except:
            pass

    else:
        # No BioC document: read the raw PDF, if one exists for `key`.
        file = get_file_name(key)
        file = "/home/david.yang1/autolit/viriation/data/raw/pdf/unconverted/" + file + ".pdf"
        isExist = os.path.exists(file) 
        if isExist:
            print(file)
            reader = pypdf.PdfReader(file)
    
            # Concatenate the text of every page.
            for page in reader.pages:
                text += page.extract_text()

    return text

In [None]:
# Final function to handle all cases for pokay data
def pokay_extract(data, pdf_dir="/home/david.yang1/autolit/viriation/data/raw/pdf/unconverted/"):
    """Extract the text of every paper in a link -> BioC mapping.

    For an entry with a document, the pubtator, JATS and PDF extractors are
    tried in that order and the first non-``None`` result is kept.  For an
    entry whose value is ``None``, the text is read from
    ``<pdf_dir>/<get_file_name(key)>.pdf`` when that file exists (its path is
    printed).

    Args:
        data: Mapping of link -> BioC JSON text (or ``None``).
        pdf_dir: Directory holding the unconverted PDFs.  Defaults to the
            location used so far.

    Returns:
        List of extracted texts, in iteration order of ``data``.  Entries for
        which nothing could be extracted are omitted.
    """
    pokay_text = []

    for key, paper in data.items():
        if paper is not None:
            # First extractor yielding text wins; a failing extractor is
            # skipped (best effort).
            for extractor in (pubtator_extract, jats_extract, pdf_extract):
                try:
                    text = extractor(paper)
                except Exception:
                    continue

                if text is not None:
                    pokay_text.append(text)
                    break
            continue

        # No BioC document: fall back to the raw PDF, if present.
        file = os.path.join(pdf_dir, get_file_name(key) + ".pdf")
        if not os.path.exists(file):
            continue

        print(file)
        reader = pypdf.PdfReader(file)
        text = "".join(page.extract_text() or "" for page in reader.pages)

        if text != "":
            pokay_text.append(text)

    return pokay_text

In [None]:
# Label each text in dataframe
def get_label(df, column):
    """Interactively label every entry of a column as 0 or 1.

    Each entry is printed and the user is prompted until a valid answer
    (``0`` or ``1``) is given.  Non-numeric input is treated like any other
    invalid answer and simply re-prompts, instead of aborting and losing the
    labels entered so far.

    Args:
        df: DataFrame (or other mapping of column name -> sequence) to label.
        column: Name of the column holding the texts to show.

    Returns:
        ``df`` with the answers stored under ``'label'``.
    """
    label_input = []
    for i in df[column]:
        print(i)

        while True:
            try:
                label = int(input('Does this relate to viral variants? (0 = No, 1 = Yes)'))
            except ValueError:
                # Not a number: ask again.
                continue
            if label in (0, 1):
                break

        label_input.append(label)
        print(" ")
    df['label'] = label_input
    return df