### Описание проекта


Программа готовит отчёт по выбросам, для загрузки и анализа в Power BI. А также датафрейм с очищенными данными

### Основные библиотеки

In [1]:
# основные библиотеки
import pandas as pd
import numpy as np

# библиотеки для графиков
import matplotlib.pyplot as plt
import seaborn as sns

#настройка, чтобы можно было просматривать все столбцы датафрейма
import matplotlib
matplotlib.rcParams.update({'font.size': 14})
pd.options.display.max_columns = 100

import pyodbc

# этот волшебный код убирает ненужные предупреждения об ошибках
import warnings
warnings.filterwarnings('ignore')

#  визуализация прогресса прохождения цикла
from tqd import tqd


In [2]:
from datetime import datetime, timedelta

### Функции загрузки данных

In [3]:
def import_s():

    '''Функция загрузки и первичной обработки df_s'''
    
    # зададим параметры соединения с базой данных
    cn = pyodbc.connect("Driver={SQL Server Native Client 11.0};Server=a;Database=d;Trusted_Connection=yes")
    # загрузим s
    df_s = pd.read_sql_query(' \
            SELECT\
            s.[Дата] \
            ,klient.[Партнер ID]  \
            ,s.[Продукт ID]  \
            ,s.[Количество] AS [Количество] \
            ,s.[Сумма] AS [СуммаRUR]  \
            ,price.БазоваяЦенаRUR  \
            ,s.Цена  \
            ,s.[Тип операции ID] \
        FROM a.[d].[s].[s] AS s \
            LEFT JOIN [d].[s].[Клиент] AS klient  \
                ON s.[Клиент ID] = klient.[Клиент ID]  \
            LEFT JOIN a.[d].Продукт.Продукт AS prod \
                ON prod.[Продукт ID] = s.[Продукт ID]  \
            LEFT JOIN Sycorax.[Analitics].[RokinaEV].[РасходныеЦеныДляСкПргП] AS price \
                ON prod.[Код 1C] = price.[КодНоменклатуры]  \
        WHERE YEAR(s.[Дата]) >= 2022\
        ORDER BY \
            klient.[Партнер ID] \
            ,s.[Дата] \
            ,[Продукт ID] \
    ', cn)
    
    # подготовим запасной столбец с первоначальным индексом
    df_s = df_s.reset_index()
    
    # изменим формат поля Дата на datetime
    df_s['Дата_'] = pd.to_datetime(df_s['Дата'], format='%Y-%m-%d')
    # добавляем поле месяц
    df_s['Месяц'] = df_s['Дата_'].dt.floor('d')+pd.offsets.MonthEnd(0)-pd.offsets.MonthBegin(1)
    # Добавляем год для группировки
    df_s['Год'] = pd.DatetimeIndex(df_s['Дата']).year
    
    return df_s
    

In [4]:
def import_kn():
    
    '''Функция загрузки классификатора'''
    
    # зададим параметры соединения с базой данных
    cn = pyodbc.connect("Driver={SQL Server Native Client 11.0};Server=a;Database=d;Trusted_Connection=yes")
    
    # загрузка классификатора номенклатуры
    df_kn = pd.read_sql_query(' \
        SELECT  \
        prod.[Продукт ID] \
        ,prod.[Код 1C] AS КодНоменклатуры \
        ,prod.[Код Md] \
        ,prod.[Артикул] \
        ,prod.[Продукт] \
        ,prod.[ДДП ID] \
        ,ddp.Категория \
        ,ddp.Направление \
        ,ddp.Группа \
        ,ddp.Подгруппа \
      FROM [d].[Продукт].[Продукт] AS prod \
      LEFT JOIN [d].[Продукт].[ДДП] AS ddp \
        ON prod.[ДДП ID] = ddp.[ДДП ID] \
                              ', cn)
     
    return df_kn

In [5]:
def import_kontragent_all():
    
    '''Функция загрузки df_kontragent'''
    
    # зададим параметры соединения с базой данных
    kk = pyodbc.connect("Driver={SQL Server Native Client 11.0};Server=a;Database=d;Trusted_Connection=yes")
    
    # загрузим датафрейм
    df_kontragent = pd.read_sql_query('\
        SELECT\
            contr.[Контрагент ID] AS [Контрагент ID]\
            ,dim.PartnerCod_1C AS КодКонтрагента\
            ,contr.Контрагент  AS НаименованиеКонтрагента\
        \
        FROM a.[d].Контрагенты.Контрагент AS contr\
        \
        LEFT JOIN a.[FRAMBOISE].[dwh].[DimPartner] AS dim\
            ON dim.PartnerKey_1C = contr.[GUID 1C]\
        \
        ORDER BY\
            contr.[Контрагент ID]', kk)
    return df_kontragent

In [6]:
def import_status():
    
    '''Функция загрузки df_status'''
    
    # зададим параметры соединения с базой данных
    kk = pyodbc.connect("Driver={SQL Server Native Client 11.0};Server=Sycorax;Database=Analiticsdp;Trusted_Connection=yes")
    
    # загрузим датафрейм
    df_kontragent = pd.read_sql_query('\
       SELECT [Контрагент ID]\
      ,[Контрагент]\
      ,[Актуальные_s]\
      FROM [Analiticsdp].[dbo].[sстатусы_партнёров]', kk)
    return df_kontragent

In [7]:
def import_all_df():
    '''Функция загружает все таблицы'''
    df_s = import_s()
    df_kn_ = import_kn()
    df_kontragent = import_kontragent_all()
    df_status = import_status()

    return df_s, df_kn_, df_kontragent, df_status

### Функции обработки данных

In [8]:
def price_calculation(df_order_, limit_tru_price):
    '''Функция добавляет в отчёт столбец с расчётной ценой'''
    
    # формируем новые поля
    df_order_['Цена_расчёт'] = df_order_['СуммаRUR'] / df_order_['Количество']
    df_order_['Цена_расчёт/Баз_цена'] = df_order_['Цена_расчёт'] / df_order_['БазоваяЦенаRUR']
    
    # расчет отношения для отсутствующей базовой цены
    # в этом случае используем цену из s
    df_order_.loc[df_order_['Цена_расчёт/Баз_цена'].isna(), 'Цена_расчёт/Баз_цена'] = df_order_['Цена_расчёт'] / df_order_['Цена']
    
    # формируем и заполняем поле подозрительной цены
    df_order_['Подозрительная_цена'] = 0
    df_order_.loc[(df_order_['Цена_расчёт/Баз_цена'] >= limit_tru_price)|
                (df_order_['Цена_расчёт/Баз_цена'] <= 1/limit_tru_price)&
                ((df_order_['Цена_расчёт/Баз_цена'] != 0)), \
                'Подозрительная_цена'] = 1
    
    return df_order_

In [9]:
def sku_blowout(df_s, sku, q_max):

    '''Функция формирует поле с выбросами в указанном столбце для указанного sku по заданным квантилям'''
    df_sku = df_s.loc[df_s['Продукт ID']==sku]
    # вычисляем значения квантили в заданном столбце
    q_max_rur = df_sku['СуммаRUR'].quantile(q_max)
    
    df_s.loc[(df_s['Продукт ID']==sku)&\
              (df_s['СуммаRUR'].abs() >= q_max_rur),\
              'Подозрительно_RUR'] = 1    
 
    q_max_ = df_sku['Количество'].quantile(q_max)
    
    df_s.loc[(df_s['Продукт ID']==sku)&
              (df_s['Количество'].abs() >= q_max_),\
              'Подозрительно_количество'] = 1
    
    return df_s

In [10]:
def cycle_sku_blowout(df_s, q_max):
    
    '''Функция проходится циклом поиска выбросов для каждой SKU'''
    df_s['Подозрительно_RUR'] = 0
    df_s['Подозрительно_количество'] = 0
    
    for i in tqd(srted(df_s['Продукт ID'].unique())):
        df_s = sku_blowout(df_s, i, q_max)
    return df_s

In [11]:
def row_quant(df_, column_name, q_max):

    '''Функция формирует поле с выбросами в указанном столбце по заданным квантилям'''
    # подходит только для котлового метода
    
    # вычисляем значения квантили в заданном столбце
    q_max = df_[column_name].quantile(q_max)
    
    # формируем условия
    conditions = (df_[column_name].abs() >= q_max)
    
    new_column_name = 'Подозрительно_' + column_name + '_котловой_метод'
    df_[new_column_name] = 0
    df_.loc[conditions, new_column_name] = 1
    
    return df_

In [12]:
def quantity_zero(df_):
    
    '''Функция создает новое поле с признаком  Количество=0 при СуммаRUR != 0'''

    df_['Количество=0'] = 0
    df_.loc[(df_['Количество']==0)&(df_['СуммаRUR']!=0), 'Количество=0'] = 1
    return df_

In [13]:
# считаю целесообразным применять эту функцию только для отчета по ошибкам
def rur_zero(df_order_):
    
    '''Функция создает новое поле с признаком СуммаRUR=0 при Количество!=0'''
    df_order_['СуммаRUR=0'] = 0
    df_order_.loc[(df_order_['СуммаRUR']==0) & 
                  ((df_order_['Количество']!=0)), 'СуммаRUR=0'] = 1
    return df_order_

In [14]:
def free_memory(df_kn_, df_kontragent_):
    # удаляем df_kn для освобождения памяти
    df_kn.drop(df_kn.index, inplace=True)
    # удаляем df_kontragent для освобождения памяти
    df_kontragent.drop(df_kontragent.index, inplace=True)

    return df_kn_, df_kontragent_

In [15]:
def make_processing(df_s, df_kn_, df_kontragent_, q_max=0.999975):
    '''Функция обработки данных'''
    # добавляем данные о продукте
    df_s = df_s.merge(df_kn_, on= ['Продукт ID'], how='left')

    '''Выжный шаг. Запускаем цикл расчёта выбросов по каждому sku в отдельности'''
    # запускаем цикл для квантили 0.999975
    df_s = cycle_sku_blowout(df_s, q_max)

    #Следующий шаг. Добавляем данные о партнёре
    df_s = df_s.merge(df_kontragent_, left_on = ['Партнер ID'], \
                          right_on = ['Контрагент ID'], how='left')

    # Следующий шаг. Добавляем в общий файл расчетную цену, отношение цен, и признак Подозрительная цена, где отношение цен более заданного значения.
    # добавляем в df_s расчетную цену
    # и создаем критерий Подозрительная_цена
    limit_tru_price = 5 # отношение цен не более данного значения, если больше-то подозрительно
    df_s = price_calculation(df_s, limit_tru_price)

    # Добавляем критерии подозрительных s по руб и кол-ву котловым Функцияом
    # формируем поле с выбросами по количеству котловым Функцияом
    df_s = row_quant(df_s, 'Количество', q_max)
    
    # формируем поле с выбросами по СуммаRUR котловым Функцияом
    df_s = row_quant(df_s, 'СуммаRUR', 0.999975)

    # создаём поле Количество=0
    df_s = quantity_zero(df_s)
    
    # удалим таблицы для освобождения памяти
    df_kn_, df_kontragent_ = free_memory(df_kn_, df_kontragent_)

    return df_s, df_kn_, df_kontragent_


### Функции отчёта по выбросам

In [16]:
def sorder(df_s):
    '''Функция формирует отчёт по выбросам'''
    # формируем условия отбора для отчёта о выбросах
    selection_conditions = \
        (df_s['Подозрительно_RUR'] ==1)\
        |(df_s['Подозрительно_количество'] ==1)\
        |(df_s['Подозрительная_цена'] ==1)\
        |(df_s['Подозрительно_Количество_котловой_Функция'] ==1)\
        |(df_s['Подозрительно_СуммаRUR_котловой_Функция'] ==1)\
        |(df_s['Количество=0'] == 1)
    
    # формируем датафрейм с выбросами для последующей выгрузки
    df_order = df_s.loc[selection_conditions]

    # формируем поле СуммаRUR=0
    df_order = rur_zero(df_order)

    #  подтверждение того, что в отчёте подозрительных s нет пропусков
    print('Проверяем, что бы в отчёте подозрительных s небыло пропусков')
    display(df_order.isna().sum())

    print('Смотрим количество полученных строк', 'df_s', df_s.shape, 'df_order', df_order.shape)

    # смотрим количество строк по критериям подозрительности
    print('смотрим количество строк по критериям подозрительности')
    display(df_order[['Подозрительно_RUR', 'Подозрительно_количество',  
            'Подозрительная_цена',
            'Подозрительно_Количество_котловой_Функция', 'Подозрительно_СуммаRUR_котловой_метод', 
            'Количество=0', 'СуммаRUR=0']].sum())
    
    return df_order

In [17]:
def save_order(file_name='oder_blowout_'):
    '''Функция выгружает отчёт c выбросами в файл csv'''
    PREPARED_DATASET_PATH = 'C:/Users/lazarevnv/Desktop/projectspython/project_swarning_filter/order_blowout/{name}.csv'
    df_order.to_csv(PREPARED_DATASET_PATH, index=False)

### Функции создания df_clean таблицы без ошибочных s 

In [18]:
def cycle_index_for_delete_all(df_s, dict_from_delete):
    
    '''Функция формирует общий список индексов тех строк которые мы потом будем удалять'''
    # формируем пустой общий список индексов тех строк которые мы потом будем удалять
    list_index_for_delete_all = []
    n=0
    # пройдёмся циклом по словарю удаления и сформируем общий список индексов
    for kontr_id in dict_from_delete:
        for sku_id in dict_from_delete[kontr_id]:
            list_index_for_delete = df_s.loc[(df_s['Контрагент ID']==kontr_id)&(df_s['Продукт ID']==sku_id), 'index'].to_list()
            list_index_for_delete_all.extend(list_index_for_delete)
            n += 1
            print('Номер цикла:', n)
        print('Новый контрагент', '\n\n')
    return list_index_for_delete_all

In [19]:
def sclean(df_s, actual_date = '2000-01-01'):
    '''Функция формирует датафрейм только с нужными очищенными s'''

    # загружаем статусы
    df_status = import_status()
    print('print_df_status', df_status)

    # формируем список ID контрагентов которых мы оставим
    id_for_etm = 5663
    list_need_kontragent = df_status.loc[(df_status['Актуальные_s']==1)\
                                         |(df_status['Контрагент ID']== id_for_etm),\
                                         'Контрагент ID'].to_list()
    print('print_list_need_kontragent', list_need_kontragent)
    # формируем булевую маску для условия нужных контрагентов
    condition_need_kontragent = df_s['Контрагент ID'].isin(list_need_kontragent)
    print('print_condition_need_kontragent', condition_need_kontragent)
    # получим датафрейм для формирования словаря удаления
    df_for_dict = df_s[['index', 'Партнер ID', 'Продукт ID', 'Код Md']].loc[condition_need_kontragent & (
                (df_s['Подозрительная_цена'] == 1)
                |(df_s['Подозрительно_количество'] == 1)
                |(df_s['Подозрительно_RUR'] == 1)
                |(df_s['Подозрительно_Количество_котловой_Функция'] == 1)
                )]
    print('print_df_for_dict', df_for_dict)
    # получим список партнёров из df_for_dict для формирования словаря удаления
    list_kontragentid_for_dict = list(set(df_for_dict['Партнер ID']))
    print('print_list_kontragentid_for_dict', list_kontragentid_for_dict)
    # формируем словарь для удаления
    dict_from_delete = {}

    # циклом прохождения по партнёрам подготовим словарь для удаления
    for kontr_id in tqd(list_kontragentid_for_dict):

        list_sku_id = list(set(df_for_dict.loc[df_for_dict['Партнер ID'] == kontr_id, 'Код Md']))
        
        dict_from_delete[kontr_id] = list_sku_id
    print('print_dict_from_delete', dict_from_delete)
    #  почистим память
    df_for_dict.drop(df_for_dict.index, inplace=True)
    print('print_df_for_dict', df_for_dict)
    
    # получили датафрейм только по тем контрагентам, которые нам нужны
    df_s = df_s.loc[df_s['Контрагент ID'].isin(list_need_kontragent)]
    print('print_df_s только по тем контрагентам, которые нам нужны', df_s)
    
    # Теперь нужно подготовить общий список индексов для удаления из df_sneed_contragent
    # запускаем цикл формирования списка list_index_for_delete_all
    list_index_for_delete_all = cycle_index_for_delete_all(df_s, dict_from_delete)
    print('print_list_index_for_delete_all общий список индексов для удаления', list_index_for_delete_all)
    
    # смотрим сколько строк нужно удалить
    line_all = df_s['index'].count()
    print('print_line_all смотрим сколько всего строк', line_all)
    
    delete_line = len(list_index_for_delete_all)
    print('print_delete_line смотрим сколько строк нужно удалить', delete_line)
    print(f'Нужно удалить {round(delete_line/10**6, 2)} млн. строк из {round(line_all/10**6, 2)} млн., что составляет {round(delete_line/line_all*100)}% ')

    # Теперь самое интересное. Нужно убрать из df_s строки с индексами из list_index_for_delete_all

    # сформируем файл с очищенными и сгруппированными данными
    df_sclean = df_s[['Партнер ID', 'КодКонтрагента',
                        'Код Md', 'КодНоменклатуры', 
                        'Месяц',
                        'Количество', 'СуммаRUR']]\
    .loc[~df_s['index'].isin(list_index_for_delete_all)]\
    .groupby(['Партнер ID', 'КодКонтрагента',
            'Код Md', 'КодНоменклатуры', 
            'Месяц']).sum().reset_index()
    print('df_sclean', df_sclean)
    # добавим поле ДатаАктуальности
    df_sclean['ДатаАктуальности'] = actual_date

    return df_sclean


In [20]:
def export_sclean(df_sclean):
    
    '''Функция экспортирует данные df_sclean в базу данных Analitics'''
    data_server = 'sycorax'
    database = 'Analitics'
    cn = pyodbc.connect("Driver={SQL Server Native Client 11.0};\
                        Server="+data_server+";Database="+database+";Trusted_Connection=yes")
    cursr = cn.cursr()
    for index, row in tqd(df_sclean.iterrows()):
        cursr.execute(
            "INSERT INTO  dbo.[sclean] (\
             [Партнер ID], [КодКонтрагента], [Код Md], [КодНоменклатуры],\
             [Месяц],\
             [Количество], [СуммаRUR],\
             [ДатаАктуальности] \
                                                ) \
            values(?,?,?,?,?,?,?,?)",

            row['Партнер ID'], row['КодКонтрагента'], 
            row['Код Md'], row['КодНоменклатуры'], 
            row['Месяц'], 
            row['Количество'], row['СуммаRUR'],
            row['ДатаАктуальности']
        )

    cn.commit()
    cursr.close()
    cn.close()

In [39]:
def save_file(df, file_name):
    # на всякий случай выгружаем очищенные строки в файл csv
    PREPARED_DATASET_PATH = f'C:/Users/lazarevnv/Desktop/projectspython/project_swarning_filter/order_blowout/{file_name}.csv'
    df.to_csv(PREPARED_DATASET_PATH, index=False)

### Запускаем выполнение функций

In [22]:
# Загружаем данные
df_s, df_kn, df_kontragent, df_status =  import_all_df()

In [23]:
# обрабатываем данные в df_s
df_s, df_kn, df_kontragent = make_processing(df_s, df_kn, df_kontragent)

100%|██████████| 18608/18608 [2:22:32<00:00,  2.18it/s]  


In [None]:
# готовим отчёт по выбросам
df_order = sorder(df_s)

In [42]:
df_order.shape

(699212, 33)

In [None]:
df_order.tail(3)

In [40]:
%%time
#  выгружаем отчёт по выбросам
save_file(df_order, 'oder_blowout_08_08_2023')

CPU times: total: 10.2 s
Wall time: 12.9 s


In [None]:
df_sclean = sclean(df_s, '2023-08-08')

#### Фильтруем ситуацию с отсутствием данных в периоде 90 дней

In [116]:
df_sclean.shape

(1093801, 8)

In [26]:
df_sclean.shape

(1092690, 8)

In [27]:
def cycle_index_for_delete_all(df_sclean_):

    '''Функция формирует список индексов для удаления'''
    '''по критерию отсутствия продаж в период 90 дней'''
    # зададим дату окончания исторического периода
    hist_date_end = str(datetime.now().date()-timedelta(days=61))
    # инициируем список для заполнения
    list_index_for_delete_all = []
    partner_set = set(df_sclean_['Партнер ID'])
    print('Всего партнёров:', len(partner_set))
    
    # цикл по партнёрам
    for partner_id in tqd(partner_set):
        # создадим df для конкретного партнёра в цикле
        df_partner = df_sclean_.loc[(df_sclean_['Партнер ID'] == partner_id)]

        # цикл по sku
        for sku_id in set(df_partner['Код Md']):
            # создадим df для конкретного sku для выбранного партнёра
            df_partner_sku = df_partner.loc[(df_partner['Код Md'] == sku_id)]
            
            # определим s количество в историческом периоде
            hist_quantity = int(df_partner_sku.loc[df_partner_sku['Месяц']<= hist_date_end
                    ,'Количество'].sum())
            # определим s количество в периоде 90 дней
            ninety_daysquantity = int(df_partner_sku.loc[df_partner_sku['Месяц'] > hist_date_end
                            ,'Количество'].sum())
            if hist_quantity > 0 and ninety_daysquantity == 0:
                list_index_for_delete = df_partner_sku.index
                list_index_for_delete_all.extend(list_index_for_delete)
    return list_index_for_delete_all

In [28]:
def sclean_ninety_days(df_sclean_):
    
    '''Функция очищает df_sclean по списку список индексов для удаления'''
    '''по критерию отсутствия продаж в период 90 дней'''
    # вызываем функцию удаления 
    list_index_for_delete_all = cycle_index_for_delete_all(df_sclean_)

    # подготовим вспомогательный столбец с индексом
    df_sclean_ = df_sclean_.reset_index()

    df_sdelete_ = df_sclean_.loc[df_sclean_['index'].isin(list_index_for_delete_all)]
    df_sclean_ = df_sclean_.loc[~df_sclean_['index'].isin(list_index_for_delete_all)]
    
    return df_sclean_[['Партнер ID', 'КодКонтрагента', 'Код Md', 'КодНоменклатуры', 'Месяц',
       'Количество', 'СуммаRUR', 'ДатаАктуальности']], df_sdelete_

In [29]:
%%time
# доработаем df_sclean по требованию 
df_sclean, df_sdelete = sclean_ninety_days(df_sclean)

Всего партнёров: 53


100%|██████████| 53/53 [02:48<00:00,  3.18s/it]


CPU times: total: 2min 18s
Wall time: 2min 48s


In [30]:
# доработка не нашла примеров подходящих под условие 0 продаж за 90 дней
df_sclean.shape

(668583, 8)

In [31]:
df_sclean.tail()

Unnamed: 0,Партнер ID,КодКонтрагента,Код MDM,КодНоменклатуры,Месяц,Количество,СуммаRUR,ДатаАктуальности
1092682,35479,2377,143176,99546,2023-07-01,1.0,583.1,2023-08-08
1092684,35479,2377,144966,101338,2023-06-01,180.0,6229.8,2023-08-08
1092685,35479,2377,144966,101338,2023-07-01,180.0,6229.8,2023-08-08
1092686,35479,2377,144967,101339,2023-07-01,300.0,12417.0,2023-08-08
1092689,35479,2377,159952,116198,2023-07-01,60.0,3429.0,2023-08-08


In [32]:
# выгружаем df_sclean в базу данных
export_sclean(df_sclean)

668583it [37:41, 295.70it/s]
