Commit e50b0bb

Merge 8ee57d9 into be02c5d

fgregg committed Jan 20, 2022
2 parents be02c5d + 8ee57d9

Showing 4 changed files with 68 additions and 98 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pythonpackage.yml
@@ -5,6 +5,7 @@ on: [push, pull_request]
 jobs:
   test:
     timeout-minutes: 10
+    continue-on-error: true
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
6 changes: 3 additions & 3 deletions dedupe/api.py
@@ -94,7 +94,7 @@ class IntegralMatching(Matching):
     """

     def score(self,
-              pairs: RecordPairs) -> numpy.memmap:
+              pairs: RecordPairs) -> Union[numpy.memmap, numpy.ndarray]:
         """
         Scores pairs of records. Returns pairs of tuples of records id and
         associated probabilities that the pair of records are match
@@ -179,7 +179,7 @@ def partition(self,
         clusters = list(clusters)

         try:
-            mmap_file = pair_scores.filename
+            mmap_file = pair_scores.filename  # type: ignore
         except AttributeError:
             pass
         else:
@@ -511,7 +511,7 @@ def join(self,
         links = list(links)

         try:
-            mmap_file = pair_scores.filename
+            mmap_file = pair_scores.filename  # type: ignore
         except AttributeError:
             pass
         else:
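For context on the two `# type: ignore` additions: `score()` can now return either a `numpy.memmap` backed by a temp file or a plain `numpy.ndarray` (the empty-result case), and only the memmap carries a `.filename` attribute, which mypy flags on the union type. A minimal sketch of the EAFP cleanup pattern these hunks sit inside (`release_score_file` is an illustrative name, not from the codebase):

    import os

    def release_score_file(pair_scores) -> None:
        # EAFP: only numpy.memmap has a .filename attribute; the plain
        # ndarray returned for empty results raises AttributeError instead.
        try:
            mmap_file = pair_scores.filename
        except AttributeError:
            pass  # in-memory array: nothing on disk to clean up
        else:
            del pair_scores        # drop the mmap reference before unlinking
            os.remove(mmap_file)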
4 changes: 3 additions & 1 deletion dedupe/backport.py
@@ -8,5 +8,7 @@
     Process = ctx.Process  # type: ignore
     Pool = ctx.Pool
     SimpleQueue = ctx.SimpleQueue
+    Lock = ctx.Lock
+    RLock = ctx.RLock
 else:
-    from multiprocessing import Process, Pool, Queue, SimpleQueue  # type: ignore # noqa
+    from multiprocessing import Process, Pool, Queue, SimpleQueue, Lock, RLock  # type: ignore # noqa
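The new `Lock`/`RLock` exports support the shared byte-offset counter added in core.py below: the lock guarding the counter should come from the same multiprocessing context as the worker processes. A minimal sketch of that pairing, where `ctx` stands in for whatever context backport.py selects (per the comment carried in core.py, an explicit spawn-context lock was needed for Python 3.7 on macOS):

    import multiprocessing

    ctx = multiprocessing.get_context('spawn')  # stand-in for backport's ctx
    RLock = ctx.RLock

    # 'Q' = unsigned 64-bit int; the Value is guarded by the spawn-context
    # RLock rather than a lock from the default context.
    offset = multiprocessing.Value('Q', lock=RLock())

    with offset.get_lock():
        offset.value += 68   # e.g. advance by one record's worth of bytes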
155 changes: 61 additions & 94 deletions dedupe/core.py
@@ -9,6 +9,7 @@
 import functools
 import multiprocessing
 import multiprocessing.dummy
+import queue
 from typing import (Iterator,
                     Tuple,
                     Mapping,
@@ -47,7 +48,6 @@ class BlockingError(Exception):


 _Queue = Union[multiprocessing.dummy.Queue, multiprocessing.Queue]
-_SimpleQueue = Union[multiprocessing.dummy.Queue, multiprocessing.SimpleQueue]
 IndicesIterator = Iterator[Tuple[int, int]]

@@ -120,30 +120,33 @@ def __init__(self,
                  data_model,
                  classifier,
                  records_queue: _Queue,
-                 score_queue: _SimpleQueue):
+                 exception_queue: _Queue,
+                 score_file_path: str,
+                 dtype: numpy.dtype,
+                 offset):
         self.data_model = data_model
         self.classifier = classifier
         self.records_queue = records_queue
-        self.score_queue = score_queue
+        self.exception_queue = exception_queue
+        self.score_file_path = score_file_path
+        self.dtype = dtype
+        self.offset = offset

     def __call__(self) -> None:

-        while True:
+        empty = False
+        while not empty:
             record_pairs: Optional[RecordPairs] = self.records_queue.get()
             if record_pairs is None:
                 break

             try:
-                filtered_pairs: Optional[Tuple] = self.fieldDistance(record_pairs)
-                if filtered_pairs is not None:
-                    self.score_queue.put(filtered_pairs)
+                empty = self.fieldDistance(record_pairs)
             except Exception as e:
-                self.score_queue.put(e)
+                self.exception_queue.put(e)
                 raise

-        self.score_queue.put(None)
-
-    def fieldDistance(self, record_pairs: RecordPairs) -> Optional[Tuple]:
+    def fieldDistance(self, record_pairs: RecordPairs) -> bool:

         record_ids, records = zip(*(zip(*record_pair) for record_pair in record_pairs))

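The reworked `__call__` is a sentinel-terminated consumer loop: each worker drains the shared queue until it receives the producer's `None` sentinel, or until `fieldDistance` reports it was handed an empty batch, and any exception is parked on `exception_queue` before re-raising. A stripped-down sketch of that control flow, with names simplified from the diff:

    def consume(records_queue, exception_queue, score_batch) -> None:
        empty = False
        while not empty:
            batch = records_queue.get()
            if batch is None:           # producer's sentinel: no more work
                break
            try:
                # score_batch returns True only for an empty batch
                empty = score_batch(batch)
            except Exception as e:
                exception_queue.put(e)  # let the parent see the failure
                raise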
@@ -153,79 +156,38 @@ def fieldDistance(self, record_pairs: RecordPairs) -> Optional[Tuple]:
         scores = self.classifier.predict_proba(distances)[:, -1]

         if scores.any():
-            id_type = sniff_id_type(record_ids)
-            ids = numpy.array(record_ids, dtype=id_type)
-
-            dtype = numpy.dtype([('pairs', id_type, 2),
-                                 ('score', 'f4')])
-
-            temp_file, file_path = tempfile.mkstemp()
-            os.close(temp_file)
-
-            scored_pairs: numpy.memmap
-            scored_pairs = numpy.memmap(file_path,
-                                        shape=len(scores),
-                                        dtype=dtype)
-
-            scored_pairs['pairs'] = ids
-            scored_pairs['score'] = scores
-
-            return file_path, dtype
-
-        return None
-
-
-def mergeScores(score_queue: _SimpleQueue,
-                result_queue: _SimpleQueue,
-                stop_signals: int):
-    scored_pairs_file, file_path = tempfile.mkstemp()
-    os.close(scored_pairs_file)
-
-    seen_signals = 0
-    end = 0
-
-    while seen_signals < stop_signals:
-
-        score_chunk = score_queue.get()
-
-        if isinstance(score_chunk, Exception):
-            result_queue.put(score_chunk)
-            raise
-        elif score_chunk is None:
-            seen_signals += 1
-        else:
-            score_file, dtype = score_chunk
-            score_chunk = numpy.memmap(score_file, mode='r', dtype=dtype)
-
-            chunk_size = len(score_chunk)
-
-            fp: numpy.memmap
-            fp = numpy.memmap(file_path, dtype=dtype,
-                              offset=(end * dtype.itemsize),
-                              shape=(chunk_size, ))
-
-            fp[:chunk_size] = score_chunk
-
-            end += chunk_size
-
-            del score_chunk
-            os.remove(score_file)
-
-    if end:
-        result_queue.put((file_path, dtype, end))
-    else:
-        result_queue.put(None)
+            with self.offset.get_lock():
+
+                fp: numpy.memmap
+                fp = numpy.memmap(self.score_file_path,
+                                  mode='w+',
+                                  dtype=self.dtype,
+                                  offset=self.offset.value,
+                                  shape=(len(record_ids), ))
+                fp['pairs'] = record_ids
+                fp['score'] = scores
+
+                fp.flush()
+
+                self.offset.value += len(record_ids) * self.dtype.itemsize
+
+            return False
+
+        return True


 def scoreDuplicates(record_pairs: RecordPairs,
                     data_model,
                     classifier,
-                    num_cores: int = 1):
+                    num_cores: int = 1) -> Union[numpy.memmap, numpy.ndarray]:
     if num_cores < 2:
         from multiprocessing.dummy import Process, Queue
-        SimpleQueue = Queue
     else:
-        from .backport import Process, SimpleQueue, Queue  # type: ignore
+        from .backport import Process, Queue  # type: ignore
+
+    from .backport import RLock

     first, record_pairs = peek(record_pairs)
     if first is None:
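Workers can share one output file because every row of the structured dtype has a fixed width: the next free position is always `offset.value` bytes from the start, and it advances by `len(record_ids) * dtype.itemsize` per batch. A small worked example of that arithmetic (the 8-character id width here is arbitrary, for illustration only):

    import numpy

    dtype = numpy.dtype([('pairs', 'U8', 2),   # two 8-char unicode ids
                         ('score', 'f4')])     # one float32 score

    # Each 'U8' id is 8 * 4 = 32 bytes, a pair is 64, plus 4 for the score.
    assert dtype.itemsize == 68

    # After a 1000-row batch, the shared counter moves 68000 bytes, which
    # is exactly where the next worker's memmap window begins.
    print(1000 * dtype.itemsize)   # 68000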
@@ -234,50 +196,55 @@ def scoreDuplicates(record_pairs: RecordPairs,
                              "the data you trained on?")

     record_pairs_queue: _Queue = Queue(2)
-    score_queue: _SimpleQueue = SimpleQueue()
-    result_queue: _SimpleQueue = SimpleQueue()
+    exception_queue: _Queue = Queue(1)
+    scored_pairs_file, score_file_path = tempfile.mkstemp()
+    os.close(scored_pairs_file)
+
+    # explicitly defining the lock from the "spawn context" seems to
+    # be necessary for python 3.7 on mac os.
+    offset = multiprocessing.Value('Q', lock=RLock())
+
+    with offset.get_lock():
+        offset.value = 0  # type: ignore
+
+    id_type = sniff_id_type(first)
+    dtype = numpy.dtype([('pairs', id_type, 2),
+                         ('score', 'f4')])

     n_map_processes = max(num_cores, 1)
     score_records = ScoreDupes(data_model,
                                classifier,
                                record_pairs_queue,
-                               score_queue)
+                               exception_queue,
+                               score_file_path,
+                               dtype,
+                               offset)
     map_processes = [Process(target=score_records)
                      for _ in range(n_map_processes)]

     for process in map_processes:
         process.start()

-    reduce_process = Process(target=mergeScores,
-                             args=(score_queue,
-                                   result_queue,
-                                   n_map_processes))
-    reduce_process.start()
-
     fillQueue(record_pairs_queue, record_pairs, n_map_processes)

-    result = result_queue.get()
-    if isinstance(result, Exception):
-        raise ChildProcessError
+    for process in map_processes:
+        process.join()
+
+    try:
+        exc = exception_queue.get_nowait()
+    except queue.Empty:
+        pass
+    else:
+        raise ChildProcessError from exc

     scored_pairs: Union[numpy.memmap, numpy.ndarray]

-    if result:
-        scored_pairs_file, dtype, size = result
-
-        scored_pairs = numpy.memmap(scored_pairs_file,
-                                    dtype=dtype,
-                                    shape=(size,))
+    if offset.value:  # type: ignore
+        scored_pairs = numpy.memmap(score_file_path,
+                                    dtype=dtype)
     else:
         dtype = numpy.dtype([('pairs', object, 2),
                              ('score', 'f4', 1)])
         scored_pairs = numpy.array([], dtype=dtype)

-    reduce_process.join()
-
-    for process in map_processes:
-        process.join()
-
     return scored_pairs
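With the reducer process gone, error reporting moves to `exception_queue`: workers enqueue the first failure, the parent joins all workers, then polls the queue without blocking; `queue.Empty` is the normal, no-error path. A minimal parent-side sketch mirroring the diff (`check_workers` is an illustrative name):

    import queue

    def check_workers(exception_queue) -> None:
        try:
            exc = exception_queue.get_nowait()   # non-blocking check
        except queue.Empty:
            pass                                 # no worker raised: all good
        else:
            # chain the child's exception so its traceback stays visible
            raise ChildProcessError from exc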


