# Parking Violations - Street Name Clustering (Performance comparison)

This notebook evaluates run times for clustering large datasets. It compares two options:

- create clusters directly using the stream operator, and
- extract the set of distinct values using the stream operator and then cluster them separately using multiple threads.
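The practical difference between the two options is how often the standardization function runs: once per row in the stream, or once per distinct value. The minimal sketch below makes that concrete in plain Python. `standardize` is a hypothetical stand-in for a street-name normalizer (it is not openclean's `StandardizeUSStreetName`), and the sample rows are made up.

```python
from typing import Dict, List, Tuple


def standardize(name: str) -> str:
    """Hypothetical stand-in for a street-name standardizer:
    upper-case the value and collapse runs of whitespace."""
    return " ".join(name.upper().split())


def standardize_per_row(rows: List[str]) -> Tuple[List[str], int]:
    """Option 1: call the standardizer once for every row in the stream."""
    calls = 0
    result = []
    for value in rows:
        calls += 1
        result.append(standardize(value))
    return result, calls


def standardize_distinct(rows: List[str]) -> Tuple[Dict[str, str], int]:
    """Option 2: extract the distinct values first, then call the
    standardizer once per distinct value."""
    distinct = set(rows)
    mapping = {value: standardize(value) for value in distinct}
    return mapping, len(distinct)


rows = ["Broadway", "BROADWAY", "broadway ", "5th Ave", "Broadway", "5th Ave"]
per_row_result, per_row_calls = standardize_per_row(rows)
mapping, distinct_calls = standardize_distinct(rows)
print(per_row_calls, distinct_calls)  # 6 4
```

Both options produce the same standardized value for every row; the distinct-first option simply avoids repeating work for duplicates, and the saving grows with the number of rows per distinct value.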

In [1]:
# Download the full 'NYC Parking Violations Issued - Fiscal Year 2014' dataset.
# Note that the downloaded full dataset file is about 380 MB in size! Use the
# alternative data file with 10,000 rows that is included in the repository if
# you do not want to download the full data file.

import gzip
import humanfriendly
import os

from openclean.data.source.socrata import Socrata

dataset = Socrata().dataset('jt7v-77mi')
datafile = './jt7v-77mi.tsv.gz'

# Download file only if it does not exist already.
if not os.path.isfile(datafile):
    with gzip.open(datafile, 'wb') as f:
        print('Downloading ...\n')
        dataset.write(f)


# As an alternative, you can also use the smaller dataset sample that is
# included in the repository.
#
#datafile = './data/jt7v-77mi.tsv.gz'

fsize = humanfriendly.format_size(os.stat(datafile).st_size)
print("Using '{}' in file {} of size {}".format(dataset.name, datafile, fsize))

Using 'Parking Violations Issued - Fiscal Year 2014' in file ./jt7v-77mi.tsv.gz of size 379.19 MB
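The cell above opens the output file before the download starts, so an interrupted download can leave a partial `.tsv.gz` file behind that the `os.path.isfile` check will then treat as complete on the next run. A minimal sketch of a safer pattern writes to a temporary file in the same directory and moves it into place only after the write succeeds. `download_once` and its `write` callback are hypothetical names; in this notebook the callback would wrap `dataset.write(f)`.

```python
import gzip
import os
import tempfile
from typing import BinaryIO, Callable


def download_once(path: str, write: Callable[[BinaryIO], None]) -> bool:
    """Create `path` by calling `write(fileobj)` on a gzip stream, unless the
    file already exists. Returns True if the file was (re)created."""
    if os.path.isfile(path):
        return False
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    os.close(fd)
    try:
        with gzip.open(tmp_path, "wb") as f:
            write(f)
        # Only a fully written file ever appears under the final name.
        os.replace(tmp_path, path)
    except BaseException:
        # Remove the partial file so the next run retries the download.
        os.remove(tmp_path)
        raise
    return True


# Demo with fake writers (the real callback would call dataset.write(f)).
with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "sample.tsv.gz")
    first = download_once(target, lambda f: f.write(b"Street\nBROADWAY\n"))
    second = download_once(target, lambda f: f.write(b"never written\n"))
    with gzip.open(target, "rb") as f:
        content = f.read()
print(first, second, content)  # True False b'Street\nBROADWAY\n'
```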


In [2]:
# Create data stream for the downloaded file.

from openclean.pipeline import stream

ds = stream(datafile)
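As we understand it, `stream(datafile)` does not load the file into memory; operators such as `distinct_values` or `cluster` read the rows when they run. A rough plain-Python analogue, assuming the file is a gzip-compressed, tab-separated file with a header row (as the `.tsv.gz` name suggests), is a generator built on `gzip` and `csv`. `stream_rows` is a hypothetical helper, not part of openclean.

```python
import csv
import gzip
import os
import tempfile
from typing import Dict, Iterator


def stream_rows(path: str) -> Iterator[Dict[str, str]]:
    """Lazily yield one dict per data row of a gzip-compressed TSV file.
    The first line is assumed to hold the column names."""
    with gzip.open(path, "rt", newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f, delimiter="\t"):
            yield row


# Demo on a tiny sample file written to a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sample.tsv.gz")
    with gzip.open(path, "wt", newline="", encoding="utf-8") as f:
        f.write("Plate ID\tStreet\nABC123\tBROADWAY\nXYZ789\t5TH AVE\n")
    values = [row["Street"] for row in stream_rows(path)]
print(values)  # ['BROADWAY', '5TH AVE']
```

Because the generator only holds one row at a time, memory use stays flat regardless of file size, but every pass over the data pays the full cost of decompressing and parsing the file again.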

## Stream Clusters vs. Multiple Threads

Compare the run time of two approaches: performing the clustering with the stream operator, or first extracting the list of distinct street names and then standardizing and clustering them in parallel (using up to 4 threads).

In [3]:
import time

from openclean.cluster.key import KeyCollision
from openclean.function.value.key.fingerprint import Fingerprint
from openclean_geo.address.usstreet import StandardizeUSStreetName

In [4]:
# Extract list of distinct street names.

start_parse = time.perf_counter()

streets = ds.distinct_values('Street')

end_parse = time.perf_counter()

print('Parse time {:0.4f} sec. ({} streets)'.format(end_parse - start_parse, len(streets)))

Parse time 57.0615 sec. (115567 streets)
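`distinct_values('Street')` makes one pass over the stream and collects the distinct values of the column; the printed count is the number of distinct street names. A minimal sketch of the same single-pass idea uses `collections.Counter`, which also records how often each value occurs. `count_distinct` is a hypothetical helper and the rows are inline sample data.

```python
from collections import Counter
from typing import Dict, Iterable


def count_distinct(rows: Iterable[Dict[str, str]], column: str) -> Counter:
    """Single pass over the rows, counting each distinct value of `column`."""
    counts: Counter = Counter()
    for row in rows:
        counts[row[column]] += 1
    return counts


rows = [
    {"Street": "BROADWAY"},
    {"Street": "5TH AVE"},
    {"Street": "BROADWAY"},
    {"Street": "Broadway"},
]
counts = count_distinct(rows, "Street")
print(len(counts))            # 3
print(counts.most_common(1))  # [('BROADWAY', 2)]
```

Note that this step only removes exact duplicates ("BROADWAY" and "Broadway" remain separate values); merging such variants is the job of the standardization and clustering steps that follow.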


In [5]:
# Standardize street names in parallel.

f = StandardizeUSStreetName(characters='upper', alphanum=True, repeated=False)

for threads in range(1,5):
    start_std = time.perf_counter()
    streets_std = f.apply(streets, threads=threads)
    count = len(streets_std)
    end_std = time.perf_counter()
    exec_time = end_std - start_std
    print('Standardization time (using {} threads) {:0.4f} sec. ({} streets)'.format(threads, exec_time, count))

Standardization time (using 1 threads) 3.0328 sec. (115567 streets)
Standardization time (using 2 threads) 2.2598 sec. (115567 streets)
Standardization time (using 3 threads) 1.6198 sec. (115567 streets)
Standardization time (using 4 threads) 1.6093 sec. (115567 streets)
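The timings show diminishing returns: going from three to four threads barely changes the run time. The sketch below shows one common way to split a list of distinct values across a fixed number of workers with `concurrent.futures`, assuming each value can be processed independently. It is not openclean's implementation; `standardize` and `parallel_map` are hypothetical. Keep in mind that in CPython a `ThreadPoolExecutor` gives limited speedups for pure-Python, CPU-bound work because of the global interpreter lock; `ProcessPoolExecutor` is the usual alternative.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def standardize(name: str) -> str:
    """Hypothetical stand-in for a street-name standardizer."""
    return " ".join(name.upper().split())


def parallel_map(func: Callable[[str], str], values: Sequence[str], threads: int) -> List[str]:
    """Apply `func` to every value using `threads` workers.
    Results are returned in the same order as the input."""
    if threads <= 1 or not values:
        return [func(v) for v in values]
    # Split the input into at most `threads` contiguous chunks.
    size = -(-len(values) // threads)  # ceiling division
    chunks = [values[i:i + size] for i in range(0, len(values), size)]
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # pool.map preserves chunk order, so flattening keeps input order.
        parts = pool.map(lambda chunk: [func(v) for v in chunk], chunks)
        return [item for part in parts for item in part]


names = ["broadway", " 5th  ave", "Main St"]
for threads in range(1, 5):
    print(threads, parallel_map(standardize, names, threads))
```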


In [6]:
# Cluster standardized street names.

for threads in range(1,5):
    f = KeyCollision(func=Fingerprint(), threads=threads)
    start_clstr = time.perf_counter()
    clusters = f.clusters(streets_std)
    count = len(clusters)
    end_clstr = time.perf_counter()
    exec_time = end_clstr - start_clstr
    print('Cluster time (using {} threads) {:0.4f} sec. ({} clusters)'.format(threads, exec_time, count))

Cluster time (using 1 threads) 1.6068 sec. (761 clusters)
Cluster time (using 2 threads) 1.1699 sec. (761 clusters)
Cluster time (using 3 threads) 1.0394 sec. (761 clusters)
Cluster time (using 4 threads) 1.0042 sec. (761 clusters)
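`KeyCollision` with a `Fingerprint` key function groups values that map to the same key. The sketch below follows the widely used OpenRefine-style fingerprint (trim, lower-case, drop punctuation, then de-duplicate, sort, and re-join the whitespace-separated tokens). openclean's `Fingerprint` may differ in details such as Unicode or ASCII folding, and its `KeyCollision` may use a different minimum cluster size, so treat `fingerprint` and `key_collision_clusters` as hypothetical illustrations.

```python
import string
from collections import defaultdict
from typing import Dict, Iterable, List, Set

# Translation table that deletes all ASCII punctuation characters.
_DROP_PUNCT = str.maketrans("", "", string.punctuation)


def fingerprint(value: str) -> str:
    """Trim, lower-case, remove ASCII punctuation, then sort the unique
    whitespace-separated tokens and join them with single spaces."""
    cleaned = value.strip().lower().translate(_DROP_PUNCT)
    return " ".join(sorted(set(cleaned.split())))


def key_collision_clusters(values: Iterable[str], minsize: int = 2) -> List[Set[str]]:
    """Group values by their fingerprint key and keep the groups that
    contain at least `minsize` distinct values."""
    groups: Dict[str, Set[str]] = defaultdict(set)
    for value in values:
        groups[fingerprint(value)].add(value)
    return [group for group in groups.values() if len(group) >= minsize]


streets = ["W 42 ST", "42 ST W", "w. 42 st", "BROADWAY", "Broadway.", "5 AVE"]
for cluster in key_collision_clusters(streets):
    print(sorted(cluster))
# ['42 ST W', 'W 42 ST', 'w. 42 st']
# ['BROADWAY', 'Broadway.']
```

Key collision needs only one key computation per value plus a dictionary lookup, which is why clustering the distinct values takes about a second here.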


In [7]:
# Run standardization and cluster generation as part of the
# stream processing.

start = time.perf_counter()

streets = ds\
    .select('Street')\
    .update('Street', StandardizeUSStreetName(characters='upper', alphanum=True, repeated=False))

clusters = streets.cluster(clusterer=KeyCollision(func=Fingerprint()))

end = time.perf_counter()

print('Runtime {:0.4f} sec. ({} clusters)'.format(end - start, len(clusters)))

Runtime 310.8050 sec. (761 clusters)
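Adding up the reported timings gives a rough end-to-end comparison. The distinct-first approach pays for one parse pass plus standardization and clustering of the distinct values, roughly 60 seconds in total, while the stream approach, which applies the standardizer to every row, needs about five times as long. The snippet below only re-uses the numbers printed above; they are single measurements, so treat the ratios as indicative.

```python
# Timings (seconds) copied from the cell outputs above.
parse_s = 57.0615
standardize_s = {1: 3.0328, 4: 1.6093}
cluster_s = {1: 1.6068, 4: 1.0042}
stream_total_s = 310.8050

for threads in (1, 4):
    distinct_total_s = parse_s + standardize_s[threads] + cluster_s[threads]
    ratio = stream_total_s / distinct_total_s
    print(f"{threads} thread(s): {distinct_total_s:.1f} s total, "
          f"stream run takes {ratio:.1f}x as long")
```

Most of the distinct-first total is the single parse pass, so extra threads for standardization and clustering save only a few seconds overall.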
