# MedlinePlus Data Extraction for DISNET

## Overview
This project implements an automated data extraction pipeline to integrate phenotypic information from MedlinePlus into the DISNET platform. It systematically identifies disease-related Patient Handouts from MedlinePlus Health Topics XML, maps them to Unified Medical Language System (UMLS) disease identifiers (CUIs), and extracts relevant symptom data.

## Workflow
The implemented pipeline includes the following steps:

1. **XML Parsing and Filtering**:
   - Parse MedlinePlus Health Topic XML.
   - Identify relevant Patient Handouts using Apache Spark, filtering by:
     - `information-category` containing both "Patient Handouts" and "Encyclopedia".
     - `organization` exactly matching "Medical Encyclopedia".

2. **Disease Mapping**:
   - Extract Patient Handout titles.
   - Map titles to UMLS CUIs using the UMLS API and verify disease categorization via the local UMLS Knowledge Base.

3. **Symptom Extraction**:
   - Fetch HTML content for each Patient Handout.
   - Extract the "Symptoms" section using BeautifulSoup by identifying specific HTML headers.
   - Capture all paragraphs and bullet points within the Symptoms section.

4. **Symptom CUI Mapping**:
   - Map extracted symptom bullet points to UMLS CUIs using the UMLS API.
   - Verify that each mapped CUI is a symptom via the local UMLS Knowledge Base.

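The filtering rule in step 1 can be illustrated without Spark. The following is a minimal sketch using only the standard library. The XML snippet is a hand-written sample whose layout (`title` and `url` attributes on `<site>`, repeated `<information-category>` and `<organization>` children) is an assumption inferred from the column names used in the Spark code, and `select_handouts` is a hypothetical helper, not part of the pipeline.

```python
import xml.etree.ElementTree as ET

# Hand-written sample; the real MedlinePlus file is much larger and may
# contain additional elements and attributes.
SAMPLE_XML = """
<health-topics>
  <health-topic>
    <site title="Abscess" url="https://medlineplus.gov/ency/article/001353.htm">
      <information-category>Patient Handouts</information-category>
      <information-category>Encyclopedia</information-category>
      <organization>Medical Encyclopedia</organization>
    </site>
    <site title="Example page" url="https://example.org/page">
      <information-category>Patient Handouts</information-category>
      <organization>Example Organization</organization>
    </site>
  </health-topic>
</health-topics>
"""


def select_handouts(xml_text):
    """Return (title, url) for every <site> that passes the step 1 filter."""
    root = ET.fromstring(xml_text.strip())
    selected = []
    for site in root.iter("site"):
        categories = {c.text for c in site.findall("information-category")}
        organizations = {o.text for o in site.findall("organization")}
        # Both categories must be present and one organization must match.
        if ({"Patient Handouts", "Encyclopedia"} <= categories
                and "Medical Encyclopedia" in organizations):
            selected.append((site.get("title"), site.get("url")))
    return selected


print(select_handouts(SAMPLE_XML))
```

Only the first `<site>` passes, because the second one lacks the "Encyclopedia" category and belongs to a different organization.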

## Output Data Structure
Extracted results are stored in an Excel file structured as follows:

| Column                   | Description                                                  |
|--------------------------|--------------------------------------------------------------|
| `site_title`             | Title of Patient Handout                                     |
| `url`                    | Direct URL to the Patient Handout                            |
| `disease_cui`            | UMLS Concept Unique Identifier of disease                    |
| `disease_name`           | Standardized name of the disease                             |
| `symptom_text`           | Textual description of symptoms                              |
| `symptom_bullet_points`  | Extracted symptoms as bullet points annotated with CUIs      |

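The `symptom_bullet_points` column holds all bullet points of one handout in a single string: entries are joined with ` | `, and each entry ends with either `[CUI: <cui>, Name: <name>]` or `[CUI: None Found]`. A minimal sketch of how such a cell could be split back into structured records is shown below. `parse_bullet_points` is a hypothetical helper and the sample cell is illustrative, not taken from the dataset.

```python
import re

# One annotated entry: "<text> [CUI: <cui>, Name: <name>]" or "<text> [CUI: None Found]"
BULLET_PATTERN = re.compile(
    r"^(?P<text>.*) \[CUI: (?:(?P<cui>C\d+), Name: (?P<name>.*)|None Found)\]$"
)


def parse_bullet_points(cell):
    """Split one cell into a list of (text, cui, name) tuples."""
    parsed = []
    if not cell:
        return parsed
    for chunk in cell.split(" | "):
        match = BULLET_PATTERN.match(chunk)
        if match:
            # cui and name are None for "[CUI: None Found]" entries.
            parsed.append((match["text"], match["cui"], match["name"]))
        else:
            # Keep entries that do not follow the expected format as plain text.
            parsed.append((chunk, None, None))
    return parsed


sample_cell = "Fever [CUI: C0015967, Name: Fever] | Feeling tired [CUI: None Found]"
print(parse_bullet_points(sample_cell))
```

A symptom text that itself contains ` | ` would be split incorrectly by this sketch, so a consumer that needs exact round-tripping may prefer a different separator or a structured format.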
## Accessing the Implementation
The complete implementation is publicly accessible on GitHub:

[https://github.com/dasiemens-coder/ds_seminar_medline-extraction.git](https://github.com/dasiemens-coder/ds_seminar_medline-extraction.git)

## Final Dataset
The final dataset (Excel format) is available at:

`https://doi.org/10.5281/zenodo.15088456`


In [1]:
import os
import requests
from bs4 import BeautifulSoup
import csv
import sys
import time
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    array_contains, col, count, explode, isnan, lower, udf, when
)
from pyspark.sql.types import ArrayType, StringType
import pandas as pd

sys.setrecursionlimit(10000)

csv.field_size_limit(sys.maxsize) # needed to read large csv files

# Paths to local XML and DTD files
DTD_FILE = "input/mplus_topics_2012-06-01.dtd"
XML_FILE = "input/mplus_topics_2025-03-04.xml" 

# Path to local metathesaurus files
MRCONSO = "umls_metathesaurus/MRCONSO.RRF"
MRSTY = "umls_metathesaurus/MRSTY.RRF"

# Initialize Spark session
spark = SparkSession.builder \
    .appName("XMLProcessing") \
    .master("local[*]") \
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.15.0") \
    .getOrCreate()



In [2]:
def get_diseases_from_umls():
    """
    Reads UMLS MRSTY.RRF and MRCONSO.RRF files in /umls_metathesaurus to extract diseases (excluding symptoms)
    and returns a list of tuples, where each tuple is (CUI, preferred English name).
    """
    # Build a mapping of each CUI to its set of semantic types
    cui_semtypes = {}
    with open(MRSTY, 'r', encoding='utf-8') as f:
        reader = csv.reader(f, delimiter='|')
        for row in reader:
            if len(row) > 3:
                cui = row[0].strip()
                semtype = row[3].strip()
                cui_semtypes.setdefault(cui, set()).add(semtype)
    
    # Create a set of CUIs that have "Disease or Syndrome" and do NOT have "Sign or Symptom"
    disease_cuis = {cui for cui, types in cui_semtypes.items()
                    if 'Disease or Syndrome' in types and 'Sign or Symptom' not in types}
    
    # Now, map each filtered CUI to its preferred English name using MRCONSO.RRF.
    diseases = {}
    with open(MRCONSO, 'r', encoding='utf-8') as f:
        reader = csv.reader(f, delimiter='|')
        for row in reader:
            # MRCONSO.RRF columns:
            # 0: CUI, 1: LAT (language), 2: TS, 3: LUI, 4: STT, 5: SUI, 6: ISPREF,
            # 7: AUI, 8: SAUI, 9: SCUI, 10: SDUI, 11: SAB, 12: TTY, 13: CODE,
            # 14: STR (the term), 15: SRL, 16: SUPPRESS, 17: CVF
            if row[0].strip() in disease_cuis and row[1].strip() == 'ENG' and row[6].strip() == 'Y':
                diseases[row[0].strip()] = row[14].strip()
    
    # Return as a list of tuples: (CUI, disease name)
    return list(diseases.items())

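The semantic-type filter above can be tried on a few in-memory rows before running it against the full Metathesaurus. This is a minimal sketch: the rows use placeholder CUIs and identifiers, and `disease_only_cuis` is a hypothetical helper. It also passes `quoting=csv.QUOTE_NONE`, on the assumption that RRF files use no quoting, so that a term containing a double quote is not misread as a quoted field.

```python
import csv
import io

# Placeholder rows in MRSTY layout: CUI|TUI|STN|STY|ATUI|CVF|
SAMPLE_MRSTY = (
    "C9000001|T047|B2.2.1.2.1|Disease or Syndrome|AT90000001||\n"
    "C9000002|T047|B2.2.1.2.1|Disease or Syndrome|AT90000002||\n"
    "C9000002|T184|A2.2.2|Sign or Symptom|AT90000003||\n"
    "C9000003|T184|A2.2.2|Sign or Symptom|AT90000004||\n"
)


def disease_only_cuis(mrsty_lines):
    """Return CUIs typed 'Disease or Syndrome' but not 'Sign or Symptom'."""
    semtypes = {}
    reader = csv.reader(mrsty_lines, delimiter="|", quoting=csv.QUOTE_NONE)
    for row in reader:
        if len(row) > 3:
            semtypes.setdefault(row[0], set()).add(row[3])
    return {
        cui for cui, types in semtypes.items()
        if "Disease or Syndrome" in types and "Sign or Symptom" not in types
    }


print(disease_only_cuis(io.StringIO(SAMPLE_MRSTY)))
```

The second placeholder concept carries both semantic types and is therefore excluded, which mirrors the exclusion rule in `get_diseases_from_umls`.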
In [3]:
def get_signs_symptoms_from_umls():
    """
    Reads UMLS MRSTY.RRF and MRCONSO.RRF files and extracts all concepts 
    with the semantic type "Sign or Symptom". It returns a list of tuples,
    where each tuple is (CUI, preferred English name).

    Returns:
        list: A list of tuples (CUI, preferred English name).
    """
    # Build a set of CUIs that are assigned the semantic type "Sign or Symptom"
    sign_symptom_cuis = set()
    with open(MRSTY, 'r', encoding='utf-8') as f:
        reader = csv.reader(f, delimiter='|')
        for row in reader:
            # In MRSTY.RRF, the 4th column (index 3) is the semantic type name.
            if len(row) > 3 and row[3].strip() == 'Sign or Symptom':
                sign_symptom_cuis.add(row[0].strip())
    
    # Map each of those CUIs to its preferred English name using MRCONSO.RRF.
    signs_symptoms = {}
    with open(MRCONSO, 'r', encoding='utf-8') as f:
        reader = csv.reader(f, delimiter='|')
        for row in reader:
            # MRCONSO.RRF columns:
            # 0: CUI, 1: LAT (language), 2: TS, 3: LUI, 4: STT, 5: SUI, 6: ISPREF,
            # 7: AUI, 8: SAUI, 9: SCUI, 10: SDUI, 11: SAB, 12: TTY, 13: CODE,
            # 14: STR (the term), 15: SRL, 16: SUPPRESS, 17: CVF
            if (row[0].strip() in sign_symptom_cuis and 
                row[1].strip() == 'ENG' and 
                row[6].strip() == 'Y'):
                signs_symptoms[row[0].strip()] = row[14].strip()
    
    # Return as a list of tuples: (CUI, preferred English name)
    return list(signs_symptoms.items())

In [4]:
# Retrieve the lists of all diseases and symptoms
# and broadcast them to all nodes in the Spark cluster for better performance.
diseas_dict_df_bc = spark.sparkContext.broadcast(get_diseases_from_umls())
symptoms_dict_df_bc = spark.sparkContext.broadcast(get_signs_symptoms_from_umls())

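A list of `(CUI, name)` tuples has to be scanned from start to end for every lookup. As a possible refinement, the same pairs could be turned into a dictionary so that each lookup is a single hash access. The sketch below uses placeholder data, and `first_match` is a hypothetical helper, not part of the notebook.

```python
def first_match(ui_values, lookup):
    """Return (cui, name) for the first UI found in the lookup dict, else None."""
    for ui in ui_values:
        if ui in lookup:
            return ui, lookup[ui]
    return None


# Placeholder pairs standing in for the output of get_diseases_from_umls().
pairs = [("C9000001", "Example disease A"), ("C9000002", "Example disease B")]
lookup = dict(pairs)  # built once; could be broadcast instead of the list

print(first_match(["C9999999", "C9000002"], lookup))
print(first_match(["C9999999"], lookup))
```

Unlike a scan over the list, this returns matches in the order of `ui_values`. With `pageSize` set to 1 the search returns at most one UI, so the two approaches give the same result.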


In [5]:
def search_for_disaes_cui(name):
    """
    Queries the UMLS search API for the CUI that best matches the given name,
    then checks the broadcast list built from the local UMLS Metathesaurus
    to confirm that the CUI is a disease.
    API documentation: https://documentation.uts.nlm.nih.gov/rest/search/

    Args:
        name (str): The name to search for.
    
    Returns:
        tuple: (CUI, disease name) if a matching disease is found; None otherwise.
    """
    load_dotenv()  # Load environment variables (including API_KEY)
    base_url = "https://uts-ws.nlm.nih.gov/rest/search/current"
    params = {
        'string': name,
        # The API key is read from the API_KEY environment variable (e.g. set in a .env file).
        'apiKey': os.getenv('API_KEY'),
        'pageNumber': 1,  # Retrieve only the first result.
        'pageSize': 1
    }
    
    max_retries = 3
    retry_delay = 2  # seconds
    response = None
    
    for attempt in range(max_retries):
        try:
            response = requests.get(base_url, params=params, timeout=10)
            # If the server returns a 5xx error, raise an HTTPError to trigger a retry.
            if 500 <= response.status_code < 600:
                raise requests.exceptions.HTTPError(f"Server error: {response.status_code}")
            response.raise_for_status()  # Raise an error for 4xx responses.
            break  # Request was successful; exit the retry loop.
        except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError, requests.exceptions.Timeout) as err:
            # If it's the last attempt, print the error and return None.
            if attempt == max_retries - 1:
                print(f"Error fetching UMLS data for '{name}': {err}")
                return None
            # Otherwise, wait a bit and then retry.
            time.sleep(retry_delay)
    
    data = response.json()
    
    # Extract UI values from the API response.
    ui_values = [result.get('ui') for result in data.get('result', {}).get('results', [])]
    
    # Use the broadcast lookup list for diseases.
    # diseas_dict_df_bc.value is a list of tuples, e.g., [(CUI, preferred_name), ...]
    matched_rows = [row for row in diseas_dict_df_bc.value if row[0] in ui_values]
    matching_disease = (matched_rows[0][0], matched_rows[0][1]) if matched_rows else None
    
    return matching_disease

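The retry logic in the function above can be separated from the HTTP call and exercised on its own. The following is a minimal sketch, assuming a hypothetical helper `call_with_retries` that takes the request as a zero-argument callable. The `sleep` parameter is injectable only so that the example runs without waiting.

```python
import time


def call_with_retries(fetch, max_retries=3, retry_delay=2.0,
                      retry_on=(ConnectionError, TimeoutError),
                      sleep=time.sleep):
    """Call fetch() until it succeeds or max_retries attempts have failed."""
    for attempt in range(max_retries):
        try:
            return fetch()
        except retry_on as err:
            # On the last attempt, report the error and give up.
            if attempt == max_retries - 1:
                print(f"Giving up after {max_retries} attempts: {err}")
                return None
            sleep(retry_delay)
    return None


# Simulated request that fails twice before succeeding.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


print(call_with_retries(flaky, sleep=lambda seconds: None))
```

To use this with `requests`, `retry_on` would need to list the `requests.exceptions` classes caught in the function above, since they do not derive from the built-in `ConnectionError` and `TimeoutError`.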
In [6]:
def search_for_symptom_cui(name):
    """
    Queries the UMLS search API for the CUI that best matches the given name,
    then checks the broadcast list built from the local UMLS Metathesaurus
    to confirm that the CUI is a symptom.
    API documentation: https://documentation.uts.nlm.nih.gov/rest/search/

    Args:
        name (str): The name to search for.
    
    Returns:
        tuple: (CUI, symptom name) if a matching symptom is found; None otherwise.
    """
    load_dotenv()  # Load environment variables (including API_KEY)

    base_url = "https://uts-ws.nlm.nih.gov/rest/search/current"
    params = {
        'string': name,
        # The API key is read from the API_KEY environment variable (e.g. set in a .env file).
        'apiKey': os.getenv('API_KEY'),
        'pageNumber': 1,
        'pageSize': 1
    }

    max_retries = 3
    retry_delay = 2  # seconds
    response = None

    for attempt in range(max_retries):
        try:
            response = requests.get(base_url, params=params, timeout=10)
            # If the server returns a 5xx error, force an exception to trigger a retry.
            if 500 <= response.status_code < 600:
                raise requests.exceptions.HTTPError(f"Server error: {response.status_code}")
            response.raise_for_status()  # Raise an error for 4xx responses.
            break  # Request was successful; exit the retry loop.
        except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError, requests.exceptions.Timeout) as err:
            if attempt == max_retries - 1:
                print(f"Error fetching UMLS symptom data for '{name}': {err}")
                return None
            time.sleep(retry_delay)

    data = response.json()
    # Extract the UI values from the API response.
    ui_values = [result.get('ui') for result in data.get('result', {}).get('results', [])]
    
    # Use the broadcast lookup list for symptoms.
    # symptoms_dict_df_bc.value is a list of tuples, e.g., [(CUI, preferred_name), ...]
    matched_rows = [row for row in symptoms_dict_df_bc.value if row[0] in ui_values]
    matching_symptom = (matched_rows[0][0], matched_rows[0][1]) if matched_rows else None
    
    return matching_symptom 

In [7]:
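def extraxct_html(url, cui, download=False, folder="html"):
    """
    Fetches the HTML content of the given URL and optionally saves it to disk.

    Args:
        url (str): The URL to fetch.
        cui (str): The disease CUI; used as the name of the subfolder.
        download (bool): Whether to save the .html file to disk.
        folder (str): The base folder where the .html file will be saved.

    Returns:
        tuple: (file_path, html), where file_path is the path the HTML file is
        (or would be) saved to and html is the page content as a string.
    """
    # Derive the file name from the last URL path segment; the full URL
    # contains "://" and "/" and cannot be used as a file name.
    file_name = os.path.basename(url.rstrip("/")) or "index.html"
    folder_cui = os.path.join(folder, cui)
    file_path = os.path.join(folder_cui, file_name)

    # Send the GET request
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # Raise an error if the request failed

    # Write the response text (HTML content) to a file
    if download:
        os.makedirs(folder_cui, exist_ok=True)  # one folder per CUI
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(response.text)

    return file_path, response.text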

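A URL contains characters such as `:` and `/`, so it cannot serve directly as a file name. One way to derive a local path is to keep only the last segment of the URL path. The sketch below shows this with the standard library. `local_html_path` is a hypothetical helper, and the fallback name `index.html` is an assumption for URLs that end in a slash.

```python
import os
from urllib.parse import urlparse


def local_html_path(url, cui, folder="html"):
    """Build <folder>/<cui>/<last URL path segment> without touching the disk."""
    # urlparse drops the scheme, host and query string from consideration.
    name = os.path.basename(urlparse(url).path) or "index.html"
    return os.path.join(folder, cui, name)


print(local_html_path("https://medlineplus.gov/ency/article/001353.htm", "C0000833"))
```

Using `urlparse` rather than plain string splitting keeps a query string or fragment out of the file name.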
In [8]:
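def extract_symptoms_udf(site):
    """
    Fetches a Patient Handout page and extracts its "Symptoms" section.

    Args:
        site (str): URL of the Patient Handout.

    Returns:
        list: [symptom_text, symptom_bullet_points]; both are empty strings
        if the page has no "Symptoms" section.
    """
    # Send the GET request
    response = requests.get(site, timeout=10)
    response.raise_for_status()  # Raise an error if the request failed
    html_content = response.text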
    
    # Parse the HTML content
    soup = BeautifulSoup(html_content, "html.parser")
    
    # Find the section with <h2>Symptoms</h2>
    target_section = None
    for h2_tag in soup.find_all("h2"):
        if h2_tag.get_text(strip=True).lower() == "symptoms":
            parent_section = h2_tag.find_parent("section")
            if parent_section:
                target_section = parent_section
                break

    if not target_section:
        not_found = f"No 'Symptoms' section found in Site: {site}"
        print(not_found)
        return ["", ""]  # No "Symptoms" section found

    # Extract all text from the target section
    symptoms_text = target_section.get_text(" ", strip=True)
    
    
    # Extract the text of each bullet point (they are joined into one string further below)
    bullet_points = [li.get_text(" ", strip=True) for li in target_section.find_all("li")]
    
    # Try to map each bullet point to a symptom CUI
    mapped_bullet_points = []
    for bullet_point in bullet_points:
        result = search_for_symptom_cui(bullet_point)
        if result is not None:
            cui, name = result
            bullet_point = f"{bullet_point} [CUI: {cui}, Name: {name}]"
        else:
            bullet_point = f"{bullet_point} [CUI: None Found]"
        mapped_bullet_points.append(bullet_point)
        
    mapped_bullet_points = " | ".join(mapped_bullet_points)
    
    return [symptoms_text, mapped_bullet_points]

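The section lookup above expects a page layout in which the `<h2>Symptoms</h2>` heading and its bullet list share one `<section>` element. The sketch below reproduces that idea offline on a hand-written HTML sample. Note that it deliberately swaps BeautifulSoup for the standard library's `html.parser` so that it runs without extra packages. `SymptomsSectionParser` is a hypothetical class, and the sample markup is an assumption about the page structure, not a copy of a MedlinePlus page.

```python
from html.parser import HTMLParser

SAMPLE_HTML = """
<section><h2>Causes</h2><ul><li>Bacteria</li></ul></section>
<section><h2>Symptoms</h2><p>Symptoms may include:</p>
<ul><li>Fever</li><li>Sore  throat</li></ul></section>
<section><h2>Treatment</h2><ul><li>Rest</li></ul></section>
"""


class SymptomsSectionParser(HTMLParser):
    """Collect <li> texts from the first <section> whose <h2> reads 'Symptoms'."""

    def __init__(self):
        super().__init__()
        self.depth = 0            # current <section> nesting depth
        self.target_depth = None  # depth of the matched section while inside it
        self.done = False         # True once the matched section has closed
        self.capture = None       # "h2" or "li" while text is being buffered
        self.buffer = []
        self.items = []

    def handle_starttag(self, tag, attrs):
        if tag == "section":
            self.depth += 1
        elif tag == "h2" and not self.done and self.target_depth is None:
            self.capture, self.buffer = "h2", []
        elif tag == "li" and self.target_depth is not None:
            self.capture, self.buffer = "li", []

    def handle_data(self, data):
        if self.capture:
            self.buffer.append(data)

    def handle_endtag(self, tag):
        if tag == "section":
            if self.depth == self.target_depth:
                self.target_depth, self.done = None, True
            self.depth -= 1
        elif tag == self.capture:
            text = " ".join("".join(self.buffer).split())  # normalize whitespace
            if tag == "h2" and text.lower() == "symptoms" and self.depth > 0:
                self.target_depth = self.depth
            elif tag == "li":
                self.items.append(text)
            self.capture = None


parser = SymptomsSectionParser()
parser.feed(SAMPLE_HTML)
print(parser.items)
```

Bullet points in the "Causes" and "Treatment" sections are ignored because they sit outside the matched `<section>`. Nested lists are not handled by this sketch.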
In [9]:

# Get the Spark session (getOrCreate reuses the session created above if it is still active)
spark = SparkSession.builder \
    .appName("XMLProcessing") \
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.15.0") \
    .getOrCreate()

# Load the XML file referenced by XML_FILE.
# Each <health-topic> element becomes one row.
df = spark.read.format("xml") \
    .options(rowTag="health-topic") \
    .load(XML_FILE)
df = df.repartition(8)
print(f"DataFrame length: {df.count()}")

df_sites = df.select(explode("site").alias("site"))

# Extract required fields from each <site> element.
df_sites = df_sites.select(
    col("site._title").alias("site_title"),
    col("site._url").alias("url"),
    col("site._language-mapped-url").alias("language_mapped_url"),
    col("site.information-category").alias("information_categories"),
    col("site.organization").alias("organization")
)
# Filter for sites meeting these conditions:
# - information_categories contains both 'Patient Handouts' and 'Encyclopedia'.
# - One of the organization entries is exactly "Medical Encyclopedia".
df_filtered = df_sites.filter(
    array_contains(col("information_categories"), "Patient Handouts") &
    array_contains(col("information_categories"), "Encyclopedia") &
    array_contains(col("organization"), "Medical Encyclopedia")
)
df_filtered = df_filtered.repartition(8)
# Create a UDF to search for the disease CUI based on the site's title.
search_udf = udf(search_for_disaes_cui, ArrayType(StringType()))

# Apply the UDF to obtain disease information.
df_with_disease = df_filtered.withColumn("disease_info", search_udf(col("site_title")))

# Filter out rows where no disease info was found.
df_with_disease = df_with_disease.filter(col("disease_info").isNotNull())

# Split the disease_info array into separate columns: disease_cui and disease_name.
df_with_disease = df_with_disease.withColumn("disease_cui", col("disease_info").getItem(0)) \
    .withColumn("disease_name", col("disease_info").getItem(1))

# Sort the results alphabetically by title (case-insensitive).
df_final = df_with_disease.orderBy(lower(col("site_title")))

# Select the columns of interest and show the result.
df_final = df_final.select(
    "site_title", "url", "language_mapped_url",
    "information_categories", "organization",
    "disease_cui", "disease_name"
)
df_final.show(truncate=False)

extract_symptoms = udf(extract_symptoms_udf, ArrayType(StringType()))

# Apply the UDF to obtain symptoms information.
df_final = df_final.withColumn("symptoms", extract_symptoms(col("url")))

# Split the "symptoms" column into "symptom_text" and "symptom_bullet_points"
df_final = df_final.withColumn("symptom_text", col("symptoms").getItem(0)) \
                   .withColumn("symptom_bullet_points", col("symptoms").getItem(1))

# drop columns that are not needed 
df_final = df_final.drop("symptoms", "information_categories", "organization", "language_mapped_url")

# Show the final DataFrame with symptoms.
df_final.show(truncate=False)

# Save the final DataFrame to an Excel file in the /output folder
output_excel_path = "output/medline_extraction_final_version.xlsx"


# Drop duplicates that arose during parallel processing
df_final = df_final.dropDuplicates()

# Count total rows
total_count = df_final.count()
print(f"Total rows after dropping duplicates: {total_count}")

# Count NaN/Null values and empty strings per column
nan_counts = df_final.select([
    count(when(isnan(c) | col(c).isNull() | (col(c) == ""), c)).alias(c) for c in df_final.columns
])
print("NaN/Null values per column:")
nan_counts.show(truncate=False)

# Show the final DataFrame
#df_final.show(truncate=False)


try:
    df_final.toPandas().to_excel(output_excel_path, index=False)
    print(f"DataFrame successfully saved to Excel at {output_excel_path}")
except Exception as e:
    print(f"Failed to save DataFrame to Excel. Error: {e}")
    output_csv_path = "output/medline_extraction_final_version.csv"
    df_final.toPandas().to_csv(output_csv_path, index=False, sep='~')
    print(f"DataFrame successfully saved to CSV at {output_csv_path}")
spark.stop()

                                                                                

DataFrame length: 2033


                                                                                

+-----------------------------------+-----------------------------------------------+-------------------------------------------------------+--------------------------------+----------------------+-----------+-----------------------------------------------------+
|site_title                         |url                                            |language_mapped_url                                    |information_categories          |organization          |disease_cui|disease_name                                         |
+-----------------------------------+-----------------------------------------------+-------------------------------------------------------+--------------------------------+----------------------+-----------+-----------------------------------------------------+
|Abscess                            |https://medlineplus.gov/ency/article/001353.htm|https://medlineplus.gov/spanish/ency/article/001353.htm|[Patient Handouts, Encyclopedia]|[Medical Encyclopedia]|C0000833   

No 'Symptoms' section found in Site: https://medlineplus.gov/ency/article/001353.htm
No 'Symptoms' section found in Site: https://medlineplus.gov/ency/article/003219.htm
                                                                                

+-----------------------------------+-----------------------------------------------+-----------+-----------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

No 'Symptoms' section found in Site: https://medlineplus.gov/ency/article/002270.htm
No 'Symptoms' section found in Site: https://medlineplus.gov/ency/patientinstructions/000595.htm
No 'Symptoms' section found in Site: https://medlineplus.gov/ency/article/001223.htm
No 'Symptoms' section found in Site: https://medlineplus.gov/ency/article/001105.htm
No 'Symptoms' section found in Site: https://medlineplus.gov/ency/article/000293.htm
No 'Symptoms' section found in Site: https://medlineplus.gov/ency/article/003043.htm
No 'Symptoms' section found in Site: https://medlineplus.gov/ency/article/001355.htm
No 'Symptoms' section found in Site: https://medlineplus.gov/ency/article/007447.htm
No 'Symptoms' section found in Site: https://medlineplus.gov/ency/article/001439.htm
No 'Symptoms' section found in Site: https://medlineplus.gov/ency/article/000163.htm
No 'Symptoms' section found in Site: https://medlineplus.gov/ency/patientinstructions/000050.htm
No 'Symptoms' section found in Site: http

Total rows after dropping duplicates: 697
NaN/Null values per column:


+----------+---+-----------+------------+------------+---------------------+
|site_title|url|disease_cui|disease_name|symptom_text|symptom_bullet_points|
+----------+---+-----------+------------+------------+---------------------+
|0         |0  |0          |0           |39          |82                   |
+----------+---+-----------+------------+------------+---------------------+



DataFrame successfully saved to Excel at output/medline_extraction_final_version.xlsx
