# FDA - Device Report Downloader

## Background
Medical device reports (MDRs) are sent to the FDA from across the country. These reports are gathered and reviewed by FDA experts and are also made available to the public. At the moment, the only easy way to retrieve MDRs is the MAUDE database (https://www.accessdata.fda.gov/scripts/cdrh/cfdocs/cfmaude/search.cfm). However, MAUDE has a poor interface, few search features, and a limit of 500 reports per retrieval. It is therefore suitable only for analyzing small numbers of reports, and a new way of extracting and using device report data is needed.

The FDA has a second repository of medical device reports, kept as raw data in JSON format on another site (https://open.fda.gov/data/downloads/). Each file sits inside its own ZIP archive and has the same name as every other file, which makes it tedious to manually download, unzip, rename, and merge the JSONs needed to build the base dataset. Furthermore, the download links are hidden behind JavaScript and buttons.

This code uses the Selenium webdriver to get around these challenges. Enter the date range, the product codes, and your download folder, then run the notebook. The end result will be a CSV file with the requested data.

## User Input

In [1]:
start_year = 2020
end_year = 2020
pathf = 'C:/Users/Allen/Documents/FDA'
pcode = ["DYE","LWR","MIE","MWH","NPX","OHA","PAL","PAP"]
trim = "No"

#Import path from the os package to determine if the user's entered filepath is valid
from os import path

#Tkinter is used to build the input menu
from tkinter import *

#Create main window that everything else will fit into
root = Tk()
root.title("FDA Input")

#Create year entry fields
dialoglabelyearstart = Label(root, text = "Beginning Year")
dialoglabelyearstart.grid(row=0, column = 0, columnspan = 1, sticky = W)
dialogentryyearstart = Entry(root, width = 10)
dialogentryyearstart.grid(row = 0, column = 1, sticky = W)
dialoglabelyearend = Label(root, text = "Ending Year")
dialoglabelyearend.grid(row=1, column = 0, columnspan = 1, sticky = W)
dialogentryyearend = Entry(root, width = 10)
dialogentryyearend.grid(row = 1, column = 1, sticky = W)

#Automatically select the first entry field
dialogentryyearstart.focus_force()

#Create blank space as a separator
dialoglabelspace = Label(root, text = " ")
dialoglabelspace.grid(row=2, column = 0, columnspan = 1)

#Create product code entry field
dialoglabelproductcodes = Label(root, text = "Product Codes")
dialoglabelproductcodes.grid(row=3, column = 0, columnspan = 1, sticky = W)
dialogentryproductcodes = Entry(root, width = 40)
dialogentryproductcodes.grid(row = 3, column = 1, sticky = W)
dialoglabelproductcodes2 = Label(root, text = "Use formatting:")
dialoglabelproductcodes2.grid(row= 4, column = 0, columnspan = 1, sticky = W)
dialoglabelproductcodes2 = Label(root, text = "     abc, def, ghi")
dialoglabelproductcodes2.grid(row= 4, column = 1, columnspan = 2, sticky = W)
dialoglabelproductcodes2 = Label(root, text = "For all codes, use:")
dialoglabelproductcodes2.grid(row= 5, column = 0, columnspan = 1, sticky = W)
dialoglabelproductcodes2 = Label(root, text = "     all")
dialoglabelproductcodes2.grid(row= 5, column = 1, columnspan = 2, sticky = W)

dialoglabelspace2 = Label(root, text = " ")
dialoglabelspace2.grid(row=6, column = 0, columnspan = 1)

#Create filepath code entry field
dialoglabelfilepath = Label(root, text = "Filepath")
dialoglabelfilepath.grid(row=7, column = 0, columnspan = 1, sticky = W)
dialogentryfilepath = Entry(root, width = 40)
dialogentryfilepath.grid(row = 7, column = 1, sticky = W)
dialoglabelfilepath2 = Label(root, text = "Ex:    C:/Users/Myname/Documents/FDA")
dialoglabelfilepath2.grid(row= 8, column = 0, columnspan = 3, sticky = W)

dialoglabelspace3 = Label(root, text = " ")
dialoglabelspace3.grid(row=9, column = 0, columnspan = 1)

#Create trim entry field
dialoglabeltrim = Label(root, text = "Trim?")
dialoglabeltrim.grid(row=10, column = 0, columnspan = 1, sticky = W)
dialogentrytrim = Entry(root, width = 5)
dialogentrytrim.grid(row = 10, column = 1, sticky = W)
dialoglabeltrim2 = Label(root, text = "Yes or No, Trimming performs basic text formatting")
dialoglabeltrim2.grid(row= 11, column = 0, columnspan = 3, sticky = W)

#Create error field
dialoglabelerror = Label(root, text = "", fg = "red")
dialoglabelerror.grid(row=13, column = 1, columnspan = 1, sticky = W)

#Create what happens when you click on the button
def buttonClick():
    global start_year
    global end_year
    global pcode
    global pathf
    global trim
    start_year = dialogentryyearstart.get()
    end_year = dialogentryyearend.get()
    pcode = dialogentryproductcodes.get()
    pathf = dialogentryfilepath.get()
    trim = dialogentrytrim.get()
    
    if start_year == "":
        dialoglabelerror.config(text = "Beginning Year must be entered")
        return
    try:
        int(start_year)
    except ValueError:
        dialoglabelerror.config(text = "Beginning Year is not a year")
        return
    start_year = int(start_year)
    #Use "or" here: the bitwise "|" binds tighter than the comparisons and gives the wrong test
    if start_year < 2000 or start_year > 2100:
        dialoglabelerror.config(text = "Beginning Year is out of bounds")
        return
    
    if end_year == "":
        dialoglabelerror.config(text = "Ending Year must be entered")
        return
    try:
        int(end_year)
    except ValueError:
        dialoglabelerror.config(text = "Ending Year is not a year")
        return
    end_year = int(end_year)
    if end_year < 2000 or end_year > 2100:
        dialoglabelerror.config(text = "Ending Year is out of bounds")
        return
    
    if pcode == "":
        dialoglabelerror.config(text = "Product Code must be entered")
        return
    #Update the Product Code field to match what is needed for filtering later:
    #strip all whitespace, uppercase, and split on commas
    pcode = "".join(pcode.split()).upper().split(",")
    #Every entry (including the keyword ALL) must be exactly three characters long
    if any(len(code) != 3 for code in pcode):
        dialoglabelerror.config(text = "Erroneous Product Code")
        return
    
    if pathf == "":
        dialoglabelerror.config(text = "Filepath must be entered")
        return
    if not path.isdir(pathf):
        dialoglabelerror.config(text = "Improper Filepath")
        return
    
    if trim == "":
        dialoglabelerror.config(text = "Trim preference must be entered")
        return
    trim = trim.lower()
    if trim != "yes" and trim != "no":
        dialoglabelerror.config(text = "Trim must be either yes or no")
        return

    root.destroy()

#Create button
myButton = Button(root, text = "Enter", padx = 20, pady = 3, command = buttonClick)
#Attach button
myButton.grid(row=12, column = 0, columnspan = 2)

#Create the mainloop to display the above dialog
root.mainloop()
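
The product-code handling inside `buttonClick` can also be pulled out into a small pure function, which makes it testable without opening the dialog. The sketch below is one possible shape, not the notebook's own code. The name `parse_product_codes` is hypothetical, and the rule that every code is exactly three alphanumeric characters is an assumption based on the `abc, def, ghi` format hint in the dialog.

```python
def parse_product_codes(raw):
    """Normalize a comma-separated product-code string.

    Returns a list of upper-case codes, or ["ALL"] when the user asks for
    every code. Raises ValueError for empty input or malformed codes.
    """
    # Remove all whitespace, uppercase, then split on commas
    codes = "".join(raw.upper().split()).split(",")
    # Drop empty entries left by trailing or doubled commas
    codes = [code for code in codes if code]
    if not codes:
        raise ValueError("Product Code must be entered")
    if codes == ["ALL"]:
        return codes
    for code in codes:
        # Assumption: codes are exactly three alphanumeric characters
        if len(code) != 3 or not code.isalnum():
            raise ValueError("Erroneous Product Code: " + code)
    return codes


print(parse_product_codes(" dye, lwr ,mie "))
```

Inside `buttonClick`, a `ValueError` from this helper could be caught and its message shown in `dialoglabelerror`.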

## Installing specialized packages

In [2]:
pip install webdriver-manager

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install selenium

Note: you may need to restart the kernel to use updated packages.


In [4]:
#Importing of useful packages
import pandas as pd
import re
import numpy as np
import json
import datetime
from bs4 import BeautifulSoup as bs
import requests
import urllib
import time
import os
import string
import nltk
import zipfile
from io import BytesIO

URL = "https://open.fda.gov/data/downloads/"

## Web Scraping

In [5]:
#The FDA website does not load the downloadable files unless you scroll to that area of the page first.
#Web scraping here uses the Selenium webdriver to open the site in Chrome, navigate to the
#needed area, and click the correct buttons at the correct time

from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium import webdriver
import time

#Start Chrome with a driver that webdriver-manager downloads to match the installed browser
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

driver.get(URL)
driver.maximize_window()

time.sleep(1)

#Get past the light screen
button1 = driver.find_element(By.CLASS_NAME, "button.bg-primary.clr-white")
button1.click()

time.sleep(1)
 
#Scroll to the button for medical device events
element_link = WebDriverWait(driver, 10).until(EC.presence_of_element_located(
    (By.XPATH, '//*[@id="Medical Device Event"]')))

driver.execute_script("arguments[0].scrollIntoView(true)", element_link)

time.sleep(1)

#Click the medical device event button
button2 = driver.find_element(By.XPATH, '//*[@id="Medical Device Event"]/section/button')
button2.click()

time.sleep(1)

#Retrieve the html code now that it displays the links we need
html = driver.execute_script("return document.getElementsByTagName('html')[0].innerHTML")
#print (html)

#quit() closes the browser window and also ends the chromedriver process
driver.quit()




[WDM] - Current google-chrome version is 103.0.5060
[WDM] - Get LATEST chromedriver version for 103.0.5060 google-chrome
[WDM] - There is no [win32] chromedriver for browser 103.0.5060 in cache
[WDM] - About to download new driver from https://chromedriver.storage.googleapis.com/103.0.5060.53/chromedriver_win32.zip
[WDM] - Driver has been saved in cache [C:\Users\Allen\.wdm\drivers\chromedriver\win32\103.0.5060.53]


## Downloading Data

In [6]:
#Snip HTML to just the portion in question: everything between the first "1991"
#and the start of the Medical Device PMA list item
pattern = '1991(.*?)<li id="Medical Device PMA">'
substring = re.search(pattern, html).group(1)

In [7]:
#Itemize links into array
import lxml.html

url_list = lxml.html.fromstring(substring)
url_list = url_list.xpath('//a/@href')
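
If `lxml` is not available, the same `href` extraction can be approximated with the standard library's `html.parser`. This is a minimal sketch of that swap, not what the notebook runs. The class name `HrefCollector` and the sample HTML are made up for illustration.

```python
from html.parser import HTMLParser


class HrefCollector(HTMLParser):
    """Collect the href attribute of every <a> tag, in document order."""

    def __init__(self):
        super().__init__()
        self.hrefs = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for name, value in attrs:
                if name == "href" and value:
                    self.hrefs.append(value)


# Hypothetical snippet standing in for the `substring` variable
sample = (
    '<ul>'
    '<li><a href="https://example.com/2020q1/a.json.zip">Q1</a></li>'
    '<li><a href="https://example.com/2020q2/b.json.zip">Q2</a></li>'
    '</ul>'
)

collector = HrefCollector()
collector.feed(sample)
sample_links = collector.hrefs
print(sample_links)
```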

In [8]:
#Determine which links to follow, based on start and end year
year_list = list(range(start_year, end_year+1))

index_to_download = []

for year in year_list:
    for index, link in enumerate(url_list):
        #Keep the position of every link that mentions this year
        if str(year) in link:
            index_to_download.append(index)

index_count = len(index_to_download)
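
A plain substring test on the year will also match a year that happens to appear elsewhere in a URL, for example inside a file counter. A stricter variant is sketched below. It assumes the year appears as its own path segment such as `/2020q1/`, which is an assumption about the link layout rather than something the notebook confirms. The example URLs and the helper name `links_for_years` are hypothetical.

```python
import re


def links_for_years(links, first_year, last_year):
    """Return the indexes of links whose path has a year segment in range."""
    wanted = []
    for index, link in enumerate(links):
        # Match "/YYYY/" or "/YYYYq<digit>/" (assumed layout of the download links)
        match = re.search(r"/((?:19|20)\d{2})(?:q\d)?/", link)
        if match and first_year <= int(match.group(1)) <= last_year:
            wanted.append(index)
    return wanted


example_links = [
    "https://example.com/device/event/2019q4/part-0001-of-0002.json.zip",
    "https://example.com/device/event/2020q1/part-0001-of-2020.json.zip",
    "https://example.com/device/event/2021q1/part-2020-of-3000.json.zip",
]
print(links_for_years(example_links, 2020, 2020))
```

Unlike the nested loop above, this returns indexes in link order rather than grouped by year, which only matters if the links are not already sorted chronologically.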

In [9]:
#Follow links in array to download/process ZIPs
import requests, zipfile
from io import BytesIO

filename = "FDAdata.json"
pathfull = pathf + "/" + filename
#Report-level fields carried onto every device row
meta_fields = ["report_number","report_source_code","date_received","event_type","type_of_report","mdr_text"]

#Run loop, opening JSONs
loopnumber = 0
for link_index in index_to_download:
    print('Download ' + (str(loopnumber+1)) + " of " + (str(index_count)) + " started ")
    url = url_list[link_index]
    req = requests.get(url)
    #Stop early if the server returned an error instead of a ZIP
    req.raise_for_status()
    print('Download ' + (str(loopnumber+1))+ " completed ")
    #Name the archive object something other than zipfile so the module is not overwritten
    with zipfile.ZipFile(BytesIO(req.content)) as archive:
        for f in archive.filelist:
            #Rename the member so the extracted file always lands at pathfull
            f.filename = filename
            archive.extract(f, path = pathf)
    print('File ' + (str(loopnumber+1))+ ' extracted')
    with open(pathfull, encoding = "utf-8") as jsonfile:
        data = json.load(jsonfile)["results"]
    if loopnumber == 0:
        print('Creating Dataframe with JSON ' + (str(loopnumber+1)))
    else:
        print('Appending Dataframe with JSON ' + (str(loopnumber+1)))
    #One row per device, with the report-level fields repeated on each row
    dfnew = pd.json_normalize(data,
              record_path = "device",
              meta = meta_fields,
              record_prefix = "_",
              errors = "ignore")
    #Keep only the requested product codes unless the user asked for all of them
    if pcode[0].upper() != "ALL":
        dfnew = dfnew[dfnew._device_report_product_code.isin(pcode)]
    if loopnumber == 0:
        dfmain = dfnew
        print('Dataframe Created')
    else:
        dfmain = pd.concat([dfmain, dfnew])
        print('JSON ' + (str(loopnumber+1)) + ' appended')

    #Delete the extracted JSON so the next archive can reuse the same filename
    os.remove(pathfull)
    loopnumber = loopnumber + 1
print("Dataframe ready")

Download 1 of 4 started 
Download 1 completed 
File 1 extracted
Creating Dataframe with JSON 1
Dataframe Created
Download 2 of 4 started 
Download 2 completed 
File 2 extracted
Appending Dataframe with JSON 2
JSON 2 appended
Download 3 of 4 started 
Download 3 completed 
File 3 extracted
Appending Dataframe with JSON 3
JSON 3 appended
Download 4 of 4 started 
Download 4 completed 
File 4 extracted
Appending Dataframe with JSON 4
JSON 4 appended
Dataframe ready
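
Writing each archive to disk and deleting it afterwards is not strictly required. As a sketch of an alternative, the JSON member can be read directly from the in-memory ZIP. The helper name `load_results_from_zip` is hypothetical, and the example builds a tiny stand-in archive instead of contacting the FDA site. It assumes each archive holds a single JSON file with a top-level `results` key, as the loop above does.

```python
import io
import json
import zipfile


def load_results_from_zip(zip_bytes):
    """Return the "results" list from the first member of a ZIP given as bytes."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        first_member = archive.namelist()[0]
        # archive.open gives a binary file object; json.load accepts it directly
        with archive.open(first_member) as member:
            return json.load(member)["results"]


# Build a small stand-in archive (made-up data, not real FDA records)
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr(
        "device-event.json",
        json.dumps({"results": [{"report_number": "X-1"}]}),
    )

records = load_results_from_zip(buffer.getvalue())
print(records)
```

In the download loop this would replace the extract, open, and remove steps with something like `data = load_results_from_zip(req.content)`, at the cost of holding the whole archive in memory while it is parsed.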


## Trimming Data/Export

In [10]:
#The input dialog lowercases trim, so compare case-insensitively
if trim.lower() == "yes":
    
    #Keep only relevant columns (copy so the column assignments below act on an independent frame)
    dfmain = dfmain[["_device_report_product_code","_brand_name","_generic_name","_manufacturer_d_name","type_of_report","report_number","report_source_code",
                     "date_received","event_type","mdr_text"]].copy()

    #Rename columns
    dfmain.columns = ["product_code","brand_name","generic_name","manufacturer_name","type_of_report","report_number",
                            "report_source_code","date_received","event_type","mdr_text"]

    #Update date column to date format
    dfmain["date_received"] = pd.to_datetime(dfmain["date_received"])
    
    #Remove brackets from type of report column
    dfmain['type_of_report'] = dfmain['type_of_report'].str.join(', ')
    
    #Update MDR Text to only show the text narrative items, with punctuation removed
    newmdr = []
    for crag in dfmain["mdr_text"]:
        newmdr.append(''.join(re.findall("'text': .+?}",str(crag))).translate(str.maketrans('', '', string.punctuation)).replace("text"," - ")[4:])
    
    #Lowercase MDR Text
    dfmain["mdr_text"] = [x.lower() for x in newmdr]
    
#Export to CSV
dfmain.to_csv(pathf + "/fda_device_reports.csv", index = False)
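
The narrative extraction in the trimming step works on the string form of each `mdr_text` entry. Assuming each entry is a list of dictionaries with a `text` key (which the `'text': ` pattern suggests, although the notebook does not show the raw structure), a similar result can be reached without a regular expression. The helper name `narrative_text` and the sample record are hypothetical.

```python
import string


def narrative_text(mdr_entries):
    """Join the narrative texts of one report, lower-cased, punctuation removed."""
    if not isinstance(mdr_entries, list):
        return ""
    strip_punct = str.maketrans("", "", string.punctuation)
    texts = [
        str(entry.get("text", "")).translate(strip_punct).lower()
        for entry in mdr_entries
        if isinstance(entry, dict)
    ]
    # Separate multiple narratives with " - " and skip empty ones
    return " - ".join(text for text in texts if text)


# Made-up record shaped like the assumed mdr_text structure
sample_entry = [
    {"text_type_code": "Description of Event", "text": "Device FAILED, no injury."},
    {"text_type_code": "Manufacturer Narrative", "text": "Under review."},
]
print(narrative_text(sample_entry))
```

With pandas this could be applied as `dfmain["mdr_text"] = dfmain["mdr_text"].apply(narrative_text)` before the export.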