
1. Скачайте файлы booking.csv, client.csv и hotel.csv

2. Создайте новый dag

3. Создайте три оператора для получения данных и загрузите файлы. Передайте датафреймы в оператор трансформации

4. Создайте оператор который будет трансформировать данные:

- Объедините все таблицы в одну

- Приведите даты к одному виду

- Удалите невалидные колонки

- Приведите все валюты к одной

5. Сохраните  в 

6. Запустите dag.

In [None]:
# Код в файле etl_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import pandas as pd


# Функции для операторов
def fetch_booking(**kwargs):
    ti = kwargs['ti']
    df = pd.read_csv('booking.csv')
    ti.xcom_push(key='booking_df', value=df)


def fetch_client(**kwargs):
    ti = kwargs['ti']
    df = pd.read_csv('client.csv')
    ti.xcom_push(key='client_df', value=df)


def fetch_hotel(**kwargs):
    ti = kwargs['ti']
    df = pd.read_csv('hotel.csv')
    ti.xcom_push(key='hotel_df', value=df)


def transform_data(**kwargs):
    ti = kwargs['ti']
    booking_df = ti.xcom_pull(task_ids='fetch_booking', key='booking_df')
    client_df = ti.xcom_pull(task_ids='fetch_client', key='client_df')
    hotel_df = ti.xcom_pull(task_ids='fetch_hotel', key='hotel_df')

    # Объединение таблиц
    combined_df = booking_df.merge(client_df, on='client_id', how='left')
    combined_df = combined_df.merge(hotel_df, on='hotel_id', how='left')

    # Приведение дат к одному формату
    combined_df['booking_date'] = pd.to_datetime(combined_df['booking_date'], errors='coerce')

    # Удаление невалидных колонок (предполагаем, что 'currency' невалидная)
    combined_df.drop(columns=['currency'], inplace=True)

    # Приведение валют к одной (например, к USD, предполагаем, что все уже в GBP)
    # Для конвертации валют требуется функция конвертации, здесь просто предполагаем, что все значения уже в GBP

    # Сохранение в pandas DataFrame (для примера сохраняем в CSV)
    combined_df.to_csv('transformed_data.csv', index=False)

    # Подтверждение успешного выполнения операции трансформации
    ti.xcom_push(key='transformed_data', value='Data transformed successfully')


# Определение стандартных аргументов DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Определение DAG
dag = DAG(
    'etl_dag',
    default_args=default_args,
    description='An ETL DAG to load, transform, and save booking, client, and hotel data',
    schedule_interval=None,
)

# Определение PythonOperator для чтения файлов
fetch_booking_task = PythonOperator(
    task_id='fetch_booking',
    python_callable=fetch_booking,
    provide_context=True,
    dag=dag,
)

fetch_client_task = PythonOperator(
    task_id='fetch_client',
    python_callable=fetch_client,
    provide_context=True,
    dag=dag,
)

fetch_hotel_task = PythonOperator(
    task_id='fetch_hotel',
    python_callable=fetch_hotel,
    provide_context=True,
    dag=dag,
)

# Определение PythonOperator для трансформации данных
transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    provide_context=True,
    dag=dag,
)

# Определение последовательности выполнения задач
fetch_booking_task >> transform_task
fetch_client_task >> transform_task
fetch_hotel_task >> transform_task