major performance improvement in creating TFIDF canopies
+ storing idf and occurrences in memory (inverted_index dict)
+ storing raw tokens for the corpus in token_vector
+ removing select_function from createCanopies (this info is now read from memory)
+ ignoring high-occurrence stop words in TFIDF canopy creation - resolves #59
+ creating canopies for 100,000 records in ~300 seconds
derekeder committed Dec 17, 2012
1 parent 24471a0 commit f5c5145
Showing 3 changed files with 117 additions and 70 deletions.
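
For orientation, here is a minimal, self-contained sketch (plain Python, not the dedupe API itself; build_index and its arguments are hypothetical) of the precomputation the new invertIndex performs for one field: count tokens per record, build an inverted index that carries a smoothed IDF and the records each token occurs in, blank out the candidate lists of stop words that appear in more than 10% of documents, and precompute each record's TF-IDF norm so canopy creation never has to re-read records from the database.

import math
from collections import defaultdict

def build_index(docs, stop_word_fraction=0.1):
    """docs maps record_id -> list of tokens for one field."""
    inverted_index = defaultdict(lambda: {'idf': 0, 'occurrences': {}})
    token_vector = {}

    # First pass: per-token document frequency and per-record term counts.
    for record_id, tokens in docs.items():
        counts = {token: tokens.count(token) for token in set(tokens)}
        for token, count in counts.items():
            inverted_index[token]['idf'] += 1                # document frequency, for now
            inverted_index[token]['occurrences'][record_id] = count
        token_vector[record_id] = counts

    num_docs = len(docs)
    stop_word_threshold = num_docs * stop_word_fraction

    for entry in inverted_index.values():
        # Stop words (too common) and singleton tokens never seed a useful block,
        # so their candidate lists are emptied before canopy creation.
        if entry['idf'] > stop_word_threshold or len(entry['occurrences']) < 2:
            entry['occurrences'] = {}
        # Convert the raw document frequency into a smoothed IDF.
        entry['idf'] = math.log((num_docs + 0.5) / (entry['idf'] + 0.5))

    # Second pass: precompute each record's TF-IDF norm so cosine similarity
    # later only needs a dot product over shared tokens.
    for record_id, counts in token_vector.items():
        norm = math.sqrt(sum((count * inverted_index[token]['idf']) ** 2
                             for token, count in counts.items()))
        token_vector[record_id] = (counts, norm)

    return inverted_index, token_vector
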
81 changes: 62 additions & 19 deletions dedupe/blocking.py
@@ -7,6 +7,7 @@
import tfidf
from random import sample, random, choice, shuffle
import types
import math

class Blocker:
def __init__(self, predicates, df_index):
@@ -16,8 +17,9 @@ def __init__(self, predicates, df_index):
self.tfidf_thresholds = set([])
self.mixed_predicates = []
self.tfidf_fields = set([])
self.inverted_index = defaultdict(lambda: defaultdict(set))
self.inverted_index = defaultdict(lambda: defaultdict(lambda: {'idf':0, 'occurrences' : {}}))
self.shim_tfidf_thresholds = []
self.token_vector = defaultdict(dict)

self.corpus_ids = set([])

@@ -54,11 +56,40 @@ def invertIndex(self, data_d) :
for record_id, record in data_d :
for _, field in self.tfidf_thresholds :
tokens = tfidf.getTokens(record[str(field)])
for token in set(tokens) :
self.inverted_index[field][token].add(record_id)
self.corpus_ids.add(record_id)

def createCanopies(self, select_function, field, threshold) :
tokens = dict((token, tokens.count(token)) for token in set(tokens))
for token, token_count in tokens.iteritems():
self.inverted_index[field][token]['idf'] += 1
self.inverted_index[field][token]['occurrences'].update({record_id: token_count})

self.corpus_ids.add(record_id) # candidate for removal
self.token_vector[field][record_id] = tokens

# ignore stop words in TFIDF canopy creation
num_docs = len(self.token_vector[field])

stop_word_threshold = num_docs * 0.1
for field in self.inverted_index:
for token in self.inverted_index[field]:
if (self.inverted_index[field][token]['idf'] > stop_word_threshold
or len(self.inverted_index[field][token]['occurrences']) < 2):
self.inverted_index[field][token]['occurrences'] = []

# calculating inverted document frequency for each token in each field

for field in self.inverted_index:
for token in self.inverted_index[field]:
self.inverted_index[field][token]['idf'] = math.log((num_docs + 0.5) / (float(self.inverted_index[field][token]['idf']) + 0.5))

for field in self.token_vector:
for record_id, tokens in self.token_vector[field].iteritems():
norm = 0.0
for token, count in tokens.iteritems():
token_idf = self.inverted_index[field][token]['idf']
norm += (count * token_idf) * (count * token_idf)
norm = math.sqrt(norm)
self.token_vector[field][record_id] = (tokens, norm)

def createCanopies(self, field, threshold) :
"""
A function that returns
a field value of a record with a particular doc_id, doc_id
@@ -69,36 +100,48 @@ def createCanopies(self, select_function, field, threshold) :
seen_set = set([])
corpus_ids = self.corpus_ids.copy()


token_vectors = self.token_vector[field]
while corpus_ids :
center_id = corpus_ids.pop()
blocked_data[center_id] = center_id
doc_id, center = list(select_function([center_id]))[0]
blocked_data[center_id] = [center_id]

doc_id = center_id
center_vector, center_norm = token_vectors[center_id]

seen_set.add(center_id)

if not center_norm :
continue
#print "center_id", center_id
# print doc_id, center
if not center :
continue
# if not center :
# continue

# initialize the potential block with center
candidate_set = set([])
tokens = tfidf.getTokens(center)
center_dict = tfidf.tfidfDict(center, self.df_index)

for token in tokens :
candidate_set.update(self.inverted_index[field][token])
for token in center_vector :
candidate_set.update(self.inverted_index[field][token]['occurrences'])

# print candidate_set
candidate_set = candidate_set - seen_set
for doc_id, candidate_field in select_function(candidate_set) :
for doc_id in candidate_set :
#print doc_id, candidate_field
candidate_dict = tfidf.tfidfDict(candidate_field, self.df_index)
candidate_vector, candidate_norm = token_vectors[doc_id]
if not candidate_norm :
continue

common_tokens = set(center_vector.keys()).intersection(candidate_vector.keys())

dot_product = 0
for token in common_tokens :
token_idf = self.inverted_index[field][token]['idf']
dot_product += (center_vector[token] * token_idf) * (candidate_vector[token] * token_idf)

similarity = tfidf.cosineSimilarity(candidate_dict, center_dict)
similarity = dot_product / (center_norm * candidate_norm)

if similarity > threshold.threshold :
blocked_data[doc_id] = center_id
blocked_data[center_id].append(doc_id)
seen_set.add(doc_id)
corpus_ids.remove(doc_id)

4 changes: 2 additions & 2 deletions dedupe/tfidf.py
@@ -51,8 +51,8 @@ def documentFrequency(corpus) :
term_num_docs[term] = 0
stop_words.append(term)

print 'stop words'
print stop_words
if stop_words:
print 'stop words:', stop_words

term_num_docs_default = collections.defaultdict(lambda: math.log((num_docs + 0.5)/0.5)) # term : num_docs_containing_term
term_num_docs_default.update(term_num_docs)
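Both files rely on the same smoothed inverse document frequency, idf(t) = log((num_docs + 0.5) / (df(t) + 0.5)), with df(t) = 0 as the defaultdict fallback for terms never seen in the corpus. A quick illustration with made-up numbers:

import math

num_docs = 1000
for df in (0, 1, 10, 100):          # document frequency of a term
    idf = math.log((num_docs + 0.5) / (df + 0.5))
    print(df, round(idf, 3))
# 0   7.601   <- the default for unseen terms
# 1   6.503
# 10  4.557
# 100 2.298
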
102 changes: 53 additions & 49 deletions examples/sqlite_example/sqlite_example.py
@@ -119,63 +119,67 @@ def selector(doc_ids) :
full_data = ((row['donor_id'], row) for row in con.execute("SELECT * FROM donors LIMIT 1000"))
blocker.invertIndex(full_data)


# print 'token vector', blocker.token_vector
# print 'inverted index', blocker.inverted_index

print 'creating canopies'
blocker.canopies = {}
counter = 1
for threshold, field in blocker.tfidf_thresholds :
print (str(counter) + "/" + str(len(blocker.tfidf_thresholds))), threshold.threshold, field
selector = createSelector(field, con)
canopy = blocker.createCanopies(selector, field, threshold)
# selector = createSelector(field, con)
canopy = blocker.createCanopies(field, threshold)
blocker.canopies[threshold.__name__ + field] = canopy
counter += 1

print 'writing blocking map'
def block_data() :
full_data = ((row['donor_id'], row) for row in con.execute("SELECT * FROM donors LIMIT 1000"))
for donor_id, record in full_data :
if donor_id % 10000 == 0 :
print donor_id
for key in blocker((donor_id, record)):
yield (str(key), donor_id)


con.executemany("INSERT OR IGNORE INTO blocking_map VALUES (?, ?)",
block_data())

con.commit()

print 'writing largest blocks to file'

with open('sqlite_example_block_sizes.txt', 'a') as f:
con.row_factory = None
f.write(time.asctime())
f.write('\n')
for row in con.execute("SELECT key, COUNT(donor_id) AS block_size "
"FROM blocking_map GROUP BY key "
"ORDER BY block_size DESC LIMIT 10") :

print row
f.write(str(row))
f.write('\n')
con.row_factory = dict_factory

print 'reading blocked data'
con.row_factory = blocking_factory
cur = con.cursor()
cur.execute('select * from donors join '
'(select key, donor_id from blocking_map '
'join (select key, count(donor_id) num_candidates from blocking_map '
'group by key having num_candidates > 1) '
'as bucket using (key)) as candidates using (donor_id)')
blocked_data = defaultdict(list)
for k, v in cur :
blocked_data[k].append(v)

print 'clustering...'
clustered_dupes = deduper.duplicateClusters(blocked_data)

print '# duplicate sets'
print len(clustered_dupes)
# print 'writing blocking map'
# def block_data() :
# full_data = ((row['donor_id'], row) for row in con.execute("SELECT * FROM donors LIMIT 1000"))
# for donor_id, record in full_data :
# if donor_id % 10000 == 0 :
# print donor_id
# for key in blocker((donor_id, record)):
# yield (str(key), donor_id)


# con.executemany("INSERT OR IGNORE INTO blocking_map VALUES (?, ?)",
# block_data())

# con.commit()

# print 'writing largest blocks to file'

# with open('sqlite_example_block_sizes.txt', 'a') as f:
# con.row_factory = None
# f.write(time.asctime())
# f.write('\n')
# for row in con.execute("SELECT key, COUNT(donor_id) AS block_size "
# "FROM blocking_map GROUP BY key "
# "ORDER BY block_size DESC LIMIT 10") :

# print row
# f.write(str(row))
# f.write('\n')
# con.row_factory = dict_factory

# print 'reading blocked data'
# con.row_factory = blocking_factory
# cur = con.cursor()
# cur.execute('select * from donors join '
# '(select key, donor_id from blocking_map '
# 'join (select key, count(donor_id) num_candidates from blocking_map '
# 'group by key having num_candidates > 1) '
# 'as bucket using (key)) as candidates using (donor_id)')
# blocked_data = defaultdict(list)
# for k, v in cur :
# blocked_data[k].append(v)

# print 'clustering...'
# clustered_dupes = deduper.duplicateClusters(blocked_data)

# print '# duplicate sets'
# print len(clustered_dupes)

cur.close()
con.close()
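
Condensed from the example-script diff above, the caller-side change amounts to dropping the selector:

# before: each canopy pass re-read field values through a database selector
# selector = createSelector(field, con)
# canopy = blocker.createCanopies(selector, field, threshold)

# after: the inverted index and token vectors built by invertIndex are enough
blocker.invertIndex(full_data)
for threshold, field in blocker.tfidf_thresholds:
    canopy = blocker.createCanopies(field, threshold)
    blocker.canopies[threshold.__name__ + field] = canopy
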
