In [None]:
import requests
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook      #Устанавливает соединение с postgres
from airflow.providers.amazon.aws.hooks.s3 import S3Hook                #Устанавливает соединение с S3
from airflow import DAG, models
import datetime as dt
from airflow.operators.python import PythonOperator
import logging  #Логирование
import json #Сериализация и десириализация json в строку для загрузки в S3
from io import StringIO #Говорит «Это не путь, а строка — обращайся с ней как с файлоподобным объектом».

In [None]:
def upload_to_s3(data, bucket='airflare', key='temp/response.json'):  #Функция загружает данные в S3, data-файл для загрузки, bucket - имя бакета, key - путь и название файла
    logging.info('Создание подключения к S3')
    hook = S3Hook(aws_conn_id='minio')    #Установка подключения к S3
    json_data = data         #Сериализует JSON в строку для передачи в S3(так как в S3 нельзя передавать объекты python)
    
    logging.info('Подключение установлено')
    
    if not hook.check_for_bucket(bucket):           #Проверяет есть ли бакет с таким именем
        hook.create_bucket(bucket_name=bucket)
        
    logging.info('Загрузка данных в S3')
    
    hook.load_string(                   #Загружает строку в S3
        string_data=json_data,      #Сериализованные в строку данные
        key=key,                    #Путь до файла в заданном бакете
        bucket_name=bucket,         #Название бакета
        replace=True
    )
    
    logging.info('Загрузка завершена успешно')
    
    return {'bucket': bucket, 'key': key}       #Возвращает путь до файла и название бакета

In [None]:
def load_from_s3(bucket, key, aws_conn_id='minio'):  #Функция выгружает сериальизованный в строку файл из S3
    logging.info('Создание подключения к S3')
    conn = S3Hook(aws_conn_id=aws_conn_id)      #Задает переменную подключения где aws_conn_id это id подключения к S3 в UI airflow
    
    logging.info('Подключение установлено, загрузка данных из S3')
    
    content = conn.read_key(bucket_name=bucket, key=key)    #Читает сериализованный файл в заданном бакете по заданному пути
    
    data = json.loads(content)                  #Десериализует файл
    logging.info('Данные выгружены из S3')
    return data

In [None]:
def extract(**context):
    headers = {'x-access-token': '167317b476e1808633c659a8bbb35b13'}

    url = "https://api.travelpayouts.com/v2/prices/latest"

    querystring = {"currency": "rub",
                   "origin": "IKT",
                   "destination": "HKT",
                   'beginning_of_period': '2025-09-01',
                   "period_type": "year",
                   'one_way': True,
                   "page": "1",
                   "limit": "1000",
                   "show_to_affiliates": "true",
                   "sorting": "price",
                   "trip_class": "0"}

    logging.info('Начало извлечения данных')

    response = requests.get(url, headers=headers, params=querystring)

    if response.status_code != 200:                         #Проверка на отклик API
        logging.error('Ошибка при чтении данных из API')
        raise ValueError('Ошибка')

    
    path = upload_to_s3(json.dumps(response.json()), key='temp/response.json')      #Полученный ответ оборачивает в JSON затем сериализуется в строку и загружается по заданному пути в S3
    
    logging.info('Данные успешно сохранены во временное хранилище')

    context['ti'].xcom_push(key='df_dirty', value=path)                     #При помощи Xcom мы отправляем в другой DAG данные о местоположении данных по API

    logging.info('Данные успешно извлечены и переданы в Xcom')

In [None]:
def transform(**context):                                       #Функция оставляет только нужные нам данные
    path = context['ti'].xcom_pull(key='df_dirty')              #Выгружает из Xcom путь к файлу по ключу key='df_dirty'

    logging.info('Данные приняты из Xcom, начат процесс очистки')

    file = load_from_s3(bucket=path['bucket'], key=path['key'])     #Выгружает файл из S3 и десериализует его

    df = pd.DataFrame(file['data'])
    df = df[['depart_date', 'origin', 'destination', 'trip_class',
             'value', 'gate', 'duration', 'distance', 'number_of_changes']]

    logging.info('Преобразование завершено, начинается передача DF в S3')
    
    content = df.to_json(index=False, orient='records')                     #Преобразует DF в JSON
    
    path = upload_to_s3(content, bucket=path['bucket'], key='temp/df.json')     #Загружает в S3
    
    logging.info('Данные успешно сохранены в S3, передаю их в Xcom')
    
    context['ti'].xcom_push(key='path', value=path)                         #Отправляет путь до сохраненного Файла

    logging.info('данные успешно очищены и записаны в хранилище')

In [None]:
def load(**context):                                        #Функция загружает данные в БД
    path = context['ti'].xcom_pull(key='path')              #Выгружает путь до файла в бакете из Xcom
    logging.info('Данные из Xcom успешно выгружены, начинаю выгружать данные из S3')
    
    content = load_from_s3(bucket=path['bucket'], key=path['key'])      #Выгружает из S3 файл который внутри содержит список словарей
    df = pd.DataFrame(content)
    logging.info('Данные получены из хранилища, начат процесс отправки в БД')

    hook = PostgresHook(postgres_conn_id='postgres_airfare')  #Указывать имя при соединении с БД которое указано в yaml
    engine = hook.get_sqlalchemy_engine()
    
    logging.info(f'Размер DataFrame: {df.shape}') #Выводит в логи размерность DF
    logging.info(f'Типы данных:\n{df.dtypes}')   #Выводит в логи типы данных каждого столбца DF

    logging.info('Связь с БД установлена')
    try:
        df.to_sql('irk_hkt_year_stats', engine, if_exists='append', index=False)      #Отправляет данные в заданную таблицу
    except Exception as e:                                          #Логирует любую ошибку(исключение) и записывает ее в переменную
        logging.error(f'Ошибка при загрузки в БД {e}')
    logging.info('процесс ETL успешно завершен, данные загружены в БД')

True

In [None]:
default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2024, 1, 1),
    'retries': 2,
    'retry_delay': dt.timedelta(minutes=1)
}

with DAG(dag_id='ETL_airflare_data', default_args=default_args, schedule_interval='0 12 * * *', catchup=False, tags=['cost']) as dag:
    task_extract = PythonOperator(
        task_id='extract',
        python_callable=extract
    )

    task_transform = PythonOperator(
        task_id='transform',
        python_callable=transform
    )

    task_load = PythonOperator(
        task_id='load',
        python_callable=load
    )

    task_extract >> task_transform >> task_load