In [1]:
import re
from typing import Any, Callable, Dict, Hashable, Iterable, List, Optional, Set, Tuple

import numpy as np
import pandas as pd

In [2]:
# Record fields that take part in the similarity comparison.
# Deliberately left out: rec_id, postcode, age, soc_sec_id, blocking_number.
column_names = [
    'given_name',
    'surname',
    'street_number',
    'address_1',
    'address_2',
    'suburb',
    'state',
    'date_of_birth',
    'phone_number',
]

In [3]:

# Full words that are removed when they appear as whole words.
_ADDRESS_NOISE_WORDS = (
    "улица", "проспект", "бульвар", "переулок", "набережная", "шоссе", "площадь",
    "дом", "квартира", "корпус", "строение", "область", "город", "поселок", "деревня",
    "street", "avenue", "boulevard", "alley", "drive", "square", "house",
    "apartment", "building", "county", "city", "village", "township", "road",
)

# Abbreviations that are removed only when written with their trailing dot.
_ADDRESS_NOISE_ABBREVIATIONS = (
    "ул", "пр-т", "пр", "б-р", "пер", "наб", "пл", "д", "кв", "корп", "стр",
    "обл", "г", "пос", "дер",
    "st", "ave", "blvd", "al", "dr", "sq", "h", "apt", "bldg", "co", "ct",
    "vil", "twp", "rd",
)

# Built once at import time instead of on every call.
# Full words need a word boundary on both sides. Abbreviations need one only in
# front: the dot already terminates them, and a trailing `\b` would refuse to
# match when the abbreviation is followed by end-of-string, punctuation or
# several spaces.
_ADDRESS_NOISE_RE = re.compile(
    r'\b(?:(?:{words})\b|(?:{abbrs})\. ?)'.format(
        words='|'.join(map(re.escape, _ADDRESS_NOISE_WORDS)),
        abbrs='|'.join(map(re.escape, _ADDRESS_NOISE_ABBREVIATIONS)),
    ),
    flags=re.IGNORECASE,
)
_WHITESPACE_RUN_RE = re.compile(r'\s+')


def clean_address(address: str) -> str:
    """Strip generic address designators from ``address``.

    Removes Russian and English street/house/locality words ("улица",
    "street", "road", ...) and their dotted abbreviations ("ул.", "st.",
    "rd.", ...), case-insensitively, then collapses whitespace.

    Args:
        address: Raw address text.

    Returns:
        The address without the designators, with runs of whitespace reduced
        to a single space and no leading/trailing whitespace.
    """
    without_designators = _ADDRESS_NOISE_RE.sub('', address)
    # Removing tokens leaves gaps behind; normalise them.
    return _WHITESPACE_RUN_RE.sub(' ', without_designators).strip()

In [4]:
def get_key(x: str) -> str:
    """Return the first two hyphen-separated parts of ``x`` joined by ``-``.

    For example ``'rec-148-dup-2'`` yields ``'rec-148'``. Raises ``IndexError``
    when ``x`` contains no hyphen.
    """
    parts = x.split('-')
    head, number = parts[0], parts[1]
    return f'{head}-{number}'

In [5]:
# dtype=str keeps every column as text, so pandas does no numeric coercion.
data = pd.read_csv('test.csv', dtype=str)

# Trim surrounding whitespace cell by cell; non-string cells (e.g. NaN) pass
# through unchanged. Column-wise Series.map replaces DataFrame.applymap, which
# is deprecated since pandas 2.1.
data = data.apply(
    lambda column: column.map(lambda x: x.strip() if isinstance(x, str) else x)
)

# NOTE: normalising address_1 / address_2 with clean_address() was tried here
# and is currently disabled.

# Represent missing values as None so the record dicts can be checked with `is None`.
data = data.replace({np.nan: None})

# Group key shared by a record and its duplicates, derived from rec_id alone,
# so a Series.map is enough (no row-wise apply needed).
data['rec_common_id'] = data['rec_id'].map(get_key)

data_list = data.to_dict('records')

In [6]:
# Index the records by their common id:
#   data_by_ids  -> common id -> list of full records
#   expected_res -> common id -> set of rec_id values (the ground-truth groups)
data_by_ids = {}
expected_res = {}
for record in data_list:
    group_key = record['rec_common_id']
    data_by_ids.setdefault(group_key, []).append(record)
    expected_res.setdefault(group_key, set()).add(record['rec_id'])

In [7]:
from distances.levenshtein import levenstein_similarity
from distances.jaro import jaro_winkler_similarity
from distances.damerau_levenstein import damerau_levenshtein_similarity
from distances.jaccard import jaccard_similarity_str

In [8]:
def get_sim_mean(
  a: Dict[Hashable, Any],
  b: Dict[Hashable, Any],
  sim: Callable[[str, str], float],
  columns: Optional[Iterable[Hashable]] = None,
) -> float:
  """Average a string similarity over the fields two records both have.

  A field takes part only when its value is a non-empty ``str`` in both
  records; any other value (``None``, empty string, non-string) is skipped
  and does not count towards the mean.

  Args:
    a: First record, keyed by field name.
    b: Second record, keyed by field name.
    sim: Similarity function applied to the two field values.
    columns: Fields to compare. Defaults to the module-level ``column_names``.

  Returns:
    The arithmetic mean of ``sim`` over the comparable fields, or ``0.0``
    when no field is comparable.

  Raises:
    KeyError: If a requested field is missing from either record.
  """
  if columns is None:
    columns = column_names
  scores = [
    sim(a[k], b[k])
    for k in columns
    # isinstance already rules out None, so no separate None check is needed.
    if isinstance(a[k], str) and a[k] != '' and isinstance(b[k], str) and b[k] != ''
  ]
  if not scores:
    return 0.0
  return sum(scores) / len(scores)
def get_lev_sim(a: Dict[Hashable, Any], b: Dict[Hashable, Any]) -> float:
  """Field-averaged similarity of two records using ``levenstein_similarity``."""
  return get_sim_mean(a, b, sim=levenstein_similarity)
def get_dam_lev_sim(a: Dict[Hashable, Any], b: Dict[Hashable, Any]) -> float:
  """Field-averaged similarity of two records using ``damerau_levenshtein_similarity``."""
  return get_sim_mean(a, b, sim=damerau_levenshtein_similarity)
def get_jaro_sim(a: Dict[Hashable, Any], b: Dict[Hashable, Any]) -> float:
  """Field-averaged similarity of two records using ``jaro_winkler_similarity``."""
  return get_sim_mean(a, b, sim=jaro_winkler_similarity)
def get_jaccard_sim(a: Dict[Hashable, Any], b: Dict[Hashable, Any]) -> float:
  """Field-averaged similarity of two records using ``jaccard_similarity_str``."""
  return get_sim_mean(a, b, sim=jaccard_similarity_str)

In [22]:
# Spot check: compare the last 'rec-148' record with the other records of its
# group plus one record taken from the unrelated 'rec-100' group.
rec_148_group = data_by_ids['rec-148']
main_rec = rec_148_group[-1]
test_data = rec_148_group[:-1] + [data_by_ids['rec-100'][0]]
for td in test_data:
  sim = get_lev_sim(main_rec, td)
  print(f'main_rec_id: {main_rec["rec_id"]}, td_rec_id: {td["rec_id"]}, similarity: {sim}')

main_rec_id: rec-148-dup-2, td_rec_id: rec-148-dup-0, similarity: 0.6753246753246753
main_rec_id: rec-148-dup-2, td_rec_id: rec-148-org, similarity: 0.6854256854256854
main_rec_id: rec-148-dup-2, td_rec_id: rec-148-dup-1, similarity: 0.613997113997114
main_rec_id: rec-148-dup-2, td_rec_id: rec-148-dup-3, similarity: 0.608044733044733
main_rec_id: rec-148-dup-2, td_rec_id: rec-100-dup-3, similarity: 0.188973063973064


In [10]:
# for k in column_names:
#   if main_rec[k] is not None and isinstance(main_rec[k], str):
#     print(f'{k}: {main_rec[k]}')
# for k in column_names:
#   if test_data[2][k] is not None and isinstance(test_data[2][k], str):
#     print(f'{k}: {test_data[2][k]}')

In [11]:
def test_distances(data: List[Dict[Hashable, Any]], expected_res: Dict[str, Set[str]], similarity: Callable[[Dict[Hashable, Any], Dict[Hashable, Any]], float],threshold: float = 0.85) -> Tuple[int, int, int]:
  tp = 0
  fp = 0
  fn = 0
  res = []
  for i in range(len(data)):
    for j in range(i + 1, len(data)):
      sim = similarity(data[i], data[j])
      if sim > threshold:
        res.append((data[i]['rec_id'], data[j]['rec_id'], sim))
  res_by_ids = {}
  for (a_id, b_id, sim) in res:
    a_key = get_key(a_id)
    b_key = get_key(b_id)
    if a_key != b_key:
      print(f'ERROR: {a_id}, {b_id}, {sim}')
      fp += 1
      continue
    if a_key not in res_by_ids:
      res_by_ids[a_key] = set()
    res_by_ids[a_key].add(a_id)
    res_by_ids[a_key].add(b_id)
  for key in expected_res:
    if key not in res_by_ids:
      if len(expected_res[key]) > 1:
        # print(f'ERROR: not found {key}')
        fn += len(expected_res[key])
      continue
    # if len(expected_res[key]) != len(res_by_ids[key]):
    #   print(f'ERROR: not full {key}')
    fn += len(expected_res[key]) - len(res_by_ids[key])
    tp += len(res_by_ids[key])
  return tp, fp, fn

In [12]:
# Exhaustive pairwise matching scored with the field-averaged Levenshtein similarity.
(tp, fp, fn) = test_distances(data_list, expected_res, get_lev_sim)
print('LEVENSTEIN')
print(f'tp: {tp}, fp: {fp}, fn: {fn}')
# Guard every ratio: when nothing is matched the denominators are zero and the
# unguarded divisions would raise ZeroDivisionError.
precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
print(f'precision: {precision}')
print(f'recall: {recall}')
print(f'f1: {f1}')

LEVENSTEIN
tp: 429, fp: 0, fn: 122
precision: 1.0
recall: 0.778584392014519
f1: 0.8755102040816326


In [13]:
# Exhaustive pairwise matching scored with the field-averaged Jaro-Winkler similarity.
(tp, fp, fn) = test_distances(data_list, expected_res, get_jaro_sim)
print('JARO')
print(f'tp: {tp}, fp: {fp}, fn: {fn}')
# Guard every ratio: when nothing is matched the denominators are zero and the
# unguarded divisions would raise ZeroDivisionError.
precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
print(f'precision: {precision}')
print(f'recall: {recall}')
print(f'f1: {f1}')

JARO
tp: 469, fp: 0, fn: 82
precision: 1.0
recall: 0.8511796733212341
f1: 0.919607843137255


In [14]:
# Exhaustive pairwise matching scored with the field-averaged Damerau-Levenshtein similarity.
(tp, fp, fn) = test_distances(data_list, expected_res, get_dam_lev_sim)
print('DAMERAU LEVENSTEIN')
print(f'tp: {tp}, fp: {fp}, fn: {fn}')
# Guard every ratio: when nothing is matched the denominators are zero and the
# unguarded divisions would raise ZeroDivisionError.
precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
print(f'precision: {precision}')
print(f'recall: {recall}')
print(f'f1: {f1}')

DAMERAU LEVENSTEIN
tp: 433, fp: 0, fn: 118
precision: 1.0
recall: 0.7858439201451906
f1: 0.8800813008130082


In [15]:
# Exhaustive pairwise matching scored with the field-averaged Jaccard similarity.
(tp, fp, fn) = test_distances(data_list, expected_res, get_jaccard_sim)
print('JACCARD')
print(f'tp: {tp}, fp: {fp}, fn: {fn}')
# Guard every ratio: when nothing is matched the denominators are zero and the
# unguarded divisions would raise ZeroDivisionError.
precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
print(f'precision: {precision}')
print(f'recall: {recall}')
print(f'f1: {f1}')

JACCARD
tp: 435, fp: 0, fn: 116
precision: 1.0
recall: 0.7894736842105263
f1: 0.8823529411764706


In [16]:
from dcs import dcs_plus_plus
from dm_soundex import encode

kroiakaeschpovunmolak -> 5915477668
kroiagaeschbofumnalak -> 5915477668
kroiakaeschpovunmolak -> 5915477668
kroiagaeschbofumnalak -> 5915477668


In [41]:
def dcs_key(r: Dict[Hashable, Any]) -> str:
  """Build the key for a record: given name followed directly by surname.

  A ``None`` part counts as an empty string. When both parts are empty the
  placeholder ``'0000000000'`` (ten zeros) is returned instead.

  Phonetic encoding of the two parts via ``encode`` was tried and is
  currently disabled.
  """
  given, family = (
    '' if r[field] is None else r[field]
    for field in ('given_name', 'surname')
  )
  key = f'{given}{family}'
  return key if key != '' else '0' * 10
def is_duplicate(r1: Dict[Hashable, Any], r2: Dict[Hashable, Any]) -> bool:
  """Return True when the ``get_jaro_sim`` score of the two records is strictly above 0.85."""
  score = get_jaro_sim(r1, r2)
  return score > 0.85

In [42]:
# Run the project's dcs_plus_plus over all records, keyed by dcs_key and using
# is_duplicate as the pair test. The result is consumed later as an iterable of
# (record, record) pairs.
# NOTE(review): w=20 is presumably the window size — confirm against the dcs module.
res = dcs_plus_plus(data_list, dcs_key, is_duplicate=is_duplicate, w=20)

['c5e99a60-c3a8-48e0-a14b-94992d2aab14', '2c5e98be-5122-4e58-9e46-86c48ef302b3', 'b2c91609-4fa8-458e-9cb3-944063fa2e50', 'a2d32b94-24cb-4e20-8717-a6a7a62dcd9c', '6f0ddfec-ac6f-4ae0-9c94-cbd08ed0ae53', '72aae635-6f85-4220-8ad9-6a527f6ddec5', '412605f3-680a-41a3-8234-8d4cba0747e8', '738fb943-7f2b-4b64-809d-85a6f42a4b14', '8a35c9eb-f29d-4443-97c6-b6016cf91eb2', '39409a63-e99e-49bf-9b86-43c002c30557']


In [43]:
# Score the pairs in `res` against the ground-truth groups.
#   fp: matched pairs whose records belong to different groups
#   tp: records that appear in at least one same-group pair
#   fn: expected records that were never matched (unmatched singleton groups are ignored)
tp = 0
fp = 0
fn = 0
res_by_ids = {}
for (a, b) in res:
  a_id = a['rec_id']
  b_id = b['rec_id']
  if a_id == b_id:
    # A record paired with itself is reported and ignored.
    print(f'ERROR: {a_id}, {b_id}')
    continue
  a_key = get_key(a_id)
  b_key = get_key(b_id)
  if a_key != b_key:
    fp += 1
    continue
  res_by_ids.setdefault(a_key, set()).update((a_id, b_id))
for key, members in expected_res.items():
  found = res_by_ids.get(key)
  if found is None:
    if len(members) > 1:
      fn += len(members)
    continue
  fn += len(members) - len(found)
  tp += len(found)
print(f'tp: {tp}, fp: {fp}, fn: {fn}')
# Guard every ratio: when nothing is matched the denominators are zero and the
# unguarded divisions would raise ZeroDivisionError.
precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
print(f'precision: {precision}')
print(f'recall: {recall}')
print(f'f1: {f1}')

tp: 440, fp: 0, fn: 111
precision: 1.0
recall: 0.7985480943738656
f1: 0.887991927346115
