
Commit

refactored for performance improvements
+ using sqlite3.Row factory
+ replaced simple for loops with list comprehensions
+ using SELECT IN statements to reduce the number of sqlite calls for TFIDF createCanopies
+ using split instead of regex for creating TFIDF tokens (see the sketch after this list)
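For illustration only (none of this code is in the commit), here is a minimal, self-contained sketch of the three patterns the bullets describe: sqlite3.Row as the row factory, one SELECT ... IN query per batch of ids instead of one query per id, and plain split() tokenization. The table and column names are invented for the example.

import sqlite3

con = sqlite3.connect(":memory:")
con.row_factory = sqlite3.Row              # rows are addressable by column name
cur = con.cursor()
cur.execute("CREATE TABLE donors (donor_id INTEGER PRIMARY KEY, name TEXT)")
cur.executemany("INSERT INTO donors VALUES (?, ?)",
                [(1, "Forest Gregg"), (2, "Derek Eder"), (3, "Forest Gregg")])

def select_names(doc_ids):
    # one round trip for the whole batch instead of len(doc_ids) separate queries
    placeholders = ", ".join("?" for _ in doc_ids)
    sql = "SELECT donor_id, name FROM donors WHERE donor_id IN (%s)" % placeholders
    for row in cur.execute(sql, list(doc_ids)):
        yield row["donor_id"], row["name"]

def get_tokens(s):
    # whitespace split() is cheaper than re.findall for simple tokenization
    return s.lower().split()

tokens = dict((doc_id, get_tokens(name)) for doc_id, name in select_names([1, 2, 3]))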
derekeder committed Dec 12, 2012
1 parent 788ba66 commit 4de1a52
Showing 4 changed files with 71 additions and 86 deletions.
23 changes: 10 additions & 13 deletions dedupe/blocking.py
@@ -53,12 +53,11 @@ def __call__(self, instance) :
def invertIndex(self, data_d) :
for record_id, record in data_d :
for _, field in self.tfidf_thresholds :
tokens = tfidf.getTokens(record[field])
tokens = tfidf.getTokens(record[str(field)])
for token in set(tokens) :
self.inverted_index[field][token].add(record_id)
self.corpus_ids.add(record_id)


def createCanopies(self, select_function, field, threshold) :
"""
A function that returns
@@ -74,14 +73,13 @@ def createCanopies(self, select_function, field, threshold) :
while corpus_ids :
center_id = corpus_ids.pop()
blocked_data[center_id] = center_id
center = select_function(center_id)
doc_id, center = list(select_function([center_id]))[0]

seen_set.add(center_id)
print center_id
#print "center_id", center_id
# print doc_id, center
if not center :
continue



# initialize the potential block with center
candidate_set = set([])
@@ -93,8 +91,9 @@ def createCanopies(self, select_function, field, threshold) :

# print candidate_set
candidate_set = candidate_set - seen_set
for doc_id in candidate_set :
candidate_dict = tfidf.tfidfDict(select_function(doc_id), self.df_index)
for doc_id, candidate_field in select_function(candidate_set) :
#print doc_id, candidate_field
candidate_dict = tfidf.tfidfDict(candidate_field, self.df_index)

similarity = tfidf.cosineSimilarity(candidate_dict, center_dict)
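For context, and not part of the diff: with this change createCanopies expects select_function to take an iterable of ids and yield (doc_id, field_value) pairs, one query per batch. A minimal sketch of a conforming selector over an in-memory dict (the record and field names are hypothetical):

def make_selector(records, field):
    # `records` maps record_id -> dict of field values; purely illustrative
    def select_function(doc_ids):
        for doc_id in doc_ids:
            yield doc_id, records[doc_id][field]
    return select_function

# usage mirrors the loop above:
# select_function = make_selector(data_d, 'name')
# doc_id, center = list(select_function([center_id]))[0]
# for doc_id, candidate_field in select_function(candidate_set): ...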

@@ -251,7 +250,7 @@ def trainBlocking(self, disjunctive=True):
max_block_sizes = {}
for pred, blocking in distinct_blocks.iteritems() :
max_block_size = max(len(v) for v in blocking.values())
print max_block_size
#print max_block_size

max_block_sizes[pred] = max_block_size

@@ -265,11 +264,12 @@ def trainBlocking(self, disjunctive=True):
# Only consider predicates that cover at least one duplicate pair
self.predicate_set = found_dupes.keys()


# We want to throw away the predicates that puts together too
# many distinct pairs
[self.predicate_set.remove(predicate)
for predicate
in found_distinct
in set(self.predicate_set).intersection(found_distinct.keys())
if max_block_sizes[predicate] >= self.coverage_threshold]
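Side note, not part of the diff: the comprehension above calls remove() for its side effect and discards the resulting list; rebuilding the list expresses the same filter. This sketch reuses the names from this hunk, so it is contextual rather than standalone:

# equivalent filter, without mutating predicate_set inside a discarded comprehension
self.predicate_set = [predicate for predicate in self.predicate_set
                      if not (predicate in found_distinct
                              and max_block_sizes[predicate] >= self.coverage_threshold)]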

# Expected number of predicates that should cover a duplicate
@@ -295,9 +295,6 @@ def trainBlocking(self, disjunctive=True):
print 'No predicate found!'
raise



#@profile
def predicateCoverage(self, pairs, return_blocks=False):
coverage = defaultdict(list)
blocks = defaultdict(lambda: defaultdict(set))
6 changes: 5 additions & 1 deletion dedupe/dedupe.py
@@ -323,7 +323,11 @@ def _learnBlocking(self, data_d, eta, epsilon):
tfidf_thresholds = [0.2, 0.4, 0.6, 0.8]
full_string_records = {}
for k, v in data_d.iteritems() :
full_string_records[k] = " ".join(v.values())
document = ''
for field in self.data_model['fields'].keys() :
document += v[field]
document += ' '
full_string_records[k] = document

self.df_index = tfidf.documentFrequency(full_string_records)
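As an aside, not in the commit: the field-concatenation loop above can also be written as a single join. A standalone toy illustration with invented field names:

data_d = {1: {'name': 'forest gregg', 'address': '123 main st'},
          2: {'name': 'derek eder', 'address': '456 oak ave'}}
fields = ['name', 'address']

# same result as the loop: one space-joined document string per record
full_string_records = dict((k, ' '.join(v[field] for field in fields))
                           for k, v in data_d.items())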

24 changes: 4 additions & 20 deletions dedupe/tfidf.py
@@ -70,7 +70,7 @@ def invertedIndex(corpus):
return inverted_index

def getTokens(str):
return re.findall(r"[\w'@#]+", str.lower())
return str.lower().split()

def createCanopies(corpus_original, df_index, threshold) :
blocked_data = []
@@ -107,11 +107,8 @@ def createCanopies(corpus_original, df_index, threshold) :
return blocked_data

def cosineSimilarity(doc_dict_1, doc_dict_2) :
dot_product = 0

common_keys = set(doc_dict_1.keys()) & set(doc_dict_2.keys())
for key in common_keys :
dot_product += doc_dict_1[key] * doc_dict_2[key]
dot_product = sum(doc_dict_1[key] * doc_dict_2[key] for key in common_keys)

if dot_product == 0 :
return 0
@@ -123,25 +120,12 @@ def cosineSimilarity(doc_dict_1, doc_dict_2) :
return dot_product / (norm_1 * norm_2)

def calculateNorm(doc_dict) :
norm = 0
for value in doc_dict.values() :
norm += value*value

norm = sum(value*value for value in doc_dict.values())
return math.sqrt(norm)

def tfidfDict(doc, df_index) :
tokens = getTokens(doc)
doc_dict = {}
for token in set(tokens) :
try:
doc_dict[token] = tokens.count(token) * df_index[token]
except KeyError :
print token
print tokens
print df_index.__class__
raise

return doc_dict
return dict((token, tokens.count(token) * df_index[token]) for token in set(tokens))

# testing basic TF-IDF
# corpus = {1: "Forest is cool and stuff", 2: "Derek is cool and maybe other stuff"}
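A standalone sketch, not part of the diff, of how the comprehension-based helpers above fit together, using a toy document-frequency index (the weights are made up):

import math

def get_tokens(s):
    return s.lower().split()

def tfidf_dict(doc, df_index):
    tokens = get_tokens(doc)
    # raw term frequency weighted by the precomputed document-frequency index
    return dict((token, tokens.count(token) * df_index[token])
                for token in set(tokens))

def norm(doc_dict):
    return math.sqrt(sum(v * v for v in doc_dict.values()))

def cosine_similarity(d1, d2):
    common = set(d1) & set(d2)
    dot = sum(d1[k] * d2[k] for k in common)
    return 0 if dot == 0 else dot / (norm(d1) * norm(d2))

# toy weights; in dedupe the index comes from tfidf.documentFrequency
df_index = {'forest': 0.7, 'is': 0.2, 'cool': 0.4, 'and': 0.2, 'stuff': 0.4,
            'derek': 0.7, 'maybe': 0.7, 'other': 0.7}
similarity = cosine_similarity(tfidf_dict('Forest is cool and stuff', df_index),
                               tfidf_dict('Derek is cool and maybe other stuff', df_index))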
104 changes: 52 additions & 52 deletions examples/sqlite_example/sqlite_example.py
@@ -38,7 +38,7 @@ def blocking_factory(cursor, row):

def get_sample(cur, size):
cur.execute("SELECT * FROM donors ORDER BY RANDOM() LIMIT ?", (size,))
return dict(cur.fetchall())
return dict([(row['donor_id'], row) for row in cur])


settings_file = 'sqlite_example_settings.json'
@@ -48,7 +48,7 @@ def get_sample(cur, size):

print 'selecting random sample from donors table...'
con = sqlite3.connect("examples/sqlite_example/illinois_contributions.db")
con.row_factory = dict_factory
con.row_factory = sqlite3.Row
cur = con.cursor()

data_d = {}
@@ -89,7 +89,7 @@ def get_sample(cur, size):
blocker = deduper.blockingFunction(eta=0.001, epsilon=5)

deduper.writeSettings(settings_file)
print 'blocked in ', time.time() - t_block, 'seconds'
print 'blocked in', time.time() - t_block, 'seconds'

print 'deleting existing blocking map'
cur.execute("DROP TABLE IF EXISTS blocking_map")
@@ -101,89 +101,89 @@ def get_sample(cur, size):
cur.execute("CREATE INDEX itx_index ON blocking_map (key, donor_id)")

def createSelector(field, con) :

cur = con.cursor()
def selector(doc_id) :
sql = "SELECT %s FROM donors WHERE donor_id = %s" % (field, doc_id)
#print sql
cur.execute(sql)
field_value = cur.fetchone()
#print field_value

return field_value[1][field]
def selector(doc_ids) :

doc_ids = ', '.join([str(doc_id) for doc_id in doc_ids ])
sql = "SELECT donor_id, %s, address_1, address_2, last_name FROM donors WHERE donor_id IN (%s)" % (field, doc_ids)
#print sql
for row in cur.execute(sql) :
#print row
yield (row['donor_id'], row[str(field)])

return selector
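A side note, not from the commit: the %-formatted IN (...) above is fine for integer ids, but a parameter-bound variant sidesteps quoting; the sketch below keeps the same assumed schema (donors table, donor_id column) and only swaps in placeholders.

def createSelector(field, con):
    cur = con.cursor()

    def selector(doc_ids):
        doc_ids = list(doc_ids)
        placeholders = ', '.join('?' for _ in doc_ids)
        # the column name still has to be interpolated; only the values are bound
        sql = ("SELECT donor_id, %s FROM donors WHERE donor_id IN (%s)"
               % (field, placeholders))
        for row in cur.execute(sql, doc_ids):
            yield row['donor_id'], row[str(field)]

    return selector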


print 'creating inverted index'
blocker.invertIndex(con.execute("SELECT * FROM donors"))
full_data = ((row['donor_id'], row) for row in con.execute("SELECT * FROM donors"))
blocker.invertIndex(full_data)

print 'creating canopies'
blocker.canopies = {}
counter = 1
for threshold, field in blocker.tfidf_thresholds :
print threshold.threshold, field
print str(counter) + "/" + str(len(blocker.tfidf_thresholds))
print (str(counter) + "/" + str(len(blocker.tfidf_thresholds))), threshold.threshold, field
selector = createSelector(field, con)
canopy = blocker.createCanopies(selector, field, threshold)
blocker.canopies[threshold.__name__ + field] = canopy
counter += 1

print 'writing blocking map'
def block_data() :
for donor_id, record in con.execute("SELECT * FROM donors") :
if donor_id % 10000 == 0 :
print donor_id
for key in blocker((donor_id, record)):
yield (str(key), donor_id)
# print 'writing blocking map'
# def block_data() :
# for donor_id, record in con.execute("SELECT * FROM donors LIMIT 1000") :
# if donor_id % 10000 == 0 :
# print donor_id
# for key in blocker((donor_id, record)):
# yield (str(key), donor_id)


con.executemany("INSERT OR IGNORE INTO blocking_map VALUES (?, ?)",
block_data())
# con.executemany("INSERT OR IGNORE INTO blocking_map VALUES (?, ?)",
# block_data())



con.commit()
# con.commit()




print 'writing largest blocks to file'
# print 'writing largest blocks to file'



with open('sqlite_example_block_sizes.txt', 'a') as f:
con.row_factory = None
f.write(time.asctime())
f.write('\n')
for row in con.execute("SELECT key, COUNT(donor_id) AS block_size "
"FROM blocking_map GROUP BY key "
"ORDER BY block_size DESC LIMIT 10") :
# with open('sqlite_example_block_sizes.txt', 'a') as f:
# con.row_factory = None
# f.write(time.asctime())
# f.write('\n')
# for row in con.execute("SELECT key, COUNT(donor_id) AS block_size "
# "FROM blocking_map GROUP BY key "
# "ORDER BY block_size DESC LIMIT 10") :

print row
f.write(str(row))
f.write('\n')
con.row_factory = dict_factory
# print row
# f.write(str(row))
# f.write('\n')
# con.row_factory = dict_factory


print 'reading blocked data'
con.row_factory = blocking_factory
cur = con.cursor()
cur.execute('select * from donors join '
'(select key, donor_id from blocking_map '
'join (select key, count(donor_id) num_candidates from blocking_map '
'group by key having num_candidates > 1) '
'as bucket using (key)) as candidates using (donor_id)')
blocked_data = defaultdict(list)
for k, v in cur :
blocked_data[k].append(v)
# print 'reading blocked data'
# con.row_factory = blocking_factory
# cur = con.cursor()
# cur.execute('select * from donors join '
# '(select key, donor_id from blocking_map '
# 'join (select key, count(donor_id) num_candidates from blocking_map '
# 'group by key having num_candidates > 1) '
# 'as bucket using (key)) as candidates using (donor_id)')
# blocked_data = defaultdict(list)
# for k, v in cur :
# blocked_data[k].append(v)

print 'clustering...'
clustered_dupes = deduper.duplicateClusters(blocked_data)
# print 'clustering...'
# clustered_dupes = deduper.duplicateClusters(blocked_data)

print '# duplicate sets'
print len(clustered_dupes)
# print '# duplicate sets'
# print len(clustered_dupes)

cur.close()
con.close()
print 'ran in ', time.time() - t0, 'seconds'
print 'ran in', time.time() - t0, 'seconds'
