# Цель работы

**Цель** - реализовать ETL задачу: на выходе получить DAG в Airflow, который будет каждый день рассчитывать важные показатели за предыдущий день.

**Идея** - загрузить данные с Clickhouse, разбить на интересующие нас срезы и, объединив в одну табличку, выгрузить обратно в Clickhouse.

**Задачи**:

1. Параллельно будем обрабатывать две таблицы. В feed_actions для каждого юзера посчитаем число просмотров и лайков контента. В message_actions для каждого юзера считаем, сколько он получает и отсылает сообщений, скольким людям он пишет, сколько людей пишут ему. Каждая выгрузка должна быть в отдельном таске.
2. Далее объединяем две таблицы в одну.
3. Для этой таблицы считаем все эти метрики в разрезе по полу, возрасту и ос. Делаем три разных таска на каждый срез.
4. И финальные данные со всеми метриками записываем в отдельную таблицу в ClickHouse.
5. Каждый день таблица должна дополняться новыми данными.

*Сам скрипт DAGа в конце*

# ETL-задача

## Подготовка к работе

In [1]:
# Установка библиотек

from datetime import datetime, timedelta
import pandas as pd
from io import StringIO
import requests
import pandahouse

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

In [37]:
# Устанавливаем свзять с ClickHouse

connection = {
              'host': 'https://clickhouse.lab.karpov.courses',
              'password': 'dpo_python_2020',
              'user': 'student',
              'database': 'simulator_20230320'
             }
# И для записи нашей таблицы
connection_test = {
                   'host': 'https://clickhouse.lab.karpov.courses',
                   'database':'test',
                   'user':'student-rw', 
                   'password':'656e2b0c9c'
                  }

# Дефолтные параметры, которые используются в ДАГе

default_args = {
                'owner': 'grigorash',
                'depends_on_past': False,
                'retries': 2,
                'retry_delay': timedelta(minutes=5),
                'start_date': datetime(2023, 4, 12),
               }

# Интервал запуска DAG
schedule_interval = '0 23 * * *'

In [38]:
## Извлечение датасетов
# Выгружаем данные из датасета feed_actions
def extract_fa():
    query = """
                SELECT
                        DISTINCT user_id,
                        toDate(time) as event_date,
                        gender,
                        age,
                        os,
                        SUM(action = 'view') as views,
                        SUM(action = 'like') as likes
                FROM simulator_20230320.feed_actions
                WHERE toDate(time) = yesterday() --За вчерашний день--
                GROUP BY user_id, event_date, gender, age, os
            """
    df_fa = pandahouse.read_clickhouse(query, connection=connection)
    
    return df_fa

    
# Выгружаем данные из датасета message_actions
def extract_ma():
    query = """
                SELECT
                        user_id,
                        messages_received,
                        messages_sent,
                        users_received,
                        users_sent
                FROM
                    (SELECT
                           user_id,
                           COUNT() as messages_sent, --Сколько смс отправил--
                           uniqExact(reciever_id) as users_sent --Скольким людям смс отправил--
                     FROM simulator_20230320.message_actions
                     WHERE toDate(time) = yesterday() --За вчерашний день--
                     GROUP BY user_id) u1

                FULL OUTER JOIN 

                    (SELECT
                           reciever_id,
                           COUNT() as messages_received, --Сколько смс получил--
                           uniqExact(user_id) as users_received --Сколько людей смс отправили--
                     FROM simulator_20230320.message_actions
                     WHERE toDate(time) = yesterday() --За вчерашний день--
                     GROUP BY reciever_id) u2

                ON u1.user_id = u2.reciever_id        
            """
    df_ma = pandahouse.read_clickhouse(query, connection=connection)
    return df_ma

In [39]:
# Создание одного датасета
def merge_df(df_fa, df_ma):

    df_merge = df_fa.merge(df_ma, on='user_id', how='left').fillna(0)
    return df_merge

In [40]:
#№ Считаем метрики
common_metrics = ['event_date', 'views', 'likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']
# Считаем метрику для гендера
def transfrom_gender(df_merge):
    metric_list = common_metrics + ['gender']
    df_merge_gender = df_merge[metric_list]\
        .groupby(['event_date', 'gender'])\
        .sum()\
        .reset_index()
    df_merge_gender['dimension'] = df_merge_gender.columns[1]
    df_merge_gender.rename(columns = {'gender': 'dimension_value'}, inplace = True )
    return df_merge_gender
    
# Считаем метрику для возраста
def transfrom_age(df_merge):
    metric_list = common_metrics + ['age']
    df_merge_age = df_merge[metric_list]\
        .groupby(['event_date', 'age'])\
        .sum()\
        .reset_index()
    df_merge_age['dimension'] = df_merge_age.columns[1]
    df_merge_age.rename(columns = {'age': 'dimension_value'}, inplace = True )
    return df_merge_age
    
# Считаем метрику для операционной системы
def transfrom_os(df_merge):
    metric_list = common_metrics + ['os']
    df_merge_os = df_merge[metric_list]\
        .groupby(['event_date', 'os'])\
        .sum()\
        .reset_index()
    df_merge_os['dimension'] = df_merge_os.columns[1]
    df_merge_os.rename(columns = {'os': 'dimension_value'}, inplace = True )
    return df_merge_os

In [41]:
# Загружаем все в табличку
def final_df(df_os, df_gender, df_age):
    df_fin = pd.concat([df_os, df_gender, df_age])
    df_fin = df_fin[['event_date','dimension', 'dimension_value', 'views', 'likes', 
                     'messages_received', 'messages_sent', 'users_received', 'users_sent']]
    # Преобразуем float в int
    df_fin = df_fin.astype({'event_date': 'str',
                            'views': 'int64', 
                            'likes': 'int64',
                            'messages_received': 'int64', 
                            'messages_sent': 'int64',
                            'users_received': 'int64',
                            'users_sent': 'int64'})
    return df_fin

In [42]:
# Выгружаем все в Clickhouse (не работает на python - в airflow сработает)
def load(df_fin):
        context = get_current_context()
        ds = context ['ds']
        q = """
            CREATE TABLE IF NOT EXISTS test.e_grigorash (
            event_date String, 
            dimension String, 
            dimension_value String, 
            views Int64, 
            likes Int64, 
            messages_received Int64,
            messages_sent Int64, 
            users_received Int64, 
            users_sent Int64                    
            )
            engine = MergeTree()
            order by event_date
            """
        
        pandahouse.execute(query=q, connection=connection_test)
        pandahouse.to_clickhouse(df=df_fin, table='e_grigorash', connection=connection_test, index=False)

In [43]:
## Все считаем
# Таблицы
df_fa = extract_fa()
df_ma = extract_ma()
df_merge = merge_df(df_fa, df_ma)
# Метрики
df_gender = transfrom_gender(df_merge)
df_age = transfrom_age(df_merge)
df_os = transfrom_os(df_merge)
# Объединяем и выводим
df_fin = final_df(df_os, df_gender, df_age)
#load(df_fin) - когда уже будем загружать в Airflow

## Код для DAG