In [None]:
У нас имеется база данных в кикхаусе, куда ежедневно записываются данные по пользователям ленты новостей.
Лента новостей представляет из себя посты с возможностью их просматривать, лайкать, и также отправлять сообщения
друг другу. Действия по ленте новостей это like и view, действия по месссенджеру заполняются отдельной схемой в БД
и включают просто сообщения, кому и когда. В целом, лента и мессенджер составляют некое приложение, которое мы 
хотим развивать, наполняя новыми пользователями и поддерживая их активность.

In [None]:
Напишите код, для  DAG в airflow, который будет считаться каждый день за вчера. 
1. Параллельно будем обрабатывать две таблицы. В feed_actions для каждого юзера посчитаем число просмотров
и лайков контента. В message_actions для каждого юзера считаем, сколько он получает и отсылает сообщений,
скольким людям он пишет, сколько людей пишут ему. Каждая выгрузка должна быть в отдельном таске.
2. Далее объединяем две таблицы в одну.
3. Для этой таблицы считаем все эти метрики в разрезе по полу, возрасту и ос. 
Делаем три разных таска на каждый срез.
4. И финальные данные со всеми метриками записываем в отдельную таблицу в ClickHouse.
5. Каждый день таблица должна дополняться новыми данными. 
Структура финальной таблицы должна быть такая:
Дата - event_date
Название среза - dimension
Значение среза - dimension_value
Число просмотров - views
Число лайков - likes
Число полученных сообщений - messages_received
Число отправленных сообщений - messages_sent
От скольких пользователей получили сообщения - users_received
Скольким пользователям отправили сообщение - users_sent
Срез — это os, gender и age

In [3]:
import pandahouse as ph
# Строка подключения к кликхаус
connection = {
    'host': 'https://clickhouse.lab.karpov.courses',
    'database':'simulator_20250120',
    'user':'student',
    'password':'dpo_python_2020'}
# Запрос исходной таблицы ленты новостей из БД для понимания структуры таблицы
feed = """
SELECT *
FROM {db}.feed_actions 
WHERE toDate(time) >= toDate('2025-01-03') and toDate(time) < toDate('2025-01-04')"""
# Запрос исходной таблицы мессенджера из БД для понимания структуры таблицы
messenger = """
SELECT *
FROM {db}.message_actions 
WHERE toDate(time) >= toDate('2025-01-03') and toDate(time) < toDate('2025-01-04')"""

In [4]:
# Наша исходная таблица ленты новостей
df_table_feed = ph.read_clickhouse(feed, connection=connection)
df_table_feed.head(5)

Unnamed: 0,user_id,post_id,action,time,gender,age,country,city,os,source,exp_group
0,12827,2004,like,2025-01-03 00:00:00,1,43,Turkey,İzmir,Android,ads,2
1,20565,1942,like,2025-01-03 00:00:00,0,23,Russia,Moscow,iOS,ads,4
2,114053,1915,view,2025-01-03 00:00:00,1,56,Russia,Kirov,Android,organic,3
3,115273,1984,view,2025-01-03 00:00:00,1,21,Russia,Moscow,iOS,organic,3
4,122707,1850,view,2025-01-03 00:00:01,1,25,Russia,Mirnyy,iOS,organic,1


In [5]:
# Наша исходная таблица мессенджера
df_table_messenger = ph.read_clickhouse(messenger, connection=connection)
df_table_messenger.head(5)

Unnamed: 0,user_id,receiver_id,time,source,exp_group,gender,age,country,city,os
0,1405,107098,2025-01-03 00:00:00,ads,0,1,27,Estonia,Tallinn,iOS
1,113395,113523,2025-01-03 00:00:00,organic,1,0,18,Russia,Novosibirsk,Android
2,1405,5791,2025-01-03 00:00:01,ads,0,1,27,Estonia,Tallinn,iOS
3,4554,110714,2025-01-03 00:00:01,ads,2,0,14,Russia,Bratsk,iOS
4,5953,110714,2025-01-03 00:00:01,ads,0,1,33,Russia,Saint Petersburg,iOS


In [None]:
'''Ниже ячейка с кодом это код который будет сохранен в файл для отправки в репозиторий.
Остальные ячейки с кодом показывают работу кадой функции отдельно и могут быть запущены
для просмотра результатов'''

In [None]:
from datetime import datetime, timedelta
import pandas as pd
import pandahouse as ph
from airflow.decorators import dag, task #это декораторы для определения DAG и задач

# Подключение к clickhouse
connection = {'host': 'https://clickhouse.lab.karpov.courses', 
     'database':'simulator_20250120', 'user':'student',
     'password':'dpo_python_2020'}

# Запрос для извлечения данных по ленте новостей: дата, пользователь, лайки, просмотры, возраст, пол, OS.
q_feed = """
    SELECT toDate(time) as event_date, user_id, age, gender, os,
           countIf(action, action = 'view') as views, 
           countIf(action, action = 'like') as likes
    FROM {db}.feed_actions 
    WHERE  event_date = yesterday()
    GROUP BY event_date, user_id, age, gender, os
    """

# Запрос для извлечения данных по сообщениям дата, пользователь, кол-во отпр-х sms и кол-во польз-й, и полученных.
# coalesce позволяет при отсутствии пользователя в одной из таблиц осталять того который есть(т.е НЕ 0)
# и получить один столбик без 0 с уникальными пользователями которые либо отправляли, либо получали
q_message = """
   
    SELECT coalesce(t1.event_date, t2.event_date) as event_date,
               coalesce(t1.user_id, t2.receiver_id) as user_id,
               t1.messages_sent, t1.users_sent,
               t2.messages_received, t2.users_received,
               coalesce(t1.gender, t2.gender) as gender, 
               coalesce(t1.age, t1.age) as age, 
               coalesce(t1.os, t2.os) as os
        FROM (SELECT toDate(time) as event_date, user_id, 
               count(receiver_id) as messages_sent, count(distinct receiver_id) as users_sent,
               gender, age, os
              FROM {db}.message_actions 
              WHERE event_date = yesterday()
              GROUP BY event_date, user_id, gender, age, os) t1
        FULL JOIN (SELECT toDate(time) as event_date, receiver_id, 
                          count(user_id) as messages_received, count(distinct user_id) as users_received,
                          gender, age, os
                   FROM {db}.message_actions 
                   WHERE event_date = yesterday()
                   GROUP BY event_date, receiver_id, gender, age, os) t2
        ON t1.user_id = t2.receiver_id and t1.event_date = t2.event_date
        where user_id != 0  
        """
# Подключение к тестовой схеме 
connection_test = {'host': 'https://clickhouse.lab.karpov.courses',
                      'database':'test',
                      'user':'student-rw', 
                      'password':'656e2b0c9c'
                     }
# Запрос на создание таблицы
query_test = '''CREATE TABLE IF NOT EXISTS test.slicer_table_dotsenko
                    (event_date Date,
                     dimension String,
                     dimension_value String,
                     views Float64,
                     likes Float64,
                     messages_received Float64,
                     messages_sent Float64,
                     users_received Float64,
                     users_sent Float64
                    )
                    ENGINE = MergeTree()
                    ORDER BY event_date
                    '''
# Аргументы DAG
default_args = {
    'owner': 'A.Dotsenko',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=4),
    'start_date': datetime(2025, 2, 22)
}
schedule_interval = '0 11 * * *'
# Определяем даг и его параметры, аргументы, интервал запуска, и нужно ли  нагнать прошлые запуски
@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False)
def add_data_table_dotsenko():
    
    @task() # Создаем таблицу 'slicer_table_dotsenko'
    def query_create_table(query_test, connection_test):
        ph.execute(query_test, connection=connection_test)
        
        
    @task() # Достаем данные с ленты
    def extract_feed_action(q_feed, connection):
        df_feed_action = ph.read_clickhouse(q_feed, connection=connection)
        return df_feed_action
    
    @task() # Достаем данные по сообщениям 
    def extract_message_action(q_message, connection):
        df_message_action = ph.read_clickhouse(q_message, connection=connection)
        return df_message_action
    
    @task() # Соединяем две таблицы ленты и смс
    def merge_feed_mess(df_feed_action, df_message_action):
        merge_table = pd.merge(df_feed_action,  df_message_action, on=['user_id', 'event_date', 'gender', 'age', 'os'], how='outer')
        return merge_table.fillna(0)
    
    @task() # Срез по полу
    def gender_slice(merge_table):
        gender_table = merge_table[['event_date', 'gender', 'views', 'likes',
                           'messages_received', 'messages_sent', 'users_received', 'users_sent']]\
                         .groupby(['event_date', 'gender'])\
                         .sum().reset_index().rename(columns={'gender': 'dimension_value'})
        # создаем столбец с типом среза и присваеваем значение
        gender_table.loc[:, 'dimension'] = 'gender'
        gender_table = gender_table[['event_date', 'dimension', 'dimension_value', 'views', 'likes',
            'messages_received', 'messages_sent', 'users_received', 'users_sent']]
        return gender_table
    
    @task() # Срез по OS
    def os_slice(merge_table):
        os_table = merge_table[['event_date', 'os', 'views', 'likes',
                           'messages_received', 'messages_sent', 'users_received', 'users_sent']]\
                         .groupby(['event_date', 'os'])\
                         .sum().reset_index().rename(columns={'os': 'dimension_value'})

        os_table.loc[:, 'dimension'] = 'os'
        os_table = os_table[['event_date', 'dimension', 'dimension_value', 'views', 'likes',
            'messages_received', 'messages_sent', 'users_received', 'users_sent']]
        return os_table
    
    @task() # Срез по возрасту
    def age_slice(merge_table):
        age_table = merge_table[['event_date', 'age', 'views', 'likes',
                           'messages_received', 'messages_sent', 'users_received', 'users_sent']]\
                         .groupby(['event_date', 'age'])\
                         .sum().reset_index().rename(columns={'age': 'dimension_value'})

        age_table.loc[:, 'dimension'] = 'age'
        age_table = age_table[['event_date', 'dimension', 'dimension_value', 'views', 'likes',
            'messages_received', 'messages_sent', 'users_received', 'users_sent']]
        return age_table
    
    @task() # Соединяем три среза в один df
    def concat_(gender_table, os_table, age_table):
        df_concat = pd.concat([gender_table, os_table, age_table])
        df_concat = df_concat.astype({'dimension_value' : 'str'})
        return df_concat
    
   
    
    @task() # Отправка df , в таблицу 'slicer_table_dotsenko'
    def add_df_concat(df_concat, connection_test):
        ph.to_clickhouse(df=df_concat, table='slicer_table_dotsenko', index=False, connection=connection_test)
    
    query_create_table(query_test, connection_test)
    df_feed_action = extract_feed_action(q_feed, connection)
    df_message_action = extract_message_action(q_message, connection)
    merge_table = merge_feed_mess(df_feed_action, df_message_action)
    gender_table = gender_slice(merge_table)
    os_table = os_slice(merge_table)
    age_table = age_slice(merge_table)
    df_concat = concat_(gender_table, os_table, age_table)
    add_df_concat(df_concat, connection_test)
    
add_data_table_dotsenko = add_data_table_dotsenko()

In [1]:
# Импортируем необходимые библиотеки
from datetime import datetime, timedelta
import pandas as pd
import pandahouse as ph

In [2]:
# Подключаемся к кликхаус
connection = {'host': 'https://clickhouse.lab.karpov.courses', 
     'database':'simulator_20250120', 'user':'student',
     'password':'dpo_python_2020'}
# Пишем запрос для данных из ленты по каждому пользователю
q_feed = """
    SELECT toDate(time) as event_date, user_id, age, gender, os,
           countIf(action, action = 'view') as views,
           countIf(action, action = 'like') as likes
    FROM {db}.feed_actions 
    WHERE  event_date = yesterday()
    GROUP BY event_date, user_id, age, gender, os
    """
# Пишем функцию которая будет делать запрос в кликхаус, потом это будет task
def extract_feed_action(q_feed, connection):
    df_feed_action = ph.read_clickhouse(q_feed, connection=connection)
    return df_feed_action

In [3]:
# Посмотрим как работает наша функция и на результирующую таблицу
df_feed_action = extract_feed_action(q_feed, connection)
df_feed_action.head(4)

Unnamed: 0,event_date,user_id,age,gender,os,views,likes
0,2025-03-29,133220,17,0,Android,63,9
1,2025-03-29,107237,17,1,Android,42,14
2,2025-03-29,135541,33,0,iOS,63,15
3,2025-03-29,140161,17,0,Android,53,8


In [4]:
# Напишем запрос для схемы сообщений, который достает количество полученных и отправленных смс и пользователей,
# Подзапрос t1 достает количество отправленных сообщений и скольки пользователям, для каждого пользователя 
# с его gender, age, os
# Подзапрос t2 тоже самое для полученных сообщений. После все соединяем FULL JOIN, чтоб собрать всех пользователей,
# даже если у пользователя нет отправленных или полученных. А что бы не получать 0, если пользователя нет в одном 
# из подзапросов, во всех полях кроме количества сообщений и ползователей, используем  coalesce.
q_message = """   
   SELECT coalesce(t1.event_date, t2.event_date) as event_date,
               coalesce(t1.user_id, t2.receiver_id) as user_id,
               t1.messages_sent, t1.users_sent,
               t2.messages_received, t2.users_received,
               coalesce(t1.gender, t2.gender) as gender, 
               coalesce(t1.age, t1.age) as age, 
               coalesce(t1.os, t2.os) as os
        FROM (SELECT toDate(time) as event_date, user_id, 
               count(receiver_id) as messages_sent, count(distinct receiver_id) as users_sent,
               gender, age, os
              FROM {db}.message_actions 
              WHERE event_date = yesterday()
              GROUP BY event_date, user_id, gender, age, os) t1
        FULL JOIN (SELECT toDate(time) as event_date, receiver_id, 
                          count(user_id) as messages_received, count(distinct user_id) as users_received,
                          gender, age, os
                   FROM {db}.message_actions 
                   WHERE event_date = yesterday()
                   GROUP BY event_date, receiver_id, gender, age, os) t2
        ON t1.user_id = t2.receiver_id and t1.event_date = t2.event_date
        where user_id != 0  
        """
# Делаем функцию которая достает данные по сообщениям (потом будет отдельный task)
def extract_message_action(q_message, connection):
    df_message_action = ph.read_clickhouse(q_message, connection=connection)
    return df_message_action

In [6]:
# Посмотрим на результат работы нашей функции
df_message_action = extract_message_action(q_message, connection)
df_message_action.head(4)

Unnamed: 0,event_date,user_id,messages_sent,users_sent,messages_received,users_received,gender,age,os
0,2025-03-29,7311,7,7,0,0,1,24,iOS
1,2025-03-29,12948,9,2,1,1,1,36,Android
2,2025-03-29,12948,9,2,1,1,1,36,Android
3,2025-03-29,12948,9,2,1,1,1,36,Android


In [8]:
# Функция для соединения наших таблиц ленты и сообщений
def merge_feed_mess(df_feed_action, df_message_action):
    merge_table = pd.merge(df_feed_action,  df_message_action,\
                           on=['user_id', 'event_date', 'gender', 'age', 'os'], how='outer')
    return merge_table.fillna(0)

In [9]:
# Смотрим на результурующую таблицу
merge_table = merge_feed_mess(df_feed_action, df_message_action)
merge_table.head(10)

Unnamed: 0,event_date,user_id,age,gender,os,views,likes,messages_sent,users_sent,messages_received,users_received
0,2025-03-29,133220,17,0,Android,63.0,9.0,0.0,0.0,0.0,0.0
1,2025-03-29,107237,17,1,Android,42.0,14.0,0.0,0.0,0.0,0.0
2,2025-03-29,135541,33,0,iOS,63.0,15.0,0.0,0.0,0.0,0.0
3,2025-03-29,140161,17,0,Android,53.0,8.0,0.0,0.0,0.0,0.0
4,2025-03-29,54776,37,1,Android,17.0,6.0,0.0,0.0,0.0,0.0
5,2025-03-29,114113,19,1,Android,18.0,4.0,0.0,0.0,0.0,0.0
6,2025-03-29,35860,24,0,iOS,7.0,1.0,0.0,0.0,0.0,0.0
7,2025-03-29,11361,23,0,Android,15.0,5.0,0.0,0.0,0.0,0.0
8,2025-03-29,143338,16,1,Android,25.0,3.0,0.0,0.0,0.0,0.0
9,2025-03-29,107983,26,1,Android,11.0,4.0,0.0,0.0,0.0,0.0


In [16]:
# Для каждого среза по заданию напишем отдельную функцию
# Группируем по срезу и дате
def gender_slice(merge_table):
    gender_table = merge_table[['event_date', 'gender', 'views', 'likes',
                       'messages_received', 'messages_sent', 'users_received', 'users_sent']]\
                     .groupby(['event_date', 'gender'])\
                     .sum().reset_index().rename(columns={'gender': 'dimension_value'}) # Суммируем показатели
                                          # Переименовываем столбец
    gender_table.loc[:, 'dimension'] = 'gender'# Создаем новый столбец и заполняем его названием среза
    gender_table = gender_table[['event_date', 'dimension', 'dimension_value', 'views', 'likes',
        'messages_received', 'messages_sent', 'users_received', 'users_sent']]
    return gender_table

In [17]:
# Смотрим на результат среза по gender
gender_table = gender_slice(merge_table)
gender_table

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2025-03-29,gender,0,336937.0,71216.0,6287.0,33848.0,4390.0,28069.0
1,2025-03-29,gender,1,413110.0,87267.0,6560.0,40265.0,5001.0,33118.0


In [18]:
# Функция среза по OS 
def os_slice(merge_table):
    os_table = merge_table[['event_date', 'os', 'views', 'likes',
                       'messages_received', 'messages_sent', 'users_received', 'users_sent']]\
                     .groupby(['event_date', 'os'])\
                     .sum().reset_index().rename(columns={'os': 'dimension_value'})

    os_table.loc[:, 'dimension'] = 'os'
    os_table = os_table[['event_date', 'dimension', 'dimension_value', 'views', 'likes',
        'messages_received', 'messages_sent', 'users_received', 'users_sent']]
    return os_table


In [19]:
# Смотрим на результат среза по gender
os_table = os_slice(merge_table)
os_table

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2025-03-29,os,Android,490018.0,103436.0,8228.0,48086.0,6159.0,39760.0
1,2025-03-29,os,iOS,260029.0,55047.0,4619.0,26027.0,3232.0,21427.0


In [21]:
# Функция среза по возрасту
def age_slice(merge_table):
    age_table = merge_table[['event_date', 'age', 'views', 'likes',
                       'messages_received', 'messages_sent', 'users_received', 'users_sent']]\
                     .groupby(['event_date', 'age'])\
                     .sum().reset_index().rename(columns={'age': 'dimension_value'})

    age_table.loc[:, 'dimension'] = 'age'
    age_table = age_table[['event_date', 'dimension', 'dimension_value', 'views', 'likes',
        'messages_received', 'messages_sent', 'users_received', 'users_sent']]
    return age_table

In [22]:
# Смотрим результат работы функции среза по возрасту 
age_table = age_slice(merge_table)
age_table.head(5)

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2025-03-29,age,14,9362.0,1983.0,548.0,2026.0,202.0,666.0
1,2025-03-29,age,15,25716.0,5486.0,148.0,967.0,128.0,702.0
2,2025-03-29,age,16,29962.0,6078.0,280.0,1746.0,253.0,1453.0
3,2025-03-29,age,17,34067.0,7250.0,411.0,2980.0,384.0,2505.0
4,2025-03-29,age,18,41410.0,8755.0,738.0,3873.0,527.0,3516.0


In [23]:
# Функция которая сканкатенирует все срезы в одну таблицу
def concat_(gender_table, os_table, age_table):
    df_concat = pd.concat([gender_table, os_table, age_table])
    df_concat = df_concat.astype({'dimension_value' : 'str'}) # явно приведем столбец к типу str
    return df_concat

In [24]:
# Результат канкатенации
df_concat = concat_(gender_table, os_table, age_table)
df_concat.head(5)

Unnamed: 0,event_date,dimension,dimension_value,views,likes,messages_received,messages_sent,users_received,users_sent
0,2025-03-29,gender,0,336937.0,71216.0,6287.0,33848.0,4390.0,28069.0
1,2025-03-29,gender,1,413110.0,87267.0,6560.0,40265.0,5001.0,33118.0
0,2025-03-29,os,Android,490018.0,103436.0,8228.0,48086.0,6159.0,39760.0
1,2025-03-29,os,iOS,260029.0,55047.0,4619.0,26027.0,3232.0,21427.0
0,2025-03-29,age,14,9362.0,1983.0,548.0,2026.0,202.0,666.0


In [None]:
# Пишем функцию для которая создаст таблицу в кликхаусе с заданными столбцами
connection_test = {'host': 'https://clickhouse.lab.karpov.courses',
                      'database':'test',
                      'user':'student-rw', 
                      'password':'656e2b0c9c'
                     }
query_test = '''CREATE TABLE IF NOT EXISTS test.slicer_table_dotsenko
                    (event_date Date,
                     dimension String,
                     dimension_value String,
                     views Float64,
                     likes Float64,
                     messages_received Float64,
                     messages_sent Float64,
                     users_received Float64,
                     users_sent Float64
                    )
                    ENGINE = MergeTree()
                    ORDER BY event_date
'''
def query_create_table(query_test, connection_test):
    ph.execute(query_test, connection=connection_test)  # Запрос на создание таблицы

In [None]:
# Функция которая запишет созданную нами таблицу в подготовленную в кликхаусе
def add_df_concat(df_concat, connection_test):
    ph.to_clickhouse(df=df_concat, table='slicer_table_dotsenko', index=False, connection=connection_test)

In [None]:
''' 
Таким образом мы создали все необходимые функции, которые будут обернуты в task внутри DAG.
Сам файл DAG.py будет отправлен в репозиторий airflow, где и будет запускаться ежедневно.
'''