In [1]:
from openclean.pipeline import stream

# Location of the parking-violations file (gzip-compressed TSV, judging by
# the file extension), relative to this notebook.
datafile = '../../examples/notebooks/parking-violations/jt7v-77mi.tsv.gz'

# Create an openclean stream pipeline over the data file.
df = stream(datafile)

# Preview the first values of the 'Street' column.
df.select('Street').head()

Unnamed: 0,Street
0,E 5 ST
1,MARINE AVENUE
2,56 ST
3,56 ST
4,ANDREWS AVE
5,BROADWAY
6,34 AVE
7,THOMAS BOYLAND ST
8,7 AVE
9,FT HAMILTON PKWY


In [2]:
# Collect the distinct values of the 'Street' column into a list so that the
# timing cells below can iterate over them repeatedly.
streets = list(df.select('Street').distinct())
street_count = len(streets)
print('{} distinct street names'.format(street_count))

115567 distinct street names


In [3]:
from openclean.function.token.key import Fingerprint
import time

In [4]:
# Baseline: average runtime for computing the fingerprint of all street
# values, without generating any clusters.

f = Fingerprint()

num_runs = 10  # number of timed repetitions that are averaged
total_time = 0
for i in range(num_runs):
    start = time.perf_counter()
    # The loop variable is deliberately not called `str`: at notebook top
    # level that would rebind the builtin `str` for every later cell.
    for street in streets:
        f(street)
    end = time.perf_counter()
    total_time += (end - start)

print(total_time / num_runs)

1.0051497947999906


In [5]:
# Use a standard list to collect key-value pairs and then sort the
# list for cluster generation.

total_time = 0
for i in range(10):
    start = time.perf_counter()
    values = [(f(val), val) for val in streets]
    values.sort(key=lambda v: v[0])
    # Single pass over the sorted pairs: equal keys are adjacent after the
    # sort, so each run of equal keys forms one candidate cluster. Iterating
    # directly (instead of indexing with a second `i`) keeps the outer
    # repetition counter intact and also works when `streets` is empty.
    result = list()
    cluster = []
    pk = None
    for k, v in values:
        if cluster and k == pk:
            cluster.append(v)
        else:
            # Key changed: keep the finished run only if it has 2+ values.
            if len(cluster) > 1:
                result.append(cluster)
            cluster = [v]
            pk = k
    # Flush the final run (nothing to flush when there were no values).
    if len(cluster) > 1:
        result.append(cluster)
    end = time.perf_counter()
    total_time += (end - start)

print(total_time/10)

1.169703000499976


In [6]:
# Use a sorted list to collect key-value pairs for cluster generation.

from sortedcontainers import SortedList

total_time = 0
for i in range(10):
    start = time.perf_counter()
    # Pairs are kept ordered by their fingerprint key as they are inserted.
    values = SortedList(key=lambda v: v[0])
    for val in streets:
        values.add((f(val), val))
    # Walk the pairs in sorted order. Iterating the SortedList avoids the
    # per-lookup cost of positional indexing (`values[i]`), does not reuse
    # the outer repetition counter `i`, and also works for an empty list.
    result = list()
    cluster = []
    pk = None
    for k, v in values:
        if cluster and k == pk:
            cluster.append(v)
        else:
            # Key changed: keep the finished run only if it has 2+ values.
            if len(cluster) > 1:
                result.append(cluster)
            cluster = [v]
            pk = k
    # Flush the final run (nothing to flush when there were no values).
    if len(cluster) > 1:
        result.append(cluster)
    end = time.perf_counter()
    total_time += (end - start)

print(total_time/10)

1.8657956258000012


In [7]:
# Group values by fingerprint key in a default dictionary; no sorting needed.

from collections import defaultdict

total_time = 0

for i in range(10):
    start = time.perf_counter()
    # Map each key to the list of street values that produced it.
    values = defaultdict(list)
    for val in streets:
        key = f(val)
        values[key].append(val)
    # Only groups with at least two members count as clusters.
    result = [members for members in values.values() if len(members) > 1]
    end = time.perf_counter()
    total_time += end - start

print(total_time / 10)

1.169829982400006


In [10]:
# Parallel key generation.

import multiprocessing as mp

f = Fingerprint()


def key_value_pair(value):
    """Return the (key, value) pair for a single input value.

    The key is computed with the module-level fingerprint function `f`.
    The function must use its own argument: reading a global loop variable
    here would make every worker return the same pair.
    """
    return f(value), value


for cpus in [2, 3, 4]:
    # The context manager shuts the worker processes down once the timing
    # runs for this CPU count are done, so pools do not pile up.
    # NOTE(review): worker functions defined in an interactive session may
    # not be usable with every multiprocessing start method - confirm on the
    # target platform.
    with mp.Pool(processes=cpus) as p:
        total_time = 0
        for i in range(10):
            start = time.perf_counter()
            values = p.map(key_value_pair, streets)
            end = time.perf_counter()
            total_time += (end - start)

    print('{} for {} CPUs'.format(total_time/10, cpus))

0.7203427761000512 for 2 CPUs
0.6470907872001135 for 3 CPUs
0.628707971599988 for 4 CPUs
