# MT Samples Data
> Liam Barrett, 16 July, 2024

Notebook that drives a Chrome browser to automatically retrieve ENT clinical letters from the openly available [MT Samples](https://mtsamples.com/) website.

## Import libraries for auto-extraction

In [14]:
# Import necessary libraries
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException

from functools import wraps
import time
import re

from bs4 import BeautifulSoup
from tqdm import tqdm

## Define functions for auto-extraction

In [18]:
def check_element_exists(func):
    """Decorator that waits for an element to be present before calling ``func``.

    The wrapped function must receive a Selenium driver and an XPath, either
    positionally (driver first, XPath second) or as the keyword arguments
    ``driver`` and ``xpath``.

    Args:
        func: Callable to guard.

    Returns:
        The wrapped callable. It raises ``ElementNotFoundException`` when no
        element matching the XPath becomes present within 20 seconds;
        otherwise it returns whatever ``func`` returns.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Resolve each argument by keyword first, then by position, so that
        # func(driver=..., xpath=...) works as well as func(driver, xpath).
        driver = kwargs['driver'] if 'driver' in kwargs else args[0]
        xpath = kwargs['xpath'] if 'xpath' in kwargs else args[1]

        try:
            # Wait for the element to be present
            WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.XPATH, xpath))
            )
        except TimeoutException as exc:
            # Chain explicitly so the underlying timeout stays in the traceback.
            raise ElementNotFoundException(
                f"Element with XPath '{xpath}' not found on the page."
            ) from exc

        return func(*args, **kwargs)
    return wrapper

class ElementNotFoundException(Exception):
    """Error signalling that an expected element could not be found on the page."""

@check_element_exists
def click_element(driver, xpath):
    """Find the element matching ``xpath`` and click it.

    Args:
        driver: Selenium WebDriver showing the page.
        xpath: XPath expression identifying the element to click.
    """
    driver.find_element(By.XPATH, xpath).click()

def accept_cookies(driver):
    """Dismiss the cookie banner by clicking its 'Accept All' button.

    Args:
        driver: Selenium WebDriver showing the page with the cookie banner.
    """
    accept_button_xpath = "//button[@id='ez-accept-all']"
    click_element(driver, accept_button_xpath)
    print("Cookies accepted.")

def set_dropdown_value(driver, dropdown_xpath, value):
    """Choose an option in a ``<select>`` dropdown by its visible text.

    Up to five attempts are made. An attempt fails when the dropdown does not
    become visible within 10 seconds, or when Selenium reports a missing or
    stale element while selecting.

    Args:
        driver: Selenium WebDriver showing the page.
        dropdown_xpath: XPath of the ``<select>`` element.
        value: Option to choose; converted with ``str()`` and matched against
            the option's visible text.

    Raises:
        TimeoutException, NoSuchElementException, StaleElementReferenceException:
            The error from the final attempt, once all attempts have failed.
    """
    max_attempts = 5
    retryable_errors = (TimeoutException, NoSuchElementException, StaleElementReferenceException)
    locator = (By.XPATH, dropdown_xpath)

    for attempt_number in range(1, max_attempts + 1):
        try:
            # Block until the dropdown is rendered and visible, then pick the option.
            dropdown = WebDriverWait(driver, 10).until(
                EC.visibility_of_element_located(locator)
            )
            Select(dropdown).select_by_visible_text(str(value))
            print(f"Successfully set dropdown to value '{value}'")
            return
        except retryable_errors as err:
            if attempt_number == max_attempts:
                print(f"Failed to set dropdown value after {max_attempts} attempts.")
                raise err
            print(f"Attempt {attempt_number} failed. Retrying...")

def clean_text(text):
    """Strip HTML tags and ``&nbsp;`` entities, then collapse whitespace.

    Args:
        text (str): Raw text, possibly containing markup.

    Returns:
        str: The text with tags removed, ``&nbsp;`` turned into spaces, and
        every run of whitespace reduced to a single space with no leading or
        trailing whitespace.
    """
    without_tags = re.sub('<[^<]+?>', '', text)
    without_entities = without_tags.replace('&nbsp;', ' ')
    # split() without arguments discards leading/trailing whitespace and
    # collapses runs, so the re-joined string needs no further stripping.
    return ' '.join(without_entities.split())

def clean_value(key, value):
    """Tidy a section value scraped for ``key``.

    Args:
        key (str): Section heading the value belongs to.
        value (str | None): Raw section text.

    Returns:
        str | None: ``None`` when ``value`` is ``None``. Otherwise the value
        with a leading copy of ``key`` removed (and the remainder stripped),
        cut off before the first 'See More Samples' if that marker occurs.
    """
    if value is None:
        return None

    # Drop the heading when it is repeated at the start of the value.
    if value.startswith(key):
        value = value[len(key):].strip()

    # Keep only what precedes the first 'See More Samples' marker, if any.
    head, marker, _tail = value.partition('See More Samples')
    return head.strip() if marker else value

def extract_page_info(driver, url):
    """Scrape the heading/value pairs of a sample page into a dictionary.

    The content container is located by a fixed absolute XPath and its inner
    HTML is parsed with BeautifulSoup. Every ``<b>`` element is treated as a
    section heading; the text nodes that follow it, up to the next ``<b>``,
    form that section's value.

    Args:
        driver: Selenium WebDriver currently showing the sample page.
        url: URL of the page; stored under the ``'url'`` key.

    Returns:
        dict: ``{'url': url, <heading>: <text or None>, ...}``. If anything
        fails, the error is printed and whatever was collected up to that
        point is returned.
    """
    info_dict = {'url': url}

    def store_section(key, parts):
        # Record one finished heading/value pair; an empty value becomes None.
        if key:
            value = ' '.join(parts).strip()
            info_dict[key] = clean_value(key, value) if value else None

    try:
        # Find the div containing the information
        info_div = driver.find_element(By.XPATH, "/html/body/main/div/div/div[2]/div[2]")

        # Parse the HTML content of the div
        soup = BeautifulSoup(info_div.get_attribute('innerHTML'), 'html.parser')

        current_key = None
        current_value = []

        for element in soup.descendants:
            if element.name == 'b':
                # A new heading closes the previous section.
                store_section(current_key, current_value)
                current_key = clean_text(element.text)
                current_value = []
            elif element.name == 'br':
                # Add a space for line breaks
                current_value.append(' ')
            elif element.name == 'div':
                # BeautifulSoup exposes ``class`` as a list of tokens, so the
                # two class names are checked individually; testing for the
                # combined string 'row my-2' in that list can never match.
                classes = element.get('class') or []
                if isinstance(classes, str):
                    classes = classes.split()
                if 'row' in classes and 'my-2' in classes:
                    # We've reached the end of the relevant content.
                    # clean_value's 'See More Samples' cut-off still applies
                    # to whatever was collected before this point.
                    break
            elif isinstance(element, str) and element.strip():
                # Text nodes (including the heading's own text, which
                # clean_value later removes from the start of the value).
                current_value.append(clean_text(element))

        # Add the last key-value pair if it exists
        store_section(current_key, current_value)

    except Exception as e:
        # Best effort: report the problem and return what has been gathered.
        print(f"An error occurred while extracting information: {str(e)}")

    return info_dict

def _return_to_main_tab(driver, main_handle):
    """Close every tab except ``main_handle`` and give it focus again."""
    for handle in driver.window_handles:
        if handle != main_handle:
            driver.switch_to.window(handle)
            driver.close()
    driver.switch_to.window(main_handle)


def iterate_table_links(driver, table_xpath):
    """
    Visit the link in every row of a table and collect the scraped page data.

    Each link is opened in a new tab, passed to ``extract_page_info``, and the
    tab is closed again. Progress is shown with a tqdm progress bar, and a
    one-second pause follows every row to avoid overwhelming the server.

    Args:
        driver: Selenium WebDriver showing the page that contains the table.
        table_xpath (str): XPath of the ``<table>`` element; rows are read
            from ``<table_xpath>/tbody/tr``.

    Returns:
        list: One dictionary per successfully processed row. Rows that raise
        an error are reported with a printed message and skipped.
    """
    # Find all rows in the table
    rows = driver.find_elements(By.XPATH, f"{table_xpath}/tbody/tr")

    all_page_info = []

    # Remember the listing tab explicitly instead of assuming it is the first
    # entry of window_handles.
    main_handle = driver.current_window_handle

    pbar = tqdm(total=len(rows), desc="Processing pages", unit="page")
    try:
        for row in rows:
            try:
                href = row.find_element(By.XPATH, ".//a").get_attribute('href')

                # Pass the URL as a script argument rather than splicing it
                # into the JavaScript source, so quotes in the URL cannot
                # break (or alter) the script.
                driver.execute_script("window.open(arguments[0], '_blank');", href)

                # Switch to the new tab (it's the last one in the list of window handles)
                driver.switch_to.window(driver.window_handles[-1])

                # Wait for the page to load
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                all_page_info.append(extract_page_info(driver, driver.current_url))
            except Exception as e:
                print(f"An error occurred while processing a row: {str(e)}")
            finally:
                # Runs on success and failure alike, so a failed row never
                # leaves a stray tab open for the following rows.
                _return_to_main_tab(driver, main_handle)
                pbar.update(1)

            # Small delay to prevent overwhelming the server
            time.sleep(1)
    finally:
        # Close the progress bar even if the browser session dies mid-run.
        pbar.close()

    return all_page_info
# Usage example:
def interact_with_page(driver):
    """Run the full scraping workflow on the already-loaded listing page.

    Steps: accept the cookie banner, switch the table length dropdown to
    '100', wait until the samples table is present, scrape every linked page,
    and print a short summary.

    Args:
        driver: Selenium WebDriver showing the listing page.

    Returns:
        list: The dictionaries returned by ``iterate_table_links``.
    """
    length_select_xpath = "//*[@id='tblSamples_length']/label/select"
    samples_table_xpath = "//*[@id='tblSamples']"

    accept_cookies(driver)

    # Show 100 entries per page, then make sure the table is in the DOM.
    set_dropdown_value(driver, length_select_xpath, '100')
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.XPATH, samples_table_xpath))
    )

    scraped_pages = iterate_table_links(driver, samples_table_xpath)

    # Summary: total count plus the first five entries as a sample.
    print(f"\nExtracted information from {len(scraped_pages)} pages.")
    print("First few entries:")
    for entry in scraped_pages[:5]:
        print(entry)
    print("...")
    return scraped_pages

## Run auto-extraction

In [2]:
# Set up Chrome options
chrome_options = Options()
for chrome_flag in (
    "--start-maximized",         # Starts the browser maximized
    "--disable-extensions",      # Disables browser extensions
    "--disable-popup-blocking",  # Disables popup blocking
):
    chrome_options.add_argument(chrome_flag)

In [52]:
# Main execution
if __name__ == "__main__":
    # Start a Chrome session using the driver binary installed by webdriver_manager
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    try:
        driver.get("https://mtsamples.com/site/pages/browse.asp?type=100-ENT%20-%20Otolaryngology")

        all_page_info_test = interact_with_page(driver)
    finally:
        # Always shut the browser down, even when scraping fails part-way,
        # so no Chrome/chromedriver processes are left running.
        driver.quit()

Cookies accepted.
Successfully set dropdown to value '100'


Processing pages: 100%|██████████| 99/99 [02:27<00:00,  1.49s/page]


Extracted information from 99 pages.
First few entries:
{'url': 'https://mtsamples.com/site/pages/sample.asp?Type=100-ENT%20-%20Otolaryngology&Sample=2740-Adenoidectomy%20&%20Tonsillectomy%20&%20Lingual%20Frenulectomy', 'Medical Specialty:': 'ENT - Otolaryngology', 'Sample Name:': 'Adenoidectomy & Tonsillectomy & Lingual Frenulectomy', 'Description:': 'Adenoidectomy and tonsillectomy and lingual frenulectomy. Chronic adenotonsillitis and ankyloglossia.   (Medical Transcription Sample Report)', 'PREOPERATIVE DIAGNOSES:': '1. Chronic adenotonsillitis.   2. Ankyloglossia', 'POSTOPERATIVE DIAGNOSES:': '1. Chronic adenotonsillitis.   2. Ankyloglossia', 'PROCEDURE PERFORMED:': '1. Adenoidectomy and tonsillectomy.   2. Lingual frenulectomy.', 'ANESTHESIA:': 'General endotracheal.', 'FINDINGS/SPECIMEN:': 'Tonsil and adenoid tissue.', 'COMPLICATIONS:': 'None.', 'CONDITION:': 'The patient is stable and tolerated the procedure well, and sent to PACU.', 'HISTORY OF PRESENT ILLNESS:': 'This is a 3




In [54]:
all_page_info_test[1]

{'url': 'https://mtsamples.com/site/pages/sample.asp?Type=100-ENT%20-%20Otolaryngology&Sample=1968-Adenoidectomy%20-%201',
 'Medical Specialty:': 'ENT - Otolaryngology',
 'Sample Name:': 'Adenoidectomy - 1',
 'Description:': 'Adenoidectomy. Adenoid hypertrophy. The McIvor mouth gag was placed in the oral cavity and the tongue depressor applied.   (Medical Transcription Sample Report)',
 'PREOPERATIVE DIAGNOSIS:': 'Adenoid hypertrophy.',
 'POSTOPERATIVE DIAGNOSIS:': 'Adenoid hypertrophy.',
 'PROCEDURE PERFORMED:': 'Adenoidectomy.',
 'ANESTHESIA:': 'General endotracheal.',
 'DESCRIPTION OF PROCEDURE:': 'The patient was taken to the operating room and prepped and draped in the usual fashion after induction of general endotracheal anesthesia. The McIvor mouth gag was placed in the oral cavity and the tongue depressor applied. Two #12-French red rubber Robinson catheters were placed, 1 in each nasal passage, and brought out through the oral cavity and clamped over a dental gauze roll placed

## Tidy up extracted letters

In [55]:
def process_dictionary_keys(dictionary):
    """
    Build a copy of a dictionary with normalised keys.

    Every key has its trailing colons removed and is converted to lowercase.
    Values are carried over unchanged and the input is not modified. When two
    keys normalise to the same string, the later one wins.

    Args:
    dictionary (dict): The dictionary whose keys should be normalised.

    Returns:
    dict: A new dictionary with the normalised keys.
    """
    return {
        raw_key.rstrip(':').lower(): item
        for raw_key, item in dictionary.items()
    }


In [56]:
# Normalise the keys of every scraped letter
all_processed_info = list(map(process_dictionary_keys, all_page_info_test))

In [57]:
all_processed_info[0]

{'url': 'https://mtsamples.com/site/pages/sample.asp?Type=100-ENT%20-%20Otolaryngology&Sample=2740-Adenoidectomy%20&%20Tonsillectomy%20&%20Lingual%20Frenulectomy',
 'medical specialty': 'ENT - Otolaryngology',
 'sample name': 'Adenoidectomy & Tonsillectomy & Lingual Frenulectomy',
 'description': 'Adenoidectomy and tonsillectomy and lingual frenulectomy. Chronic adenotonsillitis and ankyloglossia.   (Medical Transcription Sample Report)',
 'preoperative diagnoses': '1. Chronic adenotonsillitis.   2. Ankyloglossia',
 'postoperative diagnoses': '1. Chronic adenotonsillitis.   2. Ankyloglossia',
 'procedure performed': '1. Adenoidectomy and tonsillectomy.   2. Lingual frenulectomy.',
 'anesthesia': 'General endotracheal.',
 'findings/specimen': 'Tonsil and adenoid tissue.',
 'complications': 'None.',
 'condition': 'The patient is stable and tolerated the procedure well, and sent to PACU.',
 'history of present illness': 'This is a 3-year-old child with a history of adenotonsillitis.',
 'p

In [64]:
from functools import reduce
from collections import Counter

def find_common_keys(list_of_dicts):
    """
    Find keys that appear in all dictionaries in the given list.

    Args:
    list_of_dicts (list): A list of dictionaries to analyze.

    Returns:
    set: A set of keys that appear in all dictionaries. Empty when the list
         itself is empty.
    """
    if not list_of_dicts:
        return set()

    # Start from the first dictionary's keys and keep only those that are
    # also present in every other dictionary.
    first, *others = list_of_dicts
    return set(first).intersection(*others)

# Keys shared by every processed letter
common_keys = find_common_keys(all_processed_info)
print("Keys that appear in all dictionaries:")
print(common_keys)

# Share of letters (in percent) in which each key occurs
total_dicts = len(all_processed_info)
key_percentages = {
    key: (count / total_dicts) * 100
    for key, count in Counter(
        key for letter in all_processed_info for key in letter
    ).items()
}

print("\nPercentage of dictionaries each key appears in (descending order):")
# Most frequent keys first
sorted_percentages = sorted(
    key_percentages.items(),
    key=lambda item: item[1],
    reverse=True,
)

for key, percentage in sorted_percentages:
    print(f"{key}: {percentage:.2f}%")

# Repeat the listing for the keys that are missing from at least one letter
if len(common_keys) < len(key_percentages):
    print("\nKeys not present in all dictionaries:")
    for key, percentage in sorted_percentages:
        if key in common_keys:
            continue
        print(f"{key}: {percentage:.2f}%")

Keys that appear in all dictionaries:
{'keywords', 'medical specialty', 'url', 'sample name', 'description'}

Percentage of dictionaries each key appears in (descending order):
url: 100.00%
medical specialty: 100.00%
sample name: 100.00%
description: 100.00%
keywords: 100.00%
anesthesia: 49.49%
postoperative diagnosis: 33.33%
preoperative diagnosis: 32.32%
procedure: 31.31%
preoperative diagnoses: 26.26%
postoperative diagnoses: 25.25%
procedure performed: 18.18%
complications: 16.16%
estimated blood loss: 15.15%
findings: 14.14%
impression: 14.14%
plan: 13.13%
indications: 13.13%
history of present illness: 12.12%
description of procedure: 10.10%
physical examination: 10.10%
family history: 10.10%
history: 9.09%
chief complaint: 8.08%
allergies: 8.08%
social history: 8.08%
operation performed: 8.08%
indications for procedure: 7.07%
review of systems: 7.07%
operation: 6.06%
past surgical history: 6.06%
recommendations: 6.06%
description of operation: 6.06%
procedure in detail: 5.05%
cu

## Save and/or load the processed letters

In [65]:
import json
from datetime import datetime

def save_processed_info(all_processed_info, filename=None):
    """
    Save the processed information to a JSON file.

    The data is serialised to a string before the file is opened, so a
    serialisation failure cannot truncate or corrupt an existing file.

    Args:
    all_processed_info (list): The list of processed dictionaries to save.
    filename (str, optional): The name of the file to save to. If not provided,
                              a default name with timestamp will be used.

    Returns:
    str: The name of the file that was created, or None if saving failed
         (the error is printed).
    """
    if filename is None:
        # Generate a default filename with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"processed_info_{timestamp}.json"

    try:
        # Serialise first: opening with 'w' empties the target immediately,
        # so any encoding error must surface before that happens.
        payload = json.dumps(all_processed_info, ensure_ascii=False, indent=4)
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(payload)
        print(f"Successfully saved processed information to {filename}")
        return filename
    except Exception as e:
        print(f"An error occurred while saving the file: {str(e)}")
        return None


In [66]:
# Save list of dictionaries with clinical letter info
saved_filename = save_processed_info(all_processed_info, '../data/list_of_processed_letters.json')

# Report the outcome of the save
print(f"Data saved to {saved_filename}" if saved_filename else "Failed to save data")

Successfully saved processed information to ../data/list_of_processed_letters.json
Data saved to ../data/list_of_processed_letters.json


In [67]:
import json
# To load the data later:
def load_processed_info(filename):
    """
    Read processed information back from a JSON file.

    Args:
    filename (str): Path of the JSON file to read.

    Returns:
    list: The data parsed from the file, or None if reading or parsing
          failed (the error is printed).
    """
    try:
        with open(filename, 'r', encoding='utf-8') as handle:
            letters = json.loads(handle.read())
        print(f"Successfully loaded processed information from {filename}")
    except Exception as err:
        print(f"An error occurred while loading the file: {str(err)}")
        return None
    return letters

In [68]:
# Load list of clinical letters as dictionaries
loaded_data = load_processed_info('../data/list_of_processed_letters.json')

# load_processed_info returns None on failure, so only report a count on success
if loaded_data:
    print(f"Loaded {len(loaded_data)} items from the file")

Successfully loaded processed information from ../data/list_of_processed_letters.json
Loaded 99 items from the file


In [69]:
loaded_data[0]

{'url': 'https://mtsamples.com/site/pages/sample.asp?Type=100-ENT%20-%20Otolaryngology&Sample=2740-Adenoidectomy%20&%20Tonsillectomy%20&%20Lingual%20Frenulectomy',
 'medical specialty': 'ENT - Otolaryngology',
 'sample name': 'Adenoidectomy & Tonsillectomy & Lingual Frenulectomy',
 'description': 'Adenoidectomy and tonsillectomy and lingual frenulectomy. Chronic adenotonsillitis and ankyloglossia.   (Medical Transcription Sample Report)',
 'preoperative diagnoses': '1. Chronic adenotonsillitis.   2. Ankyloglossia',
 'postoperative diagnoses': '1. Chronic adenotonsillitis.   2. Ankyloglossia',
 'procedure performed': '1. Adenoidectomy and tonsillectomy.   2. Lingual frenulectomy.',
 'anesthesia': 'General endotracheal.',
 'findings/specimen': 'Tonsil and adenoid tissue.',
 'complications': 'None.',
 'condition': 'The patient is stable and tolerated the procedure well, and sent to PACU.',
 'history of present illness': 'This is a 3-year-old child with a history of adenotonsillitis.',
 'p