# Построение ETL-пайплайна

Ожидается, что на выходе будет DAG в airflow, который будет считаться каждый день за вчера. 

1. Параллельно будем обрабатывать две таблицы. В feed_actions для каждого юзера посчитаем число просмотров и лайков контента. В message_actions для каждого юзера считаем, сколько он получает и отсылает сообщений, скольким людям он пишет, сколько людей пишут ему. Каждая выгрузка должна быть в отдельном таске.
2. Далее объединяем две таблицы в одну.
3. Для этой таблицы считаем все эти метрики в разрезе по полу, возрасту и ос. Делаем три разных таска на каждый срез.
4. И финальную данные со всеми метриками записываем в отдельную таблицу в ClickHouse.

Структура финальной таблицы должна быть такая:
* Дата - event_date
* Пол - gender
* Возраст - age
* Операционная система - os
* Число просмотров - views
* Числой лайков - likes
* Число полученных сообщений - messages_received
* Число отправленных сообщений - messages_sent
* От скольких пользователей получили сообщения - users_received
* Скольким пользователям отправили сообщение - users_sent

Вашу таблицу необходимо загрузить в схему test, ответ на это задание - название Вашей таблицы в схеме test

## 1. Запросы к таблицам feed_actions и message_actions

In [26]:
# Импортируем необходимые библиотеки
import pandas as pd
import pandahouse
from airflow import DAG
from airflow.operators.python import PythonOperator 
from datetime import datetime, timedelta
import requests

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

In [10]:
# Задаем параметры в DAG
default_args = {
    'owner': 'evg.dubrovin',             # Владелец операции
    'depends_on_past': False,            # Зависимость от прошлых запусков
    'retries': 2,                        # Кол-во попыток выполнить DAG
    'retry_delay': timedelta(minutes=5), # Промежуток между перезапусками
    'start_date': datetime(2022, 5, 17)  # Дата начала выполнения DAG
}
# Интервал запуска DAG
schedule_interval = '0 10 * * *'

In [27]:
# Подключаемся к БД
def ch_get_df(query):
    connection_1 = {
                   'host': 'https://clickhouse.lab.karpov.courses',
                   'password': 'dpo_python_2020',
                   'user': 'student',
                   'database': 'simulator_20220420'
                    }
    result = pandahouse.read_clickhouse(query, connection=connection_1)
    return result

In [None]:
# # DAG
# @dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False)
# def dag_dubrovin():

In [28]:
# В feed_actions для каждого юзера посчитаем число просмотров и лайков контента
def extract_feed():
    query = '''
        SELECT
            toDate(time) AS event_date, 
            user_id AS user, 
            gender, 
            age, 
            os, 
            countIf(action = 'like') AS likes, 
            countIf(action = 'views') AS views
        FROM
            simulator_20220420.feed_actions
        WHERE
            toDate(time) = yesterday()
        GROUP BY
            event_date,  
            user,
            gender, 
            age, 
            os
        ORDER BY
            user
        '''

    feed_df = ch_get_df(query=query)
    return feed_df

In [None]:
# В message_actions для каждого юзера считаем, сколько он получает и отсылает сообщений, скольким людям он пишет, сколько людей пишут ему
def extract_messenger():
    # 
    query_1 = '''
        SELECT
            event_date, 
            user, 
            gender, 
            age, 
            os, 
            messages_sent, 
            messages_received, 
            users_sent, 
            users_received
        FROM
        -- Для каждого юзера посчитаем, сколько он отсылает сообщений и скольким людям он пишет
            (
            SELECT
                toDate(time) AS event_date, 
                user_id AS user, 
                gender, 
                age, 
                os, 
                COUNT(reciever_id) as messages_sent,
                uniqExact(reciever_id) as users_sent
            FROM
                simulator_20220420.message_actions
            WHERE
                toDate(time) = yesterday()
            GROUP BY
                event_date,
                user, 
                gender, 
                age, 
                os
            ) AS sent

        FULL JOIN
        -- Для каждого юзера посчитаем, сколько он получает сообщений и сколько людей пишут ему
            (
            SELECT
                toDate(time) AS event_date, 
                reciever_id AS user, 
                gender, 
                age, 
                os, 
                COUNT(user_id) as messages_received,
                uniqExact(user_id) as users_received
            FROM
                simulator_20220420.message_actions
            WHERE
                toDate(time) = yesterday()
            GROUP BY
                event_date,
                user, 
                gender, 
                age, 
                os
            ) AS received

        ON sent.user = received.user
        '''
    messenger_df = ch_get_df(query=query_1)

    return messenger_df

## 2. Объединяем две таблицы в одну.

In [30]:
# Объединяем feed и messenger
def merge_tables(feed_df, messenger_df):
    full_df = feed_df.merge(messenger_df, on=['user', 'event_date', 'gender', 'age', 'os'], how='outer').dropna()
    return full_df

In [31]:
feed_df = extract_feed()

In [32]:
messenger_df = extract_messenger()

In [33]:
full_df = merge_tables(feed_df, messenger_df)

In [34]:
full_df.tail()

Unnamed: 0,event_date,user,gender,age,os,likes,views,messages_sent,messages_received,users_sent,users_received
15333,2022-11-01,132791,1,21,Android,11.0,0.0,2.0,0.0,2.0,0.0
15337,2022-11-01,132812,0,18,Android,5.0,0.0,2.0,0.0,2.0,0.0
15355,2022-11-01,133066,1,19,iOS,4.0,0.0,4.0,1.0,4.0,1.0
15356,2022-11-01,133066,1,19,iOS,4.0,0.0,4.0,1.0,4.0,1.0
15381,2022-11-01,133369,0,18,Android,3.0,0.0,4.0,4.0,4.0,1.0


## 3. Считаем метрики по полу, возрасту и ОС

In [35]:
# Считаем метрики по полу
def metrics_by_gender(full_df):
    metrics = ['event_date','views','likes','messages_received','messages_sent','users_received','users_sent', 'gender']
    metrics_by_gender = full_df[metrics].groupby(['event_date', 'gender']).sum().reset_index()
    return metrics_by_gender

In [36]:
metrics_by_gender = metrics_by_gender(full_df)

In [37]:
metrics_by_gender

Unnamed: 0,event_date,gender,views,likes,messages_received,messages_sent,users_received,users_sent
0,2022-11-01,0,0.0,6431.0,1151.0,3604.0,843.0,3376.0
1,2022-11-01,1,0.0,5297.0,792.0,3523.0,696.0,3159.0


In [38]:
# Считаем метрики по возрасту
def metrics_by_age(full_df):
    metrics = ['event_date','views','likes','messages_received','messages_sent','users_received','users_sent', 'age']
    metrics_by_age = full_df[metrics].groupby(['event_date', 'age']).sum().reset_index()
    return metrics_by_age

In [39]:
metrics_by_age = metrics_by_age(full_df)

In [40]:
metrics_by_age.tail()

Unnamed: 0,event_date,age,views,likes,messages_received,messages_sent,users_received,users_sent
42,2022-11-01,57,0.0,13.0,0.0,9.0,0.0,2.0
43,2022-11-01,58,0.0,18.0,0.0,7.0,0.0,2.0
44,2022-11-01,61,0.0,18.0,0.0,2.0,0.0,2.0
45,2022-11-01,62,0.0,7.0,0.0,1.0,0.0,1.0
46,2022-11-01,63,0.0,20.0,0.0,1.0,0.0,1.0


In [41]:
# Считаем метрики по ОС
def metrics_by_os(full_df):
    metrics = ['event_date','views','likes','messages_received','messages_sent','users_received','users_sent', 'os']
    metrics_by_os = full_df[metrics].groupby(['event_date', 'os']).sum().reset_index()
    return metrics_by_os

In [42]:
metrics_by_os = metrics_by_os(full_df)

In [43]:
metrics_by_os

Unnamed: 0,event_date,os,views,likes,messages_received,messages_sent,users_received,users_sent
0,2022-11-01,Android,0.0,7784.0,1493.0,4726.0,1108.0,4345.0
1,2022-11-01,iOS,0.0,3944.0,450.0,2401.0,431.0,2190.0


## 4. Записываем финальную таблицу в Clickhouse

In [44]:
# Получаем необходимую таблицу
def get_fin_table(full_df):
    metrics = ['event_date','views','likes','messages_received','messages_sent','users_received','users_sent', 'gender', 'age', 'os']
    fin_df = full_df[metrics].groupby(['event_date', 'gender', 'age', 'os']).sum().reset_index()
    
    # Заменим float значения на int
    float_cols = ['views', 'likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']
    fin_df[float_cols] = fin_df[float_cols].astype(int)
    return fin_df

In [45]:
fin_df = get_fin_table(full_df)

In [46]:
fin_df

Unnamed: 0,event_date,gender,age,os,views,likes,messages_received,messages_sent,users_received,users_sent
0,2022-11-01,0,15,Android,0,22,14,55,14,30
1,2022-11-01,0,15,iOS,0,30,3,21,3,15
2,2022-11-01,0,16,Android,0,87,7,43,7,35
3,2022-11-01,0,16,iOS,0,487,40,268,35,251
4,2022-11-01,0,17,Android,0,58,1,30,1,29
...,...,...,...,...,...,...,...,...,...,...
129,2022-11-01,1,54,Android,0,0,0,6,0,2
130,2022-11-01,1,55,iOS,0,3,0,11,0,4
131,2022-11-01,1,58,iOS,0,18,0,7,0,2
132,2022-11-01,1,62,Android,0,7,0,1,0,1


In [47]:
fin_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 134 entries, 0 to 133
Data columns (total 10 columns):
 #   Column             Non-Null Count  Dtype         
---  ------             --------------  -----         
 0   event_date         134 non-null    datetime64[ns]
 1   gender             134 non-null    int64         
 2   age                134 non-null    int64         
 3   os                 134 non-null    object        
 4   views              134 non-null    int64         
 5   likes              134 non-null    int64         
 6   messages_received  134 non-null    int64         
 7   messages_sent      134 non-null    int64         
 8   users_received     134 non-null    int64         
 9   users_sent         134 non-null    int64         
dtypes: datetime64[ns](1), int64(8), object(1)
memory usage: 10.6+ KB


In [53]:
# x.strftime('%Y-%m-%d %H:%M:%S')
# fin_df['event_date'] = fin_df['event_date'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))

In [54]:
fin_df

Unnamed: 0,event_date,gender,age,os,views,likes,messages_received,messages_sent,users_received,users_sent
0,2022-11-01 00:00:00,0,15,Android,0,22,14,55,14,30
1,2022-11-01 00:00:00,0,15,iOS,0,30,3,21,3,15
2,2022-11-01 00:00:00,0,16,Android,0,87,7,43,7,35
3,2022-11-01 00:00:00,0,16,iOS,0,487,40,268,35,251
4,2022-11-01 00:00:00,0,17,Android,0,58,1,30,1,29
...,...,...,...,...,...,...,...,...,...,...
129,2022-11-01 00:00:00,1,54,Android,0,0,0,6,0,2
130,2022-11-01 00:00:00,1,55,iOS,0,3,0,11,0,4
131,2022-11-01 00:00:00,1,58,iOS,0,18,0,7,0,2
132,2022-11-01 00:00:00,1,62,Android,0,7,0,1,0,1


In [57]:
# Загружаем финальную таблицу в схему test
def load_table(fin_df):
    # Подключаемся к БД
    connection_2 = {
        'host': 'https://clickhouse.lab.karpov.courses',
        'password': '656e2b0c9c',
        'user': 'student-rw',
        'database': 'test'
                    }
    # Создаем таблицу
    q = '''
        CREATE TABLE IF NOT EXISTS test.evg_dubrovin_test 
            (
            event_date DATE,
            gender INTEGER,
            age INTEGER, 
            os TEXT,
            views INTEGER,
            likes INTEGER,
            messages_received INTEGER,
            messages_sent INTEGER,
            users_received INTEGER,
            users_sent INTEGER
            )
            ENGINE = MergeTree 
            ORDER BY (event_date);
        '''
    # Отправляем таблицу в базу данных
    pandahouse.execute(connection=connection_2, query=q)
    pandahouse.to_clickhouse(df=fin_df, table='evg_dubrovin_test', index=False, connection=connection_2)

In [58]:
# Выполняем таски 
# В feed_actions для каждого юзера посчитаем число просмотров и лайков контента. 
feed_df = extract_feed()
# В message_actions для каждого юзера считаем, сколько он получает и отсылает сообщений, скольким людям он пишет, сколько людей пишут ему
messenger_df = extract_messenger()

# Далее объединяем две таблицы в одну
full_df = merge_tables(feed_df, messenger_df)

# Метрики в разрезе по полу
metrics_by_gender = metrics_by_gender(full_df)
# Метрики в разрезе по возрасту
metrics_by_age = metrics_by_age(full_df)
# Метрики в разрезе по ОС
metrics_by_os = metrics_by_os(full_df)

# Финальная таблица со всеми данными
fin_df = get_fin_table(full_df)

# Загружаем таблицу в базу данных
load_table(fin_df)

In [None]:
# dag_dubrovin = dag_dubrovin()

Fin.