From f1f25c88dee53fee6072f835790eb34825d800d2 Mon Sep 17 00:00:00 2001
From: Derek Eder
Date: Mon, 17 Dec 2012 12:28:07 -0600
Subject: [PATCH] major performance improvement in creating TFIDF canopies

+ storing idf and occurrences in memory (inverted_index dict)
+ storing raw tokens for corpus in token_vector
+ removed select_function from createCanopies (reading this info from memory now)
+ ignoring high occurrence stop words in TFIDF canopy creation - resolves #59
+ creating canopies for 100,000 records in ~300 seconds
---
 dedupe/blocking.py                        |  81 +++++++++++++----
 dedupe/tfidf.py                           |   4 +-
 examples/sqlite_example/sqlite_example.py | 102 +++++++++++-----------
 3 files changed, 117 insertions(+), 70 deletions(-)

diff --git a/dedupe/blocking.py b/dedupe/blocking.py
index a71781c23..9153c6418 100644
--- a/dedupe/blocking.py
+++ b/dedupe/blocking.py
@@ -7,6 +7,7 @@
 import tfidf
 from random import sample, random, choice, shuffle
 import types
+import math
 
 class Blocker:
     def __init__(self, predicates, df_index):
@@ -16,8 +17,9 @@ def __init__(self, predicates, df_index):
         self.tfidf_thresholds = set([])
         self.mixed_predicates = []
         self.tfidf_fields = set([])
-        self.inverted_index = defaultdict(lambda: defaultdict(set))
+        self.inverted_index = defaultdict(lambda: defaultdict(lambda: {'idf':0, 'occurrences' : {}}))
         self.shim_tfidf_thresholds = []
+        self.token_vector = defaultdict(dict)
 
         self.corpus_ids = set([])
 
@@ -54,11 +56,40 @@ def invertIndex(self, data_d) :
         for record_id, record in data_d :
             for _, field in self.tfidf_thresholds :
                 tokens = tfidf.getTokens(record[str(field)])
-                for token in set(tokens) :
-                    self.inverted_index[field][token].add(record_id)
-                self.corpus_ids.add(record_id)
-
-    def createCanopies(self, select_function, field, threshold) :
+                tokens = dict((token, tokens.count(token)) for token in set(tokens))
+                for token, token_count in tokens.iteritems():
+                    self.inverted_index[field][token]['idf'] += 1
+                    self.inverted_index[field][token]['occurrences'].update({record_id: token_count})
+
+                self.corpus_ids.add(record_id) # candidate for removal
+                self.token_vector[field][record_id] = tokens
+
+        # ignore stop words in TFIDF canopy creation
+        num_docs = len(self.token_vector[field])
+
+        stop_word_threshold = num_docs * 0.1
+        for field in self.inverted_index:
+            for token in self.inverted_index[field]:
+                if (self.inverted_index[field][token]['idf'] > stop_word_threshold
+                    or len(self.inverted_index[field][token]['occurrences']) < 2):
+                    self.inverted_index[field][token]['occurrences'] = []
+
+        # calculating inverted document frequency for each token in each field
+
+        for field in self.inverted_index:
+            for token in self.inverted_index[field]:
+                self.inverted_index[field][token]['idf'] = math.log((num_docs + 0.5) / (float(self.inverted_index[field][token]['idf']) + 0.5))
+
+        for field in self.token_vector:
+            for record_id, tokens in self.token_vector[field].iteritems():
+                norm = 0.0
+                for token, count in tokens.iteritems():
+                    token_idf = self.inverted_index[field][token]['idf']
+                    norm += (count * token_idf) * (count * token_idf)
+                norm = math.sqrt(norm)
+                self.token_vector[field][record_id] = (tokens, norm)
+
+    def createCanopies(self, field, threshold) :
         """
         A function that returns a field value of a record with a
         particular doc_id, doc_id
@@ -69,36 +100,48 @@ def createCanopies(self, select_function, field, threshold) :
 
         seen_set = set([])
         corpus_ids = self.corpus_ids.copy()
-
+        token_vectors = self.token_vector[field]
         while corpus_ids :
             center_id = corpus_ids.pop()
-            blocked_data[center_id] = center_id
-            doc_id, center = list(select_function([center_id]))[0]
+            blocked_data[center_id] = [center_id]
+
+            doc_id = center_id
+            center_vector, center_norm = token_vectors[center_id]
             seen_set.add(center_id)
+
+            if not center_norm :
+                continue
 
             #print "center_id", center_id
             # print doc_id, center
-            if not center :
-                continue
+            # if not center :
+            #     continue
 
             # initialize the potential block with center
             candidate_set = set([])
-            tokens = tfidf.getTokens(center)
-            center_dict = tfidf.tfidfDict(center, self.df_index)
 
-            for token in tokens :
-                candidate_set.update(self.inverted_index[field][token])
+            for token in center_vector :
+                candidate_set.update(self.inverted_index[field][token]['occurrences'])
 
             # print candidate_set
             candidate_set = candidate_set - seen_set
-            for doc_id, candidate_field in select_function(candidate_set) :
+            for doc_id in candidate_set :
                 #print doc_id, candidate_field
-                candidate_dict = tfidf.tfidfDict(candidate_field, self.df_index)
+                candidate_vector, candidate_norm = token_vectors[doc_id]
+                if not candidate_norm :
+                    continue
+
+                common_tokens = set(center_vector.keys()).intersection(candidate_vector.keys())
+
+                dot_product = 0
+                for token in common_tokens :
+                    token_idf = self.inverted_index[field][token]['idf']
+                    dot_product += (center_vector[token] * token_idf) * (candidate_vector[token] * token_idf)
 
-                similarity = tfidf.cosineSimilarity(candidate_dict, center_dict)
+                similarity = dot_product / (center_norm * candidate_norm)
 
                 if similarity > threshold.threshold :
-                    blocked_data[doc_id] = center_id
+                    blocked_data[center_id].append(doc_id)
                     seen_set.add(doc_id)
                     corpus_ids.remove(doc_id)
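Note: the blocking.py hunks above build the TF-IDF index once, up front, instead of calling tfidf.tfidfDict for every candidate pair. Below is a minimal standalone sketch of that indexing step, assuming a single field and plain dicts instead of the Blocker attributes; build_index, docs and stop_word_fraction are hypothetical names introduced for illustration and are not part of this patch.

    import math
    from collections import defaultdict

    def build_index(docs, stop_word_fraction=0.1):
        """docs maps record_id -> list of tokens for one field (hypothetical input shape).
        Returns (inverted_index, token_vector) shaped like the patch:
        inverted_index[token] = {'idf': float, 'occurrences': {record_id: count}}
        token_vector[record_id] = (token_counts, tf-idf norm)."""
        inverted_index = defaultdict(lambda: {'idf': 0, 'occurrences': {}})
        token_vector = {}

        for record_id, tokens in docs.items():
            counts = dict((t, tokens.count(t)) for t in set(tokens))
            for token, count in counts.items():
                inverted_index[token]['idf'] += 1            # document frequency, for now
                inverted_index[token]['occurrences'][record_id] = count
            token_vector[record_id] = counts

        num_docs = len(token_vector)
        stop_word_threshold = num_docs * stop_word_fraction
        for token, entry in inverted_index.items():
            # drop postings for tokens that are too common (stop words)
            # or too rare to ever pair two records
            if entry['idf'] > stop_word_threshold or len(entry['occurrences']) < 2:
                entry['occurrences'] = {}
            # smoothed inverse document frequency, same formula as the patch
            entry['idf'] = math.log((num_docs + 0.5) / (entry['idf'] + 0.5))

        # precompute each record's tf-idf norm so cosine similarity later
        # only needs a dot product over shared tokens
        for record_id, counts in token_vector.items():
            norm = math.sqrt(sum((c * inverted_index[t]['idf']) ** 2
                                 for t, c in counts.items()))
            token_vector[record_id] = (counts, norm)

        return inverted_index, token_vector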
diff --git a/dedupe/tfidf.py b/dedupe/tfidf.py
index d4e510f26..e8b8467ac 100644
--- a/dedupe/tfidf.py
+++ b/dedupe/tfidf.py
@@ -51,8 +51,8 @@ def documentFrequency(corpus) :
             term_num_docs[term] = 0
             stop_words.append(term)
 
-    print 'stop words'
-    print stop_words
+    if stop_words:
+        print 'stop words:', stop_words
 
     term_num_docs_default = collections.defaultdict(lambda: math.log((num_docs + 0.5)/0.5)) # term : num_docs_containing_term
     term_num_docs_default.update(term_num_docs)
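Note: the createCanopies hunk further above consumes that index directly: candidates come from the 'occurrences' postings of the center's tokens, and cosine similarity reduces to a dot product over shared tokens divided by the two precomputed norms. A simplified standalone version of that loop is sketched below, assuming the output of the previous sketch; create_canopy is a hypothetical helper, and threshold here is a plain float rather than the predicate object the patch uses.

    def create_canopy(inverted_index, token_vector, threshold):
        """Greedy TF-IDF canopy grouping over one field."""
        blocked_data = {}
        seen = set()
        remaining = set(token_vector)

        while remaining:
            center_id = remaining.pop()
            blocked_data[center_id] = [center_id]
            center_counts, center_norm = token_vector[center_id]
            seen.add(center_id)
            if not center_norm:
                continue

            # candidates share at least one surviving (non-stop-word) token with the center
            candidates = set()
            for token in center_counts:
                candidates.update(inverted_index[token]['occurrences'])
            candidates -= seen

            for doc_id in candidates:
                counts, norm = token_vector[doc_id]
                if not norm:
                    continue
                # dot product of the two tf-idf vectors over their common tokens
                dot = sum((center_counts[t] * inverted_index[t]['idf']) *
                          (counts[t] * inverted_index[t]['idf'])
                          for t in set(center_counts) & set(counts))
                if dot / (center_norm * norm) > threshold:
                    blocked_data[center_id].append(doc_id)
                    seen.add(doc_id)
                    remaining.remove(doc_id)

        return blocked_data

With the index from the previous sketch, create_canopy(inverted_index, token_vector, 0.8) returns a dict of center_id to member ids, the same shape blocked_data now has in the patched createCanopies.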
diff --git a/examples/sqlite_example/sqlite_example.py b/examples/sqlite_example/sqlite_example.py
index 58d2a1493..636416fbe 100644
--- a/examples/sqlite_example/sqlite_example.py
+++ b/examples/sqlite_example/sqlite_example.py
@@ -119,63 +119,67 @@ def selector(doc_ids) :
 full_data = ((row['donor_id'], row) for row in con.execute("SELECT * FROM donors LIMIT 1000"))
 
 blocker.invertIndex(full_data)
+
+# print 'token vector', blocker.token_vector
+# print 'inverted index', blocker.inverted_index
+
 print 'creating canopies'
 blocker.canopies = {}
 counter = 1
 for threshold, field in blocker.tfidf_thresholds :
     print (str(counter) + "/" + str(len(blocker.tfidf_thresholds))), threshold.threshold, field
-    selector = createSelector(field, con)
-    canopy = blocker.createCanopies(selector, field, threshold)
+    # selector = createSelector(field, con)
+    canopy = blocker.createCanopies(field, threshold)
     blocker.canopies[threshold.__name__ + field] = canopy
     counter += 1
 
-print 'writing blocking map'
-def block_data() :
-    full_data = ((row['donor_id'], row) for row in con.execute("SELECT * FROM donors LIMIT 1000"))
-    for donor_id, record in full_data :
-        if donor_id % 10000 == 0 :
-            print donor_id
-        for key in blocker((donor_id, record)):
-            yield (str(key), donor_id)
-
-
-con.executemany("INSERT OR IGNORE INTO blocking_map VALUES (?, ?)",
-                block_data())
-
-con.commit()
-
-print 'writing largest blocks to file'
-
-with open('sqlite_example_block_sizes.txt', 'a') as f:
-    con.row_factory = None
-    f.write(time.asctime())
-    f.write('\n')
-    for row in con.execute("SELECT key, COUNT(donor_id) AS block_size "
-                           "FROM blocking_map GROUP BY key "
-                           "ORDER BY block_size DESC LIMIT 10") :
-
-        print row
-        f.write(str(row))
-        f.write('\n')
-    con.row_factory = dict_factory
-
-print 'reading blocked data'
-con.row_factory = blocking_factory
-cur = con.cursor()
-cur.execute('select * from donors join '
-            '(select key, donor_id from blocking_map '
-            'join (select key, count(donor_id) num_candidates from blocking_map '
-            'group by key having num_candidates > 1) '
-            'as bucket using (key)) as candidates using (donor_id)')
-blocked_data = defaultdict(list)
-for k, v in cur :
-    blocked_data[k].append(v)
-
-print 'clustering...'
-clustered_dupes = deduper.duplicateClusters(blocked_data)
-
-print '# duplicate sets'
-print len(clustered_dupes)
+# print 'writing blocking map'
+# def block_data() :
+#     full_data = ((row['donor_id'], row) for row in con.execute("SELECT * FROM donors LIMIT 1000"))
+#     for donor_id, record in full_data :
+#         if donor_id % 10000 == 0 :
+#             print donor_id
+#         for key in blocker((donor_id, record)):
+#             yield (str(key), donor_id)
+
+
+# con.executemany("INSERT OR IGNORE INTO blocking_map VALUES (?, ?)",
+#                 block_data())
+
+# con.commit()
+
+# print 'writing largest blocks to file'
+
+# with open('sqlite_example_block_sizes.txt', 'a') as f:
+#     con.row_factory = None
+#     f.write(time.asctime())
+#     f.write('\n')
+#     for row in con.execute("SELECT key, COUNT(donor_id) AS block_size "
+#                            "FROM blocking_map GROUP BY key "
+#                            "ORDER BY block_size DESC LIMIT 10") :
+
+#         print row
+#         f.write(str(row))
+#         f.write('\n')
+#     con.row_factory = dict_factory
+
+# print 'reading blocked data'
+# con.row_factory = blocking_factory
+# cur = con.cursor()
+# cur.execute('select * from donors join '
+#             '(select key, donor_id from blocking_map '
+#             'join (select key, count(donor_id) num_candidates from blocking_map '
+#             'group by key having num_candidates > 1) '
+#             'as bucket using (key)) as candidates using (donor_id)')
+# blocked_data = defaultdict(list)
+# for k, v in cur :
+#     blocked_data[k].append(v)
+
+# print 'clustering...'
+# clustered_dupes = deduper.duplicateClusters(blocked_data)
+
+# print '# duplicate sets'
+# print len(clustered_dupes)
 
 cur.close()
 con.close()
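Note: with select_function removed, the example driver no longer needs a database-backed selector. A condensed sketch of the calling pattern that the still-active part of sqlite_example.py now uses (con and blocker are set up earlier in that script, as in the example):

    full_data = ((row['donor_id'], row)
                 for row in con.execute("SELECT * FROM donors LIMIT 1000"))
    blocker.invertIndex(full_data)   # builds inverted_index and token_vector in memory

    blocker.canopies = {}
    for threshold, field in blocker.tfidf_thresholds:
        # no createSelector/select_function step any more
        canopy = blocker.createCanopies(field, threshold)
        blocker.canopies[threshold.__name__ + field] = canopy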