In [None]:
import datetime as dt
from airflow.decorators import dag, task
import pandas as pd
import pandahouse as ph

In [None]:
# ClickHouse credentials. `connection` serves the read queries against the
# simulator tables; `connection1` serves creating and loading the report table.
# NOTE(review): credentials are hard-coded in the DAG file; consider moving them
# to an Airflow Connection or Variable.
connection = dict(
    host='host',
    database='db',
    user='name',
    password='password',
)
connection1 = dict(
    host='host',
    database='db',
    user='name',
    password='password',
)

# Defaults passed to the DAG (and hence to each of its tasks).
default_args = dict(
    owner='name',
    depends_on_past=False,
    retries=2,
    retry_delay=dt.timedelta(minutes=5),
    start_date=dt.datetime(2022, 7, 30),
)

# Cron expression: once a day at 23:15 (scheduler timezone, presumably UTC — confirm).
schedule_interval = '15 23 * * *'

In [None]:
# Metric columns summed for every dimension slice. Their order does not matter:
# create_final_table selects the output columns explicitly.
_METRICS = ['likes', 'views', 'messages_sent', 'users_sent',
            'messages_received', 'users_received']


def _format_dimension_value(value):
    """Render one dimension key as a string for the String `dimension_value` column.

    If a key column holds integers (presumably `age`/`gender` — their ClickHouse
    types are not visible here), the outer merge in `merged_data` upcasts it to
    float as soon as any merged row has a NaN key, so 25 would be written as
    "25.0". Integral floats are therefore rendered without the fractional part;
    everything else goes through str().
    """
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value)


def _slice_by(table, dimension):
    """Sum the metric columns of `table` for each value of column `dimension`.

    Returns one row per key with the summed metrics plus `dimension` (the column
    name) and `dimension_value` (the key as a string). Rows whose key is NaN are
    dropped by the group-by inside pivot_table.
    """
    df = table.pivot_table(index=dimension, values=_METRICS, aggfunc='sum').reset_index()
    df['dimension'] = dimension
    df = df.rename(columns={dimension: 'dimension_value'})
    df['dimension_value'] = df['dimension_value'].map(_format_dimension_value)
    return df


@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False)
def lmk_dag():
    """Daily report: yesterday's feed and messenger metrics sliced by age, gender and os,
    appended to test.lmk_6_table."""

    @task
    def actions():
        """Per-user likes and views for yesterday, with the user's gender, age and os."""
        q = """
            SELECT user_id, gender, age, os,
            sum(action='like') as likes,
            sum(action='view') as views
            FROM simulator_20220720.feed_actions
            WHERE DATE(time) = yesterday()
            GROUP BY user_id, gender, age, os
        """
        return ph.read_clickhouse(q, connection=connection)

    @task
    def messages():
        """Per-user sent/received message counts and distinct counterparts for yesterday."""
        q = """
            SELECT * FROM 
            (SELECT user_id, COUNT(reciever_id) as messages_sent, COUNT(DISTINCT reciever_id) as users_sent
            FROM simulator_20220720.message_actions
            WHERE DATE(time) = yesterday()
            GROUP BY user_id) q
            FULL OUTER JOIN
            (SELECT reciever_id as user_id, COUNT(user_id) as messages_received, COUNT(DISTINCT user_id) as users_received
            FROM simulator_20220720.message_actions
            WHERE DATE(time) = yesterday()
            GROUP BY reciever_id) q1
            using user_id
        """
        return ph.read_clickhouse(q, connection=connection)

    @task
    def merged_data(action, message):
        """Outer-join feed and message metrics on user_id."""
        # NOTE(review): users that appear only in `message` get NaN gender/age/os here,
        # so the per-dimension group-by drops them and their message counts are missing
        # from every slice. If message_actions carries demographics (not visible here),
        # consider selecting them in `messages` as well.
        return action.merge(message, on='user_id', how='outer')

    @task
    def by_age(table):
        """Metric totals per age."""
        return _slice_by(table, 'age')

    @task
    def by_gender(table):
        """Metric totals per gender."""
        return _slice_by(table, 'gender')

    @task
    def by_os(table):
        """Metric totals per os."""
        return _slice_by(table, 'os')

    @task
    def create_final_table(table_age, table_gender, table_os):
        """Stack the three slices, stamp them with yesterday's date and fix the column order."""
        df = pd.concat([table_age, table_gender, table_os], ignore_index=True)
        # NOTE(review): this uses the worker's local clock, while the queries filter with
        # ClickHouse's yesterday() (server timezone); confirm both agree on "yesterday".
        df['event_date'] = dt.date.today() - dt.timedelta(days=1)
        return df[['event_date', 'dimension', 'dimension_value', 'views', 'likes',
                   'messages_received', 'messages_sent', 'users_received', 'users_sent']]

    @task
    def load(final_table):
        """Create test.lmk_6_table if needed and append `final_table` to it."""
        q1 = """
CREATE TABLE IF NOT EXISTS test.lmk_6_table
(
    event_date Date,
    dimension String,
    dimension_value String,
    views Float64,
    likes Float64,
    messages_received Float64,
    messages_sent Float64,
    users_received Float64,
    users_sent Float64
) ENGINE = Log()
"""
        ph.execute(q1, connection=connection1)
        # pandahouse builds the INSERT target from connection['database']; the table is
        # created in `test`, so pin the insert to that database instead of relying on
        # whatever connection1 names.
        ph.to_clickhouse(final_table, 'lmk_6_table', index=False,
                         connection={**connection1, 'database': 'test'})

    action = actions()
    message = messages()
    table = merged_data(action, message)
    table_age = by_age(table)
    table_gender = by_gender(table)
    table_os = by_os(table)

    final_table = create_final_table(table_age, table_gender, table_os)
    load(final_table)


# Instantiate the DAG at module level so the Airflow scheduler picks it up.
lmk_dag_6 = lmk_dag()