# PubMed Alzheimer Data Extraction and PDF Downloader

This notebook demonstrates how to download free full‑text PDFs from PubMed Central (PMC) using a list of PubMed IDs (PMIDs) and PMC IDs (PMCIDs) provided in a CSV file.

This notebook performs the following tasks:
- Queries PubMed for free full-text articles on Alzheimer's Disease (2020–2024)
- Filters for 'Clinical Trial' and 'Review' types
- Extracts metadata and writes it to CSV

Once the PubMed metadata extraction is complete, the workflow downloads the open-access full-text articles in PDF format.

The following steps are outlined:

1. **Read** a CSV file containing metadata for Alzheimer’s disease articles.
2. **Filter** rows to retain only records published between 2020 and 2024 (inclusive).
3. **Retrieve** publication type information for each PMID using the NCBI Entrez API.
4. **Select** only those articles whose publication type list contains "Clinical Trial" or "Review" (case‑insensitive).
5. **Scrape** the PMC article page for each PMCID to find a direct link to the PDF.
6. **Download** the full-text PDFs and save them to the local drive.

An NCBI API key is used to permit up to 10 requests per second. Remember to replace the `EMAIL` and `API_KEY` constants with your own values before running the notebook.

In [1]:
# Mount Google Drive into the Colab session at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [47]:
# Install the third-party packages imported by the cells below.
!pip install biopython beautifulsoup4 requests



In [53]:
import csv
import os
import re
import time
import xml.etree.ElementTree as ET
from typing import Dict, Iterable, List, Tuple

import requests
from bs4 import BeautifulSoup
from Bio import Entrez

# Contact email and API key; both are assigned to Bio.Entrez just below.
EMAIL = 'aritra.lahiri@torontomu.ca' #Insert your own email
API_KEY = 'xxxxxxx' #Insert your NCBI API key
# Pause, in seconds, passed to time.sleep() after each per-PMID fetch.
WAIT_TIME = 0.12

Entrez.email = EMAIL
Entrez.api_key = API_KEY

# Browser-like headers for plain HTTP requests made with `requests`.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/115.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}

def publication_matches(pub_types: List[str], targets: List[str]) -> bool:
    """Report whether any target type appears among the publication types.

    The comparison ignores case and requires the whole type string to be
    equal: ``'review'`` matches ``'Review'`` but not ``'Systematic Review'``.
    """
    known = {entry.lower() for entry in pub_types}
    for wanted in targets:
        if wanted.lower() in known:
            return True
    return False

def _child_text(parent: ET.Element, path: str) -> str:
    """Return the stripped text of the first element at *path*, or ''."""
    node = parent.find(path)
    if node is None or not node.text:
        return ''
    return node.text.strip()


def parse_pubmed_xml(record_xml: str) -> Dict[str, object]:
    """Extract bibliographic fields from a PubMed efetch XML document.

    Args:
        record_xml: XML text (or bytes) containing a ``PubmedArticle``
            element below the document root.

    Returns:
        An empty dict when the input cannot be parsed or holds no
        ``PubmedArticle``. Otherwise a dict with string values for
        ``pmid``, ``title``, ``first_author``, ``journal``, ``volume``,
        ``issue``, ``pages``, ``pub_year``, ``pub_date``, ``create_date``,
        ``pmcid`` (without the ``PMC`` prefix), ``nihms_id`` and ``doi``,
        plus list-of-string values for ``authors`` and ``pub_types``.
        Fields missing from the XML are left as ``''`` / ``[]``.
    """
    try:
        root = ET.fromstring(record_xml)
    except (ET.ParseError, TypeError, ValueError):
        return {}
    article = root.find('.//PubmedArticle')
    if article is None:
        return {}

    result = {
        'pmid': '', 'title': '', 'authors': [], 'first_author': '',
        'journal': '', 'volume': '', 'issue': '', 'pages': '',
        'pub_year': '', 'pub_date': '', 'create_date': '',
        'pmcid': '', 'nihms_id': '', 'doi': '', 'pub_types': [],
    }

    result['pmid'] = _child_text(article, './/MedlineCitation/PMID')
    result['journal'] = _child_text(article, './/Article/Journal/Title')
    result['volume'] = _child_text(article, './/Article/Journal/JournalIssue/Volume')
    result['issue'] = _child_text(article, './/Article/Journal/JournalIssue/Issue')
    result['pages'] = _child_text(article, './/Article/Pagination/MedlinePgn')

    # Titles may carry inline markup (<i>, <sub>, ...). Element.text stops at
    # the first child element, so gather the text of the whole subtree.
    title_elem = article.find('.//Article/ArticleTitle')
    if title_elem is not None:
        result['title'] = ''.join(title_elem.itertext()).strip()

    pubdate_elem = article.find('.//Article/Journal/JournalIssue/PubDate')
    if pubdate_elem is not None:
        year = _child_text(pubdate_elem, 'Year')
        month = _child_text(pubdate_elem, 'Month')
        day = _child_text(pubdate_elem, 'Day')
        if year and month and day:
            pubdate = f"{year} {month} {day}"
        elif year and month:
            pubdate = f"{year} {month}"
        elif year:
            pubdate = year
        else:
            # No structured year: fall back to the free-text MedlineDate.
            pubdate = _child_text(pubdate_elem, 'MedlineDate')
        result['pub_date'] = pubdate
        year_match = re.search(r'(\d{4})', pubdate)
        if year_match:
            result['pub_year'] = year_match.group(1)

    datecreated_elem = article.find('.//MedlineCitation/DateCreated')
    if datecreated_elem is not None:
        # _child_text tolerates empty <Year/>, <Month/>, <Day/> elements.
        y = _child_text(datecreated_elem, 'Year')
        mth = _child_text(datecreated_elem, 'Month')
        d = _child_text(datecreated_elem, 'Day')
        if y and mth and d:
            result['create_date'] = f"{y}/{mth.zfill(2)}/{d.zfill(2)}"
        elif y and mth:
            result['create_date'] = f"{y}/{mth.zfill(2)}"
        elif y:
            result['create_date'] = y

    authors: List[str] = []
    for auth in article.findall('.//Article/AuthorList/Author'):
        lastname = _child_text(auth, 'LastName')
        forename = _child_text(auth, 'ForeName')
        initials = _child_text(auth, 'Initials')
        if lastname:
            # Prefer the full forename, then initials, then the bare surname.
            name = f"{lastname} {forename or initials}".strip()
        else:
            name = forename
        if name:
            authors.append(name)
    result['authors'] = authors
    result['first_author'] = authors[0] if authors else ''

    result['pub_types'] = [
        pt.text.strip()
        for pt in article.findall('.//Article/PublicationTypeList/PublicationType')
        if pt.text
    ]

    for aid in article.findall('.//PubmedData/ArticleIdList/ArticleId'):
        id_text = aid.text.strip() if aid.text else ''
        if not id_text:
            continue
        id_type = aid.get('IdType')
        if id_type in ('pmc', 'pmcid'):
            result['pmcid'] = id_text.replace('PMC', '').replace('pmc', '').strip()
        elif id_type in ('nihms', 'nihmsid'):
            result['nihms_id'] = id_text
        elif id_type == 'doi':
            result['doi'] = id_text
    return result

def _format_citation(data: Dict[str, object]) -> str:
    """Build a 'Journal; Date; Volume(Issue):Pages. doi: X' citation string."""
    parts = [data[key] for key in ('journal', 'pub_date') if data[key]]
    # Volume, issue and pages are concatenated without a separator, e.g.
    # "12(3):1-9"; the colon is only added when something precedes the pages.
    locator = data['volume']
    if data['issue']:
        locator += f"({data['issue']})"
    if data['pages']:
        locator += f":{data['pages']}" if locator else data['pages']
    if locator:
        parts.append(locator)
    citation = '; '.join(parts)
    if data['doi']:
        doi_part = f"doi: {data['doi']}"
        # Avoid a dangling ". " prefix when no other citation part exists.
        citation = f"{citation}. {doi_part}" if citation else doi_part
    return citation


def search_pubmed_and_save_csv(query: str, start_year: int, end_year: int, article_types: List[str], max_results: int, output_csv: str) -> None:
    """Search PubMed for free full-text articles and write matches to a CSV.

    Runs an Entrez ``esearch`` for *query* restricted to the publication-date
    range and the ``free full text`` filter, then fetches and parses each
    returned PMID individually. A record is written only when it has a PMCID,
    a publication year inside ``[start_year, end_year]``, and a publication
    type accepted by ``publication_matches(..., article_types)``.

    Args:
        query: PubMed search expression.
        start_year: First publication year to keep (inclusive).
        end_year: Last publication year to keep (inclusive).
        article_types: Publication types handed to ``publication_matches``.
        max_results: Value passed to ``esearch`` as ``retmax``.
        output_csv: Path of the CSV file to (over)write.
    """
    date_query = f'("{start_year}"[Date - Publication] : "{end_year}"[Date - Publication])'
    term = f'({query}) AND {date_query} AND free full text[filter]'
    handle = Entrez.esearch(db='pubmed', term=term, retmax=max_results, usehistory='n')
    try:
        search_results = Entrez.read(handle)
    finally:
        handle.close()

    try:
        pmids: List[str] = list(search_results['IdList'])
        total_count = int(search_results['Count'])
    except (KeyError, TypeError, ValueError):
        pmids = []
        total_count = 0
    print(f'Found {total_count} results; processing {len(pmids)} PMIDs')

    rows: List[Dict[str, str]] = []
    for pmid in pmids:
        try:
            fetch_handle = Entrez.efetch(db='pubmed', id=pmid, retmode='xml')
            try:
                record_xml = fetch_handle.read()
            finally:
                fetch_handle.close()
        except Exception as exc:
            print(f'[!] Error fetching PMID {pmid}: {exc}')
            continue
        finally:
            # Pause after every request, including failed ones, so an error
            # burst cannot exceed the rate limit.
            time.sleep(WAIT_TIME)

        data = parse_pubmed_xml(record_xml)
        if not data or not data.get('pmcid'):
            continue
        try:
            year_int = int(data.get('pub_year') or '')
        except ValueError:
            continue
        if year_int < start_year or year_int > end_year:
            continue
        if not publication_matches(data.get('pub_types', []), article_types):
            continue

        rows.append({
            'PMID': data['pmid'],
            'Title': data['title'],
            'Authors': '; '.join(data['authors']),
            'Citation': _format_citation(data),
            'First Author': data['first_author'],
            'Journal/Book': data['journal'],
            'Publication Year': data['pub_year'],
            'Create Date': data['create_date'],
            'PMCID': f"PMC{data['pmcid']}",
            'NIHMS ID': data['nihms_id'],
            'DOI': data['doi'],
        })

    fieldnames = ['PMID','Title','Authors','Citation','First Author','Journal/Book','Publication Year','Create Date','PMCID','NIHMS ID','DOI']
    with open(output_csv, 'w', encoding='utf-8', newline='') as out:
        writer = csv.DictWriter(out, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
    print(f'Saved {len(rows)} records to {output_csv}')


In [55]:
# Example usage: search PubMed and write the matching records to a CSV file.
query= "Alzheimers disease"
start_year = 2020
end_year = 2024
article_types = ['clinical trial', 'review']
max_results = 500 # adjust as needed (e.g., 2000 or more)
output_csv = 'csv-alzheimers-set.csv'
search_pubmed_and_save_csv(query, start_year, end_year, article_types, max_results, output_csv)
# The PDF download step is run separately via run_download(...) in the cells below.

Found 51860 results; processing 500 PMIDs
Saved 93 records to csv-alzheimers-set.csv


In [34]:
import csv
import os
import time
import xml.etree.ElementTree as ET

import requests
from bs4 import BeautifulSoup
from Bio import Entrez

# Configuration
# Contact email and API key; both are assigned to Bio.Entrez at the end of this cell.
EMAIL = 'aritra.lahiri@torontomu.ca' #Insert your own email
API_KEY = 'xxxxxxx' #Insert your NCBI API key
# Publication-type strings that publication_matches() looks for (case-insensitive).
TARGET_PUB_TYPES = ['clinical trial', 'review']
# Inclusive publication-year window applied by load_filtered_records().
YEAR_START = 2020
YEAR_END = 2024
# Directory that run_download() creates and saves PMC<id>.pdf files into.
DOWNLOAD_DIR = 'PMC_Filtered_PDFs'
WAIT_TIME = 0.12  # seconds between requests to respect rate limits
# Metadata CSV passed to run_download() at the end of the notebook.
CSV_FILE = '/content/csv-alzheimers-set.csv'

# Custom headers to mimic a browser and avoid 403 errors
HEADERS = {
    'User-Agent': (
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/115.0 Safari/537.36'
    ),
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
}

Entrez.email = EMAIL
Entrez.api_key = API_KEY

In [35]:
def load_filtered_records(csv_path, year_start=None, year_end=None):
    """Read a metadata CSV and return (PMID, PMCID) pairs within a year range.

    Args:
        csv_path: Path to a CSV with ``PMID``, ``PMCID`` and
            ``Publication Year`` columns.
        year_start: First publication year to keep (inclusive). Defaults to
            the module-level ``YEAR_START``.
        year_end: Last publication year to keep (inclusive). Defaults to
            the module-level ``YEAR_END``.

    Returns:
        A list of ``(pmid, pmcid)`` string tuples with the ``PMC`` prefix
        removed from the PMCID. Rows with a missing or non-integer year, or
        a missing PMID/PMCID, are skipped.
    """
    if year_start is None:
        year_start = YEAR_START
    if year_end is None:
        year_end = YEAR_END

    pairs = []
    with open(csv_path, newline='', encoding='utf-8-sig') as handle:
        for row in csv.DictReader(handle):
            # DictReader fills missing trailing fields with None, so
            # row.get(key, '') is not enough to guarantee a string.
            year_str = (row.get('Publication Year') or '').strip()
            pmid = (row.get('PMID') or '').strip()
            pmcid = (row.get('PMCID') or '').strip()
            if not year_str or not pmid or not pmcid:
                continue
            try:
                year = int(year_str)
            except ValueError:
                continue
            if year_start <= year <= year_end:
                pairs.append((pmid, pmcid.replace('PMC', '')))
    return pairs


def fetch_pub_types(pmid):
    """Fetch the publication types recorded for a PMID via Entrez efetch.

    Args:
        pmid: PubMed ID to look up.

    Returns:
        A list with the stripped text of every ``PublicationType`` element
        in the response, or an empty list if the request or parsing fails
        (the error is printed, not raised).

    Always sleeps ``WAIT_TIME`` seconds before returning, on success and on
    failure alike.
    """
    try:
        handle = Entrez.efetch(db='pubmed', id=pmid, retmode='xml')
        try:
            xml_data = handle.read()
        finally:
            # Close the handle even when the read itself fails.
            handle.close()
        root = ET.fromstring(xml_data)
        return [
            elem.text.strip()
            for elem in root.findall('.//PublicationType')
            if elem.text
        ]
    except Exception as exc:
        print(f'[!] Error retrieving publication types for PMID {pmid}: {exc}')
        return []
    finally:
        time.sleep(WAIT_TIME)


def publication_matches(pub_types, targets=None):
    """Return True if any target occurs inside any publication type.

    Matching is a case-insensitive substring test, so ``'clinical trial'``
    matches ``'Clinical Trial, Phase II'``.

    Args:
        pub_types: Publication type strings for one article.
        targets: Target strings to look for. Defaults to the module-level
            ``TARGET_PUB_TYPES``. Accepting it as an argument keeps
            two-argument calls to ``publication_matches`` working after this
            definition replaces an earlier one of the same name.
    """
    if targets is None:
        targets = TARGET_PUB_TYPES
    lowered_types = [t.lower() for t in pub_types]
    return any(
        target.lower() in candidate
        for target in targets
        for candidate in lowered_types
    )


def get_pdf_link(pmc_id):
    """Retrieve the PDF download URL for a given PMC article.

    Requests the article page with the browser-like ``HEADERS`` and, if the
    modern domain answers 403, retries on the legacy domain. The page is then
    searched, in order, for a ``citation_pdf_url`` meta tag, an anchor whose
    text contains 'pdf', and any href ending in ``.pdf`` or containing
    ``/pdf/``.

    Args:
        pmc_id: Numeric part of the PMC identifier (without the ``PMC`` prefix).

    Returns:
        An absolute URL string, or ``None`` if the page could not be fetched
        or no PDF link was found. Errors are printed, not raised.
    """
    # Local import: the stdlib URL resolver is not imported at file level.
    from urllib.parse import urljoin

    base_url = f'https://pmc.ncbi.nlm.nih.gov/articles/PMC{pmc_id}/'
    response = None
    try:
        response = requests.get(base_url, timeout=30, headers=HEADERS)
        response.raise_for_status()
    except requests.HTTPError:
        # Compare against None explicitly: a Response is falsy for any
        # 4xx/5xx status, so a plain truth test would hide the status code.
        if response is not None and response.status_code == 403:
            alt_url = f'https://www.ncbi.nlm.nih.gov/pmc/articles/PMC{pmc_id}/'
            try:
                response = requests.get(alt_url, timeout=30, headers=HEADERS)
                response.raise_for_status()
                base_url = alt_url
            except requests.RequestException:
                print(f'[!] Error accessing PMC{pmc_id}: 403 Forbidden on both domains')
                return None
        else:
            status = response.status_code if response is not None else "HTTP error"
            print(f'[!] Error accessing PMC{pmc_id}: {status}')
            return None
    except requests.RequestException as exc:
        print(f'[!] Error accessing PMC{pmc_id}: {exc}')
        return None

    soup = BeautifulSoup(response.text, 'html.parser')

    # 1) Explicit PDF URL advertised in the page metadata.
    meta_tag = soup.find('meta', attrs={'name': 'citation_pdf_url'})
    if meta_tag and meta_tag.get('content'):
        return urljoin(base_url, meta_tag['content'])

    # 2) First anchor whose visible text mentions "pdf".
    pdf_tag = soup.find('a', string=lambda text: text and 'pdf' in text.lower())
    if pdf_tag and pdf_tag.get('href'):
        return urljoin(base_url, pdf_tag['href'])

    # 3) Any anchor whose href looks like a PDF path.
    for link in soup.find_all('a', href=True):
        href = link['href']
        lowered = href.lower()
        if lowered.endswith('.pdf') or '/pdf/' in lowered:
            # urljoin resolves absolute, root-relative, protocol-relative
            # and page-relative hrefs against the page URL.
            return urljoin(base_url, href)
    return None


def download_pdf(pdf_url, filename):
    """Download a PDF from a URL using custom headers and save it to disk.

    Args:
        pdf_url: URL to request.
        filename: Destination path; written only when the response body
            looks like a PDF.

    Returns:
        ``True`` if the file was written, ``False`` if the request failed or
        the body was not a PDF. Failures are printed, not raised.
    """
    try:
        response = requests.get(pdf_url, timeout=60, headers=HEADERS)
        response.raise_for_status()
    except requests.RequestException as exc:
        print(f'[!] Failed to download {pdf_url}: {exc}')
        return False

    payload = response.content
    # A successful status can still carry an HTML page; saving that under a
    # .pdf name would make later runs treat the article as already downloaded.
    if b'%PDF' not in payload[:1024]:
        print(f'[!] Failed to download {pdf_url}: response is not a PDF')
        return False

    with open(filename, 'wb') as fh:
        fh.write(payload)
    return True


def run_download(csv_path):
    """Download PDFs for every matching record listed in a metadata CSV.

    For each (PMID, PMCID) pair from ``load_filtered_records`` the
    publication types are fetched and checked with ``publication_matches``.
    Matching articles not yet present in ``DOWNLOAD_DIR`` are looked up with
    ``get_pdf_link`` and saved as ``PMC<id>.pdf``. Progress is printed, and
    the final count covers only files that exist on disk after the download
    attempt.

    Args:
        csv_path: Path of the metadata CSV to process.
    """
    records = load_filtered_records(csv_path)
    print(f'Loaded {len(records)} candidate records from {csv_path}.')
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)
    count = 0
    for pmid, pmc_id in records:
        pub_types = fetch_pub_types(pmid)
        if not pub_types or not publication_matches(pub_types):
            continue
        filename = os.path.join(DOWNLOAD_DIR, f'PMC{pmc_id}.pdf')
        if os.path.exists(filename):
            print(f'[✓] Already downloaded: PMC{pmc_id}')
            continue
        pdf_url = get_pdf_link(pmc_id)
        time.sleep(WAIT_TIME)
        if pdf_url:
            print(f'[*] Downloading PMC{pmc_id} -> {pdf_url}')
            download_pdf(pdf_url, filename)
            # The file did not exist before this attempt, so its presence
            # now means the download was saved.
            if os.path.exists(filename):
                count += 1
        else:
            print(f'[x] PDF not found for PMC{pmc_id}')
        time.sleep(WAIT_TIME)
    print(f'Done. Downloaded {count} PDFs into "{DOWNLOAD_DIR}".')

# Run the download for every record listed in CSV_FILE.
run_download(CSV_FILE)

Loaded 2176 candidate records from /content/csv-alzheimers-set.csv.
[✓] Already downloaded: PMC12142702
[✓] Already downloaded: PMC11970185
[✓] Already downloaded: PMC12221384
[✓] Already downloaded: PMC11839357
[✓] Already downloaded: PMC11740155
[x] PDF not found for PMC11813243
[✓] Already downloaded: PMC11704942
[✓] Already downloaded: PMC11677798
[✓] Already downloaded: PMC11674444
[✓] Already downloaded: PMC11695123
[✓] Already downloaded: PMC11361754
[✓] Already downloaded: PMC11676485
[✓] Already downloaded: PMC11668837
[✓] Already downloaded: PMC11663918
[✓] Already downloaded: PMC11668108
[✓] Already downloaded: PMC11657227
[✓] Already downloaded: PMC11657461
[✓] Already downloaded: PMC11641710
[✓] Already downloaded: PMC11641728
[✓] Already downloaded: PMC11641227
[✓] Already downloaded: PMC11643865
[✓] Already downloaded: PMC11643850
[✓] Already downloaded: PMC11643945
[✓] Already downloaded: PMC11643887
[✓] Already downloaded: PMC11645901
[✓] Already downloaded: PMC1172634

KeyboardInterrupt: 