In [1]:
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

In [2]:
client = Client() #call the client from dask.distribute
client #initiate the client

0,1
Client  Scheduler: tcp://127.0.0.1:53155  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 17.14 GB


In [3]:
client.scatter

<bound method Client.scatter of <Client: scheduler='tcp://127.0.0.1:53155' processes=4 cores=8>>

In [4]:
drugs = dd.read_csv('products.txt', sep='~', engine='python')
patents = dd.read_csv('patent.txt', sep='~', engine='python')
exclusivity = dd.read_csv('exclusivity.txt', sep='~', engine='python')

prices = dd.read_csv('national_average_drug_acquisition_cost.csv', engine='python')

In [5]:
# Merge data from drugs, patents, and exclusivity files
all_data = dd.merge(drugs, patents, on = ['Appl_No', 'Product_No'], how = 'left')
all_data = dd.merge(all_data, exclusivity, on=['Appl_No', 'Product_No'], how='left')

In [6]:
all_data['dosage_form'] = all_data['DF;Route'].str.split(';', n=1).str[0]
all_data['route'] = all_data['DF;Route'].str.split(';').str[-1]

In [7]:
all_data['dosage_form'] = all_data['DF;Route'].str.split(';', n=1).str[0]
all_data['route'] = all_data['DF;Route'].str.split(';').str[-1]

In [8]:
cols = list(all_data.columns.values)
cols = ['Ingredient', 'dosage_form', 'route', 'Trade_Name', 'Strength', 'Applicant', 'Appl_Type_x', 'Appl_No', 'Product_No', 'TE_Code', 'Approval_Date', 'RLD', 'RS', 'Type',
 'Applicant_Full_Name', 'Appl_Type_y', 'Patent_No', 'Patent_Expire_Date_Text', 'Drug_Substance_Flag', 'Drug_Product_Flag', 'Patent_Use_Code', 'Delist_Flag',
 'Submission_Date', 'Appl_Type', 'Exclusivity_Code', 'Exclusivity_Date']
all_data = all_data[cols]

In [9]:
all_data.columns = map(str.lower, all_data.columns)

In [10]:
all_data['ndc_description_agg'] = all_data['trade_name'] + " " + all_data['strength'] + " " + all_data['route']

In [11]:
import re
# Correct inconsistencies in 'NDC Descriptions' (i.e. CAP = CAPSULE, etc.)
cap_regex = re.compile(r'\sCAP*?\Z | \sCP*?\Z')
prices['NDC Description'] = prices['NDC Description'].str.replace(cap_regex, ' CAPSULE')

chw_regex = re.compile(r'[\sCHW]*?\Z | [\sCHEW]*?\Z')
chw_regex2 = re.compile(r'[\sCHEW]*?\Z')
prices['NDC Description'] = prices['NDC Description'].str.replace((chw_regex or chw_regex2), ' CHEWABLE')

tab_regex = re.compile(r'\sTAB\Z')
tab_regex2 = re.compile(r'\sTAB\s')
prices['NDC Description'] = prices['NDC Description'].str.replace((tab_regex or tab_regex2), ' TABLET')

syr_regex = re.compile(r'\sSYR*?\Z')
prices['NDC Description'] = prices['NDC Description'].str.replace(syr_regex, ' SYRINGE')

crm_regex = re.compile(r'\sCRM*?\Z')
prices['NDC Description'] = prices['NDC Description'].str.replace(crm_regex, ' CREAM')

sl_regex = re.compile(r'\sSL*?\Z')
prices['NDC Description'] = prices['NDC Description'].str.replace(sl_regex, ' SUB-LINGUAL')

foam_regex = re.compile(r'\sFOAM*?\Z')
prices['NDC Description'] = prices['NDC Description'].str.replace(foam_regex, ' FOAM')

autoinj_regex = re.compile(r'\sAUTO\-INJ*?\Z')
prices['NDC Description'] = prices['NDC Description'].str.replace(autoinj_regex, ' INJECTION')

eff_regex = re.compile(r'\sEFF*?\Z')
prices['NDC Description'] = prices['NDC Description'].str.replace(eff_regex, ' EFFERVESCENT')

soln_regex = re.compile(r'\sSOLN*?\Z')
prices['NDC Description'] = prices['NDC Description'].str.replace(soln_regex, ' SOLUTION')

inh_regex = re.compile(r'\sINH*?\Z')
prices['NDC Description'] = prices['NDC Description'].str.replace(inh_regex, ' INHALATION')

hcl_regex = re.compile(r'\sHCL\s*?\Z')
prices['NDC Description'] = prices['NDC Description'].str.replace(hcl_regex, ' HYDROCHLORIDE')

# Remove the space in between the dosage and dosage units
prices['NDC Description'] = prices['NDC Description'].str.replace(' MG', 'MG')
prices['NDC Description'] = prices['NDC Description'].str.replace(' ML', 'ML')
prices['NDC Description'] = prices['NDC Description'].str.replace(' MCG', 'MCG')
# Remove any white extra white space in the column data
prices['NDC Description'] = prices['NDC Description'].str.replace(' +', ' ')

#Print the new cleaned unique_titles
prices['NDC Description'].value_counts()

Dask Series Structure:
npartitions=1
    int64
      ...
Name: NDC Description, dtype: int64
Dask Name: value-counts-agg, 533 tasks

In [12]:
prices.columns
prices.rename(columns = {'NDC Description': 'ndc_description', 'NADAC_Per_Unit':'cost_per_unit_usd', 'OTC':'over_the_counter', 'As of Date':'as_of_date'}).compute()
prices.drop(['Pharmacy_Type_Indicator', 'Explanation_Code'], axis=1).compute()

prices.columns = map(str.lower, prices.columns)

In [13]:
prices = prices.astype(object)
all_data = all_data.astype(object)

In [14]:
# Change the following columns to datetime dtype
dd.to_datetime(prices['effective_date'], format='%m/%d/%Y').compute()
dd.to_datetime(prices['as of date'], format='%m/%d/%Y').compute()
dd.to_datetime(all_data['patent_expire_date_text'])

#From here on, this modification means that these drugs were approved prior to Jan 1, 1982
all_data['approval_date'].replace('Approved Prior to Jan 1, 1982', 'Dec 31, 1981')

Dask Series Structure:
npartitions=1
    object
       ...
Name: approval_date, dtype: object
Dask Name: replace, 40 tasks

In [15]:
# Drop all except one of each duplicate
prices.drop_duplicates(keep='first')

Unnamed: 0_level_0,ndc description,ndc,nadac_per_unit,effective_date,pricing_unit,pharmacy_type_indicator,otc,explanation_code,classification_for_rate_setting,corresponding_generic_drug_nadac_per_unit,corresponding_generic_drug_effective_date,as of date
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
,object,object,object,object,object,object,object,object,object,object,object,object
,...,...,...,...,...,...,...,...,...,...,...,...


In [16]:
all_data[all_data['ndc_description_agg'].replace(all_data['ndc_description_agg'].isnull(),'xxxxxxxxxx')]
prices[prices['ndc description'].replace(prices['ndc description'].isnull(),'xxxxxxxxxx')]

Unnamed: 0_level_0,ndc description,ndc,nadac_per_unit,effective_date,pricing_unit,pharmacy_type_indicator,otc,explanation_code,classification_for_rate_setting,corresponding_generic_drug_nadac_per_unit,corresponding_generic_drug_effective_date,as of date
npartitions=10,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
,object,object,object,object,object,object,object,object,object,object,object,object
,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...


In [17]:
all_data.head()

Unnamed: 0,ingredient,dosage_form,route,trade_name,strength,applicant,appl_type_x,appl_no,product_no,te_code,...,patent_expire_date_text,drug_substance_flag,drug_product_flag,patent_use_code,delist_flag,submission_date,appl_type,exclusivity_code,exclusivity_date,ndc_description_agg
0,BUDESONIDE,"AEROSOL, FOAM",RECTAL,UCERIS,2MG/ACTUATION,VALEANT PHARMS INTL,N,205613,1,,...,,,,,,,,,,UCERIS 2MG/ACTUATION RECTAL
1,BETAMETHASONE VALERATE,"AEROSOL, FOAM",TOPICAL,BETAMETHASONE VALERATE,0.12%,PERRIGO UK FINCO,A,78337,1,AB,...,,,,,,,,,,BETAMETHASONE VALERATE 0.12% TOPICAL
2,BETAMETHASONE VALERATE,"AEROSOL, FOAM",TOPICAL,BETAMETHASONE VALERATE,0.12%,RICONPHARMA LLC,A,207144,1,AB,...,,,,,,,,,,BETAMETHASONE VALERATE 0.12% TOPICAL
3,BETAMETHASONE VALERATE,"AEROSOL, FOAM",TOPICAL,BETAMETHASONE VALERATE,0.12%,TARO PHARM,A,208204,1,AB,...,,,,,,,,,,BETAMETHASONE VALERATE 0.12% TOPICAL
4,CLINDAMYCIN PHOSPHATE,"AEROSOL, FOAM",TOPICAL,CLINDAMYCIN PHOSPHATE,1%,PERRIGO UK FINCO,A,90785,1,AT,...,,,,,,,,,,CLINDAMYCIN PHOSPHATE 1% TOPICAL


In [18]:
# Export the cleaned files
prices = prices.compute()
all_data = all_data.compute()

In [19]:
prices.to_csv('prices.csv')
all_data.to_csv('all_data_new.csv')

In [20]:
import fuzzywuzzy as fuzz
from fuzzywuzzy import process
# Define matching function that will be used to provide a comparison of strings (drug names, strengths, and routes) for later merging of datasets
def match_name(name, list_names, min_score=0):
    # -1 score incase we don't get any matches
    max_score = -1
    # Returning empty name for no match as well
    max_name = ""
    # Iternating over all names in the other
    for name2 in list_names:
        print(name2)
        print(name)
        #Finding fuzzy match score
        score = fuzz.token_sort_ratio(name, name2)
        # Checking if we are above our threshold and have a better score
        if (score > min_score) & (score > max_score):
            max_name = name2
            max_score = score
    return (max_name, max_score)

In [36]:
# Load your smaller, reference dataframe using pandas, and call client.scatter to make sure all the workers have it
all_data = pd.read_csv('all_data_new.csv')
client.scatter
# Load your main data with dd.read_csv
prices = dd.read_csv('prices.csv')
# Call df.map_partitions(..) to process your data, where the function you pass should take two pandas dataframes, and work row-by-row.

# for row in prices['ndc description']:
merger = prices.map_partitions(lambda row: fuzz.token_set_ratio(all_data['ndc_description_agg'], 
        prices['ndc description'],
        85).compute(), meta = ('score', str))

In [52]:
# Option 2.1 - works (w/o Dask!)
# Runs the function above

# prices.set_index('ndc description')
# all_data.set_index('ndc_description_agg')

# List for dicts for easy dataframe creation
dict_list = []
# iterating over our drugs to find a match
for name in prices['ndc description']:
    # Use our method to find best match, we can set a threshold here
    print(name, all_data['ndc_description_agg'])
    
    match = match_name(name, all_data['ndc_description_agg'], 85)
    # New dict for storing data
    dict_ = {}
    dict_.update({'ndc description' : name})
    dict_.update({'ndc_description_agg' : match[0]})
    dict_.update({'score' : match[1]})
    dict_list.append(dict_)
    
merge_table1 = pd.DataFrame(dict_list)
# Display results
merge_table1

NotImplementedError: Series getitem in only supported for other series objects with matching partition structure