In [34]:
import os
import time
import logging
import optparse
import locale
import itertools
import io
import csv
import dj_database_url
import psycopg2
import psycopg2.extras
import dedupe
import numpy
import json

from psycopg2.extensions import register_adapter, AsIs

# Register psycopg2's AsIs adapter for each numpy scalar type used below, so
# values of these types can be passed to psycopg2 as query parameters.
for numpy_scalar_type in (numpy.int32, numpy.int64, numpy.float32, numpy.float64):
    register_adapter(numpy_scalar_type, AsIs)

## 0. Installing Extra Packages

In [None]:
!pip install -q dedupe
!pip install -q dj-database-url

## 1. Methods/Classes

In [1]:
class Readable(object):
    """File-like wrapper that serves an iterator of rows as CSV text.

    Only ``read`` is implemented. Each call serialises the next batch of rows
    from the wrapped iterator with ``csv.writer`` and returns that text.
    """

    def __init__(self, iterator):
        """Wrap ``iterator``, an iterable of row sequences to be written as CSV."""
        self.iterator = iterator
        self.output = io.StringIO()
        self.writer = csv.writer(self.output)

    def read(self, size):
        """Return the CSV text for the next ``size`` rows.

        Note that ``size`` counts rows, not characters. An empty string is
        returned once the iterator is exhausted.
        """
        for row in itertools.islice(self.iterator, size):
            self.writer.writerow(row)

        chunk = self.output.getvalue()

        # Empty the buffer so the next call only returns newly written rows.
        self.output.truncate(0)
        self.output.seek(0)

        return chunk

def record_pairs(result_set):
    """Yield ``((id_a, record_a), (id_b, record_b))`` for each row of ``result_set``.

    Each row must be a 4-item sequence
    ``(a_record_id, a_record, b_record_id, b_record)``.

    Every 10000th row (starting with the first) is printed and logged as a
    progress indicator.
    """
    # The notebook never defines a module-level ``logger``; obtain one here so
    # the function works on a fresh kernel instead of raising NameError.
    logger = logging.getLogger(__name__)

    for i, row in enumerate(result_set):
        a_record_id, a_record, b_record_id, b_record = row
        record_a = (a_record_id, a_record)
        record_b = (b_record_id, b_record)

        yield record_a, record_b

        if i % 10000 == 0:
            print(row)
            logger.info(row)

def cluster_ids(clustered_dupes):
    """Flatten clusters into ``(member_id, cluster_id, score)`` rows.

    ``clustered_dupes`` is an iterable of ``(cluster, scores)`` pairs. The
    first member of each cluster is used as the cluster's ID, and each member
    is paired with the score at the same position.
    """
    for members, member_scores in clustered_dupes:
        canonical_id = members[0]
        yield from (
            (member_id, canonical_id, member_score)
            for member_id, member_score in zip(members, member_scores)
        )

## 2. Setup

In [2]:
# Control verbosity: 1 -> INFO, 2 or more -> DEBUG, anything else -> WARNING
verbose = 1

# Start from WARNING so `log_level` is always defined; previously a value of
# 0 (or any negative number) left it unset and setLevel raised NameError.
log_level = logging.WARNING
if verbose == 1:
    log_level = logging.INFO
elif verbose >= 2:
    log_level = logging.DEBUG

logging.getLogger().setLevel(log_level)

# Preexisting settings/training file

settings_file = 'wh_dedupe_settings'
training_file = 'wh_dedupe_training.json'

In [5]:
# set environment variable DATABASE_URL
# template: %env DATABASE_URL=postgres://{user}:{password}@{host}/{db-name}
%env DATABASE_URL=postgres://test:testpassword@localhost/whitehouse-db

# Connect to DB
db_conf = dj_database_url.config()

if not db_conf:
    raise Exception(
        'set DATABASE_URL environment variable with your connection, e.g. '
        'export DATABASE_URL=postgres://user:password@host/mydatabase'
    )
    
read_con = psycopg2.connect(database=db_conf['NAME'],
                            user=db_conf['USER'],
                            password=db_conf['PASSWORD'],
                            host=db_conf['HOST'],
                            cursor_factory=psycopg2.extras.RealDictCursor)
write_con = psycopg2.connect(database=db_conf['NAME'],
                             user=db_conf['USER'],
                             password=db_conf['PASSWORD'],
                             host=db_conf['HOST'])

env: DATABASE_URL=postgres://test:testpassword@localhost/whitehouse-db


## 3. Deduplication

In [6]:
# Columns compared during deduplication. Each is declared as a 'String'
# variable that may have missing values.
field_names = ('firstname', 'lastname', 'uin', 'meeting_loc')

fields = [
    {'field': name, 'variable name': name, 'type': 'String', 'has missing': True}
    for name in field_names
]

deduper = dedupe.Dedupe(fields, num_cores=4)

In [7]:
# Query returning the ID plus the four compared columns for every row of
# visitors_er. Reused for both the training sample and the blocking step.
VISITOR_SELECT = """
SELECT visitor_id, lastname, firstname, uin, meeting_loc 
FROM visitors_er
"""

In [9]:
# Load every row selected by VISITOR_SELECT into memory, keyed by its position
# in the result set. read_con uses RealDictCursor, so each value is a dict-like
# row with the keys visitor_id, lastname, firstname, uin and meeting_loc.
with read_con.cursor('visitor_select') as cur:
    cur.execute(VISITOR_SELECT)
    training_records = dict(enumerate(cur))

# Report how many rows the source table holds
with read_con.cursor('count') as cur:
    cur.execute('SELECT COUNT(*) FROM visitors_er')
    row_count = cur.fetchone()['count']
    print('Num. rows', row_count)

# Seed the training step with previously labeled pairs when a training file exists
if not os.path.exists(training_file):
    deduper.prepare_training(training_records)
else:
    print('reading labeled examples from ', training_file)
    with open(training_file) as tf:
        deduper.prepare_training(training_records, tf)

# The in-memory copy is no longer needed once training has been prepared
del training_records

INFO:dedupe.api:reading training from file


reading labeled examples from  wh_dedupe_training.json


INFO:dedupe.training:Final predicate set:
INFO:dedupe.training:(SimplePredicate: (commonIntegerPredicate, uin), SimplePredicate: (commonSixGram, firstname))
INFO:dedupe.training:Final predicate set:
INFO:dedupe.training:(SimplePredicate: (commonSixGram, lastname), SimplePredicate: (sameSevenCharStartPredicate, firstname))


In [30]:
# Label candidate pairs interactively in the console.
# NOTE(review): the AttributeError recorded below ('active_learner' missing)
# presumably arose because this cell was re-run after the training cell had
# already called deduper.cleanup_training() -- confirm, and run this cell
# before the training cell.
dedupe.console_label(deduper)

AttributeError: 'Dedupe' object has no attribute 'active_learner'

In [11]:
# Persist the labeled pairs so that a later run can pick them up again
with open(training_file, 'w') as training_out:
    deduper.write_training(training_out)

# Fit the model
deduper.train(recall=0.9)

# Store the learned settings (binary file)
with open(settings_file, 'wb') as settings_out:
    deduper.write_settings(settings_out)

# Discard the training data held by the deduper
deduper.cleanup_training()

In [15]:
# (Re)create the blocking map table: one row per (block key, visitor ID)
with write_con, write_con.cursor() as cur:
    for statement in (
        "DROP TABLE IF EXISTS blocking_map",
        "CREATE TABLE blocking_map (block_key text, visitor_id INTEGER)",
    ):
        cur.execute(statement)

In [16]:
# Feed the distinct values of every index field to the fingerprinter
for field in deduper.fingerprinter.index_fields:
    with read_con.cursor('field_values') as cur:
        cur.execute(f"SELECT DISTINCT {field} FROM visitors_er")
        distinct_values = (row[field] for row in cur)
        deduper.fingerprinter.index(distinct_values, field)

In [17]:
# Stream every visitor row through the fingerprinter and bulk-load the
# resulting rows into blocking_map via COPY.
with read_con.cursor('visitor_select') as read_cur:
    read_cur.execute(VISITOR_SELECT)
    keyed_rows = ((row['visitor_id'], row) for row in read_cur)
    block_rows = deduper.fingerprinter(keyed_rows)

    with write_con, write_con.cursor() as write_cur:
        # Readable serves the generator to COPY as CSV text, batch by batch
        write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV',
                              Readable(block_rows),
                              size=10000)

INFO:dedupe.blocking:10000, 0.1721062 seconds
INFO:dedupe.blocking:20000, 0.3325722 seconds
INFO:dedupe.blocking:30000, 0.4966472 seconds
INFO:dedupe.blocking:40000, 0.6591962 seconds
INFO:dedupe.blocking:50000, 0.8156152 seconds
INFO:dedupe.blocking:60000, 0.9859512 seconds
INFO:dedupe.blocking:70000, 1.1532812 seconds
INFO:dedupe.blocking:80000, 1.3116402 seconds
INFO:dedupe.blocking:90000, 1.4630142 seconds
INFO:dedupe.blocking:100000, 1.6187132 seconds
INFO:dedupe.blocking:110000, 1.7705882 seconds
INFO:dedupe.blocking:120000, 1.9475912 seconds
INFO:dedupe.blocking:130000, 2.1053602 seconds
INFO:dedupe.blocking:140000, 2.2556522 seconds
INFO:dedupe.blocking:150000, 2.4097992 seconds
INFO:dedupe.blocking:160000, 2.5613992 seconds
INFO:dedupe.blocking:170000, 2.7214142 seconds
INFO:dedupe.blocking:180000, 2.8908032 seconds
INFO:dedupe.blocking:190000, 3.0571072 seconds
INFO:dedupe.blocking:200000, 3.2144032 seconds
INFO:dedupe.blocking:210000, 3.3689842 seconds
INFO:dedupe.blocking:2

In [18]:
# Index blocking_map to speed up later queries against it
create_index_sql = ("CREATE UNIQUE INDEX ON blocking_map "
                    "(block_key text_pattern_ops, visitor_id)")

with write_con, write_con.cursor() as cur:
    cur.execute(create_index_sql)

In [19]:
# (Re)create entity_map, which stores the deduplication result: one row per
# visitor with its canonical ID and cluster score.
create_entity_map_sql = ("CREATE TABLE entity_map "
                         "(visitor_id INTEGER, canon_id INTEGER, "
                         " cluster_score FLOAT, PRIMARY KEY(visitor_id))")

with write_con, write_con.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS entity_map")
    cur.execute(create_entity_map_sql)

creating entity_map database


In [20]:
# Select every pair of visitors that share at least one block key, score the
# pairs, cluster them, and bulk-load the cluster assignments into entity_map.
# A plain (tuple) cursor is requested here so each row unpacks positionally
# in record_pairs.
with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur:
    read_cur.execute("""
        select 
        a.visitor_id, row_to_json((select d from (select a.firstname,
                                                         a.lastname,
                                                         a.uin,
                                                         a.meeting_loc) d)),
        b.visitor_id, row_to_json((select d from (select b.firstname,
                                                         b.lastname,
                                                         b.uin,
                                                         b.meeting_loc) d))
        from
        (select DISTINCT l.visitor_id as east, r.visitor_id as west
        from blocking_map as l
        INNER JOIN blocking_map as r
        using (block_key)
        -- to get rid of duplicates?
        where l.visitor_id < r.visitor_id) ids
        INNER JOIN visitors_er a on ids.east = a.visitor_id
        INNER JOIN visitors_er b on ids.west = b.visitor_id
                """)

    scored_pairs = deduper.score(record_pairs(read_cur))
    clustered_dupes = deduper.cluster(scored_pairs, threshold=0.5)

    with write_con, write_con.cursor() as write_cur:
        write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV',
                              Readable(cluster_ids(clustered_dupes)),
                              size=10000)

(1, {'firstname': 'CHRISTINE', 'lastname': 'ADAMS', 'uin': 'U51772', 'meeting_loc': 'WH'}, 265902, {'firstname': 'KRISTEN', 'lastname': 'ADAMS', 'uin': 'U70858', 'meeting_loc': 'WH'})
(39007, {'firstname': 'JOHNNIE', 'lastname': 'WAUCHOP', 'uin': 'U47707', 'meeting_loc': 'OEOB'}, 39080, {'firstname': 'JOHNNIE', 'lastname': 'WAUCHOP', 'uin': 'U51828', 'meeting_loc': 'OEOB'})
(49853, {'firstname': 'WILLIAM', 'lastname': 'OKEEFE', 'uin': 'U47906', 'meeting_loc': 'OEOB'}, 49930, {'firstname': 'WILLIAM', 'lastname': 'OKEEFE', 'uin': 'U47898', 'meeting_loc': 'OEOB'})
(83247, {'firstname': 'ANNIE', 'lastname': 'WARD', 'uin': 'U44328', 'meeting_loc': 'WH'}, 226525, {'firstname': 'JANE', 'lastname': 'WARD', 'uin': 'U65994', 'meeting_loc': 'WH'})
(109676, {'firstname': 'JASON', 'lastname': 'LEVITIS', 'uin': 'U57773', 'meeting_loc': 'OEOB'}, 116802, {'firstname': 'JASON', 'lastname': 'LEVITIS', 'uin': 'U57723', 'meeting_loc': 'OEOB'})
(139293, {'firstname': 'JULIA', 'lastname': 'SMOOT', 'uin': 'U

(183041, {'firstname': 'JAY', 'lastname': 'ANGOFF', 'uin': 'U63850', 'meeting_loc': 'OEOB'}, 310664, {'firstname': 'JAY', 'lastname': 'ANGOFF', 'uin': 'U78506', 'meeting_loc': 'OEOB'})
(262783, {'firstname': 'JEANNE', 'lastname': 'LAMBREW', 'uin': 'U65280', 'meeting_loc': 'OEOB'}, 322530, {'firstname': 'JEANNE', 'lastname': 'LAMBREW', 'uin': 'U80431', 'meeting_loc': 'OEOB'})
(302277, {'firstname': 'EMILY', 'lastname': 'LOCKE', 'uin': 'U75774', 'meeting_loc': 'WH'}, 376301, {'firstname': 'EMILY', 'lastname': 'LOCKE', 'uin': 'U89266', 'meeting_loc': 'WH'})
(380747, {'firstname': 'JOSHUA', 'lastname': 'FREELY', 'uin': 'U89931', 'meeting_loc': 'OEOB'}, 409278, {'firstname': 'JOSHUA', 'lastname': 'FREELY', 'uin': 'U90889', 'meeting_loc': 'NEOB'})
(35640, {'firstname': 'ANDREA', 'lastname': 'CERNICH', 'uin': 'U47825', 'meeting_loc': 'WH'}, 306315, {'firstname': 'ANDREA', 'lastname': 'CERNICH', 'uin': 'U73640', 'meeting_loc': 'OEOB'})
(47187, {'firstname': 'CATHERINE', 'lastname': 'LIVINGSTON

## 4. Results

In [41]:
# Summarise entity_map: number of distinct canonical IDs, total number of
# rows, and the ten canonical IDs with the most rows.
with read_con.cursor() as read_cur:
    read_cur.execute("""
    SELECT DISTINCT COUNT(canon_id) 
    FROM (SELECT DISTINCT canon_id FROM entity_map) a
    """)
    distinct_entities = read_cur.fetchone()['count']
    print("Distinct entities:", distinct_entities)

    read_cur.execute("SELECT COUNT(*) FROM entity_map")
    total_references = read_cur.fetchone()['count']
    print("All references:", total_references)

    print()

    # Get top 10 duplicated IDs
    read_cur.execute("""
    SELECT canon_id, COUNT(*) 
    FROM entity_map 
    GROUP by canon_id
    ORDER BY count DESC
    LIMIT 10
    """)
    top_duplicates = read_cur.fetchall()
    print('Top 10 duplicated IDs:\n', json.dumps(top_duplicates, indent=4))

Distinct entities: 62215
All references: 188927

Top 10 duplicated IDs:
 [
    {
        "canon_id": 101338,
        "count": 243
    },
    {
        "canon_id": 46807,
        "count": 199
    },
    {
        "canon_id": 46770,
        "count": 158
    },
    {
        "canon_id": 264888,
        "count": 154
    },
    {
        "canon_id": 50380,
        "count": 150
    },
    {
        "canon_id": 50433,
        "count": 147
    },
    {
        "canon_id": 50456,
        "count": 116
    },
    {
        "canon_id": 46879,
        "count": 111
    },
    {
        "canon_id": 46827,
        "count": 110
    },
    {
        "canon_id": 50400,
        "count": 99
    }
]


In [42]:
# Change 'query_canon_id' to check quality of deduplication

query_canon_id = 101338
query_limit = 10

with read_con.cursor() as cur:
    # The values are bound as query parameters instead of being formatted into
    # the SQL text. ORDER BY makes the rows picked by LIMIT deterministic
    # across runs.
    cur.execute("""
    SELECT *
    FROM visitors_er
    WHERE visitor_id = ANY (SELECT visitor_id FROM entity_map WHERE canon_id = %s)
    ORDER BY visitor_id
    LIMIT %s
    """, (query_canon_id, query_limit))

    print(f'Entries for canon_id {query_canon_id}:\n', json.dumps(cur.fetchall(), indent=4))

Entries for canon_id 101338:
 [
    {
        "visitor_id": 101338,
        "lastname": "FREELY",
        "firstname": "JOSHUA",
        "uin": "U56901",
        "apptmade": "11/5/2010 13:21",
        "apptstart": "734084",
        "apptend": "734084",
        "meeting_loc": "NEOB"
    },
    {
        "visitor_id": 101596,
        "lastname": "FREELY",
        "firstname": "JOSHUA",
        "uin": "U56896",
        "apptmade": "11/5/2010 13:13",
        "apptstart": "734084",
        "apptend": "734084",
        "meeting_loc": "NEOB"
    },
    {
        "visitor_id": 103225,
        "lastname": "FREELY",
        "firstname": "JOSHUA",
        "uin": "U56897",
        "apptmade": "11/5/2010 13:18",
        "apptstart": "734085",
        "apptend": "734085",
        "meeting_loc": "NEOB"
    },
    {
        "visitor_id": 104539,
        "lastname": "FREELY",
        "firstname": "JOSHUA",
        "uin": "U56038",
        "apptmade": "11/2/2010 17:03",
        "apptstart": "734085",
  