In [1]:
import re
import csv
import pandas as pd
from natasha import AddrExtractor, MorphVocab

In [2]:
# Build the natasha address extractor once at module level;
# finding_address_types() calls `addr_extractor` for every comment it processes.
morph_vocab = MorphVocab()
addr_extractor = AddrExtractor(morph_vocab)

In [3]:
# Подготовка текста
def normalize_text(text:str) -> str:
    """Prepare a dispatcher comment for address extraction.

    Steps, in order: encode parity hints as '~' (even) / '*' (odd) glued to the
    preceding token, replace commas with spaces, keep only what follows
    "без ХВС" / "без Х.В.С", drop parenthesised remarks that contain no 'к',
    split digits joined by a dot, remove mentions of the city, collapse
    whitespace and finally replace every space with '|', wrapping the result
    in leading/trailing '|'.

    Args:
        text: raw comment. Anything that is not a ``str`` (for example the
            float NaN pandas produces for an empty CSV cell) is treated as
            "no text".

    Returns:
        The '|'-delimited text, or '' when the input is not a string or the
        cleaned text is shorter than two characters.
    """
    # Empty CSV cells come out of pandas as float NaN; calling .replace on
    # them would raise AttributeError, so bail out early.
    if not isinstance(text, str):
        return ''

    # '~' marks even-numbered ranges, '*' marks odd-numbered ranges
    text = text.replace('(нечет ', '* ').replace('(четн.)', '~ ').replace('(нечетн.)', '* ').replace('(неч.)', '* ')
    # Glue the marker to the token in front of it
    text = text.replace(' ~', '~').replace(' *', '*')
    text = text.replace(',', ' ')

    # Keep only the part that follows "без ХВС".
    # NOTE(review): a comment that mentions "ХВС" without a preceding "без"
    # yields no match and therefore becomes empty - confirm this is intended.
    if 'ХВС' in text.upper():
        text = ' '.join(re.findall(r"без ХВС(.*)", text, re.IGNORECASE))
    if 'Х.В.С' in text.upper():
        # The dots in the pattern are unescaped, so each matches any character
        text = ' '.join(re.findall(r"без Х.В.С(.*)", text, re.IGNORECASE))

    # Remove a parenthesised group only when no character inside it is 'к'
    # (so remarks such as "(к. 2)" survive)
    text = re.sub(r'\((?:(?!к.|к|корп|корпуса)[^()])+\)', ' ', text)
    # A dot between two digits acts as a separator: "5.7" -> "5 7"
    text = re.sub(r'(?<=\d)\.(?=\d)', ' ', text)

    # Strip the city name; longer spellings go first so no fragment is left behind
    text = text.replace("Ульяновск г.", "").replace("Ульяновск г", "").replace("г. Ульяновск", "").replace("г Ульяновск", "").replace("Ульяновск", "")

    # Collapse runs of whitespace and trim both ends
    text = ' '.join(text.split())

    # Spaces and both string boundaries become '|' for the extractor
    text = text.replace(' ', '|')
    if len(text) < 2:
        return ''
    if text[0] != '|':
        text = '|' + text
    if text[-1] != '|':
        text = text + '|'

    return text

# Вспомогательная функция для определения начала адреса
def is_beginning_of_address(_type:str) -> bool:
    """Return True when an address part of this type starts a new address (street or settlement)."""
    return _type in ("улица", "посёлок")

# Нахождение и получение типов частей адресов и их значений
def finding_address_types(text:str) -> list:
    """Extract address parts from normalized text and group them into addresses.

    Every match returned by the module-level ``addr_extractor`` is turned into
    a ``{"type": ..., "value": ...}`` dict. A part whose type starts an address
    (see ``is_beginning_of_address``) closes the current group and opens a new one.

    Returns:
        A list of groups, each group being a list of part dicts in match order.
    """
    grouped = []
    current = []

    for match in addr_extractor(text):
        fields = list(match.fact.as_json.values())
        value = fields[0]
        # Two fields are read positionally as (value, type); a fact with any
        # other number of fields is labelled "корпус".
        kind = fields[1] if len(fields) == 2 else "корпус"

        # A street/settlement part starts a fresh address
        if is_beginning_of_address(kind) and current:
            grouped.append(current)
            current = []

        current.append({"type": kind, "value": value})

    if current:
        grouped.append(current)

    return grouped

In [4]:
# Упорядочивание адресов и нахождение их УИДов
def extracting_address(addresses:list, addresses_df:pd.DataFrame) -> list:
    """Resolve grouped address parts to house UUIDs.

    Args:
        addresses: list of address groups; each group is a list of
            ``{"type": ..., "value": ...}`` dicts with types 'посёлок',
            'улица', 'дом' or 'корпус' (other types are ignored).
        addresses_df: table with string columns 'village', 'street', 'home'
            and a 'house_uuid' column.

    Returns:
        UUIDs of all matching houses, in lookup order. A group without a street
        contributes nothing; a group with a street but no house numbers matches
        every house on that street.

    Raises:
        ValueError: when a house range cannot be parsed (non-numeric bound,
            more than one '-' or '/').
    """
    houses_uuids = []

    for address_object in addresses:
        village = None
        street = None
        numbers = []
        for token in address_object:
            kind = token['type']
            if kind == 'посёлок':
                village = token['value'].replace('|', ' ')
            elif kind == 'улица':
                street = token['value'].replace('|', ' ')
            elif kind == 'дом':
                if '-' in token['value']:
                    numbers.extend(_expand_house_range(token['value']))
                else:
                    numbers.append(token['value'].upper())
            elif kind == 'корпус':
                # Every building listed in the token is applied to every house
                # number collected so far for this address
                numbers = [
                    f'{number} к. {corp}'
                    for number in numbers
                    for corp in token['value'].split('|')
                ]

        if not street:
            continue

        # The village/street part of the filter is the same for every house
        # number, so build it once per address instead of once per number.
        if village is not None:
            village_mask = addresses_df['village'].str.strip() == village.strip()
        else:
            # No settlement mentioned: only rows with an empty village match
            village_mask = addresses_df['village'] == ''
        base_mask = village_mask & (addresses_df['street'].str.strip() == street.strip())

        if numbers:
            homes = addresses_df['home'].str.strip()
            for number in numbers:
                matched = addresses_df[base_mask & (homes == number.strip())]
                houses_uuids.extend(matched['house_uuid'].tolist())
        else:
            houses_uuids.extend(addresses_df[base_mask]['house_uuid'].tolist())

    return houses_uuids


def _expand_house_range(value:str) -> list:
    """Expand a house token containing '-' into the individual house numbers it covers.

    Supported forms: '5а-9а' (lettered range), '5-9а' (plain and lettered
    variants), '5-9/2' (plain and '/2' variants), '2-10~' (even numbers only),
    '1-9*' (odd numbers only) and '5-9'. A token that contains a lowercase
    Cyrillic letter but does not end in a Cyrillic letter yields nothing.
    """
    if re.search(r'[а-я]', value):
        letter = value[-1]
        if not re.match(r'[а-яА-Я]', letter):
            return []
        start, end = value[:-1].split('-')
        if re.search(r'[а-я]', start) and start[-1] == letter:
            # '5а-9а': both bounds carry the letter, so only lettered houses are meant
            return [f'{i}{letter}' for i in range(int(start[:-1]), int(end) + 1)]
        # '5-9а': emit plain and lettered candidates (mirrors the '/' branch);
        # the table lookup in the caller keeps only the houses that exist.
        # NOTE(review): lettered candidates keep the token's letter case while
        # single house numbers are upper-cased by the caller - confirm which
        # case the address table uses.
        span = range(int(start), int(end) + 1)
        return [str(i) for i in span] + [f'{i}{letter}' for i in span]

    if '/' in value:
        diapason, slash = value.split('/')
        start, end = map(int, diapason.split('-'))
        span = range(start, end + 1)
        return [str(i) for i in span] + [f'{i}/{slash}' for i in span]

    if '~' in value:
        start, end = map(int, value.replace('~', '').split('-'))
        return [str(i) for i in range(start, end + 1) if i % 2 == 0]

    if '*' in value:
        start, end = map(int, value.replace('*', '').split('-'))
        return [str(i) for i in range(start, end + 1) if i % 2 != 0]

    start, end = map(int, value.split('-'))
    return [str(i) for i in range(start, end + 1)]

In [5]:
# Load the file with dispatcher comments (';'-separated); the result-writing
# cell reads its 'comment' and 'shutdown_id' columns.
task_df = pd.read_csv(r'volgait2024-semifinal-task.csv', sep=';')

In [6]:
# Load the file with house UUIDs and full addresses
addresses_df = pd.read_csv(r'volgait2024-semifinal-addresses.csv', sep=';')
split_full_address = pd.DataFrame()
add_df = pd.DataFrame()  # not used anywhere in the visible notebook; kept so the name stays defined

# Literal fragments to strip from 'house_full_address', applied in this order.
# They contain '(', ')' and '.', so they must NOT be interpreted as regular
# expressions: Series.str.replace defaulted to regex=True before pandas 2.0
# and to regex=False from 2.0 on, hence regex=False is passed explicitly below.
# NOTE(review): "ст," has no leading space, unlike the other abbreviations, so
# it also hits any word ending in "ст" before a comma - confirm this is intended.
_literal_replacements = (
    (" (двлд.)", ""), (" (влд.)", ""), (" (соор)", ""), (" (зд.)", ""),
    (" с,", ","), (" п,", ","), (" д,", ","), (" ул,", ","),
    (" б-р,", ","), (" пер,", ","), (" ш,", ","), (" пр-кт,", ","),
    (" с-к,", ","), (" тер. СНТ,", ","), (" пр-д,", ","), (" ул.,", ","),
    (" тер. ГСК,", ","), (" снт,", ","), (" тер.,", ","), (" пер.,", ","),
    ("стр.", "стр"), (" проезд,", ","), (" пл,", ","), (" спуск,", ","),
    ("ст,", ","), (" ш,", ","),
)

# Keep only what follows the city prefix, then drop the redundant fragments
_cleaned_address = addresses_df["house_full_address"].str.split("Ульяновск г,").str[-1]
for _old, _new in _literal_replacements:
    _cleaned_address = _cleaned_address.str.replace(_old, _new, regex=False)
split_full_address["house_full_address"] = _cleaned_address.str.strip()

# Split 'house_full_address' on ", " into positional columns (village, street, home)
addresses_by_columns = (split_full_address["house_full_address"].str.split(", ", expand=True))

# Addresses without a village have only two parts: shift them one column to
# the right so that street/home line up and village becomes ''
addresses_by_columns = addresses_by_columns.fillna('')
mask_bull = addresses_by_columns[2] == ''
addresses_by_columns[2] = addresses_by_columns[2].where(~mask_bull, addresses_by_columns[1])
addresses_by_columns[1] = addresses_by_columns[1].where(~mask_bull, addresses_by_columns[0])
addresses_by_columns[0] = addresses_by_columns[0].where(~mask_bull, '')

# Attach the new columns to 'house_uuid' and name them
addresses_df = pd.concat([addresses_df['house_uuid'], addresses_by_columns], axis=1).rename(columns={0: 'village', 1: 'street', 2: 'home'})
# Drop any remaining parenthesised remark from the house number
addresses_df['home'] = addresses_df['home'].str.replace(r'\(.*?\)', '', regex=True).str.strip()

In [None]:
# Create volgait2024-semifinal-result-test.csv and write the results into it.
# begin_index allows resuming after an error: rows whose shutdown_id is smaller
# than begin_index are skipped (the row with shutdown_id == begin_index is
# processed), and a non-zero value switches the file to append mode without
# re-writing the header.
begin_index = 0
mode = 'w' if begin_index == 0 else 'a'
with open('volgait2024-semifinal-result-test.csv', mode=mode, newline='', encoding='utf-8') as csvfile:
    writer = csv.writer(csvfile, delimiter=';')

    if begin_index == 0:
        writer.writerow(['shutdown_id', 'house_uuids'])

    for comment, shutdown_id in zip(task_df['comment'], task_df['shutdown_id']):
        if begin_index > shutdown_id:
            continue
        normalized_comment = normalize_text(comment)
        addresses = finding_address_types(normalized_comment)
        houses_uuids = extracting_address(addresses, addresses_df)
        # The same house can be matched more than once (overlapping ranges,
        # repeated mentions); list each UUID once, keeping first-seen order.
        house_uuid_str = ', '.join(dict.fromkeys(houses_uuids))
        print(shutdown_id)

        writer.writerow([shutdown_id, house_uuid_str])