# API

With no API key: 240 requests per minute, per IP address. 1,000 requests per day, per IP address.

With an API key: 240 requests per minute, per key. 120,000 requests per day, per key.

Apply for your API key [here](https://open.fda.gov/apis/authentication/).

In [1]:
api_key = ''

## Maude database

### Parameter:
    - manufacture_name (required)
        - ex. 'Edwards+Lifesciences'
    - zip_code (required)
    - product code (optional)
    - date_start & date_end (optional)(recommended)
        - ex. 20220301
    - other_para & other_para_value 
(For using the other_para, please use the variables in [searchable field](https://open.fda.gov/apis/device/event/searchable-fields/))
(No more than 25,000 records can be retrieved with the same set of parameters.)

### Companies with their postal code
- Edwards - 92614
- Abbott  - 60064
- Boston Scientific Corporation - 01760
- Livanova - 77058

In [None]:
import pandas as pd
import numpy as np
import requests
from pathlib import Path  

def maude_extract_by_manu(manufacture_name = None,zip_code=None, product_code = None ,date_start = None, date_end = None, other_para = None, other_para_value = None, limit = 1000):
    """Fetch MAUDE device-event reports from openFDA, filtered by manufacturer.

    Each filter that is supplied is AND-ed into the ``search`` expression.
    Values are placed in the URL verbatim, so multi-word values must already
    use ``+`` for spaces (e.g. ``'Edwards+Lifesciences'``).

    Args:
        manufacture_name: Matched against ``device.manufacturer_d_name``.
        zip_code: Matched against ``device.manufacturer_d_zip_code``.
        product_code: Matched against ``device.device_report_product_code``.
        date_start, date_end: ``YYYYMMDD`` bounds on ``date_received``;
            applied only when both are given.
        other_para, other_para_value: One extra ``field:'value'`` filter;
            applied only when both are given.
        limit: Page size sent to the API; also used as the paging step.

    Returns:
        A DataFrame holding every page that could be fetched (first page
        included), re-indexed 0..n-1.

    Raises:
        ValueError: If ``limit`` is smaller than 1.
        KeyError: If the first response carries no ``results`` (for example
            when nothing matches the search).
    """
    import warnings

    page_size = int(limit)
    if page_size < 1:
        raise ValueError("limit must be at least 1")
    max_skip = 25000  # largest ``skip`` value openFDA accepts

    filters = []
    if manufacture_name is not None:
        filters.append(f"device.manufacturer_d_name:'{manufacture_name}'")
    if product_code is not None:
        filters.append(f"device.device_report_product_code:'{product_code}'")
    if zip_code is not None:
        filters.append(f"device.manufacturer_d_zip_code:'{zip_code}'")
    if date_start is not None and date_end is not None:
        filters.append(f"date_received:[{date_start}+TO+{date_end}]")
    if other_para is not None and other_para_value is not None:
        filters.append(f"{other_para}:'{other_para_value}'")

    URL = "https://api.fda.gov/device/event.json?search=" + "+AND+".join(filters) + f"&limit={limit}"

    json_data = requests.get(URL).json()
    if 'results' not in json_data:
        raise KeyError(f"openFDA returned no results for {URL}: {json_data.get('error')}")
    total = json_data['meta']['results']['total']

    # The first page is part of the result; later pages are appended to it.
    pages = [pd.json_normalize(json_data, 'results')]
    skip = page_size
    while skip < total and skip <= max_skip:
        page = requests.get(URL + f"&skip={skip}").json()
        if 'results' in page:
            pages.append(pd.json_normalize(page, 'results'))
        else:
            # Best effort: keep going, but tell the user a page is missing.
            warnings.warn(f"openFDA returned no results for skip={skip}; page skipped")
        skip += page_size

    return pd.concat(pages, ignore_index=True)


In [None]:
def maude_extract_by_manu_api(api_key ,manufacture_name = None, zip_code = None ,product_code = None ,date_start = None, date_end = None, other_para = None, other_para_value = None, limit = 1000):
    """Fetch MAUDE device-event reports from openFDA using an API key.

    Each filter that is supplied is AND-ed into the ``search`` expression.
    Values are placed in the URL verbatim, so multi-word values must already
    use ``+`` for spaces (e.g. ``'Edwards+Lifesciences'``).

    Args:
        api_key: openFDA API key, sent as the ``api_key`` query parameter.
        manufacture_name: Matched against
            ``device.manufacturer_d_name.exact``.
        zip_code: Matched against ``device.manufacturer_d_zip_code``.
        product_code: Matched against ``device.device_report_product_code``.
        date_start, date_end: ``YYYYMMDD`` bounds on ``date_received``;
            applied only when both are given.
        other_para, other_para_value: One extra ``field:'value'`` filter;
            applied only when both are given.
        limit: Page size sent to the API; also used as the paging step.

    Returns:
        A DataFrame holding every page that could be fetched (first page
        included), re-indexed 0..n-1.

    Raises:
        ValueError: If ``limit`` is smaller than 1.
        KeyError: If the first response carries no ``results`` (for example
            when nothing matches the search).
    """
    import warnings

    page_size = int(limit)
    if page_size < 1:
        raise ValueError("limit must be at least 1")
    max_skip = 25000  # largest ``skip`` value openFDA accepts

    filters = []
    if manufacture_name is not None:
        filters.append(f"device.manufacturer_d_name.exact:'{manufacture_name}'")
    if zip_code is not None:
        filters.append(f"device.manufacturer_d_zip_code:'{zip_code}'")
    if product_code is not None:
        filters.append(f"device.device_report_product_code:'{product_code}'")
    if date_start is not None and date_end is not None:
        filters.append(f"date_received:[{date_start}+TO+{date_end}]")
    if other_para is not None and other_para_value is not None:
        filters.append(f"{other_para}:'{other_para_value}'")

    URL = "https://api.fda.gov/device/event.json?"
    URL += f"api_key={api_key}&search=" + "+AND+".join(filters) + f"&limit={limit}"

    json_data = requests.get(URL).json()
    if 'results' not in json_data:
        # The URL is left out of the message because it contains the API key.
        raise KeyError(f"openFDA returned no results: {json_data.get('error')}")
    total = json_data['meta']['results']['total']

    # The first page is part of the result; later pages are appended to it.
    pages = [pd.json_normalize(json_data, 'results')]
    skip = page_size
    while skip < total and skip <= max_skip:
        page = requests.get(URL + f"&skip={skip}").json()
        if 'results' in page:
            pages.append(pd.json_normalize(page, 'results'))
        else:
            # Best effort: keep going, but tell the user a page is missing.
            warnings.warn(f"openFDA returned no results for skip={skip}; page skipped")
        skip += page_size

    return pd.concat(pages, ignore_index=True)

In [None]:
def maude_extract_by_product(product_code = None ,date_start = None, date_end = None, other_para = None, other_para_value = None, limit = 1000):
    """Fetch MAUDE device-event reports from openFDA, filtered by product code.

    Each filter that is supplied is AND-ed into the ``search`` expression.
    Values are placed in the URL verbatim.

    Args:
        product_code: Matched against ``device.device_report_product_code``.
        date_start, date_end: ``YYYYMMDD`` bounds on ``date_received``;
            applied only when both are given.
        other_para, other_para_value: One extra ``field:'value'`` filter;
            applied only when both are given.
        limit: Page size sent to the API; also used as the paging step.

    Returns:
        A DataFrame holding every page that could be fetched (first page
        included), re-indexed 0..n-1.

    Raises:
        ValueError: If ``limit`` is smaller than 1.
        KeyError: If the first response carries no ``results`` (for example
            when nothing matches the search).
    """
    import warnings

    page_size = int(limit)
    if page_size < 1:
        raise ValueError("limit must be at least 1")
    max_skip = 25000  # largest ``skip`` value openFDA accepts

    filters = []
    if product_code is not None:
        filters.append(f"device.device_report_product_code:'{product_code}'")
    if date_start is not None and date_end is not None:
        filters.append(f"date_received:[{date_start}+TO+{date_end}]")
    if other_para is not None and other_para_value is not None:
        filters.append(f"{other_para}:'{other_para_value}'")

    URL = "https://api.fda.gov/device/event.json?search=" + "+AND+".join(filters) + f"&limit={limit}"

    json_data = requests.get(URL).json()
    if 'results' not in json_data:
        raise KeyError(f"openFDA returned no results for {URL}: {json_data.get('error')}")
    total = json_data['meta']['results']['total']

    # The first page is part of the result; later pages are appended to it.
    pages = [pd.json_normalize(json_data, 'results')]
    skip = page_size
    while skip < total and skip <= max_skip:
        page = requests.get(URL + f"&skip={skip}").json()
        if 'results' in page:
            pages.append(pd.json_normalize(page, 'results'))
        else:
            # Best effort: keep going, but tell the user a page is missing.
            warnings.warn(f"openFDA returned no results for skip={skip}; page skipped")
        skip += page_size

    return pd.concat(pages, ignore_index=True)

In [None]:
from datetime import datetime

def ymd_to_y_m_d(d):
    """Convert a ``YYYYMMDD`` string into ``YYYY-MM-DD``.

    Raises ``ValueError`` (from ``strptime``) when ``d`` is not a valid
    ``YYYYMMDD`` date.
    """
    parsed = datetime.strptime(d, '%Y%m%d')
    return f"{parsed:%Y-%m-%d}"

In [None]:
def series_ymd_to_y_m_d(dataframe, col):
    """Rewrite ``dataframe[col]`` in place from ``YYYYMMDD`` to ``YYYY-MM-DD``.

    Non-string cells (NaN, numbers, ...) become ``None``; empty strings are
    left as they are; every other string is converted with ``ymd_to_y_m_d``.
    Works with any index, not only the default 0..n-1 one.

    Args:
        dataframe: Frame to modify; the column is replaced on this object.
        col: Name of the date column.

    Returns:
        None.
    """
    def _convert(value):
        # One decision per cell, so a NaN can never reach strptime.
        if not isinstance(value, str):
            return None
        if value == "":
            return value
        return ymd_to_y_m_d(value)

    dataframe[col] = dataframe[col].map(_convert)

In [None]:
def maude_data(dataframe):
    """Flatten a raw MAUDE result frame into one tidy row per report.

    The input is not modified. Rows without a model number are dropped,
    ``model_number`` is cut to its first four characters, and both date
    columns are rewritten to ``YYYY-MM-DD``.

    Args:
        dataframe: Frame with the columns read below (``report_number``,
            ``date_of_event``, ``event_type``, ``manufacturer_g1_name``,
            ``date_received``, ``product_problems``,
            ``manufacturer_contact_zip_code``, ``device``, ``patient``,
            ``mdr_text``). Assumes the last three hold lists of dicts, as
            ``json_normalize`` would leave them - TODO confirm.

    Returns:
        A new DataFrame with a fresh 0..n-1 index. The optional columns
        (``prodcut_code`` ... ``Event_Description``) are always present and
        always in the same order.
    """
    # Work on a re-indexed copy so the caller's frame is left untouched.
    data = dataframe.reset_index(drop=True)

    df = pd.DataFrame()
    df['report_number'] = data['report_number']
    df['date_of_event'] = data['date_of_event']
    df['event_type'] = data['event_type']
    df['manufacturer_name'] = data['manufacturer_g1_name']
    df['date_received'] = data['date_received']
    df['product_problem'] = data['product_problems']
    df['zip_code'] = data['manufacturer_contact_zip_code']

    # Output column -> key inside the first ``device`` entry.
    # NOTE(review): 'prodcut_code' is misspelled; kept so existing consumers
    # of this column name keep working.
    device_fields = {
        'prodcut_code': 'device_report_product_code',
        'brand_name': 'brand_name',
        'model_number': 'model_number',
    }
    # ``text_type_code`` -> output column.
    narrative_fields = {
        'Additional Manufacturer Narrative': 'Manufacture_Narrative',
        'Description of Event or Problem': 'Event_Description',
    }
    extra_names = list(device_fields) + ['patient_problem'] + list(narrative_fields.values())
    extras = {name: [None] * len(data) for name in extra_names}

    rows = zip(data['device'], data['patient'], data['mdr_text'])
    for i, (devices, patients, texts) in enumerate(rows):
        if isinstance(devices, list) and devices:
            for name, key in device_fields.items():
                extras[name][i] = devices[0].get(key)
        if isinstance(patients, list) and patients:
            problems = patients[0].get('patient_problems')
            if problems is not None:
                extras['patient_problem'][i] = ", ".join(problems)
        if isinstance(texts, list):
            # Only the first two entries are inspected, as before, but a
            # report with a single entry is no longer skipped.
            for entry in texts[:2]:
                name = narrative_fields.get(entry.get('text_type_code'))
                if name is not None:
                    extras[name][i] = entry.get('text')

    for name, values in extras.items():
        df[name] = values

    # Blank strings count as missing.
    for col in df.columns:
        df[col] = df[col].map(lambda v: None if isinstance(v, str) and v == "" else v)
    df.dropna(subset=['model_number'],inplace=True)

    df.reset_index(inplace=True, drop=True)
    df['model_number'] = df['model_number'].astype(str).str[0:4]
    series_ymd_to_y_m_d(dataframe=df, col= 'date_of_event')
    series_ymd_to_y_m_d(dataframe=df, col= 'date_received')

    return df

#### Extract MAUDE data by company example - Edwards

In general, we use the company's name, its postal code, and the date range you want to extract data for.

In [None]:
edwards = maude_extract_by_manu(manufacture_name='Edwards+Lifesciences',zip_code=92614,date_end=20220331, date_start=20180101)
edwards = maude_data(edwards)
edwards.head()

In [None]:
#save data as cvs file
from pathlib import Path

#put the file path where you want to save in the filepath variable
# ex. "/Users/sam/Desktop/UCI/Capstone/Maude/data/Edwards/Edwards_2018to2022.csv"
filepath = Path('')
filepath.parent.mkdir(parents=True, exist_ok=True)
edwards.to_csv(filepath)

#### Extract MAUDE data by company example - Boston Scientific Corporation

Sometimes a company has multiple postal codes because of different factories or offices.

In that case, we extract the data by company name only and then do some cleaning.

In [None]:
boston = maude_extract_by_manu(manufacture_name='boston+scientific', date_start=20180101, date_end=20220331)
boston = maude_data(boston)
boston.head()

In [None]:
boston.dropna(subset = ['manufacturer_name'],inplace = True)
boston = boston[boston.manufacturer_name.str.contains('BOSTON SCIENTIFIC')]
boston.reset_index(inplace=True, drop=True)

In [None]:
#save data as cvs file
from pathlib import Path

#put the file path where you want to save in the filepath variable
# ex. "/Users/sam/Desktop/UCI/Capstone/Maude/data/Edwards/Edwards_2018to2022.csv"
filepath = Path('')
filepath.parent.mkdir(parents=True, exist_ok=True)
boston.to_csv(filepath)

#### Extract MAUDE data by company - Livanova

In [None]:
livanova = maude_extract_by_manu(manufacture_name='livanova', date_start= 20180101, date_end=20220331)
livanova = maude_data(livanova)

In [None]:
filepath = Path('')
filepath.parent.mkdir(parents=True, exist_ok=True)
livanova.to_csv(filepath)

#### Extract MAUDE data by company - Medtronic

In [None]:
medtronic = maude_extract_by_manu(manufacture_name='medtronic', date_start= 20180101, date_end=20220331)
medtronic = maude_data(medtronic)

In [None]:
filepath = Path('')
filepath.parent.mkdir(parents=True, exist_ok=True)
medtronic.to_csv(filepath)

#### Extract MAUDE data by product code example - NPT

In [None]:
NPT = maude_extract_by_product(product_code='NPT',date_start=20180101, date_end=20220331)
NPT = maude_data(NPT)

#### Extract MAUDE data by product code example - DYE

For product code extraction function, we use only the product code and the date to extract the data.

In [None]:
DYE = maude_extract_by_product(product_code='DYE',date_start=20180101, date_end=20220331)
DYE = maude_data(DYE)

#### Extract MAUDE data by product code - LWR

In [None]:
LWR = maude_extract_by_product(product_code='LWR', date_start=20180101, date_end=20220331)
LWR = maude_data(LWR)

## Recall database

### Parameter:
    - recall firm (required)
    - product code (optional)
    - date_start & date_end (optional)
    - other_para & other_para_value 
(For using the other_para, please use the variables in [searchable field](https://open.fda.gov/apis/device/recall/searchable-fields/))

In [None]:
import requests

def recall_extract_by_firm(recall_firm = None,date_start = None, date_end = None, other_para = None, other_para_value = None, limit = 1000):
    """Fetch device recall records from openFDA, filtered by recalling firm.

    Each filter that is supplied is AND-ed into the ``search`` expression.
    Values are placed in the URL verbatim, so multi-word values must already
    use ``+`` for spaces (e.g. ``'edwards+lifesciences'``).

    Args:
        recall_firm: Matched against ``recalling_firm``.
        date_start, date_end: ``YYYYMMDD`` bounds on
            ``event_date_initiated``; applied only when both are given.
        other_para, other_para_value: One extra ``field:'value'`` filter;
            applied only when both are given.
        limit: Page size sent to the API; also used as the paging step.

    Returns:
        A DataFrame holding every page that could be fetched (first page
        included), re-indexed 0..n-1.

    Raises:
        ValueError: If ``limit`` is smaller than 1.
        KeyError: If the first response carries no ``results`` (for example
            when nothing matches the search).
    """
    import warnings

    page_size = int(limit)
    if page_size < 1:
        raise ValueError("limit must be at least 1")
    max_skip = 25000  # largest ``skip`` value openFDA accepts

    filters = []
    if recall_firm is not None:
        filters.append(f"recalling_firm:'{recall_firm}'")
    if date_start is not None and date_end is not None:
        filters.append(f"event_date_initiated:[{date_start}+TO+{date_end}]")
    if other_para is not None and other_para_value is not None:
        filters.append(f"{other_para}:'{other_para_value}'")

    URL = "https://api.fda.gov/device/recall.json?search=" + "+AND+".join(filters) + f"&limit={limit}"

    json_data = requests.get(URL).json()
    if 'results' not in json_data:
        raise KeyError(f"openFDA returned no results for {URL}: {json_data.get('error')}")
    total = json_data['meta']['results']['total']

    # The first page is part of the result; later pages are appended to it.
    pages = [pd.json_normalize(json_data, 'results')]
    skip = page_size
    while skip < total and skip <= max_skip:
        page = requests.get(URL + f"&skip={skip}").json()
        if 'results' in page:
            pages.append(pd.json_normalize(page, 'results'))
        else:
            # Best effort: keep going, but tell the user a page is missing.
            warnings.warn(f"openFDA returned no results for skip={skip}; page skipped")
        skip += page_size

    return pd.concat(pages, ignore_index=True)


def trans_recall(data1, recall_firm):
    """Keep the recalls of one firm and rename the columns of interest.

    Rows whose ``recalling_firm`` is missing or does not contain the firm
    name (case-insensitive, plain substring match) are dropped. ``data1``
    itself is not modified.

    Args:
        data1: Raw recall frame as returned by the recall API query.
        recall_firm: Firm name; ``+`` is treated as a space.

    Returns:
        A new DataFrame with the renamed columns and a fresh 0..n-1 index.
        In ``recall_class`` the values ``'1'`` and ``'3'`` are swapped.
    """
    firm = recall_firm.replace('+', ' ').lower()

    named = data1.dropna(subset=['recalling_firm'])
    # regex=False: a firm name such as "abc (inc" must match literally.
    matches = named['recalling_firm'].str.lower().str.contains(firm, regex=False, na=False)
    data = named[matches].reset_index(drop=True)

    # Output column -> source column, in output order.
    column_map = {
        'recall_number': 'product_res_number',
        'product_code': 'product_code',
        'product_description': 'product_description',
        'firm_name': 'recalling_firm',
        'termination_date': 'event_date_terminated',
        'posted_internet_date': 'event_date_posted',
        'center_classification_date': 'event_date_initiated',
        'root_cause_description': 'root_cause_description',
        'recall_status': 'recall_status',
        'action': 'action',
        'recall_class': 'openfda.device_class',
        'recall_reason': 'reason_for_recall',
    }
    df = pd.DataFrame()
    for target, source in column_map.items():
        df[target] = data[source]

    class_swap = {'1': '3', '3': '1'}
    df['recall_class'] = df['recall_class'].map(lambda c: class_swap.get(c, c))

    return df


# Swap recall classes 1 and 3: the class returned by the API is numbered the
# other way round from the one the FDA usually uses (API class 1 is FDA class III).
def recall_class_trans(dataframe):
    """Swap the integer values 1 and 3 in ``dataframe['recall_class']`` in place."""
    api_classes = [1, 3]
    fda_classes = [3, 1]
    swapped = dataframe['recall_class'].replace(api_classes, fda_classes)
    dataframe['recall_class'] = swapped

#### Extract Recall data by company example - Edwards

In [None]:
recall_edwards = recall_extract_by_firm(recall_firm='edwards+lifesciences')
recall_edwards = trans_recall(recall_firm='edwards+lifesciences', data1 = recall_edwards)

In [None]:
edwards_productcode = ['NPT', 'DYE', 'LWR', 'NPU', 'DXO', 'DYG', 'DXE', 'DXG' , 'LDF', 'DQE']
recall_edwards = recall_edwards.loc[recall_edwards.product_code.isin(edwards_productcode),:]

#### Extract Recall data by company example - Boston Scientific Corporation

recall_boston = recall_extract_by_firm(recall_firm= 'boston+scientific+corporation')
recall_boston = trans_recall(data1=recall_boston, recall_firm='boston+scientific+corporation')

In [None]:
boston_productcode = ['NIQ', 'LWP', 'LWS', 'NVN', 'LOX', 'LIT', 'NGV', 'DQY', 'NWX', 'MCX', 'DTB', 'NVY', 'NIK', 'DQX', 'OBJ', 'NPT']
recall_boston = recall_boston.loc[recall_boston.product_code.isin(boston_productcode),:]

#### Extract Recall data by company example - Abbott

St. Jude was acquired by Abbott in 2017. Therefore, we also extract St. Jude data.

recall_abbott = recall_extract_by_firm(recall_firm='abbott+laboratories')
recall_abbott = trans_recall(recall_firm='abbott', data1= recall_abbott)

In [None]:
abbott_productcode = ['DSQ', 'NIQ','LWS','LOX','NIK']

In [None]:
recall_abbott = recall_abbott.loc[recall_abbott.product_code.isin(abbott_productcode),:]

In [None]:
recall_jude = recall_extract_by_firm(recall_firm='jude')
recall_jude = trans_recall(data1=recall_jude, recall_firm='jude')

In [None]:
jude_productcode = ['NVN', 'NIK', 'NVY', 'NVZ', 'LWP', 'LWS','MXC', 'MOM','DYB','LWR','OJX','OAE','DRF',
'DXY','NKE','DQK','LWQ','OAD','DRC','DTB','DQX','OBJ','DQO','LPB','DRA']

In [None]:
recall_jude = recall_jude.loc[recall_jude.product_code.isin(jude_productcode),:]

In [None]:
recall_abbott_final = pd.concat([recall_abbott, recall_jude])
recall_abbott_final.drop_duplicates(keep='last',inplace=True)

#### Extract Recall data by company - Livanova

recall_livanova = recall_extract_by_firm(recall_firm = 'livanova')
recall_livanova = trans_recall(data1= recall_livanova, recall_firm= 'livanova')

In [None]:
livanova_productcode = ['DWE', 'DWF', 'DWC', 'DTS', 'DTL', 'DTZ', 'LWR']

In [None]:
recall_livanova = recall_livanova.loc[recall_livanova.product_code.isin(livanova_productcode),:]

#### Extract Recall data by company - Medtronic

recall_medtronic = recall_extract_by_firm(recall_firm='medtronic')

In [None]:
medtronic_productcode = ['NVZ', 'LWS', 'NPT', 'NIK', 'DSI', 'DTB', 'MIH', 'KRG', 'NIQ', 'PNJ', 'DYE', 'LWR']

In [None]:
recall_medtronic = recall_medtronic.loc[recall_medtronic.product_code.isin(medtronic_productcode),:]
recall_medtronic = trans_recall(data1= recall_medtronic, recall_firm='medtronic')

## Warning letter

In [None]:
import requests
import json
import re
from bs4 import BeautifulSoup
from urllib.request import urlopen


def extract_warning_letter_list(search):
    """Return the URLs of FDA warning letters that match ``search``.

    Queries the datatables AJAX endpoint behind the FDA warning-letter
    listing and pulls the link out of the third column of every returned
    row. The request asks for the first 10 rows only (``start=0``,
    ``length=10``).

    Args:
        search: Full-text search term; placed in the URL verbatim.

    Returns:
        A list of absolute ``https://www.fda.gov/...`` URLs.
    """
    from urllib.parse import urlencode

    # Same query string as the one captured from the browser, built from
    # parts so the string literal no longer has to span lines.
    params = [
        ("search_api_fulltext_issuing_office", ""),
        ("field_letter_issue_datetime", "All"),
        ("field_change_date_closeout_letter", ""),
        ("field_change_date_response_letter", ""),
        ("field_change_date_2", "All"),
        ("field_letter_issue_datetime_2", ""),
        ("draw", "4"),
    ]
    for col in range(8):
        params += [
            (f"columns[{col}][data]", str(col)),
            (f"columns[{col}][name]", ""),
            (f"columns[{col}][searchable]", "true"),
            # Only the last column is flagged as not orderable.
            (f"columns[{col}][orderable]", "false" if col == 7 else "true"),
            (f"columns[{col}][search][value]", ""),
            (f"columns[{col}][search][regex]", "false"),
        ]
    params += [
        ("start", "0"),
        ("length", "10"),
        ("search[value]", ""),
        ("search[regex]", "false"),
        ("_drupal_ajax", "1"),
        ("_wrapper_format", "drupal_ajax"),
        ("pager_element", "0"),
        ("view_args", ""),
        ("view_base_path", "inspections-compliance-enforcement-and-criminal-investigations/compliance-actions-and-activities/warning-letters/datatables-data"),
        ("view_display_id", "warning_letter_solr_block"),
        ("view_dom_id", "216facc099c04af0f56180f89afb9f1d0a008020ee32fb027a8c49bc45f91f90"),
        ("view_name", "warning_letter_solr_index"),
        ("view_path", "/inspections-compliance-enforcement-and-criminal-investigations/compliance-actions-and-activities/warning-letters"),
        ("total_items", "3033"),
        ("_", "1653280484120"),
    ]
    url = f"https://www.fda.gov/datatables/views/ajax?search_api_fulltext={search}&" + urlencode(params)

    response = requests.get(url)
    search_response = json.loads(response.text)["data"]
    print(search_response)

    url_base = "https://www.fda.gov"
    urls_to_be_fired = []
    for row in search_response:
        if "<a href" in row[2]:
            # Text between `<a href=` and the closing `>`, minus the quotes.
            href = row[2].split("<a href=")[1].split(">")[0][1:-1]
            urls_to_be_fired.append(url_base + href)

    return urls_to_be_fired

def web_scrape_warning_letter(list_name):
    """Download warning-letter pages and return their text as paragraphs.

    For each URL, the HTML between the ``>WARNING LETTER</`` heading and the
    following ``/S/`` marker is split on the blank-``div`` separator; each
    piece is stripped of tags, non-breaking spaces and newlines.

    Args:
        list_name: Iterable of page URLs.

    Returns:
        One list of cleaned paragraph strings per URL, in input order. A
        page without the heading yields ``['']``.
    """
    web_text = []
    for url in list_name:
        # Close the connection as soon as the body has been read.
        with urlopen(url) as page:
            html = page.read().decode("utf-8")

        start_index = html.find(">WARNING LETTER</")
        if start_index == -1:
            section = ""
        else:
            # Look for the end marker after the heading only.
            end_index = html.find("/S/", start_index)
            section = html[start_index:] if end_index == -1 else html[start_index:end_index]

        cleantext = []
        for fragment in section.split('</div><div>\xa0</div><div>'):
            plain = BeautifulSoup(fragment, "html.parser").text
            cleantext.append(plain.replace("\xa0", " ").replace("\n", ''))
        web_text.append(cleantext)
    return web_text



In [None]:
jude_list = extract_warning_letter_list(search='jude')
jude = web_scrape_warning_letter(jude_list)
jude1 = jude[0]

In [None]:
medtronic_list = extract_warning_letter_list(search='medtronic')
medtronic_warning = web_scrape_warning_letter(medtronic_list)

#### Text cleaning

In [None]:
import nltk
from nltk.stem.snowball import SnowballStemmer
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator
import matplotlib.pyplot as plt

stopwords = set(STOPWORDS)
lemmatizer = WordNetLemmatizer()

In [None]:
# Extra stop words specific to these letters. "rousseau:" and "during" are
# separate entries (a missing comma used to fuse them into one token).
stopwords.update(['caon', "firm’s", "firm", "analysis","failed",'dear','mr','rousseau',"rousseau:", "during",'will'])

In [None]:
# Normalise every paragraph: lower-case it, then strip basic punctuation,
# including the typographic apostrophe.
for i, text in enumerate(jude1):
    jude1[i] = re.sub(r"[,.!?’]", '', text.lower())

In [None]:
for i, text in enumerate(jude1):
    lemmatized_sentence = " ".join([lemmatizer.lemmatize(word) for word in text.split()])
    jude1[i] = lemmatized_sentence

#### Using N-gram to know more about warning letter

When using n-grams, we are looking for a device category or name, for instance "battery".

In [None]:
def generate_ngrams(text, n_gram = 1):
    """Return the space-joined n-grams of ``text``.

    The text is lower-cased and split on single spaces; empty pieces and
    words found in the module-level ``stopwords`` set are dropped before
    the n-grams are formed.
    """
    words = []
    for piece in text.lower().split(' '):
        if piece != '' and piece not in stopwords:
            words.append(piece)

    # n_gram copies of the word list, each shifted one word further, zipped
    # together give the sliding windows.
    shifted = [words[offset:] for offset in range(n_gram)]
    return [' '.join(group) for group in zip(*shifted)]

In [None]:
text = " ".join([token for token in jude1])

In [None]:
from collections import defaultdict
unigram = defaultdict(int)
for word in generate_ngrams(text):
    unigram[word] += 1

In [None]:
import pandas as pd
df_unigram = pd.DataFrame(sorted(unigram.items(), key= lambda x : x[1])[::-1])

In [None]:
import seaborn as sns

sns.barplot(y=df_unigram[0].values[:15], x = df_unigram[1].values[:15])

In [None]:
### bigram

bigram = defaultdict(int)
for word in generate_ngrams(text, n_gram= 2):
    bigram[word] += 1

df_bigram = pd.DataFrame(sorted(bigram.items(), key= lambda x:x[1])[::-1])

sns.barplot( y = df_bigram[0].values[:15], x = df_bigram[1].values[:15])

In [None]:
### trigram

trigram = defaultdict(int)
for word in generate_ngrams(text, n_gram = 3):
    trigram[word] += 1

df_trigram = pd.DataFrame(sorted(trigram.items(), key= lambda x: x[1])[::-1])

sns.barplot( y = df_trigram[0].values[:15], x = df_trigram[1].values[:15])

Using specific word we found in n-gram to know more about the warning letter

In [None]:
def find_all(a_str, sub):
    """Yield the start index of every non-overlapping ``sub`` in ``a_str``.

    An empty ``sub`` yields nothing (it would otherwise never advance).
    """
    if not sub:
        return
    start = a_str.find(sub)
    while start != -1:
        yield start
        start = a_str.find(sub, start + len(sub))


def find_specific(word, text_list):
    """Print and collect the text around ``word`` in each paragraph.

    For every occurrence of the lower-cased ``word`` the paragraph index,
    the position and the paragraph from 15 characters before the match to
    its end are printed. The paragraphs themselves are searched as they
    are, so they should already be lower-case.

    Args:
        word: Term to look for.
        text_list: List of paragraph strings.

    Returns:
        One string per paragraph that contains the word: the paragraph from
        15 characters before its last match onwards.
    """
    needle = word.lower()
    specific = []
    for i, text in enumerate(text_list):
        positions = list(find_all(text, needle))
        if not positions:
            continue
        for a in positions:
            print(i, a)
            # Clamp at 0: a negative start would slice from the end.
            print(text[max(a - 15, 0):])
        # NOTE(review): only the last match of a paragraph is collected,
        # although every match is printed - confirm this is intended.
        specific.append(text[max(positions[-1] - 15, 0):])

    return specific

We use battery as an example.

In [None]:
battery = find_specific(word='battery', text_list=jude1)

In [None]:
### trigram

trigram = defaultdict(int)
for word in generate_ngrams(battery[0], n_gram = 3):
    trigram[word] += 1

df_trigram = pd.DataFrame(sorted(trigram.items(), key= lambda x: x[1])[::-1])

sns.barplot( y = df_trigram[0].values[:15], x = df_trigram[1].values[:15])

#### Topic modeling for warning letters (not very useful because the amount of data is too small)

In [None]:
# import tools
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD

# TfidfVectorizer

def getTopics(text, topicsNum):
    """Print the top terms of ``topicsNum`` topics found in ``text``.

    The documents are vectorised with TF-IDF and decomposed with
    ``TruncatedSVD``; for each component the ten highest-weighted terms
    are printed, followed by all of them on one line.

    Args:
        text: Iterable of document strings.
        topicsNum: Number of SVD components (topics).

    Returns:
        None; all output is printed.
    """
    vectorizer = TfidfVectorizer(stop_words= 'english',
                                max_features = 1000,
                                max_df = 0.8,
                                min_df = 0.05)
    X = vectorizer.fit_transform(text)

    print(X.shape)
    svd_model = TruncatedSVD(n_components=topicsNum, algorithm='randomized', n_iter= 100, random_state= 122)
    svd_model.fit(X)

    # get_feature_names() no longer exists in current scikit-learn; fall
    # back to it only on old installations.
    if hasattr(vectorizer, 'get_feature_names_out'):
        terms = vectorizer.get_feature_names_out()
    else:
        terms = vectorizer.get_feature_names()
    print('components output shape '+ str(svd_model.components_.shape))
    print(svd_model.components_)

    topic = ''
    for i, comp in enumerate(svd_model.components_):
        sorted_terms = sorted(zip(terms, comp), key= lambda x:x[1], reverse= True)[:10]
        words = ''.join(t[0] + ' ' for t in sorted_terms)
        print("Topic "+str(i+1) +": " + words)
        topic += words

    print(topic)

We use the text with battery to do topic modeling

In [None]:
getTopics(battery,10)

### Summary

In [None]:
import spacy
from spacy.lang.en.stop_words import STOP_WORDS
from string import punctuation
import string
from spacy.lang.en import English
from heapq import nlargest
punctuations = string.punctuation
from spacy.language import Language

nlp = English()
nlp.add_pipe('sentencizer') # updated
parser = English()

Get the bullet point of the warning letter using the get_point_warning function

In [None]:
def get_point_warning(text_list):
    """Print the opening of each numbered point of a warning letter.

    A paragraph counts as a point when it starts with a digit greater than
    the number of points printed so far. It is printed up to its first
    comma (searched from the third character on), or in full when it has
    no comma. Empty paragraphs are skipped.

    Args:
        text_list: List of paragraph strings.

    Returns:
        None; the points are printed.
    """
    a = 0
    for text in text_list:
        lead = text[:1]  # '' for an empty paragraph, so no IndexError
        if lead.isdecimal() and int(lead) > a:
            comma = text.find(',', 2)
            print(text if comma == -1 else text[:comma])
            a += 1

In [None]:
get_point_warning(jude1)

In [None]:
# The scraped letters are in `medtronic_warning`; `medtronic` holds the MAUDE table.
get_point_warning(medtronic_warning[1])

Summarize the whole warning letter (needs more data to tune the model)

In [1]:
def pre_process(document):
    """Return the lower-cased text of every token in ``document``.

    Args:
        document: Iterable of tokens exposing a ``text`` string attribute.

    Returns:
        A list of lower-case token strings, in document order.
    """
    # NOTE(review): an earlier version also built a lemma list filtered by
    # STOP_WORDS and punctuation but never used it, so stop words still
    # reach the frequency count - confirm whether filtering was intended.
    return [token.text.lower() for token in document]


def generate_numbers_vector(tokens):
    """Map each distinct token to its frequency relative to the most common one.

    Args:
        tokens: List of hashable tokens.

    Returns:
        A dict ``token -> count / highest count``, keyed in order of first
        appearance; empty when ``tokens`` is empty.
    """
    from collections import Counter

    counts = Counter(tokens)  # one pass instead of tokens.count() per token
    if not counts:
        return {}
    maximum_frequency = max(counts.values())
    return {token: count / maximum_frequency for token, count in counts.items()}

def sentences_importance(text, normalised_dict):
    """Score each sentence of ``text`` by the weights of its tokens.

    The text is run through the module-level ``nlp`` pipeline; for every
    sentence the weights of all tokens whose lower-cased text is a key of
    ``normalised_dict`` are added up. Sentences without any such token do
    not appear in the result.
    """
    scores = {}
    for sent in nlp(text).sents:
        for tok in sent:
            key = tok.text.lower()
            if key not in normalised_dict:
                continue
            scores[sent] = scores.get(sent, 0) + normalised_dict[key]
    return scores

def generate_summary(rank, text):
    """Return the ``rank`` highest-scoring sentences of ``text``.

    The text is tokenised with the module-level ``parser``, token weights
    come from ``generate_numbers_vector`` and sentence scores from
    ``sentences_importance``.
    """
    tokens = pre_process(parser(text))
    weights = generate_numbers_vector(tokens)
    scored = sentences_importance(text, weights)
    return nlargest(rank, scored, key=scored.get)

In [None]:
# Print a `num`-sentence summary for every text in `corrective_action`.
# NOTE(review): `corrective_action` is not defined anywhere in the visible
# notebook - it must be created (an iterable of strings, presumably) before
# this cell is run.
num = 3
for i, text in enumerate(corrective_action):
    print(generate_summary(num, text))