Skip to content

Latest commit

 

History

History
291 lines (224 loc) · 15.7 KB

File metadata and controls

291 lines (224 loc) · 15.7 KB

Модификация ETL и витрины. Реализация идемпотентности.

Задачи Результаты
Модифицировать процессы в пайплайне, чтобы они соответствовали новым задачам бизнеса. Обеспечить обратную совместимость

Создать обновленную витрину данных для исследования возвращаемости клиентов

Модифицировать ETL процесс, чтобы поддерживалась идемпотентность

Модифицировал процессы в пайплайне, чтобы они соответствовали новым задачам бизнеса, и обеспечивалась обратная совместимость

Создал обновленную витрину данных для исследования возвращаемости клиентов

Добавил в ETL процесс поддержку идемпотентности

Реализация

Скрипты изменения и создания объектов БД, миграции данных в новую структуру в migrations

Обновленный скрипт с описанием DAG "realization.py" в папке src/dags

Этап 1 - Модифицировать процессы в пайплайне

Так как в инкрементальных данных появилось новое поле status, то добавил его в таблицу staging.user_order_log.

Сделал новое поле status обязательным и заполнил старые записи значением 'shipped'.

Скрипт /migrations/1_add_staging_uol_field_status.sql:

alter table staging.user_order_log add column status varchar(30) default 'shipped' not null;

Добавил в витрину mart.f_sales новое поле refund_flag типа boolean для контроля заполнения

Скрипт /migrations/2_add_mart_f_sales_field_refund_flag.sql:

alter table mart.f_sales add column refund_flag boolean default false not null;

Не буду изменять в коде алгоритм заполнения refunded в слое staging (менять значения на минусы), чтобы сохранить полностью оригинальные данные.

Буду передавать в запросе формирования данных витрины из staging.user_order_log в mart.f_sales суммы в поля quantity и payment_amount для записей со статусом refunded с минусами.

Для чего создал новый скрипт. Чтобы данные можно было обновлять за выборочные дни без удаления и дублирования, выполню в этом же скрипте запрос удаления ранее загруженных данных за эту дату.

Так как данные за конкретную дату грузятся только этим процессом и не агрегируются в витрине, то нарушений не будет.

Для инкрементов и исторических данных скрипт будет немного различаться, так как в первом случае грузятся данные только за одну дату (условная дата запуска - ds), во втором - за произвольные даты до даты ds.

Скрипт для инкрементов /migrations/mart.f_sales_inc.sql

delete from mart.f_sales
where f_sales.date_id in
    (select dc.date_id from mart.d_calendar where mart.d_calendar.date_actual = '{{ds}}');

insert into mart.f_sales (date_id, item_id, customer_id, city_id, quantity, payment_amount, refund_flag)
select dc.date_id, item_id, customer_id, city_id, 
quantity * (case when uol.status = 'refunded' then -1 else 1 end) quantity, 
payment_amount * (case when uol.status = 'refunded' then -1 else 1 end) payment_amount, 
(uol.status = 'refunded') 
from staging.user_order_log uol
left join mart.d_calendar as dc on uol.date_time::Date = dc.date_actual
where uol.date_time::Date = '{{ds}}';

Скрипт для исторических данных /migrations/mart.f_sales_hist.sql
В исторических данных нет строк со статусом refunded, можно не подставлять минусы, поле refund_flag заполнится по умолчанию значением false.

delete from mart.f_sales
where f_sales.date_id in
    (select d_calendar.date_id from mart.d_calendar where mart.d_calendar.date_actual < '{{ds}}');

insert into mart.f_sales (date_id, item_id, customer_id, city_id, quantity, payment_amount)
select dc.date_id, item_id, customer_id, city_id, quantity, payment_amount 
from staging.user_order_log uol
left join mart.d_calendar as dc on uol.date_time::Date = dc.date_actual
where uol.date_time::Date < '{{ds}}';

В файле настройки DAG создал для загрузки исторических данных второй DAG, в котором создал свой набор задач (с префиксом h_).

Изменю вызов в DAG

update_f_sales = PostgresOperator(
        task_id='update_f_sales',
        postgres_conn_id=postgres_conn_id,
        sql="sql/mart.f_sales_inc.sql", # было f_sales.sql
        parameters={"date": {business_dt}})

Меняю работу функций f_upload_data_to_staging, f_upload_data_to_staging_hist в DAG, чтобы можно было обновлять за выборочные дни без удаления и дублирования: выполню запрос удаления ранее загруженных данных за эту дату в таблице user_order_log:

Для инкрементов:

str_del = f"delete FROM {pg_schema}.{pg_table} WHERE date_time::date = '{date}'" 
engine.execute(str_del)

Для исторических данных

str_del = f"delete FROM {pg_schema}.{pg_table} WHERE date_time::date < '{date}'" 
engine.execute(str_del)

В процедуре загрузки исторических данных f_upload_data_to_staging_hist нужно подставить в URL файла вместо increment_id значение report_id, полученное ранее в f_get_report

increment_id = ti.xcom_pull(key='report_id')

Здесь же для диагностики выведу сгруппированные данные, которые гружу из файла в user_order_log

print (df.groupby(['date_time', 'status']).agg({'status': 'count', 'quantity': 'sum', 'payment_amount': 'sum'}))

Этап 2 - Реализовать новую витрину

Создал витрину по требуемой структуре. Сделал внешний ключ по item_id к mart.d_item.

drop table if exists mart.f_customer_retention ;

create table mart.f_customer_retention (
	id serial4 PRIMARY KEY,  
    new_customers_count int4 not null, 
    returning_customers_count int4 not null, 
    refunded_customer_count int4 not null, 
    period_name varchar(20) not null, 
    period_id varchar(20) not null, 
    item_id int4 not null, 
    new_customers_revenue numeric(12,2) not null, 
    returning_customers_revenue numeric(12,2) not null,
    customers_refunded numeric(12,0) not null,

    CONSTRAINT f_customer_retention_item_id_fkey FOREIGN KEY (item_id)
        REFERENCES mart.d_item (item_id) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE NO ACTION);

	CREATE INDEX IF NOT EXISTS f_cr2
    ON mart.f_customer_retention USING btree
    (item_id ASC NULLS LAST)
    TABLESPACE pg_default;

    CREATE INDEX IF NOT EXISTS f_cr3
    ON mart.f_customer_retention USING btree
    (period_id ASC NULLS LAST)
    TABLESPACE pg_default;
	
    CREATE INDEX IF NOT EXISTS f_cr4
    ON mart.f_customer_retention USING btree
    (period_name ASC NULLS LAST)
    TABLESPACE pg_default;

Для заполнения по историческим данным сделал запрос из /migrations/mart.f_customer_retention_hist.sql

Для заполнения по историческим данным сделал запрос из /migrations/mart.f_customer_retention_inc.sql

В оба DAG добавляю соответствующие задачи типа PostgresOperator:

    h_update_f_customer_retention = PostgresOperator(
        task_id='update_f_customer_retention',
        postgres_conn_id=postgres_conn_id,
        sql="sql/mart.f_customer_retention_hist.sql",
        parameters={"date": {business_dt}} )


    update_f_customer_retention = PostgresOperator(
        task_id='update_f_customer_retention',
        postgres_conn_id=postgres_conn_id,
        sql="sql/mart.f_customer_retention_inc.sql",
        parameters={"date": {business_dt}} )

Вношу их в дерево выполнения:

    (
            h_print_info_task 
            >> h_generate_report
            >> h_get_report
            >> h_upload_user_order
            >> [h_update_d_item_table, h_update_d_city_table, h_update_d_customer_table]
            >> h_null_task
            >> [h_update_f_sales, h_update_f_customer_retention]
    )

...

    (
            print_info_task
             >> generate_report
             >> get_report
             >> get_increment
             >> upload_user_order_inc
             >> [update_d_item_table, update_d_city_table, update_d_customer_table]
             >> null_task
             >> [update_f_sales, update_f_customer_retention]
    )

Этап 3 - Поддержика идемпотентности

В процессе этапов 1 и 2 были реализованы процедуры очистки и заполнения таблиц таким образом, чтобы было возможно независимое удаление и восстановление информации за отдельные дни без затрагивания информации за другие дни.

В частности изменили работу функций f_upload_data_to_staging, f_upload_data_to_staging_hist в DAG, чтобы можно было обновлять за выборочные дни без удаления и дублирования: выполняется запрос удаления ранее загруженных данных за эту дату в таблице user_order_log:

Для инкрементов:

str_del = f"delete FROM {pg_schema}.{pg_table} WHERE date_time::date = '{date}'" 
engine.execute(str_del)

Для исторических данных

str_del = f"delete FROM {pg_schema}.{pg_table} WHERE date_time::date < '{date}'" 
engine.execute(str_del)

Для витрины mart.f_customer_retention выполняется удаление данных за соответствующую неделю

delete from mart.f_customer_retention 
where f_customer_retention.period_id =
   (select substr(d_calendar.week_of_year_iso, 1, 8) from mart.d_calendar where d_calendar.date_actual = '{{ds}}' ) ;
...

и их восстановление с использованием новых данных отчетного дня.

    ...
   	from staging.user_order_log uol2 
	join mart.d_calendar on uol2.date_time::date = d_calendar.date_actual 
			and '{{ ds }}' between d_calendar.first_day_of_week and d_calendar.last_day_of_week