# In [None]:
# coding=utf-8

from datetime import datetime, timedelta
import pandas as pd
import pandahouse as ph
from io import StringIO
import requests
from datetime import date
import telegram 
import matplotlib.pyplot as plt
import numpy as np
import io
import seaborn as sns

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

# Telegram bot access. The real token and chat id were redacted ("***") in
# this copy of the file and must be filled in before the module can be imported.
# NOTE(review): prefer reading these secrets from an Airflow Connection /
# Variable or the environment instead of hard-coding them in the DAG file.
my_token = '***'  # (redacted)
# Created at module import time, i.e. every time this DAG file is parsed.
bot = telegram.Bot(token=my_token)
chat_id = ***  # (redacted)
    
# ClickHouse connection settings passed to ph.read_clickhouse by the extract
# tasks; the `database` value is what the `{db}` placeholder in their queries
# refers to.
# NOTE(review): the credentials are hard-coded; consider moving them to an
# Airflow Connection or environment variables.
connection = dict(
    host='https://clickhouse.lab.karpov.courses',
    database='simulator_20231113',
    user='student',
    password='dpo_python_2020',
)

# Default arguments applied to every task of the DAG.
default_args = dict(
    owner='d-kulikova',                # owner shown for the DAG's tasks
    depends_on_past=False,             # a task does not wait for its previous run to succeed
    retries=2,                         # retries per failed task (not whole-DAG attempts)
    retry_delay=timedelta(minutes=5),  # pause between retries
    start_date=datetime(2023, 10, 1),  # earliest date the DAG is scheduled from
)

# Cron schedule: once a day at 11:00 (timezone per Airflow config).
schedule_interval = '0 11 * * *'

@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False)
def dag_kulikova_app_bot_report():
    """Build the daily Telegram report DAG for the feed and messenger services.

    Sends yesterday's key metrics as one text message, plus one line chart
    per metric covering the last seven full days (yesterday included, today
    excluded).
    """

    # Date filters shared by the daily and weekly queries. The weekly window
    # stops at yesterday so that today's still-incomplete data does not show
    # up as an artificial drop at the right edge of every chart.
    yesterday_filter = "toDate(time) = yesterday()"
    last_week_filter = "toDate(time) BETWEEN today() - 7 AND yesterday()"

    # Query templates are filled with str.format(period=...). `{{db}}` comes out
    # of that call as `{db}`, which pandahouse then fills with the database
    # name from `connection`.
    feed_query = """
        SELECT toDate(time) AS date,
               uniq(user_id) AS DAU,
               sum(action = 'like') AS Likes,
               sum(action = 'view') AS Views,
               round(Likes / Views, 3) AS CTR
        FROM {{db}}.feed_actions
        WHERE {period}
        GROUP BY date
        ORDER BY date
    """

    # Per-day messenger totals. t1 aggregates what each user sent per day and
    # t2 what each user received per day. They are matched on user AND day;
    # matching on user alone would pair every sending day with every receiving
    # day of that user and inflate the sums.
    # t2 names its day column `receive_date` so `date` is unambiguous (t1's).
    # NOTE(review): the inner JOIN keeps only users who both sent and received
    # on a given day, so DAU and all totals exclude one-sided users - confirm
    # this is the intended definition.
    message_query = """
        SELECT date,
               count(user_id) AS DAU,
               sum(messages_received) AS messages_received,
               sum(messages_sent) AS messages_sent,
               sum(users_received) AS users_received,
               sum(users_sent) AS users_sent
        FROM
            (SELECT date, user_id, messages_received, messages_sent,
                    users_received, users_sent
             FROM
                 (SELECT toDate(time) AS date,
                         user_id,
                         count(receiver_id) AS messages_sent,
                         uniq(receiver_id) AS users_sent
                  FROM {{db}}.message_actions
                  WHERE {period}
                  GROUP BY user_id, date) AS t1
             JOIN
                 (SELECT toDate(time) AS receive_date,
                         receiver_id,
                         count(user_id) AS messages_received,
                         uniq(user_id) AS users_received
                  FROM {{db}}.message_actions
                  WHERE {period}
                  GROUP BY receiver_id, receive_date) AS t2
             ON t1.user_id = t2.receiver_id AND t1.date = t2.receive_date) AS tbl
        GROUP BY date
        ORDER BY date
    """

    @task
    def extract_feed_day():
        """Return yesterday's feed metrics: at most one row (date, DAU, Likes, Views, CTR)."""
        return ph.read_clickhouse(feed_query.format(period=yesterday_filter),
                                  connection=connection)

    @task
    def extract_feed_week():
        """Return daily feed metrics for the last seven full days, ordered by date."""
        return ph.read_clickhouse(feed_query.format(period=last_week_filter),
                                  connection=connection)

    @task
    def extract_message_day():
        """Return yesterday's messenger metrics: at most one row."""
        return ph.read_clickhouse(message_query.format(period=yesterday_filter),
                                  connection=connection)

    @task
    def extract_message_week():
        """Return daily messenger metrics for the last seven full days, ordered by date."""
        return ph.read_clickhouse(message_query.format(period=last_week_filter),
                                  connection=connection)

    @task
    def send_text(feed, message):
        """Send yesterday's feed and messenger metrics as one Markdown message.

        Raises:
            ValueError: if either query returned no rows, so the task fails
                (and is retried per default_args) instead of crashing on a
                missing row or sending a broken report.
        """
        if feed.empty or message.empty:
            raise ValueError('No feed or message data for yesterday')

        def first(df, column):
            # Position-based access, independent of the frame's index labels.
            return df[column].iloc[0]

        report_date = first(feed, 'date').strftime('%Y-%m-%d')
        lines = [
            f'Отчет за *{report_date}*:',
            '',
            f"*DAU ленты:* {first(feed, 'DAU')}",
            f"*Views:* {first(feed, 'Views')}",
            f"*Likes:* {first(feed, 'Likes')}",
            f"*CTR:* {first(feed, 'CTR')}",
            '',
            f"*DAU сообщений:* {first(message, 'DAU')}",
            f"*Messages received:* {first(message, 'messages_received')}",
            f"*Messages sent:* {first(message, 'messages_sent')}",
            f"*Users received:* {first(message, 'users_received')}",
            f"*Users sent:* {first(message, 'users_sent')}",
        ]
        # Without parse_mode the *...* markers would be shown literally
        # instead of rendering as bold.
        bot.sendMessage(chat_id=chat_id, text='\n'.join(lines), parse_mode='Markdown')

    @task
    def send_picture(df, metric, title):
        """Plot column `metric` of `df` against its `date` column and send it as a photo."""
        fig, ax = plt.subplots()
        try:
            sns.lineplot(x=df.date, y=df[metric], ax=ax)
            ax.set_title(title)
            ax.tick_params(axis='x', labelrotation=20)
            plot_object = io.BytesIO()
            fig.savefig(plot_object)
        finally:
            # Release the figure even if plotting or saving fails.
            plt.close(fig)
        plot_object.seek(0)
        plot_object.name = 'metric.png'
        bot.sendPhoto(chat_id=chat_id, photo=plot_object)

    feed_day = extract_feed_day()
    feed_week = extract_feed_week()
    message_day = extract_message_day()
    message_week = extract_message_week()

    # The send tasks have no dependencies on each other, so the Telegram
    # messages are not guaranteed to arrive in this order.
    send_text(feed_day, message_day)

    feed_charts = [
        ('DAU', 'DAU ленты'),
        ('Likes', 'Likes'),
        ('Views', 'Views'),
        ('CTR', 'CTR'),
    ]
    message_charts = [
        ('DAU', 'DAU сообщений'),
        ('messages_received', 'Messages received'),
        ('messages_sent', 'Messages sent'),
        ('users_received', 'Users received'),
        ('users_sent', 'Users sent'),
    ]
    # Each call adds a separate task; Airflow gives repeated calls unique ids
    # in call order, the same order as the original explicit calls.
    for metric, title in feed_charts:
        send_picture(feed_week, metric, title)
    for metric, title in message_charts:
        send_picture(message_week, metric, title)

# Calling the @dag-decorated function builds the DAG object. Keeping it in a
# module-level variable lets Airflow find it when parsing this file. The
# assignment intentionally shadows the factory function of the same name.
dag_kulikova_app_bot_report = dag_kulikova_app_bot_report()
