Для задачи дедупликации будем использовать библиотеку dedupe, в которой используется машинное обучение.

Основной код для запуска алгоритма взят из документации: https://dedupeio.github.io/dedupe-examples/docs/mysql_example.html

In [1]:
import os
import itertools
import logging
import json
from getpass import getpass

import MySQLdb
import MySQLdb.cursors

import dedupe
import dedupe.backport

In [2]:
def record_pairs(result_set):
    """Turn ``(id_a, json_a, id_b, json_b)`` rows into pairs of records.

    Yields ``((id_a, dict_a), (id_b, dict_b))`` for every row, where the
    dicts are decoded from the JSON columns. After handing out every
    10000th row (counting from 0) the row number is printed as progress.
    """
    for row_number, (left_id, left_json, right_id, right_json) in enumerate(result_set):
        yield (left_id, json.loads(left_json)), (right_id, json.loads(right_json))

        if not row_number % 10000:
            print(row_number)

            
def cluster_ids(clustered_dupes):
    """Flatten clusters into ``(record_id, canonical_id, score)`` rows.

    ``clustered_dupes`` is an iterable of ``(cluster, scores)`` pairs, where
    ``cluster`` is a sequence of record ids and ``scores`` holds one score
    per record (paired positionally with ``zip``). The first id of each
    cluster is used as the canonical id for every member of that cluster.
    """
    for cluster, scores in clustered_dupes:
        canonical_id = cluster[0]
        # `record_id` rather than `id`: do not shadow the builtin.
        for record_id, score in zip(cluster, scores):
            yield record_id, canonical_id, score

Вводим логин и пароль для подключения к MySQL и указываем пути к файлам, которые использует dedupe: к файлу настроек обученной модели и к файлу с размеченными примерами (если файла настроек нет, dedupe запустит процесс обучения и создаст оба файла).

In [3]:
# Connection settings; the credentials are asked for interactively instead
# of being hard-coded in the notebook.
db_config = {'host': 'localhost'}
db_config['user'] = input('Enter MySQL username: ')
db_config['password'] = getpass('Enter MySQL password: ')

# Verbose root logging so dedupe's progress messages show up in the output.
log_level = logging.DEBUG
root_logger = logging.getLogger()
root_logger.setLevel(log_level)

# Files read/written by dedupe: the learned model settings and the
# labelled training examples.
settings_file = 'mysql_example_settings'
training_file = 'mysql_example_training.json'

Enter MySQL username: root
Enter MySQL password: ········


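Подключаемся к базе данных. Открываем два соединения: одно для чтения (с потоковым серверным курсором), другое для записи, чтобы можно было записывать результаты, пока чтение ещё не завершено.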

In [4]:
# Make sure the `task3` schema exists before connecting to it: on a fresh
# server, connecting with database='task3' fails with "Unknown database".
# (The next cell drops and recreates the schema anyway.)
bootstrap_con = MySQLdb.connect(**db_config, charset='utf8')
try:
    with bootstrap_con.cursor() as cur:
        cur.execute("CREATE DATABASE IF NOT EXISTS task3")
finally:
    bootstrap_con.close()

# Reading connection: server-side (unbuffered) dict cursor, so large result
# sets are streamed row by row instead of being loaded into memory at once.
read_con = MySQLdb.connect(**db_config, database='task3',
                           charset='utf8',
                           cursorclass=MySQLdb.cursors.SSDictCursor)

# Separate writing connection. MySQL does not accept a new statement on a
# connection that still has an unread streaming result, and later cells
# write while a read on read_con is in progress.
write_con = MySQLdb.connect(**db_config, database='task3',
                            charset='utf8')

Создаем новую базу данных task3, внутри нее создаем таблицу outlets, заполняем ее с помощью запроса outlets.sql.

Далее создаем таблицу с предобработанными данными (строки переводятся в нижний регистр).

In [5]:
# (Re)create the working schema from scratch so every run starts clean.
with write_con.cursor() as cur:
    for statement in ("drop database if exists task3",
                      "create database task3",
                      "use task3"):
        cur.execute(statement)

    # outlets.sql is expected to create and fill the raw `outlets` table
    # (and presumably `outlets_clean`, which the last cell inserts into);
    # the whole script is sent in a single execute() call.
    with open('outlets.sql', encoding='utf-8', mode='r') as sql_file:
        outlets_script = sql_file.read()
    cur.execute(outlets_script)

    # Lower-cased copy of the fields dedupe compares, so matching does not
    # depend on letter case.
    cur.execute(
        "CREATE TABLE processed_outlets AS "
        "(SELECT id, "
        " LOWER(`Город дистрибьютора`) AS city, "
        " LOWER(Торг_точка_грязная) AS name, "
        " LOWER(Торг_точка_грязная_адрес) AS address"
        " FROM outlets)"
    )
    cur.execute("CREATE INDEX processed_idx ON processed_outlets (id)")

write_con.commit()

Запускаем dedupe: 
* Выбираем поля, которые важны при определении дубликатов;
* Если файла настроек модели нет, запускаем процесс обучения (может занять много времени): алгоритм будет выдавать пары записей и спрашивать, являются ли они одной и той же сущностью (ответы на выбор: да/нет/не уверен/завершить). Если при этом есть файл с размеченными примерами, они используются как стартовые;
* Если файл настроек уже есть (в данном случае мы уже провели обучение), то алгоритм просто считывает из него параметры модели.

In [6]:
# Query used both for training and for building the blocking map.
OUTLET_SELECT = ("SELECT id, city, name, address "
                 "from processed_outlets")

if os.path.exists(settings_file):
    # A trained model is already saved: load it, no labelling needed.
    print('reading from ', settings_file)
    with open(settings_file, 'rb') as sf:
        deduper = dedupe.StaticDedupe(sf, num_cores=4)
else:
    # Fields dedupe compares when deciding whether two outlets are the
    # same entity.
    fields = [{'field': 'name', 'type': 'String'},
              {'field': 'address', 'type': 'String'},
              {'field': 'city', 'type': 'String'},
              ]

    deduper = dedupe.Dedupe(fields, num_cores=None)

    # Load the whole preprocessed table as {row_number: record} for
    # training-sample preparation.
    with read_con.cursor() as cur:
        cur.execute(OUTLET_SELECT)
        temp_d = {i: row for i, row in enumerate(cur)}

    if os.path.exists(training_file):
        # Reuse previously labelled examples as a starting point.
        print('reading labeled examples from ', training_file)
        with open(training_file) as tf:
            deduper.prepare_training(temp_d, training_file=tf)
    else:
        deduper.prepare_training(temp_d)

    # temp_d is not used after prepare_training; release the table copy.
    del temp_d

    print('starting active labeling...')

    # Interactive console labelling (yes / no / unsure / finish).
    dedupe.convenience.console_label(deduper)

    with open(training_file, 'w') as tf:
        deduper.write_training(tf)

    deduper.train(recall=0.90)

    with open(settings_file, 'wb') as sf:
        deduper.write_settings(sf)

    # Free the training data kept by the deduper.
    deduper.cleanup_training()

INFO:dedupe.api:Predicate set:
INFO:dedupe.api:(LevenshteinCanopyPredicate: (1, name), TfidfTextCanopyPredicate: (0.6, name), SimplePredicate: (oneGramFingerprint, name))
INFO:dedupe.api:(SimplePredicate: (firstIntegerPredicate, address), TfidfNGramCanopyPredicate: (0.2, name), SimplePredicate: (fingerprint, address))
INFO:dedupe.api:(SimplePredicate: (sameSevenCharStartPredicate, name), SimplePredicate: (nearIntegersPredicate, name), SimplePredicate: (oneGramFingerprint, address))
INFO:dedupe.api:(LevenshteinCanopyPredicate: (4, name), LevenshteinCanopyPredicate: (3, address), SimplePredicate: (sortedAcronym, name))
INFO:dedupe.api:(TfidfNGramCanopyPredicate: (0.6, address), SimplePredicate: (doubleMetaphone, name), SimplePredicate: (firstIntegerPredicate, address))
INFO:dedupe.api:(TfidfNGramCanopyPredicate: (0.6, name), SimplePredicate: (commonIntegerPredicate, name), SimplePredicate: (metaphoneToken, name))
INFO:dedupe.api:(SimplePredicate: (commonThreeTokens, name), SimplePredicat

reading from  mysql_example_settings


In [7]:
print('blocking...')

# blocking_map stores (block_key, record id) rows; records that share a
# block_key become candidate pairs in the clustering query.
print('creating blocking_map database')
BLOCKING_MAP_DDL = ("CREATE TABLE blocking_map "
                    "(block_key VARCHAR(200), id INTEGER) "
                    "CHARACTER SET utf8 COLLATE utf8_unicode_ci")
with write_con.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS blocking_map")
    cur.execute(BLOCKING_MAP_DDL)
write_con.commit()

blocking...
creating blocking_map database


In [8]:
print('creating inverted index')

# Feed every distinct non-NULL value of each field the fingerprinter wants
# indexed; this happens before any record is fingerprinted.
for index_field in deduper.fingerprinter.index_fields:
    with read_con.cursor() as cur:
        cur.execute("SELECT DISTINCT {field} FROM processed_outlets "
                    "WHERE {field} IS NOT NULL".format(field=index_field))
        # Each dict row has exactly one column: the selected field.
        distinct_values = (next(iter(row.values())) for row in cur)
        deduper.fingerprinter.index(distinct_values, index_field)

creating inverted index


DEBUG:dedupe.blocking:Canopy: LevenshteinCanopyPredicate: (1, name)
DEBUG:dedupe.blocking:Canopy: LevenshteinCanopyPredicate: (4, name)
INFO:dedupe.canopy_index:Removing stop word ип
INFO:dedupe.canopy_index:Removing stop word а
INFO:dedupe.canopy_index:Removing stop word н
INFO:dedupe.canopy_index:Removing stop word в
INFO:dedupe.canopy_index:Removing stop word ооо
INFO:dedupe.canopy_index:Removing stop word г
INFO:dedupe.canopy_index:Removing stop word и
DEBUG:dedupe.blocking:Canopy: TfidfTextCanopyPredicate: (0.6, name)
INFO:dedupe.canopy_index:Removing stop word  г
INFO:dedupe.canopy_index:Removing stop word в 
INFO:dedupe.canopy_index:Removing stop word ип
INFO:dedupe.canopy_index:Removing stop word ов
INFO:dedupe.canopy_index:Removing stop word  м
INFO:dedupe.canopy_index:Removing stop word ва
INFO:dedupe.canopy_index:Removing stop word ма
INFO:dedupe.canopy_index:Removing stop word  а
INFO:dedupe.canopy_index:Removing stop word ас
INFO:dedupe.canopy_index:Removing stop word н 
I

In [9]:
print('writing blocking map')

# Stream every record through the fingerprinter and store the resulting
# (block_key, id) rows. Reads go through read_con, writes through write_con.
with read_con.cursor() as read_cur:
    read_cur.execute(OUTLET_SELECT)
    records = ((record['id'], record) for record in read_cur)
    blocks = deduper.fingerprinter(records)

    with write_con.cursor() as write_cur:
        write_cur.executemany("INSERT INTO blocking_map VALUES (%s, %s)",
                              blocks)

write_con.commit()

# The indices built for fingerprinting are no longer needed.
deduper.fingerprinter.reset_indices()

writing blocking map


INFO:dedupe.blocking:10000, 36.3222082 seconds
INFO:dedupe.blocking:20000, 87.5578712 seconds


In [10]:
print('creating index')
# Unique (block_key, id) index, intended to speed up the self-join on
# block_key that produces candidate pairs.
with write_con.cursor() as cur:
    cur.execute("CREATE UNIQUE INDEX bm_idx ON blocking_map (block_key, id)")

# Committing read_con too presumably ends its open read transaction, so its
# next queries see the rows committed through write_con.
for con in (write_con, read_con):
    con.commit()

creating index


Вычисляем, к какому кластеру принадлежит каждая запись, и сохраняем в таблицу entity_map.
Задача почти решена.

In [11]:
# Stream candidate pairs (records sharing a block_key, each serialised as a
# JSON object), score them with the trained model, cluster them and store
# every clustered record with its canonical id in entity_map.
with read_con.cursor(MySQLdb.cursors.SSCursor) as pair_cur:

    pair_cur.execute("""
           select a.id,
                  json_object('city', a.city,
                              'name', a.name,
                              'address', a.address),
                  b.id,
                  json_object('city', b.city,
                              'name', b.name,
                              'address', b.address)
           from (select DISTINCT l.id as east, r.id as west
                 from blocking_map as l
                 INNER JOIN blocking_map as r
                 using (block_key)
                 where l.id < r.id) ids
           INNER JOIN processed_outlets a on ids.east=a.id
           INNER JOIN processed_outlets b on ids.west=b.id
           """)

    print('clustering...')
    pair_scores = deduper.score(record_pairs(pair_cur))
    # threshold=0.3 is the clustering cut-off passed to dedupe.
    clustered_dupes = deduper.cluster(pair_scores, threshold=0.3)

    with write_con.cursor() as map_cur:

        map_cur.execute("DROP TABLE IF EXISTS entity_map")

        print('creating entity_map database')
        map_cur.execute("CREATE TABLE entity_map "
                        "(id INTEGER, canon_id INTEGER, "
                        " cluster_score FLOAT, PRIMARY KEY(id))")

        map_cur.executemany('INSERT INTO entity_map VALUES (%s, %s, %s)',
                            cluster_ids(clustered_dupes))

write_con.commit()

# Index on canon_id for the joins that look records up by canonical id.
with write_con.cursor() as cur:
    cur.execute("CREATE INDEX head_index ON entity_map (canon_id)")

write_con.commit()
read_con.commit()

clustering...
0
10000
20000
30000


DEBUG:dedupe.api:matching done, begin clustering


creating entity_map database


С помощью entity_map сначала заполняем таблицу outlets_clean, а затем обновляем таблицу outlets.

In [12]:
# Fill outlets_clean with one row per canonical outlet and point every
# outlet at its clean record.
#
# entity_map is filled only from clustered candidate pairs (records that
# share a block_key with another record), so outlets that were never
# clustered are absent from it. They are treated as their own canonical
# record via COALESCE(canon_id, id); otherwise they would be left with a
# NULL outlet_clean_id and no row in outlets_clean.
with write_con.cursor() as cur:
    # Canonical records (canon_id equals their own id) plus outlets missing
    # from entity_map. The canonical record's address becomes the clean
    # address of its cluster.
    cur.execute('''
    insert into outlets_clean (Торг_точка_чистый_адрес, id)
    select distinct t1.Торг_точка_грязная_адрес, t1.id
    from outlets as t1
    left join entity_map as t2
    on t1.id = t2.id
    where t2.id is null or t2.canon_id = t1.id
    ''')

    # A separate execute() (instead of one multi-statement string) makes a
    # failure surface at the statement that caused it.
    cur.execute('''
    update outlets
    left join entity_map as t2
    on outlets.id = t2.id
    set outlets.outlet_clean_id = coalesce(t2.canon_id, outlets.id)
    ''')

write_con.commit()

read_con.close()
write_con.close()