<img src='img/i.webp'>

`Цель проекта`: создать DAG в Apache Airflow, который будет по расписанию запускать расчет витрины активности клиентов по сумме и количеству их транзакций.


___
## <a id=100> Содержание</a>
- [Исходные таблицы](#1)
- [Сбор данных](#2)
- [Установка Airflow с WSL](#3)
- [DAG реализация](#4)
- [Результат](#5)

___
## <center> <a id=1> Исходные таблицы</a>

Содержимое таблицы `profit_table`:

|Столбец |Описание |
|-|-|
|`id` |уникальный идентификатор клиента |
|`sum_a` - `sum_j` | сумма транзакций по соответствующим продуктам a - j|
|`count_a` - `count_j` |количество транзакций по соответствующим продуктам a - j|
|`date` | дата транзакций|

In [7]:
import pandas as pd

df_profit = pd.read_csv('data/profit_table.csv')
df_profit[:2]

Unnamed: 0,id,sum_a,sum_b,sum_c,sum_d,sum_e,sum_f,sum_g,sum_h,sum_i,...,count_b,count_c,count_d,count_e,count_f,count_g,count_h,count_i,count_j,date
0,C00313K,0.0,8.77,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2023-10-01
1,C11256K,1.28,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2023-10-01


In [2]:
df_profit.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1270564 entries, 0 to 1270563
Data columns (total 22 columns):
 #   Column   Non-Null Count    Dtype  
---  ------   --------------    -----  
 0   id       1270564 non-null  object 
 1   sum_a    1270564 non-null  float64
 2   sum_b    1270564 non-null  float64
 3   sum_c    1270564 non-null  float64
 4   sum_d    1270564 non-null  float64
 5   sum_e    1270564 non-null  float64
 6   sum_f    1270564 non-null  float64
 7   sum_g    1270564 non-null  float64
 8   sum_h    1270564 non-null  float64
 9   sum_i    1270564 non-null  float64
 10  sum_j    1270564 non-null  float64
 11  count_a  1270564 non-null  float64
 12  count_b  1270564 non-null  float64
 13  count_c  1270564 non-null  float64
 14  count_d  1270564 non-null  float64
 15  count_e  1270564 non-null  float64
 16  count_f  1270564 non-null  float64
 17  count_g  1270564 non-null  float64
 18  count_h  1270564 non-null  float64
 19  count_i  1270564 non-null  float64
 20  co

Содержимое таблицы `flags_activity`:

|Столбец |Описание |
|-|-|
|`id` |уникальный идентификатор клиента |
|`flag_a` - `flag_j` | булево значение: 1 - если в предыдущие 3 месяца у клиента были ненулевая сумма и количество транзакций по продукту|

In [3]:
df_activity = pd.read_csv('data/flags_activity.csv')
df_activity[:2]

Unnamed: 0,id,flag_a,flag_b,flag_c,flag_d,flag_e,flag_f,flag_g,flag_h,flag_i,flag_j
0,C00144K,1,0,0,0,0,0,0,0,0,0
1,C00194K,1,0,1,0,0,0,0,1,0,0


In [4]:
df_activity.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 222166 entries, 0 to 222165
Data columns (total 11 columns):
 #   Column  Non-Null Count   Dtype 
---  ------  --------------   ----- 
 0   id      222166 non-null  object
 1   flag_a  222166 non-null  int64 
 2   flag_b  222166 non-null  int64 
 3   flag_c  222166 non-null  int64 
 4   flag_d  222166 non-null  int64 
 5   flag_e  222166 non-null  int64 
 6   flag_f  222166 non-null  int64 
 7   flag_g  222166 non-null  int64 
 8   flag_h  222166 non-null  int64 
 9   flag_i  222166 non-null  int64 
 10  flag_j  222166 non-null  int64 
dtypes: int64(10), object(1)
memory usage: 18.6+ MB


___
## <center> <a id=2>Сбор данных</a>

In [26]:
import pandas as pd
from tqdm import tqdm


def transfrom(profit_table, date):
    """ Собирает таблицу флагов активности по продуктам
        на основании прибыли и количеству совершёных транзакций
        
        :param profit_table: таблица с суммой и кол-вом транзакций
        :param date: дата расчёта флагоа активности
        
        :return df_tmp: pandas-датафрейм флагов за указанную дату
    """
    start_date = pd.to_datetime(date) - pd.DateOffset(months=2)
    end_date = pd.to_datetime(date) + pd.DateOffset(months=1)
    date_list = pd.date_range(
        start=start_date, end=end_date, freq='M'
    ).strftime('%Y-%m-01')
    
    df_tmp = (
        profit_table[profit_table['date'].isin(date_list)]
        .drop('date', axis=1)
        .groupby('id')
        .sum()
    )
    
    product_list = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
    for product in tqdm(product_list):
        df_tmp[f'flag_{product}'] = (
            df_tmp.apply(
                lambda x: x[f'sum_{product}'] != 0 and x[f'count_{product}'] != 0,
                axis=1
            ).astype(int)
        )
        
    df_tmp = df_tmp.filter(regex='flag').reset_index()
    
    return df_tmp

date = '2024-03-01'
flags_activity = transfrom(df_profit, date)
flags_activity.to_csv(f'data/flags_activity_test.csv', index=False, mode='a')

100%|██████████| 10/10 [00:07<00:00,  1.31it/s]


___
## <center> <a id=3>Установка Airflow с WSL</a>

[Youtube решение](https://www.youtube.com/watch?v=vmGng8uFvk8)

### <center> <a>Установка PIP</a>

- $\boxed{1}$ Установка `PIP`

In [None]:
sudo apt-get update

In [None]:
sudo apt-get install build-essential python3-dev libpq-dev

### <center> <a >Виртуальное окружение</a>


- `Установка` пакета venv

In [None]:
sudo apt-get install python3-venv

- `Создание` окружения

In [None]:
python3 -m venv .venv

- `Активация` окружения

In [None]:
source .venv/bin/activate

### <center> <a >Установка Airflow</a>


- $\boxed{2}$ Установка `Apache Airflow`

In [None]:
pip install apache-airflow

- Назначение новой папки с конфигурацией

In [None]:
export AIRFLOW_HOME=~/airflow

- $\boxed{3}$ Инициализация `Airflow DB`

In [None]:
airflow db init

- В файле `airflow.cfg` меняем путь к DAG-директории

In [None]:
dags_folder = /home/nazarovmichail/airflowAirflow/dags

load_examples = False

- `Сброс` информации BD

In [None]:
airflow db reset

### <center> <a >Создание пользователя webserver Airflow</a>

In [None]:
airflow users create --username admin --firstname nms --lastname nms --email nazarovmichails@gmail.com --role Admin --password admin

### <center> <a >Запуск веб-сервера Airflow</a>

- Запуск `веб-сервера Apache Airflow`:
    - интерфейс для управления DAGами
    - просмотр логов выполнения задач
    - мониторинг прогресса выполнения

In [None]:
airflow webserver --port 8888 

- Просмотр запущенных процессов
- Закрыть подключение

In [None]:
sudo lsof -i tcp:8888

In [None]:
kill -9 <название PID>

### <center> <a >Планировщик</a>

1. Отрываем новый терминал
2. Активируем окружение
3. Задаем путь к папке конфиграций
4. Запуск планировщика

In [None]:
source .venv/bin/activate

In [None]:
export AIRFLOW_HOME=~/airflow

In [None]:
airflow scheduler

___
## <center> <a id=4>DAG реализация</a>

#### <center> <a id=6>Последовательные задачи</a>

In [None]:
import pandas as pd
import time
import datetime
from tqdm import tqdm
from airflow.decorators import dag, task


default_args = {
    'owner': 'Nazarov Michail'
}

@dag(default_args=default_args, dag_id='etl_pandas_dag', schedule="0 0 5 * *", start_date=datetime.datetime(2024, 3, 31), tags=['etl'])
def etl_task():
    @task()
    def extract():
        extract_dir = '/home/nazarovmichail/airflow/extract data/'
        df_profit = pd.read_csv(extract_dir + 'profit_table.csv')
        return df_profit

    @task()
    def transform(df:pd.DataFrame, date:str) -> pd.DataFrame:

        def transfrom_csv(profit_table, date):
            """ Собирает таблицу флагов активности по продуктам
        на основании прибыли и количеству совершёных транзакций
        
        :param profit_table: таблица с суммой и кол-вом транзакций
        :param date: дата расчёта флагоа активности
        
        :return df_tmp: pandas-датафрейм флагов за указанную дату
        """
            start_date = pd.to_datetime(date) - pd.DateOffset(months=2)
            end_date = pd.to_datetime(date) + pd.DateOffset(months=1)
            date_list = pd.date_range(
                start=start_date, end=end_date, freq='M'
            ).strftime('%Y-%m-01')
            
            df_tmp = (
                profit_table[profit_table['date'].isin(date_list)]
                .drop('date', axis=1)
                .groupby('id')
                .sum()
            )
            
            product_list = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
            for product in tqdm(product_list):
                df_tmp[f'flag_{product}'] = (
                    df_tmp.apply(
                        lambda x: x[f'sum_{product}'] != 0 and x[f'count_{product}'] != 0,
                        axis=1
                    ).astype(int)
                )
                
            df_tmp = df_tmp.filter(regex='flag').reset_index()
            
            return df_tmp
        df_tmp = transfrom_csv(df, date)
        print(df_tmp)
        return df_tmp

    @task()
    def load(df: pd.DataFrame, df_path):
        df.to_csv(df_path, index=False, mode='a')

    date = time.strftime("%Y-%m-%d")
    
    df_extr = extract()
    df_trans = transform(df_extr, date)
    
    extract_dir = '/home/nazarovmichail/airflow/load data/'
    extract_df = 'flags_activity.csv'
    df_path = extract_dir + extract_df
    load(df_trans, df_path)

etl_dag = etl_task()

#### <center> <a id=6>Параллельные задачи</a>


In [None]:
import pandas as pd
import os
import time
import datetime
from airflow.decorators import dag, task, task_group
from df_transform import transfrom_csv


default_args = {
    'owner': 'Nazarov Michail'
}


@dag(default_args=default_args,
     dag_id='etl_pandas_parallel_dag',
     schedule="0 0 5 * *",
     start_date=datetime.datetime(2024, 3, 31),
     tags=['etl'])
def etl_task():

    date = time.strftime("%Y-%m-%d")

    extract_dir = '/home/nazarovmichail/airflow/extract data/'
    extract_filename = 'profit_table.csv'
    extract_filepath = extract_dir + extract_filename

    transform_dir = '/home/nazarovmichail/airflow/transform data/'

    load_dir = '/home/nazarovmichail/airflow/load data/'
    load_filename = 'flags_activity.csv'
    load_filepath = load_dir + load_filename

    @task()
    def extract(extract_filepath):

        df_extracted = pd.read_csv(extract_filepath)
        return df_extracted

    @task()
    def transform(df: pd.DataFrame,
                  date: str,
                  product: str,
                  transform_dir: str) -> pd.DataFrame:

        df_product = transfrom_csv(df, date, product)
        transform_filename = f'{product}_transform_df.csv'
        transform_filepath = transform_dir + transform_filename
        df_product.to_csv(transform_filepath, index=False)

        print(f'Done: {product}_transform_df.csv')

    @task()
    def load(transform_dir, load_filepath):

        df_products_list = list(os.walk(transform_dir))
        for ind, file in enumerate(sorted(df_products_list[0][2])):
            if ind == 0:
                df_load = pd.read_csv(transform_dir + file, index_col='id')
            else:
                df_product = pd.read_csv(transform_dir + file, index_col='id')
                df_product.index = df_load.index
                df_load = pd.concat((df_load, df_product), axis=1)

        df_load.to_csv(load_filepath, mode='a')

    @task_group()
    def group_transform():
        transform(df_extracted, date, 'a', transform_dir)
        transform(df_extracted, date, 'b', transform_dir)
        transform(df_extracted, date, 'c', transform_dir)
        transform(df_extracted, date, 'd', transform_dir)
        transform(df_extracted, date, 'e', transform_dir)
        transform(df_extracted, date, 'f', transform_dir)
        transform(df_extracted, date, 'g', transform_dir)
        transform(df_extracted, date, 'h', transform_dir)
        transform(df_extracted, date, 'i', transform_dir)
        transform(df_extracted, date, 'j', transform_dir)

    df_extracted = extract(extract_filepath)
    group_transform().set_downstream(load(transform_dir, load_filepath))


etl_dag = etl_task()


___
## <center> <a id=5>Результат</a>

- Главная страница Airflow webserver

<img src='img/DAG main.png'>

- Граф для последовательных задач

<img src='img/Sequential DAG graph.png'>


- Граф для параллельных задач


<img src='img/Parallel DAG graph.png'>


- Календарь планировщика

<img src='img/DAG calendar.png'>


___
### [Содержание](#100)