Задача: создать DAG для ежедневной загрузки рассчитанных метрик за вчерашний день в базу кликхаус.

1. В feed_actions для каждого пользователя считается число просмотров и лайков контента.
2. В message_actions для каждого пользователя считается, сколько он получает и отсылает сообщений, скольким людям он пишет, сколько людей пишут ему. 
2. Каждая выгрузка должна быть в отдельном таске. Далее объединяем две таблицы в одну.
3. Все метрики считаем в разрезе по полу, возрасту и ос. На каждый срез - свой таск.
4. Финальные данные со всеми метриками записываем в отдельную таблицу в ClickHouse.
5. Каждый день таблица должна дополняться новыми данными. 

Структура финальной таблицы должна быть такая: 
- Дата - event_date 
- Название среза - dimension 
- Значение среза - dimension_value 
- Число просмотров - views 
- Число лайков - likes 
- Число полученных сообщений - messages_received 
- Число отправленных сообщений - messages_sent 
- От скольких пользователей получили сообщения - users_received 
- Скольким пользователям отправили сообщение - users_sent 

Срезы — это os, gender и age /

Сперва все проделаю в ноутбуке для тестирования и проверки. Потом подготовлю файл для airflow

In [1]:
# импортируем библиотеки
from datetime import datetime, timedelta
import pandas as pd
from io import StringIO
import requests
import pandahouse as ph
import numpy as np

# эти библиотеки понадобятся только в файле dag
# from airflow.decorators import dag, task
# from airflow.operators.python import get_current_context

In [3]:
# параметры соединения
connection = {'host': 'http://clickhouse.lab.karpov.courses:8123',
'database':'simulator_20251020',
'user':'student',
'password':'dpo_python_2020'
}

In [7]:
# запрос к feed_actions
query_fa = '''
    SELECT 
       toDate(time) as event_date, 
       user_id,
       gender, 
       age,
       os,
       countIf(action='view') as views,
       countIf(action='like') as likes
    FROM 
        simulator_20251020.feed_actions 
    where 
        toDate(time) = toDate(now()) - 1
    group by
        event_date,
        user_id,
        gender, 
        age,
        os
    order by 
        event_date,
        user_id,
        gender, 
        age,
        os
    '''

fa = ph.read_clickhouse(query=query_fa, connection=connection)
fa.head()

Unnamed: 0,event_date,user_id,gender,age,os,views,likes
0,2025-11-26,208,1,17,Android,30,6
1,2025-11-26,211,1,22,iOS,29,4
2,2025-11-26,267,1,31,iOS,13,3
3,2025-11-26,278,0,20,iOS,18,2
4,2025-11-26,297,1,27,Android,15,5


In [8]:
# запрос к message_actions
query_fm ='''
    SELECT   t.event_date as event_date,
             t.user_id as user_id,
             gender, 
             age,
             os,
             t1.messages_received, t2.messages_sent, t3.users_received, t4.users_sent
    FROM (
          SELECT 
             toDate(a.time) as event_date, 
             user_id,
             gender, 
             age,
             os
          FROM 
              simulator_20251020.message_actions a
          where 
              event_date = toDate(now()) - 1
          group by
              event_date,
              user_id,    
              gender, 
              age,
              os
          ) t
    LEFT JOIN ( -- Число полученных сообщений - messages_received
          SELECT 
              toDate(time) as event_date,
              receiver_id,
              count(receiver_id) as messages_received
          FROM 
              simulator_20251020.message_actions
          GROUP BY  
              event_date,
              receiver_id
          ) t1 on t1.event_date=t.event_date and t1.receiver_id = t.user_id
    LEFT JOIN (-- Число отправленных сообщений - messages_sent
          SELECT 
              toDate(time) as event_date,
              user_id,
              count(user_id) as messages_sent
          FROM 
              simulator_20251020.message_actions 
          GROUP BY  
              event_date,
              user_id
          ) t2 on t2.event_date=t.event_date and t2.user_id = t.user_id    
    LEFT JOIN (-- От скольких пользователей получили сообщения - users_received
          SELECT 
              toDate(time) as event_date,
              receiver_id,
              count(distinct user_id) as users_received
          FROM 
              simulator_20251020.message_actions 
          GROUP BY  
              event_date,
              receiver_id
          ) t3 on t3.event_date=t.event_date and t3.receiver_id = t.user_id       
    LEFT JOIN (-- Скольким пользователям отправили сообщение - users_sent
          SELECT 
              toDate(time) as event_date,
              user_id,
              count(distinct receiver_id) as users_sent
          FROM 
              simulator_20251020.message_actions 
          GROUP BY  
              event_date,
              user_id
          ) t4 on t4.event_date=t.event_date and t4.user_id = t.user_id       
    order by 
        t.event_date,
        t.user_id
    '''
fm = ph.read_clickhouse(query=query_fm, connection=connection)
fm.head()

Unnamed: 0,event_date,user_id,gender,age,os,messages_received,messages_sent,users_received,users_sent
0,2025-11-26,209,0,17,Android,0,9,0,9
1,2025-11-26,268,0,27,iOS,0,4,0,4
2,2025-11-26,274,0,15,Android,1,2,1,1
3,2025-11-26,464,1,19,iOS,1,10,1,9
4,2025-11-26,503,1,43,iOS,0,7,0,2


In [9]:
# объединяю 
df = pd.merge(fa, fm, on=['event_date', 'user_id'], how='outer')
df

Unnamed: 0,event_date,user_id,gender_x,age_x,os_x,views,likes,gender_y,age_y,os_y,messages_received,messages_sent,users_received,users_sent
0,2025-11-26,208,1.0,17.0,Android,30.0,6.0,,,,,,,
1,2025-11-26,211,1.0,22.0,iOS,29.0,4.0,,,,,,,
2,2025-11-26,267,1.0,31.0,iOS,13.0,3.0,,,,,,,
3,2025-11-26,278,0.0,20.0,iOS,18.0,2.0,,,,,,,
4,2025-11-26,297,1.0,27.0,Android,15.0,5.0,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
21361,2025-11-26,133586,,,,,,0.0,43.0,Android,0.0,2.0,0.0,2.0
21362,2025-11-26,133613,,,,,,1.0,48.0,Android,0.0,2.0,0.0,1.0
21363,2025-11-26,133627,,,,,,1.0,15.0,Android,0.0,9.0,0.0,7.0
21364,2025-11-26,133628,,,,,,1.0,39.0,Android,1.0,6.0,1.0,3.0


In [13]:
# создаю отдельные колонки для пола, возвраста и ос и заполняю их
df['gender'] = df.apply(lambda row: row['gender_x'] if not pd.isnull(row['gender_x']) else row['gender_y'], axis=1)
df['age'] = df.apply(lambda row: row['age_x'] if not pd.isnull(row['age_x']) else row['age_y'], axis=1)
df['os'] = df.apply(lambda row: row['os_x'] if not pd.isnull(row['os_x']) else row['os_y'], axis=1)
df.head()

Unnamed: 0,event_date,user_id,gender_x,age_x,os_x,views,likes,gender_y,age_y,os_y,messages_received,messages_sent,users_received,users_sent,gender,age,os
0,2025-11-26,208,1.0,17.0,Android,30.0,6.0,,,,,,,,1.0,17.0,Android
1,2025-11-26,211,1.0,22.0,iOS,29.0,4.0,,,,,,,,1.0,22.0,iOS
2,2025-11-26,267,1.0,31.0,iOS,13.0,3.0,,,,,,,,1.0,31.0,iOS
3,2025-11-26,278,0.0,20.0,iOS,18.0,2.0,,,,,,,,0.0,20.0,iOS
4,2025-11-26,297,1.0,27.0,Android,15.0,5.0,,,,,,,,1.0,27.0,Android


In [14]:
# оставляю только нужные колонки и заменяю NaN на нули
df = df[['event_date', 'user_id', 'gender', 'age', 'os', 'views','likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']]
df = df.fillna(0)
m = df.select_dtypes(np.number)
df[m.columns]= m.round().astype('int')
df.head()

Unnamed: 0,event_date,user_id,gender,age,os,views,likes,messages_received,messages_sent,users_received,users_sent
0,2025-11-26,208,1,17,Android,30,6,0,0,0,0
1,2025-11-26,211,1,22,iOS,29,4,0,0,0,0
2,2025-11-26,267,1,31,iOS,13,3,0,0,0,0
3,2025-11-26,278,0,20,iOS,18,2,0,0,0,0
4,2025-11-26,297,1,27,Android,15,5,0,0,0,0


Cчитаю все эти метрики в разрезе по полу, возрасту и ос.

In [16]:
# по полу
df_gender = df[['event_date', 'gender', 'views','likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']]
df_gender = df_gender.groupby(['event_date', 'gender']).sum().reset_index()
df_gender

Unnamed: 0,event_date,gender,views,likes,messages_received,messages_sent,users_received,users_sent
0,2025-11-26,0,233362,52161,4841,5641,4263,4525
1,2025-11-26,1,291741,65751,7952,7251,6007,5841


In [17]:
# по возрасту
df_age = df[['event_date', 'age', 'views','likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']]
df_age = df_age.groupby(['event_date', 'age']).sum().reset_index()
df_age.head()

Unnamed: 0,event_date,age,views,likes,messages_received,messages_sent,users_received,users_sent
0,2025-11-26,14,7339,1654,241,188,141,77
1,2025-11-26,15,18670,4184,709,524,441,409
2,2025-11-26,16,20787,4548,422,455,320,408
3,2025-11-26,17,25101,5643,350,598,337,553
4,2025-11-26,18,29557,6611,812,684,584,667


In [18]:
# по ос
df_os = df[['event_date', 'os', 'views','likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']]
df_os = df_os.groupby(['event_date', 'os']).sum().reset_index()
df_os

Unnamed: 0,event_date,os,views,likes,messages_received,messages_sent,users_received,users_sent
0,2025-11-26,Android,343791,77141,7751,8225,6322,6613
1,2025-11-26,iOS,181312,40771,5042,4667,3948,3753


Привожу к единому виду

In [20]:
df_gender['dimension'] = 'gender'
df_gender = df_gender.rename(columns={'gender': 'dimension_value'})
df_gender = df_gender[['event_date', 'dimension', 'dimension_value', 'views', 'likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']]
df_gender  

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2025-11-26,gender,0,233362,52161,4841,5641,4263,4525
1,2025-11-26,gender,1,291741,65751,7952,7251,6007,5841


In [21]:
df_age['dimension'] = 'age'
df_age = df_age.rename(columns={'age': 'dimension_value'})
df_age = df_age[['event_date', 'dimension', 'dimension_value', 'views', 'likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']]
df_age.head()

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2025-11-26,age,14,7339,1654,241,188,141,77
1,2025-11-26,age,15,18670,4184,709,524,441,409
2,2025-11-26,age,16,20787,4548,422,455,320,408
3,2025-11-26,age,17,25101,5643,350,598,337,553
4,2025-11-26,age,18,29557,6611,812,684,584,667


In [22]:
df_os['dimension'] = 'os'
df_os = df_os.rename(columns={'os': 'dimension_value'})
df_os = df_os[['event_date', 'dimension', 'dimension_value', 'views', 'likes', 'messages_received', 'messages_sent', 'users_received', 'users_sent']]
df_os            

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2025-11-26,os,Android,343791,77141,7751,8225,6322,6613
1,2025-11-26,os,iOS,181312,40771,5042,4667,3948,3753


In [24]:
# финальная таблица
df_final = pd.concat([df_gender, df_age, df_os], axis=0)
df_final

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2025-11-26,gender,0,233362,52161,4841,5641,4263,4525
1,2025-11-26,gender,1,291741,65751,7952,7251,6007,5841
0,2025-11-26,age,14,7339,1654,241,188,141,77
1,2025-11-26,age,15,18670,4184,709,524,441,409
2,2025-11-26,age,16,20787,4548,422,455,320,408
...,...,...,...,...,...,...,...,...,...
61,2025-11-26,age,75,20,6,0,0,0,0
62,2025-11-26,age,79,22,6,0,0,0,0
63,2025-11-26,age,80,35,5,0,0,0,0
0,2025-11-26,os,Android,343791,77141,7751,8225,6322,6613
