**Table of contents**<a id='toc0_'></a>    
- [Общие методы](#toc1_)    
  - [Обработка дат](#toc1_1_)    
  - [Обработка уровней орагнизации](#toc1_2_)    
  - [Обработка КБК](#toc1_3_)    
  - [Поиск и замена плохих адресов](#toc1_4_)    
  - [Декомпозиция адресов](#toc1_5_)    
- [Метод для обработки ORG](#toc2_)    
- [Метод для обработки contract](#toc3_)    
- [Класс для обработки данных](#toc4_)    

<!-- vscode-jupyter-toc-config
	numbering=false
	anchor=true
	flat=false
	minLevel=1
	maxLevel=6
	/vscode-jupyter-toc-config -->
<!-- THIS CELL WILL BE REPLACED ON TOC UPDATE. DO NOT WRITE YOUR TEXT IN THIS CELL -->

In [1]:
import os
import re
import datetime
import copy

import numpy as np
import pandas as pd
from pandas import DataFrame, Series
from tqdm.auto import tqdm
from pandarallel import pandarallel
from pullenti.address.AddressService import AddressService

AddressService.set_server_connection("http://localhost:2222")

tqdm.pandas()
pandarallel.initialize(progress_bar=False)
# Сброс ограничений на число столбцов
pd.set_option("display.max_columns", 200)

# Сброс ограничений на количество символов в записи
pd.set_option("display.max_colwidth", 200)

INFO: Pandarallel will run on 32 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.


  from .autonotebook import tqdm as notebook_tqdm


# <a id='toc1_'></a>[Общие методы](#toc0_)

In [2]:
df = pd.read_csv("../data/raw_data/contract/2014/0.csv", sep="|", dtype="str")

In [3]:
df.columns

Index(['number_contract', 'address_customer', 'full_name_customer',
       'short_name_customer', 'code', 'code_type', 'id_customer',
       'inn_customer', 'kpp_customer', 'code_form_org', 'okpo_code',
       'municipal_code', 'budget_name', 'extrabudget_name', 'budget_level',
       'contract_status', 'notice', 'ikz_code', 'id_contract_electronic',
       'unique_number_plan', 'method_determinig_supplier', 'date_summarizing',
       'date_posting', 'grouds_single_supplier', 'document_details',
       'info_support', 'date_contract', 'date_performance',
       'date_contract_registry', 'date_update_registry',
       'date_start_performance', 'date_end_performance', 'contract_item',
       'contract_price', 'contract_price_nds', 'prepayment_amount',
       'performance_security', 'size_performance_quality', 'warranty_period',
       'place_performance', 'full_name_supplier', 'inn_supplier',
       'kpp_supplier', 'code_okpo_supplier', 'date_registration_supplier',
       'country_suppl

In [4]:
# path = "../data/raw_data/contract/2014/"
# for file_name in tqdm(os.listdir(path)):
#     file_name = os.path.join(path, file_name)
#     df = pd.read_csv(file_name, sep="|", dtype="str")
#     df.columns = [
#         "number_contract",
#         "address_customer",
#         "full_name_customer",
#         "short_name_customer",
#         "code",
#         "code_type",
#         "id_customer",
#         "inn_customer",
#         "kpp_customer",
#         "code_form_org",
#         "okpo_code",
#         "municipal_code",
#         "budget_name",
#         "extrabudget_name",
#         "budget_level",
#         "contract_status",
#         "notice",
#         "ikz_code",
#         "id_contract_electronic",
#         "unique_number_plan",
#         "method_determinig_supplier",
#         "date_summarizing",
#         "date_posting",
#         "grouds_single_supplier",
#         "document_details",
#         "info_support",
#         "date_contract",
#         "date_performance",
#         "date_contract_registry",
#         "date_update_registry",
#         "date_start_performance",
#         "date_end_performance",
#         "contract_item",
#         "contract_price",
#         "contract_price_nds",
#         "prepayment_amount",
#         "performance_security",
#         "size_performance_quality",
#         "warranty_period",
#         "place_performance",
#         "full_name_supplier",
#         "inn_supplier",
#         "kpp_supplier",
#         "code_okpo_supplier",
#         "date_registration_supplier",
#         "country_supplier",
#         "code_country_supplier",
#         "address_supplier",
#         "postal_address_supplier",
#         "contact",
#         "status_supplier",
#         "kbk",
#     ]
#     df.to_csv(file_name, sep="|", index=False)

## <a id='toc1_1_'></a>[Обработка дат](#toc0_)

In [5]:
dict_month = {
    "января": "01",
    "февраля": "02",
    "марта": "03",
    "апреля": "04",
    "мая": "05",
    "июня": "06",
    "июля": "07",
    "августа": "08",
    "сентября": "09",
    "октября": "10",
    "ноября": "11",
    "декабря": "12",
}


def date_extract(date: str):
    if not date or date == "--.--.----" or type(date) != str:
        return None

    date = date.replace("Загрузка ...", "").strip()
    try:
        return datetime.datetime.strptime(date, "%d.%m.%Y").date()
    except ValueError:
        pass

    try:
        return datetime.datetime.strptime(date[:10], "%d.%m.%Y").date()
    except ValueError:
        pass

    try:
        return datetime.datetime.strptime(date.split()[0], "%d.%m.%Y").date()
    except ValueError:
        pass

    for key, value in dict_month.items():
        if key in date:
            date = date.replace(key, value)
            date = ".".join(date.split())

    date = date.split(".")

    if len(date) == 2:
        date = ".".join(["01"] + date)
        return datetime.datetime.strptime(date[:10], "%d.%m.%Y").date()
    # логи

## <a id='toc1_2_'></a>[Обработка уровней орагнизации](#toc0_)

In [6]:
list_local = ["муниципальный уровень", "местный бюджет"]
list_sub = [
    "уровень субъекта рф",
    "бюджет субъекта российской федерации",
    "бюджет территориального государственного внебюджетного фонда",
    "бюджет территориального государственного внебюджетного фонда",
]
list_fed = [
    "федеральный уровень",
    "федеральный бюджет",
    "бюджет пенсионного фонда российской федерации",
    "бюджет федерального фонда обязательного медицинского страхования",
    "бюджет фонда социального страхования российской федерации",
]

list_fed_2 = [
    "войскавая",
    "войсковая",
    "воениз",
    "федеральн",
    "район водных путей и судоходства",
    "следвест",
    "пенсион",
    "росган",
    "фгбу",
    "всероссийс",
    " фбу",
    "прокурату",
    "университ",
    "научно-исследователь",
    "государственное научное учреждение",
    "росграниц",
    "российской академии наук",
    "внутренних дел",
    "мвд",
    "суд",
    "управление министерства промышленности",
    "торговли российской федерации",
    "таможенный пост",
    "российской федерации",
    "таможня",
    "таможенного",
    "российская академия образования",
    "российская академия образования",
    "российская академия художеств",
]

list_local_2 = [
    "городская администрация",
    "муниципал",
    "школа",
    "детский сад",
    "городского поселения",
    "городского округа",
    "администрация рабочего поселка",
    "совет",
    "управление образованием администрации г",
    "поселок",
    "поселк",
    "частное учереждение",
    "администрации зато",
    "администрация пгт",
    "территориальная избирательная комиссия",
    "районный исполнительный комитет",
]
list_sub_2 = [
    "центр занятости населения",
    "област",
    "республи",
    "края",
    "край",
    "краев",
    "города",
    "автоном",
    "oбластное",
    "здравоохране",
    "больниц",
    "родильный дом",
    "профессиональн",
    "детский дом",
    "дом-интернат",
    "социальн",
    "поликлиник",
    "больниц",
    "государственное бюджетное общеобразовательное учреждение",
    "государственное бюджетное образовательное учреждение",
    "медико-санитарная часть",
    "московское государственное унитарное предприятие",
    "учреждение культуры города",
    "государственное бюджетное учреждение культуры",
    "центр социального обслуживания",
    "социального обслуживания граждан",
    "стоматологическая поликлиника",
    "центр для детей-сирот и детей",
    "санкт-петербур",
    "фонд социального страхования российской федерации",
    "государственное бюджетное учреждение",
    "инспекция труда",
    "региональный",
]
list_anothe = [
    "акционерное общество",
    "завод",
    "акционерное московское общество",
    "общество с ограниченной ответственностью",
    "частное",
    "комбинат",
    "ооо ",
]
list_fed_3 = []
list_sub_3 = ["ветеринар", "колледж"]
list_local_3 = [
    "района",
    "сельск",
    "районное управление образованием",
    "районное бюджетное учреждение",
    "мбу ",
    "городской Исполнительный комитет",
    "городская избирательная комиссия",
]

inn_sub = {
    "7727795994": 'Государственное бюджетное научное учреждение "Московский институт развития образования"',
    "4205050521": 'государственное учреждение "Кузбасспассажиравтотранс"',
    "1001036026": 'БЮДЖЕТНОЕ УЧРЕЖДЕНИЕ "ЦЕНТР КУЛЬТУРНЫХ ИНИЦИАТИВ" (АГЕНТСТВО "КУЛЬТУРНАЯ СЕТЬ КАРЕЛИИ")',
    "2309102153": "ГУП КК Кубаньфармация",
}
inn_fed = {
    "7704193182": "Региональное оперативно-поисковое управление",
    "6672239827": 'ЛИНЕЙНЫЙ ОТДЕЛ МИНИСТЕРСТВА ВНУТРЕННИХ ДЕЛ РОССИЙСКОЙ ФЕДЕРАЦИИ В АЭРОПОРТУ "КОЛЬЦОВО',
}
inn_mun = {
    "3509009509": 'БЮДЖЕТНОЕ УЧРЕЖДЕНИЕ "КОММУНАЛЬЩИК"',
    "3511005766": 'БЮДЖЕТНОЕ УЧРЕЖДЕНИЕ КУЛЬТУРЫ "КИРИЛЛОВСКИЙ КИНОДОСУГОВЫЙ ЦЕНТР"',
    "4506004871": "Управление по делам образования, культуры, молодежи и спорта",
}
inn_another = {"1633002328": 'ДЕТСКИЙ ОЗДОРОВИТЕЛЬНЫЙ ЛАГЕРЬ "ЧАЙКА"'}

name_for_result = ["местный", "субъектовый", "федеральный", "иное"]


def fillna_organization_level(budget_level: str, full_name_customer: str, inn_customer: str):
    if type(budget_level) != str:
        budget_level = None
    elif type(budget_level) == str:
        budget_level = budget_level.lower()

    if type(full_name_customer) != str:
        full_name_customer = None
    elif type(full_name_customer) == str:
        full_name_customer = full_name_customer.lower()

    if type(inn_customer) != str:
        inn_customer = None

    if budget_level:
        for list_name, name in zip(
            [list_local, list_sub, list_fed], ["местный", "субъектовый", "федеральный"]
        ):
            for name_trigger in list_name:
                if name_trigger.lower() in budget_level:
                    return name

    # если не получилось выделить данные из budget_level попробуем сделать это с full_name_customer
    if full_name_customer:
        for list_name, name in zip(
            [list_fed_2, list_local_2, list_sub_2, list_anothe],
            ["федеральный", "местный", "субъектовый", "иное"],
        ):
            for name_trigger in list_name:
                if name_trigger.lower() in full_name_customer:
                    return name

        for list_name, name in zip(
            [list_fed_3, list_local_3, list_sub_3],
            ["федеральный", "местный", "субъектовый", "иное"],
        ):
            for name_trigger in list_name:
                if name_trigger.lower() in full_name_customer:
                    return name

        if (
            "администрац" in full_name_customer
            or "комитет по управлению имуществом" in full_name_customer
        ) and not all(
            [
                i in full_name_customer
                for i in ["моксв", "севастопол" "президент", "санкт-петербур"]
            ]
        ):
            return "местный"

        if "городская дума" in full_name_customer and "моксв" not in full_name_customer:
            return "местный"

    if inn_customer:
        for inn_dict, name in zip(
            [inn_mun, inn_sub, inn_fed, inn_another],
            ["местный", "субъектовый", "федеральный", "иное"],
        ):
            for inn in inn_dict.keys():
                if inn == inn_customer:
                    return name
    # добавить логги

    return None

## <a id='toc1_3_'></a>[Обработка КБК](#toc0_)

In [7]:
kbk_type = pd.read_excel("../data/kbk.xlsx", sheet_name="type", dtype="str")
kbk_np = pd.read_excel("../data/kbk.xlsx", sheet_name="np", dtype="str")
kbk_section = pd.read_excel("../data/kbk.xlsx", sheet_name="section", dtype="str")

In [8]:
def extract_data_from_kbk(kbk, year):
    dict_kbk = {
        "code_main_admin": None,
        "code_section_sub": None,
        "code_direction_expenses": None,
        "code_type_expenses": None,
        "code_national_project": None,
        "value_code_section": None,
        "value_code_sub": None,
        "value_code_type_expenses": None,
        "name_national_project": None,
        "name_fed_national_project": None,
    }
    if not kbk or type(kbk) != str:
        return dict_kbk

    if len(kbk) == 3 or kbk[:-3] == "0" * 17:
        code_type_expenses = kbk[-3:]
        value_code_type_expenses = kbk_type.loc[
            kbk_type.code == code_type_expenses, "mean"
        ].to_list()

        if len(value_code_type_expenses):
            dict_kbk["value_code_type_expenses"] = value_code_type_expenses[0]
        else:
            pass
            # логи
        dict_kbk["code_type_expenses"] = code_type_expenses
        return dict_kbk

    elif len(kbk) == 20:
        kbk_search = re.compile(r"(\S\S\S)(\S\S\S\S)(\S\S\S\S\S\S\S\S\S\S)(\S\S\S)")
        kbk_find = kbk_search.search(kbk)

        # код главного распоредителя бюджетных средств
        code_main_admin = kbk_find.group(1)
        # print('Код ГРС:', code_main_admin)
        # код раздела и подраздела
        code_section_sub = kbk_find.group(2)
        # print('Код раздела и подраздела:', code_section_sub)
        # код целевой статьи
        code_direction_expenses = kbk_find.group(3)
        # print('Код целевой статьи:', code_direction_expenses)
        # код вида расходов
        code_type_expenses = kbk_find.group(4)
        # print('Код вида расходов:', code_type_expenses)
        # код национального проекта
        code_national_project = (
            code_direction_expenses[3:5] if not code_direction_expenses[3].isdigit() else None
        )
        # print('Код национального проекта:', code_national_project)

        value_code_section = kbk_section.loc[
            (kbk_section.year == year) & (kbk_section.code == code_section_sub[:2]), "mean"
        ].to_list()
        if len(value_code_section):
            dict_kbk["value_code_section"] = value_code_section[0]
        else:
            pass
        # print('value_code_section:', value_code_section)

        value_code_sub = kbk_section.loc[
            (kbk_section.year == year) & (kbk_section.code == code_section_sub), "mean"
        ].to_list()
        if len(value_code_sub):
            dict_kbk["value_code_sub"] = value_code_sub[0]
        else:
            pass
        # print('code_type_expenses:', value_code_sub)

        value_code_type_expenses = kbk_type.loc[
            kbk_type.code == code_type_expenses, "mean"
        ].to_list()
        if len(value_code_type_expenses):
            dict_kbk["value_code_type_expenses"] = value_code_type_expenses[0]
        else:
            pass

        # print('value_code_type_expenses:', value_code_type_expenses)
        if code_national_project:
            list_national_project = kbk_np.loc[
                (kbk_np.year == year) & (kbk_np.code == code_national_project),
                ["name_national_project", "name_fed_national_project"],
            ].values
            # print('list_national_project:', list_national_project)

            if len(list_national_project):
                dict_kbk["name_national_project"] = list_national_project[0]
                dict_kbk["name_fed_national_project"] = list_national_project[1]
            else:
                pass
                # логи

        dict_kbk["code_main_admin"] = code_main_admin
        dict_kbk["code_section_sub"] = code_section_sub
        dict_kbk["code_direction_expenses"] = code_direction_expenses
        dict_kbk["code_type_expenses"] = code_type_expenses
        dict_kbk["code_national_project"] = code_national_project

        return dict_kbk

    else:
        # print(kbk)
        return dict_kbk
        # добавить логи

## <a id='toc1_4_'></a>[Поиск и замена плохих адресов](#toc0_)

In [9]:
data_org = pd.DataFrame()


def check_address(address: str, unique: str, cahce_address_customers: dict):
    if type(code) != str:
        code = None
    if type(code_type) != str:
        code_type = None

    list_check = ["Российская Федерация", "РФ", "обл", "ул", "край", "г,", "п."]
    is_nan = type(address) == float
    is_telephon = address.replace("-", "").replace(" ", "").isdigit()
    is_email = ("@" in address) and not any([i in address for i in list_check])

    need_replace = any([is_nan, is_telephon, is_email])

    if not need_replace:
        return address
    else:
        return cahce_address_customers[unique]


def apply_chech_address(address):
    if type(address) != str:
        return True
    list_check = ["Российская Федерация", "РФ", "обл", "ул", "край", "г,", "п."]
    is_nan = type(address) == float
    is_telephon = address.replace("-", "").replace(" ", "").isdigit()
    is_email = ("@" in address) and not any([i in address for i in list_check])

    return any([is_nan, is_telephon, is_email])

In [10]:
# bad_address = set()
# num_bad_address = 0
# path = '../data/raw_data/contract/2014/'
# for file_name in tqdm(os.listdir(path)):
#     file_name = os.path.join(path, file_name)
#     df_now = pd.read_csv(file_name, dtype='str', sep="|")

#     df_now['result'] = df_now['address_customer'].parallel_apply(apply_chech_address)
#     num_bad_address += len(df_now[df_now.result == True])
#     bad_address.update(set(df_now[df_now.result == True].address_customer))

## <a id='toc1_5_'></a>[Декомпозиция адресов](#toc0_)

In [73]:
class DecompositionAddress:
    def __init__(self, path_for_cache: str, year: str):
        self.path_for_cache = path_for_cache
        self.columns = [
            "country",
            "regioncity",
            "regionarea",
            "district",
            "settlement",
            "city",
            "citydistrict",
            "locality",
            "territory",
            "street",
            "plot",
            "building",
            "apartment",
            "room",
            "coef",
        ]
        self.extra_columns = ["address", "year"]
        self.year = year

        if not os.path.exists(path_for_cache):
            pd.DataFrame(columns=self.extra_columns + self.columns).to_csv(
                path_for_cache, sep="|", index=False
            )

        self.dict_cahce = pd.read_csv(
            path_for_cache,
            sep="|",
            dtype="str",
            index_col="address",
            usecols=self.columns + [self.extra_columns[0]],
        ).to_dict(orient="index")

    def address_decompose(self, address: str):
        if not address or address == "" or type(address) != str:
            return {key: None for key in self.columns}

        if address in self.dict_cahce:
            return self.dict_cahce[address]

        else:
            return self.use_pullenti(address)

    def use_pullenti(self, address: str):
        dict_res = {key: None for key in self.columns}
        process_address = AddressService.process_single_address_text(address)

        dict_res["coef"] = process_address.coef

        for address_element in process_address.items:
            level = str(address_element.level).split(".")[1].lower()
            element_address = address_element.to_string_min()
            dict_res[level] = element_address

        self.add_address_to_cache(address, dict_res)
        dict_res.pop("coef")
        return dict_res

    def add_address_to_cache(self, address, dict_result):
        self.dict_cahce[address] = dict_result
        dict_result_for_df = dict_result.copy()
        dict_result_for_df["address"] = address
        dict_result_for_df["year"] = self.year
        pd.DataFrame(dict_result_for_df, index=[0])[self.extra_columns + self.columns].to_csv(
            self.path_for_cache, sep="|", index=False, mode="a", header=False
        )

In [65]:
pd.read_csv(
    "../data/cache/cache_address.csv", sep="|", dtype="str", usecols=["address", "country"]
)

Unnamed: 0,address,country
0,"Российская Федерация, 142279, Московская обл, Серпухов г, Оболенск п, ТЕР. КВАРТАЛ А",Россия
1,"Российская Федерация, 140578, Московская обл, Озёры г, Емельяновка д, УЛИЦА САДОВАЯ",Россия
2,"Российская Федерация, 143968, Московская обл, Реутов г, Победы, 33",Россия
3,"Российская Федерация, 141930, Московская обл, Талдом г, Вербилки рп, УЛИЦА ЗАБЫРИНА, 4",
4,"Российская Федерация, 141900, Московская обл, Талдом г, ПЛОЩАДЬ К.МАРКСА, 12",Россия
...,...,...
11847,"630049 г. Новосибирск, ул. Дуси Ковальчук, д. 272/2",Россия
11848,"Российская Федерация, 630089, Новосибирская обл, Новосибирск г, ул АДРИЕНА ЛЕЖЕНА, 5/1",Россия
11849,"630090, г.Новосибирск, ул. Октябрьская, 42",Россия
11850,"Российская Федерация, 633623, Новосибирская обл, Сузун рп, ул ПАРТИЗАНСКАЯ, 214",Россия


In [12]:
address_dec = DecompositionAddress(path_for_cache="../data/cache/cache_address.csv")

In [13]:
# for i in tqdm(df.address.unique()):
#     address_dec.address_decompose(i)

In [14]:
# process_address = AddressService.process_single_address_text(
#     "Российская Федерация, 678280, Саха /Якутия/ Респ, Сунтарский у, Сарданга с, УЛ. СЕМЕНА СЕМЕНОВА, Д.43"
# )
# for address_element in process_address.items:
#     print("level", address_element.level)
#     print(address_element.to_string_min())
# print("coef:", process_address.coef)

# <a id='toc2_'></a>[Метод для обработки ORG](#toc0_)

In [15]:
df = pd.read_csv("../data/raw_data/org/2014_1/0.csv", sep="|", dtype="str")
print(df.columns)
df.shape

Index(['code', 'code_type', 'access_blocking', 'full_name', 'short_name',
       'adress', 'code_registr', 'date_registration', 'date_last_change',
       'inn', 'kpp', 'ogrn', 'oktmo', 'location', 'iky', 'date_iky',
       'code_okfs', 'name_property', 'okpf_code', 'okopf_name', 'credentials',
       'date_registration_tax', 'organization_type', 'organization_level',
       'okpo_code', 'okfd_code', 'budget_code', 'budget_name', 'telephone',
       'fax', 'postal_adress', 'email', 'site', 'contact_person', 'time_zone'],
      dtype='object')


(4499, 35)

In [16]:
df.columns = [
    "code",
    "code_type",
    "access_blocking",
    "full_name",
    "short_name",
    "address",
    "code_registr",
    "date_registration",
    "date_last_change",
    "inn",
    "kpp",
    "ogrn",
    "oktmo",
    "location",
    "iky",
    "date_iky",
    "code_okfs",
    "name_property",
    "okpf_code",
    "okopf_name",
    "credentials",
    "date_registration_tax",
    "organization_type",
    "organization_level",
    "okpo_code",
    "okfd_code",
    "budget_code",
    "budget_name",
    "telephone",
    "fax",
    "postal_address",
    "email",
    "site",
    "contact_person",
    "time_zone",
]

In [17]:
address_dec = DecompositionAddress(path_for_cache="../data/cache/cache_aderess.csv")


def processing_date_org(df: Series, columns: list):
    dict_result = {}
    # если мы получает nan при обращении к дате в DataFrame, то
    # np.nan == nan возращает False, однако у nan type float, когда у всех дат str
    inn = df["inn"]
    # unique_code = df["code"] + df['code_type']

    date_list = [i for i in columns if "date" in i]

    for date_type in date_list:
        dict_result[date_type] = date_extract(df[date_type])

    dict_result["organization_level"] = fillna_organization_level(
        df["organization_level"], df["full_name"], df["inn"]
    )

    return pd.Series(dict_result)[columns].to_list()

In [18]:
address_level = [
    "country",
    "regioncity",
    "regionarea",
    "district",
    "settlement",
    "city",
    "citydistrict",
    "locality",
    "territory",
    "street",
    "plot",
    "building",
    "apartment",
    "room",
]

In [19]:
# columns = [
#     "date_registration",
#     "date_last_change",
#     "date_registration_tax",
#     "date_iky",
#     "organization_level",
# ]
# df = pd.read_csv("../data/raw_data/org/2014_1/5.csv", sep="|", dtype="str")
# df_copy = df.copy()

# df_copy[columns] = df_copy.apply(
#     lambda x: processing_date_org(x, columns=columns), axis=1, result_type="expand"
# )
# df_copy[columns].isnull().sum()

In [20]:
# path_test = "../data/raw_data/org/2014_1/"
# prob = []
# num_problem = 0
# df_buffer = pd.DataFrame(columns=["organization_level", "full_name", "inn"])
# for file_name in tqdm(sorted(os.listdir(path_test), key=lambda x: int(x.removesuffix(".csv")))):
#     df_test_now = pd.read_csv(os.path.join(path_test, file_name), sep="|", dtype="str")
#     df_test_now[columns] = df_test_now.parallel_apply(
#         lambda x: processing_date_org(x, columns=columns), axis=1, result_type="expand"
#     )
#     problem_level_org = df_test_now.organization_level.isnull().sum()
#     num_problem += problem_level_org
#     if problem_level_org:
#         prob.append((file_name, problem_level_org))
#         df_buffer = pd.concat(
#             [
#                 df_buffer,
#                 df_test_now.loc[
#                     df_test_now.organization_level.isnull(),
#                     ["organization_level", "full_name", "inn"],
#                 ],
#             ]
#         )
# num_problem

# <a id='toc3_'></a>[Метод для обработки contract](#toc0_)

In [21]:
df = pd.read_csv("../data/raw_data/contract/2014/0.csv", sep="|", dtype="str")
df.columns

Index(['number_contract', 'address_customer', 'full_name_customer',
       'short_name_customer', 'code', 'code_type', 'id_customer',
       'inn_customer', 'kpp_customer', 'code_form_org', 'okpo_code',
       'municipal_code', 'budget_name', 'extrabudget_name', 'budget_level',
       'contract_status', 'notice', 'ikz_code', 'id_contract_electronic',
       'unique_number_plan', 'method_determinig_supplier', 'date_summarizing',
       'date_posting', 'grouds_single_supplier', 'document_details',
       'info_support', 'date_contract', 'date_performance',
       'date_contract_registry', 'date_update_registry',
       'date_start_performance', 'date_end_performance', 'contract_item',
       'contract_price', 'contract_price_nds', 'prepayment_amount',
       'performance_security', 'size_performance_quality', 'warranty_period',
       'place_performance', 'full_name_supplier', 'inn_supplier',
       'kpp_supplier', 'code_okpo_supplier', 'date_registration_supplier',
       'country_suppl

In [22]:
[i for i in df.columns if "date" in i]

['date_summarizing',
 'date_posting',
 'date_contract',
 'date_performance',
 'date_contract_registry',
 'date_update_registry',
 'date_start_performance',
 'date_end_performance',
 'date_registration_supplier']

In [23]:
def processing_date_contract(df: Series, columns: list):
    year = 2014
    dict_result = {}

    date_list = [i for i in columns if "date" in i]

    for date_type in date_list:
        dict_result[date_type] = date_extract(df[date_type])

    dict_result["organization_level"] = fillna_organization_level(
        df["budget_level"], df["full_name_customer"], df["inn_customer"]
    )

    for date in [
        "find_date_contract",
        "date_summarizing",
        "date_posting",
        "date_performance",
        "date_end_performance",
        "date_contract_registry",
    ]:
        if type(dict_result[date]) == datetime.date:
            year = str(dict_result[date].year)
            break

    dict_result.update(extract_data_from_kbk(df["kbk"], year))

    return pd.Series(dict_result)[columns].to_list()

In [24]:
# columns = [
#     "date_summarizing",
#     "date_posting",
#     "date_contract",
#     "date_performance",
#     "date_contract_registry",
#     "date_update_registry",
#     "date_start_performance",
#     "date_end_performance",
#     "date_registration_supplier",
#     "code_main_admin",
#     "code_section_sub",
#     "code_direction_expenses",
#     "code_type_expenses",
#     "code_national_project",
#     "value_code_section",
#     "value_code_sub",
#     "value_code_type_expenses",
#     "name_national_project",
#     "name_fed_national_project",
#     "organization_level",
# ]
# path_test = "../data/raw_data/contract/2014_1/"
# prob = []
# num_problem = 0
# df_buffer = pd.DataFrame(
#     columns=["budget_name", "budget_level", "full_name_customer", "inn_customer"]
# )
# for file_name in tqdm(sorted(os.listdir(path_test), key=lambda x: int(x.removesuffix(".csv")))):
#     df_test_now = pd.read_csv(os.path.join(path_test, file_name), sep="|", dtype="str")
#     df_test_now[columns] = df_test_now.parallel_apply(
#         lambda x: processing_date_contract(x, columns=columns), axis=1, result_type="expand"
#     )
#     problem_level_org = df_test_now.organization_level.isnull().sum()
#     num_problem += problem_level_org
#     if problem_level_org:
#         prob.append((file_name, problem_level_org))
#         df_buffer = pd.concat(
#             [
#                 df_buffer,
#                 df_test_now.loc[
#                     df_test_now.organization_level.isnull(),
#                     ["budget_name", "budget_level", "full_name_customer", "inn_customer"],
#                 ],
#             ]
#         )
# num_problem

# <a id='toc4_'></a>[Класс для обработки данных](#toc0_)

In [96]:
class ProcessingData:
    def __init__(
        self,
        path_cache_address: str,
        path_cache_org_address: str,
        path_kbk_table: str,
        default_year_for_kbk: str,
    ):
        self.address_dec = DecompositionAddress(path_for_cache=path_cache_address, year=default_year_for_kbk)

        self.path_cache_org_address = path_cache_org_address
        if not os.path.exists(path_cache_org_address):
            pd.DataFrame(columns=["code", "code_type", "address", "year"]).to_csv(
                path_cache_org_address, sep="|", index=False
            )

        # подготовка кэша адрессов
        df_cahce = pd.read_csv(path_cache_org_address, sep="|", dtype="str")
        self.columns_cache = list(df_cahce.columns)
        df_cahce["unique"] = df_cahce["code"] + "_" + df_cahce["code_type"]
        df_cahce = df_cahce[["unique", "address"]].set_index("unique")

        self.cache_org_address = df_cahce.to_dict(orient="index")
        self.kbk_type = pd.read_excel(path_kbk_table, sheet_name="type", dtype="str")
        self.kbk_np = pd.read_excel(path_kbk_table, sheet_name="np", dtype="str")
        self.kbk_section = pd.read_excel(path_kbk_table, sheet_name="section", dtype="str")

        self.default_year_for_kbk = default_year_for_kbk

        self.address_level = [
            "country",
            "regioncity",
            "regionarea",
            "district",
            "settlement",
            "city",
            "citydistrict",
            "locality",
            "territory",
            "street",
            "plot",
            "building",
            "apartment",
            "room",
        ]

    def date_extract(self, date: str):
        if not date or date == "--.--.----" or type(date) != str:
            return None

        date = date.replace("Загрузка ...", "").strip()
        try:
            return datetime.datetime.strptime(date, "%d.%m.%Y").date()
        except ValueError:
            pass

        try:
            return datetime.datetime.strptime(date[:10], "%d.%m.%Y").date()
        except ValueError:
            pass

        try:
            return datetime.datetime.strptime(date.split()[0], "%d.%m.%Y").date()
        except ValueError:
            pass

        for key, value in dict_month.items():
            if key in date:
                date = date.replace(key, value)
                date = ".".join(date.split())

        date = date.split(".")

        if len(date) == 2:
            date = ".".join(["01"] + date)
            return datetime.datetime.strptime(date[:10], "%d.%m.%Y").date()

    def fillna_organization_level(
        self, budget_level: str, full_name_customer: str, inn_customer: str
    ):
        if type(budget_level) != str:
            budget_level = None
        elif type(budget_level) == str:
            budget_level = budget_level.lower()

        if type(full_name_customer) != str:
            full_name_customer = None
        elif type(full_name_customer) == str:
            full_name_customer = full_name_customer.lower()

        if type(inn_customer) != str:
            inn_customer = None

        if budget_level:
            for list_name, name in zip(
                [list_local, list_sub, list_fed], ["местный", "субъектовый", "федеральный"]
            ):
                for name_trigger in list_name:
                    if name_trigger.lower() in budget_level:
                        return name

        # если не получилось выделить данные из budget_level попробуем сделать это с full_name_customer
        if full_name_customer:
            for list_name, name in zip(
                [list_fed_2, list_local_2, list_sub_2, list_anothe],
                ["федеральный", "местный", "субъектовый", "иное"],
            ):
                for name_trigger in list_name:
                    if name_trigger.lower() in full_name_customer:
                        return name

            for list_name, name in zip(
                [list_fed_3, list_local_3, list_sub_3],
                ["федеральный", "местный", "субъектовый", "иное"],
            ):
                for name_trigger in list_name:
                    if name_trigger.lower() in full_name_customer:
                        return name

            if (
                "администрац" in full_name_customer
                or "комитет по управлению имуществом" in full_name_customer
            ) and not all(
                [
                    i in full_name_customer
                    for i in ["моксв", "севастопол" "президент", "санкт-петербур"]
                ]
            ):
                return "местный"

            if "городская дума" in full_name_customer and "моксв" not in full_name_customer:
                return "местный"

        if inn_customer:
            for inn_dict, name in zip(
                [inn_mun, inn_sub, inn_fed, inn_another],
                ["местный", "субъектовый", "федеральный", "иное"],
            ):
                for inn in inn_dict.keys():
                    if inn == inn_customer:
                        return name
        # добавить логги

        return None

    def extract_data_from_kbk(self, kbk, year):
        dict_kbk = {
            "code_main_admin": None,
            "code_section_sub": None,
            "code_direction_expenses": None,
            "code_type_expenses": None,
            "code_national_project": None,
            "value_code_section": None,
            "value_code_sub": None,
            "value_code_type_expenses": None,
            "name_national_project": None,
            "name_fed_national_project": None,
        }
        if not kbk or type(kbk) != str:
            return dict_kbk

        if len(kbk) == 3 or kbk[:-3] == "0" * 17:
            code_type_expenses = kbk[-3:]
            value_code_type_expenses = kbk_type.loc[
                kbk_type.code == code_type_expenses, "mean"
            ].to_list()

            if len(value_code_type_expenses):
                dict_kbk["value_code_type_expenses"] = value_code_type_expenses[0]
            else:
                pass
                # логи
            dict_kbk["code_type_expenses"] = code_type_expenses
            return dict_kbk

        elif len(kbk) == 20:
            kbk_search = re.compile(r"(\S\S\S)(\S\S\S\S)(\S\S\S\S\S\S\S\S\S\S)(\S\S\S)")
            kbk_find = kbk_search.search(kbk)

            # код главного распоредителя бюджетных средств
            code_main_admin = kbk_find.group(1)
            # print('Код ГРС:', code_main_admin)
            # код раздела и подраздела
            code_section_sub = kbk_find.group(2)
            # print('Код раздела и подраздела:', code_section_sub)
            # код целевой статьи
            code_direction_expenses = kbk_find.group(3)
            # print('Код целевой статьи:', code_direction_expenses)
            # код вида расходов
            code_type_expenses = kbk_find.group(4)
            # print('Код вида расходов:', code_type_expenses)
            # код национального проекта
            code_national_project = (
                code_direction_expenses[3:5] if not code_direction_expenses[3].isdigit() else None
            )
            # print('Код национального проекта:', code_national_project)

            value_code_section = kbk_section.loc[
                (kbk_section.year == year) & (kbk_section.code == code_section_sub[:2]), "mean"
            ].to_list()
            if len(value_code_section):
                dict_kbk["value_code_section"] = value_code_section[0]
            else:
                pass
            # print('value_code_section:', value_code_section)

            value_code_sub = kbk_section.loc[
                (kbk_section.year == year) & (kbk_section.code == code_section_sub), "mean"
            ].to_list()
            if len(value_code_sub):
                dict_kbk["value_code_sub"] = value_code_sub[0]
            else:
                pass
            # print('code_type_expenses:', value_code_sub)

            value_code_type_expenses = kbk_type.loc[
                kbk_type.code == code_type_expenses, "mean"
            ].to_list()
            if len(value_code_type_expenses):
                dict_kbk["value_code_type_expenses"] = value_code_type_expenses[0]
            else:
                pass

            # print('value_code_type_expenses:', value_code_type_expenses)
            if code_national_project:
                list_national_project = kbk_np.loc[
                    (kbk_np.year == year) & (kbk_np.code == code_national_project),
                    ["name_national_project", "name_fed_national_project"],
                ].values
                # print('list_national_project:', list_national_project)

                if len(list_national_project):
                    dict_kbk["name_national_project"] = list_national_project[0]
                    dict_kbk["name_fed_national_project"] = list_national_project[1]
                else:
                    pass
                    # логи

            dict_kbk["code_main_admin"] = code_main_admin
            dict_kbk["code_section_sub"] = code_section_sub
            dict_kbk["code_direction_expenses"] = code_direction_expenses
            dict_kbk["code_type_expenses"] = code_type_expenses
            dict_kbk["code_national_project"] = code_national_project

            return dict_kbk

        else:
            # print(kbk)
            return dict_kbk
            # добавить логи

    def check_address(self, address: str, code: str, code_type: str):
        if type(code) != str or code == "":
            return address
        if type(code_type) != str or code_type == "":
            return address
        if type(address) != str:
            address = ""

        list_check = ["Российская Федерация", "РФ", "обл", "ул", "край", "г,", "п."]
        is_nan = type(address) == float
        is_telephon = address.replace("-", "").replace(" ", "").isdigit()
        is_email = ("@" in address) and not any([i in address for i in list_check])
        is_empty_string = address == ""

        need_replace = any([is_nan, is_telephon, is_email, is_empty_string])

        if not need_replace:
            return address
        else:
            unique = code + "_" + code_type
            return self.cache_org_address.get(unique)

    def processing_date_org(self, df: Series, columns: list):
        dict_result = {}
        # если мы получает nan при обращении к дате в DataFrame, то
        # np.nan == nan возращает False, однако у nan type float, когда у всех дат str
        # unique_code = df["code"] + df['code_type']
        date_list = [i for i in columns if "date" in i]
        for date_type in date_list:
            dict_result[date_type] = date_extract(df[date_type])
        for date_type, prefix_addres in zip(["address_customer", "postal_address"], ["c", "pc"]):
            dict_result_address = self.address_dec.address_decompose(df[date_type])
            for key, value in dict_result_address.items():
                dict_result[f"{prefix_addres}_{key}"] = value
        dict_result["organization_level"] = fillna_organization_level(
            df["organization_level"], df["full_name_customer"], df["inn_customer"]
        )

        return pd.Series(dict_result)[columns].to_list()

    def processing_date_contract(self, df: Series, columns: list):
        dict_result = {}
        year = self.default_year_for_kbk
        date_list = [i for i in columns if "date" in i]

        for date_type in date_list:
            dict_result[date_type] = date_extract(df[date_type])

        for address_type, prefix_addres in zip(
            ["address_customer", "address_supplier", "postal_address_supplier"], ["c", "s", "ps"]
        ):
            address = df[address_type]
            if address_type == "address_customer":
                address = self.check_address(address, code=df["code"], code_type=df["code_type"])

            dict_result_address = self.address_dec.address_decompose(address)
            for key, value in dict_result_address.items():
                dict_result[f"{prefix_addres}_{key}"] = value

        dict_result["organization_level"] = fillna_organization_level(
            df["budget_level"], df["full_name_customer"], df["inn_customer"]
        )

        for date in [
            "date_contract",
            "date_summarizing",
            "date_posting",
            "date_performance",
            "date_end_performance",
            "date_contract_registry",
        ]:
            if type(dict_result[date]) == datetime.date:
                year = str(dict_result[date].year)
                break

        dict_result.update(extract_data_from_kbk(df["kbk"], year))

        return pd.Series(dict_result)[columns].to_list()

    def add_new_address_to_cache(self, df: DataFrame):
        df_address = df[["address_customer", "code", "code_type"]]
        for index in df_address.index:
            address = df_address.loc[index, "address_customer"]
            code = df_address.loc[index, "code"]
            code_type = df_address.loc[index, "code_type"]
            if type(code) != str or type(code_type) != str or type(address) != str:
                continue

            unique = code + "_" + code_type
            if not unique in self.cache_org_address:
                self.cache_org_address[unique] = address

                pd.DataFrame(
                    {
                        "address": [address],
                        "code": [code],
                        "code_type": [code_type],
                        "year": [self.default_year_for_kbk],
                    },
                    index=[0],
                )[self.columns_cache].to_csv(
                    self.path_cache_org_address, mode="a", index=False, header=False, sep="|"
                )

    def run_org(self, path_input: str, path_output: str):
        df = pd.read_csv(path_input, sep="|", dtype="str")
        columns = [
            "date_registration",
            "date_last_change",
            "date_registration_tax",
            "date_iky",
            "organization_level",
        ] + [
            prefix + "_" + address_level
            for prefix in ["c", "pc"]
            for address_level in self.address_level
        ]
        df[columns] = df.apply(
            lambda x: self.processing_date_org(x, columns=columns), axis=1, result_type="expand"
        )
        self.add_new_address_to_cache(df)
        df.to_csv(path_output, sep="|", index=False)

    def run_contract(self, path_input: str, path_output: str):
        df = pd.read_csv(path_input, sep="|", dtype="str")
        columns = [
            "date_summarizing",
            "date_posting",
            "date_contract",
            "date_performance",
            "date_contract_registry",
            "date_update_registry",
            "date_start_performance",
            "date_end_performance",
            "date_registration_supplier",
            "code_main_admin",
            "code_section_sub",
            "code_direction_expenses",
            "code_type_expenses",
            "code_national_project",
            "value_code_section",
            "value_code_sub",
            "value_code_type_expenses",
            "name_national_project",
            "name_fed_national_project",
            "organization_level",
        ] + [
            prefix + "_" + address_level
            for prefix in ["c", "s", "ps"]
            for address_level in self.address_level
        ]
        df[columns] = df.apply(
            lambda x: self.processing_date_contract(x, columns=columns),
            axis=1,
            result_type="expand",
        )
        df.to_csv(path_output, sep="|", index=False)

In [97]:
processing_date = ProcessingData(
    path_cache_address="../data/cache/cache_address.csv",
    path_cache_org_address="../data/cache/cache_org_address.csv",
    path_kbk_table="../data/kbk.xlsx",
    default_year_for_kbk="2014",
)

In [98]:
path_input = "../data/raw_data/org/2014/"
output_path = "../data/processed_data/org/2014/"

for file_name in tqdm(sorted(os.listdir(path_input), key= lambda x: int(x.removesuffix('.csv')))):
    file_name_input = os.path.join(path_input, file_name)
    file_name_output = os.path.join(output_path, file_name)
    processing_date.run_org(file_name_input, file_name_output)

  3%|▎         | 2/60 [04:29<2:10:28, 134.98s/it]


KeyboardInterrupt: 

In [77]:
input_path = "../data/raw_data/contract/2014/0.csv"
output_path = "../data/processed_data/contract/2014/0.csv"
processing_date.run_contract(input_path, output_path)

In [81]:
df = pd.read_csv(input_path, sep="|", dtype=str)

In [85]:
df_new = df[df.number_contract == '0111300015714000030']

In [90]:
processing_date.check_address(df_new['address_customer'].values[0], code=df_new['code'].values[0], code_type=df_new['code_type'].values[0])

'kazansport@mail.ru'

In [92]:
df_new['code'].values[0] + '_' + df_new['code_type'].values[0]

'786179_Id'

In [93]:
processing_date.cache_org_address[df_new['code'].values[0] + '_' + df_new['code_type'].values[0]]

KeyError: '786179_Id'