#### ETL задачa: ожидается, что на выходе будет DAG в airflow, который будет считаться каждый день за вчера. 

1. Параллельно будем обрабатывать две таблицы. В feed_actions для каждого юзера посчитаем число просмотров и лайков контента. В message_actions для каждого юзера считаем, сколько он получает и отсылает сообщений, скольким людям он пишет, сколько людей пишут ему. Каждая выгрузка должна быть в отдельном таске.
2. Далее объединяем две таблицы в одну.
3. Для этой таблицы считаем все эти метрики в разрезе по полу, возрасту и ос. Делаем три разных таска на каждый срез.
4. И финальные данные со всеми метриками записываем в отдельную таблицу в ClickHouse.
5. Каждый день таблица должна дополняться новыми данными. 


##### Структура финальной таблицы должна быть такая:

Дата - event_date<br>
Название среза - dimension<br>
Значение среза - dimension_value<br>
Число просмотров - views<br>
Числой лайков - likes<br>
Число полученных сообщений - messages_received<br>
Число отправленных сообщений - messages_sent<br>
От скольких пользователей получили сообщения - users_received<br>
Скольким пользователям отправили сообщение - users_sent<br>
Срез - это os, gender и age<br>

In [None]:
from datetime import datetime, timedelta
import pandas as pd
import pandahouse as ph

from airflow.decorators import dag, task
#from airflow.operators.python import get_current_contex


#для подключения к clickhouse
connection = {'host':'https://clickhouse.lab.karpov.courses',
              'database':'simulator_20221020',
              'user':'student', 
              'password':'dpo_python_2020'}

#для подключения к тестовой базе данных в CH
connection_test = {'host':'https://clickhouse.lab.karpov.courses', 
              'user':'student', 
              'password':'password',
              'database': 'test'}


# Дефолтные параметры, которые прокидываются в таски
default_args = {
    'owner': 'aksakal',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2022, 9, 1), # старт дата, с которой собираем данные 01/09/2022
    }

# Интервал запуска DAG
schedule_interval = '0 9 * * *'


# создаем DAG 
# параметр catchup - чтобы Dag считался только за текущий день, а не за прошлые
@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False)
def dag_aksakal():


    #1.1. В feed_actions для каждого юзера посчитаем число просмотров и лайков контента
    @task()
    def extract_feed():
        query  = """SELECT 
                       user_id, 
                       toDate(time) as event_date, 
                       os, 
                       IF(gender = 0, 'male', 'female') as gender,
                       sum(action = 'like') as likes, 
                       sum(action = 'view') as views, 
                       multiIf(age <= 20, '0 - 20', 
                       age > 20 and age <= 30, '21-30', 
                       age > 30 and age <= 40, '31-40', 
                       age > 40 and age <= 50, '41-50', '50+') as age
                   FROM 
                       {db}.feed_actions 
                        
                   WHERE 
                       toDate(time) = today() - 1
                   group by
                        user_id,
                        event_date,
                        os, 
                        age, 
                        gender"""
        df_cube_feed = ph.read_clickhouse(query, connection=connection)
        return df_cube_feed
        
#1.2.В message_actions для каждого юзера считаем,сколько получает и отсылает сообщений,скольким людям пишет,сколько пишут ему 
#Число полученных сообщений - messages_received
#Число отправленных сообщений - messages_sent
#От скольких пользователей получили сообщения - users_received
#Скольким пользователям отправили сообщение - users_sent
# WHERE toDate(time) = today() - 1 -- за вчера   


    @task()
    def extract_message():
        query  = '''SELECT 
                    id as user_id, 
                    event_date,
                    messages_sent, 
                    messages_received, 
                    users_received,
                    users_sent,
                    os, gender, age
                    
                FROM
                
                    (SELECT user_id as id, os, 
                    toDate(time) as event_date,
                    count(user_id) as messages_sent,
                    countdistinct(reciever_id) as users_sent,
                    
                    IF(gender = 0, 'male', 'female') as gender,
                    
                    multiIf(age <= 20, '0 - 20', 
                    age > 20 and age <= 30, '21-30', 
                    age > 30 and age <= 40, '31-40', 
                    age > 40 and age <= 50, '41-50', '50+') as age
                    
                    FROM  {db}.message_actions
                    WHERE toDate(time) = today() - 1 
                    group by user_id, event_date, 
                    os, age, gender) t1               
               JOIN
                    (SELECT reciever_id as id,
                    toDate(time) as event_date,
                    count(reciever_id) as messages_received, 
                    countdistinct(user_id) as users_received 
                    FROM  {db}.message_actions
                    WHERE toDate(time) = today() - 1 
                    group by reciever_id, event_date) t2
                USING id'''
        df_cube_message = ph.read_clickhouse(query, connection=connection)
        return df_cube_message
      

#2. Далее объединяем две таблицы в одну.
    @task
    def merge_feed_message(df_cube_feed, df_cube_message):
        df_cube = df_cube_feed.merge(df_cube_message, on=['user_id', 'event_date', 'gender', 'age', 'os'], how='outer')
        return df_cube

                    
#3. Для этой таблицы считаем все эти метрики в разрезе по полу, возрасту и ос. Делаем три разных таска на каждый срез.   
    @task
    def transform_age(df_cube):
        df_cube_age = df_cube[['event_date', 'age','likes', 'views', 'messages_sent', 'messages_received', \
                      'users_received', 'users_sent']].groupby(['age', 'event_date']).sum().reset_index()
        df_cube_age ['dimension'] = 'age'
        df_cube_age .rename(columns= {'age': 'dimension_value'}, inplace=True)
        return  df_cube_age 

    @task
    def transform_gender(df_cube):
        df_cube_gender = df_cube[['event_date', 'gender','likes', 'views', 'messages_sent', 'messages_received', \
                      'users_received', 'users_sent']].groupby(['gender','event_date']).sum().reset_index()
        df_cube_gender['dimension'] = 'gender'
        df_cube_gender.rename(columns= {'gender': 'dimension_value'}, inplace=True)
        return df_cube_gender

    @task
    def transform_os(df_cube):
        df_cube_os = df_cube[['event_date', 'os','likes', 'views', 'messages_sent', 'messages_received', \
                      'users_received', 'users_sent']].groupby(['os', 'event_date']).sum().reset_index()
        df_cube_os['dimension'] = 'os'
        df_cube_os.rename(columns= {'os': 'dimension_value'}, inplace=True)
        return df_cube_os
  
    # объединяю финальные данные со всеми метриками
    @task
    def concat_metrics_table(df_cube_age, df_cube_gender, df_cube_os):
        metrics_table = pd.concat([df_cube_age, df_cube_gender, df_cube_os]).reset_index(drop=True)
       
        # изменила типы данных - подготовка к записи в базу test
        metrics_table = metrics_table.astype({'dimension': str, 'dimension_value': str,'likes':  int, 'views':  int,\
                        'messages_sent':  int, 'messages_received': int, 'users_received': int, 'users_sent':  int}) 
        metrics_table['event_date'] = pd.to_datetime(metrics_table['event_date']) # изменила тип на дата
        return metrics_table
    
    
#4. И финальные данные со всеми метриками записываем в отдельную таблицу в ClickHouse.   
    @task
    def load(metrics_table):  
        #подключаемся к тестовой базе данных
        q_create = ('''CREATE TABLE IF NOT EXISTS test.aksakal_table
                  (event_date Date,
                  dimension VARCHAR(255),
                  dimension_value VARCHAR(255),
                  views Int64,
                  likes Int64,
                  messages_received Int64,
                  messages_sent Int64,
                  users_received Int64,
                  users_sent Int64)
                  ENGINE = MergeTree()
                  ORDER BY event_date''')
    
        ph.execute(query=q_create, connection=connection_test)
        ph.to_clickhouse(df=metrics_table, table='aksakal_table', connection=connection_test, index=False)
                                          
                                                  
    #запускаем task   
    df_cube_feed =  extract_feed()
    df_cube_message = extract_message()
    df_cube = merge_feed_message(df_cube_feed, df_cube_message)
    df_cube_age = transform_age(df_cube)
    df_cube_gender = transform_gender(df_cube)
    df_cube_os = transform_os(df_cube)
    metrics_table = concat_metrics_table(df_cube_age, df_cube_gender, df_cube_os)
    load(metrics_table)

    
# запускаем dag  
dag_aksakal = dag_aksakal()