Commit: record link sampling
fgregg committed Mar 25, 2022
1 parent f6d9b23 commit 3770697
Showing 2 changed files with 64 additions and 27 deletions.
79 changes: 58 additions & 21 deletions dedupe/labeler.py
@@ -250,7 +250,7 @@ def __init__(self, data_model,
         super().__init__(data_model)
 
         N_SAMPLED_RECORDS = 5000
-        N_SAMPLED_RECORD_PAIRS = 5000
+        N_SAMPLED_RECORD_PAIRS = 10000
 
         index_data = Sample(data, 50000)
         sampled_records = Sample(index_data, N_SAMPLED_RECORDS)
@@ -260,8 +260,8 @@ def __init__(self, data_model,
                                                          sampled_records,
                                                          index_data)
 
-        self.candidates = self._candidates(sampled_records,
-                                           N_SAMPLED_RECORD_PAIRS)
+        self.candidates = self._sample(sampled_records,
+                                       N_SAMPLED_RECORD_PAIRS)
         examples_to_index = self.candidates.copy()
 
         if index_include:
@@ -282,7 +282,7 @@ def _index_predicates(self, candidates):
             for pred in blocker.index_predicates:
                 pred.freeze(records)
 
-    def _candidates(self, data, sample_size):
+    def _sample(self, data, sample_size):
 
         weights = {}
         for predicate, covered in self.block_learner.comparison_cover.items():
@@ -299,11 +299,14 @@
 
         # consider using a reservoir sampling strategy, which would
         # be more memory efficient and probably about as fast
-        normalized_weights = numpy.fromiter(weights.values(), dtype=float)/sum(weights.values())
-        sample_indices = numpy.random.choice(len(weights),
-                                             size=sample_size,
-                                             replace=False,
-                                             p=normalized_weights)
+        normalized_weights = (numpy.fromiter(weights.values(),
+                                             dtype=float)
+                              / sum(weights.values()))
+        rng = numpy.random.default_rng()
+        sample_indices = rng.choice(len(weights),
+                                    size=sample_size,
+                                    replace=False,
+                                    p=normalized_weights)
         keys = list(weights.keys())
         return [(data[keys[i][0]], data[keys[i][1]])
                 for i in sample_indices]
@@ -313,16 +316,18 @@ class RecordLinkBlockLearner(BlockLearner):
 
     def __init__(self,
                  data_model,
-                 candidates,
                  data_1,
                  data_2,
                  index_include):
 
-        super().__init__(data_model, candidates)
+        super().__init__(data_model)
 
-        sampled_records_1 = Sample(data_1, 600)
+        N_SAMPLED_RECORDS = 1000
+        N_SAMPLED_RECORD_PAIRS = 5000
+
+        sampled_records_1 = Sample(data_1, N_SAMPLED_RECORDS)
         index_data = Sample(data_2, 50000)
-        sampled_records_2 = Sample(index_data, 600)
+        sampled_records_2 = Sample(index_data, N_SAMPLED_RECORDS)
 
         preds = self.data_model.predicates(canopies=False)
 
@@ -331,7 +336,11 @@ def __init__(self,
                                                               sampled_records_2,
                                                               index_data)
 
-        examples_to_index = candidates.copy()
+        self.candidates = self._sample(sampled_records_1,
+                                       sampled_records_2,
+                                       N_SAMPLED_RECORD_PAIRS)
+        examples_to_index = self.candidates.copy()
+
         if index_include:
             examples_to_index += index_include
 
@@ -352,6 +361,37 @@ def _index_predicates(self, candidates):
             for pred in blocker.index_predicates:
                 pred.freeze(A, B)
 
+    def _sample(self, data_1, data_2, sample_size):
+
+        weights = {}
+        for predicate, covered in self.block_learner.comparison_cover.items():
+            # each predicate gets to vote for every record pair it covers. the
+            # strength of that vote is in inverse proportion to the number of
+            # records the predicate covers.
+            #
+            # if a predicate only covers a few record pairs, the value of
+            # the vote it puts on those few pairs will be worth more than
+            # a predicate that covers almost all the record pairs
+            if not len(covered):
+                print(predicate)
+            weight = 1 / len(covered)
+            for pair in covered:
+                weights[pair] = weights.get(pair, 0) + weight
+
+        # consider using a reservoir sampling strategy, which would
+        # be more memory efficient and probably about as fast
+        normalized_weights = (numpy.fromiter(weights.values(),
+                                             dtype=float)
+                              / sum(weights.values()))
+        rng = numpy.random.default_rng()
+        sample_indices = rng.choice(len(weights),
+                                    size=sample_size,
+                                    replace=False,
+                                    p=normalized_weights)
+        keys = list(weights.keys())
+        return [(data_1[keys[i][0]], data_2[keys[i][1]])
+                for i in sample_indices]
+
 
 class DisagreementLearner(ActiveLearner):
 
@@ -487,22 +527,19 @@ def __init__(self,
         offset = len(data_1)
         data_2 = core.index(data_2, offset)
 
-        self.candidates = self._sample(data_1,
-                                       data_2,
-                                       blocked_proportion,
-                                       sample_size)
 
-        random_pair = random.choice(self.candidates)
+        random_pair = (random.choice(list(data_1.values())),
+                       random.choice(list(data_2.values())))
         exact_match = (random_pair[0], random_pair[0])
 
         index_include = index_include.copy()
         index_include.append(exact_match)
 
         self.blocker = RecordLinkBlockLearner(data_model,
-                                              self.candidates,
                                               data_1,
                                               data_2,
                                               index_include)
+        self.candidates = self.blocker.candidates
 
         self.classifier = RLRLearner(self.data_model)
         self.classifier.candidates = self.candidates

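To make the new sampling scheme concrete, here is a minimal, runnable sketch of what _sample does, using an invented comparison_cover and four toy records in place of the real blocking data; every name and value below is illustrative, not part of the commit.

    import numpy

    # hypothetical stand-in for self.block_learner.comparison_cover:
    # each predicate maps to the set of record-id pairs it covers
    comparison_cover = {
        'same zip':  {(0, 1), (0, 2), (1, 2), (2, 3)},
        'same name': {(0, 1)},
    }
    data = {0: {'name': 'a'}, 1: {'name': 'b'},
            2: {'name': 'c'}, 3: {'name': 'd'}}

    # each predicate votes for every pair it covers; the vote is worth
    # 1 / (number of pairs covered), so selective predicates count more
    weights = {}
    for predicate, covered in comparison_cover.items():
        weight = 1 / len(covered)
        for pair in covered:
            weights[pair] = weights.get(pair, 0) + weight

    # weighted draw without replacement, as in the new code
    normalized_weights = (numpy.fromiter(weights.values(), dtype=float)
                          / sum(weights.values()))
    rng = numpy.random.default_rng()
    sample_indices = rng.choice(len(weights), size=2,
                                replace=False, p=normalized_weights)
    keys = list(weights.keys())
    print([(data[keys[i][0]], data[keys[i][1]]) for i in sample_indices])

Here the pair (0, 1) carries weight 1/4 + 1/1 = 1.25 out of a total of 2.0, so it lands in the sample far more often than the pairs covered only by the broad predicate; the draw is biased toward pairs that selective predicates single out.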
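The comment in both _sample methods floats reservoir sampling as a more memory-efficient alternative. One standard weighted variant is Efraimidis-Spirakis A-ES; the sketch below shows how such a one-pass draw could look, assuming pair weights arrive pre-aggregated as a stream of (pair, weight) tuples. This is a sketch of a possible follow-up, not something this commit implements.

    import heapq
    import random

    def weighted_reservoir_sample(weighted_pairs, k):
        # Efraimidis-Spirakis A-ES: keep the k items with the largest
        # key u ** (1 / weight), u uniform in (0, 1); one pass, O(k) memory
        heap = []  # min-heap of (key, item)
        for item, weight in weighted_pairs:
            key = random.random() ** (1.0 / weight)
            if len(heap) < k:
                heapq.heappush(heap, (key, item))
            elif key > heap[0][0]:
                heapq.heapreplace(heap, (key, item))
        return [item for _, item in heap]

    stream = [((0, 1), 1.25), ((0, 2), 0.25), ((1, 2), 0.25), ((2, 3), 0.25)]
    print(weighted_reservoir_sample(stream, k=2))

Unlike the dict-plus-numpy approach above, this holds only k candidates in memory at a time, at the cost of one random draw per streamed item.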
12 changes: 6 additions & 6 deletions dedupe/training.py
@@ -14,7 +14,7 @@
 from typing import (Dict, Sequence, Iterable, Tuple, List,
                     Union, FrozenSet, Optional)
 
-from . import blocking, core
+from . import blocking
 from .predicates import Predicate
 
 logger = logging.getLogger(__name__)
@@ -199,8 +199,7 @@ def __init__(self, predicates, sampled_records_1, sampled_records_2, data_2):
 
     def coveredPairs(self, blocker, records_1, records_2):
         cover = {}
-
-        pair_enumerator = core.FullEnumerator(len(records_2))
+        pair_cover = {}
 
         for predicate in blocker.predicates:
             cover[predicate] = collections.defaultdict(lambda: (set(), set()))
@@ -217,12 +216,13 @@ def coveredPairs(self, blocker, records_1, records_2):
 
         for predicate, blocks in cover.items():
             pairs = frozenset(
-                pair_enumerator[pair]
+                pair
                 for A, B in blocks.values()
                 for pair in itertools.product(A, B))
-            cover[predicate] = pairs
+            if pairs:
+                pair_cover[predicate] = pairs
 
-        return cover
+        return pair_cover
 
 
 class BranchBound(object):
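To illustrate the training.py change: coveredPairs now materializes (id_1, id_2) pairs directly from each predicate's blocks and drops predicates that cover no cross-dataset pairs, instead of enumerating pairs through core.FullEnumerator. A toy sketch, with an invented cover structure and ids:

    import itertools

    # hypothetical cover: predicate -> block key -> (ids from data_1, ids from data_2)
    cover = {
        'same zip':  {'60601': ({1, 2}, {101}), '60602': ({3}, set())},
        'same name': {'smith': (set(), {102})},
    }

    pair_cover = {}
    for predicate, blocks in cover.items():
        pairs = frozenset(
            pair
            for A, B in blocks.values()
            for pair in itertools.product(A, B))
        # predicates with no cross-dataset pairs are dropped rather than
        # kept with an empty frozenset, as the old return value did
        if pairs:
            pair_cover[predicate] = pairs

    print(pair_cover)  # {'same zip': frozenset({(1, 101), (2, 101)})}

Returning raw id pairs rather than enumerated indices is what lets the new RecordLinkBlockLearner._sample in labeler.py index data_1 and data_2 directly with keys[i][0] and keys[i][1].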