In [None]:
#!/usr/bin/env python

"""RadCLIP Pubmed Scraper"""

# Module metadata: authorship, licensing, version and contact details.
# NOTE(review): "Christoper" is possibly a typo for "Christopher" (the Entrez
# contact address used in search() spells it that way) — confirm with the author.
__author__ = "Christoper Alexander"
__copyright__ = "Copyright 2023"
__credits__ = ["Andrew D'Amico", "Christoper Alexander", "Katya Nosulko", "Vivek Chamala", "Matthew Conger"]
# NOTE(review): the license string is empty — fill in once a license is chosen.
__license__ = ""
__version__ = "0.0.1"
# NOTE(review): __credits__ spells this name "Andrew D'Amico" — confirm which is intended.
__maintainer__ = "Andrew Damico"
__email__ = "andrew.damico@u.northwestern.edu"

In [1]:
import re
import unicodedata

from Bio import Entrez
from bs4 import BeautifulSoup

In [2]:
def strip_html_tags_from_string(my_string: str) -> str:
    """Return the text content of ``my_string`` with the markup removed.

    The input is parsed with BeautifulSoup's ``html.parser`` backend and the
    text of the parsed tree is returned, so text nested inside tags is kept.
    """
    return BeautifulSoup(my_string, 'html.parser').get_text()

In [3]:
def remove_tags_and_content(text):
    """Delete simple tag pairs (together with their text) and self-closing tags.

    Two shapes are removed in a single, non-repeated regex pass:

    * a tag, a run of text containing no ``<``, and a closing tag,
      e.g. ``<b>bold</b>``;
    * a self-closing tag such as ``<br/>``.

    An element that wraps other tags is not matched as a whole: only its
    innermost pairs are removed and the outer tags are left in the result.
    """
    tag_pair_or_self_closing = re.compile(r'<[^>]*>[^<]*</[^>]*>|<[^/>]+/>')
    return tag_pair_or_self_closing.sub('', text)

In [4]:
def remove_tags_and_content2(text):
    """Return only the text of ``text`` that lies outside every tag.

    The input is parsed with BeautifulSoup's ``html.parser`` backend, every
    element is detached from the tree (taking its contents with it), and the
    text that remains is returned.
    """
    parsed = BeautifulSoup(text, 'html.parser')

    # find_all(True) matches every tag in the tree, nested ones included.
    for element in parsed.find_all(True):
        element.extract()

    return parsed.get_text()

In [5]:
# An http:// or https:// URL, running up to the next whitespace character.
# The scheme separator is required so that ordinary words which merely start
# with "http" (e.g. "httpd", "https") are not mistaken for URLs.
url_pattern = r"https?://\S+"
url_regexp = re.compile(url_pattern)


def replace_urls_in_string(my_string: str) -> str:
    """Replace every http(s) URL in ``my_string`` with the token ``"URL"``.

    Args:
        my_string: Text that may contain URLs.

    Returns:
        The text with each URL (scheme through the last non-whitespace
        character) replaced by the literal string ``"URL"``.
    """
    return url_regexp.sub("URL", my_string)

In [6]:
def remove_special_chars(text):
    """Flatten line breaks and odd spaces, then collapse whitespace runs.

    Three substitutions are applied in order, each replacing its match with a
    single space: newlines / carriage returns, the no-break and thin space
    characters, and finally any run of whitespace. Leading and trailing
    whitespace is stripped from the result.
    """
    substitution_steps = (
        r'[\n\r]',                  # line breaks
        r'[\u00A0\u2009\xa0]',      # no-break space and thin space
        r'\s+',                     # any remaining whitespace run
    )
    for step_pattern in substitution_steps:
        text = re.sub(step_pattern, ' ', text)

    return text.strip()

In [7]:
def strip_corrupt_utf_from_string(my_string: str) -> str:
    """Replace every character that is not NFC-normalized with a space.

    Each character is tested on its own with ``unicodedata.is_normalized``;
    characters that pass are kept, the others become a single space, so the
    result always has the same length as the input.
    """
    kept = []
    for char in my_string:
        kept.append(char if unicodedata.is_normalized('NFC', char) else ' ')
    return ''.join(kept)

In [8]:
# One or more consecutive punctuation characters (ASCII punctuation plus
# curly quotes and the em dash).
punctuation_pattern = r'[-.,:;,!?\'"\[\]\(\)\{\}|\\`#\$%@^&*_+<>“”—’‘/]+'
punctuation_regexp = re.compile(punctuation_pattern)


def strip_punctuation_from_string(my_string: str) -> str:
    """Delete every run of punctuation from ``my_string``.

    Matches are removed outright (not replaced by a space), so text on either
    side of a punctuation run is joined together.
    """
    without_punctuation = re.sub(punctuation_regexp, "", my_string)
    return without_punctuation

In [9]:
# One or more consecutive decimal digits.
numbers_pattern = r'\d+'
numbers_regexp = re.compile(numbers_pattern)


def strip_numbers_from_string(my_string: str) -> str:
    """Delete every run of digits from ``my_string``.

    Digit runs are removed outright, so the surrounding text is joined.
    """
    without_digits = re.sub(numbers_regexp, "", my_string)
    return without_digits

In [10]:
# Whitespace followed by one or more whitespace-terminated articles.
# Written as a raw string so "\s" is not treated as an (invalid) string escape.
# The repeated group lets a run of consecutive articles be removed in one
# match; matching a single article at a time would consume the whitespace the
# next article needs in front of it and leave that one behind.
articles_pattern = r'\s+(?:(?:a|an|the)\s+)+'
articles_regexp = re.compile(articles_pattern, re.IGNORECASE)


def strip_articles_from_string(my_string: str) -> str:
    """Remove the articles "a", "an" and "the" (case-insensitive).

    Args:
        my_string: Text to clean.

    Returns:
        The text with every whitespace-delimited article (or run of
        consecutive articles), together with its surrounding whitespace,
        replaced by a single space. An article at the very start or end of the
        string is left in place because it lacks whitespace on one side.
    """
    return articles_regexp.sub(" ", my_string)

In [11]:
# Whitespace followed by one or more whitespace-terminated prepositions.
# Written as a raw string so "\s" is not treated as an (invalid) string escape.
# The repeated group lets a run of consecutive prepositions be removed in one
# match; matching one word at a time would consume the whitespace the next
# preposition needs in front of it and leave that one behind.
prepositions_pattern = r'\s+(?:(?:about|above|across|after|against|along|among|around|at|away|before|behind|below|between|by|during|for|from|in|into|like|out|since|than|through|to|toward|under|until|upon|with|within|without)\s+)+'
prepositions_regexp = re.compile(prepositions_pattern, re.IGNORECASE)


def strip_prepositions_from_string(my_string: str) -> str:
    """Remove the listed prepositions (case-insensitive) from ``my_string``.

    Args:
        my_string: Text to clean.

    Returns:
        The text with every whitespace-delimited preposition (or run of
        consecutive prepositions), together with its surrounding whitespace,
        replaced by a single space. A preposition at the very start or end of
        the string is left in place because it lacks whitespace on one side.
    """
    return prepositions_regexp.sub(" ", my_string)

In [12]:
def clean_doc(document: str) -> str:
    """Run the full text-cleaning pipeline over one document string.

    Tags (and their contents) are removed and the result is stripped; the
    remaining steps are then applied in a fixed order: whitespace
    normalization, URL replacement, non-NFC character replacement, and removal
    of punctuation, digits, articles and prepositions.
    """
    doc = remove_tags_and_content2(document).strip()

    # Order matters: e.g. URLs are replaced before punctuation is stripped.
    pipeline = (
        remove_special_chars,
        replace_urls_in_string,
        strip_corrupt_utf_from_string,
        strip_punctuation_from_string,
        strip_numbers_from_string,
        strip_articles_from_string,
        strip_prepositions_from_string,
    )
    for step in pipeline:
        doc = step(doc)
    return doc

In [13]:
# Filter terms that search() appends (with AND) to every user query.
# NOTE(review): presumably limits results to MEDLINE-indexed, open-access
# records — confirm against the Entrez search-field documentation.
BASE_QUERY = 'medline[sb] AND "open access"[filter]'

In [14]:
def search(query, db="pmc", retmax=20):
    """Run an Entrez ESearch for ``query`` combined with ``BASE_QUERY``.

    Args:
        query: Entrez search term; it is AND-ed with ``BASE_QUERY``.
        db: Entrez database to search (defaults to ``"pmc"``).
        retmax: Maximum number of IDs to request (defaults to 20, the value
            that used to be hard-coded).

    Returns:
        The parsed ESearch result produced by ``Entrez.read``. The search is
        issued with ``usehistory="y"``; ``fetch_details`` reads the
        ``IdList``, ``WebEnv`` and ``QueryKey`` entries from it.
    """
    Entrez.email = "christopheralexander2023@u.northwestern.edu"
    new_query = f"{query} AND {BASE_QUERY}"
    handle = Entrez.esearch(db=db,
                            sort="relevance",
                            retmax=str(retmax),
                            retmode="xml",
                            term=new_query,
                            usehistory="y")
    try:
        return Entrez.read(handle)
    finally:
        # Always release the response handle, even if parsing fails.
        handle.close()


def fetch_details(results, db="pmc"):
    """Fetch the full records for the IDs contained in an ESearch result.

    Args:
        results: Parsed ESearch result; its ``IdList``, ``WebEnv`` and
            ``QueryKey`` entries are used.
        db: Entrez database to fetch from (defaults to ``"pmc"``, the value
            that used to be hard-coded).

    Returns:
        The parsed EFetch response produced by ``Entrez.read``, or an empty
        list when ``results`` contains no IDs.
    """
    id_list = results["IdList"]
    if not id_list:
        # Nothing matched: skip the request instead of sending an empty id list.
        return []

    Entrez.email = "christopheralexander2023@u.northwestern.edu"
    handle = Entrez.efetch(db=db,
                           rettype="full",
                           retmode="xml",
                           id=','.join(id_list),
                           webenv=results["WebEnv"],
                           query_key=results["QueryKey"])
    try:
        return Entrez.read(handle)
    finally:
        # Always release the response handle, even if parsing fails.
        handle.close()

In [15]:
def process_section(section):
    """Flatten a section tree into the list of its leaf sections.

    A section is a mapping whose optional ``"sec"`` entry holds either a list
    of sub-sections or a single sub-section. Sections without a (non-empty)
    ``"sec"`` entry are leaves.

    Args:
        section: Mapping supporting ``.get("sec")``.

    Returns:
        A list of leaf sections in document order. A leaf passed in directly
        is returned as a one-element list.
    """
    subs = section.get("sec", [])
    if not subs:
        return [section]

    # A lone sub-section arrives as a single mapping rather than a list;
    # normalize it so it is flattened recursively like any other sub-section.
    if not isinstance(subs, list):
        subs = [subs]

    leaves = []
    for sub in subs:
        leaves.extend(process_section(sub))
    return leaves

In [16]:
def process_query(query):
    """Search, fetch and clean the paragraphs of every matching document.

    Args:
        query: Search term passed to ``search``.

    Returns:
        A list with one dict per fetched document. Each dict maps
        ``(title, section_id, paragraph_id, counter)`` to the cleaned
        paragraph text, where ``counter`` is the paragraph's running index
        within the document. Missing titles and ids fall back to
        ``"no_title"``, ``"no_id"`` and ``"no_sub_id"``.
    """
    results = search(query)
    docs = fetch_details(results)

    processed_docs = []

    for i, doc in enumerate(docs):
        doc_dict = {}
        counter = 0
        for section in process_section(doc["body"]):
            # An absent or empty title is replaced by a placeholder.
            title = section.get("title") or "no_title"
            sec_id = section.attributes.get("id", "no_id")

            # A section with one paragraph holds a single element instead of a
            # list, and some sections have no "p" entry at all; normalize both
            # so no paragraph is skipped and no KeyError aborts the batch.
            paragraphs = section.get("p", [])
            if not isinstance(paragraphs, list):
                paragraphs = [paragraphs]

            for paragraph in paragraphs:
                sub_id = paragraph.attributes.get("id", "no_sub_id")
                doc_dict[(title, sec_id, sub_id, counter)] = clean_doc(str(paragraph))
                counter += 1
        processed_docs.append(doc_dict)
        print(f"processed {i + 1} docs")

    return processed_docs

In [17]:
# Smoke run: execute the whole search -> fetch -> clean pipeline for one
# sample query; process_query prints one progress line per document.
test = process_query("cancer")

processed 1 docs
processed 2 docs
processed 3 docs
processed 4 docs
processed 5 docs
processed 6 docs
processed 7 docs
processed 8 docs
processed 9 docs
processed 10 docs
processed 11 docs
processed 12 docs
processed 13 docs
processed 14 docs
processed 15 docs
processed 16 docs
processed 17 docs
processed 18 docs
processed 19 docs
processed 20 docs




In [18]:
# Re-run the search and fetch steps on their own so the raw, unprocessed
# records are available for inspection.
results = search("cancer")
docs = fetch_details(results)

In [19]:
# Display the "body" entry of the first fetched record.
docs[0]["body"]

{'sec': DictElement({'title': {}, 'p': DictElement({}, attributes={'id': 'para430'})}, attributes={'id': 'cesec80'})}