### Настройка системы оповещений об аномалиях

Необходимо настроить систему алертов, которая будет сообщать об аномалиях в данных мобильного приложения. Система должна с периодичность каждые 30 минут проверять ключевые метрики, такие как активные пользователи, просмотры, лайки, CTR. 

Для реализации поставленной задачи напишем DAG, который подгрузим в Apache Airflow. Данная программа позволит ежедневно с заданной периодичностью обрабатывать скрипт и в случае фиксации аномалий запускать рассылку с использованием телеграм-бота.

In [None]:
#импортируем необходимые библиотеки
import requests
import json
from urllib.parse import urlencode
import pandahouse
import seaborn as sns 
import io
import matplotlib.pyplot as plt
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.models import Variable
from datetime import timedelta
from datetime import datetime

In [55]:
#зададим параметры DAG
default_args = {
    'owner': 'a-lelkova',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2024, 8, 29)   
}

Перед написанием скрипта в телеграм создадим бота, дадим ему имя и уникальный никнейм, запустим его и скопируем token.

Чтобы узнать id необходимого чата, в строку поиска браузера вставим https://api.telegram.org/bot{token}/getUpdates и подставим token нашего бота.

В открывшемся окне будет содержание json файла, где будет содержаться id чата (result > 0 > chat > id).

In [56]:
#напишем скрипт для выявления аномалий и оформим его в DAG
@dag('lelkova_alert_DAG',default_args=default_args, schedule_interval='/30 * * * *', catchup=False)
def lelkova_alert_DAG():
    #все расчеты будут производится в рамках одного таска 
    @task(retries=3)
    def alert():
        #параметры подключения к clickhouse
        connection = {
              'host': 'https://clickhouse.lab.karpov.courses',
              'password': 'dpo_python_2020',
              'user': 'student',
              'database': 'simulator_20240620'
                    }
        #импорт данных из clickhouse
        q = '''SELECT 
                    toDate(time) AS date,
                    count(distinct user_id) AS active_users,
                    sum(action = 'like') AS likes,
                    sum(action = 'view') AS views,
                    sum(action = 'like') / NULLIF(sum(action = 'view'), 0) AS CTR
                FROM 
                    simulator_20240620.feed_actions
                WHERE 
                    (time >= now() - INTERVAL 15 MINUTE AND time < now()) OR
                    (time >= now() - INTERVAL 1 DAY - INTERVAL 15 MINUTE AND time < now() - INTERVAL 1 DAY) OR
                    (time >= now() - INTERVAL 2 DAY - INTERVAL 15 MINUTE AND time < now() - INTERVAL 2 DAY) OR
                    (time >= now() - INTERVAL 3 DAY - INTERVAL 15 MINUTE AND time < now() - INTERVAL 3 DAY) OR
                    (time >= now() - INTERVAL 4 DAY - INTERVAL 15 MINUTE AND time < now() - INTERVAL 4 DAY) OR
                    (time >= now() - INTERVAL 5 DAY - INTERVAL 15 MINUTE AND time < now() - INTERVAL 5 DAY) OR
                    (time >= now() - INTERVAL 6 DAY - INTERVAL 15 MINUTE AND time < now() - INTERVAL 6 DAY) OR
                    (time >= now() - INTERVAL 7 DAY - INTERVAL 15 MINUTE AND time < now() - INTERVAL 7 DAY) 
                GROUP BY 
                    date
                ORDER BY 
                    date DESC
                            '''
        df_history = pandahouse.read_clickhouse(q, connection=connection)
        
        #зададим функцию, которая будет проверять данные с помощью метода IQR
        def anomalies_check(df, column_name):
            #посчитаем первый и третий квартили и межквартильный размах
            Q1 = df[column_name].quantile(0.25)
            Q3 = df[column_name].quantile(0.75)
            IQR = Q3 - Q1
            #зададим верхнюю и нижнюю границы
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            #значения, выходящие за границы, запишем в датафрейм
            anomalies_iqr = df[(df[column_name] < lower_bound) | (df[column_name] > upper_bound)]
            return anomalies_iqr
        #применим функцию к интересующим нас столбцам
        anomalies_users = anomalies_check(df_history, 'active_users')
        anomalies_likes = anomalies_check(df_history, 'likes')
        anomalies_views = anomalies_check(df_history, 'views')
        anomalies_CTR = anomalies_check(df_history, 'CTR')

        # пишем систему оповещений при обнаружеии аномалий в метрике active_users
        if not anomalies_users.empty:
            # записываем в переменную аномальное значение метрики
            users_anomal = anomalies_users.active_users.iloc[0]
            #считаем отклонение аномального значения от среднего в %
            users_mean = df_history.active_users.mean()
            deviation_users = (((users_anomal - users_mean) / users_mean) * 100).round(2)
            #пишем текст для оповещения и задаем параметры
            chat_id = 432320975
            token = '7189949223:AAFbFkFXxHtw8oq3C2JbYHFqgI5eqccInnM'
            msg = f"Обнаружены аномалии! \nactive users = {users_anomal}. \nОтклонение {deviation_users}%"
            params = {'chat_id': chat_id, 'text': msg}

            base_url = f'https://api.telegram.org/bot7189949223:AAFbFkFXxHtw8oq3C2JbYHFqgI5eqccInnM/'
            url = base_url + 'sendMessage?' + urlencode(params)
            #если аномалия будет зафиксировано, наше уведомление направится в чат
            resp = requests.get(url)
        #аналогично пишем систему оповещений при обнаружеии аномалий в метрике likes
        if not anomalies_likes.empty:
            likes_anomal = anomalies_likes.likes.iloc[0]
            likes_mean = df_history.likes.mean()
            deviation_likes = (((likes_anomal - likes_mean) / likes_mean) * 100).round(2)
            chat_id = 432320975
            token = '7189949223:AAFbFkFXxHtw8oq3C2JbYHFqgI5eqccInnM'
            msg = f"Обнаружены аномалии! \nlikes = {likes_anomal}. \nОтклонение {deviation_likes}%"
            params = {'chat_id': chat_id, 'text': msg}

            base_url = f'https://api.telegram.org/bot7189949223:AAFbFkFXxHtw8oq3C2JbYHFqgI5eqccInnM/'
            url = base_url + 'sendMessage?' + urlencode(params)

            resp = requests.get(url)
        #аналогично пишем систему оповещений при обнаружеии аномалий в метрике views
        if not anomalies_views.empty:
            views_anomal = anomalies_views.views.iloc[0]
            views_mean = df_history.views.mean()
            deviation_views = (((views_anomal - views_mean) / views_mean) * 100).round(2)
            chat_id = 432320975
            token = '7189949223:AAFbFkFXxHtw8oq3C2JbYHFqgI5eqccInnM'
            msg = f"Обнаружены аномалии! \nviews = {views_anomal}. \nОтклонение {deviation_views}%"
            params = {'chat_id': chat_id, 'text': msg}

            base_url = f'https://api.telegram.org/bot7189949223:AAFbFkFXxHtw8oq3C2JbYHFqgI5eqccInnM/'
            url = base_url + 'sendMessage?' + urlencode(params)

            resp = requests.get(url)
        #аналогично пишем систему оповещений при обнаружеии аномалий в метрике CTR
        if not anomalies_CTR.empty:
            CTR_anomal = anomalies_CTR.CTR.iloc[0]
            CTR_mean = df_history.CTR.mean()
            deviation_CTR = (((CTR_anomal - CTR_mean) / CTR_mean) * 100).round(2)
            chat_id = 432320975
            token = '7189949223:AAFbFkFXxHtw8oq3C2JbYHFqgI5eqccInnM'
            msg = f"Обнаружены аномалии! \nCTR = {CTR_anomal}. \nОтклонение {deviation_CTR}%"
            params = {'chat_id': chat_id, 'text': msg}

            base_url = f'https://api.telegram.org/bot7189949223:AAFbFkFXxHtw8oq3C2JbYHFqgI5eqccInnM/'
            url = base_url + 'sendMessage?' + urlencode(params)

            resp = requests.get(url)

        else:
            print('Ok')
        

    alert()

In [57]:
lelkova_alert_DAG = lelkova_alert_DAG()

После загрузки данного файла в формате .py через репозиторий в Airflow скрипт будет отрабатываться каждые 30 минут и проверять данные на аномалии. В случае обнаружения аномального значения в какой-либо из метрик телеграм бот будет направлять рассылку с оповещением в указанный чат.