In [1]:
import pandas as pd
import requests, zipfile, io
import os, glob, warnings
from datetime import datetime
# Timestamp taken when the cell starts; kept as a module-level name.
now = datetime.now()

# --- Remote sources -------------------------------------------------------
# Zip archive that contains the pipe-delimited device listing used as the
# update, and the name of the text file read from it after extraction.
source_url = "https://www.accessdata.fda.gov/premarket/ftparea/pmnlstmn.zip"
source_file_name = "pmnlstmn.txt"
# Zip archive that contains the product code table (read as foiclass.txt).
fda_product_codes = "https://www.accessdata.fda.gov/premarket/ftparea/foiclass.zip"

# NOTE: "__file__" is a quoted string literal here, not the module attribute,
# so this resolves to the current working directory.
current_folder = os.path.dirname(os.path.abspath("__file__"))


# --- Staging file params --------------------------------------------------
# Folder holding the stage workbook and the previously staged listing.
stage_ = os.path.join(current_folder, "Fetched data", "FDA")

# glob returns matches in arbitrary order, and the update step of this script
# writes a new dated "FDA MDR ....xlsx" workbook into this same folder, so
# several workbooks can match. Pick the most recently modified one instead of
# whichever happens to come first, and fail with a clear message if none exist.
_stage_candidates = glob.glob(os.path.join(stage_, "FDA*.xlsx"))
if not _stage_candidates:
    raise FileNotFoundError(
        f"No stage workbook matching 'FDA*.xlsx' found in {stage_}"
    )
stage_file_path = max(_stage_candidates, key=os.path.getmtime)

# Previously staged copy of the listing; its newest decision date is the
# checkpoint the fresh download is compared against.
prev_update_file = os.path.join(stage_, source_file_name)
if not os.path.isfile(prev_update_file):
    raise FileNotFoundError(
        f"Previous update file not found: {prev_update_file}"
    )

# --- Extract path ---------------------------------------------------------
# The trailing empty component keeps the trailing path separator, because
# later code builds file paths by plain string concatenation onto this value.
extract_path = os.path.join(current_folder, "update_files", "")

# Download the latest listing archive, unpack it into ``extract_path`` and
# determine the newest decision date it contains (``update_timestamp``).
print("Requesting update...")
print(datetime.now().strftime("%a, %b %d %Y   %H:%M:%S %p"))

# A timeout prevents the cell from hanging forever on a stalled connection,
# and raise_for_status() stops an HTTP error page from being fed to ZipFile.
r = requests.get(source_url, timeout=120)
r.raise_for_status()
with zipfile.ZipFile(io.BytesIO(r.content)) as z:
    z.extractall(extract_path)


update_file = pd.read_csv(extract_path+source_file_name, sep="|", encoding="latin")

# DECISIONDATE holds MM/DD/YYYY strings. Taking max() of the raw strings
# compares them lexicographically (e.g. "12/31/2022" > "01/05/2023"), so the
# values are parsed first and the maximum is taken chronologically.
update_timestamp = pd.to_datetime(update_file.DECISIONDATE, format="%m/%d/%Y").max()
if pd.isna(update_timestamp):
    raise ValueError(f"No decision dates found in {extract_path+source_file_name}")
# Convert back to a plain datetime, the type the rest of the script was
# written against.
update_timestamp = update_timestamp.to_pydatetime()



#Check update status
# Compare the newest decision date of the freshly downloaded listing
# (``update_timestamp``) with the newest decision date of the previously
# staged listing (``checkpoint``). When the download is newer and the user
# confirms, enrich the downloaded rows with product code information, append
# the rows not yet present in the stage workbook, write a new dated workbook
# and replace the staged listing file.
status = pd.read_csv(prev_update_file, sep = "|", encoding="latin")

# DECISIONDATE holds MM/DD/YYYY strings; parse before taking the maximum so
# the comparison is chronological rather than lexicographic.
checkpoint = pd.to_datetime(status.DECISIONDATE, format="%m/%d/%Y").max()
if pd.isna(checkpoint):
    raise ValueError(f"No decision dates found in {prev_update_file}")
checkpoint = checkpoint.to_pydatetime()

if update_timestamp > checkpoint:
    print(f"\nUpdate available!\nUpdate version: {update_timestamp.strftime('%b %d, %Y')}\nStage version: {checkpoint.strftime('%b %d, %Y')}")
    control = input("Proceed to update? (Y/N): ")
    if control.strip().lower() == "y":
        print("\nPreparing update...")
        print("Getting product code information from FDA.", end="\t")
        try:
            #Fetch product codes file from FDA
            response = requests.get(fda_product_codes, timeout=120)
            response.raise_for_status()
            with zipfile.ZipFile(io.BytesIO(response.content)) as z:
                z.extractall(extract_path)

            #Load product code files:
            pdt_codes = pd.read_csv(os.path.join(extract_path, "foiclass.txt"), sep="|", encoding="latin")
            print("Success.")
        except Exception as e:
            print("Error.")
            print(e)
            # Everything below needs ``pdt_codes``; continuing would only fail
            # later with an unrelated NameError, so stop here with the cause.
            raise

        print("Adding product code informtion to devices in update.", end="\t")
        #Add product code information to update
        fda_devices = update_file[['KNUMBER','APPLICANT','DEVICENAME',
                                   'REVIEWADVISECOMM','PRODUCTCODE','DECISIONDATE']]
        medical_specialty = pdt_codes[['REVIEW_PANEL','MEDICALSPECIALTY','PRODUCTCODE',
                                       'DEVICENAME','DEVICECLASS','REGULATIONNUMBER']]

        # Left join keeps every device row even when its product code has no
        # entry in the product code table.
        update = pd.merge(fda_devices,medical_specialty, on='PRODUCTCODE', how="left")
        no_pdt_code = update[update['MEDICALSPECIALTY'].isna()]
        print(str(100 - round(len(no_pdt_code)/len(update)*100))+"% records matched.")

        # Positional rename of the 11 merged columns; DEVICENAME appears in
        # both inputs, so the listing's copy becomes 'Device Name' and the
        # product code table's copy becomes 'Device Meta'.
        update.columns = ['K_Number','Player','Device Name', 'Review Advisory','Product Code','Decision Date','Review Panel','Medical Specialty'
                       ,'Device Meta','Device Class','Regulation Number']
        update['Decision Date'] = pd.to_datetime(update['Decision Date'])
        # sort_values returns a new frame; assign it so the sort takes effect.
        update = update.sort_values(by=['Decision Date'], ascending = False)


        #Convert Abbreviated Specialty to full strings
        full_strs = {'AN': 'Anesthesiology',
                    'CV': 'Cardiovascular',
                    'CH': 'Clinical Chemistry',
                    'DE': 'Dental',
                    'EN': 'Ear, Nose, & Throat',
                    'GU': 'Gastroenterology & Urology',
                    'HO': 'General Hospital',
                    'HE': 'Hematology',
                    'IM': 'Immunology',
                    'MI': 'Microbiology',
                    'NE': 'Neurology',
                    'OB': 'Obstetrics/Gynecology',
                    'OP': 'Ophthalmic',
                    'OR': 'Orthopedic',
                    'PA': 'Pathology',
                    'PM': 'Physical Medicine',
                    'RA': 'Radiology',
                    'SU': 'General & Plastic Surgery',
                    'TX': 'Clinical Toxicology'}

        # Codes missing from the table map to None.
        update['Specialty'] = update['Medical Specialty'].map(full_strs.get)
        # (A leftover quit() call used to sit here and made every step below
        # unreachable; it has been removed so the update is actually applied.)
        update.set_index("K_Number", inplace=True)


        #Get Stage file
        print("\nRetrieving DB...")
        stage = pd.read_excel(stage_file_path)
        stage.set_index("K_Number", inplace=True)

        #Check for existing records
        # Rows whose K_Number is already in the stage workbook are dropped so
        # they are not appended twice.
        already_staged = update.index.isin(stage.index)
        if already_staged.any():
            print(f"Found {already_staged.sum()} overlapping rows. Skipping.")
            update = update[~already_staged]

        # Defined before branching because both branches below write the
        # staged listing file into this folder.
        update_export_path = os.path.join(current_folder, "Fetched data", "FDA", "")
        if not update.empty:
            print("\nApplying update...")
            stage = pd.concat([stage, update])
            stage.to_excel(update_export_path+"FDA MDR "+update_timestamp.strftime("%m-%d-%Y")+".xlsx")
            print("\nUpdating stage version...")
            # Reuse the archive downloaded at the start of the script (``r``)
            # so the staged listing matches exactly the data merged above,
            # instead of downloading it a second time.
            with zipfile.ZipFile(io.BytesIO(r.content)) as z:
                z.extractall(update_export_path)
            print(f"\nUpdate Successful.\t Added {len(update)} rows.")
        else:
            print("No new rows identified. Data is already available in latest stage. Update Skipped.")
            control = input("Proceed to update stage version? (Y/N):")
            if control.strip().lower() == "y":
                with zipfile.ZipFile(io.BytesIO(r.content)) as z:
                    z.extractall(update_export_path)
                print("Stage version updated!")
            else:
                warnings.warn("Stage version update is advisable to resolve false updates.")

    else:
        print("Update skipped.")

else:
    print("\nNo updates available at the moment.")
    print(f"Update version: {update_timestamp.strftime('%b %d, %Y')}\nStage version: {checkpoint.strftime('%b %d, %Y')}")
    
    
    
    


Requesting update...
Thu, Mar 30 2023   05:59:37 AM

Update available!
Update version: Mar 25, 2023
Stage version: Mar 12, 2023
Update skipped.
