In [3]:
import pandas as pd
import numpy as np
import math
from jellyfish import jaro_winkler
from sklearn.feature_extraction.text import CountVectorizer
import numpy as np
from py_common_subseq import find_common_subsequences
import numbers
import time
from collections import Counter 
from fuzzywuzzy import fuzz

In [None]:
import io
import shutil

with io.open('recipients_reduced.csv', encoding='utf-8', errors='ignore') as source:
    with io.open('recipients_reduced_utf.csv', mode='w', encoding='utf-8') as target:
        shutil.copyfileobj(source,target)

In [4]:
#define column names
org_id = 'Recipient_ID'
org_name = 'RecipientName'
org_address1 = 'AddressLine1Txt'
org_city = 'CityNm'
org_state = 'StateAbbreviationCd'
org_zip = 'Zip'
org_web = 'WebsiteAddressTxt'

#set parameters
token_match_min = 2 # minimum number of matched tokens to be considered a match
token_limiter = .99 # percent of non-single tokens to tokenize, where rare tokens are at the bottom and common at the top
name_weight = .75 #note that this is really .75 * 4 because there are 4 org name simularity metrics
state_weight = 1
zip_weight = 1
phone_weight = 1
composite_score_min = 3.5 #minimum composite match score to be considered a match

start_time = time.time()
print "LOADING INITIAL DATAFRAME..."

df = pd.read_csv('RecipientTableUpdated_1.30.19_utf.csv',keep_default_na=False)

df.rename(columns={org_id:'id',org_name:'org_name',org_address1:'address1',org_city:'city',org_state:'state',org_zip:'postal_code',org_web:'web'}, inplace=True)

print("Dataframe loaded --- %s seconds ---" % (time.time() - start_time))
print ""

start_time = time.time()
print "PRE-PROCESSING: NORMALIZE STATES..."
#normalize state codes
state_lkup = pd.read_csv('state_lkup.csv',keep_default_na=False)

from collections import defaultdict
state_dict = defaultdict(list)
for state, acronym in zip(state_lkup.state.values,state_lkup.acronym.values):
    state_dict[state].append(acronym)

df.state = df.state.str.lower()
df.state = df.state.replace(state_dict)
df.to_csv('org_dup_df.csv')

print("states normalized --- %s seconds ---" % (time.time() - start_time))
print ""

chunking_time = time.time()
print "TOKENIZING, IDENTIFYING CANDIDATE MATCH PAIRS..."

n=0
dfs_list = []
for df in pd.read_csv('org_dup_df.csv',keep_default_na=False,chunksize=10000):
    
    n=n+1
    start_time = time.time()
    print n
    
    left_df_chunk = df.copy()
    left_df_chunk.rename(columns={'org_name':'l_org_name','address1':'l_address1','city':'l_city','state':'l_state','postal_code':'l_postal_code','web':'l_web'}, inplace=True)

        # for the left dataset
    left_tokenized_columns = [
        'l_org_name',
        #'l_acronym',
        #'l_alt_name',
        #'l_address1',
        #'l_address2',
        'l_city', 
        'l_state', 
        'l_postal_code',
        'l_web' 
        #'l_phone'
    ]

    # lowercase the name and split on spaces, remove non-alphanumeric chars
    def tokenize_name(name):
        if isinstance(name, basestring) is True:
            clean_name = ''.join(c if c.isalnum() else ' ' for c in name)
            return clean_name.lower().split()
        else:
            return name

    unique_tokens = [] #we treat state and zips differently because we want to include ALl state and zip tokens as these are unique

    #<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< add chosen unique columns here from each df
    for word in left_df_chunk['l_state']:
        if isinstance(word, float) is False:
            unique_tokens.append(tokenize_name(str(word)))

    for word in left_df_chunk['l_postal_code']:
        if isinstance(word, float) is False:
            unique_tokens.append(tokenize_name(str(word)))

    #for word in left_df_chunk['l_acronym']:
    #    if isinstance(word, float) is False:
    #        unique_tokens.append(tokenize_name(str(word)))

    #for word in left_df_chunk['l_phone']:
    #    if isinstance(word, float) is False:
    #        unique_tokens.append(tokenize_name(str(word)))
    #<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

    unique_flat_list = [item for sublist in unique_tokens for item in sublist]

    #instantiate counter and use to count word frequencies in flat list
    u_cnt = Counter()
    for token in unique_flat_list:
        u_cnt[token] += 1

    u_cnt_dict = dict(u_cnt) #convert to dictionary

    unique_tokens_df = pd.DataFrame(u_cnt_dict.items(), columns=['token', 'count'])
    unique_tokens_df = unique_tokens_df.sort_values(by='count')  #sorting by count so that we can take the first x% of tokens by rare frequency

    unique_token_flag = []
    for index, value in enumerate(unique_tokens_df['count']):
        if value == 1:
            unique_token_flag.append(0)  #for any tokens occuring only once, we exclude
        else:
            unique_token_flag.append(1)

    unique_tokens_df['flag'] = unique_token_flag        

    all_other_words = [] #creating a list of all words used in just ONE of the dfs in selected columns, for counting to determine rarity

    for word in left_df_chunk['l_org_name']:
        if isinstance(word, float) is False:
            all_other_words.append(tokenize_name(str(word)))

    #for word in left_df_chunk['l_alt_name']:
    #    if isinstance(word, float) is False:
    #        all_other_words.append(tokenize_name(str(word)))

    #for word in left_df_chunk['l_address1']:
    #    if isinstance(word, float) is False:
    #        all_other_words.append(tokenize_name(str(word)))

    for word in left_df_chunk['l_city']:
        if isinstance(word, float) is False:
            all_other_words.append(tokenize_name(str(word)))

    for word in left_df_chunk['l_web']:
        if isinstance(word, float) is False:
            all_other_words.append(tokenize_name(str(word)))

    flat_list = [item for sublist in all_other_words for item in sublist] #flatten list so it can be counted

    #instantiate counter and use to count word frequencies in flat list
    cnt = Counter()
    for token in flat_list:
        cnt[token] += 1

    cnt_dict = dict(cnt) #convert to dictionary

    main_tokens_df = pd.DataFrame(cnt_dict.items(), columns=['token', 'count'])
    main_tokens_df = main_tokens_df.sort_values(by='count')  #sorting by count so that we can take the first x% of tokens by rare frequency

    main_token_flag = []
    for index, value in enumerate(main_tokens_df['count']):
        if value == 1:
            main_token_flag.append(0)  #for any tokens occuring only once, we exclude
        elif index < int(main_tokens_df.shape[0] * token_limiter): #important line, we are cutting the top x% of frequently occuring tokens
            main_token_flag.append(1)
        else:
            main_token_flag.append(0)  #for the most common tokens, we exclude

    main_tokens_df['flag'] = main_token_flag

    all_tokens = pd.concat([unique_tokens_df, main_tokens_df])

    all_tokens.drop('count',axis=1,inplace=True)
    all_tokens['flag'] = all_tokens.flag.astype(int) #converting flags to int
    tokens_dct = all_tokens.to_dict('split') #converting tokens_df to dictionary
    tokens_dct=dict(tokens_dct['data']) #honestly can't remember why this works, something to do with conversion to dictionary

    #preparing token_ids which will be used for joining left and right dfs
    all_tokens.sort_values(by='flag',ascending=False,inplace=True)
    all_tokens.drop_duplicates(subset='token',keep='first',inplace=True)
    token_ids = all_tokens.index.get_level_values(0)
    all_tokens['token_id'] = token_ids

    all_tokens.drop('flag',axis=1,inplace=True)
    all_tokens['token_id'] = all_tokens.token_id.astype(int)
    token_id_dct = all_tokens.to_dict('split')
    tokens_id_dct=dict(token_id_dct['data'])
    
    vocabulary = np.array([w for w, c in tokens_dct.items() if c ==1]) #this works even without the ==1 and I don't know why
    cv = CountVectorizer( vocabulary=vocabulary)

    frame_list = []
    for colname in left_tokenized_columns:
        tokenmapping = cv.fit_transform(left_df_chunk[colname])
        df_row, token_id = tokenmapping.nonzero()

        frame_list.append(pd.DataFrame(np.vstack([vocabulary[token_id], left_df_chunk['id'].values[df_row]]).T, columns = ['token', 'id']))

    left_keyed = pd.concat(frame_list)

    #append token_id to token as this will be more efficient to join with
    left_token_ids = []
    for token in left_keyed.token:
        left_token_ids.append(tokens_id_dct[token])

    left_keyed['token_id'] = left_token_ids
    left_keyed.sort_values(by='token_id',inplace=True)
    left_keyed.set_index('token_id',inplace=True)
    left_keyed.drop('token',axis=1,inplace=True)

    aggregations = {
        'id_l': 'count'
    }
    
    right_keyed = left_keyed.copy()
    joined = left_keyed.join(right_keyed, how='inner',lsuffix='_l',rsuffix='_r')
    keys_grouped = joined.groupby(by=['id_l', 'id_r']).agg(aggregations)
    keys_grouped.rename(columns={'id_l':'id_l count'}, inplace=True)
    matched_records = keys_grouped[keys_grouped['id_l count'] >= token_match_min]
    matched_records.reset_index(inplace=True)
    duplicate_candidates = matched_records[matched_records['id_l'] <> matched_records['id_r']]
    
    #add chunk duplicate candidates to df list
    dfs_list.append(duplicate_candidates)
    
    print("chunk loaded --- %s seconds ---" % (time.time() - start_time))
    
dup_dfs = pd.concat(dfs_list)
print("All chunks loaded --- %s seconds ---" % (time.time() - chunking_time))
print ""

start_time = time.time()
print "REMOVING REDUNDANT DUPLICATE ID PAIRS..."

match_tuples = list(zip(dup_dfs['id_l'], dup_dfs['id_r']))

sorted_match_tuples = []
for tup in match_tuples:
    s = tuple(sorted(tup))
    sorted_match_tuples.append(s)

dup_dfs.drop(['id_l','id_r'],axis=1,inplace=True)
dup_dfs['id_tuples'] = sorted_match_tuples

new_col_list = ['id_l','id_r']
for n,col in enumerate(new_col_list):
    dup_dfs[col] = dup_dfs['id_tuples'].apply(lambda location: location[n])
    
dup_dfs.drop('id_tuples',axis=1,inplace=True)
dup_dfs.drop_duplicates(inplace=True)

print("redundant id pairs removed --- %s seconds ---" % (time.time() - start_time))
print ""

start_time = time.time()
print "APPENDING ORIGINAL DATA TO MATCH CANDIDATES..."

left_df = pd.read_csv('org_dup_df.csv')
right_df = left_df.copy()
left_df.rename(columns={'id':'id_l','org_name':'l_org_name','address1':'l_address1','city':'l_city','state':'l_state','postal_code':'l_postal_code','web':'l_web'}, inplace=True)
right_df.rename(columns={'id':'id_r','org_name':'r_org_name','address1':'r_address1','city':'r_city','state':'r_state','postal_code':'r_postal_code','web':'r_web'}, inplace=True)

#creating left/right dataframes which contain only the most relevant details for reviewing the match strengths
left_match_data = left_df[['id_l','l_org_name','l_city','l_state','l_postal_code','l_web']].copy()
right_match_data = right_df[['id_r','r_org_name','r_city','r_state','r_postal_code','r_web']].copy()

#making sure keys are str, results in blank df otherwise
left_match_data.id_l = left_match_data.id_l.astype('str')
right_match_data.id_r = right_match_data.id_r.astype('str')
dup_dfs.id_l = dup_dfs.id_l.astype('str')
dup_dfs.id_r = dup_dfs.id_r.astype('str')

#merging matched_records df with original record data for ease of review
l_conc = pd.merge(dup_dfs, left_match_data, on='id_l')
full_conc = pd.merge(l_conc, right_match_data, on='id_r')

print("original data concatenated with matches --- %s seconds ---" % (time.time() - start_time))
print ""

start_time = time.time()
print "SCORING ORG NAME SIMULARITY..." #<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

#scoring match candidates based on edit distance of org names
def jaro_simularity(left_record, right_record):
    if isinstance(left_record, numbers.Integral) is False and isinstance(right_record, numbers.Integral) is False:
        return jaro_winkler(unicode(left_record, 'utf-8') or '', unicode(right_record, 'utf-8') or '')
def fuzz_partial(left_record, right_record):
    if isinstance(left_record, numbers.Integral) is False and isinstance(right_record, numbers.Integral) is False:
        return fuzz.partial_ratio(unicode(left_record, 'utf-8') or '', unicode(right_record, 'utf-8') or '') / float(100)
def fuzz_sort(left_record, right_record):
    if isinstance(left_record, numbers.Integral) is False and isinstance(right_record, numbers.Integral) is False:
        return fuzz.token_sort_ratio(unicode(left_record, 'utf-8') or '', unicode(right_record, 'utf-8') or '') / float(100)
def fuzz_set(left_record, right_record):
    if isinstance(left_record, numbers.Integral) is False and isinstance(right_record, numbers.Integral) is False:
        return fuzz.token_set_ratio(unicode(left_record, 'utf-8') or '', unicode(right_record, 'utf-8') or '') / float(100)

full_conc['l_org_name'] = full_conc['l_org_name'].astype('str')
full_conc['r_org_name'] = full_conc['r_org_name'].astype('str')

jaro_time = time.time()
full_conc['jaro_score'] = full_conc.apply(lambda x: jaro_simularity(x.l_org_name, x.r_org_name), axis=1)
print("jaro scores done --- %s seconds ---" % (time.time() - jaro_time))
partial_time = time.time()
full_conc['fuzz_partial_score'] = full_conc.apply(lambda x: fuzz_partial(x.l_org_name, x.r_org_name), axis=1)
print("fuzz partial scores done --- %s seconds ---" % (time.time() - partial_time))
sort_time = time.time()
full_conc['fuzz_sort_score'] = full_conc.apply(lambda x: fuzz_sort(x.l_org_name, x.r_org_name), axis=1)
print("fuzz sort scores done --- %s seconds ---" % (time.time() - sort_time))
set_time = time.time()
full_conc['fuzz_set_score'] = full_conc.apply(lambda x: fuzz_set(x.l_org_name, x.r_org_name), axis=1)
print("fuzz set scores done --- %s seconds ---" % (time.time() - set_time))
print ""

print("name simularity scored --- %s seconds ---" % (time.time() - start_time))
print ""

start_time = time.time()
print "CHECKING FOR STATE CODE MATCHES..." #<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

def sanitize_state(state):
    if isinstance(state,basestring) is True:
        return ''.join(c for c in (state or '') if c in 'abcdefghijklmnopqrstuvwxyz')
    else:
        return ''
    
def state_match(state_a, state_b):
    sanitized_state_a = str(sanitize_state(state_a))
    sanitized_state_b = str(sanitize_state(state_b))

    # if the value is too short, means it's fubar
    if len(sanitized_state_a) < 2 or len(sanitized_state_b) < 2:
        return 0
    if state_a == state_b:
        return 1
    else:
        return 0    

full_conc['state_match'] = full_conc.apply(lambda x: state_match(x.l_state, x.r_state), axis=1)

print("state codes checked --- %s seconds ---" % (time.time() - start_time))
print ""

start_time = time.time()
print "CHECKING FOR POSTAL CODE MATCHES..." #<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

#scoring match candidates based on matching postal code

def sanitize_postal(postal):
    if isinstance(postal, basestring) is True:
        return ''.join(c for c in (postal or '') if c in '1234567890')
    if isinstance(postal, float) is False:
        return postal

def postal_simularity(postal_a, postal_b):
    sanitized_postal_a = str(sanitize_postal(postal_a))
    sanitized_postal_b = str(sanitize_postal(postal_b))

    # if the number is too short, means it's fubar
    if len(sanitized_postal_a) < 5 or len(sanitized_postal_b) < 5:
        return 0
    if float(max(len(sub) for sub in find_common_subsequences(sanitized_postal_a, sanitized_postal_b))) / 5 >= 1:
        return 1
    else:
        return 0
    
full_conc['zip_match'] = full_conc.apply(lambda x: postal_simularity(x.l_postal_code, x.r_postal_code), axis=1)
    
print("postal codes checked --- %s seconds ---" % (time.time() - start_time))
print ""

#start_time = time.time()
#print "CHECKING FOR PHONE MATCHES..." #<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

#scoring match candidates based on matching postal code

#def sanitize_phone(phone):
#    if isinstance(phone, basestring) is True:
#        return ''.join(c for c in (phone or '') if c in '1234567890')
#    if isinstance(phone, float) is False:
#        return phone

#def postal_simularity(phone_a, phone_b):
#    sanitized_phone_a = str(sanitize_phone(phone_a))
#    sanitized_phone_b = str(sanitize_phone(phone_b))

    # if the number is too short, means it's fubar
#    if len(sanitized_phone_a) < 10 or len(sanitized_phone_b) < 10:
#        return 0
#    if float(max(len(sub) for sub in find_common_subsequences(sanitized_phone_a, sanitized_phone_b))) / 10 >= 1:
#        return 1
#    else:
#        return 0
    
#full_conc['phone_match'] = full_conc.apply(lambda x: phone_simularity(x.l_phone, x.r_phone), axis=1)
    
#print("phones checked --- %s seconds ---" % (time.time() - start_time))
#print ""

#test this.  may need to make more efficient but I think it should work
start_time = time.time()
print "DISTILLING STRONG ORG DUPLICATES..." #<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

#calculate composite match score based on component scores and weights
full_conc['composite_match_score'] = full_conc.jaro_score * name_weight \
+ full_conc.fuzz_partial_score * name_weight \
+ full_conc.fuzz_sort_score * name_weight \
+ full_conc.fuzz_set_score * name_weight \
+ full_conc.zip_match * zip_weight \
+ full_conc.state_match * state_weight \
#+ full_conc.phone_match * phone_weight

org_duplicates = full_conc[full_conc.composite_match_score >= composite_score_min]

print("final duplicates isolated --- %s seconds ---" % (time.time() - start_time))
print ""

#full_conc[full_conc.composite_match_score < 3].sort_values(by='composite_match_score', ascending=False)
org_duplicates.sort_values(by='composite_match_score', ascending=False)

LOADING INITIAL DATAFRAME...


IOError: File RecipientTableUpdated_1.30.19_utf.csv does not exist

In [1]:
left_keyed

NameError: name 'left_keyed' is not defined

In [3]:
#number of >3.5 dups found with 3 token minimum
org_duplicates.shape

(25699, 20)

In [4]:
org_duplicates.to_csv('salehs file.csv')

In [7]:
match_tuples

[('1', '1062'),
 ('1053', '1062'),
 ('1057', '1062'),
 ('1058', '1062'),
 ('1059', '1062'),
 ('1060', '1062'),
 ('1061', '1062'),
 ('1064', '1062'),
 ('1188', '1062'),
 ('1377', '1062'),
 ('1831', '1062'),
 ('213', '1062'),
 ('2257', '1062'),
 ('2536', '1062'),
 ('280', '1062'),
 ('3108', '1062'),
 ('3117', '1062'),
 ('3595', '1062'),
 ('3738', '1062'),
 ('3781', '1062'),
 ('417', '1062'),
 ('4687', '1062'),
 ('474', '1062'),
 ('5081', '1062'),
 ('518', '1062'),
 ('5611', '1062'),
 ('5690', '1062'),
 ('570', '1062'),
 ('5726', '1062'),
 ('6073', '1062'),
 ('6076', '1062'),
 ('6247', '1062'),
 ('6538', '1062'),
 ('6683', '1062'),
 ('6686', '1062'),
 ('688', '1062'),
 ('7062', '1062'),
 ('7091', '1062'),
 ('7202', '1062'),
 ('7205', '1062'),
 ('7206', '1062'),
 ('7207', '1062'),
 ('7217', '1062'),
 ('7218', '1062'),
 ('7220', '1062'),
 ('7222', '1062'),
 ('7567', '1062'),
 ('7756', '1062'),
 ('7774', '1062'),
 ('7777', '1062'),
 ('7782', '1062'),
 ('7786', '1062'),
 ('7798', '1062'),
 ('

In [9]:
len(unique_flag)

578412

In [10]:
full_conc.shape

(4292670, 13)

In [27]:
dup_dfs.tail()

Unnamed: 0,id_l,id_r,id_l count
392645,237891,237867,2
392646,237891,237876,2
392647,237891,237886,2
392648,237891,237888,2
392649,237891,237890,2


In [46]:
dup_dfs = pd.concat(dfs_list)

match_tuples = list(zip(dup_dfs['id_l'], dup_dfs['id_r']))

sorted_match_tuples = []
for tup in match_tuples:
    s = tuple(sorted(tup))
    sorted_match_tuples.append(s)

dup_dfs.drop(['id_l','id_r'],axis=1,inplace=True)
dup_dfs['id_tuples'] = sorted_match_tuples

new_col_list = ['id_l','id_r']
for n,col in enumerate(new_col_list):
    dup_dfs[col] = dup_dfs['id_tuples'].apply(lambda location: location[n])
    
dup_dfs.drop('id_tuples',axis=1,inplace=True)
dup_dfs.drop_duplicates(inplace=True)

In [47]:
len(sorted_match_tuples)

4292670

In [52]:
dup_dfs.shape

(2146335, 3)

In [51]:
dup_dfs.drop_duplicates(inplace=True)

In [43]:
dup_dfs[(dup_dfs.id_l <> dup_dfs.id_l_new)]

Unnamed: 0,id_l,id_r,id_l count,"(id_l_new, id_r_new)",id tuples,id_l_new,id_r_new
171,1004,1003,2,<enumerate object at 0x00000000D8258BD0>,"(1003, 1004)",1003,1004
229,1007,1006,2,<enumerate object at 0x00000000D8258BD0>,"(1006, 1007)",1006,1007
245,1008,1006,2,<enumerate object at 0x00000000D8258BD0>,"(1006, 1008)",1006,1008
246,1008,1007,2,<enumerate object at 0x00000000D8258BD0>,"(1007, 1008)",1007,1008
263,1009,1006,2,<enumerate object at 0x00000000D8258BD0>,"(1006, 1009)",1006,1009
264,1009,1007,2,<enumerate object at 0x00000000D8258BD0>,"(1007, 1009)",1007,1009
265,1009,1008,2,<enumerate object at 0x00000000D8258BD0>,"(1008, 1009)",1008,1009
318,1010,1006,2,<enumerate object at 0x00000000D8258BD0>,"(1006, 1010)",1006,1010
319,1010,1007,2,<enumerate object at 0x00000000D8258BD0>,"(1007, 1010)",1007,1010
320,1010,1008,2,<enumerate object at 0x00000000D8258BD0>,"(1008, 1010)",1008,1010


In [39]:
new_col_list = ['id_l_new','id_r_new']
for n,col in enumerate(new_col_list):
    dup_dfs[col] = dup_dfs['id tuples'].apply(lambda location: location[n])

In [28]:
test_df = pd.DataFrame(sorted_match_tuples)

In [29]:
test_df.tail()

Unnamed: 0,0,1
4292665,237867,237891
4292666,237876,237891
4292667,237886,237891
4292668,237888,237891
4292669,237890,237891


In [25]:
test_df.drop_duplicates(inplace=True)

In [26]:
test_df.shape

(2146335, 2)

In [26]:
full_conc[full_conc.id_l == '2733']

Unnamed: 0,id_l,id_r,id_l count,l_org_name,l_city,l_state,l_postal_code,l_web,r_org_name,r_city,r_state,r_postal_code,r_web,jaro_score,fuzz_partial_score,fuzz_sort_score,fuzz_set_score,state_match,zip_match,composite_match_score


In [32]:
l_conc[l_conc.id_l=='2734']

Unnamed: 0,id_l,id_r,id_l count,l_org_name,l_city,l_state,l_postal_code,l_web


In [27]:
dfs[(dfs.id_l=='2733') & (dfs.id_r=='2734')]

Unnamed: 0,id_l,id_r,id_l count
248406,2733,2734,9


In [30]:
l_conc[(l_conc.id_r=='2733') & (l_conc.id_l=='2734')]

Unnamed: 0,id_l,id_r,id_l count,l_org_name,l_city,l_state,l_postal_code,l_web


In [38]:
l_conc.head()

Unnamed: 0,id_l,id_r,id_l count,l_org_name,l_city,l_state,l_postal_code,l_web
0,230001,230002,2,LEARN TO READ OF ST JOHNS COUNTY INC,Saint Augustine,fl,,
1,230001,230004,2,LEARN TO READ OF ST JOHNS COUNTY INC,Saint Augustine,fl,,
2,230001,230005,2,LEARN TO READ OF ST JOHNS COUNTY INC,Saint Augustine,fl,,
3,230001,230008,3,LEARN TO READ OF ST JOHNS COUNTY INC,Saint Augustine,fl,,
4,230001,230012,2,LEARN TO READ OF ST JOHNS COUNTY INC,Saint Augustine,fl,,


In [34]:
dfs.shape

(61678462, 3)

In [35]:
left_match_data[left_match_data.id_l=='2733']

Unnamed: 0,id_l,l_org_name,l_city,l_state,l_postal_code,l_web


In [37]:
#define column names
org_id = 'Recipient_ID'
org_name = 'RecipientName'
org_address1 = 'AddressLine1Txt'
org_city = 'CityNm'
org_state = 'StateAbbreviationCd'
org_zip = 'Zip'
org_web = 'WebsiteAddressTxt'

#set parameters
token_match_min = 2 # minimum number of matched tokens to be considered a match
token_limiter = .99 # percent of non-single tokens to tokenize, where rare tokens are at the bottom and common at the top
name_weight = .75 #note that this is really .75 * 4 because there are 4 org name simularity metrics
state_weight = 1
zip_weight = 1
phone_weight = 1
composite_score_min = 3.5 #minimum composite match score to be considered a match

start_time = time.time()
print "LOADING INITIAL DATAFRAME..."

df = pd.read_csv('RecipientTableUpdated_1.30.19_utf.csv',keep_default_na=False)

df.rename(columns={org_id:'id',org_name:'org_name',org_address1:'address1',org_city:'city',org_state:'state',org_zip:'postal_code',org_web:'web'}, inplace=True)

print("Dataframe loaded --- %s seconds ---" % (time.time() - start_time))
print ""

start_time = time.time()
print "PRE-PROCESSING: NORMALIZE STATES..."
#normalize state codes
state_lkup = pd.read_csv('state_lkup.csv',keep_default_na=False)

from collections import defaultdict
state_dict = defaultdict(list)
for state, acronym in zip(state_lkup.state.values,state_lkup.acronym.values):
    state_dict[state].append(acronym)

df.state = df.state.str.lower()
df.state = df.state.replace(state_dict)
df.to_csv('org_dup_df.csv')

print("states normalized --- %s seconds ---" % (time.time() - start_time))
print ""

chunking_time = time.time()
print "TOKENIZING, IDENTIFYING CANDIDATE MATCH PAIRS..."

n=0
dfs_list = []
for df in pd.read_csv('org_dup_df.csv',keep_default_na=False,chunksize=10000):
    
n=n+1
start_time = time.time()
print n

left_df_chunk = df.copy()
left_df_chunk.rename(columns={'org_name':'l_org_name','address1':'l_address1','city':'l_city','state':'l_state','postal_code':'l_postal_code','web':'l_web'}, inplace=True)

    # for the left dataset
left_tokenized_columns = [
    'l_org_name',
    #'l_acronym',
    #'l_alt_name',
    #'l_address1',
    #'l_address2',
    'l_city', 
    'l_state', 
    'l_postal_code',
    'l_web' 
    #'l_phone'
]

# lowercase the name and split on spaces, remove non-alphanumeric chars
def tokenize_name(name):
    if isinstance(name, basestring) is True:
        clean_name = ''.join(c if c.isalnum() else ' ' for c in name)
        return clean_name.lower().split()
    else:
        return name

unique_tokens = [] #we treat state and zips differently because we want to include ALl state and zip tokens as these are unique

#<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< add chosen unique columns here from each df
for word in left_df_chunk['l_state']:
    if isinstance(word, float) is False:
        unique_tokens.append(tokenize_name(str(word)))

for word in left_df_chunk['l_postal_code']:
    if isinstance(word, float) is False:
        unique_tokens.append(tokenize_name(str(word)))

#for word in left_df_chunk['l_acronym']:
#    if isinstance(word, float) is False:
#        unique_tokens.append(tokenize_name(str(word)))

#for word in left_df_chunk['l_phone']:
#    if isinstance(word, float) is False:
#        unique_tokens.append(tokenize_name(str(word)))
#<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

unique_flat_list = [item for sublist in unique_tokens for item in sublist]

#instantiate counter and use to count word frequencies in flat list
u_cnt = Counter()
for token in unique_flat_list:
    u_cnt[token] += 1

u_cnt_dict = dict(u_cnt) #convert to dictionary

unique_tokens_df = pd.DataFrame(u_cnt_dict.items(), columns=['token', 'count'])
unique_tokens_df = unique_tokens_df.sort_values(by='count')  #sorting by count so that we can take the first x% of tokens by rare frequency

unique_token_flag = []
for index, value in enumerate(unique_tokens_df['count']):
    if value == 1:
        unique_token_flag.append(0)  #for any tokens occuring only once, we exclude
    else:
        unique_token_flag.append(1)

unique_tokens_df['flag'] = unique_token_flag        

all_other_words = [] #creating a list of all words used in just ONE of the dfs in selected columns, for counting to determine rarity

for word in left_df_chunk['l_org_name']:
    if isinstance(word, float) is False:
        all_other_words.append(tokenize_name(str(word)))

#for word in left_df_chunk['l_alt_name']:
#    if isinstance(word, float) is False:
#        all_other_words.append(tokenize_name(str(word)))

#for word in left_df_chunk['l_address1']:
#    if isinstance(word, float) is False:
#        all_other_words.append(tokenize_name(str(word)))

for word in left_df_chunk['l_city']:
    if isinstance(word, float) is False:
        all_other_words.append(tokenize_name(str(word)))

for word in left_df_chunk['l_web']:
    if isinstance(word, float) is False:
        all_other_words.append(tokenize_name(str(word)))

flat_list = [item for sublist in all_other_words for item in sublist] #flatten list so it can be counted

#instantiate counter and use to count word frequencies in flat list
cnt = Counter()
for token in flat_list:
    cnt[token] += 1

cnt_dict = dict(cnt) #convert to dictionary

main_tokens_df = pd.DataFrame(cnt_dict.items(), columns=['token', 'count'])
main_tokens_df = main_tokens_df.sort_values(by='count')  #sorting by count so that we can take the first x% of tokens by rare frequency

main_token_flag = []
for index, value in enumerate(main_tokens_df['count']):
    if value == 1:
        main_token_flag.append(0)  #for any tokens occuring only once, we exclude
    elif index < int(main_tokens_df.shape[0] * token_limiter): #important line, we are cutting the top x% of frequently occuring tokens
        main_token_flag.append(1)
    else:
        main_token_flag.append(0)  #for the most common tokens, we exclude

main_tokens_df['flag'] = main_token_flag

all_tokens = pd.concat([unique_tokens_df, main_tokens_df])

all_tokens.drop('count',axis=1,inplace=True)
all_tokens['flag'] = all_tokens.flag.astype(int) #converting flags to int
tokens_dct = all_tokens.to_dict('split') #converting tokens_df to dictionary
tokens_dct=dict(tokens_dct['data']) #honestly can't remember why this works, something to do with conversion to dictionary

#preparing token_ids which will be used for joining left and right dfs
all_tokens.sort_values(by='flag',ascending=False,inplace=True)
all_tokens.drop_duplicates(subset='token',keep='first',inplace=True)
token_ids = all_tokens.index.get_level_values(0)
all_tokens['token_id'] = token_ids

all_tokens.drop('flag',axis=1,inplace=True)
all_tokens['token_id'] = all_tokens.token_id.astype(int)
token_id_dct = all_tokens.to_dict('split')
tokens_id_dct=dict(token_id_dct['data'])

vocabulary = np.array([w for w, c in tokens_dct.items() if c ==1]) #this works even without the ==1 and I don't know why
cv = CountVectorizer( vocabulary=vocabulary)

frame_list = []
for colname in left_tokenized_columns:
    tokenmapping = cv.fit_transform(left_df_chunk[colname])
    df_row, token_id = tokenmapping.nonzero()

    frame_list.append(pd.DataFrame(np.vstack([vocabulary[token_id], left_df_chunk['id'].values[df_row]]).T, columns = ['token', 'id']))

left_keyed = pd.concat(frame_list)

#append token_id to token as this will be more efficient to join with
left_token_ids = []
for token in left_keyed.token:
    left_token_ids.append(tokens_id_dct[token])

left_keyed['token_id'] = left_token_ids
left_keyed.sort_values(by='token_id',inplace=True)
left_keyed.set_index('token_id',inplace=True)
left_keyed.drop('token',axis=1,inplace=True)

aggregations = {
    'id_l': 'count'
}

right_keyed = left_keyed.copy()
joined = left_keyed.join(right_keyed, how='inner',lsuffix='_l',rsuffix='_r')
keys_grouped = joined.groupby(by=['id_l', 'id_r']).agg(aggregations)
keys_grouped.rename(columns={'id_l':'id_l count'}, inplace=True)
matched_records = keys_grouped[keys_grouped['id_l count'] >= token_match_min]
matched_records.reset_index(inplace=True)
duplicate_candidates = matched_records[matched_records['id_l'] <> matched_records['id_r']]

#add chunk duplicate candidates to df list
dfs_list.append(duplicate_candidates)

print("chunk loaded --- %s seconds ---" % (time.time() - start_time))
    
dup_dfs = pd.concat(dfs_list)
print("All chunks loaded --- %s seconds ---" % (time.time() - chunking_time))
print ""

start_time = time.time()
print "REMOVING REDUNDANT DUPLICATE ID PAIRS..."

match_tuples = list(zip(dup_dfs['id_l'], dup_dfs['id_r']))

sorted_match_tuples = []
for tup in match_tuples:
    s = tuple(sorted(tup))
    sorted_match_tuples.append(s)

dup_dfs.drop(['id_l','id_r'],axis=1,inplace=True)
dup_dfs['id_tuples'] = sorted_match_tuples

new_col_list = ['id_l','id_r']
for n,col in enumerate(new_col_list):
    dup_dfs[col] = dup_dfs['id_tuples'].apply(lambda location: location[n])
    
dup_dfs.drop('id_tuples',axis=1,inplace=True)
dup_dfs.drop_duplicates(inplace=True)

print("redundant id pairs removed --- %s seconds ---" % (time.time() - start_time))
print ""

start_time = time.time()
print "APPENDING ORIGINAL DATA TO MATCH CANDIDATES..."

left_df = pd.read_csv('org_dup_df.csv')
right_df = left_df.copy()
left_df.rename(columns={'id':'id_l','org_name':'l_org_name','address1':'l_address1','city':'l_city','state':'l_state','postal_code':'l_postal_code','web':'l_web'}, inplace=True)
right_df.rename(columns={'id':'id_r','org_name':'r_org_name','address1':'r_address1','city':'r_city','state':'r_state','postal_code':'r_postal_code','web':'r_web'}, inplace=True)

#creating left/right dataframes which contain only the most relevant details for reviewing the match strengths
left_match_data = left_df[['id_l','l_org_name','l_city','l_state','l_postal_code','l_web']].copy()
right_match_data = right_df[['id_r','r_org_name','r_city','r_state','r_postal_code','r_web']].copy()

#making sure keys are str, results in blank df otherwise
left_match_data.id_l = left_match_data.id_l.astype('str')
right_match_data.id_r = right_match_data.id_r.astype('str')
dup_dfs.id_l = dup_dfs.id_l.astype('str')
dup_dfs.id_r = dup_dfs.id_r.astype('str')

#merging matched_records df with original record data for ease of review
l_conc = pd.merge(dup_dfs, left_match_data, on='id_l')
full_conc = pd.merge(l_conc, right_match_data, on='id_r')

print("original data concatenated with matches --- %s seconds ---" % (time.time() - start_time))
print ""

start_time = time.time()
print "SCORING ORG NAME SIMULARITY..." #<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

#scoring match candidates based on edit distance of org names
def jaro_simularity(left_record, right_record):
    if isinstance(left_record, numbers.Integral) is False and isinstance(right_record, numbers.Integral) is False:
        return jaro_winkler(unicode(left_record, 'utf-8') or '', unicode(right_record, 'utf-8') or '')
def fuzz_partial(left_record, right_record):
    if isinstance(left_record, numbers.Integral) is False and isinstance(right_record, numbers.Integral) is False:
        return fuzz.partial_ratio(unicode(left_record, 'utf-8') or '', unicode(right_record, 'utf-8') or '') / float(100)
def fuzz_sort(left_record, right_record):
    if isinstance(left_record, numbers.Integral) is False and isinstance(right_record, numbers.Integral) is False:
        return fuzz.token_sort_ratio(unicode(left_record, 'utf-8') or '', unicode(right_record, 'utf-8') or '') / float(100)
def fuzz_set(left_record, right_record):
    if isinstance(left_record, numbers.Integral) is False and isinstance(right_record, numbers.Integral) is False:
        return fuzz.token_set_ratio(unicode(left_record, 'utf-8') or '', unicode(right_record, 'utf-8') or '') / float(100)

full_conc['l_org_name'] = full_conc['l_org_name'].astype('str')
full_conc['r_org_name'] = full_conc['r_org_name'].astype('str')

jaro_time = time.time()
full_conc['jaro_score'] = full_conc.apply(lambda x: jaro_simularity(x.l_org_name, x.r_org_name), axis=1)
print("jaro scores done --- %s seconds ---" % (time.time() - jaro_time))
partial_time = time.time()
full_conc['fuzz_partial_score'] = full_conc.apply(lambda x: fuzz_partial(x.l_org_name, x.r_org_name), axis=1)
print("fuzz partial scores done --- %s seconds ---" % (time.time() - partial_time))
sort_time = time.time()
full_conc['fuzz_sort_score'] = full_conc.apply(lambda x: fuzz_sort(x.l_org_name, x.r_org_name), axis=1)
print("fuzz sort scores done --- %s seconds ---" % (time.time() - sort_time))
set_time = time.time()
full_conc['fuzz_set_score'] = full_conc.apply(lambda x: fuzz_set(x.l_org_name, x.r_org_name), axis=1)
print("fuzz set scores done --- %s seconds ---" % (time.time() - set_time))
print ""

print("name simularity scored --- %s seconds ---" % (time.time() - start_time))
print ""

start_time = time.time()
print "CHECKING FOR STATE CODE MATCHES..." #<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

def sanitize_state(state):
    if isinstance(state,basestring) is True:
        return ''.join(c for c in (state or '') if c in 'abcdefghijklmnopqrstuvwxyz')
    else:
        return ''
    
def state_match(state_a, state_b):
    sanitized_state_a = str(sanitize_state(state_a))
    sanitized_state_b = str(sanitize_state(state_b))

    # if the value is too short, means it's fubar
    if len(sanitized_state_a) < 2 or len(sanitized_state_b) < 2:
        return 0
    if state_a == state_b:
        return 1
    else:
        return 0    

full_conc['state_match'] = full_conc.apply(lambda x: state_match(x.l_state, x.r_state), axis=1)

print("state codes checked --- %s seconds ---" % (time.time() - start_time))
print ""

start_time = time.time()
print "CHECKING FOR POSTAL CODE MATCHES..." #<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

#scoring match candidates based on matching postal code

def sanitize_postal(postal):
    if isinstance(postal, basestring) is True:
        return ''.join(c for c in (postal or '') if c in '1234567890')
    if isinstance(postal, float) is False:
        return postal

def postal_simularity(postal_a, postal_b):
    sanitized_postal_a = str(sanitize_postal(postal_a))
    sanitized_postal_b = str(sanitize_postal(postal_b))

    # if the number is too short, means it's fubar
    if len(sanitized_postal_a) < 5 or len(sanitized_postal_b) < 5:
        return 0
    if float(max(len(sub) for sub in find_common_subsequences(sanitized_postal_a, sanitized_postal_b))) / 5 >= 1:
        return 1
    else:
        return 0
    
full_conc['zip_match'] = full_conc.apply(lambda x: postal_simularity(x.l_postal_code, x.r_postal_code), axis=1)
    
print("postal codes checked --- %s seconds ---" % (time.time() - start_time))
print ""

#start_time = time.time()
#print "CHECKING FOR PHONE MATCHES..." #<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

#scoring match candidates based on matching postal code

#def sanitize_phone(phone):
#    if isinstance(phone, basestring) is True:
#        return ''.join(c for c in (phone or '') if c in '1234567890')
#    if isinstance(phone, float) is False:
#        return phone

#def postal_simularity(phone_a, phone_b):
#    sanitized_phone_a = str(sanitize_phone(phone_a))
#    sanitized_phone_b = str(sanitize_phone(phone_b))

    # if the number is too short, means it's fubar
#    if len(sanitized_phone_a) < 10 or len(sanitized_phone_b) < 10:
#        return 0
#    if float(max(len(sub) for sub in find_common_subsequences(sanitized_phone_a, sanitized_phone_b))) / 10 >= 1:
#        return 1
#    else:
#        return 0
    
#full_conc['phone_match'] = full_conc.apply(lambda x: phone_simularity(x.l_phone, x.r_phone), axis=1)
    
#print("phones checked --- %s seconds ---" % (time.time() - start_time))
#print ""

#test this.  may need to make more efficient but I think it should work
start_time = time.time()
print "DISTILLING STRONG ORG DUPLICATES..." #<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

#calculate composite match score based on component scores and weights
full_conc['composite_match_score'] = full_conc.jaro_score * name_weight \
+ full_conc.fuzz_partial_score * name_weight \
+ full_conc.fuzz_sort_score * name_weight \
+ full_conc.fuzz_set_score * name_weight \
+ full_conc.zip_match * zip_weight \
+ full_conc.state_match * state_weight \
#+ full_conc.phone_match * phone_weight

org_duplicates = full_conc[full_conc.composite_match_score >= composite_score_min]

print("final duplicates isolated --- %s seconds ---" % (time.time() - start_time))
print ""

#full_conc[full_conc.composite_match_score < 3].sort_values(by='composite_match_score', ascending=False)
org_duplicates.sort_values(by='composite_match_score', ascending=False)

(7891, 16)