**Implementation: PubMed Research Paper Fetcher**

In [19]:
#Import Libraries
import csv
import logging
import re
import time
from typing import Dict, List, Optional, Tuple

import requests
from lxml import etree
from tqdm import tqdm

In [20]:
# Configure logging
# force=True replaces any handlers already attached to the root logger. Without it,
# basicConfig() does nothing when the root logger has already been configured (as
# notebook environments often do), and the INFO messages emitted below would be dropped.
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s", force=True)
logger = logging.getLogger(__name__)

In [21]:
# Constants
PUBMED_API_BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
# The domain is matched as "label(.label)+" so that a sentence-ending period right after
# the address (e.g. "... address: jane@example.com.") is not captured as part of the email.
EMAIL_REGEX = r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)+"
# Lower-case markers looked for in affiliation strings to flag company affiliations.
COMPANY_KEYWORDS = [
    "pharmaceutical",
    "corp",
    "inc",
    "ltd",
    "gmbh",
    "biotech",
    "bv",
    "ag",
]
DELAY_SECONDS = 0.5  # Pause between batches; also the base of the 429 back-off delay
MAX_RETRIES = 3  # Maximum number of HTTP 429 responses tolerated per batch
BATCH_SIZE = 10  # Number of PubMed IDs to fetch in a batch

In [22]:
#Define fetch_pubmed_ids Function
def fetch_pubmed_ids(
    query: str,
    debug: bool = False,
    max_results: int = 500,
    timeout: float = 30.0,
) -> List[str]:
    """Search PubMed and return the IDs of the matching records.

    Sends the query to the ``esearch.fcgi`` endpoint and collects the text of
    every ``<Id>`` element in the XML response.

    Args:
        query: Search term passed to PubMed as ``term``.
        debug: When true, the raw response body is logged at DEBUG level.
        max_results: Value sent as ``retmax`` (upper bound on returned IDs).
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The PubMed IDs as strings, in the order they appear in the response.

    Raises:
        requests.exceptions.HTTPError: if the server answers with an error status.
        requests.exceptions.Timeout: if no response arrives within ``timeout``.
    """
    params = {
        "db": "pubmed",
        "term": query,
        "retmode": "xml",
        "retmax": max_results,
        "rettype": "uilist",
    }
    # Without an explicit timeout requests waits indefinitely on a stalled connection.
    response = requests.get(
        f"{PUBMED_API_BASE_URL}esearch.fcgi", params=params, timeout=timeout
    )
    if debug:
        # Logged before raise_for_status() so the body of an error response is visible too.
        logger.debug(f"Search API Response: {response.content}")
    response.raise_for_status()
    root = etree.fromstring(response.content)
    # Skip empty <Id/> nodes so callers can safely join the IDs into one string.
    return [element.text for element in root.xpath("//Id") if element.text]


In [23]:
#Define fetch_paper_details_batch Function
def _parse_pubmed_article(article) -> Dict[str, Optional[str]]:
    """Build one result row from a single ``<PubmedArticle>`` element.

    Raises:
        IndexError: if the article has no ``<PMID>`` or no ``<ArticleTitle>``.
    """
    pubmed_id = article.xpath(".//PMID/text()")[0]
    # The title can contain inline markup (<i>, <sub>, <sup>, ...). Taking only the first
    # text node would cut the title at the first tag, so join the text of the whole element.
    title_element = article.xpath(".//ArticleTitle")[0]
    title = "".join(title_element.itertext())

    pub_date = "/".join(
        article.xpath(
            ".//PubDate/Year/text() | .//PubDate/Month/text() | .//PubDate/Day/text()"
        )
    )
    if not pub_date:
        # Some records carry a free-form <MedlineDate> instead of Year/Month/Day.
        medline_dates = article.xpath(".//PubDate/MedlineDate/text()")
        if medline_dates:
            pub_date = medline_dates[0]

    affiliation_map: Dict[str, List[str]] = {}
    for author in article.xpath(".//AuthorList/Author"):
        name_parts = author.xpath(".//LastName/text() | .//ForeName/text()")
        if not name_parts:
            # Author entries without a personal name are skipped.
            continue
        name = " ".join(name_parts)
        affiliations = author.xpath(".//AffiliationInfo/Affiliation/text()")
        # Merge instead of overwrite so two author entries sharing a name keep
        # all of their affiliations.
        affiliation_map.setdefault(name, []).extend(affiliations)

    non_academic_authors, company_affiliations = identify_non_academic_authors(affiliation_map)
    corresponding_author_email = find_corresponding_author_email(article, affiliation_map)
    return {
        "PubmedID": pubmed_id,
        "Title": title,
        "Publication Date": pub_date,
        "Non-academic Author(s)": ", ".join(non_academic_authors),
        "Company Affiliation(s)": ", ".join(company_affiliations),
        "Corresponding Author Email": corresponding_author_email,
    }


def fetch_paper_details_batch(
    pubmed_ids: List[str],
    debug: bool = False,
    timeout: float = 30.0,
) -> List[Optional[Dict[str, Optional[str]]]]:
    """Fetch and parse the details of a batch of PubMed records.

    Requests all IDs in one ``efetch.fcgi`` call. An HTTP 429 response is retried
    up to ``MAX_RETRIES`` times with an exponentially growing delay; any other
    failure is logged and reported as a list of ``None`` placeholders.

    Args:
        pubmed_ids: PubMed IDs to fetch in a single request.
        debug: When true, the response status and size are logged at DEBUG level.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        One entry per ``<PubmedArticle>`` found in the response: a dict holding the
        CSV columns, or ``None`` when the article lacks a PMID or title. If the
        request itself fails, ``[None] * len(pubmed_ids)``. An empty input yields
        an empty list without contacting the server.
    """
    if not pubmed_ids:
        return []

    params = {
        "db": "pubmed",
        "id": ",".join(pubmed_ids),
        "retmode": "xml",
        "rettype": "full",
    }
    retries = 0
    while retries < MAX_RETRIES:
        try:
            # Without an explicit timeout requests waits indefinitely on a stalled connection.
            response = requests.get(
                f"{PUBMED_API_BASE_URL}efetch.fcgi", params=params, timeout=timeout
            )
            if debug:
                logger.debug(
                    "Fetch API response: HTTP %s, %d bytes",
                    response.status_code,
                    len(response.content),
                )
            response.raise_for_status()
            root = etree.fromstring(response.content)
            paper_details: List[Optional[Dict[str, Optional[str]]]] = []
            for article in root.xpath("//PubmedArticle"):
                try:
                    paper_details.append(_parse_pubmed_article(article))
                except IndexError as e:
                    logger.warning(f"Could not parse article: {e}")
                    paper_details.append(None)
            return paper_details
        except requests.exceptions.HTTPError as e:
            status_code = getattr(e.response, "status_code", None)
            if status_code == 429:
                retries += 1
                # Exponential back-off: the delay doubles with every 429 received.
                delay = DELAY_SECONDS * (2**retries)
                logger.warning(f"Too Many Requests. Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                logger.warning(f"HTTP Error fetching batch: {e}")
                return [None] * len(pubmed_ids)
        except Exception as e:
            # Best effort: a failed batch (timeout, connection error, malformed XML)
            # must not abort the whole run.
            logger.warning(f"Error fetching batch: {e}")
            return [None] * len(pubmed_ids)
    logger.error(f"Failed to fetch batch after {MAX_RETRIES} retries.")
    return [None] * len(pubmed_ids)

In [24]:
#Define identify_non_academic_authors
def identify_non_academic_authors(
    affiliation_map: Dict[str, List[str]],
    keywords: Optional[List[str]] = None,
) -> Tuple[List[str], List[str]]:
    """Pick out the authors that have a company-looking affiliation.

    An affiliation counts as a company affiliation when it contains one of the
    keywords, compared case-insensitively. Keywords of up to three characters
    (such as "ag", "bv", "inc", "ltd") must appear as a whole word, because as
    plain substrings they also occur inside ordinary words like "Chicago",
    "Imaging" or "Princeton". Longer keywords match anywhere in the text.

    Args:
        affiliation_map: Author name -> list of that author's affiliation strings.
        keywords: Keywords to look for; defaults to ``COMPANY_KEYWORDS``.

    Returns:
        A pair ``(authors, affiliations)`` of equal length: each flagged author
        and the first of their affiliations that matched, in input order.
    """
    if keywords is None:
        keywords = COMPANY_KEYWORDS

    alternatives = []
    # dict.fromkeys() drops duplicate keywords while keeping their order.
    for keyword in dict.fromkeys(k.lower() for k in keywords if k):
        escaped = re.escape(keyword)
        alternatives.append(rf"\b{escaped}\b" if len(keyword) <= 3 else escaped)
    if not alternatives:
        return [], []
    company_pattern = re.compile("|".join(alternatives), re.IGNORECASE)

    non_academic_authors: List[str] = []
    company_affiliations: List[str] = []
    for author, affiliations in affiliation_map.items():
        for affiliation in affiliations:
            if company_pattern.search(affiliation):
                non_academic_authors.append(author)
                company_affiliations.append(affiliation)
                # Only the first matching affiliation is recorded per author.
                break
    return non_academic_authors, company_affiliations

In [25]:
#Define find_corresponding_author_email Function
def find_corresponding_author_email(root: etree.Element, affiliation_map: Dict[str, List[str]]) -> Optional[str]:
    """Return the first email address found in the author affiliations under ``root``.

    Args:
        root: Element whose descendant ``AuthorList/Author`` affiliations are scanned.
        affiliation_map: Not used by this function; kept so existing callers
            keep working.

    Returns:
        The first address matching ``EMAIL_REGEX`` in document order, or ``None``
        when no affiliation contains one.
    """
    # The path must be relative (".//"). An absolute "//AuthorList/Author" is evaluated
    # against the whole document, so an element taken from a multi-article response
    # would report an email belonging to a different article.
    affiliation_texts = root.xpath(
        ".//AuthorList/Author//AffiliationInfo/Affiliation/text()"
    )
    for affiliation in affiliation_texts:
        for candidate in re.findall(EMAIL_REGEX, affiliation):
            # Drop a sentence-ending period that the pattern may have captured.
            email = candidate.rstrip(".")
            if email:
                return email
    return None

In [26]:
#Define run_pubmed_query Function and Execute
def run_pubmed_query(query: str, output_file: str = "pubmed_results.csv", debug: bool = False) -> str:
    """Search PubMed, fetch the matching papers in batches and write them to a CSV file.

    Args:
        query: Search term handed to ``fetch_pubmed_ids``.
        output_file: Path of the CSV file to create (overwritten if it exists).
        debug: When true, DEBUG-level messages of this module's logger are enabled
            for the duration of the call and the flag is passed on to the fetchers.

    Returns:
        ``output_file``, after the CSV has been written. The file always gets a
        header row, even when no paper could be fetched.
    """
    previous_level = logger.level
    if debug:
        # The logger otherwise inherits the INFO threshold, which would discard the
        # logger.debug() output that the debug flag asks for.
        logger.setLevel(logging.DEBUG)
    try:
        pubmed_ids = fetch_pubmed_ids(query, debug)
        total_ids = len(pubmed_ids)
        results = []
        start_time = time.time()

        with tqdm(total=total_ids, desc="Processing papers") as pbar:
            for i in range(0, total_ids, BATCH_SIZE):
                batch = pubmed_ids[i:i + BATCH_SIZE]
                paper_details_batch = fetch_paper_details_batch(batch, debug)
                # Entries that could not be fetched or parsed come back as None.
                results.extend(details for details in paper_details_batch if details)
                pbar.update(len(batch))
                # Pause between requests only; nothing follows the last batch.
                if i + BATCH_SIZE < total_ids:
                    time.sleep(DELAY_SECONDS)

        elapsed_time = time.time() - start_time
        logger.info(f"Processing completed in {elapsed_time:.2f} seconds.")

        with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
            fieldnames = [
                "PubmedID",
                "Title",
                "Publication Date",
                "Non-academic Author(s)",
                "Company Affiliation(s)",
                "Corresponding Author Email",
            ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(results)
        logger.info(f"Results saved to {output_file}")
        return output_file
    finally:
        if debug:
            # Restore whatever threshold the logger had before this call.
            logger.setLevel(previous_level)

if __name__ == "__main__":
    query = "cancer immunotherapy"
    output_file = "pubmed_results.csv"
    debug = False
    run_pubmed_query(query, output_file, debug)
    print(f"Results saved to {output_file}")
    # The browser download relies on the google.colab package. Where that package
    # cannot be imported the CSV simply stays on disk instead of the cell crashing
    # after all the work has been done.
    try:
        from google.colab import files
    except ImportError:
        files = None
    if files is not None:
        files.download(output_file)

Processing papers: 100%|██████████| 500/500 [00:55<00:00,  9.00it/s]

Results saved to pubmed_results.csv





<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>