# Построение ETL-пайплайна

В данном задании мы пишем код для построения нашего отчёта через Airflow и дальше отправляет этот отчёт в ClickHouse.

#### Порядок действий:
1. Параллельно будем обрабатывать две таблицы. В feed_actions для каждого юзера посчитаем число просмотров и лайков контента. В message_actions для каждого юзера считаем, сколько он получает и отсылает сообщений, скольким людям он пишет, сколько людей пишут ему. Каждая выгрузка должна быть в отдельном таске.
2. Далее объединяем две таблицы в одну.
3. Для этой таблицы считаем все эти метрики в разрезе по полу, возрасту и ос. Делаем три разных таска на каждый срез.
4. И финальные данные со всеми метриками записываем в отдельную таблицу в ClickHouse.
5. Каждый день таблица должна дополняться новыми данными. 

In [1]:
# coding=utf-8

from datetime import datetime, timedelta
import pandas as pd
from io import StringIO
import requests
import pandahouse
import seaborn as sns

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context


# бд от куда загрузить исходные данные
connection = {
    'host': 'https://clickhouse.lab.karpov.courses',
    'password': 'dpo_python_2020',
    'user': 'student',
    'database': 'simulator_20220320'
}

# бд куда выгрузить данные
connection_upload = {
                'host': 'https://clickhouse.lab.karpov.courses',
                'password': '656e2b0c9c',
                'user': 'student-rw',
                'database': 'test'
}

def request(q):
    return pandahouse.read_clickhouse(q, connection=connection)

# Дефолтные параметры, которые прокидываются в таски
default_args = {
    'owner': 'a.pronin',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2022, 6, 14),
}

# Интервал запуска DAG
schedule_interval = '0 0 * * *'

@dag(default_args=default_args, schedule_interval=schedule_interval, catchup = False)
def dag_etl_pronin():
    
    @task()
    def exatract_feed():
        return request('''SELECT 
                               toDate(time) as event_date, user_id,
                               countIf(action = 'like') as likes,
                               countIf(action = 'view') as views,
                               gender, age, os
                          FROM simulator_20220520.feed_actions 
                          WHERE toDate(time) = today() - 1
                          GROUP BY event_date, user_id, gender, age, os''')
    @task()
    def exatract_message():
        return request('''SELECT event_date, user_id, gender, os, age, messages_sent, users_sent, messages_received, users_received
                            FROM
                            (SELECT  
                            toDate(time) as event_date, user_id, 
                            count(reciever_id) as messages_sent,
                            count(distinct reciever_id) as users_sent,
                            gender, age, os
                            FROM simulator_20220520.message_actions 
                            WHERE toDate(time) = today() - 1
                            GROUP BY event_date, user_id, gender, age, os) t1

                            join

                            (SELECT  
                            toDate(time) as event_date, reciever_id, 
                            count(user_id) as messages_received,
                            count(distinct user_id) as users_received
                            FROM simulator_20220520.message_actions 
                            WHERE toDate(time) = today() - 1
                            GROUP BY event_date, reciever_id) t2

                            ON t1.user_id = t2.reciever_id AND t1.event_date= t2.event_date''')
    
    #От скольких пользователей получили сообщения - users_received
    #Скольким пользователям отправили сообщение - users_sent
    #Число полученных сообщений - messages_received
    #Число отправленных сообщений - messages_sent

    
    @task()
    def merge_request(q_feed, q_message):
        df = q_feed.merge(q_message, on = ['event_date', 'user_id', 'gender','age','os'], how='outer')
        
        df['gender'] = df['gender'].apply(lambda x: 'male' if x == 1 else 'femail')
        
        def value_age(age):
            if age < 18:
                return '0-17'
            elif 17 < age < 25:
                return '18-24'
            elif 24 < age < 35:
                return '25-34'
            elif 34 < age < 45:
                return '35-44'
            else:
                return '45+'
            
        df['age'] = df['age'].apply(value_age)
        
        df = df[['event_date', 'gender', 'os', 'age', 'likes', 'views', 'messages_sent', 'users_sent', 'messages_received', 'users_received']]
        
        return df
            
    @task()
    def metrics_os(df):
        df['metrics'] = 'os: ' + df['os']
        os_df = df.groupby(['event_date', 'metrics'])['likes','views','messages_received','messages_sent','users_received','users_sent'].sum().sort_index().reset_index()
        return os_df
    @task()
    def metrics_age(df):
        df['metrics'] = 'age: ' + df['age']
        age_df = df.groupby(['event_date', 'metrics'])['likes','views','messages_received','messages_sent','users_received','users_sent'].sum().sort_index().reset_index()
        return age_df
    @task()
    def metrics_gender(df):
        df['metrics'] = 'gender: ' + df['gender']
        gender_df = df.groupby(['event_date', 'metrics'])['likes','views','messages_received','messages_sent','users_received','users_sent'].sum().sort_index().reset_index()
        return gender_df
    
    @task()
    def concat_metrics(os_df, age_df, gender_df):
        final_df = pd.concat([gender_df, os_df, age_df])
        return final_df
    
    @task()
    def load(final_df):
        q_final = '''CREATE TABLE IF NOT EXISTS test.pronin_v1
                (   event_date Date,
                    metrics String,
                    likes UInt64,
                    views UInt64,
                    messages_received UInt64,
                    messages_sent UInt64,
                    users_received UInt64,
                    users_sent UInt64
                ) ENGINE = MergeTree'''

        pandahouse.execute(query=q_final, connection=connection_upload)

        pandahouse.to_clickhouse(final_df, 'pronin_v1', connection=connection_upload, index=False)
        
    q_feed = exatract_feed()
    q_message = exatract_message()
    df = merge_request(q_feed, q_message)

    os_df = metrics_os(df)
    age_df = metrics_age(df)
    gender_df = metrics_gender(df)
    
    final_df = concat_metrics(os_df, age_df, gender_df)
    
    load(final_df)

dag_etl_pronin = dag_etl_pronin()
    