# Задача

Создать ETL пайплайн в Airflow для ежедневного расчета нужной таблицы и добавления ее в Clickhouse

# Код

In [None]:
from airflow.decorators import dag, task
from datetime import datetime, timedelta
import pandahouse
import pandas as pd

# Определение параметров по умолчанию для DAG
default_args = {
    'owner': 's.krupnov',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 3, 24)
}

# Параметры подключения к ClickHouse для загрузки данных
connection_load = {
    'host': 'https://clickhouse.lab.karpov.courses',
    'database': 'test',
    'user': 'student-rw',
    'password': '656e2b0c9c'
}

# Параметры подключения к ClickHouse для чтения данных
connection = {
    'host': 'https://clickhouse.lab.karpov.courses/',
    'password': 'dpo_python_2020',
    'user': 'student',
    'database': 'simulator_20230220'
}

# SQL-запросы для извлечения данных из ClickHouse
q_feed = '''
SELECT toDate(time) as event_date, user_id,
sum(action = 'like') as likes,
sum(action = 'view') as views
FROM simulator_20230220.feed_actions
WHERE toDate(time) = today() - 1
GROUP BY event_date, user_id order by user_id
'''

q_message = '''
    SELECT DISTINCT user_id,
                    event_date,
                    messages_sent,
                    users_sent,
                    messages_received,
                    users_received
    FROM (
        SELECT toDate(time) as event_date,
               user_id,
               count(receiver_id) as messages_sent,
               count(DISTINCT receiver_id) as users_sent
        FROM simulator_20230220.message_actions
        WHERE toDate(time) = today
        GROUP BY event_date, user_id
        ORDER BY user_id
    ) t1
    LEFT OUTER JOIN (
        SELECT toDate(time) as event_date,
               receiver_id as user_id,
               count(user_id) as messages_received,
               count(DISTINCT user_id) as users_received
        FROM simulator_20230220.message_actions
        WHERE toDate(time) = today()
        GROUP BY event_date, user_id
        ORDER BY user_id
    ) t2
    ON t1.user_id = t2.user_id
    ORDER BY user_id
'''

q_slices = '''
    SELECT DISTINCT user_id, os, gender, age
    FROM (
        SELECT DISTINCT user_id, os, gender, age
        FROM simulator_20230220.feed_actions
        WHERE toDate(time) = today() - 1
        ORDER BY user_id
        UNION ALL
        SELECT DISTINCT user_id, os, gender, age
        FROM simulator_20230220.message_actions
        WHERE toDate(time) = today()
        ORDER BY user_id
    )
    ORDER BY user_id
'''

# SQL-запрос для создания таблицы в ClickHouse
query_up_load = '''
CREATE TABLE IF NOT EXISTS test.etl_table_s_krupnov (
    event_date Date,
    dimension String,
    dimension_value String,
    likes UInt64,
    views UInt64,
    messages_sent UInt64,
    users_sent UInt64,
    messages_received UInt64,
    users_received UInt64
) ENGINE = MergeTree()
ORDER BY event_date
'''

# Определение DAG с параметрами
@dag(default_args=default_args, schedule_interval='0 11 * * *', catchup=False)
def dag_s_krupnov():
    """
    Этот DAG выполняет процесс ETL для данных из ClickHouse, выполняя следующие шаги:
    1. Извлечение данных из ClickHouse.
    2. Обработка и объединение данных.
    3. Группировка данных по различным срезам.
    4. Создание окончательной таблицы.
    5. Загрузка данных в ClickHouse.
    """

    # Задача для извлечения данных из ClickHouse
    @task
    def extract(query: str) -> pd.DataFrame:
        """
        Извлекает данные из ClickHouse с использованием предоставленного SQL-запроса.

        Parameters:
        - query (str): SQL-запрос для извлечения данных.

        Returns:
        - pd.DataFrame: Извлеченные данные в форме DataFrame.
        """
        df = pandahouse.read_clickhouse(query, connection=connection)
        return df

    # Задача для извлечения срезов данных из ClickHouse
    @task
    def extract_slices(query: str, slice_col: str) -> pd.DataFrame:
        """
        Извлекает срезы данных из ClickHouse с использованием предоставленного SQL-запроса.

        Parameters:
        - query (str): SQL-запрос для извлечения данных.
        - slice_col (str): Имя столбца, по которому нужно выполнить срез.

        Returns:
        - pd.DataFrame: Извлеченные срезы данных в форме DataFrame.
        """
        df = pandahouse.read_clickhouse(query, connection=connection)
        df = df[['user_id', slice_col]].copy()
        return df

    # Задача для объединения двух DataFrame
    @task
    def join_tables(t1: pd.DataFrame, t2: pd.DataFrame) -> pd.DataFrame:
        """
        Объединяет два DataFrame по столбцу 'user_id'.

        Parameters:
        - t1 (pd.DataFrame): Первый DataFrame.
        - t2 (pd.DataFrame): Второй DataFrame.

        Returns:
        - pd.DataFrame: Объединенный DataFrame.
        """
        df_merge = pd.merge(left=t1, right=t2, how='outer', on=['user_id'])
        df_merge = (
            df_merge.sort_values(by=['user_id'])
            .rename({'event_date_x': 'event_date'}, axis=1)
            .reset_index()
            .drop(columns=['index', 'event_date_y'])
        )
        df_merge['event_date'] = df_merge['event_date'].fillna(
            df_merge['event_date'].mode()[0]
        )
        df_merge.fillna(0, inplace=True)
        df_merge = df_merge.astype({
            'likes': 'int',
            'views': 'int',
            'messages_sent': 'int',
            'users_sent': 'int',
            'messages_received': 'int',
            'users_received': 'int'
        })
        return df_merge

    # Задача для группировки данных по срезу
    @task
    def get_group_slice(df_merge, slice_, column):
        """
        Группирует данные по указанному срезу.

        Parameters:
        - df_merge (pd.DataFrame): DataFrame для группировки.
        - slice_ (pd.DataFrame): DataFrame с данными среза.
        - column (str): Имя столбца для группировки.

        Returns:
        - pd.DataFrame: Группированный DataFrame.
        """
        merge_slice_ = pd.concat([df_merge, slice_], join='outer', axis=1).fillna(0).drop(columns='user_id')
        gr_gender = (
            merge_slice_.groupby(['event_date', column])
            .agg({
                'likes': 'sum',
                'views': 'sum',
                'messages_sent': 'sum',
                'users_sent': 'sum',
                'messages_received': 'sum',
                'users_received': 'sum'
            })
            .reset_index()
            .rename({column: 'dimension_value'}, axis=1)
        )
        gr_gender.insert(1, "dimension", column)
        return gr_gender

    # Задача для создания окончательной таблицы
    @task
    def create_final_table(dfs: list) -> pd.DataFrame:
        """
        Создает окончательную таблицу путем объединения списка DataFrame.

        Parameters:
        - dfs (list): Список DataFrame для объединения.

        Returns:
        - pd.DataFrame: Окончательная таблица.
        """
        df_final = pd.concat(dfs, join='outer', axis=0)
        return df_final

    # Задача для загрузки данных в ClickHouse
    @task
    def load(df):
        """
        Загружает данные в ClickHouse.

        Parameters:
        - df (pd.DataFrame): DataFrame для загрузки.

        Returns:
        - None
        """
        df_feed = extract(q_feed)
        df_message = extract(q_message)
        slice_os = extract_slices(a_slices, 'os')
        slice_gender = extract_slices(q_slices, 'gender')
        slice_age = extract_slices(q_slices, 'age')
        df_merge = join_tables(df_feed, df_message)
        df_gr_os = get_group_slice(df_merge, slice_os, 'os')
        df_gr_age = get_group_slice(df_merge, slice_age, 'age')
        df_gr_gender = get_group_slice(df_merge, slice_gender, 'gender')
        dfs = [df_gr_os, df_gr_age, df_gr_gender]
        final_table = create_final_table(dfs)
        load(final_table)

dag_s_krupnov = dag_s_krupnov()
