In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import telegram
import pandahouse
from datetime import date
import io
from read_db.CH import Getch
import sys
import os


def check_anomaly(df, metric, sigma, days = 7):
    
    """
    функция check_anomaly предлагает алгоритм проверки значения на аномальность посредством
    сравнения текущего значения метрики со средненедельным показателем в 15 минутном интервале.

    Параметры
    ---------
    df - датафрейм
    metric - метрика кототорая проверяется
    sigma - значение сигмы
    days - число предыдущих дней, которые идут для определения аномалии.

    Функция возвращает: 
    is_alert - запуск оповещения, 1 или 0
    current_value - текущее значение метрики
    diff - отклонение текущего значения от сравниваемого
    """

    
    # найдем список значений метрики за текущий и предыдущие дни
    list_of_value = []
    for n in range(0, days + 1):
        current_ts = df['ts'].max()           # достаем максимальную 15-минутку из датафрейма - ту, которую будем проверять на аномальность
        n_days_ago_ts = current_ts - pd.DateOffset(days=n)
        n_days_ago_value = df[df['ts'] == n_days_ago_ts][metric].iloc[0] # достаем из датафрейма значение метрики в максимальную 15-минутку
        list_of_value.append(round(n_days_ago_value, 2))
        print(n_days_ago_ts, n_days_ago_value)

     
    # значение метрики в текущую 15-минутку, удаляем из общего списка
    current_value = list_of_value.pop(0)
    # рассчитываем значение от которого будет определяться вызов алерта
    mean_value = np.mean(list_of_value)
    print(' last_week_value', list_of_value, '\n',
          'current_value', current_value, '\n', 
          'mean_value', round(mean_value, 2))
    

    # вычисляем отклонение
    if current_value <= mean_value:
        diff = abs(current_value / mean_value - 1)
    else:
        diff = abs(mean_value / current_value - 1)
    print(' diff', round(diff, 2))

    
    # Вариант 1. Определяем верхние и нижние границы по правилу 3x сигм
    low_sigm = mean_value - sigma * np.std(list_of_value)
    upper_sigm = mean_value + sigma * np.std(list_of_value)
    
    
    # Вариант 2. Определяем верхние и нижние границы межквартильного размаха
    quant_25 = np.quantile(list_of_value, 0.25)
    quant_75 = np.quantile(list_of_value, 0.75)
    low_quant = quant_25 - sigma * (quant_75 - quant_25)
    upper_quant = quant_75 + sigma * (quant_75 - quant_25)
    
    
    # Методы на основе ML
    # Вариант 3. DBSCAN (Density-based spatial clustering of applications with noise)
    # Вариант 4. LOF (Local Outlier Factor)
    
    
    # Проверяем вызов алерта
    ''' Настроим аллерт на на одновременную проверку границ 3мя сигмам и межквартильным размахом, но при высокой сигме.
        Поскольку при маленькой выборке, но высокой волатильности и высокой сигме межквартильный размах устанавливает 
        слишком широкие границы для активации алерта'''
    if (current_value < (low_sigm or low_quant)
       or current_value > (upper_sigm or upper_quant)):
            is_alert = 1
    else:
            is_alert = 0
    print(' is_alert', is_alert, 'границы алерта 3х сигм: ', round(low_sigm, 2), round(upper_sigm, 2))
    print(' is_alert', is_alert, 'границы алерта квантилей: ', round(low_quant, 2), round(upper_quant, 2))
    print('----------------------------------')
    return is_alert, current_value, diff



def run_alerts(chat=2093549033, sigma = 1):
    
    """
    Функция run_alerts запускает подключение к базе данных, выполняет запрос на проверку 
    метрик на аномалии, а также формирует информацию для отчета в telegram
    
    Параметры
    ---------
    chat - chat_id telegram для отправки сообщения
    sigma - значение сигмы
    """
    
    
    # Информация о боте telegram
    chat_id = chat or -655725590
    bot = telegram.Bot(token='*****************************************************')
    

    # Формирование запроса к базе данных
    querry = '''SELECT *
                FROM
                    (select toStartOfFifteenMinutes(time) as ts,
                        toDate(time) as date,
                        formatDateTime(ts, '%R') as hm,
                        count(distinct user_id) as users_lenta,
                        countIf(user_id, action='like') as likes,
                        countIf(user_id, action='view') as views,
                        countIf(user_id, action = 'like') / countIf(user_id, action = 'view') * 100 as CTR
                    from simulator_20220120.feed_actions
                    where ts >=  today() - 7 and ts < toStartOfFifteenMinutes(now())
                    group by ts, date, hm) t1
                FULL JOIN
                    (select toStartOfFifteenMinutes(time) as ts,
                        toDate(time) as date,
                        formatDateTime(ts, '%R') as hm,
                        count(distinct user_id) as users_messenger,
                        count(user_id) as messages,
                        messages/users_messenger as messages_per_user
                    from simulator_20220120.message_actions
                    where ts >=  today() - 7 and ts < toStartOfFifteenMinutes(now())
                    group by ts, date, hm
                    order by toDate(time) DESC) t2 using ts, date, hm
                ORDER BY ts DESC'''
    
    # Датафрейм для проферки анамалий
    data = Getch(querry).df

    
    # Проверка группы метрик на анамалии
    metrics = data.columns[3:]
    for metric in metrics:
        print('Метрика: ', metric)
        # проверяем метрику на аномальность алгоритмом, описаным внутри функции check_anomaly()
        is_alert, current_value, diff = check_anomaly(data, metric, sigma)
        if is_alert:
            
            # Строим графики
            sns.set(rc={'figure.figsize': (16, 10)}) # задаем размер графика
            plt.tight_layout() # плотная компоновка

            
            # визуализация метрики за сегодняшний день
            ax = sns.lineplot(data=data[data.date == data.date.unique()[0]].sort_values(by=['date', 'hm']), 
                              x="hm", 
                              y=metric,
                              label = '{metric}, {day}'.format(metric = metric,
                                                              day = data['date'].dt.date[0]))

            
            # при сигма 1/ 2/ 3 доверительный интервал соответственно равен 68,3% / 95,5% / 99,7%
            if sigma == 1:
                ci = 68.3
            elif sigma == 2:
                ci = 95.5
            else:
                ci = 99.7

                
            # визуализация среднего значения и доверительного интервала за предыдущую неделю
            ax = sns.lineplot(data=data[data.date < data.date.unique()[0]].sort_values(by=['date', 'hm']), 
                              x="hm",
                              y=metric,
                              ci=ci,
                              label='{sigma} sigma range + rolling mean, 1 week before:'.format(sigma = sigma))

            
            # разряжаем подписи по оси X
            for ind, label in enumerate(ax.get_xticklabels()):
                if ind % 15 == 0:
                    label.set_visible(True)
                else:
                    label.set_visible(False)
            
            
            ax.set(xlabel='time') # задаем имя оси Х
            ax.set(ylabel=metric) # задаем имя оси У
            ax.set_title('{}'.format(metric)) # задаем заголовок графика
            ax.set(ylim=(0, None)) # задаем лимит для оси У
            
            
            # Создаем ссылку на актуальный дашборд
            half_link = ['17204', '17197', '17205', '17206', '17207', '17208', '17209']
            link = dict(zip(metrics, half_link))
            
            
            # Формируем сообщение для отправки
            msg = '''Метрика {metric}:\n- текущее значение: {current_value:.2f}
- отклонение от скользящего среднего за неделю: {diff:.2%}\nДоп. информация по метрике:
https://redash.lab.karpov.courses/queries/7664#{link}'''.format(metric=metric,
                                                               current_value=current_value,
                                                               diff=diff,
                                                               link=link.get(metric, None))
            
            # формируем файловый объект
            plot_object = io.BytesIO()
            ax.figure.savefig(plot_object)
            plot_object.seek(0)
            plot_object.name = '{0}.png'.format(metric)
            plt.close()

            # отправляем алерт
            bot.sendMessage(chat_id=chat_id, text=msg)
            bot.sendPhoto(chat_id=chat_id, photo=plot_object)


try:
    run_alerts()
except Exception as e:
    print(e)

Метрика:  users_lenta
2022-02-10 10:30:00 579
2022-02-09 10:30:00 605
2022-02-08 10:30:00 585
2022-02-07 10:30:00 594
2022-02-06 10:30:00 958
2022-02-05 10:30:00 456
2022-02-04 10:30:00 566
2022-02-03 10:30:00 568
 last_week_value [605, 585, 594, 958, 456, 566, 568] 
 current_value 579 
 mean_value 618.86
 diff 0.06
 is_alert 0 границы алерта 3х сигм:  473.02 764.69
 is_alert 0 границы алерта квантилей:  534.5 632.0
----------------------------------
Метрика:  likes
2022-02-10 10:30:00 1440
2022-02-09 10:30:00 1266
2022-02-08 10:30:00 1420
2022-02-07 10:30:00 1438
2022-02-06 10:30:00 2169
2022-02-05 10:30:00 1185
2022-02-04 10:30:00 1679
2022-02-03 10:30:00 1228
 last_week_value [1266, 1420, 1438, 2169, 1185, 1679, 1228] 
 current_value 1440 
 mean_value 1483.57
 diff 0.03
 is_alert 0 границы алерта 3х сигм:  1163.77 1803.38
 is_alert 0 границы алерта квантилей:  935.5 1870.0
----------------------------------
Метрика:  views
2022-02-10 10:30:00 6753
2022-02-09 10:30:00 5946
2022-02-08