## Загрузка данных и подготовка к анализу

### Установка необходимых библиотек и плагинов

### Загрузка библиотек

In [None]:
import os

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns

import psycopg2
from sqlalchemy import create_engine 
from psycopg2 import OperationalError


from datetime import datetime, timedelta, date
from dateutil.relativedelta import relativedelta
import calendar
import time

from itertools import cycle
from IPython.display import display_html

import pathlib2 as pl2
import gc

### Дополнительные настройки

In [None]:
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

pd.set_option('chained_assignment',None)

## Определение функций

In [None]:
def loaded_region_grouped(from_parquet1, from_parquet2, from_parquet3, regions, batches = 100):

    organizations = pd.DataFrame()
    houses = pd.DataFrame()
    payment_documents = pd.DataFrame()

    if from_parquet1:
        organizations = pd.read_csv(PATH_DOWNLOAD + 'organizations1.csv')
    else:
        for i, region in enumerate(tqdm(regions, ncols=80)):

            data = dataloader(collect_query_regions(region), db_config=db_config_analytic)
            organizations = pd.concat([organizations, data])
            print(f"Region: {region} загружен. Df shape: {organizations.shape}")
        
        organizations.to_csv(PATH_DOWNLOAD + 'organizations1.csv')
    print(f"Organizations shape: {organizations.shape}")
    
    fias_houses = organizations['fias_house_code'][organizations['fias_house_code'].notnull()].unique().tolist()
    accounts = organizations['sha_guid'][organizations['sha_guid'].notnull()].unique().tolist()
    
    houses_list = "'" + "','".join(fias_houses) + "'"
    print(f"Размер houses_list: {len(fias_houses)}. Размер accounts_list: {len(accounts)}")
        
    if from_parquet2:
        houses = pd.read_parquet(PATH_DOWNLOAD + 'houses1.parquet')
    else:
        for i, region in enumerate(tqdm(regions, ncols=80)):

            data = dataloader(collect_query_rso_houses(houses_list), db_config=db_config_analytic)
            houses = pd.concat([houses, data])
            print(f"Region: {region} загружен. Df shape: {houses.shape}")
        
        houses.to_parquet(PATH_DOWNLOAD + 'houses1.parquet')
    print(f"Houses shape: {houses.shape}")
  
    if from_parquet3:
        payment_documents = pd.read_parquet(PATH_DOWNLOAD + 'payment_documents2.parquet')
    else:
        if len(accounts) % batches:
            block_size = int(len(accounts) / batches)
            batches += 1
        else:
            block_size = int(len(accounts) / batches)

        for i, batch in enumerate(tqdm(range(batches), ncols=80)):
            
            account_list = accounts[i * block_size: block_size * (i + 1)]
            accounts_list = "'" + "','".join(account_list) + "'"
            data = dataloader(collect_query_epdc(accounts_list), db_config=db_config_analytic)
            payment_documents = pd.concat([payment_documents, data])
            print(f"Batch: {i}. Payment_documents shape: {payment_documents.shape}") 
            
        payment_documents.to_parquet(PATH_DOWNLOAD + 'payment_documents2.parquet')
    print(f"Payment_documents shape: {payment_documents.shape}")
        
    data = (
        organizations
        .merge(houses, how='left', on='fias_house_code')
        .merge(payment_documents, how='left', left_on='sha_guid', right_on='account_guid')
    )
    return data

In [None]:
def day_ranges(last_month_day=30):
    to_list = []

    for input_from, input_to, from_next, to_next, allow in zip(device_metering2['input_day_from'], device_metering2['input_day_to'], device_metering2['input_day_from_next_month'], device_metering2['input_day_to_next_month'], device_metering2['allow_transfer_any_day']):
        
        if allow:  #TO DO проверить, что адекватно работает!!!!!!!!!!!!!!
            day_list = list(range(1, last_month_day + 1)) 
            to_list.append(day_list)
        elif (((from_next == False) and (to_next == False)) and (input_to > last_month_day)) or ((from_next == True) and (to_next == True) and (input_to > last_month_day)):
            if input_to > 2 * last_month_day:
                day_list = list(range(input_from, last_month_day + 1)) + list(range(1, input_to - 2 * last_month_day + 1))
            else:
                day_list = list(range(input_from, last_month_day + 1)) + list(range(1, input_to - last_month_day + 1))
            to_list.append(day_list)
        elif ((from_next == False) and (to_next == False)) or ((from_next == True) and (to_next == True)):
            to_list.append(list(range(input_from, input_to + 1)))
        elif ((from_next == False) and (to_next == True)):
            day_list = list(range(input_from, last_month_day + 1)) + list(range(1, input_to - last_month_day + 1))
            to_list.append(day_list)
        elif ((from_next == True) and (to_next == False)):
            day_list = list(range(input_to - last_month_day, input_from + 1))
            to_list.append(day_list)

    return to_list

In [None]:
def create_connection(db_config):
    """
    Создание подключения к базе данных PostgreSQL
    :db_config: параметры подключения
    :return: string
    """
    connection = None
    try:
        connection = psycopg2.connect(
            database=db_config['db'],
            user=db_config['user'],
            password=db_config['pwd'],
            host=db_config['host'],
            port=db_config['port'],
        )
        print("\nПодключение к базе данных PostgreSQL прошло успешно")
    except psycopg2.OperationalError as e:
        print(f"Произошла ошибка '{e}'")
    return connection

In [None]:
def fetcher(query, db_config):
    """
    Выгрузка данных из POSTGRESQL
    :query: POSTGRESQL запрос
    :db_config: параметры подключения
    :return: pandas.DataFrame
    """
    connection = create_connection(db_config)
    records = None
    try:
        with connection:
            with connection.cursor() as cursor:
                start = datetime.now()
                print(f'Старт расчета: {start}')

                cursor.execute(query)
                print(f"Время выполнения execute {(datetime.now() - start)} сек")
                middle = datetime.now()

                result = cursor.fetchall()
                print(f"result: {len(result)}. Длина первой строки: {len(result[0])}")
                print(f"Время выполнения fetchall {(datetime.now() - middle)} сек")
                middle1 = datetime.now()

                column_names = [description[0] for description in cursor.description]
                data = pd.DataFrame(data=result, columns=column_names)

                print(f"Время формирования датафрейма {(datetime.now() - middle1)} сек. Shape: {data.shape}")
                print(f'Общее время выгрузки: {datetime.now() - start}')
    except Exception as err:
        print(f"Ошибка при выполнении запроса: {err}")
    finally:
        cursor.close()
        connection.close()
    return data

In [None]:
def dataloader(query, db_config):
    """
    Повтор загрузки данных в случае сбоя
    :query: POSTGRESQL запрос
    :return: pandas.DataFrame
    """
    k = 0
    not_loaded = True
    NUMBER_OF_ATTEMPTS_TO_CONNECT_TO_THE_DATABASE = 2
    TIME_SLEEP = 1

    while not_loaded:
        k = k + 1
        if k > NUMBER_OF_ATTEMPTS_TO_CONNECT_TO_THE_DATABASE:
            break
        try:
            data = fetcher(query, db_config)
            not_loaded = False
        except Exception as err:
            print(f'Exception: {err}. Произошел сбой выгрузки данных. Попытка {k}. Засыпаем на {TIME_SLEEP / 60} мин')
            time.sleep(TIME_SLEEP)
            not_loaded = True
    else:
        print(f"Выгрузка данных прошла без сбоя. Попытка №{k} из {NUMBER_OF_ATTEMPTS_TO_CONNECT_TO_THE_DATABASE}.")
        return data

In [None]:
def missing_values(df, with_min_max=False):
    """
    Вычисление аномалий в датасете
    :df: pandas.DataFrame, в котором ищем аномалии
    :with_min_max: флаг - включать минимальные и максимальные значения или нет
    :return: pandas.DataFrame
    """
    
    "TO DO: добавить None, 0 как строка"
    
    columns = [
            'NaN_part, %', '-1_counts', 'empty_counts', 'space_counts', '0_counts', 'unique_counts',
            'dupl_sum', 'dtypes', 'length'
        ]
    if with_min_max:
        columns += ['min_value', 'max_value']

    data = pd.DataFrame(columns=columns, index=df.columns)
    
    for column in df.columns:
        data['NaN_part, %'][column] = df[column].isnull().mean() * 100
        data['-1_counts'][column] = df[df[column] == -1][column].count()
        data['empty_counts'][column] = df[df[column] == ''][column].count()
        data['space_counts'][column] = df[df[column] == ' '][column].count()
        data['0_counts'][column] = df[df[column] == 0][column].count()
        data['unique_counts'][column] = len(df[column].unique())
        if with_min_max:
            data['min_value'][column] = df[column].min()
            data['max_value'][column] = df[column].max()
        data['dupl_sum'][column] = df[column].duplicated().sum()
        data['dtypes'][column] = df[column].dtypes
        data['length'][column] = len(df[column])
    return data 

In [None]:
def create_addresses(df1, columns_data):
    """
    Формирование полного адреса домовладения
    :df: pandas.DataFrame, в котором ищем аномалии
    :columns: колонки, по которым собирается адрес домовладения
    :return: pandas.DataFrame
    """
    
    columns = [
        'addr_region_fullname', 
        'addr_area_abbr', 'addr_area_fullname', 
        'addr_city_abbr', 'addr_city_fullname',  
        'addr_ctar_abbr', 'addr_ctar_fullname', 
        'addr_place_abbr', 'addr_place_fullname',  
        'addr_street_abbr', 'addr_street_fullname',  
        'addr_extr_abbr', 'addr_extr_fullname',  
        'addr_sext_abbr', 'addr_sext_fullname',  
        'addr_house_abbr','addr_house_fullname',  
        'addr_postal_code'
    ]
    
    df = df1[columns_data + columns]
    
    df['addr_postal_code'] = pd.to_numeric(df['addr_postal_code']).fillna(-1).astype(int)
    df = df.fillna(value=np.nan)
    df[columns] = df[columns].astype('str')
    df['addr_postal_code'] = np.where(df['addr_postal_code'] == '-1', 'nan', df['addr_postal_code'])
    
    def municipality(df, columns):

        suffix_common = ', '
        suffix_dot = '. '

        def apply_function(row, suffix_common, suffix_dot, columns):
            address = ''
            for column in columns:
                if row[column] != 'nan':
                    if column.endswith('_abbr'):
                        address = address + row[column] + suffix_dot
                    else:
                        if column in columns[-1]:
                            address = address + row[column]
                        else:
                            address = address + row[column] + suffix_common
            return address

        df['Адрес_домовладения'] = df.apply(apply_function, args=(suffix_common, suffix_dot, columns), axis=1)
        return df

    df2 = municipality(df, columns)

    df2['Адрес_домовладения'] = df2['Адрес_домовладения'].str.rstrip(', ')
    df2['Адрес_домовладения'] = np.where(df2['Адрес_домовладения'] == '', 'nan', df2['Адрес_домовладения'])
    df2 = df2.replace('nan', np.nan)
    df2['Адрес_домовладения'] = np.where(df2['Адрес_домовладения'] == '', np.nan, df2['Адрес_домовладения'])

    return df2.drop(columns=columns)

In [None]:
def drop_duplicate_rows(df):
    """
    Удаление дубликатов
    :df: pandas.DataFrame, в котором ищем дубликаты
    :return: pandas.DataFrame
    """
    duplicateRows = df[df.duplicated()]
    df = df[~df.index.isin(duplicateRows.index)]
    print(f"Количество дубликатов: {duplicateRows.shape}, Итоговый размер датафрейма: {df.shape}")
    return df

In [None]:
def plot_optimum_day(list_1D_30, days1, list_1D_31, days2):
    """
    Вывод графика частотного распределения ввода показаний ПУ
    :list_1D_30: list, распределение показаний для случая 30 дней в месяце
    :days1: int, количество дней в месяце для списка list_1D_30
    :list_1D_31: list, распределение показаний для случая 31 дней в месяце
    :days2: int, количество дней в месяце для списка list_1D_31
    :return: pandas.DataFrame
    """
    figsize=(20, 10)
    plt.figure(figsize=figsize)
    
    #ax1 = plt.subplot(2, 1, 1)
    fig, axs = plt.subplots(1, 2, sharey=True, tight_layout=True)
    
    axs[0].hist(list_1D_30, bins=days1)
    axs[0].legend()
    axs[0].set_xlabel('30-дневный месяц')
    axs[0].set_ylabel('Количество')
    #axs[0].set_title(f'Частотное распределение ввода показателей')
    axs[0].set_xlim([0, days1])
      
    axs[1].hist(list_1D_31, bins=days2)
    axs[1].legend()
    axs[1].set_xlabel('31-дневный месяц')
    #axs[1].set_title(f'Частотное распределение ввода показателей')
    axs[1].set_xlim([0, days2])
    
    fig.suptitle(' Распределение дней ввода показателей по месяцам ', fontsize=15)
    
    plt.savefig('my_plot.png')
    
    plt.show()

In [None]:
def processing_data(df):
    df['По нормативу'] = np.where(df['individual_consumption_type'] == 'NORM', 1, 0)

    df['По ПУ'] = np.where(
        (df['individual_consumption_type'] == 'METERING_DEVICE')
        & (df['device_guid'].isnull()),
        1, 0
    )

    df1 = (
        df
        .groupby(by=['fias_region_code', 'organization_short_name', 'inn', 'kpp', 'org_email', 'url', 'phone'])
        .agg({'По нормативу': 'sum', 'По ПУ': 'sum'})
        .reset_index()
    )
    return df1

## Определение запросов

In [None]:
def collect_query_rso_houses(houses):
    query = f"""
        WITH temp_addresses_parts as (
            select  distinct on (1) fias_house_code, region_fullname, 
                area_fullname, area_abbr, 
                city_abbr, city_fullname, 
                ctar_abbr, ctar_fullname, 
                place_abbr, place_fullname, 
                street_abbr, street_fullname, 
                extr_abbr, extr_fullname, 
                sext_abbr, sext_fullname, 
                house_abbr, house_fullname, 
                planstruct_abbr, planstruct_fullname, 
                postal_code
            from snsi.nsi_addresses
            where entity_is_actual
            order by 1, last_editing_date desc
        ),        
        temp_addresses_whole AS (
            select 
                fias_house_code
                ,MAX(
                    COALESCE(region_fullname||', ', '') 
                    ||COALESCE(area_abbr||'. ', area_fullname, '')||COALESCE(area_fullname||', ', '')      --возможно надо будет подправить, дублируется area_fullname
                    ||COALESCE(city_abbr||'. ', '')               ||COALESCE(city_fullname||', ', '')
                    ||COALESCE(ctar_abbr||'. ', '')               ||COALESCE(ctar_fullname||', ', '')
                    ||COALESCE(place_abbr||' ', '')               ||COALESCE(place_fullname||', ', '')
                    ||COALESCE(street_abbr||' ', '')              ||COALESCE(street_fullname||', ', '')
                    ||COALESCE(extr_abbr||' ', '')                ||COALESCE(extr_fullname||', ', '')
                    ||COALESCE(sext_abbr||'. ', '')               ||COALESCE(sext_fullname||', ', '')
                    ||COALESCE(house_abbr||'. ', '')              ||COALESCE(house_fullname||', ', '')
                    ||COALESCE(planstruct_abbr||' ', '')          ||COALESCE(planstruct_fullname||', ', '')
                    ||COALESCE(postal_code||'.', '') 
                ) AS full_address_string --адресная строка стандартная
            from temp_addresses_parts
            group by fias_house_code
        )
        WITH temp_data AS (                   
            select
                arh.fias_house_code
                ,arh.agreement_guid
                --,nms.nsi_3_code                       --посмотреть snsi.nsi_municipal_services и сопоставить nsi_3_code and nsi_2_code с metering_input_days. Присоединять уже без этого параметра
                ,arhs.supply_period_start               --договор на подключение к сети? 
                ,arhs.supply_period_end                 --договор на подключение к сети?
                ,case 
                    when (arhs.supply_period_start IS NULL OR arhs.supply_period_start <= current_date) 
                        and (arhs.supply_period_end IS NULL OR arhs.supply_period_end > current_date) 
                    then TRUE else FALSE
                end as contractor                        --Исполнитель КУ. Считаем, что если физически поставляет ресурс, то исполнитель
               ,ROW_NUMBER() OVER(
                   PARTITION BY arh.fias_house_code, arhs.nsi_3_code 
                   ORDER BY arh.last_editing_date DESC
                ) AS row_num
                ,nms.municipal_service_name
            FROM sagr.agr_rso_houses as arh
                JOIN sagr.agr_rso_house_services arhs on arh.guid = arhs.rso_house_guid 
                            AND arh.fias_house_code in ({houses})
                            AND arh.nsi_26_code = '1'                                                           --МКД
                            --AND arh.region_code in ('')
                            AND (arhs.supply_period_end IS NULL OR arhs.supply_period_end > current_date)
                            AND (arhs.supply_period_start IS NULL OR arhs.supply_period_start <= current_date)
                            AND arhs.nsi_3_code in ('6')                                                        --отопление
                JOIN snsi.nsi_municipal_services nms on nms.nsi_3_code = arhs.nsi_3_code
        )
        SELECT 
            td.fias_house_code
            ,array_agg(td.supply_period_start) supply_period_start
            ,array_agg(case when td.supply_period_end > '2100-01-01' then '2100-01-01'::date else td.supply_period_end end) supply_period_end
            ,bool_or(td.contractor) contractor
            ,array_agg(COALESCE(ard.billing_deadline_months::varchar||', ', '')||COALESCE(ard.billing_deadline_next_months::varchar, ''))  billing_deadline           --срок предоставления ПД
            ,max(td.municipal_service_name) municipal_service_name
        FROM temp_data AS td
            LEFT JOIN sagr.agr_rso_documents ard on ard.document_guid = td.agreement_guid 
                        AND ard.entity_is_actual = True
                        AND (ard.finish_date is null or ard.finish_date > current_date)
                        AND (ard.effective_date is null or ard.effective_date <= current_date)
                        AND ard.status='RUNNING'
        WHERE td.row_num = 1
        GROUP BY td.fias_house_code
    """
    return query

In [None]:
query = {
    "test": """
        SELECT *
        FROM sagr.agr_rso_rate_consumption_houses
    """,
    "ppa_organizations": """
        SELECT guid
               ,organization_short_name
               ,inn
               ,kpp
        FROM  sppa.ppa_organizations
    """,
}

## Константы

In [None]:
DB_NAME = ['hcshmdb', 'hcspafoetldb']
HOST = '172.16.214.22'
ABSOLUTE_PATH = "C:/Python/ENV/bg_venv/Scriptss/"
PATH_DOWNLOAD = ABSOLUTE_PATH + 'datasets/'
PATH_UPLOAD = ABSOLUTE_PATH + 'uploads/'

### Конфигурация подключения

In [None]:
db_config_analytic = {
    'user': 'buchkurashvili_gg',      # имя пользователя
    'pwd': 'vDlgJjRotPEgL0THC0RU',    # пароль
    'host': '172.16.3.40',            # адрес сервера для подключения
    'port': 5454,                     # порт подключения
    'db': DB_NAME[1]                  # название базы данных
}

db_config_test = {
    'user': 'buchkurashvili_gg',      # имя пользователя
    'pwd': 'A5ZFawMpODi4A9jwsqW8',    # пароль
    'host': '172.16.214.21',          # адрес сервера для подключения
    'port': 5454,                     # порт подключения
    'db': DB_NAME[0]                  # название базы данных
}

db_config_product = {
    'user': '',      # имя пользователя
    'pwd': '',    # пароль
    'host': '',          # адрес сервера для подключения
    'port': 5454,                     # порт подключения
    'db': DB_NAME[0]                  # название базы данных
}

In [None]:
def create_configs(db_config={}, db_name='', host=''):
    db_config['db'] = db_name
    db_config['host'] = host
    print(db_config)
    return db_config

## Загрузка и проверка данных

In [None]:
from_parquet = False
if from_parquet:
    df = pd.read_parquet(PATH_DOWNLOAD + 'test.parquet')
else:
    #db_config = create_configs(db_config=db_config_test, db_name=DB_NAME[2], host=HOST)
    df = dataloader(query['ppa_organizations'], db_config=db_config_analytic)
    #df = dataloader(query['test'], db_config=db_config)
    #df.to_parquet(PATH + 'TEST.parquet')
df.shape

In [None]:
from_parquet1 = True
from_parquet2 = True
from_parquet3 = False

batches = 100

regions = [                                                        
    '0c5b2444-70a0-4932-980c-b4dc0d3f02b5', #Москва
    '29251dcf-00a1-4e34-98d4-5c47484a36d4'  #Московская область
]

df = loaded_region_grouped(from_parquet1, from_parquet2, from_parquet3, regions, batches=batches)
df.shape

## Анализ данных

In [None]:
missing_values(df)

## Обработка данных

In [None]:
region_grouped_all = pd.DataFrame()

for i in range(16):
    region_grouped = pd.read_parquet(PATH_DOWNLOAD + f'{save_name}_{i}.parquet')
    region_grouped_all = pd.concat([region_grouped_all, region_grouped])

region_grouped_all.to_parquet(PATH_DOWNLOAD + f'region_grouped_all.parquet')
region_grouped_all.shape

## Исследовательский анализ данных

## Анализ результата

In [None]:
missing_values(region_grouped_all)

## Выгрузка данных

In [None]:
df1 = pd.read_excel(PATH_UPLOAD + 'Связка имя_региона-код_региона.xlsx', usecols=['region_fullname', 'region_code'])
df2 = region_grouped_all.merge(df1, how='left', left_on='fias_region_code', right_on='region_code')
df2 = df2.rename(columns={
    'region_fullname': 'Регион',
    'organization_short_name': 'Название организации',
    'inn': 'ИНН',
    'kpp': 'КПП'
    'org_email': 'Электронная почта',
    'url': 'Адрес в интернете',
    'phone': 'Телефон'
})

In [None]:
columns = [
    'Регион', 
    'Название организации', 
    'ИНН', 
    'КПП', 
    'Электронная почта',
    'Адрес в интернете',
    'Телефон',
    'По нормативу', 
    'По ПУ'
]
df2[columns].to_excel(PATH_UPLOAD + 'ПУ_Джушхинова.xlsx')

## Подвал