In [None]:
!pip install duckdb

In [None]:
import duckdb
import time
import pandas as pd

In [None]:
# Reload imported modules before every cell execution, so edits to the local
# py_duckdb package are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
from py_duckdb.similarity_join import tokenizers
from py_duckdb.similarity_join import jaccard_join, jaccard_join_brute_force
from py_duckdb.similarity_join import evaluate

In [None]:
def join_fn_exec_time(n, join_fn, *args, **kwargs):
    """Run ``join_fn(*args, **kwargs)`` ``n`` times and time every run.

    The elapsed time of each run is printed as soon as the run completes.

    Args:
        n: Number of times to invoke ``join_fn``.
        join_fn: Callable to benchmark; its return value is discarded.
        *args: Positional arguments forwarded to ``join_fn``.
        **kwargs: Keyword arguments forwarded to ``join_fn``.

    Returns:
        A list with ``n`` elapsed times in seconds, in execution order.
    """
    exec_time = []
    for _ in range(n):
        # perf_counter() is monotonic and high-resolution; time.time() follows
        # the system clock and can jump (NTP, manual changes) mid-measurement.
        start_time = time.perf_counter()
        join_fn(*args, **kwargs)
        elapsed = time.perf_counter() - start_time
        exec_time.append(elapsed)
        print(elapsed, 's')
    return exec_time

In [None]:
import string
import numpy as np

def test_vs_brute_force(
        con: duckdb.DuckDBPyConnection,
        l_table: str,
        r_table: str,
        l_key_attr: str,
        r_key_attr: str,
        l_join_attr: str,
        r_join_attr: str,
        tokenizer: tokenizers.Tokenizer,
        threshold: float,
        out_table_name: str,
        n=1
):
    """Time the filtered Jaccard join against the brute-force join and compare them.

    ``jaccard_join`` and ``jaccard_join_brute_force`` are each run ``n`` times
    with the same arguments; the filtered join is given ``out_table_name`` and
    the brute-force join ``"bf_" + out_table_name`` as output table name. The
    two output tables are then compared on their ``rid1``/``rid2`` columns,
    treating ``(rid1, rid2)`` and ``(rid2, rid1)`` as the same pair, and a
    success or mismatch message is printed.

    Args:
        con: DuckDB connection used for the joins and the comparison query.
        l_table, r_table, l_key_attr, r_key_attr, l_join_attr, r_join_attr,
        tokenizer, threshold: Forwarded unchanged to both join functions.
        out_table_name: Output table name for the filtered join; the
            brute-force output uses the same name prefixed with ``bf_``.
        n: Number of timed repetitions of each join.

    Returns:
        A dict of per-run timings in seconds: ``'exec_time'`` for the filtered
        join and ``'exec_time_bf'`` for the brute-force join.
    """
    print("FILTERED EXECUTIONS")
    exec_times = join_fn_exec_time(
        n, jaccard_join,
        con, l_table, r_table, l_key_attr, r_key_attr, l_join_attr, r_join_attr, tokenizer, threshold, out_table_name
    )
    print("Average execution time:", np.average(exec_times))

    print()
    print("BRUTE FORCE EXECUTIONS")
    exec_times_bf = join_fn_exec_time(
        n, jaccard_join_brute_force,
        con, l_table, r_table, l_key_attr, r_key_attr, l_join_attr, r_join_attr, tokenizer, threshold, "bf_" + out_table_name
    )
    print("Average execution time:", np.average(exec_times_bf))

    print()
    # Rows surviving the WHERE clause are pairs present in only one of the two
    # result tables. Table names are interpolated because SQL identifiers
    # cannot be bound as parameters; out_table_name must be a trusted value.
    cmp_join = con.execute(
        "select * "
        f"from {out_table_name} m "
        f"full outer join bf_{out_table_name} b "
        "on (b.rid1 = m.rid1 and b.rid2 = m.rid2) "
        "or (b.rid1 = m.rid2 and b.rid2 = m.rid1) "
        "where m.rid1 is null "
        "or b.rid1 is null"
    ).fetchall()
    if len(cmp_join) == 0:
        print("SUCCESS! Filtered join and Brute force join returned the same result")
    else:
        print("ERROR! There are mismatches between Filtered and Brute force joins:", cmp_join)

    return {
        'exec_time': exec_times,
        'exec_time_bf': exec_times_bf
    }

In [None]:
def test_confusion_mtx(
        con: duckdb.DuckDBPyConnection,
        l_table: str,
        r_table: str,
        l_key_attr: str,
        r_key_attr: str,
        l_join_attr: str,
        r_join_attr: str,
        tokenizer: tokenizers.Tokenizer,
        threshold: float,
        out_table_name: str,
        ground_truth_table: str,
        n=1
):
    """Time the filtered Jaccard join and evaluate its output against a ground truth.

    ``jaccard_join`` is run ``n`` times writing to ``out_table_name``; the
    average run time is printed, followed by whatever
    ``evaluate(con, ground_truth_table, out_table_name)`` returns.

    Args:
        con: DuckDB connection used for the join and the evaluation.
        l_table, r_table, l_key_attr, r_key_attr, l_join_attr, r_join_attr,
        tokenizer, threshold: Forwarded unchanged to ``jaccard_join``.
        out_table_name: Output table name passed to ``jaccard_join`` and then
            to ``evaluate``.
        ground_truth_table: Name of the table passed to ``evaluate`` as the
            reference result.
        n: Number of timed repetitions of the join.

    Returns:
        The list of per-run execution times in seconds.
    """
    exec_time = join_fn_exec_time(
        n, jaccard_join,
        con, l_table, r_table, l_key_attr, r_key_attr, l_join_attr, r_join_attr, tokenizer, threshold, out_table_name
    )
    print("Average execution time:", np.average(exec_time))
    print(evaluate(con, ground_truth_table, out_table_name))
    print()
    return exec_time

In [None]:
# Open an in-memory DuckDB database (nothing is persisted to disk); this single
# connection is used by every cell below.
con = duckdb.connect(database=':memory:')

# Test case: Actors

In [None]:
# (Re)create src1 from data/S1_clean_.csv: `rid` is the CSV's id column and
# `val` is given name, surname and date of birth joined by single spaces.
# The trailing SELECT returns every row of the new table as the cell output.
con.execute("drop table if exists src1").execute(
    "CREATE TABLE src1 AS "
    "SELECT id as rid, concat(given_name, ' ', surname, ' ', date_of_birth) as val "
    "FROM 'data/S1_clean_.csv'"
).execute("select * from src1").fetchall()

In [None]:
# (Re)create src2 from data/S2_clean_.csv: `rid` is the CSV's id column and
# `val` is given name, surname and date of birth joined by single spaces.
# The trailing SELECT returns every row of the new table as the cell output.
con.execute("drop table if exists src2").execute(
    "CREATE TABLE src2 AS "
    "SELECT id as rid, concat(given_name, ' ', surname, ' ', date_of_birth) as val "
    "FROM 'data/S2_clean_.csv'"
).execute("select * from src2").fetchall()

In [None]:
# (Re)create src3 from data/S3_clean_.csv: `rid` is the CSV's id column and
# `val` is given name, surname and date of birth joined by single spaces.
# The trailing SELECT returns every row of the new table as the cell output.
con.execute("drop table if exists src3").execute(
    "CREATE TABLE src3 AS "
    "SELECT id as rid, concat(given_name, ' ', surname, ' ', date_of_birth) as val "
    "FROM 'data/S3_clean_.csv'"
).execute("select * from src3").fetchall()

In [None]:
# (Re)create the view `srcall` over src1, src2 and src3. Plain UNION (not
# UNION ALL) is used, so rows that are identical across sources appear once.
con.execute("drop view if exists srcall").execute(
    "create view srcall as "
    "select * from src1 "
    "union "
    "select * from src2 "
    "union "
    "select * from src3 "
).execute("select * from srcall").fetchall()

In [None]:
# function args
# Shared argument values for the Actors benchmark calls below.
# NOTE(review): l_table and r_table are not referenced by the calls visible in
# this notebook, which pass the table names as literals.
l_table = 'src1'
r_table = 'src2'

# Both inputs expose the record id as `rid` and the text to compare as `val`.
l_key_attr = 'rid'
r_key_attr = 'rid'
l_join_attr = 'val'
r_join_attr = 'val'
# Alternative tokenizer kept for reference:
# tokenizer = tokenizers.WordsTokzr(r"' '")
tokenizer = tokenizers.QGramsTokzr(3)
threshold = 0.5
out_table_name = 'matches'

In [None]:
# Benchmark on the combined Actors view with an empty r_table
# (presumably a self-join of l_table; confirm against jaccard_join), 10 runs.
test_vs_brute_force(
    con=con,
    l_table='srcall',
    r_table='',
    l_key_attr=l_key_attr,
    r_key_attr=r_key_attr,
    l_join_attr=l_join_attr,
    r_join_attr=r_join_attr,
    tokenizer=tokenizer,
    threshold=threshold,
    out_table_name=out_table_name,
    n=10,
)

In [None]:
# Benchmark joining the first two Actors sources against each other, 10 runs.
test_vs_brute_force(
    con=con,
    l_table='src1',
    r_table='src2',
    l_key_attr=l_key_attr,
    r_key_attr=r_key_attr,
    l_join_attr=l_join_attr,
    r_join_attr=r_join_attr,
    tokenizer=tokenizer,
    threshold=threshold,
    out_table_name=out_table_name,
    n=10,
)

# Test case: NCVR

In [None]:
# Argument list for SQL concat(): the NCVR columns with a single-space string
# literal between each pair.
to_concat = ", ' ', ".join((
    "entity",
    "rec_id",
    "first_name",
    "last_name",
    "sex",
    "age",
    "birth_place",
    "house_num",
    "county_desc",
    "street_name",
    "zip_code",
    "phone_num",
))
to_concat

In [None]:
# (Re)create src1 from data/NCVR_AF_clean.csv: `rid` is the CSV's id column and
# `val` concatenates the columns listed in `to_concat`, separated by spaces.
# The trailing SELECT returns every row of the new table as the cell output.
con.execute("drop table if exists src1").execute(
    "CREATE TABLE src1 AS "
    f"SELECT id as rid, concat({to_concat}) as val "
    "FROM 'data/NCVR_AF_clean.csv'"
).execute("select * from src1").fetchall()

In [None]:
# (Re)create src2 from data/NCVR_BF_clean.csv: `rid` is the CSV's id column and
# `val` concatenates the columns listed in `to_concat`, separated by spaces.
# The trailing SELECT returns every row of the new table as the cell output.
con.execute("drop table if exists src2").execute(
    "CREATE TABLE src2 AS "
    f"SELECT id as rid, concat({to_concat}) as val "
    "FROM 'data/NCVR_BF_clean.csv'"
).execute("select * from src2").fetchall()

In [None]:
# (Re)create src3 from data/NCVR_CF_clean.csv: `rid` is the CSV's id column and
# `val` concatenates the columns listed in `to_concat`, separated by spaces.
# The trailing SELECT returns every row of the new table as the cell output.
con.execute("drop table if exists src3").execute(
    "CREATE TABLE src3 AS "
    f"SELECT id as rid, concat({to_concat}) as val "
    "FROM 'data/NCVR_CF_clean.csv'"
).execute("select * from src3").fetchall()

In [None]:
# (Re)create the view `srcall` over the NCVR versions of src1, src2 and src3.
# Plain UNION (not UNION ALL) is used, so identical rows appear only once.
con.execute("drop view if exists srcall").execute(
    "create view srcall as "
    "select * from src1 "
    "union "
    "select * from src2 "
    "union "
    "select * from src3 "
).execute("select * from srcall").fetchall()

In [None]:
# Shared argument values for the NCVR benchmark calls below. This section
# tokenizes on words instead of the 3-grams used for the Actors test case.
l_key_attr = 'rid'
r_key_attr = 'rid'
l_join_attr = 'val'
r_join_attr = 'val'
tokenizer = tokenizers.WordsTokzr("' '")
# Alternative tokenizer kept for reference:
# tokenizer = tokenizers.QGramsTokzr(3)
threshold = 0.5
out_table_name = 'matches'

In [None]:
# Benchmark on the combined NCVR view with an empty r_table
# (presumably a self-join of l_table; confirm against jaccard_join), 10 runs.
test_vs_brute_force(
    con=con,
    l_table='srcall',
    r_table='',
    l_key_attr=l_key_attr,
    r_key_attr=r_key_attr,
    l_join_attr=l_join_attr,
    r_join_attr=r_join_attr,
    tokenizer=tokenizer,
    threshold=threshold,
    out_table_name=out_table_name,
    n=10,
)

In [None]:
# Benchmark joining the first two NCVR sources against each other, 10 runs.
test_vs_brute_force(
    con=con,
    l_table='src1',
    r_table='src2',
    l_key_attr=l_key_attr,
    r_key_attr=r_key_attr,
    l_join_attr=l_join_attr,
    r_join_attr=r_join_attr,
    tokenizer=tokenizer,
    threshold=threshold,
    out_table_name=out_table_name,
    n=10,
)

In [None]:
# Clean up the NCVR tables and the combined view. src2 and src3 are recreated
# later in the notebook from the larger profiles datasets.
con.execute("drop table if exists src1")
con.execute("drop table if exists src2")
con.execute("drop table if exists src3")
con.execute("drop view if exists srcall")

## Test case: Profiles

In [None]:
# Load data/10Kprofiles.json into a DataFrame; lines=True means the file is
# read as one JSON record per line.
df10 = pd.read_json("data/10Kprofiles.json", lines=True, orient='records', typ='frame')

In [None]:
# Preview the first rows of the loaded profiles.
df10.head()

In [None]:
# Argument list for SQL concat(): the profile columns with a single-space
# string literal between each pair.
to_concat = ", ' ', ".join((
    "date_of_birth",
    "surname",
    "address_1",
    "street_number",
    "postcode",
    "soc_sec_id",
    "suburb",
    "phone_number",
    "state",
    "given_name",
    "age",
    "address_2",
))
to_concat

In [None]:
# (Re)create db10 from the pandas DataFrame `df10`, which the SQL references
# by its Python variable name. `rid` is realProfileID and `val` concatenates
# the columns listed in `to_concat`, separated by spaces.
# The trailing SELECT returns every row of the new table as the cell output.
con.execute("drop table if exists db10").execute(
    "CREATE TABLE db10 AS "
    f"SELECT realProfileID as rid, concat ({to_concat}) as val "
    "FROM df10"
).execute("select * from db10").fetchall()

In [None]:
# Load data/10KIdDuplicates.json (one JSON record per line) and preview it.
# It is turned into the ground-truth pair table db10gt in the next cell.
df10gt = pd.read_json("data/10KIdDuplicates.json", lines=True, orient='records', typ='frame')
df10gt.head()

In [None]:
# (Re)create db10gt from the DataFrame `df10gt`, renaming d1Id/d2Id to
# rid1/rid2; the cell output is the number of pairs loaded.
con.execute("drop table if exists db10gt").execute(
    "CREATE TABLE db10gt AS "
    "SELECT d1Id as rid1, d2Id as rid2 "
    "FROM df10gt "
).execute("select count(*) from db10gt").fetchall()

In [None]:
# Shared column names and output table name for the Profiles experiments;
# tokenizer and threshold are passed explicitly in each call below.
l_key_attr = 'rid'
r_key_attr = 'rid'
l_join_attr = 'val'
r_join_attr = 'val'
out_table_name = 'matches'

In [None]:
# (Re)create db10_sample as a 2000-row sample of db10 for the quicker
# experiments below. REPEATABLE pins the sampling seed so that re-running the
# notebook can draw the same rows; an unseeded sample changes on every run and
# makes the timings and evaluation figures below irreproducible.
# NOTE(review): DuckDB may still vary the sample under multi-threaded
# execution; confirm against the SAMPLE documentation.
con.execute("drop table if exists db10_sample").execute(
    "CREATE TABLE db10_sample AS "
    "SELECT * "
    "FROM db10 "
    "using sample reservoir(2000 rows) repeatable (42)"
).execute("select * from db10_sample").fetchall()

In [None]:
# (Re)create the view db10gt_sample: the ground-truth pairs from db10gt whose
# rid1 and rid2 both occur in db10_sample. The cell output is the pair count.
con.execute("drop view if exists db10gt_sample").execute(
    "create view db10gt_sample as "
    "select gt.* "
    "from db10_sample s1, db10_sample s2, db10gt gt "
    "where s1.rid = gt.rid1 "
    "and s2.rid = gt.rid2"
).execute("select count(*) from db10gt_sample").fetchall()

In [None]:
# Check the filtered join against brute force on the sampled profiles
# (word tokens, threshold 0.5, 5 runs), then evaluate the filtered output
# against the sampled ground truth.
test_vs_brute_force(
    con=con,
    l_table='db10_sample',
    r_table='',
    l_key_attr=l_key_attr,
    r_key_attr=r_key_attr,
    l_join_attr=l_join_attr,
    r_join_attr=r_join_attr,
    tokenizer=tokenizers.WordsTokzr("' '"),
    threshold=0.5,
    out_table_name=out_table_name,
    n=5,
)
evaluate(con, 'db10gt_sample', out_table_name)

In [None]:
# Time and evaluate the join on the sampled profiles using 5-grams
# (threshold 0.5, 5 runs).
test_confusion_mtx(
    con=con,
    l_table='db10_sample',
    r_table='',
    l_key_attr=l_key_attr,
    r_key_attr=r_key_attr,
    l_join_attr=l_join_attr,
    r_join_attr=r_join_attr,
    tokenizer=tokenizers.QGramsTokzr(5),
    threshold=0.5,
    out_table_name=out_table_name,
    ground_truth_table='db10gt_sample',
    n=5,
)

In [None]:
# Threshold sweep on the sampled profiles: time and evaluate the word-token
# join at each threshold, 5 runs per setting.
for t in (0.6, 0.5, 0.4, 0.3, 0.2):
    print("threshold =", t)
    test_confusion_mtx(
        con=con,
        l_table='db10_sample',
        r_table='',
        l_key_attr=l_key_attr,
        r_key_attr=r_key_attr,
        l_join_attr=l_join_attr,
        r_join_attr=r_join_attr,
        tokenizer=tokenizers.WordsTokzr("' '"),  # alternative: tokenizers.QGramsTokzr(5)
        threshold=t,
        out_table_name=out_table_name,
        ground_truth_table="db10gt_sample",
        n=5,
    )

In [None]:
# Threshold sweep on the full db10 table against the full ground truth:
# time and evaluate the word-token join at each threshold, 5 runs per setting.
for t in (0.6, 0.5, 0.4, 0.3, 0.2):
    print("threshold =", t)
    test_confusion_mtx(
        con=con,
        l_table='db10',
        r_table='',
        l_key_attr=l_key_attr,
        r_key_attr=r_key_attr,
        l_join_attr=l_join_attr,
        r_join_attr=r_join_attr,
        tokenizer=tokenizers.WordsTokzr("' '"),  # alternative: tokenizers.QGramsTokzr(5)
        threshold=t,
        out_table_name=out_table_name,
        ground_truth_table="db10gt",
        n=5,
    )

## Test case: larger Profiles datasets

In [None]:
# Load data/50Kprofiles.json (one JSON record per line) into a DataFrame.
df50 = pd.read_json("data/50Kprofiles.json", lines=True, orient='records', typ='frame')

In [None]:
# (Re)create src2 from the DataFrame `df50`. `val` is built from `to_concat`,
# so this relies on the Profiles column list still being the current value.
# The trailing SELECT returns every row of the new table as the cell output.
con.execute("drop table if exists src2").execute(
    "CREATE TABLE src2 AS "
    f"SELECT realProfileID as rid, concat ({to_concat}) as val "
    "FROM df50"
).execute("select * from src2").fetchall()

In [None]:
# Load data/100Kprofiles.json (one JSON record per line) into a DataFrame.
df100 = pd.read_json("data/100Kprofiles.json", lines=True, orient='records', typ='frame')

In [None]:
# (Re)create src3 from the DataFrame `df100`. `val` is built from `to_concat`,
# so this relies on the Profiles column list still being the current value.
# The trailing SELECT returns every row of the new table as the cell output.
con.execute("drop table if exists src3").execute(
    "CREATE TABLE src3 AS "
    f"SELECT realProfileID as rid, concat ({to_concat}) as val "
    "FROM df100"
).execute("select * from src3").fetchall()

In [None]:
# Time a single filtered join on src2 (the 50K profiles) with word tokens and
# threshold 0.5; the cell output is the elapsed time in seconds.
start_time = time.time()
jaccard_join(
    con, 'src2', '', l_key_attr, r_key_attr, l_join_attr, r_join_attr, tokenizers.WordsTokzr("' '"), 0.5, out_table_name
)
time.time() - start_time

In [None]:
# Time a single filtered join on src3 (the 100K profiles) with threshold 0.5;
# the cell output is the elapsed time in seconds.
# The tokenizer is WordsTokzr, matching the 50K run: "' '" is a word separator,
# whereas QGramsTokzr is constructed with an integer q everywhere else in this
# notebook. Use tokenizers.QGramsTokzr(<q>) here if q-grams were intended.
start_time = time.time()
jaccard_join(
    con, 'src3', '', l_key_attr, r_key_attr, l_join_attr, r_join_attr, tokenizers.WordsTokzr("' '"), 0.5, out_table_name
)
time.time() - start_time