# Description

Notebook to overload Airflow

In [None]:
import base64
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

In [None]:
# Параметры доступа и конфигурация
AIRFLOW_HOST = "localhost"
AIRFLOW_PORT = 8080
DAG_ID = "simple-dag-1"
AIRFLOW_USER = "airflow"
# Just a dummy one
AIRFLOW_PASSWORD = "airflow"  # noqa: S105

# Формируем Basic-авторизацию
auth_str = f"{AIRFLOW_USER}:{AIRFLOW_PASSWORD}"
b64_auth_str = base64.b64encode(auth_str.encode()).decode()

headers = {
    "Authorization": f"Basic {b64_auth_str}",
    "Content-Type": "application/json",
    "Accept": "application/json",
}

In [None]:
url = f"http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/dags/{DAG_ID}/dagRuns"

In [None]:
def trigger_dag_run(i: int):
    """
    Функция, запускающая одну итерацию запроса к Airflow.

    Возвращает кортеж (номер, статус_успеха, код_статуса, текст_ответа).
    """
    payload = {}

    response = requests.post(url, headers=headers, json=payload)
    success = response.status_code in (200, 201)

    return i, success, response.status_code, response.text

In [None]:
with ThreadPoolExecutor(max_workers=100) as executor:
    # Запускаем 100 заданий на отправку запросов
    futures = [executor.submit(trigger_dag_run, i) for i in range(1, 101)]

    # По мере готовности результатов обрабатываем их
    for future in as_completed(futures):
        i, success, status_code, resp_text = future.result()
        if success:
            print(f"Запуск #{i} успешно отправлен (Status {status_code}).")
        else:
            print(
                f"Запуск #{i} завершился ошибкой (Status {status_code}). "
                f"Ответ: {resp_text}",
            )