In [1]:
import pandas as pd
import numpy as np
import logging
import time

import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded

from fuzzywuzzy import fuzz
from fuzzywuzzy import process

np.random.seed(0)

from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client

import dask
lazy_results = []

#### Load 1000 randomly generated names and surnames

In [2]:
csv1 = pd.read_csv(r"./data/MOCK_DATA.csv")

In [3]:
csv1.shape

(28, 3)

In [4]:
csv1.head(5)

Unnamed: 0,id,full name,email
0,1,Mirelle Spire Kirtley,mkirtley0@geocities.jp
1,2,Detelina R. Labed,dlenz1@mapquest.com
2,3,Bryanty Wolford,bwolford2@ucoz.ru
3,4,Elijah von Hagt,evon3@who.int
4,5,Mathe Sivier,msivier4@google.ru


#### Load first names

In [5]:
csv2_first_names = pd.read_csv(r"./data/first_names.txt", header=None, names=['first_name'])

In [6]:
csv2_first_names.shape

(19948, 1)

In [7]:
csv2_first_names.head(5)

Unnamed: 0,first_name
0,Añaterve
1,Añes
2,Aadil
3,Aali
4,Aaliyah


#### Load surnames

In [8]:
csv2_surnames = pd.read_csv(r"./data/surnames.txt", error_bad_lines=False, header=None, names=['surname'])

In [9]:
csv2_surnames.shape

(88025, 1)

In [10]:
csv2_surnames.head(5)

Unnamed: 0,surname
0,Ñeco
1,Ñiguez
2,Açaola
3,Añaños
4,Añale


#### Let's mix first names and surnames randomly

In [11]:
sample = csv2_first_names['first_name'].iloc[np.random.choice(csv2_first_names.shape[0])]
sample

'Billi'

In [12]:
%%time
csv2 = pd.DataFrame()
csv2["full_name"] = csv2_surnames['surname'].apply(lambda x : '{} {}'.format(csv2_first_names['first_name'].iloc[np.random.choice(csv2_first_names.shape[0])], x))

Wall time: 6.52 s


In [13]:
csv2.tail(5)

Unnamed: 0,full_name
88020,Ludivina Zwolenksy
88021,Sherley Zydz
88022,Chad Zylinsk
88023,Gian Zylstra
88024,Rashad Zywiyask


Shortcut to load and save the full name, instead of randomly generating them on each run

In [14]:
csv2 = pd.read_csv(r"./data/full_names.txt", header=None, names=['full_name'])

In [None]:
# csv2.to_csv(r'./data/full_names.txt', index=False)

#### Start comparing

In [None]:
compare = pd.MultiIndex.from_product([csv1["full name"], csv2["full_name"]]).to_series()

In [None]:
def distances(vals):
    return pd.Series([fuzz.ratio(*vals), fuzz.token_sort_ratio(*vals)], ['ratio', 'token'])

In [None]:
%%time
results = compare.apply(distances)

In [None]:
results.sort_values('ratio', ascending=False).head(10)

In [None]:
results["Value"] = results["token"]

In [None]:
results.sort_values('token', ascending=False).head(10)

#### Show final results

In [None]:
%%time
output = results.unstack().idxmax(0).unstack(0)
output.head(5)

In [None]:
output = results.unstack()
output.head(5)

#### Distribute it with Dask!

In [15]:
compare = pd.MultiIndex.from_product([csv1["full name"], csv2["full_name"]]).to_series()
dcompare = dd.from_pandas(compare.reset_index(), npartitions=8)

In [16]:
dcompare

Unnamed: 0_level_0,full name,full_name,0
npartitions=8,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,object,object,object
308091,...,...,...
...,...,...,...
2156637,...,...,...
2464727,...,...,...


In [None]:
%%time
results = dcompare.apply(distances, axis=1)

In [None]:
%%time
ddcompare.compute(scheduler='processes')

In [None]:
dcompare

In [None]:
dcompare.shape

In [22]:
def ddistances(vals):
    return pd.Series([fuzz.ratio(*vals[["full name", "full_name"]]), fuzz.token_sort_ratio(*vals[["full name", "full_name"]])], ['ratio', 'token'])

In [23]:
%%time
results = dcompare.apply(ddistances, axis=1)

Wall time: 12 ms


You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta={'ratio': 'int64', 'token': 'int64'})



In [None]:
results.shape

In [24]:
%%time
dcompare.compute(scheduler='processes') 

Wall time: 315 ms


Unnamed: 0,full name,full_name,0
0,Mirelle Spire Kirtley,full_name,"(Mirelle Spire Kirtley, full_name)"
1,Mirelle Spire Kirtley,Karttikeya Ñeco,"(Mirelle Spire Kirtley, Karttikeya Ñeco)"
2,Mirelle Spire Kirtley,Zakari Ñiguez,"(Mirelle Spire Kirtley, Zakari Ñiguez)"
3,Mirelle Spire Kirtley,Nagore Açaola,"(Mirelle Spire Kirtley, Nagore Açaola)"
4,Mirelle Spire Kirtley,Jianmei Añaños,"(Mirelle Spire Kirtley, Jianmei Añaños)"
...,...,...,...
2464723,Helen Maria Tiez,Sherley Zwolenksy,"(Helen Maria Tiez, Sherley Zwolenksy)"
2464724,Helen Maria Tiez,Chad Zydz,"(Helen Maria Tiez, Chad Zydz)"
2464725,Helen Maria Tiez,Gian Zylinsk,"(Helen Maria Tiez, Gian Zylinsk)"
2464726,Helen Maria Tiez,Rashad Zylstra,"(Helen Maria Tiez, Rashad Zylstra)"


In [None]:
results.head()

In [None]:
def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    slave_df['ratio'] = slave_df.apply(lambda x: fuzzy_score(x,orig_string))
    return slave_df
    #return my_value corresponding to the highest score
#     return slave_df.loc[slave_df.ratio.idxmax(),'ratio']

dcsv1 = dd.from_pandas(csv1["full name"], npartitions=8)
# results = csv1
# results["ratio"] = dcsv1.apply(lambda x: helper(x, csv2["full_name"]),meta=('x','f8'))
results = dcsv1.apply(lambda x: helper(x, csv2["full_name"]),meta=('x','f8'))

In [None]:
%%time
dcsv1.compute(scheduler='processes') 

In [None]:
csv1.head()