# Построение ETL-пайплайна

## Описание проекта

Необходимо создать DAG в Airflow, который ежедневно будет считать показатели за вчерашний день и записывать их в таблицу.

1. Параллельно должно обрабатываться две таблицы. В feed_actions  для каждого юзера нужно посчитать число просмотров и лайков контента. В message_actions для каждого юзера считаем, сколько он получает и отсылает сообщений, скольким людям он пишет, сколько людей пишут ему. Каждая выгрузка должна быть в отдельном таске.

2. Далее небходимо объединить две таблицы в одну.

3. Для полученной таблицы считаем все эти метрики в разрезе по полу, возрасту и ос. Делаем три разных таска на каждый срез.

4. И финальные данные со всеми метриками записываем в отдельную таблицу в ClickHouse.

5. Каждый день таблица должна дополняться новыми данными. 

## Данные

Таблица feed_actions - данные по ленте новостей:
 - user_id - уникальный номер пользователя
 - post_id - уникальный номер поста
 - action - лайк или просмотр
 - time - время события
 - gender - пол пользователя
 - age - возраст пользователя
 - country - страна пользователя
 - city - город пользователя
 - os - операционная система устройства пользователя
 - sourse - источник привлечения пользователя (organic - поисковик, ads - реклама)
 - exp_group - номер экспериментальной группы
 
Таблица message_actions - данные по сервису обмена сообщениями:
 - user_id - уникальный номер пользователя
 - reciever_id - уникальный номер получетеля
 - time - время события
 - source - источник привлечения пользователя (organic - поисковик, ads - реклама)
 - exp_group - номер экспериментальной группы
 - gender - пол пользователя
 - age - возраст пользователя
 - country - страна пользователя
 - city - город пользователя
 - os - операционная система устройства пользователя

## Результат

Структура финальной таблицы:
 - event_date - дата
 - dimension - название среза (os, gender, age)
 - dimension_value - значение среза
 - views - число просмотров
 - likes - число лайков
 - messages_received - число полученных сообщений
 - messages_sent - число отправленных сообщений
 - users_received - от скольких пользователей получили сообщения
 - users_sent - скольким пользователям отправили сообщение

## Скрипт сборки таблицы

In [None]:
# coding=utf-8

from datetime import datetime, timedelta
import pandas as pd
import pandahouse as ph

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context


connection = {
    'host': 'https://clickhouse.lab.karpov.courses',
    'password': 'dpo_python_2020',
    'user': 'student',
    'database': 'simulator_20220820'
}

connection_test = {
    'host': 'https://clickhouse.lab.karpov.courses',
    'password': '656e2b0c9c',
    'user': 'student-rw',
    'database': 'test'
}

default_args = {
    'owner': 'e-varshavskaja-10',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2022, 9, 11), 
}

schedule_interval = '0 08 * * *'

@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False)
def dag_evarshavskaya():
    
    metric_list = ['event_date', 'likes', 'views', 'messages_sent', 'messages_received', 'users_sent', 'users_received']

    @task()
    def extract_actions():
        query_1 = '''SELECT
                      toDate(time) as event_date, 
                      gender, 
                      age, 
                      os, 
                      user_id as user, 
                      countIf(action = 'view') as views,
                      countIf(action = 'like') as likes
                   FROM
                      simulator_20220820.feed_actions
                   WHERE 
                      toDate(time) = yesterday()
                   GROUP BY
                      event_date, 
                      gender, 
                      age, 
                      os, 
                      user
                '''
        actions = ph.read_clickhouse(query=query_1, connection=connection)
        return actions
    
    @task()
    def extract_messages():
        query_2 = '''SELECT 
                 if(event_date = '1970-01-01', yesterday(), event_date) as event_date, 
                 gender, 
                 age, 
                 user, 
                 os, 
                 messages_sent, 
                 messages_received, 
                 users_sent, 
                 users_received
              FROM (SELECT 
                        toDate(time) as event_date, 
                        gender, 
                        age, 
                        os, 
                        user_id as user, 
                        count() as messages_sent,
                        uniq(reciever_id) as users_sent
                    FROM 
                        simulator_20220820.message_actions
                    WHERE 
                        toDate(time) = yesterday()
                    GROUP BY 
                        event_date, 
                        gender, 
                        os, 
                        age, 
                        user) AS sent
                FULL OUTER JOIN 
                   (SELECT 
                        toDate(time) as event_date, 
                        gender, 
                        age, 
                        os, 
                        reciever_id as user, 
                        count() as messages_received, 
                        uniq(user_id) as users_received
                    FROM 
                        simulator_20220820.message_actions
                    WHERE 
                        toDate(time) = yesterday()
                    GROUP BY 
                        event_date, 
                        gender, 
                        age, 
                        os, 
                        user) AS receive
                USING user
            '''
        messages = ph.read_clickhouse(query=query_2, connection=connection)
        return messages
    
    @task()
    def join_data(messages, actions):
        data = messages.merge(actions, how = 'outer', 
                              on = ['user', 'event_date','gender', 'age', 'os']).fillna(0).drop_duplicates(subset = 'user')
        return data
    
    @task()
    def transform_gender(data):
        full_metric_list = metric_list + ['gender']
        group_metric_list = ['event_date'] + ['gender']
        gender = data[full_metric_list].groupby(group_metric_list).sum().reset_index()
        gender.insert(1, 'dimension', 'gender')
        gender = gender.rename(columns = {'gender':'dimension_value'})
        return gender
    
    @task()
    def transform_age(data):
        full_metric_list = metric_list + ['age']
        group_metric_list = ['event_date'] + ['age']
        age = data[full_metric_list].groupby(group_metric_list).sum().reset_index()
        age.insert(1, 'dimension', 'age')
        age = age.rename(columns = {'age':'dimension_value'})
        return age
    
    @task()
    def transform_os(data):
        full_metric_list = metric_list + ['os']
        group_metric_list = ['event_date'] + ['os']
        os = data[full_metric_list].groupby(group_metric_list).sum().reset_index()
        os.insert(1, 'dimension', 'os')
        os = os.rename(columns = {'os':'dimension_value'})
        return os
    
    @task()
    def concat_data(gender, age, os):
        final_data = pd.concat([gender, age, os])
        final_data[metric_list[1:]] = final_data[metric_list[1:]].astype('int')
        return final_data
    
    @task()
    def load(final_data):
        query_3 = '''
                CREATE TABLE IF NOT EXISTS test.e_varshavskaja_01
                (   event_date Date,
                    dimension String,
                    dimension_value String,
                    likes UInt64,
                    views UInt64,
                    messages_sent UInt64,
                    messages_received UInt64,
                    users_sent UInt64,
                    users_received UInt64
                ) ENGINE = Log()
                '''
        ph.execute(query=query_3, connection=connection_test)
        ph.to_clickhouse(df=final_data, table='e_varshavskaja_01', index=False, connection=connection_test)
    
    actions = extract_actions()
    messages = extract_messages()
    data = join_data(actions, messages)
    
    gender = transform_gender(data)
    age = transform_age(data)
    os = transform_os(data)
    final_data = concat_data(gender, age, os)
    
    load(final_data)
    
dag_evarshavskaya = dag_evarshavskaya()