# In [None]:  (notebook cell marker, commented out so the module parses as Python)
# Import the libraries

from datetime import datetime, timedelta
import pandas as pd
from io import StringIO
import requests
import pandahouse

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context


# Helper that runs a query against ClickHouse over HTTP and returns a DataFrame
def get_df(query='Select 1', host='', password='', user='', verify=True):
    """Send *query* to a ClickHouse HTTP endpoint and parse the TSV reply.

    Args:
        query: SQL text; it is expected to end with a tab-separated output
            format (the callers in this file use ``format TSVWithNames``).
        host: URL of the ClickHouse HTTP interface. When empty, the
            ``CLICKHOUSE_HOST`` environment variable is used instead.
        password: Password for HTTP basic auth. When empty, falls back to
            ``CLICKHOUSE_PASSWORD``.
        user: User name for HTTP basic auth. When empty, falls back to
            ``CLICKHOUSE_USER``. (The original body referenced an undefined
            ``user`` name and always failed with ``NameError``.)
        verify: Passed through to ``requests.post``; keep ``True`` so the
            server certificate is checked, pass ``False`` only deliberately.

    Returns:
        pandas.DataFrame parsed from the tab-separated response body.

    Raises:
        ValueError: if no host was given and ``CLICKHOUSE_HOST`` is unset.
        requests.HTTPError: if the server answers with an error status.
    """
    # Local import: the module does not import os at top level.
    from os import environ

    # Callers in this file pass only ``query``, so connection settings must
    # be resolvable without arguments.
    host = host or environ.get('CLICKHOUSE_HOST', '')
    user = user or environ.get('CLICKHOUSE_USER', '')
    password = password or environ.get('CLICKHOUSE_PASSWORD', '')
    if not host:
        raise ValueError(
            "ClickHouse host is not set: pass host=... or define CLICKHOUSE_HOST"
        )

    r = requests.post(
        host,
        data=query.encode("utf-8"),
        auth=(user, password),
        verify=verify,
    )
    # Without this check an error message from the server would be parsed
    # as if it were data.
    r.raise_for_status()
    return pd.read_csv(StringIO(r.text), sep='\t')

# Default arguments that are handed down to the tasks
default_args = dict(
    owner='d.melikhov',
    depends_on_past=False,
    retries=2,
    retry_delay=timedelta(minutes=5),
    start_date=datetime(2022, 7, 31),
)

# DAG launch interval (cron expression: minute 0, hour 10, every day)
schedule_interval = '0 10 * * *'


@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False)
def dag_yesterday_stat():
    
    @task
    def extract_messages():
        """Fetch yesterday's per-user messaging statistics from ClickHouse.

        The query lists every user who sent or received a message yesterday,
        resolves gender / age / os from both ``message_actions`` and
        ``feed_actions``, and attaches the counters ``messages_sent``,
        ``users_sent``, ``messages_received`` and ``users_received``.

        Returns:
            The DataFrame produced by ``get_df`` for that query.
        """
        # The original string began with a stray "q1 = '''" line, which was
        # sent to the server as part of the SQL and made the query invalid.
        query = """
        select distinct day, user,
                case when m.gender = 0 and f.gender = 1 then f.gender
                    when m.gender = 1 and f.gender = 0 then m.gender
                    else ifNull(m.gender, f.gender) 
                    end gender, 
                case when m.age = 0 and f.age != 0 then f.age
                    when m.age != 0 and f.age = 0 then m.age
                    else ifNull(m.age, f.age) 
                    end age, 
                case when m.os = '' and f.os != '' then f.os
                    when f.os = '' and m.os != '' then m.os
                    else ifNull(m.os, f.os) 
                    end os,
                messages_sent, users_sent, messages_received, users_received 
        from
            ( -- отберём всех кто получал или отправлял вчера сообщения 
            select distinct toDate(time) day, user_id user
            from simulator_20220720.message_actions
            where toDate(time) = yesterday()
            union all
            select distinct toDate(time), reciever_id
            from simulator_20220720.message_actions
            where toDate(time) = yesterday() 
            ) yesterday
        -- джойним общие данные, чтобы получить в дальнейшем возраст, пол, ос, так как все эти атрибуты изначально
        -- относятся только к отправителю и нет гарантий, что вчерашние пользователи есть только в базе ленты или базе сообщений
            left join (select distinct user_id, gender, age, os
                from simulator_20220720.message_actions)  m on yesterday.user = m.user_id
            left join ( select distinct user_id, gender, age, os
                from simulator_20220720.feed_actions) f on yesterday.user = f.user_id
        -- левый джойн для подсчёта отправленных
            left join (select user_id,
                        count(reciever_id) messages_sent,
                        count(distinct reciever_id) users_sent
                        from simulator_20220720.message_actions 
                        where toDate(time) = yesterday()
                        group by user_id) sent on sent.user_id = yesterday.user
        -- левый джойн для полученных           
            left join (select reciever_id,
                        count(user_id) messages_received,
                        count(distinct user_id) users_received
                        from simulator_20220720.message_actions 
                        where toDate(time) = yesterday()
                        group by reciever_id) rec on rec.reciever_id = yesterday.user
            format TSVWithNames
                        """
        messages_df = get_df(query=query)
        return messages_df
    
    
    @task
    def extract_feeds():
        """Fetch yesterday's per-user like and view counts from the feed table.

        Returns:
            The DataFrame produced by ``get_df``: one row per
            (day, user, gender, os, age) group with ``likes`` and ``views``.
        """
        feed_sql = """
                select toDate(time) day, user_id user, gender, os, age, 
                        countIf(action, action = 'like') likes, 
                        countIf(action, action = 'view') views
                from simulator_20220720.feed_actions
                where toDate(time) = yesterday() 
                group by day, user_id, gender, os, age
                format TSVWithNames
                
                """
        return get_df(query=feed_sql)
    
    
    @task
    def merge_df(feeds_df, messages_df):
        """Outer-join feed and message statistics on ``user``.

        Columns present in both frames (day, gender, age, os) come out of the
        merge with ``_x`` / ``_y`` suffixes. Gaps in the ``_x`` (feed) version
        are filled from the ``_y`` (messages) version, the ``_y`` columns are
        dropped, and the ``_x`` columns are renamed to their final names
        (``day_x`` becomes ``event_date``).
        """
        shared = ['day', 'gender', 'age', 'os']
        combined = feeds_df.merge(messages_df, how='outer', on='user')

        # Prefer the feed-side value, fall back to the messages-side one.
        for col in shared:
            left, right = col + '_x', col + '_y'
            combined[left] = combined[left].fillna(combined[right])

        combined = combined.drop(columns=[col + '_y' for col in shared])
        final_names = {
            'day_x': 'event_date',
            'gender_x': 'gender',
            'os_x': 'os',
            'age_x': 'age',
        }
        return combined.rename(columns=final_names)
    
    @task
    def gender_count(main_df):
        """Sum all metrics per (event_date, gender) and cast them to int.

        The ``gender`` key itself is cast to int together with the metrics.
        """
        metrics = ['likes', 'views', 'messages_sent',
                   'users_sent', 'messages_received', 'users_received']
        totals = main_df.groupby(['event_date', 'gender'])[metrics].sum()
        by_gender = totals.reset_index()

        int_cols = ['gender'] + metrics
        by_gender[int_cols] = by_gender[int_cols].astype(int)
        return by_gender
    
    @task
    def os_count(main_df):
        """Sum all metrics per (event_date, os) and cast the metrics to int.

        The ``os`` key is left untouched; only the metric columns are cast.
        """
        metrics = ['likes', 'views', 'messages_sent',
                   'users_sent', 'messages_received', 'users_received']
        totals = main_df.groupby(['event_date', 'os'])[metrics].sum()
        by_os = totals.reset_index()

        by_os[metrics] = by_os[metrics].astype(int)
        return by_os
    
    @task
    def age_count(main_df):
        """Sum all metrics per (event_date, age) and cast them to int.

        The ``age`` key itself is cast to int together with the metrics.
        """
        metrics = ['likes', 'views', 'messages_sent',
                   'users_sent', 'messages_received', 'users_received']
        totals = main_df.groupby(['event_date', 'age'])[metrics].sum()
        by_age = totals.reset_index()

        int_cols = ['age'] + metrics
        by_age[int_cols] = by_age[int_cols].astype(int)
        return by_age
    
    @task
    def fin_report(gen, os, age):
        """Stack the three per-dimension aggregates into one long report.

        Each input frame has ``event_date``, its own key column (``gender``,
        ``os`` or ``age``) and the metric columns. The key column is renamed
        to ``dimension_value`` and a ``dimension`` column holding the key's
        name is inserted at position 1, so all three frames share the layout
        ``event_date, dimension, dimension_value, <metrics...>``.

        Args:
            gen: aggregate by gender.
            os: aggregate by os.
            age: aggregate by age.

        Returns:
            One DataFrame with the rows of ``gen``, ``os`` and ``age`` in that
            order and a fresh 0..n-1 index.
        """
        def _as_dimension(frame, name):
            # rename() returns a new frame, so the caller's frame is not
            # mutated by the insert below (the original inserted in place).
            out = frame.rename(columns={name: 'dimension_value'})
            out.insert(loc=1, column='dimension', value=name)
            return out

        parts = [
            _as_dimension(gen, 'gender'),
            _as_dimension(os, 'os'),
            _as_dimension(age, 'age'),
        ]
        # ignore_index avoids duplicate index labels in the stacked result.
        fin = pd.concat(parts, ignore_index=True)
        return fin
    
    @task
    def load_to_clickhouse(fin):
        """Create the report table if it is missing, then append ``fin`` to it.

        Both statements go through ``pandahouse`` with the same connection
        settings; the frame is written without its index.
        """
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to a secret store / Airflow connection.
        target = dict(
            host='https://clickhouse.lab.karpov.courses',
            password='656e2b0c9c',
            user='student-rw',
            database='test',
        )
        ddl = """
                CREATE TABLE IF NOT EXISTS test.d_melihov_9
                            (event_date Date, 
                            dimension String,
                            dimension_value String,
                            likes Int64,
                            views Int64,
                            messages_sent Int64,
                            users_sent Int64,
                            messages_received Int64,
                            users_received Int64
                            )
                            ENGINE = MergeTree()
                            order by event_date
                """
        pandahouse.execute(query=ddl, connection=target)
        pandahouse.to_clickhouse(
            df=fin,
            table='d_melihov_9',
            index=False,
            connection=target,
        )
        
    
    # Task wiring: extract -> merge -> per-dimension aggregates -> report -> load.
    messages_df = extract_messages()
    feeds_df = extract_feeds()
    main_df = merge_df(feeds_df, messages_df)
    # The task is defined as gender_count; the original called gen_count,
    # which does not exist and raised NameError while building the DAG.
    gen = gender_count(main_df)
    os = os_count(main_df)
    age = age_count(main_df)
    fin = fin_report(gen, os, age)
    # Original call was misspelled as load_to_clickhose (NameError).
    load_to_clickhouse(fin)
    
# Call the @dag-decorated function and bind the result to a module-level name.
dag_d_melihov_9 = dag_yesterday_stat()
    

        
        
        
        
    
        
        
    
    