# End to End Matching Example

Please run [Record-Linkage-Example.ipynb](Record-Linkage-Example.ipynb) before this one in order to get the trained model at `../trained-models/notebooks/rl/rl-model.ckpt`.

## Boilerplate

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from importlib import reload
import logging
reload(logging)
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.INFO, datefmt='%H:%M:%S')

In [3]:
import sys

sys.path.insert(0, '..')

In [4]:
import entity_embed

In [5]:
import torch
import numpy as np

random_seed = 42
torch.manual_seed(random_seed)
np.random.seed(random_seed)

## Loading Train, Valid and Test Data

In [6]:
import json

def load_pair_set(filepath):
    """Read a JSON file of pairs and return them as a set of tuples.

    The file must contain a JSON array whose items are themselves arrays;
    each item is turned into a tuple so that it is hashable.
    """
    with open(filepath, 'r') as pair_file:
        raw_pairs = json.load(pair_file)
    return {tuple(raw_pair) for raw_pair in raw_pairs}

train_pos_pair_set = load_pair_set('../trained-models/notebooks/rl/rl-train-pos-pairs.json')
valid_pos_pair_set = load_pair_set('../trained-models/notebooks/rl/rl-valid-pos-pairs.json')
test_pos_pair_set = load_pair_set('../trained-models/notebooks/rl/rl-test-pos-pairs.json')

In [7]:
import json

def load_record_dict(filepath):
    """Read a JSON object of records and return it keyed by integer id.

    JSON object keys are always strings, so every key is passed through
    ``int`` before being used as a key of the returned dict.
    """
    with open(filepath, 'r') as record_file:
        raw_records = json.load(record_file)
    records_by_id = {}
    for raw_id, record in raw_records.items():
        records_by_id[int(raw_id)] = record
    return records_by_id

train_record_dict = load_record_dict('../trained-models/notebooks/rl/rl-train-records.json')
valid_record_dict = load_record_dict('../trained-models/notebooks/rl/rl-valid-records.json')
test_record_dict = load_record_dict('../trained-models/notebooks/rl/rl-test-records.json')

## Loading Model

In [8]:
from entity_embed import LinkageEmbed

model = LinkageEmbed.load_from_checkpoint('../trained-models/notebooks/rl/rl-model.ckpt')

## Blocking

In [9]:
%%time

eval_batch_size = 64
ann_k = 100
sim_threshold = 0.3

train_found_pair_set = model.predict_pairs(
    record_dict=train_record_dict,
    batch_size=eval_batch_size,
    ann_k=ann_k,
    sim_threshold=sim_threshold,
    show_progress=True,
)

# batch embedding:   0%|          | 0/15 [00:00<?, ?it/s]

CPU times: user 43.7 s, sys: 1.86 s, total: 45.5 s
Wall time: 8.23 s


In [10]:
%%time

valid_found_pair_set = model.predict_pairs(
    record_dict=valid_record_dict,
    batch_size=eval_batch_size,
    ann_k=ann_k,
    sim_threshold=sim_threshold,
    show_progress=True,
)

# batch embedding:   0%|          | 0/15 [00:00<?, ?it/s]

CPU times: user 40.5 s, sys: 1.48 s, total: 41.9 s
Wall time: 7.61 s


In [11]:
%%time

test_found_pair_set = model.predict_pairs(
    record_dict=test_record_dict,
    batch_size=eval_batch_size,
    ann_k=ann_k,
    sim_threshold=sim_threshold,
    show_progress=True,
)

# batch embedding:   0%|          | 0/43 [00:00<?, ?it/s]

CPU times: user 1min 40s, sys: 4.57 s, total: 1min 44s
Wall time: 18.2 s


In [12]:
from entity_embed.evaluation import pair_entity_ratio

pair_entity_ratio(len(test_found_pair_set), len(test_record_dict))

9.463104325699746

In [13]:
from entity_embed.evaluation import precision_and_recall

precision_and_recall(test_found_pair_set, test_pos_pair_set)

(0.029693081857642224, 0.993573264781491)

## Matching: Compare

Make a dataframe `df` with all records (train, valid, test) to add additional features:

In [14]:
record_dict = {**train_record_dict, **valid_record_dict, **test_record_dict}

In [15]:
import pandas as pd

df = pd.DataFrame.from_dict(record_dict, orient='index')
df = df.drop(columns='id')

In [16]:
# Build one combined text field per row by joining description, manufacturer,
# price and name with " - ". This runs BEFORE the numeric conversion below, so
# `all` embeds the price exactly as it appears in the raw records.
df['all'] = df.agg('{0[description]} - {0[manufacturer]} - {0[price]} - {0[name]}'.format, axis=1)
# Remove spaces from the price text and parse it as a number; anything that
# still cannot be parsed becomes NaN (errors='coerce').
df['price'] = pd.to_numeric(df['price'].str.replace(' ', ''), errors='coerce')
df.head(3)

Unnamed: 0,name,description,manufacturer,price,__source,cluster,all
1728,filemaker pro 8 . 5 5 - user pack,system requirements macintosh macintosh comput...,,1149.0,google,1561,system requirements macintosh macintosh comput...
255,diskeeper 2007 pro premier 5 - lic pack,- marketing information : diskeeper pro premie...,diskeeper corporation,736.66,amazon,1207,- marketing information : diskeeper pro premie...
2889,emc securid appl mnt - ent / std 17mo per u 2k...,,,16.0,google,2231,- - 16 - emc securid appl mnt - ent / std 17...


Rebuild each `record_dict` (train, valid, test) from `df` so that its records carry the additional features:

In [17]:
# Re-derive each split from `df`, selecting rows by the ids the split already
# holds, so every record now reflects the columns of `df` (including `all` and
# the numeric `price`).
# NOTE: the three names are overwritten in place and `df` is deleted afterwards,
# so this cell cannot be re-run on its own without re-running the cells that
# create `df`.
train_record_dict = df.loc[train_record_dict.keys()].to_dict(orient='index')
valid_record_dict = df.loc[valid_record_dict.keys()].to_dict(orient='index')
test_record_dict = df.loc[test_record_dict.keys()].to_dict(orient='index')

# The frame is no longer needed once the dicts are rebuilt.
del df

In [18]:
import textdistance as td
import math

def token_ops(func):
    """Adapt a two-sequence function so that it accepts two strings.

    The returned callable splits both string arguments on whitespace and
    forwards the resulting token lists to ``func``.
    """
    def split_then_apply(x, y):
        tokens_x = x.split()
        tokens_y = y.split()
        return func(tokens_x, tokens_y)
    return split_then_apply

def empty_num(x, y):
    """Return 1 when either value is missing (falsy or NaN), otherwise 0.

    Falsiness is checked for both values before ``math.isnan`` is called on
    either of them, so a ``None`` on either side never reaches ``isnan``.
    """
    if not x or not y:
        return 1
    if math.isnan(x) or math.isnan(y):
        return 1
    return 0

def zero_if_empty(x):
    """Return ``x`` unchanged, or 0 when ``x`` is falsy or NaN."""
    is_missing = not x or math.isnan(x)
    return 0 if is_missing else x

def abs_diff(x, y):
    """Return the absolute difference of two numbers, counting missing ones as 0.

    Each argument goes through ``zero_if_empty`` first, so when exactly one
    side is missing the result is the magnitude of the other side.
    """
    left = zero_if_empty(x)
    right = zero_if_empty(y)
    return abs(left - right)

def abs_diff_log10(x, y, min_val=math.log10(1e-5)):
    """Return log10 of ``abs_diff(x, y)``, floored at ``min_val``.

    A difference that is not strictly positive has no logarithm, so
    ``min_val`` is returned for it directly.
    """
    gap = abs_diff(x, y)
    if not gap > 0:
        return min_val
    return max(math.log10(gap), min_val)

# Maps (field, feature name) -> function taking the left and right field values.
# Insertion order matters: record_sim_func iterates this dict, so it determines
# the order of the generated features. Every text field gets the same four
# string similarities; `price` gets the numeric comparisons.
SIM_FUNC_DICT = {
    **{
        (field, sim_name): sim_func
        for field in ("all", "description", "manufacturer", "name")
        for sim_name, sim_func in (
            # Token-based measures operate on whitespace-split tokens.
            ("jaccard", token_ops(td.jaccard.normalized_similarity)),
            ("overlap", token_ops(td.overlap.normalized_similarity)),
            # Edit-based measures operate on the raw strings.
            ("damerau_levenshtein", td.damerau_levenshtein.normalized_similarity),
            ("jaro_winkler", td.jaro_winkler.normalized_similarity),
        )
    },
    ("price", "empty_num"): empty_num,
    ("price", "abs_diff"): abs_diff,
    ("price", "abs_diff_log10"): abs_diff_log10,
}

def record_sim_func(record_pair):
    """Compute every feature in ``SIM_FUNC_DICT`` for one pair of records.

    ``record_pair`` is a ``(left, right)`` pair of record mappings. The result
    maps ``"<field>_<feature name>"`` to the value of that feature function
    applied to the two records' values for ``field``.
    """
    left, right = record_pair
    return {
        f"{field}_{sim_func_name}": sim_func(left[field], right[field])
        for (field, sim_func_name), sim_func in SIM_FUNC_DICT.items()
    }

In [19]:
%%time

pair = next(iter(test_pos_pair_set))
id_left, id_right = pair
feature_dict = record_sim_func((test_record_dict[id_left], test_record_dict[id_right]))

# display(test_record_dict[id_left], test_record_dict[id_right])
feature_dict

CPU times: user 3.3 ms, sys: 173 µs, total: 3.47 ms
Wall time: 3.28 ms


{'all_jaccard': 0.3416666666666667,
 'all_overlap': 0.8541666666666666,
 'all_damerau_levenshtein': 0.4013888888888889,
 'all_jaro_winkler': 0.8590624064579049,
 'description_jaccard': 0.3366336633663367,
 'description_overlap': 0.9714285714285714,
 'description_damerau_levenshtein': 0.37821482602118006,
 'description_jaro_winkler': 0.8748461524552321,
 'manufacturer_jaccard': 0.0,
 'manufacturer_overlap': 0.0,
 'manufacturer_damerau_levenshtein': 0.0,
 'manufacturer_jaro_winkler': 0.0,
 'name_jaccard': 0.19999999999999996,
 'name_overlap': 0.4,
 'name_damerau_levenshtein': 0.5121951219512195,
 'name_jaro_winkler': 0.6423441734417343,
 'price_empty_num': 1,
 'price_abs_diff': 62920.89,
 'price_abs_diff_log10': 4.798794856979781}

In [20]:
from collections import defaultdict
import multiprocessing
from tqdm.auto import tqdm

def compare_pairs(record_dict, found_pair_set):
    """Compute the pairwise features of every found pair in a process pool.

    Parameters
    ----------
    record_dict : mapping of id -> record
        Must contain both ids of every pair in ``found_pair_set``.
    found_pair_set : iterable of (id_left, id_right)
        Candidate pairs to compare.

    Returns
    -------
    pandas.DataFrame
        One row per pair, indexed by a ``(id_left, id_right)`` MultiIndex, with
        one column per feature returned by ``record_sim_func``.
    """
    # Materialise the pairs once so that the tasks and the index are built from
    # exactly the same sequence.
    pair_list = list(found_pair_set)
    all_feature_dict = defaultdict(list)
    chunksize = 100
    tasks = (
        (record_dict[id_left], record_dict[id_right])
        for (id_left, id_right)
        in pair_list
    )

    with multiprocessing.Pool() as pool:
        # Use the ordered `imap`, not `imap_unordered`: the rows are matched to
        # the index purely by position, so results must come back in task
        # order or features end up attached to the wrong pair.
        for feature_dict in tqdm(
            pool.imap(record_sim_func, tasks, chunksize=chunksize),
            total=len(pair_list)
        ):
            for feature, val in feature_dict.items():
                all_feature_dict[feature].append(val)

        pool.close()
        pool.join()

    return pd.DataFrame(all_feature_dict, index=pd.MultiIndex.from_tuples(pair_list))

In [21]:
%%time

train_feature_df = compare_pairs(train_record_dict, train_found_pair_set)

  0%|          | 0/2608 [00:00<?, ?it/s]

CPU times: user 125 ms, sys: 136 ms, total: 261 ms
Wall time: 934 ms


In [22]:
%%time

valid_feature_df = compare_pairs(valid_record_dict, valid_found_pair_set)

  0%|          | 0/2954 [00:00<?, ?it/s]

CPU times: user 172 ms, sys: 121 ms, total: 293 ms
Wall time: 1.1 s


In [23]:
%%time

test_feature_df = compare_pairs(test_record_dict, test_found_pair_set)

  0%|          | 0/26033 [00:00<?, ?it/s]

CPU times: user 1.59 s, sys: 300 ms, total: 1.89 s
Wall time: 8.01 s


## Matching: Compare - Additional Features

In [24]:
from sklearn.feature_extraction.text import TfidfVectorizer

def get_tfidf_vectorizer(train_record_dict, valid_record_dict, field='all'):
    """Fit a character n-gram TF-IDF vectorizer on the train and valid records.

    The two record dicts are merged (valid entries win on duplicate ids) and
    the ``field`` value of every merged record forms the fitting corpus. The
    vectorizer uses character n-grams of length 2 to 4 and ignores n-grams
    that occur in fewer than 2 documents.
    """
    vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4), min_df=2)
    fit_records = dict(train_record_dict)
    fit_records.update(valid_record_dict)
    corpus = (record[field] for record in fit_records.values())
    vectorizer.fit(corpus)
    return vectorizer

tfidf_vectorizer = get_tfidf_vectorizer(train_record_dict, valid_record_dict)
tfidf_vectorizer

TfidfVectorizer(analyzer='char', min_df=2, ngram_range=(2, 4))

In [25]:
import numpy as np

def add_tfidf_feature(feature_df, record_dict, found_pair_set, field='all', vectorizer=None):
    """Add a ``{field}_tfidf`` column with the TF-IDF dot product of each pair.

    Parameters
    ----------
    feature_df : pandas.DataFrame
        Modified in place. Values are assigned by position, so its rows must be
        in the same order as the pairs yielded by ``found_pair_set``.
    record_dict : mapping of id -> record
        Every record's ``field`` value is vectorized; must contain all pair ids.
    found_pair_set : iterable of (id_left, id_right)
        Pairs to score.
    field : str
        Record field to vectorize; also the prefix of the new column name.
    vectorizer : object with a ``transform`` method, optional
        Fitted vectorizer returning a sparse matrix with one row per document.
        Defaults to the notebook-level ``tfidf_vectorizer``.

    Returns
    -------
    None

    Notes
    -----
    The score is the sum of the element-wise product of the two rows, which is
    the cosine similarity when the vectorizer L2-normalises its rows (the
    ``TfidfVectorizer`` default).
    """
    if vectorizer is None:
        # Backward-compatible default: the fitted vectorizer defined at notebook level.
        vectorizer = tfidf_vectorizer

    # Materialise once: the pairs are traversed twice below, and both passes
    # must see the same order (this also makes one-shot iterators safe).
    pair_list = list(found_pair_set)

    tfidf_matrix = vectorizer.transform(record[field] for record in record_dict.values())

    # Row i of tfidf_matrix belongs to the i-th record of record_dict.
    id_to_idx = {id_: idx for idx, id_ in enumerate(record_dict.keys())}
    left_idx = [id_to_idx[left_id] for left_id, __ in pair_list]
    right_idx = [id_to_idx[right_id] for __, right_id in pair_list]
    pair_products = tfidf_matrix[left_idx].multiply(tfidf_matrix[right_idx]).sum(axis=1)

    # The row-wise sum of a sparse matrix comes back as an (n, 1) np.matrix;
    # flatten it so the column is assigned from a plain 1-D array.
    feature_df[f'{field}_tfidf'] = np.asarray(pair_products).ravel()

In [26]:
%%time

add_tfidf_feature(train_feature_df, train_record_dict, train_found_pair_set)

CPU times: user 399 ms, sys: 3.92 ms, total: 402 ms
Wall time: 402 ms


In [27]:
%%time

add_tfidf_feature(valid_feature_df, valid_record_dict, valid_found_pair_set)

CPU times: user 409 ms, sys: 4.03 ms, total: 413 ms
Wall time: 412 ms


In [28]:
%%time

add_tfidf_feature(test_feature_df, test_record_dict, test_found_pair_set)

CPU times: user 1.43 s, sys: 56 ms, total: 1.48 s
Wall time: 1.48 s


## Matching: Classify

In [29]:
test_feature_df.head(3)

Unnamed: 0,Unnamed: 1,all_jaccard,all_overlap,all_damerau_levenshtein,all_jaro_winkler,description_jaccard,description_overlap,description_damerau_levenshtein,description_jaro_winkler,manufacturer_jaccard,manufacturer_overlap,manufacturer_damerau_levenshtein,manufacturer_jaro_winkler,name_jaccard,name_overlap,name_damerau_levenshtein,name_jaro_winkler,price_empty_num,price_abs_diff,price_abs_diff_log10,all_tfidf
536,4293,0.090909,0.234375,0.291391,0.679216,0.065217,0.191489,0.256881,0.653471,0.0,0.0,0.0,0.0,0.052632,0.111111,0.267606,0.57945,0,10.04,1.001734,0.290869
1186,1931,0.079268,0.206349,0.234247,0.594955,0.033784,0.09434,0.212575,0.59372,0.0,0.0,0.0,0.0,0.181818,0.5,0.4,0.828095,0,899.01,2.953765,0.186789
1273,3805,0.131034,0.339286,0.265116,0.666864,0.103175,0.333333,0.233553,0.636223,0.0,0.0,0.0,0.0,0.0,0.0,0.168675,0.49759,0,19.05,1.279895,0.290889


## Clustering