In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator # because the tasks are written in Python
from datetime import datetime,  timedelta
from airflow.decorators import dag, task
from io import StringIO 
import requests 
from airflow.operators.python import get_current_context 
import pandas as pd
import pandahouse as ph


# Default arguments for the DAG.
default_args = dict(
    owner='r_muksinov',                 # owner
    depends_on_past=False,              # does not depend on the success of the previous run
    retries=2,                          # number of retries
    retry_delay=timedelta(minutes=5),   # pause between retries
    start_date=datetime(2023, 4, 9),    # start of execution
)

# NOTE(review): both connections keep their credentials inline in the source;
# consider moving them to Airflow Connections/Variables.

# Database the result is loaded into.
connect_test = dict(
    host='******',
    password='******',
    user='******',
    database='******',
)

# Database the source data is read from.
connection = dict(
    host='******',
    database='******',
    user='******',
    password='******',
)

# Cron expression: every day at 05:00.
schedule_interval = '0 5 * * *'

# Helper that runs a query against the database and hands back the result.
def get_data(query, connection):
    """Execute ``query`` via ``ph.read_clickhouse`` and return its result.

    Args:
        query: SQL text passed to pandahouse unchanged.
        connection: connection settings forwarded as the ``connection``
            keyword (the module passes a dict with host/database/user/password).

    Returns:
        Whatever ``ph.read_clickhouse`` returns (presumably a pandas
        DataFrame; callers in this file use DataFrame methods on it).
    """
    result = ph.read_clickhouse(query, connection=connection)
    return result


@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False, tags=['r.muksinov'])
def muksinov_task():
    # For every user, count the number of content views and likes.
    @task()
    def get_feed():
        """Fetch yesterday's per-user feed activity.

        The query reads ``feed_actions``, keeps rows whose date equals
        ``yesterday()`` and, per (user_id, event date, os, gender, age),
        counts ``action='like'`` rows as ``likes`` and ``action='view'``
        rows as ``views``.

        Returns:
            The result of ``get_data`` for that query, using the module-level
            ``connection`` settings.
        """
        # NOTE(review): the ``{db}`` placeholder is not formatted in this
        # file; presumably pandahouse substitutes the connection's database
        # name - confirm.
        feed_query = """
        SELECT user_id, 
               age, 
               os,
               gender,  
               countIf(action='like') AS likes, 
               countIf(action='view') AS views, 
               toString(toDate(time)) as event_date
        FROM  {db}.feed_actions
        WHERE toDate(time) = yesterday()
        GROUP BY user_id, toString(toDate(time)), os, gender, age
        ORDER BY user_id
        """

        return get_data(query=feed_query, connection=connection)
    # For every user, count how many messages they send and receive, how many
    # people they write to and how many people write to them.
    @task()
    def get_messages():
        """Fetch yesterday's per-user messaging activity.

        Two aggregates over ``message_actions`` are joined on
        ``user_sent.user_id = user_received.reciever_id``:

        * ``user_sent``: per (user_id, event_date, age, os, gender), the
          number of sent messages and of distinct recipients;
        * ``user_received``: per ``reciever_id``, the number of received
          messages and of distinct senders.

        Returns:
            The result of ``get_data`` for that query, using the module-level
            ``connection`` settings.
        """
        # The date filter is applied with WHERE, before aggregation: ``time``
        # is neither a GROUP BY key nor inside an aggregate, so it cannot be
        # referenced from HAVING. This also matches the filter in the feed
        # query.
        # NOTE(review): a plain JOIN keeps only users present on both sides
        # (sent AND received yesterday) - confirm this is intended.
        # NOTE(review): the ``{db}`` placeholder is not formatted in this
        # file; presumably pandahouse substitutes the connection's database
        # name - confirm.
        query2 = """
        SELECT * 
        FROM   
            (SELECT user_id, 
                    age,
                    os, 
                    gender, 
                    count(reciever_id) as sent_messages, 
                    count(distinct reciever_id) sent_uniqusers_qty,
                    toString(toDate(time)) as event_date
            FROM  {db}.message_actions 
            WHERE toDate(time) = yesterday()
            GROUP BY user_id, 
                     event_date, 
                     age, 
                     os,
                     gender
            ORDER BY user_id) as user_sent
            JOIN
            (SELECT reciever_id, 
                    count(user_id) as received_messages, 
                    count(distinct user_id) as received_uniqusers_qty
            FROM  {db}.message_actions 
            WHERE toDate(time) = yesterday()
            GROUP BY reciever_id
            ORDER BY reciever_id) as user_received
        ON user_sent.user_id = user_received.reciever_id
        """

        df = get_data(query=query2, connection=connection)
        return df
    
    # Combine both tables.
    @task()
    def merge_data(feed, messages):
        """Outer-merge the feed and message frames on the shared user keys.

        Args:
            feed: frame produced by the feed extraction task.
            messages: frame produced by the messages extraction task.

        Returns:
            ``feed`` merged with ``messages`` (``how='outer'``) on user_id,
            event_date, age, os and gender, so rows present on only one side
            are kept.
        """
        join_keys = ['user_id', 'event_date', 'age', 'os',  'gender']
        return feed.merge(messages, on=join_keys, how='outer')
    
    # Slice by operating system.
    @task()
    def pivot_os(df):
        """Sum the metric columns per (event_date, os).

        Args:
            df: merged frame containing ``event_date``, ``os`` and the six
                metric columns listed below.

        Returns:
            A frame where ``os`` is renamed to ``dimension_value`` and a
            constant ``dimension`` column holds ``'os'``.
        """
        metrics = ['views', 'likes', 'sent_messages', 'sent_uniqusers_qty',
                   'received_messages', 'received_uniqusers_qty']
        totals = df.pivot_table(index=['event_date', 'os'], values=metrics, aggfunc='sum')
        by_os = totals.reset_index().rename(columns={'os': 'dimension_value'})
        by_os['dimension'] = 'os'
        return by_os
    
    # Slice by gender.
    @task()
    def pivot_gender(df):
        """Sum the metric columns per (event_date, gender).

        Args:
            df: merged frame containing ``event_date``, ``gender`` and the six
                metric columns listed below.

        Returns:
            A frame where ``gender`` is renamed to ``dimension_value`` and a
            constant ``dimension`` column holds ``'gender'``.
        """
        metric_columns = [
            'views',
            'likes',
            'sent_messages',
            'sent_uniqusers_qty',
            'received_messages',
            'received_uniqusers_qty',
        ]
        grouped = df.pivot_table(
            index=['event_date', 'gender'],
            values=metric_columns,
            aggfunc='sum',
        )
        by_gender = grouped.reset_index()
        by_gender = by_gender.rename(columns={'gender': 'dimension_value'})
        by_gender['dimension'] = 'gender'
        return by_gender
    
    # Slice by age bucket.
    @task()
    def pivot_age(df):
        """Bucket users by age and sum the metric columns per (event_date, bucket).

        Args:
            df: merged frame containing ``event_date``, ``age`` and the six
                metric columns listed below. It is not modified; the bucketed
                ``age`` column is written to a copy.

        Returns:
            A frame where the age bucket is stored in ``dimension_value`` and
            a constant ``dimension`` column holds ``'age'``.
        """
        # (inclusive upper bound, label) pairs in ascending order; anything
        # above the last bound falls into the open-ended bucket.
        age_buckets = (
            (18, 'до 18'),
            (21, 'от 18 до 21'),
            (24, 'от 21 до 24'),
            (27, 'от 24 до 27'),
            (30, 'от 27 до 30'),
            (35, 'от 30 до 35'),
            (45, 'от 35 до 45'),
        )

        # Age categorisation.
        def categorize_age(age):
            """Return the bucket label for ``age``, or None if it fits no bucket."""
            try:
                # Negative ages and NaN (every comparison is False) get no bucket.
                if not age >= 0:
                    return None
                for upper_bound, label in age_buckets:
                    if age <= upper_bound:
                        return label
                return 'от 45'
            except TypeError:
                # Values that cannot be compared with numbers (None, strings,
                # pd.NA) stay unbucketed instead of failing the task. Only
                # TypeError is caught so unrelated errors are not hidden.
                return None

        # assign() returns a new frame, so the caller's DataFrame keeps its
        # numeric ``age`` column.
        bucketed = df.assign(age=df['age'].apply(categorize_age))

        metrics = ['views', 'likes', 'sent_messages', 'sent_uniqusers_qty',
                   'received_messages', 'received_uniqusers_qty']
        data_age = bucketed.pivot_table(index=['event_date', 'age'],
                                        values=metrics,
                                        aggfunc='sum').reset_index()
        data_age.rename(columns={'age': 'dimension_value'}, inplace=True)
        data_age['dimension'] = 'age'

        return data_age
    
    # Concatenate all slices.
    @task()
    def concat_pivot(os, gender, age): 
        """Stack the three slice frames and cast the metric columns to int32.

        Args:
            os: frame of the per-OS slice.
            gender: frame of the per-gender slice.
            age: frame of the per-age slice.

        Returns:
            The row-wise concatenation of the three frames with a fresh
            integer index and the six metric columns cast to ``int32``.
        """
        int_columns = ['likes', 'views', 'received_messages', 'received_uniqusers_qty',
                       'sent_messages', 'sent_uniqusers_qty']
        stacked = pd.concat([os, gender, age], axis=0)
        # reset_index() moves the old index into an 'index' column, which is
        # dropped again before returning.
        stacked = stacked.reset_index()
        stacked = stacked.astype(dict.fromkeys(int_columns, 'int32'))
        return stacked.drop(columns='index')
    
    # Load the resulting table into the database.
    @task()
    def load(table):
        """Create the target table if needed and insert ``table`` into it.

        Args:
            table: frame to insert; it is passed to ``ph.to_clickhouse`` with
                ``index=False``.

        Both statements use the module-level ``connect_test`` settings.
        """
        # NOTE(review): the DDL names ``test.muksinov_ruslan`` explicitly while
        # the insert relies on the connection's database - presumably both
        # point at ``test``; confirm.
        # NOTE(review): rows are appended on every run; rerunning the DAG for
        # the same date presumably duplicates them - confirm.
        create_table_sql = """
                CREATE TABLE IF NOT EXISTS test.muksinov_ruslan(
                event_date String,
                dimension_value String,
                likes Int32,
                views Int32,
                received_messages Int32,
                received_uniqusers_qty Int32,
                sent_messages Int32,
                sent_uniqusers_qty Int32,
                dimension String)
                ENGINE = MergeTree()
                ORDER BY event_date
                """
        ph.execute(query=create_table_sql, connection=connect_test)
        ph.to_clickhouse(
            df=table,
            table='muksinov_ruslan',
            connection=connect_test,
            index=False,
        )
    
    
    # Wire the tasks together: each call's result is passed to the next task.
    # Results get their own names so the task functions are not shadowed.
    feed_data = get_feed()
    message_data = get_messages()
    merged = merge_data(feed_data, message_data)
    os_slice = pivot_os(merged)
    gender_slice = pivot_gender(merged)
    age_slice = pivot_age(merged)
    combined = concat_pivot(os_slice, gender_slice, age_slice)
    load(combined)

# Call the @dag-decorated function to build the DAG; the module-level name is
# rebound from the function to the object it returns.
muksinov_task = muksinov_task()