# **Семинар: Создание автоматизированного конвейера переобучения с помощью Apache Airflow**

## **Цель занятия**
Познакомиться с принципами оркестрации ML-конвейеров и научиться использовать Apache Airflow для управления задачами переобучения модели.

## **Задачи семинара**
- Понять архитектуру Airflow: DAG, операторы, сенсоры, scheduler.  
- Создать простой DAG для обучения и валидации модели.  
- Добавить логирование и уведомления.  
- Интегрировать DAG с Git и CI/CD.  

## **План**
1. Знакомство с Airflow и архитектурой DAG.  
2. Создание конвейера переобучения.  
3. Добавление логирования и уведомлений.  
4. Интеграция с внешними системами.  
5. Практическая работа: запуск DAG в локальной среде.


## **1. Знакомство с Apache Airflow**

**Airflow** — инструмент для оркестрации задач, который описывает процесс в виде *ориентированного ациклического графа* (DAG).  
Каждая вершина DAG — отдельная **task**, а рёбра определяют зависимости между ними.

Пример типичного ML-конвейера:

extract_data → preprocess → train_model → evaluate → deploy_model


Каждая задача выполняется независимо, с контролем времени, логированием и возможностью автоматического перезапуска при сбое.

**Airflow** идеально подходит для автоматизации переобучения моделей: можно запускать DAG по расписанию (например, раз в сутки), собирать новые данные и публиковать обновлённую модель.


## **2. Установка и настройка окружения**
Для локального запуска Airflow достаточно выполнить несколько команд.


In [1]:
!pip install apache-airflow

[0mCollecting apache-airflow
  Downloading apache_airflow-3.1.5-py3-none-any.whl.metadata (36 kB)
Collecting apache-airflow-core==3.1.5 (from apache-airflow)
  Downloading apache_airflow_core-3.1.5-py3-none-any.whl.metadata (6.4 kB)
Collecting apache-airflow-task-sdk==1.1.5 (from apache-airflow)
  Downloading apache_airflow_task_sdk-1.1.5-py3-none-any.whl.metadata (3.9 kB)
Collecting a2wsgi>=1.10.8 (from apache-airflow-core==3.1.5->apache-airflow)
  Downloading a2wsgi-1.10.10-py3-none-any.whl.metadata (4.0 kB)
Collecting aiosqlite>=0.20.0 (from apache-airflow-core==3.1.5->apache-airflow)
  Downloading aiosqlite-0.22.0-py3-none-any.whl.metadata (4.3 kB)
Collecting alembic<2.0,>=1.13.1 (from apache-airflow-core==3.1.5->apache-airflow)
  Using cached alembic-1.17.2-py3-none-any.whl.metadata (7.2 kB)
Collecting apache-airflow-providers-common-compat>=1.7.4 (from apache-airflow-core==3.1.5->apache-airflow)
  Downloading apache_airflow_providers_common_compat-1.10.1-py3-none-any.whl.metadat

In [None]:
!export AIRFLOW__WEBSERVER__WEB_SERVER_PORT=8080

In [None]:
!export AIRFLOW__WEBSERVER__WEB_SERVER_HOST=127.0.0.1

In [4]:
!pip install pyngrok

[0mCollecting pyngrok
  Downloading pyngrok-7.5.0-py3-none-any.whl.metadata (8.1 kB)
Downloading pyngrok-7.5.0-py3-none-any.whl (24 kB)
[0mInstalling collected packages: pyngrok
[0mSuccessfully installed pyngrok-7.5.0


In [20]:
from pyngrok import ngrok

# Пробросить порт 8081 (или 8080, если вы используете стандартный порт)
public_url = ngrok.connect(8080).public_url
print("Airflow UI доступен по ссылке:", public_url)

[2m2025-12-22T21:26:52.687939Z[0m [[32m[1minfo     [0m] [1mOpening tunnel named: http-8080-13c92267-a021-419f-9a92-5d483885db35[0m [[0m[1m[34mpyngrok.ngrok[0m][0m [36mloc[0m=[35mngrok.py:333[0m
[2m2025-12-22T21:26:52.770057Z[0m [[32m[1minfo     [0m] [1mt=2025-12-22T22:26:52+0100 lvl=info msg="no configuration paths supplied"[0m [[0m[1m[34mpyngrok.process.ngrok[0m][0m [36mloc[0m=[35mprocess.py:101[0m
[2m2025-12-22T21:26:52.772245Z[0m [[32m[1minfo     [0m] [1mt=2025-12-22T22:26:52+0100 lvl=info msg="using configuration at default config path" path="/Users/alexey.stafeev/Library/Application Support/ngrok/ngrok.yml"[0m [[0m[1m[34mpyngrok.process.ngrok[0m][0m [36mloc[0m=[35mprocess.py:101[0m
[2m2025-12-22T21:26:52.773621Z[0m [[32m[1minfo     [0m] [1mt=2025-12-22T22:26:52+0100 lvl=info msg="open config file" path="/Users/alexey.stafeev/Library/Application Support/ngrok/ngrok.yml" err=nil[0m [[0m[1m[34mpyngrok.process.ngrok[0m][0m [

PyngrokNgrokError: The ngrok process errored on start: authentication failed: Usage of ngrok requires a verified account and authtoken.\n\nSign up for an account: https://dashboard.ngrok.com/signup\nInstall your authtoken: https://dashboard.ngrok.com/get-started/your-authtoken\r\n\r\nERR_NGROK_4018\r\n.

In [16]:
!airflow standalone

[97mstandalone[0m | Starting Airflow Standalone
[97mstandalone[0m | Password for the admin user has been previously generated in /Users/alexey.stafeev/airflow/simple_auth_manager_passwords.json.generated. Not echoing it here.
[97mstandalone[0m | Checking database is initialized
[2m2025-12-22T21:09:12.959978Z[0m [[32m[1minfo     [0m] [1mMigrating the Airflow database[0m [[0m[1m[34mairflow.utils.db[0m][0m [36mloc[0m=[35mdb.py:1129[0m
[97mstandalone[0m | Database ready
[34mscheduler [0m | ____________       _____________
[34mscheduler [0m | ____    |__( )_________  __/__  /________      __
[34mscheduler [0m | ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
[34mscheduler [0m | ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
[34mscheduler [0m | _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[36mtriggerer [0m | ____________       _____________
[36mtriggerer [0m | ____    |__( )_________  __/__  /________      __
[36mtriggerer [0m | ____  /| |_  /

In [33]:
# Установка Airflow

!pip install apache-airflow


# Инициализация / миграция базы данных метаданных
!airflow db migrate

# В старой версии Airflow команда `airflow db init` использовалась для инициализации базы данных метаданных.
# !airflow db init


# В вашей версии Airflow команда `airflow users` больше недоступна.
# Проще всего создавать администратора через `airflow standalone`
# (см. предыдущую ячейку) или через веб‑интерфейс Airflow.

# В старой версии Airflow команда `airflow users` использовалась для создания пользователей.
# !airflow users create \
#     --username admin \
#     --password admin \
#     --firstname Admin \
#     --lastname User \
#     --role Admin \
#     --email admin@example.com

[0mDB: sqlite:////Users/alexey.stafeev/airflow/airflow.db
Performing upgrade to the metadata database sqlite:////Users/alexey.stafeev/airflow/airflow.db
[2m2025-12-22T22:30:51.056570Z[0m [[32m[1minfo     [0m] [1mContext impl SQLiteImpl.      [0m [[0m[1m[34malembic.runtime.migration[0m][0m [36mloc[0m=[35mmigration.py:211[0m
[2m2025-12-22T22:30:51.056797Z[0m [[32m[1minfo     [0m] [1mWill assume non-transactional DDL.[0m [[0m[1m[34malembic.runtime.migration[0m][0m [36mloc[0m=[35mmigration.py:214[0m
[2m2025-12-22T22:30:51.059482Z[0m [[32m[1minfo     [0m] [1mMigrating the Airflow database[0m [[0m[1m[34mairflow.utils.db[0m][0m [36mloc[0m=[35mdb.py:1129[0m
[2m2025-12-22T22:30:51.065642Z[0m [[32m[1minfo     [0m] [1mContext impl SQLiteImpl.      [0m [[0m[1m[34malembic.runtime.migration[0m][0m [36mloc[0m=[35mmigration.py:211[0m
[2m2025-12-22T22:30:51.065786Z[0m [[32m[1minfo     [0m] [1mWill assume non-transactional DDL.[0m [

In [None]:
# Запуск сервисов (в отдельных терминалах)
# airflow api-server --port 8080

# В старой версии Airflow команда `airflow webserver` использовалась для запуска веб-сервера.
# !airflow webserver --port 8080 --- IGNORE ---

# airflow scheduler

## **3. Создание DAG для переобучения модели**

DAG (Directed Acyclic Graph) — это структура, описывающая зависимости между задачами.  
Каждая задача реализуется через **Operator** (например, `PythonOperator`, `BashOperator`, `EmailOperator`).

Ниже приведён минимальный пример конвейера обучения модели.


In [34]:
# Файл: dags/retrain_pipeline.py

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Сбор данных из источника...")

def train_model():
    print("Обучение модели...")

def evaluate_model():
    print("Оценка качества модели...")

def deploy_model():
    print("Публикация новой версии модели...")


with DAG(
    "ml_retrain_pipeline",
    start_date=datetime(2025, 1, 1),
    schedule="@hourly",
    catchup=False,
    tags=["mlops", "retrain"]
) as dag:

    extract = PythonOperator(task_id="extract_data", python_callable=extract_data)
    train = PythonOperator(task_id="train_model", python_callable=train_model)
    evaluate = PythonOperator(task_id="evaluate_model", python_callable=evaluate_model)
    deploy = PythonOperator(task_id="deploy_model", python_callable=deploy_model)

extract >> train >> evaluate >> deploy


<Task(PythonOperator): deploy_model>

## **4. Добавление логирования и уведомлений**

Airflow позволяет отправлять уведомления о статусе выполнения задач по email, Slack или Telegram.  
Ниже пример добавления задачи уведомления после успешного завершения пайплайна.


In [36]:
# Добавим в тот же DAG задачу уведомления по email

from airflow.providers.smtp.operators.smtp import EmailOperator
import os

MODEL_VERSION = os.getenv("MODEL_VERSION", "v1.0.0")

with dag:
    notify_email = EmailOperator(
        task_id="notify_success",
        to="tatsiana.kublashvili@gmail.com",
        subject="✅ ML Retrain Pipeline Completed",
        html_content=f"Новая модель {MODEL_VERSION} успешно обучена и развернута.",
        conn_id="my_smtp" # Убедитесь, что у вас настроено подключение SMTP с этим ID
    )

deploy >> notify_email


DuplicateTaskIdFound: Task id 'notify_success' has already been added to the Dag

### Полный пример кода DAG с уведомлением по email:

In [61]:
APP_PASSWORD = "dfbp aike uuvt fibh"

In [81]:
import json
import os
from airflow.models import Connection
from airflow import settings

# Настройка подключения через Python API
# Используем порт 465 (SSL) - это наиболее надежный вариант для Gmail

conn_id = "my_smtp"
session = settings.Session()

# 1. Удаляем старое подключение
existing = session.query(Connection).filter(Connection.conn_id == conn_id).first()
if existing:
    session.delete(existing)
    session.commit()
    print(f"Старое подключение {conn_id} удалено.")

# 2. Создаем новое
# Важно: Для этого конкретного хука (SmtpHook) логика инвертирована.
# use_ssl = not disable_ssl (по умолчанию True)
# smtp_starttls = not disable_tls (по умолчанию True)
# Для порта 465 (SSL) нам нужно: use_ssl=True, starttls=False.
# Поэтому ставим disable_tls=True.
extra_params = {"disable_tls": True, "disable_ssl": False}

conn = Connection(
    conn_id=conn_id,
    conn_type="smtp",
    host="smtp.gmail.com",
    port=465,
    login="tatsiana.kublashvili@gmail.com",
    password=f"{APP_PASSWORD}",  # Используйте пароль приложения Airflow
    extra=json.dumps(extra_params)
)

session.add(conn)
session.commit()
session.close()

print(f"Подключение {conn_id} успешно создано (Port 465, SSL=True, TLS=False).")

# --- ВАЖНОЕ ДОПОЛНЕНИЕ ---
# В Airflow 2.3+ и особенно в Standalone режиме, подключения могут кэшироваться или читаться из переменных окружения.
# Чтобы убедиться, что Airflow видит подключение, мы также экспортируем его как переменную окружения.
# Это "железобетонный" способ заставить Airflow увидеть настройки.

conn_uri = conn.get_uri()
print(f"URI подключения (для отладки): {conn_uri}")

# Устанавливаем переменную окружения для текущей сессии ноутбука
os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"] = conn_uri
print(f"Переменная окружения AIRFLOW_CONN_{conn_id.upper()} установлена.")

Старое подключение my_smtp удалено.
Подключение my_smtp успешно создано (Port 465, SSL=True, TLS=False).
URI подключения (для отладки): smtp://tatsiana.kublashvili%40gmail.com:dfbp%20aike%20uuvt%20fibh@smtp.gmail.com:465/?__extra__=%7B%22disable_tls%22%3A+true%2C+%22disable_ssl%22%3A+false%7D
Переменная окружения AIRFLOW_CONN_MY_SMTP установлена.


In [82]:
# Тестирование подключения с новыми параметрами
from airflow.providers.smtp.hooks.smtp import SmtpHook
from airflow.models import Connection
from airflow import settings

# Принудительно обновляем подключение в сессии (на случай кэширования)
conn_id = "my_smtp"
hook = SmtpHook(smtp_conn_id=conn_id)

# Вручную подгружаем подключение, чтобы проверить параметры до попытки соединения
session = settings.Session()
conn = session.query(Connection).filter(Connection.conn_id == conn_id).first()
if conn:
    hook.smtp_connection = conn # Внедряем подключение в хук
    print(f"Hook Configuration:")
    print(f"  Use SSL (Implicit): {hook.use_ssl} (Expected: True)")
    print(f"  Use STARTTLS:       {hook.smtp_starttls} (Expected: False)")
    
    print("\nAttempting connection...")
    try:
        # Пробуем получить соединение (это вызовет login и starttls если нужно)
        smtp_client = hook.get_conn()
        print("✅ Connection successful!")
        
        # Пробуем отправить тестовое письмо (опционально, но полезно)
        # hook.send_email_smtp(to="...", subject="Test", html_content="Test")
        
    except Exception as e:
        print(f"❌ Connection failed: {e}")
        import traceback
        traceback.print_exc()
else:
    print(f"Connection {conn_id} not found in DB!")
session.close()


Hook Configuration:
  Use SSL (Implicit): True (Expected: True)
  Use STARTTLS:       False (Expected: False)

Attempting connection...
✅ Connection successful!


In [83]:
!cp "/Users/alexey.stafeev/Documents/!Documents/Магистратура_МФТИ/Предметы/2_год/Развёртывание_ML_моделей/HomwWorks/HW5/mlops_hw5_modification_of_dag_in_airflow_for_new_ml_model_to_production_Kublashvili_Tatsiana/dags/retrain_pipeline.py" ~/airflow/dags/retrain_pipeline.py

## **5. Интеграция с Git и CI/CD**

Файлы DAG хранятся в папке `/opt/airflow/dags`.  
Можно автоматически обновлять их при пуше в основную ветку с помощью GitHub Actions.

Пример шага CI для синхронизации DAG на сервере Airflow:


In [None]:
# .github/workflows/sync_dags.yml

name: Sync DAGs to Airflow

on:
  push:
    branches: [ main ]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Sync DAGs to Airflow server
        run: rsync -avz dags/ user@airflow-server:/opt/airflow/dags/


## **6. Интерактивная часть**

Попробуйте выполнить следующие задания:

1. Запустите локальный Airflow и создайте свой DAG для переобучения модели.  
2. Добавьте задачу проверки метрик (например, чтобы метрика accuracy не ухудшилась).  
3. Настройте уведомление в Telegram или Slack при успешном завершении.  
4. Просмотрите журнал выполнения в Airflow UI (`http://localhost:8080`).  
5. Настройте расписание запуска (например, `schedule_interval="@hourly"`).  


## **Итоги семинара**

После выполнения всех шагов вы:

- Поняли принципы построения DAG в Airflow.  
- Создали собственный конвейер переобучения.  
- Добавили уведомления и автоматизацию CI/CD.  
- Научились запускать и отслеживать обновления модели по расписанию.  

Теперь вы готовы перейти к продвинутым сценариям — мониторинг задач, параметры DAG и интеграция с MLFlow или Docker.
